Feature - emr-containers (#4492)

This commit is contained in:
Andor Markus 2021-10-30 13:12:08 +02:00 committed by GitHub
parent 13f985115b
commit 213c081628
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 648 additions and 2 deletions

View File

@ -1914,6 +1914,27 @@
- [ ] update_studio_session_mapping
</details>
## emr-containers
<details>
<summary>27% implemented</summary>
- [ ] cancel_job_run
- [ ] create_managed_endpoint
- [X] create_virtual_cluster
- [ ] delete_managed_endpoint
- [X] delete_virtual_cluster
- [ ] describe_job_run
- [ ] describe_managed_endpoint
- [X] describe_virtual_cluster
- [ ] list_job_runs
- [ ] list_managed_endpoints
- [ ] list_tags_for_resource
- [X] list_virtual_clusters
- [ ] start_job_run
- [ ] tag_resource
- [ ] untag_resource
</details>
## events
<details>
<summary>78% implemented</summary>
@ -4598,7 +4619,6 @@
- ecr-public
- elastic-inference
- elasticache
- emr-containers
- es
- finspace
- finspace-data

View File

@ -95,6 +95,7 @@ Here's the partial list of the AWS services that currently have support:
| ELB | @mock_elb | |
| ELBv2 | @mock_elbv2 | |
| EMR | @mock_emr | |
| EMRContainer | @mock_emrcontainers | |
| Forecast | @mock_forecast | |
| Glacier | @mock_glacier | |
| Glue | @mock_glue | |

View File

@ -74,6 +74,9 @@ mock_elb_deprecated = lazy_load(".elb", "mock_elb_deprecated")
mock_elbv2 = lazy_load(".elbv2", "mock_elbv2")
mock_emr = lazy_load(".emr", "mock_emr")
mock_emr_deprecated = lazy_load(".emr", "mock_emr_deprecated")
mock_emrcontainers = lazy_load(
".emrcontainers", "mock_emrcontainers", boto3_name="emr-containers"
)
mock_events = lazy_load(".events", "mock_events")
mock_firehose = lazy_load(".firehose", "mock_firehose")
mock_forecast = lazy_load(".forecast", "mock_forecast")

View File

@ -1,4 +1,4 @@
# autogenerated by scripts/update_backend_index.py
# autogenerated by /Users/markus/git_projects_os/moto/scripts/update_backend_index.py
import re
backend_url_patterns = [
@ -57,6 +57,7 @@ backend_url_patterns = [
("elbv2", re.compile("https?://elasticloadbalancing\\.(.+)\\.amazonaws.com")),
("emr", re.compile("https?://(.+)\\.elasticmapreduce\\.amazonaws.com")),
("emr", re.compile("https?://elasticmapreduce\\.(.+)\\.amazonaws.com")),
("emr-containers", re.compile("https?://emr-containers\\.(.+)\\.amazonaws\\.com")),
("events", re.compile("https?://events\\.(.+)\\.amazonaws\\.com")),
("firehose", re.compile("https?://firehose\\.(.+)\\.amazonaws\\.com")),
("forecast", re.compile("https?://forecast\\.(.+)\\.amazonaws\\.com")),

View File

@ -0,0 +1,7 @@
"""emrcontainers module initialization; sets value for base decorator."""
from .models import emrcontainers_backends
from ..core.models import base_decorator
REGION = "us-east-1"
emrcontainers_backend = emrcontainers_backends["us-east-1"]
mock_emrcontainers = base_decorator(emrcontainers_backends)

View File

@ -0,0 +1,2 @@
"""Exceptions raised by the emrcontainers service."""
# from moto.core.exceptions import JsonRESTError

View File

@ -0,0 +1,190 @@
"""EMRContainersBackend class with methods for supported APIs."""
from datetime import datetime
from boto3 import Session
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID
from moto.core.utils import iso_8601_datetime_without_milliseconds
from .utils import random_cluster_id, get_partition, paginated_list
# String Templates
from ..config.exceptions import ValidationException
VIRTUAL_CLUSTER_ARN_TEMPLATE = (
"arn:{partition}:emr-containers:{region}:"
+ str(ACCOUNT_ID)
+ ":/virtualclusters/{virtual_cluster_id}"
)
# Defaults used for creating a Virtual cluster
ACTIVE_STATUS = "ACTIVE"
class FakeCluster(BaseModel):
def __init__(
self,
name,
container_provider,
client_token,
region_name,
aws_partition,
tags=None,
virtual_cluster_id=None,
):
self.id = virtual_cluster_id or random_cluster_id()
self.name = name
self.client_token = client_token
self.arn = VIRTUAL_CLUSTER_ARN_TEMPLATE.format(
partition=aws_partition, region=region_name, virtual_cluster_id=self.id
)
self.state = ACTIVE_STATUS
self.container_provider = container_provider
self.container_provider_id = container_provider["id"]
self.namespace = container_provider["info"]["eksInfo"]["namespace"]
self.creation_date = iso_8601_datetime_without_milliseconds(
datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
)
self.tags = tags
def __iter__(self):
yield "id", self.id
yield "name", self.name
yield "arn", self.arn
yield "state", self.state
yield "containerProvider", self.container_provider
yield "createdAt", self.creation_date
yield "tags", self.tags
def to_dict(self):
# Format for summary https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeVirtualCluster.html
# (response syntax section)
return {
"id": self.id,
"name": self.name,
"arn": self.arn,
"state": self.state,
"containerProvider": self.container_provider,
"createdAt": self.creation_date,
"tags": self.tags,
}
class EMRContainersBackend(BaseBackend):
"""Implementation of EMRContainers APIs."""
def __init__(self, region_name=None):
super(EMRContainersBackend, self).__init__()
self.virtual_clusters = dict()
self.virtual_cluster_count = 0
self.region_name = region_name
self.partition = get_partition(region_name)
def reset(self):
"""Re-initialize all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_virtual_cluster(self, name, container_provider, client_token, tags=None):
occupied_namespaces = [
virtual_cluster.namespace
for virtual_cluster in self.virtual_clusters.values()
]
if container_provider["info"]["eksInfo"]["namespace"] in occupied_namespaces:
raise ValidationException(
"A virtual cluster already exists in the given namespace"
)
virtual_cluster = FakeCluster(
name=name,
container_provider=container_provider,
client_token=client_token,
tags=tags,
region_name=self.region_name,
aws_partition=self.partition,
)
self.virtual_clusters[virtual_cluster.id] = virtual_cluster
self.virtual_cluster_count += 1
return virtual_cluster
def delete_virtual_cluster(self, id):
if id not in self.virtual_clusters:
raise ValidationException("VirtualCluster does not exist")
self.virtual_clusters[id].state = "TERMINATED"
return self.virtual_clusters[id]
def describe_virtual_cluster(self, id):
if id not in self.virtual_clusters:
raise ValidationException(f"Virtual cluster {id} doesn't exist.")
return self.virtual_clusters[id].to_dict()
def list_virtual_clusters(
self,
container_provider_id,
container_provider_type,
created_after,
created_before,
states,
max_results,
next_token,
):
virtual_clusters = [
virtual_cluster.to_dict()
for virtual_cluster in self.virtual_clusters.values()
]
if container_provider_id:
virtual_clusters = [
virtual_cluster
for virtual_cluster in virtual_clusters
if virtual_cluster["containerProvider"]["id"] == container_provider_id
]
if container_provider_type:
virtual_clusters = [
virtual_cluster
for virtual_cluster in virtual_clusters
if virtual_cluster["containerProvider"]["type"]
== container_provider_type
]
if created_after:
virtual_clusters = [
virtual_cluster
for virtual_cluster in virtual_clusters
if virtual_cluster["createdAt"] >= created_after
]
if created_before:
virtual_clusters = [
virtual_cluster
for virtual_cluster in virtual_clusters
if virtual_cluster["createdAt"] <= created_before
]
if states:
virtual_clusters = [
virtual_cluster
for virtual_cluster in virtual_clusters
if virtual_cluster["state"] in states
]
return paginated_list(virtual_clusters, max_results, next_token)
emrcontainers_backends = {}
for available_region in Session().get_available_regions("emr-containers"):
emrcontainers_backends[available_region] = EMRContainersBackend(available_region)
for available_region in Session().get_available_regions(
"emr-containers", partition_name="aws-us-gov"
):
emrcontainers_backends[available_region] = EMRContainersBackend(available_region)
for available_region in Session().get_available_regions(
"emr-containers", partition_name="aws-cn"
):
emrcontainers_backends[available_region] = EMRContainersBackend(available_region)

View File

@ -0,0 +1,71 @@
"""Handles incoming emrcontainers requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import emrcontainers_backends
DEFAULT_MAX_RESULTS = 100
DEFAULT_NEXT_TOKEN = ""
DEFAULT_CONTAINER_PROVIDER_TYPE = "EKS"
class EMRContainersResponse(BaseResponse):
"""Handler for EMRContainers requests and responses."""
SERVICE_NAME = "emr-containers"
@property
def emrcontainers_backend(self):
"""Return backend instance specific for this region."""
return emrcontainers_backends[self.region]
def create_virtual_cluster(self):
name = self._get_param("name")
container_provider = self._get_param("containerProvider")
client_token = self._get_param("clientToken")
tags = self._get_param("tags")
virtual_cluster = self.emrcontainers_backend.create_virtual_cluster(
name=name,
container_provider=container_provider,
client_token=client_token,
tags=tags,
)
return 200, {}, json.dumps(dict(virtual_cluster))
def delete_virtual_cluster(self):
id = self._get_param("virtualClusterId")
virtual_cluster = self.emrcontainers_backend.delete_virtual_cluster(id=id)
return 200, {}, json.dumps(dict(virtual_cluster))
def describe_virtual_cluster(self):
id = self._get_param("virtualClusterId")
virtual_cluster = self.emrcontainers_backend.describe_virtual_cluster(id=id)
response = {"virtualCluster": virtual_cluster}
return 200, {}, json.dumps(response)
def list_virtual_clusters(self):
container_provider_id = self._get_param("containerProviderId")
container_provider_type = self._get_param(
"containerProviderType", DEFAULT_CONTAINER_PROVIDER_TYPE
)
created_after = self._get_param("createdAfter")
created_before = self._get_param("createdBefore")
states = self.querystring.get("states", [])
max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
virtual_clusters, next_token = self.emrcontainers_backend.list_virtual_clusters(
container_provider_id=container_provider_id,
container_provider_type=container_provider_type,
created_after=created_after,
created_before=created_before,
states=states,
max_results=max_results,
next_token=next_token,
)
response = {"virtualClusters": virtual_clusters, "nextToken": next_token}
return 200, {}, json.dumps(response)

View File

@ -0,0 +1,12 @@
"""emrcontainers base URL and path."""
from .responses import EMRContainersResponse
url_bases = [
r"https?://emr-containers\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/virtualclusters$": EMRContainersResponse.dispatch,
"{0}/virtualclusters/(?P<virtualClusterId>[^/]+)/?$": EMRContainersResponse.dispatch,
}

View File

@ -0,0 +1,46 @@
# import json
import string
import random
def get_partition(region):
valid_matches = [
# (region prefix, aws partition)
("cn-", "aws-cn"),
("us-gov-", "aws-us-gov"),
("us-gov-iso-", "aws-iso"),
("us-gov-iso-b-", "aws-iso-b"),
]
for prefix, partition in valid_matches:
if region.startswith(prefix):
return partition
return "aws"
def random_id(size=13):
chars = list(range(10)) + list(string.ascii_lowercase)
return "".join(str(random.choice(chars)) for x in range(size))
def random_cluster_id(size=13):
return random_id(size=25)
def paginated_list(full_list, max_results, next_token):
"""
Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results
number of elements, and the new next_token which can be passed back in for the next segment of the full list.
"""
if next_token is None or not next_token:
next_token = 0
next_token = int(next_token)
sorted_list = sorted(full_list, key=lambda d: d["name"])
values = sorted_list[next_token : next_token + max_results]
if len(values) == max_results:
new_next_token = str(next_token + max_results)
else:
new_next_token = None
return values, new_next_token

View File

View File

@ -0,0 +1,277 @@
"""Unit tests for emrcontainers-supported APIs."""
import re
from datetime import datetime, timezone, timedelta
from unittest import SkipTest
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_emrcontainers, settings
from moto.core import ACCOUNT_ID
from unittest.mock import patch
from moto.emrcontainers import REGION as DEFAULT_REGION
@pytest.fixture(scope="function")
def client():
with mock_emrcontainers():
yield boto3.client("emr-containers", region_name=DEFAULT_REGION)
@pytest.fixture(scope="function")
def virtual_cluster_factory(client):
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant manipulate time in server mode")
cluster_state = ["RUNNING", "TERMINATING", "TERMINATED", "ARRESTED"]
cluster_list = []
for i in range(4):
with patch("moto.emrcontainers.models.ACTIVE_STATUS", cluster_state[i]):
resp = client.create_virtual_cluster(
name="test-emr-virtual-cluster",
containerProvider={
"type": "EKS",
"id": "test-eks-cluster",
"info": {"eksInfo": {"namespace": f"emr-container-{i}"}},
},
)
cluster_list.append(resp["id"])
yield cluster_list
class TestCreateVirtualCluster:
@staticmethod
@mock_emrcontainers
def test_create_virtual_cluster(client):
resp = client.create_virtual_cluster(
name="test-emr-virtual-cluster",
containerProvider={
"type": "EKS",
"id": "test-eks-cluster",
"info": {"eksInfo": {"namespace": "emr-container"}},
},
)
cluster_count = len(client.list_virtual_clusters()["virtualClusters"])
assert resp["name"] == "test-emr-virtual-cluster"
assert re.match(r"[a-z,0-9]{25}", resp["id"])
assert (
resp["arn"]
== f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/virtualclusters/{resp['id']}"
)
assert cluster_count == 1
@staticmethod
@mock_emrcontainers
def test_create_virtual_cluster_on_same_namespace(client):
client.create_virtual_cluster(
name="test-emr-virtual-cluster",
containerProvider={
"type": "EKS",
"id": "test-eks-cluster",
"info": {"eksInfo": {"namespace": "emr-container"}},
},
)
with pytest.raises(ClientError) as exc:
client.create_virtual_cluster(
name="test-emr-virtual-cluster",
containerProvider={
"type": "EKS",
"id": "test-eks-cluster",
"info": {"eksInfo": {"namespace": "emr-container"}},
},
)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
err["Message"] == "A virtual cluster already exists in the given namespace"
)
class TestDeleteVirtualCluster:
@pytest.fixture(autouse=True)
def _setup_environment(self, client, virtual_cluster_factory):
self.client = client
self.virtual_cluster_ids = virtual_cluster_factory
def test_existing_virtual_cluster(self):
resp = self.client.delete_virtual_cluster(id=self.virtual_cluster_ids[0])
assert resp["id"] == self.virtual_cluster_ids[0]
def test_non_existing_virtual_cluster(self):
with pytest.raises(ClientError) as exc:
self.client.delete_virtual_cluster(id="foobaa")
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert err["Message"] == "VirtualCluster does not exist"
class TestDescribeVirtualCluster:
@pytest.fixture(autouse=True)
def _setup_environment(self, client, virtual_cluster_factory):
self.client = client
self.virtual_cluster_ids = virtual_cluster_factory
def test_existing_virtual_cluster(self):
resp = self.client.describe_virtual_cluster(id=self.virtual_cluster_ids[0])
expected_resp = {
"arn": f"arn:aws:emr-containers:us-east-1:{ACCOUNT_ID}:/virtualclusters/{self.virtual_cluster_ids[0]}",
"containerProvider": {
"id": "test-eks-cluster",
"info": {"eksInfo": {"namespace": "emr-container-0"}},
"type": "EKS",
},
"createdAt": (
datetime.today()
.replace(hour=0, minute=0, second=0, microsecond=0)
.replace(tzinfo=timezone.utc)
),
"id": self.virtual_cluster_ids[0],
"name": "test-emr-virtual-cluster",
"state": "RUNNING",
}
assert resp["virtualCluster"] == expected_resp
def test_non_existing_virtual_cluster(self):
with pytest.raises(ClientError) as exc:
self.client.describe_virtual_cluster(id="foobaa")
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert err["Message"] == "Virtual cluster foobaa doesn't exist."
class TestListVirtualClusters:
@pytest.fixture(autouse=True)
def _setup_environment(self, client, virtual_cluster_factory):
self.client = client
def test_base(self):
resp = self.client.list_virtual_clusters()
assert len(resp["virtualClusters"]) == 4
def test_valid_container_provider_id(self):
resp = self.client.list_virtual_clusters(containerProviderId="test-eks-cluster")
assert len(resp["virtualClusters"]) == 4
def test_invalid_container_provider_id(self):
resp = self.client.list_virtual_clusters(containerProviderId="foobaa")
assert len(resp["virtualClusters"]) == 0
def test_valid_container_provider_type(self):
resp = self.client.list_virtual_clusters(containerProviderType="EKS")
assert len(resp["virtualClusters"]) == 4
def test_invalid_container_provider_type(self):
resp = self.client.list_virtual_clusters(containerProviderType="AKS")
assert len(resp["virtualClusters"]) == 0
def test_created_after_yesterday(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(createdAfter=yesterday)
assert len(resp["virtualClusters"]) == 4
def test_created_after_tomorrow(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(createdAfter=tomorrow)
assert len(resp["virtualClusters"]) == 0
def test_created_after_yesterday_running_state(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdAfter=yesterday, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 1
def test_created_after_tomorrow_running_state(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdAfter=tomorrow, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 0
def test_created_after_yesterday_two_state_limit(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdAfter=yesterday, states=["RUNNING", "TERMINATED"], maxResults=1
)
assert len(resp["virtualClusters"]) == 1
def test_created_before_yesterday(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(createdBefore=yesterday)
assert len(resp["virtualClusters"]) == 0
def test_created_before_tomorrow(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(createdBefore=tomorrow)
assert len(resp["virtualClusters"]) == 4
def test_created_before_yesterday_running_state(self):
today = datetime.now()
yesterday = today - timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdBefore=yesterday, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 0
def test_created_before_tomorrow_running_state(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdBefore=tomorrow, states=["RUNNING"]
)
assert len(resp["virtualClusters"]) == 1
def test_created_before_tomorrow_two_state_limit(self):
today = datetime.now()
tomorrow = today + timedelta(days=1)
resp = self.client.list_virtual_clusters(
createdBefore=tomorrow, states=["RUNNING", "TERMINATED"], maxResults=1
)
assert len(resp["virtualClusters"]) == 1
def test_states_one_state(self):
resp = self.client.list_virtual_clusters(states=["RUNNING"])
assert len(resp["virtualClusters"]) == 1
def test_states_two_state(self):
resp = self.client.list_virtual_clusters(states=["RUNNING", "TERMINATED"])
assert len(resp["virtualClusters"]) == 2
def test_states_invalid_state(self):
resp = self.client.list_virtual_clusters(states=["FOOBAA"])
assert len(resp["virtualClusters"]) == 0
def test_max_result(self):
resp = self.client.list_virtual_clusters(maxResults=1)
assert len(resp["virtualClusters"]) == 1
def test_next_token(self):
resp = self.client.list_virtual_clusters(maxResults=2)
assert len(resp["virtualClusters"]) == 2
resp = self.client.list_virtual_clusters(nextToken=resp["nextToken"])
assert len(resp["virtualClusters"]) == 2

View File

@ -0,0 +1,16 @@
"""Test different server responses."""
import sure # noqa # pylint: disable=unused-import
import moto.server as server
"""
Test the different server responses
"""
def test_list_virtual_clusters():
backend = server.create_backend_app("emr-containers")
test_client = backend.test_client()
res = test_client.get("/virtualclusters")
b'{"virtualClusters": [], "nextToken": null}' in res.data