Glue: allow multiple job runs (#5483)

This commit is contained in:
Bert Blommers 2022-09-17 11:46:42 +00:00 committed by GitHub
parent e230750a7c
commit 9fc64ad93b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 219 additions and 90 deletions

View File

@ -36,7 +36,7 @@ Sticking with the example above, you may want to test what happens if the cluste
from moto.moto_api import state_manager from moto.moto_api import state_manager
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 5}) state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 5})
create_and_wait_for_cluster("my_new_cluster") create_and_wait_for_cluster("my_new_cluster")
@ -46,7 +46,7 @@ In order to test what happens in the event of a timeout, we can order the cluste
from moto.moto_api import state_manager from moto.moto_api import state_manager
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 600}) state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 600})
try: try:
create_and_wait_for_cluster("my_new_cluster") create_and_wait_for_cluster("my_new_cluster")
@ -124,7 +124,7 @@ This is an example request for `dax::cluster` to wait 5 seconds before the clust
.. sourcecode:: python .. sourcecode:: python
post_body = dict(model_name="dax::cluster", transition={"progression": "time", "duration": 5}) post_body = dict(model_name="dax::cluster", transition={"progression": "time", "seconds": 5})
resp = requests.post("http://localhost:5000/moto-api/state-manager/set-transition", data=json.dumps(post_body)) resp = requests.post("http://localhost:5000/moto-api/state-manager/set-transition", data=json.dumps(post_body))
An example request to see the currently configured transition for a specific model: An example request to see the currently configured transition for a specific model:

View File

@ -56,6 +56,19 @@ Advancement:
Call `boto3.client("dax").describe_clusters(..)`. Call `boto3.client("dax").describe_clusters(..)`.
Service: Glue
---------------------
**Model**: `glue::job_run` :raw-html:`<br />`
Available States: :raw-html:`<br />`
"STARTING" --> "RUNNING" --> "SUCCEEDED"
Transition type: `immediate` :raw-html:`<br />`
Advancement:
Call `boto3.client("glue").get_job_run(..)`
Service: S3 (Glacier Restoration) Service: S3 (Glacier Restoration)
----------------------------------- -----------------------------------

View File

@ -60,6 +60,11 @@ class JobNotFoundException(EntityNotFoundException):
super().__init__("Job %s not found." % job) super().__init__("Job %s not found." % job)
class JobRunNotFoundException(EntityNotFoundException):
def __init__(self, job_run):
super().__init__("Job run %s not found." % job_run)
class VersionNotFoundException(EntityNotFoundException): class VersionNotFoundException(EntityNotFoundException):
def __init__(self): def __init__(self):
super().__init__("Version not found.") super().__init__("Version not found.")

View File

@ -1,20 +1,17 @@
import time import time
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime from datetime import datetime
from typing import List
from uuid import uuid4 from uuid import uuid4
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict from moto.core.utils import BackendDict
from moto.glue.exceptions import ( from moto.moto_api import state_manager
CrawlerRunningException, from moto.moto_api._internal.managed_state_model import ManagedState
CrawlerNotRunningException,
SchemaVersionNotFoundFromSchemaVersionIdException,
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
)
from .exceptions import ( from .exceptions import (
JsonRESTError, JsonRESTError,
CrawlerRunningException,
CrawlerNotRunningException,
CrawlerAlreadyExistsException, CrawlerAlreadyExistsException,
CrawlerNotFoundException, CrawlerNotFoundException,
DatabaseAlreadyExistsException, DatabaseAlreadyExistsException,
@ -25,7 +22,12 @@ from .exceptions import (
PartitionNotFoundException, PartitionNotFoundException,
VersionNotFoundException, VersionNotFoundException,
JobNotFoundException, JobNotFoundException,
JobRunNotFoundException,
ConcurrentRunsExceededException, ConcurrentRunsExceededException,
SchemaVersionNotFoundFromSchemaVersionIdException,
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
) )
from .utils import PartitionFilter from .utils import PartitionFilter
from .glue_schema_registry_utils import ( from .glue_schema_registry_utils import (
@ -78,6 +80,10 @@ class GlueBackend(BaseBackend):
self.num_schemas = 0 self.num_schemas = 0
self.num_schema_versions = 0 self.num_schema_versions = 0
state_manager.register_default_transition(
model_name="glue::job_run", transition={"progression": "immediate"}
)
@staticmethod @staticmethod
def default_vpc_endpoint_service(service_region, zones): def default_vpc_endpoint_service(service_region, zones):
"""Default VPC endpoint service.""" """Default VPC endpoint service."""
@ -850,7 +856,7 @@ class FakeJob:
self.description = description self.description = description
self.log_uri = log_uri self.log_uri = log_uri
self.role = role self.role = role
self.execution_property = execution_property self.execution_property = execution_property or {}
self.command = command self.command = command
self.default_arguments = default_arguments self.default_arguments = default_arguments
self.non_overridable_arguments = non_overridable_arguments self.non_overridable_arguments = non_overridable_arguments
@ -858,7 +864,6 @@ class FakeJob:
self.max_retries = max_retries self.max_retries = max_retries
self.allocated_capacity = allocated_capacity self.allocated_capacity = allocated_capacity
self.timeout = timeout self.timeout = timeout
self.state = "READY"
self.max_capacity = max_capacity self.max_capacity = max_capacity
self.security_configuration = security_configuration self.security_configuration = security_configuration
self.notification_property = notification_property self.notification_property = notification_property
@ -871,6 +876,8 @@ class FakeJob:
self.backend = backend self.backend = backend
self.backend.tag_resource(self.arn, tags) self.backend.tag_resource(self.arn, tags)
self.job_runs: List[FakeJobRun] = []
def get_name(self): def get_name(self):
return self.name return self.name
@ -899,20 +906,26 @@ class FakeJob:
} }
def start_job_run(self): def start_job_run(self):
if self.state == "RUNNING": running_jobs = len(
[jr for jr in self.job_runs if jr.status in ["STARTING", "RUNNING"]]
)
if running_jobs >= self.execution_property.get("MaxConcurrentRuns", 1):
raise ConcurrentRunsExceededException( raise ConcurrentRunsExceededException(
f"Job with name {self.name} already running" f"Job with name {self.name} already running"
) )
fake_job_run = FakeJobRun(job_name=self.name) fake_job_run = FakeJobRun(job_name=self.name)
self.state = "RUNNING" self.job_runs.append(fake_job_run)
return fake_job_run.job_run_id return fake_job_run.job_run_id
def get_job_run(self, run_id): def get_job_run(self, run_id):
fake_job_run = FakeJobRun(job_name=self.name, job_run_id=run_id) for job_run in self.job_runs:
return fake_job_run if job_run.job_run_id == run_id:
job_run.advance()
return job_run
raise JobRunNotFoundException(run_id)
class FakeJobRun: class FakeJobRun(ManagedState):
def __init__( def __init__(
self, self,
job_name: int, job_name: int,
@ -922,6 +935,11 @@ class FakeJobRun:
timeout: int = None, timeout: int = None,
worker_type: str = "Standard", worker_type: str = "Standard",
): ):
ManagedState.__init__(
self,
model_name="glue::job_run",
transitions=[("STARTING", "RUNNING"), ("RUNNING", "SUCCEEDED")],
)
self.job_name = job_name self.job_name = job_name
self.job_run_id = job_run_id self.job_run_id = job_run_id
self.arguments = arguments self.arguments = arguments
@ -945,7 +963,7 @@ class FakeJobRun:
"StartedOn": self.started_on.isoformat(), "StartedOn": self.started_on.isoformat(),
"LastModifiedOn": self.modified_on.isoformat(), "LastModifiedOn": self.modified_on.isoformat(),
"CompletedOn": self.completed_on.isoformat(), "CompletedOn": self.completed_on.isoformat(),
"JobRunState": "SUCCEEDED", "JobRunState": self.status,
"Arguments": self.arguments or {"runSpark": "spark -f test_file.py"}, "Arguments": self.arguments or {"runSpark": "spark -f test_file.py"},
"ErrorMessage": "", "ErrorMessage": "",
"PredecessorRuns": [ "PredecessorRuns": [

View File

@ -27,7 +27,6 @@ from moto.core.utils import (
BackendDict, BackendDict,
) )
from moto.cloudwatch.models import MetricDatum from moto.cloudwatch.models import MetricDatum
from moto.iam.access_control import IAMPolicy, PermissionResult
from moto.moto_api import state_manager from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.tagging_service import TaggingService from moto.utilities.tagging_service import TaggingService
@ -911,6 +910,8 @@ class FakeBucket(CloudFormationModel):
def allow_action(self, action, resource): def allow_action(self, action, resource):
if self.policy is None: if self.policy is None:
return False return False
from moto.iam.access_control import IAMPolicy, PermissionResult
iam_policy = IAMPolicy(self.policy.decode()) iam_policy = IAMPolicy(self.policy.decode())
result = iam_policy.is_action_permitted(action, resource) result = iam_policy.is_action_permitted(action, resource)
return result == PermissionResult.PERMITTED return result == PermissionResult.PERMITTED

View File

@ -85,76 +85,28 @@ def test_get_job_exists():
"GlueVersion": "string", "GlueVersion": "string",
} }
job_name = create_test_job_w_all_attributes(client, **job_attributes) job_name = create_test_job_w_all_attributes(client, **job_attributes)
response = client.get_job(JobName=job_name) job = client.get_job(JobName=job_name)["Job"]
assert response["Job"]["Name"] == job_name job.should.have.key("Name").equals(job_name)
assert response["Job"]["Description"] job.should.have.key("Description")
assert response["Job"]["LogUri"] job.should.have.key("LogUri")
assert response["Job"]["Role"] job.should.have.key("Role")
assert response["Job"]["CreatedOn"] job.should.have.key("ExecutionProperty").equals({"MaxConcurrentRuns": 123})
assert response["Job"]["LastModifiedOn"] job.should.have.key("CreatedOn")
assert response["Job"]["ExecutionProperty"] job.should.have.key("LastModifiedOn")
assert response["Job"]["Command"] job.should.have.key("ExecutionProperty")
assert response["Job"]["DefaultArguments"] job.should.have.key("Command")
assert response["Job"]["NonOverridableArguments"] job.should.have.key("DefaultArguments")
assert response["Job"]["Connections"] job.should.have.key("NonOverridableArguments")
assert response["Job"]["MaxRetries"] job.should.have.key("Connections")
assert response["Job"]["AllocatedCapacity"] job.should.have.key("MaxRetries")
assert response["Job"]["Timeout"] job.should.have.key("AllocatedCapacity")
assert response["Job"]["MaxCapacity"] job.should.have.key("Timeout")
assert response["Job"]["WorkerType"] job.should.have.key("MaxCapacity")
assert response["Job"]["NumberOfWorkers"] job.should.have.key("WorkerType")
assert response["Job"]["SecurityConfiguration"] job.should.have.key("NumberOfWorkers")
assert response["Job"]["NotificationProperty"] job.should.have.key("SecurityConfiguration")
assert response["Job"]["GlueVersion"] job.should.have.key("NotificationProperty")
job.should.have.key("GlueVersion")
@mock_glue
def test_start_job_run():
client = create_glue_client()
job_name = create_test_job(client)
response = client.start_job_run(JobName=job_name)
assert response["JobRunId"]
@mock_glue
def test_start_job_run_already_running():
client = create_glue_client()
job_name = create_test_job(client)
client.start_job_run(JobName=job_name)
with pytest.raises(ClientError) as exc:
client.start_job_run(JobName=job_name)
exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
exc.value.response["Error"]["Message"].should.match(
f"Job with name {job_name} already running"
)
@mock_glue
def test_get_job_run():
client = create_glue_client()
job_name = create_test_job(client)
response = client.get_job_run(JobName=job_name, RunId="01")
assert response["JobRun"]["Id"]
assert response["JobRun"]["Attempt"]
assert response["JobRun"]["PreviousRunId"]
assert response["JobRun"]["TriggerName"]
assert response["JobRun"]["StartedOn"]
assert response["JobRun"]["LastModifiedOn"]
assert response["JobRun"]["CompletedOn"]
assert response["JobRun"]["JobRunState"]
assert response["JobRun"]["Arguments"]
assert response["JobRun"]["ErrorMessage"] == ""
assert response["JobRun"]["PredecessorRuns"]
assert response["JobRun"]["AllocatedCapacity"]
assert response["JobRun"]["ExecutionTime"]
assert response["JobRun"]["Timeout"]
assert response["JobRun"]["MaxCapacity"]
assert response["JobRun"]["WorkerType"]
assert response["JobRun"]["NumberOfWorkers"]
assert response["JobRun"]["SecurityConfiguration"]
assert response["JobRun"]["LogGroupName"]
assert response["JobRun"]["NotificationProperty"]
assert response["JobRun"]["GlueVersion"]
@mock_glue @mock_glue

View File

@ -0,0 +1,140 @@
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError
from unittest import SkipTest
from moto import mock_glue, settings
from moto.moto_api import state_manager
from .test_glue import create_test_job, create_glue_client
@mock_glue
def test_start_job_run():
client = create_glue_client()
job_name = create_test_job(client)
response = client.start_job_run(JobName=job_name)
assert response["JobRunId"]
@mock_glue
def test_start_job_run__multiple_runs_allowed():
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't set transition directly in ServerMode")
state_manager.set_transition(
model_name="glue::job_run", transition={"progression": "manual", "times": 2}
)
glue = create_glue_client()
glue.create_job(
Name="somejobname",
Role="some-role",
ExecutionProperty={"MaxConcurrentRuns": 5},
Command={
"Name": "some-name",
"ScriptLocation": "some-location",
"PythonVersion": "some-version",
},
)
for _ in range(5):
glue.start_job_run(JobName="somejobname")
# The 6th should fail
with pytest.raises(ClientError) as exc:
glue.start_job_run(JobName="somejobname")
exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
exc.value.response["Error"]["Message"].should.match(
"Job with name somejobname already running"
)
@mock_glue
def test_start_job_run__single_run_allowed():
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't set transition directly in ServerMode")
state_manager.set_transition(
model_name="glue::job_run", transition={"progression": "manual", "times": 2}
)
client = create_glue_client()
job_name = create_test_job(client)
client.start_job_run(JobName=job_name)
with pytest.raises(ClientError) as exc:
client.start_job_run(JobName=job_name)
exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
exc.value.response["Error"]["Message"].should.match(
f"Job with name {job_name} already running"
)
@mock_glue
def test_get_job_run():
state_manager.unset_transition("glue::job_run")
client = create_glue_client()
job_name = create_test_job(client)
job_run_id = client.start_job_run(JobName=job_name)["JobRunId"]
response = client.get_job_run(JobName=job_name, RunId=job_run_id)
response["JobRun"].should.have.key("Id").equals(job_run_id)
assert response["JobRun"]["Attempt"]
assert response["JobRun"]["PreviousRunId"]
assert response["JobRun"]["TriggerName"]
assert response["JobRun"]["StartedOn"]
assert response["JobRun"]["LastModifiedOn"]
assert response["JobRun"]["CompletedOn"]
response["JobRun"].should.have.key("JobRunState").equals("SUCCEEDED")
assert response["JobRun"]["Arguments"]
assert response["JobRun"]["ErrorMessage"] == ""
assert response["JobRun"]["PredecessorRuns"]
assert response["JobRun"]["AllocatedCapacity"]
assert response["JobRun"]["ExecutionTime"]
assert response["JobRun"]["Timeout"]
assert response["JobRun"]["MaxCapacity"]
assert response["JobRun"]["WorkerType"]
assert response["JobRun"]["NumberOfWorkers"]
assert response["JobRun"]["SecurityConfiguration"]
assert response["JobRun"]["LogGroupName"]
assert response["JobRun"]["NotificationProperty"]
assert response["JobRun"]["GlueVersion"]
@mock_glue
def test_get_job_run_that_doesnt_exist():
client = create_glue_client()
job_name = create_test_job(client)
with pytest.raises(ClientError) as exc:
client.get_job_run(JobName=job_name, RunId="unknown")
err = exc.value.response["Error"]
err["Code"].should.equal("EntityNotFoundException")
@mock_glue
def test_job_run_transition():
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't set transition directly in ServerMode")
state_manager.set_transition(
model_name="glue::job_run", transition={"progression": "manual", "times": 2}
)
client = create_glue_client()
job_name = create_test_job(client)
# set transition
run_id = client.start_job_run(JobName=job_name)["JobRunId"]
# The job should change over time
expect_job_state(client, job_name, run_id, expected_state="STARTING")
expect_job_state(client, job_name, run_id, expected_state="RUNNING")
expect_job_state(client, job_name, run_id, expected_state="RUNNING")
# But finishes afterwards
expect_job_state(client, job_name, run_id, expected_state="SUCCEEDED")
# unset transition
state_manager.unset_transition("glue::job_run")
def expect_job_state(client, job_name, run_id, expected_state):
client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"][
"JobRunState"
].should.equal(expected_state)