Feature - emr-containers - jobs (#4515)

This commit is contained in:
Andor Markus 2021-11-01 11:36:38 +01:00 committed by GitHub
parent f4e62f0dfd
commit 3b9e6261f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 703 additions and 124 deletions

View File

@ -1921,19 +1921,19 @@
<details> <details>
<summary>26% implemented</summary> <summary>26% implemented</summary>
- [ ] cancel_job_run - [X] cancel_job_run
- [ ] create_managed_endpoint - [ ] create_managed_endpoint
- [X] create_virtual_cluster - [X] create_virtual_cluster
- [ ] delete_managed_endpoint - [ ] delete_managed_endpoint
- [X] delete_virtual_cluster - [X] delete_virtual_cluster
- [ ] describe_job_run - [X] describe_job_run
- [ ] describe_managed_endpoint - [ ] describe_managed_endpoint
- [X] describe_virtual_cluster - [X] describe_virtual_cluster
- [ ] list_job_runs - [X] list_job_runs
- [ ] list_managed_endpoints - [ ] list_managed_endpoints
- [ ] list_tags_for_resource - [ ] list_tags_for_resource
- [X] list_virtual_clusters - [X] list_virtual_clusters
- [ ] start_job_run - [X] start_job_run
- [ ] tag_resource - [ ] tag_resource
- [ ] untag_resource - [ ] untag_resource
</details> </details>

View File

@ -64,6 +64,8 @@ Currently implemented Services:
+---------------------------+-----------------------+------------------------------------+ +---------------------------+-----------------------+------------------------------------+
| EMR | @mock_emr | core endpoints done | | EMR | @mock_emr | core endpoints done |
+---------------------------+-----------------------+------------------------------------+ +---------------------------+-----------------------+------------------------------------+
| EMRContainers | @mock_emrcontainers | core endpoints done |
+---------------------------+-----------------------+------------------------------------+
| Firehose | @mock_firehose | basic endpoints done | | Firehose | @mock_firehose | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+ +---------------------------+-----------------------+------------------------------------+
| Forecast | @mock_forecast | basic endpoints done | | Forecast | @mock_forecast | basic endpoints done |

View File

@ -1,2 +1,9 @@
"""Exceptions raised by the emrcontainers service.""" """Exceptions raised by the emrcontainers service."""
# from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
    """Raised when a virtual cluster or job run cannot be found.

    The caller supplies the full, user-facing message via ``resource``;
    it is passed through verbatim as the error message.
    """

    # NOTE(review): emr-containers reports this error with HTTP 400.
    code = 400

    def __init__(self, resource):
        super().__init__("ResourceNotFoundException", resource)

View File

@ -1,13 +1,14 @@
"""EMRContainersBackend class with methods for supported APIs.""" """EMRContainersBackend class with methods for supported APIs."""
import re
from datetime import datetime from datetime import datetime
from boto3 import Session from boto3 import Session
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID from moto.core import BaseBackend, BaseModel, ACCOUNT_ID
from moto.core.utils import iso_8601_datetime_without_milliseconds from moto.core.utils import iso_8601_datetime_without_milliseconds
from .utils import random_cluster_id, get_partition, paginated_list from .utils import random_cluster_id, random_job_id, get_partition, paginated_list
from .exceptions import ResourceNotFoundException
# String Templates
from ..config.exceptions import ValidationException from ..config.exceptions import ValidationException
VIRTUAL_CLUSTER_ARN_TEMPLATE = ( VIRTUAL_CLUSTER_ARN_TEMPLATE = (
@ -16,8 +17,15 @@ VIRTUAL_CLUSTER_ARN_TEMPLATE = (
+ ":/virtualclusters/{virtual_cluster_id}" + ":/virtualclusters/{virtual_cluster_id}"
) )
# ARN format for a job run; job runs are scoped under their parent
# virtual cluster (".../virtualclusters/<id>/jobruns/<job-id>").
JOB_ARN_TEMPLATE = (
    "arn:{partition}:emr-containers:{region}:"
    + str(ACCOUNT_ID)
    + ":/virtualclusters/{virtual_cluster_id}/jobruns/{job_id}"
)
# Defaults used for creating a Virtual cluster # Defaults used for creating a Virtual cluster
ACTIVE_STATUS = "ACTIVE" VIRTUAL_CLUSTER_STATUS = "RUNNING"
JOB_STATUS = "RUNNING"
class FakeCluster(BaseModel): class FakeCluster(BaseModel):
@ -38,7 +46,7 @@ class FakeCluster(BaseModel):
self.arn = VIRTUAL_CLUSTER_ARN_TEMPLATE.format( self.arn = VIRTUAL_CLUSTER_ARN_TEMPLATE.format(
partition=aws_partition, region=region_name, virtual_cluster_id=self.id partition=aws_partition, region=region_name, virtual_cluster_id=self.id
) )
self.state = ACTIVE_STATUS self.state = VIRTUAL_CLUSTER_STATUS
self.container_provider = container_provider self.container_provider = container_provider
self.container_provider_id = container_provider["id"] self.container_provider_id = container_provider["id"]
self.namespace = container_provider["info"]["eksInfo"]["namespace"] self.namespace = container_provider["info"]["eksInfo"]["namespace"]
@ -70,6 +78,85 @@ class FakeCluster(BaseModel):
} }
class FakeJob(BaseModel):
    """In-memory model of a single EMR-on-EKS job run."""

    def __init__(
        self,
        name,
        virtual_cluster_id,
        client_token,
        execution_role_arn,
        release_label,
        job_driver,
        configuration_overrides,
        region_name,
        aws_partition,
        tags,
    ):
        self.id = random_job_id()
        self.name = name
        self.virtual_cluster_id = virtual_cluster_id
        self.arn = JOB_ARN_TEMPLATE.format(
            partition=aws_partition,
            region=region_name,
            virtual_cluster_id=self.virtual_cluster_id,
            job_id=self.id,
        )
        # New jobs start in the module-level default state so tests can
        # patch JOB_STATUS to create jobs frozen in arbitrary states.
        self.state = JOB_STATUS
        self.client_token = client_token
        self.execution_role_arn = execution_role_arn
        self.release_label = release_label
        self.job_driver = job_driver
        self.configuration_overrides = configuration_overrides
        # Timestamp pinned to midnight so tests can assert an exact value.
        self.created_at = iso_8601_datetime_without_milliseconds(
            datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        )
        self.created_by = None
        self.finished_at = None
        self.state_details = None
        self.failure_reason = None
        self.tags = tags

    def __iter__(self):
        # Enables dict(job) in the responses layer.
        yield "id", self.id
        yield "name", self.name
        yield "virtualClusterId", self.virtual_cluster_id
        yield "arn", self.arn
        yield "state", self.state
        yield "clientToken", self.client_token
        yield "executionRoleArn", self.execution_role_arn
        yield "releaseLabel", self.release_label
        # BUG FIX: previously yielded self.release_label under this key.
        yield "configurationOverrides", self.configuration_overrides
        yield "jobDriver", self.job_driver
        yield "createdAt", self.created_at
        yield "createdBy", self.created_by
        yield "finishedAt", self.finished_at
        yield "stateDetails", self.state_details
        yield "failureReason", self.failure_reason
        yield "tags", self.tags

    def to_dict(self):
        """Serialize for API responses.

        Format matches the DescribeJobRun response syntax:
        https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeJobRun.html
        """
        return {
            "id": self.id,
            "name": self.name,
            "virtualClusterId": self.virtual_cluster_id,
            "arn": self.arn,
            "state": self.state,
            "clientToken": self.client_token,
            "executionRoleArn": self.execution_role_arn,
            "releaseLabel": self.release_label,
            "configurationOverrides": self.configuration_overrides,
            "jobDriver": self.job_driver,
            "createdAt": self.created_at,
            "createdBy": self.created_by,
            "finishedAt": self.finished_at,
            "stateDetails": self.state_details,
            "failureReason": self.failure_reason,
            "tags": self.tags,
        }
class EMRContainersBackend(BaseBackend): class EMRContainersBackend(BaseBackend):
"""Implementation of EMRContainers APIs.""" """Implementation of EMRContainers APIs."""
@ -77,6 +164,8 @@ class EMRContainersBackend(BaseBackend):
super(EMRContainersBackend, self).__init__() super(EMRContainersBackend, self).__init__()
self.virtual_clusters = dict() self.virtual_clusters = dict()
self.virtual_cluster_count = 0 self.virtual_cluster_count = 0
self.jobs = dict()
self.job_count = 0
self.region_name = region_name self.region_name = region_name
self.partition = get_partition(region_name) self.partition = get_partition(region_name)
@ -173,8 +262,116 @@ class EMRContainersBackend(BaseBackend):
for virtual_cluster in virtual_clusters for virtual_cluster in virtual_clusters
if virtual_cluster["state"] in states if virtual_cluster["state"] in states
] ]
sort_key = "name"
return paginated_list(virtual_clusters, sort_key, max_results, next_token)
def start_job_run(
    self,
    name,
    virtual_cluster_id,
    client_token,
    execution_role_arn,
    release_label,
    job_driver,
    configuration_overrides,
    tags,
):
    """Validate the target cluster and release label, then register a new job run.

    Raises ResourceNotFoundException when the virtual cluster is unknown
    or the release label does not match the expected EMR release format.
    """
    # The target virtual cluster must already exist.
    if virtual_cluster_id not in self.virtual_clusters:
        raise ResourceNotFoundException(
            f"Virtual cluster {virtual_cluster_id} doesn't exist."
        )

    # Release labels look like "emr-6.3.0-latest" or "emr-6.3.0-<YYYYMMDD>".
    release_pattern = r"emr-[0-9]{1}\.[0-9]{1,2}\.0-(latest|[0-9]{8})"
    if re.match(release_pattern, release_label) is None:
        raise ResourceNotFoundException(f"Release {release_label} doesn't exist.")

    job_kwargs = dict(
        name=name,
        virtual_cluster_id=virtual_cluster_id,
        client_token=client_token,
        execution_role_arn=execution_role_arn,
        release_label=release_label,
        job_driver=job_driver,
        configuration_overrides=configuration_overrides,
        tags=tags,
        region_name=self.region_name,
        aws_partition=self.partition,
    )
    job = FakeJob(**job_kwargs)

    self.jobs[job.id] = job
    self.job_count += 1
    return job
def cancel_job_run(self, id, virtual_cluster_id):
    """Cancel a job run that is still in a cancellable state.

    :param id: 19-character job run short id.
    :param virtual_cluster_id: cluster the job run must belong to.
    :raises ValidationException: malformed id, or job not cancellable.
    :raises ResourceNotFoundException: unknown id or wrong cluster.
    :return: the (now cancelled) FakeJob.
    """
    # BUG FIX: the character class was [a-z,A-Z,0-9], which also accepted
    # literal commas; job ids are alphanumeric only.
    if not re.match(r"[a-zA-Z0-9]{19}", id):
        raise ValidationException("Invalid job run short id")

    job = self.jobs.get(id)
    if job is None or virtual_cluster_id != job.virtual_cluster_id:
        raise ResourceNotFoundException(f"Job run {id} doesn't exist.")

    # Terminal or already-cancelling jobs cannot be cancelled again.
    if job.state in ["FAILED", "CANCELLED", "CANCEL_PENDING", "COMPLETED"]:
        raise ValidationException(f"Job run {id} is not in a cancellable state")

    job.state = "CANCELLED"
    # Finish time pinned one minute past midnight so tests can assert it.
    job.finished_at = iso_8601_datetime_without_milliseconds(
        datetime.today().replace(hour=0, minute=1, second=0, microsecond=0)
    )
    job.state_details = "JobRun CANCELLED successfully."
    return job
def list_job_runs(
    self,
    virtual_cluster_id,
    created_before,
    created_after,
    name,
    states,
    max_results,
    next_token,
):
    """Return one page of job runs for a cluster, applying optional filters.

    All filters (created_before/after, name, states) are skipped when falsy.
    Returns the (page, next_token) tuple produced by paginated_list.
    """
    jobs = [job.to_dict() for job in self.jobs.values()]

    jobs = [job for job in jobs if job["virtualClusterId"] == virtual_cluster_id]

    if created_after:
        jobs = [job for job in jobs if job["createdAt"] >= created_after]

    if created_before:
        jobs = [job for job in jobs if job["createdAt"] <= created_before]

    if states:
        jobs = [job for job in jobs if job["state"] in states]

    if name:
        # BUG FIX: was `job["name"] in name`, a substring test against the
        # filter value; the name filter should be an exact match.
        jobs = [job for job in jobs if job["name"] == name]

    sort_key = "id"
    return paginated_list(jobs, sort_key, max_results, next_token)
def describe_job_run(self, id, virtual_cluster_id):
    """Return the serialized job run after validating id and cluster.

    :param id: 19-character job run short id.
    :param virtual_cluster_id: cluster the job run must belong to.
    :raises ValidationException: malformed id.
    :raises ResourceNotFoundException: unknown id or wrong cluster.
    """
    # BUG FIX: the character class was [a-z,A-Z,0-9], which also accepted
    # literal commas; job ids are alphanumeric only.
    if not re.match(r"[a-zA-Z0-9]{19}", id):
        raise ValidationException("Invalid job run short id")

    job = self.jobs.get(id)
    if job is None or virtual_cluster_id != job.virtual_cluster_id:
        raise ResourceNotFoundException(f"Job run {id} doesn't exist.")

    return job.to_dict()
emrcontainers_backends = {} emrcontainers_backends = {}

View File

@ -69,3 +69,67 @@ class EMRContainersResponse(BaseResponse):
response = {"virtualClusters": virtual_clusters, "nextToken": next_token} response = {"virtualClusters": virtual_clusters, "nextToken": next_token}
return 200, {}, json.dumps(response) return 200, {}, json.dumps(response)
def start_job_run(self):
    """Handle StartJobRun: forward request parameters to the backend."""
    backend_kwargs = {
        "name": self._get_param("name"),
        "virtual_cluster_id": self._get_param("virtualClusterId"),
        "client_token": self._get_param("clientToken"),
        "execution_role_arn": self._get_param("executionRoleArn"),
        "release_label": self._get_param("releaseLabel"),
        "job_driver": self._get_param("jobDriver"),
        "configuration_overrides": self._get_param("configurationOverrides"),
        "tags": self._get_param("tags"),
    }
    job = self.emrcontainers_backend.start_job_run(**backend_kwargs)
    return 200, {}, json.dumps(dict(job))
def cancel_job_run(self):
    """Handle CancelJobRun: cancel the jobRunId within the given cluster."""
    job = self.emrcontainers_backend.cancel_job_run(
        id=self._get_param("jobRunId"),
        virtual_cluster_id=self._get_param("virtualClusterId"),
    )
    return 200, {}, json.dumps(dict(job))
def list_job_runs(self):
    """Handle ListJobRuns: apply request filters and return one page of runs."""
    job_runs, next_token = self.emrcontainers_backend.list_job_runs(
        virtual_cluster_id=self._get_param("virtualClusterId"),
        created_before=self._get_param("createdBefore"),
        created_after=self._get_param("createdAfter"),
        name=self._get_param("name"),
        # "states" may repeat in the query string, so read the raw list.
        states=self.querystring.get("states", []),
        max_results=self._get_int_param("maxResults", DEFAULT_MAX_RESULTS),
        next_token=self._get_param("nextToken", DEFAULT_NEXT_TOKEN),
    )
    body = {"jobRuns": job_runs, "nextToken": next_token}
    return 200, {}, json.dumps(body)
def describe_job_run(self):
    """Handle DescribeJobRun: return a single job run's details."""
    job_run = self.emrcontainers_backend.describe_job_run(
        id=self._get_param("jobRunId"),
        virtual_cluster_id=self._get_param("virtualClusterId"),
    )
    return 200, {}, json.dumps({"jobRun": job_run})

View File

@ -8,5 +8,7 @@ url_bases = [
url_paths = { url_paths = {
"{0}/virtualclusters$": EMRContainersResponse.dispatch, "{0}/virtualclusters$": EMRContainersResponse.dispatch,
"{0}/virtualclusters/(?P<virtualClusterId>[^/]+)/?$": EMRContainersResponse.dispatch, "{0}/virtualclusters/(?P<virtualClusterId>[^/]+)$": EMRContainersResponse.dispatch,
"{0}/virtualclusters/(?P<virtualClusterId>[^/]+)/jobruns$": EMRContainersResponse.dispatch,
"{0}/virtualclusters/(?P<virtualClusterId>[^/]+)/jobruns/(?P<jobRunId>[^/]+)$": EMRContainersResponse.dispatch,
} }

View File

@ -23,11 +23,15 @@ def random_id(size=13):
return "".join(str(random.choice(chars)) for x in range(size)) return "".join(str(random.choice(chars)) for x in range(size))
def random_cluster_id(size=13): def random_cluster_id():
return random_id(size=25) return random_id(size=25)
def paginated_list(full_list, max_results, next_token): def random_job_id():
return random_id(size=19)
def paginated_list(full_list, sort_key, max_results, next_token):
""" """
Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results
number of elements, and the new next_token which can be passed back in for the next segment of the full list. number of elements, and the new next_token which can be passed back in for the next segment of the full list.
@ -36,7 +40,7 @@ def paginated_list(full_list, max_results, next_token):
next_token = 0 next_token = 0
next_token = int(next_token) next_token = int(next_token)
sorted_list = sorted(full_list, key=lambda d: d["name"]) sorted_list = sorted(full_list, key=lambda d: d[sort_key])
values = sorted_list[next_token : next_token + max_results] values = sorted_list[next_token : next_token + max_results]
if len(values) == max_results: if len(values) == max_results:

View File

@ -6,7 +6,7 @@ from unittest import SkipTest
import boto3 import boto3
import pytest import pytest
import sure # noqa # pylint: disable=unused-import import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError, ParamValidationError
from moto import mock_emrcontainers, settings from moto import mock_emrcontainers, settings
from moto.core import ACCOUNT_ID from moto.core import ACCOUNT_ID
@ -29,8 +29,10 @@ def virtual_cluster_factory(client):
cluster_state = ["RUNNING", "TERMINATING", "TERMINATED", "ARRESTED"] cluster_state = ["RUNNING", "TERMINATING", "TERMINATED", "ARRESTED"]
cluster_list = [] cluster_list = []
for i in range(4): for i in range(len(cluster_state)):
with patch("moto.emrcontainers.models.ACTIVE_STATUS", cluster_state[i]): with patch(
"moto.emrcontainers.models.VIRTUAL_CLUSTER_STATUS", cluster_state[i]
):
resp = client.create_virtual_cluster( resp = client.create_virtual_cluster(
name="test-emr-virtual-cluster", name="test-emr-virtual-cluster",
containerProvider={ containerProvider={
@ -45,6 +47,44 @@ def virtual_cluster_factory(client):
yield cluster_list yield cluster_list
@pytest.fixture()
def job_factory(client, virtual_cluster_factory):
    """Start one job run per possible job state on the first virtual cluster.

    JOB_STATUS is patched before each start_job_run call so every created
    job is frozen in a different state. Yields the job ids in the same
    order as ``job_state``.
    """
    virtual_cluster_id = virtual_cluster_factory[0]
    default_job_driver = {
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://code/pi.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4",
        }
    }
    default_execution_role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/iamrole-emrcontainers"
    default_release_label = "emr-6.3.0-latest"

    job_state = [
        "PENDING",
        "SUBMITTED",
        "RUNNING",
        "FAILED",
        "CANCELLED",
        "CANCEL_PENDING",
        "COMPLETED",
    ]

    job_list = []
    for i in range(len(job_state)):
        # Patch the module-level default so the new job starts in job_state[i].
        with patch("moto.emrcontainers.models.JOB_STATUS", job_state[i]):
            resp = client.start_job_run(
                name=f"test_job_{i}",
                virtualClusterId=virtual_cluster_id,
                executionRoleArn=default_execution_role_arn,
                releaseLabel=default_release_label,
                jobDriver=default_job_driver,
            )
            job_list.append(resp["id"])

    yield job_list
class TestCreateVirtualCluster: class TestCreateVirtualCluster:
@staticmethod @staticmethod
@mock_emrcontainers @mock_emrcontainers
@ -157,117 +197,55 @@ class TestDescribeVirtualCluster:
class TestListVirtualClusters: class TestListVirtualClusters:
today = datetime.now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _setup_environment(self, client, virtual_cluster_factory): def _setup_environment(self, client, virtual_cluster_factory):
self.client = client self.client = client
def test_base(self): @pytest.mark.parametrize(
resp = self.client.list_virtual_clusters() "list_virtual_clusters_args,job_count",
assert len(resp["virtualClusters"]) == 4 [
({}, 4),
def test_valid_container_provider_id(self): ({"containerProviderId": "test-eks-cluster"}, 4),
resp = self.client.list_virtual_clusters(containerProviderId="test-eks-cluster") ({"containerProviderId": "foobaa"}, 0),
assert len(resp["virtualClusters"]) == 4 ({"containerProviderType": "EKS"}, 4),
({"containerProviderType": "AKS"}, 0),
def test_invalid_container_provider_id(self): ({"createdAfter": yesterday}, 4),
resp = self.client.list_virtual_clusters(containerProviderId="foobaa") ({"createdAfter": tomorrow}, 0),
assert len(resp["virtualClusters"]) == 0 ({"createdAfter": yesterday, "states": ["RUNNING"]}, 1),
({"createdAfter": tomorrow, "states": ["RUNNING"]}, 0),
def test_valid_container_provider_type(self): (
resp = self.client.list_virtual_clusters(containerProviderType="EKS") {
assert len(resp["virtualClusters"]) == 4 "createdAfter": yesterday,
"states": ["RUNNING", "TERMINATED"],
def test_invalid_container_provider_type(self): "maxResults": 1,
resp = self.client.list_virtual_clusters(containerProviderType="AKS") },
assert len(resp["virtualClusters"]) == 0 1,
),
def test_created_after_yesterday(self): ({"createdBefore": yesterday}, 0),
today = datetime.now() ({"createdBefore": tomorrow}, 4),
yesterday = today - timedelta(days=1) ({"createdBefore": yesterday, "states": ["RUNNING"]}, 0),
resp = self.client.list_virtual_clusters(createdAfter=yesterday) ({"createdBefore": tomorrow, "states": ["RUNNING"]}, 1),
assert len(resp["virtualClusters"]) == 4 (
{
def test_created_after_tomorrow(self): "createdBefore": tomorrow,
today = datetime.now() "states": ["RUNNING", "TERMINATED"],
tomorrow = today + timedelta(days=1) "maxResults": 1,
resp = self.client.list_virtual_clusters(createdAfter=tomorrow) },
assert len(resp["virtualClusters"]) == 0 1,
),
def test_created_after_yesterday_running_state(self): ({"states": ["RUNNING"]}, 1),
today = datetime.now() ({"states": ["RUNNING", "TERMINATED"]}, 2),
yesterday = today - timedelta(days=1) ({"states": ["FOOBAA"]}, 0),
resp = self.client.list_virtual_clusters( ({"maxResults": 1}, 1),
createdAfter=yesterday, states=["RUNNING"] ],
) )
assert len(resp["virtualClusters"]) == 1 def test_base(self, list_virtual_clusters_args, job_count):
resp = self.client.list_virtual_clusters(**list_virtual_clusters_args)
def test_created_after_tomorrow_running_state(self): assert len(resp["virtualClusters"]) == job_count
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdAfter=tomorrow, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 0
def test_created_after_yesterday_two_state_limit(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdAfter=yesterday, states=["RUNNING", "TERMINATED"], maxResults=1
)
assert len(resp["virtualClusters"]) == 1
def test_created_before_yesterday(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(createdBefore=yesterday)
assert len(resp["virtualClusters"]) == 0
def test_created_before_tomorrow(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(createdBefore=tomorrow)
assert len(resp["virtualClusters"]) == 4
def test_created_before_yesterday_running_state(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdBefore=yesterday, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 0
def test_created_before_tomorrow_running_state(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdBefore=tomorrow, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 1
def test_created_before_tomorrow_two_state_limit(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdBefore=tomorrow, states=["RUNNING", "TERMINATED"], maxResults=1
)
assert len(resp["virtualClusters"]) == 1
def test_states_one_state(self):
resp = self.client.list_virtual_clusters(states=["RUNNING"])
assert len(resp["virtualClusters"]) == 1
def test_states_two_state(self):
resp = self.client.list_virtual_clusters(states=["RUNNING", "TERMINATED"])
assert len(resp["virtualClusters"]) == 2
def test_states_invalid_state(self):
resp = self.client.list_virtual_clusters(states=["FOOBAA"])
assert len(resp["virtualClusters"]) == 0
def test_max_result(self):
resp = self.client.list_virtual_clusters(maxResults=1)
assert len(resp["virtualClusters"]) == 1
def test_next_token(self): def test_next_token(self):
resp = self.client.list_virtual_clusters(maxResults=2) resp = self.client.list_virtual_clusters(maxResults=2)
@ -275,3 +253,328 @@ class TestListVirtualClusters:
resp = self.client.list_virtual_clusters(nextToken=resp["nextToken"]) resp = self.client.list_virtual_clusters(nextToken=resp["nextToken"])
assert len(resp["virtualClusters"]) == 2 assert len(resp["virtualClusters"]) == 2
class TestStartJobRun:
    """StartJobRun: happy path plus parameter/cluster/release validation."""

    default_job_driver = {
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://code/pi.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4",
        }
    }

    default_configuration_overrides = {
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {"spark.dynamicAllocation.enabled": "false"},
            }
        ],
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": "/emr-containers/jobs",
                "logStreamNamePrefix": "demo",
            },
            "s3MonitoringConfiguration": {"logUri": "s3://joblogs"},
        },
    }

    default_execution_role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/iamrole-emrcontainers"

    default_release_label = "emr-6.3.0-latest"

    @pytest.fixture(autouse=True)
    def _setup_environment(self, client, virtual_cluster_factory):
        self.client = client
        self.virtual_cluster_id = virtual_cluster_factory[0]

    def test_start(self):
        """A valid request returns an id, name, arn and cluster id."""
        resp = self.client.start_job_run(
            name="test_job",
            virtualClusterId=self.virtual_cluster_id,
            executionRoleArn=self.default_execution_role_arn,
            releaseLabel=self.default_release_label,
            jobDriver=self.default_job_driver,
            configurationOverrides=self.default_configuration_overrides,
        )

        # BUG FIX: character class was [a-z,0-9], which also accepted a
        # literal comma; ids are lowercase alphanumeric.
        assert re.match(r"[a-z0-9]{19}", resp["id"])
        assert resp["name"] == "test_job"
        assert (
            resp["arn"]
            == f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/virtualclusters/{self.virtual_cluster_id}/jobruns/{resp['id']}"
        )
        assert resp["virtualClusterId"] == self.virtual_cluster_id

    def test_invalid_execution_role_arn(self):
        """botocore rejects a too-short executionRoleArn before the request is sent."""
        with pytest.raises(ParamValidationError) as exc:
            self.client.start_job_run(
                name="test_job",
                virtualClusterId="foobaa",
                executionRoleArn="foobaa",
                releaseLabel="foobaa",
                jobDriver={},
            )

        assert exc.typename == "ParamValidationError"
        assert (
            "Parameter validation failed:\nInvalid length for parameter executionRoleArn, value: 6, valid min length: 20"
            in exc.value.args
        )

    def test_invalid_virtual_cluster_id(self):
        """Unknown virtual cluster id raises ResourceNotFoundException."""
        with pytest.raises(ClientError) as exc:
            self.client.start_job_run(
                name="test_job",
                virtualClusterId="foobaa",
                executionRoleArn=self.default_execution_role_arn,
                releaseLabel="foobaa",
                jobDriver={},
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ResourceNotFoundException"
        assert err["Message"] == "Virtual cluster foobaa doesn't exist."

    def test_invalid_release_label(self):
        """A release label that is not a valid EMR release raises ResourceNotFoundException."""
        with pytest.raises(ClientError) as exc:
            self.client.start_job_run(
                name="test_job",
                virtualClusterId=self.virtual_cluster_id,
                executionRoleArn=self.default_execution_role_arn,
                releaseLabel="foobaa",
                jobDriver={},
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ResourceNotFoundException"
        assert err["Message"] == "Release foobaa doesn't exist."
class TestCancelJobRun:
    """CancelJobRun: valid cancel plus id/cluster/state validation errors."""

    @pytest.fixture(autouse=True)
    def _setup_environment(self, client, virtual_cluster_factory, job_factory):
        # job_factory creates one job per state; index order follows its
        # job_state list (e.g. index 2 == RUNNING, index 6 == COMPLETED).
        self.client = client
        self.virtual_cluster_id = virtual_cluster_factory[0]
        self.job_list = job_factory

    def test_valid_id_valid_cluster_id(self):
        """Cancelling a RUNNING job succeeds and echoes id and cluster id."""
        resp = self.client.cancel_job_run(
            id=self.job_list[2], virtualClusterId=self.virtual_cluster_id
        )

        assert resp["id"] == self.job_list[2]
        assert resp["virtualClusterId"] == self.virtual_cluster_id

    def test_invalid_id_invalid_cluster_id(self):
        """A malformed job id fails validation before the cluster is checked."""
        with pytest.raises(ClientError) as exc:
            self.client.cancel_job_run(id="foobaa", virtualClusterId="foobaa")

        err = exc.value.response["Error"]
        assert err["Code"] == "ValidationException"
        assert err["Message"] == "Invalid job run short id"

    def test_invalid_id_valid_cluster_id(self):
        with pytest.raises(ClientError) as exc:
            self.client.cancel_job_run(
                id="foobaa", virtualClusterId=self.virtual_cluster_id
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ValidationException"
        assert err["Message"] == "Invalid job run short id"

    def test_valid_id_invalid_cluster_id(self):
        """A real job id paired with the wrong cluster reports 'doesn't exist'."""
        with pytest.raises(ClientError) as exc:
            self.client.cancel_job_run(id=self.job_list[0], virtualClusterId="foobaa")

        err = exc.value.response["Error"]
        assert err["Code"] == "ResourceNotFoundException"
        assert err["Message"] == f"Job run {self.job_list[0]} doesn't exist."

    def test_non_existing_id_invalid_cluster_id(self):
        """A well-formed but unknown 19-char id reports 'doesn't exist'."""
        with pytest.raises(ClientError) as exc:
            self.client.cancel_job_run(
                id="123456789abcdefghij", virtualClusterId=self.virtual_cluster_id
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ResourceNotFoundException"
        # LINT FIX: dropped a pointless f-prefix (no placeholders in the string).
        assert err["Message"] == "Job run 123456789abcdefghij doesn't exist."

    def test_wrong_job_state(self):
        """A COMPLETED job (index 6) cannot be cancelled."""
        with pytest.raises(ClientError) as exc:
            self.client.cancel_job_run(
                id=self.job_list[6], virtualClusterId=self.virtual_cluster_id
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ValidationException"
        assert (
            err["Message"]
            == f"Job run {self.job_list[6]} is not in a cancellable state"
        )
class TestListJobRuns:
    """ListJobRuns: filter combinations, unknown cluster, and pagination."""

    # Computed once at class-definition time. Jobs are created "now", so
    # every job's createdAt falls between yesterday and tomorrow.
    today = datetime.now()
    yesterday = today - timedelta(days=1)
    tomorrow = today + timedelta(days=1)

    @pytest.fixture(autouse=True)
    def _setup_environment(self, client, virtual_cluster_factory, job_factory):
        # job_factory starts 7 jobs (one per state) on the first cluster.
        self.client = client
        self.virtual_cluster_id = virtual_cluster_factory[0]

    @pytest.mark.parametrize(
        "list_job_runs_arg,job_count",
        [
            ({}, 7),
            ({"createdAfter": yesterday}, 7),
            ({"createdAfter": tomorrow}, 0),
            ({"createdAfter": yesterday, "states": ["RUNNING"]}, 1),
            ({"createdAfter": tomorrow, "states": ["RUNNING"]}, 0),
            (
                {
                    "createdAfter": yesterday,
                    "states": ["RUNNING", "TERMINATED"],
                    "maxResults": 1,
                },
                1,
            ),
            ({"createdBefore": yesterday}, 0),
            ({"createdBefore": tomorrow}, 7),
            (
                {
                    "createdBefore": tomorrow,
                    "states": ["RUNNING", "TERMINATED"],
                    "maxResults": 1,
                },
                1,
            ),
            ({"name": "test_job_1"}, 1),
            ({"name": "foobaa"}, 0),
            ({"states": ["RUNNING"]}, 1),
            ({"states": ["RUNNING", "COMPLETED"]}, 2),
            ({"states": ["FOOBAA"]}, 0),
            ({"maxResults": 1}, 1),
        ],
    )
    def test_base(self, list_job_runs_arg, job_count):
        # Each parametrized filter combination must return job_count runs.
        resp = self.client.list_job_runs(
            virtualClusterId=self.virtual_cluster_id, **list_job_runs_arg
        )
        assert len(resp["jobRuns"]) == job_count

    def test_invalid_virtual_cluster_id(self):
        # An unknown cluster id yields an empty list, not an error.
        resp = self.client.list_job_runs(virtualClusterId="foobaa")
        assert len(resp["jobRuns"]) == 0

    def test_next_token(self):
        # Page size 2: first page has 2 runs, the remainder page has the other 5.
        resp = self.client.list_job_runs(
            virtualClusterId=self.virtual_cluster_id, maxResults=2
        )
        assert len(resp["jobRuns"]) == 2

        resp = self.client.list_job_runs(
            virtualClusterId=self.virtual_cluster_id, nextToken=resp["nextToken"]
        )
        assert len(resp["jobRuns"]) == 5
class TestDescribeJobRun:
    """DescribeJobRun: full response contents plus id/cluster validation errors."""

    @pytest.fixture(autouse=True)
    def _setup_environment(self, client, virtual_cluster_factory, job_factory):
        # job_factory creates one job per state on the first cluster.
        self.client = client
        self.virtual_cluster_id = virtual_cluster_factory[0]
        self.job_list = job_factory

    def test_valid_id_valid_cluster_id(self):
        """After cancelling the RUNNING job (index 2), describe returns its full record."""
        self.client.cancel_job_run(
            id=self.job_list[2], virtualClusterId=self.virtual_cluster_id
        )
        resp = self.client.describe_job_run(
            id=self.job_list[2], virtualClusterId=self.virtual_cluster_id
        )

        expected = {
            "arn": f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/virtualclusters/{self.virtual_cluster_id}/jobruns/{self.job_list[2]}",
            # The backend pins createdAt to midnight and finishedAt to 00:01,
            # so exact timestamps can be asserted here.
            "createdAt": (
                datetime.today()
                .replace(hour=0, minute=0, second=0, microsecond=0)
                .replace(tzinfo=timezone.utc)
            ),
            "executionRoleArn": f"arn:aws:iam::{ACCOUNT_ID}:role/iamrole-emrcontainers",
            "finishedAt": (
                datetime.today()
                .replace(hour=0, minute=1, second=0, microsecond=0)
                .replace(tzinfo=timezone.utc)
            ),
            "id": self.job_list[2],
            "jobDriver": {
                "sparkSubmitJobDriver": {
                    "entryPoint": "s3://code/pi.py",
                    "sparkSubmitParameters": "--conf "
                    "spark.executor.instances=2 "
                    "--conf "
                    "spark.executor.memory=2G "
                    "--conf "
                    "spark.driver.memory=2G "
                    "--conf "
                    "spark.executor.cores=4",
                }
            },
            "name": "test_job_2",
            "releaseLabel": "emr-6.3.0-latest",
            "state": "CANCELLED",
            "stateDetails": "JobRun CANCELLED successfully.",
            "virtualClusterId": self.virtual_cluster_id,
        }

        # Subset check: the response may carry extra keys beyond `expected`.
        assert expected.items() <= resp["jobRun"].items()

    def test_invalid_id_invalid_cluster_id(self):
        """A malformed job id fails validation before the cluster is checked."""
        with pytest.raises(ClientError) as exc:
            self.client.describe_job_run(id="foobaa", virtualClusterId="foobaa")

        err = exc.value.response["Error"]
        assert err["Code"] == "ValidationException"
        assert err["Message"] == "Invalid job run short id"

    def test_invalid_id_valid_cluster_id(self):
        with pytest.raises(ClientError) as exc:
            self.client.describe_job_run(
                id="foobaa", virtualClusterId=self.virtual_cluster_id
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ValidationException"
        assert err["Message"] == "Invalid job run short id"

    def test_valid_id_invalid_cluster_id(self):
        """A real job id paired with the wrong cluster reports 'doesn't exist'."""
        with pytest.raises(ClientError) as exc:
            self.client.describe_job_run(id=self.job_list[0], virtualClusterId="foobaa")

        err = exc.value.response["Error"]
        assert err["Code"] == "ResourceNotFoundException"
        assert err["Message"] == f"Job run {self.job_list[0]} doesn't exist."

    def test_non_existing_id_invalid_cluster_id(self):
        """A well-formed but unknown 19-char id reports 'doesn't exist'."""
        with pytest.raises(ClientError) as exc:
            self.client.describe_job_run(
                id="123456789abcdefghij", virtualClusterId=self.virtual_cluster_id
            )

        err = exc.value.response["Error"]
        assert err["Code"] == "ResourceNotFoundException"
        # LINT FIX: dropped a pointless f-prefix (no placeholders in the string).
        assert err["Message"] == "Job run 123456789abcdefghij doesn't exist."