From 3b9e6261f9d0f816bae76547c42acf4f16a36482 Mon Sep 17 00:00:00 2001
From: Andor Markus <51825189+andormarkus@users.noreply.github.com>
Date: Mon, 1 Nov 2021 11:36:38 +0100
Subject: [PATCH] Feature - emr-containers - jobs (#4515)
---
IMPLEMENTATION_COVERAGE.md | 8 +-
docs/index.rst | 2 +
moto/emrcontainers/exceptions.py | 9 +-
moto/emrcontainers/models.py | 207 ++++++-
moto/emrcontainers/responses.py | 64 +++
moto/emrcontainers/urls.py | 4 +-
moto/emrcontainers/utils.py | 10 +-
.../test_emrcontainers/test_emrcontainers.py | 523 ++++++++++++++----
8 files changed, 703 insertions(+), 124 deletions(-)
diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md
index edbf508b4..2c298934c 100644
--- a/IMPLEMENTATION_COVERAGE.md
+++ b/IMPLEMENTATION_COVERAGE.md
@@ -1921,19 +1921,19 @@
26% implemented
-- [ ] cancel_job_run
+- [X] cancel_job_run
- [ ] create_managed_endpoint
- [X] create_virtual_cluster
- [ ] delete_managed_endpoint
- [X] delete_virtual_cluster
-- [ ] describe_job_run
+- [X] describe_job_run
- [ ] describe_managed_endpoint
- [X] describe_virtual_cluster
-- [ ] list_job_runs
+- [X] list_job_runs
- [ ] list_managed_endpoints
- [ ] list_tags_for_resource
- [X] list_virtual_clusters
-- [ ] start_job_run
+- [X] start_job_run
- [ ] tag_resource
- [ ] untag_resource
diff --git a/docs/index.rst b/docs/index.rst
index fae4a3a7c..473b9ac2d 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -64,6 +64,8 @@ Currently implemented Services:
+---------------------------+-----------------------+------------------------------------+
| EMR | @mock_emr | core endpoints done |
+---------------------------+-----------------------+------------------------------------+
+| EMRContainers | @mock_emrcontainers | core endpoints done |
++---------------------------+-----------------------+------------------------------------+
| Firehose | @mock_firehose | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| Forecast | @mock_forecast | basic endpoints done |
diff --git a/moto/emrcontainers/exceptions.py b/moto/emrcontainers/exceptions.py
index b464eb92e..202541bb5 100644
--- a/moto/emrcontainers/exceptions.py
+++ b/moto/emrcontainers/exceptions.py
@@ -1,2 +1,9 @@
"""Exceptions raised by the emrcontainers service."""
-# from moto.core.exceptions import JsonRESTError
+from moto.core.exceptions import JsonRESTError
+
+
+class ResourceNotFoundException(JsonRESTError):
+ code = 400
+
+ def __init__(self, resource):
+ super().__init__("ResourceNotFoundException", resource)
diff --git a/moto/emrcontainers/models.py b/moto/emrcontainers/models.py
index 7312b15f6..e14a1efde 100644
--- a/moto/emrcontainers/models.py
+++ b/moto/emrcontainers/models.py
@@ -1,13 +1,14 @@
"""EMRContainersBackend class with methods for supported APIs."""
+import re
from datetime import datetime
from boto3 import Session
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID
from moto.core.utils import iso_8601_datetime_without_milliseconds
-from .utils import random_cluster_id, get_partition, paginated_list
+from .utils import random_cluster_id, random_job_id, get_partition, paginated_list
+from .exceptions import ResourceNotFoundException
-# String Templates
from ..config.exceptions import ValidationException
VIRTUAL_CLUSTER_ARN_TEMPLATE = (
@@ -16,8 +17,15 @@ VIRTUAL_CLUSTER_ARN_TEMPLATE = (
+ ":/virtualclusters/{virtual_cluster_id}"
)
+JOB_ARN_TEMPLATE = (
+ "arn:{partition}:emr-containers:{region}:"
+ + str(ACCOUNT_ID)
+ + ":/virtualclusters/{virtual_cluster_id}/jobruns/{job_id}"
+)
+
# Defaults used for creating a Virtual cluster
-ACTIVE_STATUS = "ACTIVE"
+VIRTUAL_CLUSTER_STATUS = "RUNNING"
+JOB_STATUS = "RUNNING"
class FakeCluster(BaseModel):
@@ -38,7 +46,7 @@ class FakeCluster(BaseModel):
self.arn = VIRTUAL_CLUSTER_ARN_TEMPLATE.format(
partition=aws_partition, region=region_name, virtual_cluster_id=self.id
)
- self.state = ACTIVE_STATUS
+ self.state = VIRTUAL_CLUSTER_STATUS
self.container_provider = container_provider
self.container_provider_id = container_provider["id"]
self.namespace = container_provider["info"]["eksInfo"]["namespace"]
@@ -70,6 +78,85 @@ class FakeCluster(BaseModel):
}
+class FakeJob(BaseModel):
+ def __init__(
+ self,
+ name,
+ virtual_cluster_id,
+ client_token,
+ execution_role_arn,
+ release_label,
+ job_driver,
+ configuration_overrides,
+ region_name,
+ aws_partition,
+ tags,
+ ):
+ self.id = random_job_id()
+ self.name = name
+ self.virtual_cluster_id = virtual_cluster_id
+ self.arn = JOB_ARN_TEMPLATE.format(
+ partition=aws_partition,
+ region=region_name,
+ virtual_cluster_id=self.virtual_cluster_id,
+ job_id=self.id,
+ )
+ self.state = JOB_STATUS
+ self.client_token = client_token
+ self.execution_role_arn = execution_role_arn
+ self.release_label = release_label
+ self.job_driver = job_driver
+ self.configuration_overrides = configuration_overrides
+ self.created_at = iso_8601_datetime_without_milliseconds(
+ datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
+ )
+ self.created_by = None
+ self.finished_at = None
+ self.state_details = None
+ self.failure_reason = None
+ self.tags = tags
+
+ def __iter__(self):
+ yield "id", self.id
+ yield "name", self.name
+ yield "virtualClusterId", self.virtual_cluster_id
+ yield "arn", self.arn
+ yield "state", self.state
+ yield "clientToken", self.client_token
+ yield "executionRoleArn", self.execution_role_arn
+ yield "releaseLabel", self.release_label
+ yield "configurationOverrides", self.release_label
+ yield "jobDriver", self.job_driver
+ yield "createdAt", self.created_at
+ yield "createdBy", self.created_by
+ yield "finishedAt", self.finished_at
+ yield "stateDetails", self.state_details
+ yield "failureReason", self.failure_reason
+ yield "tags", self.tags
+
+ def to_dict(self):
+ # Format for summary https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeJobRun.html
+ # (response syntax section)
+ return {
+ "id": self.id,
+ "name": self.name,
+ "virtualClusterId": self.virtual_cluster_id,
+ "arn": self.arn,
+ "state": self.state,
+ "clientToken": self.client_token,
+ "executionRoleArn": self.execution_role_arn,
+ "releaseLabel": self.release_label,
+ "configurationOverrides": self.configuration_overrides,
+ "jobDriver": self.job_driver,
+ "createdAt": self.created_at,
+ "createdBy": self.created_by,
+ "finishedAt": self.finished_at,
+ "stateDetails": self.state_details,
+ "failureReason": self.failure_reason,
+ "tags": self.tags,
+ }
+
+
class EMRContainersBackend(BaseBackend):
"""Implementation of EMRContainers APIs."""
@@ -77,6 +164,8 @@ class EMRContainersBackend(BaseBackend):
super(EMRContainersBackend, self).__init__()
self.virtual_clusters = dict()
self.virtual_cluster_count = 0
+ self.jobs = dict()
+ self.job_count = 0
self.region_name = region_name
self.partition = get_partition(region_name)
@@ -173,8 +262,116 @@ class EMRContainersBackend(BaseBackend):
for virtual_cluster in virtual_clusters
if virtual_cluster["state"] in states
]
+ sort_key = "name"
+ return paginated_list(virtual_clusters, sort_key, max_results, next_token)
- return paginated_list(virtual_clusters, max_results, next_token)
+ def start_job_run(
+ self,
+ name,
+ virtual_cluster_id,
+ client_token,
+ execution_role_arn,
+ release_label,
+ job_driver,
+ configuration_overrides,
+ tags,
+ ):
+
+ if virtual_cluster_id not in self.virtual_clusters.keys():
+ raise ResourceNotFoundException(
+ f"Virtual cluster {virtual_cluster_id} doesn't exist."
+ )
+
+ if not re.match(
+ r"emr-[0-9]{1}\.[0-9]{1,2}\.0-(latest|[0-9]{8})", release_label
+ ):
+ raise ResourceNotFoundException(f"Release {release_label} doesn't exist.")
+
+ job = FakeJob(
+ name=name,
+ virtual_cluster_id=virtual_cluster_id,
+ client_token=client_token,
+ execution_role_arn=execution_role_arn,
+ release_label=release_label,
+ job_driver=job_driver,
+ configuration_overrides=configuration_overrides,
+ tags=tags,
+ region_name=self.region_name,
+ aws_partition=self.partition,
+ )
+
+ self.jobs[job.id] = job
+ self.job_count += 1
+ return job
+
+ def cancel_job_run(self, id, virtual_cluster_id):
+
+ if not re.match(r"[a-z,A-Z,0-9]{19}", id):
+ raise ValidationException("Invalid job run short id")
+
+ if id not in self.jobs.keys():
+ raise ResourceNotFoundException(f"Job run {id} doesn't exist.")
+
+ if virtual_cluster_id != self.jobs[id].virtual_cluster_id:
+ raise ResourceNotFoundException(f"Job run {id} doesn't exist.")
+
+ if self.jobs[id].state in [
+ "FAILED",
+ "CANCELLED",
+ "CANCEL_PENDING",
+ "COMPLETED",
+ ]:
+ raise ValidationException(f"Job run {id} is not in a cancellable state")
+
+ job = self.jobs[id]
+ job.state = "CANCELLED"
+ job.finished_at = iso_8601_datetime_without_milliseconds(
+ datetime.today().replace(hour=0, minute=1, second=0, microsecond=0)
+ )
+ job.state_details = "JobRun CANCELLED successfully."
+
+ return job
+
+ def list_job_runs(
+ self,
+ virtual_cluster_id,
+ created_before,
+ created_after,
+ name,
+ states,
+ max_results,
+ next_token,
+ ):
+ jobs = [job.to_dict() for job in self.jobs.values()]
+
+ jobs = [job for job in jobs if job["virtualClusterId"] == virtual_cluster_id]
+
+ if created_after:
+ jobs = [job for job in jobs if job["createdAt"] >= created_after]
+
+ if created_before:
+ jobs = [job for job in jobs if job["createdAt"] <= created_before]
+
+ if states:
+ jobs = [job for job in jobs if job["state"] in states]
+
+ if name:
+ jobs = [job for job in jobs if job["name"] in name]
+
+ sort_key = "id"
+ return paginated_list(jobs, sort_key, max_results, next_token)
+
+ def describe_job_run(self, id, virtual_cluster_id):
+ if not re.match(r"[a-z,A-Z,0-9]{19}", id):
+ raise ValidationException("Invalid job run short id")
+
+ if id not in self.jobs.keys():
+ raise ResourceNotFoundException(f"Job run {id} doesn't exist.")
+
+ if virtual_cluster_id != self.jobs[id].virtual_cluster_id:
+ raise ResourceNotFoundException(f"Job run {id} doesn't exist.")
+
+ return self.jobs[id].to_dict()
emrcontainers_backends = {}
diff --git a/moto/emrcontainers/responses.py b/moto/emrcontainers/responses.py
index d6fd67820..2fa540193 100644
--- a/moto/emrcontainers/responses.py
+++ b/moto/emrcontainers/responses.py
@@ -69,3 +69,67 @@ class EMRContainersResponse(BaseResponse):
response = {"virtualClusters": virtual_clusters, "nextToken": next_token}
return 200, {}, json.dumps(response)
+
+ def start_job_run(self):
+ name = self._get_param("name")
+ virtual_cluster_id = self._get_param("virtualClusterId")
+ client_token = self._get_param("clientToken")
+ execution_role_arn = self._get_param("executionRoleArn")
+ release_label = self._get_param("releaseLabel")
+ job_driver = self._get_param("jobDriver")
+ configuration_overrides = self._get_param("configurationOverrides")
+ tags = self._get_param("tags")
+
+ job = self.emrcontainers_backend.start_job_run(
+ name=name,
+ virtual_cluster_id=virtual_cluster_id,
+ client_token=client_token,
+ execution_role_arn=execution_role_arn,
+ release_label=release_label,
+ job_driver=job_driver,
+ configuration_overrides=configuration_overrides,
+ tags=tags,
+ )
+ return 200, {}, json.dumps(dict(job))
+
+ def cancel_job_run(self):
+ id = self._get_param("jobRunId")
+ virtual_cluster_id = self._get_param("virtualClusterId")
+
+ job = self.emrcontainers_backend.cancel_job_run(
+ id=id, virtual_cluster_id=virtual_cluster_id,
+ )
+ return 200, {}, json.dumps(dict(job))
+
+ def list_job_runs(self):
+ virtual_cluster_id = self._get_param("virtualClusterId")
+ created_before = self._get_param("createdBefore")
+ created_after = self._get_param("createdAfter")
+ name = self._get_param("name")
+ states = self.querystring.get("states", [])
+ max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
+ next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
+
+ job_runs, next_token = self.emrcontainers_backend.list_job_runs(
+ virtual_cluster_id=virtual_cluster_id,
+ created_before=created_before,
+ created_after=created_after,
+ name=name,
+ states=states,
+ max_results=max_results,
+ next_token=next_token,
+ )
+
+ response = {"jobRuns": job_runs, "nextToken": next_token}
+ return 200, {}, json.dumps(response)
+
+ def describe_job_run(self):
+ id = self._get_param("jobRunId")
+ virtual_cluster_id = self._get_param("virtualClusterId")
+
+ job_run = self.emrcontainers_backend.describe_job_run(
+ id=id, virtual_cluster_id=virtual_cluster_id,
+ )
+
+ response = {"jobRun": job_run}
+ return 200, {}, json.dumps(response)
diff --git a/moto/emrcontainers/urls.py b/moto/emrcontainers/urls.py
index 0883f3641..ce5d974d8 100644
--- a/moto/emrcontainers/urls.py
+++ b/moto/emrcontainers/urls.py
@@ -8,5 +8,7 @@ url_bases = [
url_paths = {
"{0}/virtualclusters$": EMRContainersResponse.dispatch,
- "{0}/virtualclusters/(?P[^/]+)/?$": EMRContainersResponse.dispatch,
+ "{0}/virtualclusters/(?P[^/]+)$": EMRContainersResponse.dispatch,
+ "{0}/virtualclusters/(?P[^/]+)/jobruns$": EMRContainersResponse.dispatch,
+ "{0}/virtualclusters/(?P[^/]+)/jobruns/(?P[^/]+)$": EMRContainersResponse.dispatch,
}
diff --git a/moto/emrcontainers/utils.py b/moto/emrcontainers/utils.py
index eeaf241f1..07276318a 100644
--- a/moto/emrcontainers/utils.py
+++ b/moto/emrcontainers/utils.py
@@ -23,11 +23,15 @@ def random_id(size=13):
return "".join(str(random.choice(chars)) for x in range(size))
-def random_cluster_id(size=13):
+def random_cluster_id():
return random_id(size=25)
-def paginated_list(full_list, max_results, next_token):
+def random_job_id():
+ return random_id(size=19)
+
+
+def paginated_list(full_list, sort_key, max_results, next_token):
"""
Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results
number of elements, and the new next_token which can be passed back in for the next segment of the full list.
@@ -36,7 +40,7 @@ def paginated_list(full_list, max_results, next_token):
next_token = 0
next_token = int(next_token)
- sorted_list = sorted(full_list, key=lambda d: d["name"])
+ sorted_list = sorted(full_list, key=lambda d: d[sort_key])
values = sorted_list[next_token : next_token + max_results]
if len(values) == max_results:
diff --git a/tests/test_emrcontainers/test_emrcontainers.py b/tests/test_emrcontainers/test_emrcontainers.py
index c26adc235..e9f1e9d28 100644
--- a/tests/test_emrcontainers/test_emrcontainers.py
+++ b/tests/test_emrcontainers/test_emrcontainers.py
@@ -6,7 +6,7 @@ from unittest import SkipTest
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
-from botocore.exceptions import ClientError
+from botocore.exceptions import ClientError, ParamValidationError
from moto import mock_emrcontainers, settings
from moto.core import ACCOUNT_ID
@@ -29,8 +29,10 @@ def virtual_cluster_factory(client):
cluster_state = ["RUNNING", "TERMINATING", "TERMINATED", "ARRESTED"]
cluster_list = []
- for i in range(4):
- with patch("moto.emrcontainers.models.ACTIVE_STATUS", cluster_state[i]):
+ for i in range(len(cluster_state)):
+ with patch(
+ "moto.emrcontainers.models.VIRTUAL_CLUSTER_STATUS", cluster_state[i]
+ ):
resp = client.create_virtual_cluster(
name="test-emr-virtual-cluster",
containerProvider={
@@ -45,6 +47,44 @@ def virtual_cluster_factory(client):
yield cluster_list
+@pytest.fixture()
+def job_factory(client, virtual_cluster_factory):
+ virtual_cluster_id = virtual_cluster_factory[0]
+ default_job_driver = {
+ "sparkSubmitJobDriver": {
+ "entryPoint": "s3://code/pi.py",
+ "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4",
+ }
+ }
+ default_execution_role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/iamrole-emrcontainers"
+ default_release_label = "emr-6.3.0-latest"
+
+ job_state = [
+ "PENDING",
+ "SUBMITTED",
+ "RUNNING",
+ "FAILED",
+ "CANCELLED",
+ "CANCEL_PENDING",
+ "COMPLETED",
+ ]
+
+ job_list = []
+ for i in range(len(job_state)):
+ with patch("moto.emrcontainers.models.JOB_STATUS", job_state[i]):
+ resp = client.start_job_run(
+ name=f"test_job_{i}",
+ virtualClusterId=virtual_cluster_id,
+ executionRoleArn=default_execution_role_arn,
+ releaseLabel=default_release_label,
+ jobDriver=default_job_driver,
+ )
+
+ job_list.append(resp["id"])
+
+ yield job_list
+
+
class TestCreateVirtualCluster:
@staticmethod
@mock_emrcontainers
@@ -157,117 +197,55 @@ class TestDescribeVirtualCluster:
class TestListVirtualClusters:
+ today = datetime.now()
+ yesterday = today - timedelta(days=1)
+ tomorrow = today + timedelta(days=1)
+
@pytest.fixture(autouse=True)
def _setup_environment(self, client, virtual_cluster_factory):
self.client = client
- def test_base(self):
- resp = self.client.list_virtual_clusters()
- assert len(resp["virtualClusters"]) == 4
-
- def test_valid_container_provider_id(self):
- resp = self.client.list_virtual_clusters(containerProviderId="test-eks-cluster")
- assert len(resp["virtualClusters"]) == 4
-
- def test_invalid_container_provider_id(self):
- resp = self.client.list_virtual_clusters(containerProviderId="foobaa")
- assert len(resp["virtualClusters"]) == 0
-
- def test_valid_container_provider_type(self):
- resp = self.client.list_virtual_clusters(containerProviderType="EKS")
- assert len(resp["virtualClusters"]) == 4
-
- def test_invalid_container_provider_type(self):
- resp = self.client.list_virtual_clusters(containerProviderType="AKS")
- assert len(resp["virtualClusters"]) == 0
-
- def test_created_after_yesterday(self):
- today = datetime.now()
- yesterday = today - timedelta(days=1)
- resp = self.client.list_virtual_clusters(createdAfter=yesterday)
- assert len(resp["virtualClusters"]) == 4
-
- def test_created_after_tomorrow(self):
- today = datetime.now()
- tomorrow = today + timedelta(days=1)
- resp = self.client.list_virtual_clusters(createdAfter=tomorrow)
- assert len(resp["virtualClusters"]) == 0
-
- def test_created_after_yesterday_running_state(self):
- today = datetime.now()
- yesterday = today - timedelta(days=1)
- resp = self.client.list_virtual_clusters(
- createdAfter=yesterday, states=["RUNNING"]
- )
- assert len(resp["virtualClusters"]) == 1
-
- def test_created_after_tomorrow_running_state(self):
- today = datetime.now()
- tomorrow = today + timedelta(days=1)
- resp = self.client.list_virtual_clusters(
- createdAfter=tomorrow, states=["RUNNING"]
- )
- assert len(resp["virtualClusters"]) == 0
-
- def test_created_after_yesterday_two_state_limit(self):
- today = datetime.now()
- yesterday = today - timedelta(days=1)
- resp = self.client.list_virtual_clusters(
- createdAfter=yesterday, states=["RUNNING", "TERMINATED"], maxResults=1
- )
- assert len(resp["virtualClusters"]) == 1
-
- def test_created_before_yesterday(self):
- today = datetime.now()
- yesterday = today - timedelta(days=1)
- resp = self.client.list_virtual_clusters(createdBefore=yesterday)
- assert len(resp["virtualClusters"]) == 0
-
- def test_created_before_tomorrow(self):
- today = datetime.now()
- tomorrow = today + timedelta(days=1)
- resp = self.client.list_virtual_clusters(createdBefore=tomorrow)
- assert len(resp["virtualClusters"]) == 4
-
- def test_created_before_yesterday_running_state(self):
- today = datetime.now()
- yesterday = today - timedelta(days=1)
- resp = self.client.list_virtual_clusters(
- createdBefore=yesterday, states=["RUNNING"]
- )
- assert len(resp["virtualClusters"]) == 0
-
- def test_created_before_tomorrow_running_state(self):
- today = datetime.now()
- tomorrow = today + timedelta(days=1)
- resp = self.client.list_virtual_clusters(
- createdBefore=tomorrow, states=["RUNNING"]
- )
- assert len(resp["virtualClusters"]) == 1
-
- def test_created_before_tomorrow_two_state_limit(self):
- today = datetime.now()
- tomorrow = today + timedelta(days=1)
- resp = self.client.list_virtual_clusters(
- createdBefore=tomorrow, states=["RUNNING", "TERMINATED"], maxResults=1
- )
- assert len(resp["virtualClusters"]) == 1
-
- def test_states_one_state(self):
- resp = self.client.list_virtual_clusters(states=["RUNNING"])
- assert len(resp["virtualClusters"]) == 1
-
- def test_states_two_state(self):
- resp = self.client.list_virtual_clusters(states=["RUNNING", "TERMINATED"])
- assert len(resp["virtualClusters"]) == 2
-
- def test_states_invalid_state(self):
- resp = self.client.list_virtual_clusters(states=["FOOBAA"])
- assert len(resp["virtualClusters"]) == 0
-
- def test_max_result(self):
- resp = self.client.list_virtual_clusters(maxResults=1)
- assert len(resp["virtualClusters"]) == 1
+ @pytest.mark.parametrize(
+ "list_virtual_clusters_args,job_count",
+ [
+ ({}, 4),
+ ({"containerProviderId": "test-eks-cluster"}, 4),
+ ({"containerProviderId": "foobaa"}, 0),
+ ({"containerProviderType": "EKS"}, 4),
+ ({"containerProviderType": "AKS"}, 0),
+ ({"createdAfter": yesterday}, 4),
+ ({"createdAfter": tomorrow}, 0),
+ ({"createdAfter": yesterday, "states": ["RUNNING"]}, 1),
+ ({"createdAfter": tomorrow, "states": ["RUNNING"]}, 0),
+ (
+ {
+ "createdAfter": yesterday,
+ "states": ["RUNNING", "TERMINATED"],
+ "maxResults": 1,
+ },
+ 1,
+ ),
+ ({"createdBefore": yesterday}, 0),
+ ({"createdBefore": tomorrow}, 4),
+ ({"createdBefore": yesterday, "states": ["RUNNING"]}, 0),
+ ({"createdBefore": tomorrow, "states": ["RUNNING"]}, 1),
+ (
+ {
+ "createdBefore": tomorrow,
+ "states": ["RUNNING", "TERMINATED"],
+ "maxResults": 1,
+ },
+ 1,
+ ),
+ ({"states": ["RUNNING"]}, 1),
+ ({"states": ["RUNNING", "TERMINATED"]}, 2),
+ ({"states": ["FOOBAA"]}, 0),
+ ({"maxResults": 1}, 1),
+ ],
+ )
+ def test_base(self, list_virtual_clusters_args, job_count):
+ resp = self.client.list_virtual_clusters(**list_virtual_clusters_args)
+ assert len(resp["virtualClusters"]) == job_count
def test_next_token(self):
resp = self.client.list_virtual_clusters(maxResults=2)
@@ -275,3 +253,328 @@ class TestListVirtualClusters:
resp = self.client.list_virtual_clusters(nextToken=resp["nextToken"])
assert len(resp["virtualClusters"]) == 2
+
+
+class TestStartJobRun:
+ default_job_driver = {
+ "sparkSubmitJobDriver": {
+ "entryPoint": "s3://code/pi.py",
+ "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4",
+ }
+ }
+
+ default_configuration_overrides = {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.dynamicAllocation.enabled": "false"},
+ }
+ ],
+ "monitoringConfiguration": {
+ "cloudWatchMonitoringConfiguration": {
+ "logGroupName": "/emr-containers/jobs",
+ "logStreamNamePrefix": "demo",
+ },
+ "s3MonitoringConfiguration": {"logUri": "s3://joblogs"},
+ },
+ }
+
+ default_execution_role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/iamrole-emrcontainers"
+
+ default_release_label = "emr-6.3.0-latest"
+
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, virtual_cluster_factory):
+ self.client = client
+ self.virtual_cluster_id = virtual_cluster_factory[0]
+
+ def test_start(self):
+ resp = self.client.start_job_run(
+ name="test_job",
+ virtualClusterId=self.virtual_cluster_id,
+ executionRoleArn=self.default_execution_role_arn,
+ releaseLabel=self.default_release_label,
+ jobDriver=self.default_job_driver,
+ configurationOverrides=self.default_configuration_overrides,
+ )
+
+ assert re.match(r"[a-z,0-9]{19}", resp["id"])
+ assert resp["name"] == "test_job"
+ assert (
+ resp["arn"]
+ == f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/virtualclusters/{self.virtual_cluster_id}/jobruns/{resp['id']}"
+ )
+ assert resp["virtualClusterId"] == self.virtual_cluster_id
+
+ def test_invalid_execution_role_arn(self):
+ with pytest.raises(ParamValidationError) as exc:
+ self.client.start_job_run(
+ name="test_job",
+ virtualClusterId="foobaa",
+ executionRoleArn="foobaa",
+ releaseLabel="foobaa",
+ jobDriver={},
+ )
+
+ assert exc.typename == "ParamValidationError"
+ assert (
+ "Parameter validation failed:\nInvalid length for parameter executionRoleArn, value: 6, valid min length: 20"
+ in exc.value.args
+ )
+
+ def test_invalid_virtual_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.start_job_run(
+ name="test_job",
+ virtualClusterId="foobaa",
+ executionRoleArn=self.default_execution_role_arn,
+ releaseLabel="foobaa",
+ jobDriver={},
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Virtual cluster foobaa doesn't exist."
+
+ def test_invalid_release_label(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.start_job_run(
+ name="test_job",
+ virtualClusterId=self.virtual_cluster_id,
+ executionRoleArn=self.default_execution_role_arn,
+ releaseLabel="foobaa",
+ jobDriver={},
+ )
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Release foobaa doesn't exist."
+
+
+class TestCancelJobRun:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, virtual_cluster_factory, job_factory):
+ self.client = client
+ self.virtual_cluster_id = virtual_cluster_factory[0]
+ self.job_list = job_factory
+
+ def test_valid_id_valid_cluster_id(self):
+ resp = self.client.cancel_job_run(
+ id=self.job_list[2], virtualClusterId=self.virtual_cluster_id
+ )
+
+ assert resp["id"] == self.job_list[2]
+ assert resp["virtualClusterId"] == self.virtual_cluster_id
+
+ def test_invalid_id_invalid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.cancel_job_run(id="foobaa", virtualClusterId="foobaa")
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "Invalid job run short id"
+
+ def test_invalid_id_valid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.cancel_job_run(
+ id="foobaa", virtualClusterId=self.virtual_cluster_id
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "Invalid job run short id"
+
+ def test_valid_id_invalid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.cancel_job_run(id=self.job_list[0], virtualClusterId="foobaa")
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == f"Job run {self.job_list[0]} doesn't exist."
+
+ def test_non_existing_id_invalid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.cancel_job_run(
+ id="123456789abcdefghij", virtualClusterId=self.virtual_cluster_id
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == f"Job run 123456789abcdefghij doesn't exist."
+
+ def test_wrong_job_state(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.cancel_job_run(
+ id=self.job_list[6], virtualClusterId=self.virtual_cluster_id
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == f"Job run {self.job_list[6]} is not in a cancellable state"
+ )
+
+
+class TestListJobRuns:
+ today = datetime.now()
+ yesterday = today - timedelta(days=1)
+ tomorrow = today + timedelta(days=1)
+
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, virtual_cluster_factory, job_factory):
+ self.client = client
+ self.virtual_cluster_id = virtual_cluster_factory[0]
+
+ @pytest.mark.parametrize(
+ "list_job_runs_arg,job_count",
+ [
+ ({}, 7),
+ ({"createdAfter": yesterday}, 7),
+ ({"createdAfter": tomorrow}, 0),
+ ({"createdAfter": yesterday, "states": ["RUNNING"]}, 1),
+ ({"createdAfter": tomorrow, "states": ["RUNNING"]}, 0),
+ (
+ {
+ "createdAfter": yesterday,
+ "states": ["RUNNING", "TERMINATED"],
+ "maxResults": 1,
+ },
+ 1,
+ ),
+ ({"createdBefore": yesterday}, 0),
+ ({"createdBefore": tomorrow}, 7),
+ (
+ {
+ "createdBefore": tomorrow,
+ "states": ["RUNNING", "TERMINATED"],
+ "maxResults": 1,
+ },
+ 1,
+ ),
+ ({"name": "test_job_1"}, 1),
+ ({"name": "foobaa"}, 0),
+ ({"states": ["RUNNING"]}, 1),
+ ({"states": ["RUNNING", "COMPLETED"]}, 2),
+ ({"states": ["FOOBAA"]}, 0),
+ ({"maxResults": 1}, 1),
+ ],
+ )
+ def test_base(self, list_job_runs_arg, job_count):
+ resp = self.client.list_job_runs(
+ virtualClusterId=self.virtual_cluster_id, **list_job_runs_arg
+ )
+ assert len(resp["jobRuns"]) == job_count
+
+ def test_invalid_virtual_cluster_id(self):
+ resp = self.client.list_job_runs(virtualClusterId="foobaa")
+ assert len(resp["jobRuns"]) == 0
+
+ def test_next_token(self):
+ resp = self.client.list_job_runs(
+ virtualClusterId=self.virtual_cluster_id, maxResults=2
+ )
+ assert len(resp["jobRuns"]) == 2
+
+ resp = self.client.list_job_runs(
+ virtualClusterId=self.virtual_cluster_id, nextToken=resp["nextToken"]
+ )
+ assert len(resp["jobRuns"]) == 5
+
+
+class TestDescribeJobRun:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, virtual_cluster_factory, job_factory):
+ self.client = client
+ self.virtual_cluster_id = virtual_cluster_factory[0]
+ self.job_list = job_factory
+
+ def test_valid_id_valid_cluster_id(self):
+ self.client.cancel_job_run(
+ id=self.job_list[2], virtualClusterId=self.virtual_cluster_id
+ )
+ resp = self.client.describe_job_run(
+ id=self.job_list[2], virtualClusterId=self.virtual_cluster_id
+ )
+
+ expected = {
+ "arn": f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/virtualclusters/{self.virtual_cluster_id}/jobruns/{self.job_list[2]}",
+ "createdAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ "executionRoleArn": f"arn:aws:iam::{ACCOUNT_ID}:role/iamrole-emrcontainers",
+ "finishedAt": (
+ datetime.today()
+ .replace(hour=0, minute=1, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ "id": self.job_list[2],
+ "jobDriver": {
+ "sparkSubmitJobDriver": {
+ "entryPoint": "s3://code/pi.py",
+ "sparkSubmitParameters": "--conf "
+ "spark.executor.instances=2 "
+ "--conf "
+ "spark.executor.memory=2G "
+ "--conf "
+ "spark.driver.memory=2G "
+ "--conf "
+ "spark.executor.cores=4",
+ }
+ },
+ "name": "test_job_2",
+ "releaseLabel": "emr-6.3.0-latest",
+ "state": "CANCELLED",
+ "stateDetails": "JobRun CANCELLED successfully.",
+ "virtualClusterId": self.virtual_cluster_id,
+ }
+
+ assert expected.items() <= resp["jobRun"].items()
+
+ def test_invalid_id_invalid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.describe_job_run(id="foobaa", virtualClusterId="foobaa")
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "Invalid job run short id"
+
+ def test_invalid_id_valid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.describe_job_run(
+ id="foobaa", virtualClusterId=self.virtual_cluster_id
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "Invalid job run short id"
+
+ def test_valid_id_invalid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.describe_job_run(id=self.job_list[0], virtualClusterId="foobaa")
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == f"Job run {self.job_list[0]} doesn't exist."
+
+ def test_non_existing_id_invalid_cluster_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.describe_job_run(
+ id="123456789abcdefghij", virtualClusterId=self.virtual_cluster_id
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == f"Job run 123456789abcdefghij doesn't exist."