diff --git a/docs/docs/configuration/state_transition/index.rst b/docs/docs/configuration/state_transition/index.rst
index ef3fa2ce5..6dccd477b 100644
--- a/docs/docs/configuration/state_transition/index.rst
+++ b/docs/docs/configuration/state_transition/index.rst
@@ -36,7 +36,7 @@ Sticking with the example above, you may want to test what happens if the cluste
from moto.moto_api import state_manager
- state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 5})
+ state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 5})
create_and_wait_for_cluster("my_new_cluster")
@@ -46,7 +46,7 @@ In order to test what happens in the event of a timeout, we can order the cluste
from moto.moto_api import state_manager
- state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 600})
+ state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 600})
try:
create_and_wait_for_cluster("my_new_cluster")
@@ -124,7 +124,7 @@ This is an example request for `dax::cluster` to wait 5 seconds before the clust
.. sourcecode:: python
- post_body = dict(model_name="dax::cluster", transition={"progression": "time", "duration": 5})
+ post_body = dict(model_name="dax::cluster", transition={"progression": "time", "seconds": 5})
resp = requests.post("http://localhost:5000/moto-api/state-manager/set-transition", data=json.dumps(post_body))
An example request to see the currently configured transition for a specific model:
diff --git a/docs/docs/configuration/state_transition/models.rst b/docs/docs/configuration/state_transition/models.rst
index bafbb1be9..b3f3e8dbf 100644
--- a/docs/docs/configuration/state_transition/models.rst
+++ b/docs/docs/configuration/state_transition/models.rst
@@ -56,6 +56,19 @@ Advancement:
Call `boto3.client("dax").describe_clusters(..)`.
+Service: Glue
+---------------------
+
+**Model**: `glue::job_run` :raw-html:`
`
+Available States: :raw-html:`
`
+
+ "STARTING" --> "RUNNING" --> "SUCCEEDED"
+
+Transition type: `immediate` :raw-html:`
`
+Advancement:
+
+ Call `boto3.client("glue").get_job_run(..)`
+
Service: S3 (Glacier Restoration)
-----------------------------------
diff --git a/moto/glue/exceptions.py b/moto/glue/exceptions.py
index 81194fd06..c678ce011 100644
--- a/moto/glue/exceptions.py
+++ b/moto/glue/exceptions.py
@@ -60,6 +60,11 @@ class JobNotFoundException(EntityNotFoundException):
super().__init__("Job %s not found." % job)
+class JobRunNotFoundException(EntityNotFoundException):
+ def __init__(self, job_run):
+ super().__init__("Job run %s not found." % job_run)
+
+
class VersionNotFoundException(EntityNotFoundException):
def __init__(self):
super().__init__("Version not found.")
diff --git a/moto/glue/models.py b/moto/glue/models.py
index ccf9691a4..fdb0d9dc1 100644
--- a/moto/glue/models.py
+++ b/moto/glue/models.py
@@ -1,20 +1,17 @@
import time
from collections import OrderedDict
from datetime import datetime
+from typing import List
from uuid import uuid4
from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict
-from moto.glue.exceptions import (
- CrawlerRunningException,
- CrawlerNotRunningException,
- SchemaVersionNotFoundFromSchemaVersionIdException,
- SchemaVersionNotFoundFromSchemaIdException,
- SchemaNotFoundException,
- SchemaVersionMetadataAlreadyExistsException,
-)
+from moto.moto_api import state_manager
+from moto.moto_api._internal.managed_state_model import ManagedState
from .exceptions import (
JsonRESTError,
+ CrawlerRunningException,
+ CrawlerNotRunningException,
CrawlerAlreadyExistsException,
CrawlerNotFoundException,
DatabaseAlreadyExistsException,
@@ -25,7 +22,12 @@ from .exceptions import (
PartitionNotFoundException,
VersionNotFoundException,
JobNotFoundException,
+ JobRunNotFoundException,
ConcurrentRunsExceededException,
+ SchemaVersionNotFoundFromSchemaVersionIdException,
+ SchemaVersionNotFoundFromSchemaIdException,
+ SchemaNotFoundException,
+ SchemaVersionMetadataAlreadyExistsException,
)
from .utils import PartitionFilter
from .glue_schema_registry_utils import (
@@ -78,6 +80,10 @@ class GlueBackend(BaseBackend):
self.num_schemas = 0
self.num_schema_versions = 0
+ state_manager.register_default_transition(
+ model_name="glue::job_run", transition={"progression": "immediate"}
+ )
+
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""Default VPC endpoint service."""
@@ -850,7 +856,7 @@ class FakeJob:
self.description = description
self.log_uri = log_uri
self.role = role
- self.execution_property = execution_property
+ self.execution_property = execution_property or {}
self.command = command
self.default_arguments = default_arguments
self.non_overridable_arguments = non_overridable_arguments
@@ -858,7 +864,6 @@ class FakeJob:
self.max_retries = max_retries
self.allocated_capacity = allocated_capacity
self.timeout = timeout
- self.state = "READY"
self.max_capacity = max_capacity
self.security_configuration = security_configuration
self.notification_property = notification_property
@@ -871,6 +876,8 @@ class FakeJob:
self.backend = backend
self.backend.tag_resource(self.arn, tags)
+ self.job_runs: List[FakeJobRun] = []
+
def get_name(self):
return self.name
@@ -899,20 +906,26 @@ class FakeJob:
}
def start_job_run(self):
- if self.state == "RUNNING":
+ running_jobs = len(
+ [jr for jr in self.job_runs if jr.status in ["STARTING", "RUNNING"]]
+ )
+ if running_jobs >= self.execution_property.get("MaxConcurrentRuns", 1):
raise ConcurrentRunsExceededException(
f"Job with name {self.name} already running"
)
fake_job_run = FakeJobRun(job_name=self.name)
- self.state = "RUNNING"
+ self.job_runs.append(fake_job_run)
return fake_job_run.job_run_id
def get_job_run(self, run_id):
- fake_job_run = FakeJobRun(job_name=self.name, job_run_id=run_id)
- return fake_job_run
+ for job_run in self.job_runs:
+ if job_run.job_run_id == run_id:
+ job_run.advance()
+ return job_run
+ raise JobRunNotFoundException(run_id)
-class FakeJobRun:
+class FakeJobRun(ManagedState):
def __init__(
self,
job_name: int,
@@ -922,6 +935,11 @@ class FakeJobRun:
timeout: int = None,
worker_type: str = "Standard",
):
+ ManagedState.__init__(
+ self,
+ model_name="glue::job_run",
+ transitions=[("STARTING", "RUNNING"), ("RUNNING", "SUCCEEDED")],
+ )
self.job_name = job_name
self.job_run_id = job_run_id
self.arguments = arguments
@@ -945,7 +963,7 @@ class FakeJobRun:
"StartedOn": self.started_on.isoformat(),
"LastModifiedOn": self.modified_on.isoformat(),
"CompletedOn": self.completed_on.isoformat(),
- "JobRunState": "SUCCEEDED",
+ "JobRunState": self.status,
"Arguments": self.arguments or {"runSpark": "spark -f test_file.py"},
"ErrorMessage": "",
"PredecessorRuns": [
diff --git a/moto/s3/models.py b/moto/s3/models.py
index bae01746d..fc66608c7 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -27,7 +27,6 @@ from moto.core.utils import (
BackendDict,
)
from moto.cloudwatch.models import MetricDatum
-from moto.iam.access_control import IAMPolicy, PermissionResult
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.tagging_service import TaggingService
@@ -911,6 +910,8 @@ class FakeBucket(CloudFormationModel):
def allow_action(self, action, resource):
if self.policy is None:
return False
+ from moto.iam.access_control import IAMPolicy, PermissionResult
+
iam_policy = IAMPolicy(self.policy.decode())
result = iam_policy.is_action_permitted(action, resource)
return result == PermissionResult.PERMITTED
diff --git a/tests/test_glue/test_glue.py b/tests/test_glue/test_glue.py
index 7adf95d4f..5e3fcbfce 100644
--- a/tests/test_glue/test_glue.py
+++ b/tests/test_glue/test_glue.py
@@ -85,76 +85,28 @@ def test_get_job_exists():
"GlueVersion": "string",
}
job_name = create_test_job_w_all_attributes(client, **job_attributes)
- response = client.get_job(JobName=job_name)
- assert response["Job"]["Name"] == job_name
- assert response["Job"]["Description"]
- assert response["Job"]["LogUri"]
- assert response["Job"]["Role"]
- assert response["Job"]["CreatedOn"]
- assert response["Job"]["LastModifiedOn"]
- assert response["Job"]["ExecutionProperty"]
- assert response["Job"]["Command"]
- assert response["Job"]["DefaultArguments"]
- assert response["Job"]["NonOverridableArguments"]
- assert response["Job"]["Connections"]
- assert response["Job"]["MaxRetries"]
- assert response["Job"]["AllocatedCapacity"]
- assert response["Job"]["Timeout"]
- assert response["Job"]["MaxCapacity"]
- assert response["Job"]["WorkerType"]
- assert response["Job"]["NumberOfWorkers"]
- assert response["Job"]["SecurityConfiguration"]
- assert response["Job"]["NotificationProperty"]
- assert response["Job"]["GlueVersion"]
-
-
-@mock_glue
-def test_start_job_run():
- client = create_glue_client()
- job_name = create_test_job(client)
- response = client.start_job_run(JobName=job_name)
- assert response["JobRunId"]
-
-
-@mock_glue
-def test_start_job_run_already_running():
- client = create_glue_client()
- job_name = create_test_job(client)
- client.start_job_run(JobName=job_name)
- with pytest.raises(ClientError) as exc:
- client.start_job_run(JobName=job_name)
- exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
- exc.value.response["Error"]["Message"].should.match(
- f"Job with name {job_name} already running"
- )
-
-
-@mock_glue
-def test_get_job_run():
- client = create_glue_client()
- job_name = create_test_job(client)
- response = client.get_job_run(JobName=job_name, RunId="01")
- assert response["JobRun"]["Id"]
- assert response["JobRun"]["Attempt"]
- assert response["JobRun"]["PreviousRunId"]
- assert response["JobRun"]["TriggerName"]
- assert response["JobRun"]["StartedOn"]
- assert response["JobRun"]["LastModifiedOn"]
- assert response["JobRun"]["CompletedOn"]
- assert response["JobRun"]["JobRunState"]
- assert response["JobRun"]["Arguments"]
- assert response["JobRun"]["ErrorMessage"] == ""
- assert response["JobRun"]["PredecessorRuns"]
- assert response["JobRun"]["AllocatedCapacity"]
- assert response["JobRun"]["ExecutionTime"]
- assert response["JobRun"]["Timeout"]
- assert response["JobRun"]["MaxCapacity"]
- assert response["JobRun"]["WorkerType"]
- assert response["JobRun"]["NumberOfWorkers"]
- assert response["JobRun"]["SecurityConfiguration"]
- assert response["JobRun"]["LogGroupName"]
- assert response["JobRun"]["NotificationProperty"]
- assert response["JobRun"]["GlueVersion"]
+ job = client.get_job(JobName=job_name)["Job"]
+ job.should.have.key("Name").equals(job_name)
+ job.should.have.key("Description")
+ job.should.have.key("LogUri")
+ job.should.have.key("Role")
+ job.should.have.key("ExecutionProperty").equals({"MaxConcurrentRuns": 123})
+ job.should.have.key("CreatedOn")
+ job.should.have.key("LastModifiedOn")
+ job.should.have.key("ExecutionProperty")
+ job.should.have.key("Command")
+ job.should.have.key("DefaultArguments")
+ job.should.have.key("NonOverridableArguments")
+ job.should.have.key("Connections")
+ job.should.have.key("MaxRetries")
+ job.should.have.key("AllocatedCapacity")
+ job.should.have.key("Timeout")
+ job.should.have.key("MaxCapacity")
+ job.should.have.key("WorkerType")
+ job.should.have.key("NumberOfWorkers")
+ job.should.have.key("SecurityConfiguration")
+ job.should.have.key("NotificationProperty")
+ job.should.have.key("GlueVersion")
@mock_glue
diff --git a/tests/test_glue/test_glue_job_runs.py b/tests/test_glue/test_glue_job_runs.py
new file mode 100644
index 000000000..98e323a9a
--- /dev/null
+++ b/tests/test_glue/test_glue_job_runs.py
@@ -0,0 +1,140 @@
+import pytest
+import sure # noqa # pylint: disable=unused-import
+from botocore.client import ClientError
+from unittest import SkipTest
+
+from moto import mock_glue, settings
+from moto.moto_api import state_manager
+from .test_glue import create_test_job, create_glue_client
+
+
+@mock_glue
+def test_start_job_run():
+ client = create_glue_client()
+ job_name = create_test_job(client)
+ response = client.start_job_run(JobName=job_name)
+ assert response["JobRunId"]
+
+
+@mock_glue
+def test_start_job_run__multiple_runs_allowed():
+ if settings.TEST_SERVER_MODE:
+ raise SkipTest("Can't set transition directly in ServerMode")
+
+ state_manager.set_transition(
+ model_name="glue::job_run", transition={"progression": "manual", "times": 2}
+ )
+
+ glue = create_glue_client()
+ glue.create_job(
+ Name="somejobname",
+ Role="some-role",
+ ExecutionProperty={"MaxConcurrentRuns": 5},
+ Command={
+ "Name": "some-name",
+ "ScriptLocation": "some-location",
+ "PythonVersion": "some-version",
+ },
+ )
+ for _ in range(5):
+ glue.start_job_run(JobName="somejobname")
+
+ # The 6th should fail
+ with pytest.raises(ClientError) as exc:
+ glue.start_job_run(JobName="somejobname")
+ exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
+ exc.value.response["Error"]["Message"].should.match(
+ "Job with name somejobname already running"
+ )
+
+
+@mock_glue
+def test_start_job_run__single_run_allowed():
+ if settings.TEST_SERVER_MODE:
+ raise SkipTest("Can't set transition directly in ServerMode")
+
+ state_manager.set_transition(
+ model_name="glue::job_run", transition={"progression": "manual", "times": 2}
+ )
+
+ client = create_glue_client()
+ job_name = create_test_job(client)
+ client.start_job_run(JobName=job_name)
+ with pytest.raises(ClientError) as exc:
+ client.start_job_run(JobName=job_name)
+ exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
+ exc.value.response["Error"]["Message"].should.match(
+ f"Job with name {job_name} already running"
+ )
+
+
+@mock_glue
+def test_get_job_run():
+ state_manager.unset_transition("glue::job_run")
+ client = create_glue_client()
+ job_name = create_test_job(client)
+ job_run_id = client.start_job_run(JobName=job_name)["JobRunId"]
+
+ response = client.get_job_run(JobName=job_name, RunId=job_run_id)
+ response["JobRun"].should.have.key("Id").equals(job_run_id)
+ assert response["JobRun"]["Attempt"]
+ assert response["JobRun"]["PreviousRunId"]
+ assert response["JobRun"]["TriggerName"]
+ assert response["JobRun"]["StartedOn"]
+ assert response["JobRun"]["LastModifiedOn"]
+ assert response["JobRun"]["CompletedOn"]
+ response["JobRun"].should.have.key("JobRunState").equals("SUCCEEDED")
+ assert response["JobRun"]["Arguments"]
+ assert response["JobRun"]["ErrorMessage"] == ""
+ assert response["JobRun"]["PredecessorRuns"]
+ assert response["JobRun"]["AllocatedCapacity"]
+ assert response["JobRun"]["ExecutionTime"]
+ assert response["JobRun"]["Timeout"]
+ assert response["JobRun"]["MaxCapacity"]
+ assert response["JobRun"]["WorkerType"]
+ assert response["JobRun"]["NumberOfWorkers"]
+ assert response["JobRun"]["SecurityConfiguration"]
+ assert response["JobRun"]["LogGroupName"]
+ assert response["JobRun"]["NotificationProperty"]
+ assert response["JobRun"]["GlueVersion"]
+
+
+@mock_glue
+def test_get_job_run_that_doesnt_exist():
+ client = create_glue_client()
+ job_name = create_test_job(client)
+ with pytest.raises(ClientError) as exc:
+ client.get_job_run(JobName=job_name, RunId="unknown")
+ err = exc.value.response["Error"]
+ err["Code"].should.equal("EntityNotFoundException")
+
+
+@mock_glue
+def test_job_run_transition():
+ if settings.TEST_SERVER_MODE:
+ raise SkipTest("Can't set transition directly in ServerMode")
+
+ state_manager.set_transition(
+ model_name="glue::job_run", transition={"progression": "manual", "times": 2}
+ )
+
+ client = create_glue_client()
+ job_name = create_test_job(client)
+ # set transition
+ run_id = client.start_job_run(JobName=job_name)["JobRunId"]
+
+ # The job should change over time
+ expect_job_state(client, job_name, run_id, expected_state="STARTING")
+ expect_job_state(client, job_name, run_id, expected_state="RUNNING")
+ expect_job_state(client, job_name, run_id, expected_state="RUNNING")
+ # But finishes afterwards
+ expect_job_state(client, job_name, run_id, expected_state="SUCCEEDED")
+
+ # unset transition
+ state_manager.unset_transition("glue::job_run")
+
+
+def expect_job_state(client, job_name, run_id, expected_state):
+ client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"][
+ "JobRunState"
+ ].should.equal(expected_state)