From ae8a2e48ebc7d34572d9a0cbbb04e5eab4e0f7e2 Mon Sep 17 00:00:00 2001
From: Andor Markus <51825189+andormarkus@users.noreply.github.com>
Date: Tue, 28 Jun 2022 13:33:00 +0200
Subject: [PATCH] Feature: EMR serverless application (#5218)
---
IMPLEMENTATION_COVERAGE.md | 20 +
docs/docs/services/emr-serverless.rst | 44 ++
moto/__init__.py | 3 +
moto/backend_index.py | 1 +
moto/emrserverless/__init__.py | 7 +
moto/emrserverless/exceptions.py | 18 +
moto/emrserverless/models.py | 269 +++++++++
moto/emrserverless/responses.py | 126 ++++
moto/emrserverless/urls.py | 16 +
moto/emrserverless/utils.py | 58 ++
tests/test_emrserverless/__init__.py | 0
.../test_emrserverless/test_emrserverless.py | 543 ++++++++++++++++++
tests/test_emrserverless/test_server.py | 13 +
13 files changed, 1118 insertions(+)
create mode 100644 docs/docs/services/emr-serverless.rst
create mode 100644 moto/emrserverless/__init__.py
create mode 100644 moto/emrserverless/exceptions.py
create mode 100644 moto/emrserverless/models.py
create mode 100644 moto/emrserverless/responses.py
create mode 100644 moto/emrserverless/urls.py
create mode 100644 moto/emrserverless/utils.py
create mode 100644 tests/test_emrserverless/__init__.py
create mode 100644 tests/test_emrserverless/test_emrserverless.py
create mode 100644 tests/test_emrserverless/test_server.py
diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md
index 83840c51a..22cf59c00 100644
--- a/IMPLEMENTATION_COVERAGE.md
+++ b/IMPLEMENTATION_COVERAGE.md
@@ -2455,6 +2455,26 @@
- [ ] untag_resource
+## emr-serverless
+
+71% implemented
+
+- [ ] cancel_job_run
+- [X] create_application
+- [X] delete_application
+- [X] get_application
+- [ ] get_job_run
+- [X] list_applications
+- [ ] list_job_runs
+- [ ] list_tags_for_resource
+- [X] start_application
+- [X] start_job_run
+- [X] stop_application
+- [ ] tag_resource
+- [ ] untag_resource
+- [X] update_application
+
+
## es
9% implemented
diff --git a/docs/docs/services/emr-serverless.rst b/docs/docs/services/emr-serverless.rst
new file mode 100644
index 000000000..831fa43f8
--- /dev/null
+++ b/docs/docs/services/emr-serverless.rst
@@ -0,0 +1,44 @@
+.. _implementedservice_emr-serverless:
+
+.. |start-h3| raw:: html
+
+
+
+.. |end-h3| raw:: html
+
+
+
+==============
+emr-serverless
+==============
+
+.. autoclass:: moto.emrserverless.models.EMRServerlessBackend
+
+|start-h3| Example usage |end-h3|
+
+.. sourcecode:: python
+
+ @mock_emrserverless
+ def test_emrserverless_behaviour:
+ boto3.client("emr-serverless")
+ ...
+
+
+
+|start-h3| Implemented features for this service |end-h3|
+
+- [X] cancel_job_run
+- [X] create_application
+- [X] delete_application
+- [X] get_application
+- [X] get_job_run
+- [X] list_applications
+- [X] list_job_runs
+- [ ] list_tags_for_resource
+- [X] start_application
+- [X] start_job_run
+- [X] stop_application
+- [ ] tag_resource
+- [ ] untag_resource
+- [ ] update_application
+
diff --git a/moto/__init__.py b/moto/__init__.py
index 482d253d8..650a45ab1 100644
--- a/moto/__init__.py
+++ b/moto/__init__.py
@@ -179,6 +179,9 @@ mock_xray = lazy_load(".xray", "mock_xray")
mock_xray_client = lazy_load(".xray", "mock_xray_client")
mock_wafv2 = lazy_load(".wafv2", "mock_wafv2")
mock_textract = lazy_load(".textract", "mock_textract")
+mock_emrserverless = lazy_load(
+ ".emrserverless", "mock_emrserverless", boto3_name="emr-serverless"
+)
class MockAll(ContextDecorator):
diff --git a/moto/backend_index.py b/moto/backend_index.py
index 6dd464cc5..a0c355ba0 100644
--- a/moto/backend_index.py
+++ b/moto/backend_index.py
@@ -64,6 +64,7 @@ backend_url_patterns = [
("emr", re.compile("https?://(.+)\\.elasticmapreduce\\.amazonaws.com")),
("emr", re.compile("https?://elasticmapreduce\\.(.+)\\.amazonaws.com")),
("emr-containers", re.compile("https?://emr-containers\\.(.+)\\.amazonaws\\.com")),
+ ("emr-serverless", re.compile("https?://emr-serverless\\.(.+)\\.amazonaws\\.com")),
("es", re.compile("https?://es\\.(.+)\\.amazonaws\\.com")),
("events", re.compile("https?://events\\.(.+)\\.amazonaws\\.com")),
("firehose", re.compile("https?://firehose\\.(.+)\\.amazonaws\\.com")),
diff --git a/moto/emrserverless/__init__.py b/moto/emrserverless/__init__.py
new file mode 100644
index 000000000..41abab280
--- /dev/null
+++ b/moto/emrserverless/__init__.py
@@ -0,0 +1,7 @@
+"""emrserverless module initialization; sets value for base decorator."""
+from .models import emrserverless_backends
+from ..core.models import base_decorator
+
+REGION = "us-east-1"
+RELEASE_LABEL = "emr-6.6.0"
+mock_emrserverless = base_decorator(emrserverless_backends)
diff --git a/moto/emrserverless/exceptions.py b/moto/emrserverless/exceptions.py
new file mode 100644
index 000000000..68bae4c29
--- /dev/null
+++ b/moto/emrserverless/exceptions.py
@@ -0,0 +1,18 @@
+"""Exceptions raised by the emrserverless service."""
+from moto.core.exceptions import JsonRESTError
+
+
+class ResourceNotFoundException(JsonRESTError):
+ code = 400
+
+ def __init__(self, resource):
+ super().__init__(
+ "ResourceNotFoundException", f"Application {resource} does not exist"
+ )
+
+
+class ValidationException(JsonRESTError):
+ code = 400
+
+ def __init__(self, message):
+ super().__init__("ValidationException", message)
diff --git a/moto/emrserverless/models.py b/moto/emrserverless/models.py
new file mode 100644
index 000000000..f1729b2cc
--- /dev/null
+++ b/moto/emrserverless/models.py
@@ -0,0 +1,269 @@
+"""EMRServerlessBackend class with methods for supported APIs."""
+import re
+from datetime import datetime
+import inspect
+
+from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
+from moto.core.utils import BackendDict, iso_8601_datetime_without_milliseconds
+from .utils import (
+ default_auto_start_configuration,
+ default_auto_stop_configuration,
+ get_partition,
+ paginated_list,
+ random_appplication_id,
+ # random_job_id,
+)
+
+from .exceptions import ResourceNotFoundException, ValidationException
+
+APPLICATION_ARN_TEMPLATE = (
+ "arn:{partition}:emr-containers:{region}:"
+ + str(ACCOUNT_ID)
+ + ":/applications/{application_id}"
+)
+
+JOB_ARN_TEMPLATE = (
+ "arn:{partition}:emr-containers:{region}:"
+ + str(ACCOUNT_ID)
+ + ":/applications/{application_id}/jobruns/{job_id}"
+)
+
+# Defaults used for creating an EMR Serverless application
+APPLICATION_STATUS = "STARTED"
+JOB_STATUS = "RUNNING"
+
+
+class FakeApplication(BaseModel):
+ def __init__(
+ self,
+ name,
+ release_label,
+ application_type,
+ client_token,
+ region_name,
+ initial_capacity,
+ maximum_capacity,
+ tags,
+ auto_start_configuration,
+ auto_stop_configuration,
+ network_configuration,
+ ):
+ # Provided parameters
+ self.name = name
+ self.release_label = release_label
+ self.application_type = application_type.capitalize()
+ self.client_token = client_token
+ self.initial_capacity = initial_capacity
+ self.maximum_capacity = maximum_capacity
+ self.auto_start_configuration = (
+ auto_start_configuration or default_auto_start_configuration()
+ )
+ self.auto_stop_configuration = (
+ auto_stop_configuration or default_auto_stop_configuration()
+ )
+ self.network_configuration = network_configuration
+ self.tags = tags or {}
+
+ # Service-generated-parameters
+ self.id = random_appplication_id()
+ self.arn = APPLICATION_ARN_TEMPLATE.format(
+ partition="aws", region=region_name, application_id=self.id
+ )
+ self.state = APPLICATION_STATUS
+ self.state_details = ""
+ self.created_at = iso_8601_datetime_without_milliseconds(
+ datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
+ )
+ self.updated_at = self.created_at
+
+ def __iter__(self):
+ yield "applicationId", self.id
+ yield "name", self.name
+ yield "arn", self.arn
+ yield "autoStartConfig", self.auto_start_configuration,
+ yield "autoStopConfig", self.auto_stop_configuration,
+
+ def to_dict(self):
+ """
+ Dictionary representation of an EMR Serverless Application.
+ When used in `list-applications`, capacity, auto-start/stop configs, and tags are not returned. https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_ListApplications.html
+ When used in `get-application`, more details are returned. https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_GetApplication.html#API_GetApplication_ResponseSyntax
+ """
+ caller_methods = inspect.stack()[1].function
+ caller_methods_type = caller_methods.split("_")[0]
+
+ if caller_methods_type in ["get", "update"]:
+ response = {
+ "applicationId": self.id,
+ "name": self.name,
+ "arn": self.arn,
+ "releaseLabel": self.release_label,
+ "type": self.application_type,
+ "state": self.state,
+ "stateDetails": self.state_details,
+ "createdAt": self.created_at,
+ "updatedAt": self.updated_at,
+ "autoStartConfiguration": self.auto_start_configuration,
+ "autoStopConfiguration": self.auto_stop_configuration,
+ "tags": self.tags,
+ }
+ else:
+ response = {
+ "id": self.id,
+ "name": self.name,
+ "arn": self.arn,
+ "releaseLabel": self.release_label,
+ "type": self.application_type,
+ "state": self.state,
+ "stateDetails": self.state_details,
+ "createdAt": self.created_at,
+ "updatedAt": self.updated_at,
+ }
+
+ if self.network_configuration:
+ response.update({"networkConfiguration": self.network_configuration})
+ if self.initial_capacity:
+ response.update({"initialCapacity": self.initial_capacity})
+ if self.maximum_capacity:
+ response.update({"maximumCapacity": self.maximum_capacity})
+
+ return response
+
+
+class EMRServerlessBackend(BaseBackend):
+ """Implementation of EMRServerless APIs."""
+
+ def __init__(self, region_name, account_id):
+ super().__init__(region_name, account_id)
+ self.region_name = region_name
+ self.applications = dict()
+ self.jobs = dict()
+ self.partition = get_partition(region_name)
+
+ def create_application(
+ self,
+ name,
+ release_label,
+ application_type,
+ client_token,
+ initial_capacity,
+ maximum_capacity,
+ tags,
+ auto_start_configuration,
+ auto_stop_configuration,
+ network_configuration,
+ ):
+
+ if application_type not in ["HIVE", "SPARK"]:
+ raise ValidationException(f"Unsupported engine {application_type}")
+
+ if not re.match(r"emr-[0-9]{1}\.[0-9]{1,2}\.0(" "|-[0-9]{8})", release_label):
+ raise ValidationException(
+ f"Type '{application_type}' is not supported for release label '{release_label}' or release label does not exist"
+ )
+
+ application = FakeApplication(
+ name=name,
+ release_label=release_label,
+ application_type=application_type,
+ region_name=self.region_name,
+ client_token=client_token,
+ initial_capacity=initial_capacity,
+ maximum_capacity=maximum_capacity,
+ tags=tags,
+ auto_start_configuration=auto_start_configuration,
+ auto_stop_configuration=auto_stop_configuration,
+ network_configuration=network_configuration,
+ )
+ self.applications[application.id] = application
+ return application
+
+ def delete_application(self, application_id):
+ if application_id not in self.applications.keys():
+ raise ResourceNotFoundException(application_id)
+
+ if self.applications[application_id].state not in ["CREATED", "STOPPED"]:
+ raise ValidationException(
+ f"Application {application_id} must be in one of the following statuses [CREATED, STOPPED]. "
+ f"Current status: {self.applications[application_id].state}"
+ )
+ self.applications[application_id].state = "TERMINATED"
+
+ def get_application(self, application_id):
+ if application_id not in self.applications.keys():
+ raise ResourceNotFoundException(application_id)
+
+ return self.applications[application_id].to_dict()
+
+ def list_applications(self, next_token, max_results, states):
+ applications = [
+ application.to_dict() for application in self.applications.values()
+ ]
+ if states:
+ applications = [
+ application
+ for application in applications
+ if application["state"] in states
+ ]
+ sort_key = "name"
+ return paginated_list(applications, sort_key, max_results, next_token)
+
+ def start_application(self, application_id):
+ if application_id not in self.applications.keys():
+ raise ResourceNotFoundException(application_id)
+ self.applications[application_id].state = "STARTED"
+
+ def stop_application(self, application_id):
+ if application_id not in self.applications.keys():
+ raise ResourceNotFoundException(application_id)
+ self.applications[application_id].state = "STOPPED"
+
+ def update_application(
+ self,
+ application_id,
+ initial_capacity,
+ maximum_capacity,
+ auto_start_configuration,
+ auto_stop_configuration,
+ network_configuration,
+ ):
+ if application_id not in self.applications.keys():
+ raise ResourceNotFoundException(application_id)
+
+ if self.applications[application_id].state not in ["CREATED", "STOPPED"]:
+ raise ValidationException(
+ f"Application {application_id} must be in one of the following statuses [CREATED, STOPPED]. "
+ f"Current status: {self.applications[application_id].state}"
+ )
+
+ if initial_capacity:
+ self.applications[application_id].initial_capacity = initial_capacity
+
+ if maximum_capacity:
+ self.applications[application_id].maximum_capacity = maximum_capacity
+
+ if auto_start_configuration:
+ self.applications[
+ application_id
+ ].auto_start_configuration = auto_start_configuration
+
+ if auto_stop_configuration:
+ self.applications[
+ application_id
+ ].auto_stop_configuration = auto_stop_configuration
+
+ if network_configuration:
+ self.applications[
+ application_id
+ ].network_configuration = network_configuration
+
+ self.applications[
+ application_id
+ ].updated_at = iso_8601_datetime_without_milliseconds(
+ datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
+ )
+
+ return self.applications[application_id].to_dict()
+
+
+emrserverless_backends = BackendDict(EMRServerlessBackend, "emr-serverless")
diff --git a/moto/emrserverless/responses.py b/moto/emrserverless/responses.py
new file mode 100644
index 000000000..7377b65a7
--- /dev/null
+++ b/moto/emrserverless/responses.py
@@ -0,0 +1,126 @@
+"""Handles incoming emrserverless requests, invokes methods, returns responses."""
+import json
+
+from moto.core.responses import BaseResponse
+from .models import emrserverless_backends
+
+DEFAULT_MAX_RESULTS = 100
+DEFAULT_NEXT_TOKEN = ""
+
+"""
+These are the available methos:
+ can_paginate()
+ cancel_job_run()
+ close()
+ create_application() -> DONE
+ delete_application() -> DONE
+ get_application() -> DONE
+ get_job_run()
+ get_paginator()
+ get_waiter()
+ list_applications() -> DONE
+ list_job_runs()
+ list_tags_for_resource()
+ start_application() -> DONE
+ start_job_run()
+ stop_application() -> DONE
+ tag_resource()
+ untag_resource()
+ update_application()
+"""
+
+
+class EMRServerlessResponse(BaseResponse):
+ """Handler for EMRServerless requests and responses."""
+
+ SERVICE_NAME = "emr-serverless"
+
+ @property
+ def emrserverless_backend(self):
+ """Return backend instance specific for this region."""
+ return emrserverless_backends[self.region]
+
+ def create_application(self):
+ name = self._get_param("name")
+ release_label = self._get_param("releaseLabel")
+ application_type = self._get_param("type")
+ client_token = self._get_param("clientToken")
+ initial_capacity = self._get_param("initialCapacity")
+ maximum_capacity = self._get_param("maximumCapacity")
+ tags = self._get_param("tags")
+ auto_start_configuration = self._get_param("autoStartConfiguration")
+ auto_stop_configuration = self._get_param("autoStopConfiguration")
+ network_configuration = self._get_param("networkConfiguration")
+
+ application = self.emrserverless_backend.create_application(
+ name=name,
+ release_label=release_label,
+ application_type=application_type,
+ client_token=client_token,
+ initial_capacity=initial_capacity,
+ maximum_capacity=maximum_capacity,
+ tags=tags,
+ auto_start_configuration=auto_start_configuration,
+ auto_stop_configuration=auto_stop_configuration,
+ network_configuration=network_configuration,
+ )
+ return (200, {}, json.dumps(dict(application)))
+
+ def delete_application(self):
+ application_id = self._get_param("applicationId")
+
+ self.emrserverless_backend.delete_application(application_id=application_id)
+ return (200, {}, None)
+
+ def get_application(self):
+ application_id = self._get_param("applicationId")
+
+ application = self.emrserverless_backend.get_application(
+ application_id=application_id
+ )
+ response = {"application": application}
+ return 200, {}, json.dumps(response)
+
+ def list_applications(self):
+ states = self.querystring.get("states", [])
+ max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
+ next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
+
+ applications, next_token = self.emrserverless_backend.list_applications(
+ next_token=next_token,
+ max_results=max_results,
+ states=states,
+ )
+ response = {"applications": applications, "nextToken": next_token}
+ return 200, {}, json.dumps(response)
+
+ def start_application(self):
+ application_id = self._get_param("applicationId")
+
+ self.emrserverless_backend.start_application(application_id=application_id)
+ return (200, {}, None)
+
+ def stop_application(self):
+ application_id = self._get_param("applicationId")
+
+ self.emrserverless_backend.stop_application(application_id=application_id)
+ return (200, {}, None)
+
+ def update_application(self):
+ application_id = self._get_param("applicationId")
+ initial_capacity = self._get_param("initialCapacity")
+ maximum_capacity = self._get_param("maximumCapacity")
+ auto_start_configuration = self._get_param("autoStartConfiguration")
+ auto_stop_configuration = self._get_param("autoStopConfiguration")
+ network_configuration = self._get_param("networkConfiguration")
+
+ application = self.emrserverless_backend.update_application(
+ application_id=application_id,
+ initial_capacity=initial_capacity,
+ maximum_capacity=maximum_capacity,
+ auto_start_configuration=auto_start_configuration,
+ auto_stop_configuration=auto_stop_configuration,
+ network_configuration=network_configuration,
+ )
+ response = {"application": application}
+ return 200, {}, json.dumps(response)
diff --git a/moto/emrserverless/urls.py b/moto/emrserverless/urls.py
new file mode 100644
index 000000000..06a037fed
--- /dev/null
+++ b/moto/emrserverless/urls.py
@@ -0,0 +1,16 @@
+"""emrserverless base URL and path."""
+from .responses import EMRServerlessResponse
+
+url_bases = [
+ r"https?://emr-serverless\.(.+)\.amazonaws\.com",
+]
+
+
+url_paths = {
+ "{0}/applications$": EMRServerlessResponse.dispatch,
+ "{0}/applications/(?P[^/]+)$": EMRServerlessResponse.dispatch,
+ "{0}/applications/(?P[^/]+)/start$": EMRServerlessResponse.dispatch,
+ "{0}/applications/(?P[^/]+)/stop$": EMRServerlessResponse.dispatch,
+ "{0}/applications/(?P[^/]+)/jobruns$": EMRServerlessResponse.dispatch,
+ "{0}/applications/(?P[^/]+)/jobruns/(?P[^/]+)$": EMRServerlessResponse.dispatch,
+}
diff --git a/moto/emrserverless/utils.py b/moto/emrserverless/utils.py
new file mode 100644
index 000000000..51f3839b5
--- /dev/null
+++ b/moto/emrserverless/utils.py
@@ -0,0 +1,58 @@
+# import json
+import random
+import string
+
+
+def get_partition(region):
+ valid_matches = [
+ # (region prefix, aws partition)
+ ("cn-", "aws-cn"),
+ ("us-gov-", "aws-us-gov"),
+ ("us-gov-iso-", "aws-iso"),
+ ("us-gov-iso-b-", "aws-iso-b"),
+ ]
+
+ for prefix, partition in valid_matches:
+ if region.startswith(prefix):
+ return partition
+ return "aws"
+
+
+def random_id(size=13):
+ chars = list(range(10)) + list(string.ascii_lowercase)
+ return "".join(str(random.choice(chars)) for x in range(size))
+
+
+def random_appplication_id():
+ return random_id(size=16)
+
+
+def random_job_id():
+ return random_id(size=16)
+
+
+def default_auto_start_configuration():
+ return {"enabled": True}
+
+
+def default_auto_stop_configuration():
+ return {"enabled": True, "idleTimeoutMinutes": 15}
+
+
+def paginated_list(full_list, sort_key, max_results, next_token):
+ """
+ Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results
+ number of elements, and the new next_token which can be passed back in for the next segment of the full list.
+ """
+ if next_token is None or not next_token:
+ next_token = 0
+ next_token = int(next_token)
+
+ sorted_list = sorted(full_list, key=lambda d: d[sort_key])
+
+ values = sorted_list[next_token : next_token + max_results]
+ if len(values) == max_results:
+ new_next_token = str(next_token + max_results)
+ else:
+ new_next_token = None
+ return values, new_next_token
diff --git a/tests/test_emrserverless/__init__.py b/tests/test_emrserverless/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/test_emrserverless/test_emrserverless.py b/tests/test_emrserverless/test_emrserverless.py
new file mode 100644
index 000000000..443960465
--- /dev/null
+++ b/tests/test_emrserverless/test_emrserverless.py
@@ -0,0 +1,543 @@
+"""Unit tests for emrserverless-supported APIs."""
+import re
+from datetime import datetime, timezone
+from contextlib import contextmanager
+
+import boto3
+import pytest
+import sure # noqa # pylint: disable=unused-import
+from botocore.exceptions import ClientError
+from moto import mock_emrserverless, settings
+from moto.core import ACCOUNT_ID
+from moto.emrserverless import REGION as DEFAULT_REGION
+from moto.emrserverless import RELEASE_LABEL as DEFAULT_RELEASE_LABEL
+from unittest.mock import patch
+
+
+@contextmanager
+def does_not_raise():
+ yield
+
+
+@pytest.fixture(scope="function")
+def client():
+ with mock_emrserverless():
+ yield boto3.client("emr-serverless", region_name=DEFAULT_REGION)
+
+
+@pytest.fixture(scope="function")
+def application_factory(client):
+ application_list = []
+
+ if settings.TEST_SERVER_MODE:
+ resp = client.create_application(
+ name="test-emr-serverless-application-STARTED",
+ type="SPARK",
+ releaseLabel=DEFAULT_RELEASE_LABEL,
+ )
+ application_list.append(resp["applicationId"])
+
+ resp = client.create_application(
+ name="test-emr-serverless-application-STOPPED",
+ type="SPARK",
+ releaseLabel=DEFAULT_RELEASE_LABEL,
+ )
+ client.stop_application(applicationId=resp["applicationId"])
+ application_list.append(resp["applicationId"])
+
+ else:
+ application_state = [
+ "STARTED",
+ "STOPPED",
+ "CREATING",
+ "CREATED",
+ "STARTING",
+ "STOPPING",
+ "TERMINATED",
+ ]
+
+ for state in application_state:
+ with patch("moto.emrserverless.models.APPLICATION_STATUS", state):
+ resp = client.create_application(
+ name=f"test-emr-serverless-application-{state}",
+ type="SPARK",
+ releaseLabel=DEFAULT_RELEASE_LABEL,
+ )
+
+ application_list.append(resp["applicationId"])
+
+ yield application_list
+
+
+class TestCreateApplication:
+ @staticmethod
+ @mock_emrserverless
+ def test_create_application(client):
+ resp = client.create_application(
+ name="test-emr-serverless-application",
+ type="SPARK",
+ releaseLabel=DEFAULT_RELEASE_LABEL,
+ )
+
+ assert resp["name"] == "test-emr-serverless-application"
+ assert re.match(r"[a-z,0-9]{16}", resp["applicationId"])
+ assert (
+ resp["arn"]
+ == f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/applications/{resp['applicationId']}"
+ )
+
+ @staticmethod
+ @mock_emrserverless
+ def test_create_application_incorrect_type(client):
+ with pytest.raises(ClientError) as exc:
+ client.create_application(
+ name="test-emr-serverless-application",
+ type="SPARK3",
+ releaseLabel=DEFAULT_RELEASE_LABEL,
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "Unsupported engine SPARK3"
+
+ @staticmethod
+ @mock_emrserverless
+ def test_create_application_incorrect_release_label(client):
+ with pytest.raises(ClientError) as exc:
+ client.create_application(
+ name="test-emr-serverless-application",
+ type="SPARK",
+ releaseLabel="emr-fake",
+ )
+
+ err = exc.value.response["Error"]
+
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == "Type 'SPARK' is not supported for release label 'emr-fake' or release label does not exist"
+ )
+
+
+class TestDeleteApplication:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, application_factory):
+ self.client = client
+ self.application_ids = application_factory
+
+ @pytest.mark.parametrize(
+ "index,status,expectation",
+ argvalues=(
+ [
+ (0, "STARTED", pytest.raises(ClientError)),
+ (1, "STOPPED", does_not_raise()),
+ ]
+ if settings.TEST_SERVER_MODE
+ else [
+ (0, "STARTED", pytest.raises(ClientError)),
+ (1, "STOPPED", does_not_raise()),
+ (2, "CREATING", pytest.raises(ClientError)),
+ (3, "CREATED", does_not_raise()),
+ (4, "STARTING", pytest.raises(ClientError)),
+ (5, "STOPPING", pytest.raises(ClientError)),
+ (6, "TERMINATED", pytest.raises(ClientError)),
+ ]
+ ),
+ )
+ def test_valid_application_id(self, index, status, expectation):
+ with expectation as exc:
+ resp = self.client.delete_application(
+ applicationId=self.application_ids[index]
+ )
+
+ if exc:
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == f"Application {self.application_ids[index]} must be in one of the following statuses [CREATED, STOPPED]. Current status: {status}"
+ )
+ else:
+ assert resp is not None
+ assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+ def test_invalid_application_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.delete_application(applicationId="fake_application_id")
+
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Application fake_application_id does not exist"
+
+
+class TestGetApplication:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client):
+ self.client = client
+
+ @staticmethod
+ def get_expected_resp(application_id, extra_configuration):
+ response = {
+ "applicationId": application_id,
+ "name": "test-emr-serverless-application",
+ "arn": f"arn:aws:emr-containers:us-east-1:123456789012:/applications/{application_id}",
+ "releaseLabel": "emr-6.6.0",
+ "type": "Spark",
+ "state": "STARTED",
+ "stateDetails": "",
+ "autoStartConfiguration": {"enabled": True},
+ "autoStopConfiguration": {"enabled": True, "idleTimeoutMinutes": 15},
+ "tags": {},
+ "createdAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ "updatedAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ }
+ return {**response, **extra_configuration}
+
+ @pytest.mark.parametrize(
+ "extra_configuration",
+ [
+ {},
+ {
+ "initialCapacity": {
+ "Driver": {
+ "workerCount": 1,
+ "workerConfiguration": {
+ "cpu": "2 vCPU",
+ "memory": "4 GB",
+ "disk": "20 GB",
+ },
+ }
+ }
+ },
+ {
+ "maximumCapacity": {
+ "cpu": "400 vCPU",
+ "memory": "1024 GB",
+ "disk": "1000 GB",
+ }
+ },
+ {
+ "networkConfiguration": {
+ "subnetIds": ["subnet-0123456789abcdefg"],
+ "securityGroupIds": ["sg-0123456789abcdefg"],
+ }
+ },
+ {
+ "initialCapacity": {
+ "Driver": {
+ "workerCount": 1,
+ "workerConfiguration": {
+ "cpu": "2 vCPU",
+ "memory": "4 GB",
+ "disk": "20 GB",
+ },
+ }
+ },
+ "maximumCapacity": {
+ "cpu": "400 vCPU",
+ "memory": "1024 GB",
+ "disk": "1000 GB",
+ },
+ "networkConfiguration": {
+ "subnetIds": ["subnet-0123456789abcdefg"],
+ "securityGroupIds": ["sg-0123456789abcdefg"],
+ },
+ },
+ ],
+ )
+ def test_filtering(self, extra_configuration):
+ application_id = self.client.create_application(
+ name="test-emr-serverless-application",
+ type="SPARK",
+ releaseLabel=DEFAULT_RELEASE_LABEL,
+ **extra_configuration,
+ )["applicationId"]
+ expected_resp = self.get_expected_resp(application_id, extra_configuration)
+
+ actual_resp = self.client.get_application(applicationId=application_id)[
+ "application"
+ ]
+
+ assert actual_resp == expected_resp
+
+ def test_invalid_application_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.get_application(applicationId="fake_application_id")
+
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Application fake_application_id does not exist"
+
+
+class TestListApplication:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, application_factory):
+ self.client = client
+ self.application_ids = application_factory
+
+ def test_response_context(self):
+ resp = self.client.list_applications()
+ expected_resp = {
+ "id": self.application_ids[0],
+ "name": "test-emr-serverless-application-STARTED",
+ "arn": f"arn:aws:emr-containers:us-east-1:123456789012:/applications/{self.application_ids[0]}",
+ "releaseLabel": "emr-6.6.0",
+ "type": "Spark",
+ "state": "STARTED",
+ "stateDetails": "",
+ "createdAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ "updatedAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ }
+
+ actual_resp = [
+ app for app in resp["applications"] if app["id"] == expected_resp["id"]
+ ][0]
+
+ assert actual_resp == expected_resp
+
+ @pytest.mark.parametrize(
+ "list_applications_args,job_count",
+ argvalues=(
+ [
+ ({}, 2),
+ ({"states": ["STARTED"]}, 1),
+ ({"states": ["STARTED", "STOPPED"]}, 2),
+ ({"states": ["FOOBAA"]}, 0),
+ ({"maxResults": 1}, 1),
+ ]
+ if settings.TEST_SERVER_MODE
+ else [
+ ({}, 7),
+ ({"states": ["CREATED"]}, 1),
+ ({"states": ["CREATED", "STARTING"]}, 2),
+ ({"states": ["FOOBAA"]}, 0),
+ ({"maxResults": 1}, 1),
+ ]
+ ),
+ )
+ def test_filtering(self, list_applications_args, job_count):
+ resp = self.client.list_applications(**list_applications_args)
+ assert len(resp["applications"]) == job_count
+
+ def test_next_token(self):
+ if settings.TEST_SERVER_MODE:
+ resp = self.client.list_applications(maxResults=1)
+ assert len(resp["applications"]) == 1
+
+ resp = self.client.list_applications(nextToken=resp["nextToken"])
+ assert len(resp["applications"]) == 1
+ else:
+ resp = self.client.list_applications(maxResults=2)
+ assert len(resp["applications"]) == 2
+
+ resp = self.client.list_applications(nextToken=resp["nextToken"])
+ assert len(resp["applications"]) == 5
+
+
+class TestStartApplication:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, application_factory):
+ self.client = client
+ self.application_ids = application_factory
+
+ def test_valid_application_id(self):
+ resp = self.client.start_application(applicationId=self.application_ids[1])
+ assert resp is not None
+ assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+ def test_invalid_application_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.start_application(applicationId="fake_application_id")
+
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Application fake_application_id does not exist"
+
+
+class TestStopApplication:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, application_factory):
+ self.client = client
+ self.application_ids = application_factory
+
+ def test_valid_application_id(self):
+ resp = self.client.stop_application(applicationId=self.application_ids[1])
+ assert resp is not None
+ assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+ def test_invalid_application_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.stop_application(applicationId="fake_application_id")
+
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Application fake_application_id does not exist"
+
+
+class TestUpdateApplication:
+ @pytest.fixture(autouse=True)
+ def _setup_environment(self, client, application_factory):
+ self.client = client
+ self.application_ids = application_factory
+
+ @staticmethod
+ def get_expected_resp(application_id, extra_configuration):
+ response = {
+ "applicationId": application_id,
+ "name": "test-emr-serverless-application-STOPPED",
+ "arn": f"arn:aws:emr-containers:us-east-1:123456789012:/applications/{application_id}",
+ "releaseLabel": "emr-6.6.0",
+ "type": "Spark",
+ "state": "STOPPED",
+ "stateDetails": "",
+ "autoStartConfiguration": {"enabled": True},
+ "autoStopConfiguration": {"enabled": True, "idleTimeoutMinutes": 15},
+ "tags": {},
+ "createdAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ "updatedAt": (
+ datetime.today()
+ .replace(hour=0, minute=0, second=0, microsecond=0)
+ .replace(tzinfo=timezone.utc)
+ ),
+ }
+ return {**response, **extra_configuration}
+
+ @pytest.mark.parametrize(
+ "index,status,expectation",
+ argvalues=(
+ [
+ (0, "STARTED", pytest.raises(ClientError)),
+ (1, "STOPPED", does_not_raise()),
+ ]
+ if settings.TEST_SERVER_MODE
+ else [
+ (0, "STARTED", pytest.raises(ClientError)),
+ (1, "STOPPED", does_not_raise()),
+ (2, "CREATING", pytest.raises(ClientError)),
+ (3, "CREATED", does_not_raise()),
+ (4, "STARTING", pytest.raises(ClientError)),
+ (5, "STOPPING", pytest.raises(ClientError)),
+ (6, "TERMINATED", pytest.raises(ClientError)),
+ ]
+ ),
+ )
+ def test_application_status(self, index, status, expectation):
+ with expectation as exc:
+ resp = self.client.update_application(
+ applicationId=self.application_ids[index]
+ )
+
+ if exc:
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert (
+ err["Message"]
+ == f"Application {self.application_ids[index]} must be in one of the following statuses [CREATED, STOPPED]. Current status: {status}"
+ )
+ else:
+ assert resp is not None
+ assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+ @pytest.mark.parametrize(
+ "update_configuration",
+ [
+ {},
+ {
+ "initialCapacity": {
+ "Driver": {
+ "workerCount": 1,
+ "workerConfiguration": {
+ "cpu": "2 vCPU",
+ "memory": "4 GB",
+ "disk": "20 GB",
+ },
+ }
+ }
+ },
+ {
+ "maximumCapacity": {
+ "cpu": "400 vCPU",
+ "memory": "1024 GB",
+ "disk": "1000 GB",
+ }
+ },
+ {"autoStartConfiguration": {"enabled": False}},
+ {
+ "autoStopConfiguration": {
+ "enabled": False,
+ "idleTimeoutMinutes": 5,
+ }
+ },
+ {
+ "networkConfiguration": {
+ "subnetIds": ["subnet-0123456789abcdefg"],
+ "securityGroupIds": ["sg-0123456789abcdefg"],
+ }
+ },
+ {
+ "initialCapacity": {
+ "Driver": {
+ "workerCount": 1,
+ "workerConfiguration": {
+ "cpu": "2 vCPU",
+ "memory": "4 GB",
+ "disk": "20 GB",
+ },
+ }
+ },
+ "maximumCapacity": {
+ "cpu": "400 vCPU",
+ "memory": "1024 GB",
+ "disk": "1000 GB",
+ },
+ "autoStartConfiguration": {"enabled": False},
+ "autoStopConfiguration": {
+ "enabled": False,
+ "idleTimeoutMinutes": 5,
+ },
+ "networkConfiguration": {
+ "subnetIds": ["subnet-0123456789abcdefg"],
+ "securityGroupIds": ["sg-0123456789abcdefg"],
+ },
+ },
+ ],
+ )
+ def test_valid_update(self, update_configuration):
+ expected_resp = self.get_expected_resp(
+ self.application_ids[1], update_configuration
+ )
+
+ actual_resp = self.client.update_application(
+ applicationId=self.application_ids[1], **update_configuration
+ )["application"]
+
+ assert actual_resp == expected_resp
+
+ def test_invalid_application_id(self):
+ with pytest.raises(ClientError) as exc:
+ self.client.update_application(applicationId="fake_application_id")
+
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "Application fake_application_id does not exist"
diff --git a/tests/test_emrserverless/test_server.py b/tests/test_emrserverless/test_server.py
new file mode 100644
index 000000000..f773d76bc
--- /dev/null
+++ b/tests/test_emrserverless/test_server.py
@@ -0,0 +1,13 @@
+"""Test different server responses."""
+import sure # noqa # pylint: disable=unused-import
+
+import moto.server as server
+
+
+def test_emrserverless_list():
+ backend = server.create_backend_app("emr-serverless")
+ test_client = backend.test_client()
+
+ resp = test_client.get("/applications")
+ resp.status_code.should.equal(200)
+ str(resp.data).should.contain("applications")