From 869b3c45c1dc8a2d6e2c014bbbb4226bd49a266d Mon Sep 17 00:00:00 2001 From: Jordan Dimov <32362173+jordan-dimov@users.noreply.github.com> Date: Thu, 25 Feb 2021 16:07:26 +0000 Subject: [PATCH] Implements core MediaConnect endpoints (#3718) * Adds initial models and implements create_flow endpoint. * Updates README with a Releases section * Removes unused import * Adds __init__.py files to MediaLive and MediaConnect test directories. * Adds list_flows endpoint and tests. * Adds describe_flow endpoint and tests. * Adds delete_flow endpoint and tests. * Adds URLs * Adds start_flow and stop_flow endpoints and tests. * Adds tag_resource and list_tags_for_resource endpoints and tests. * Uses unquote for ARNs --- IMPLEMENTATION_COVERAGE.md | 18 +- moto/__init__.py | 1 + moto/backends.py | 1 + moto/mediaconnect/__init__.py | 6 + moto/mediaconnect/exceptions.py | 9 + moto/mediaconnect/models.py | 191 +++++++++++++++++++ moto/mediaconnect/responses.py | 83 ++++++++ moto/mediaconnect/urls.py | 18 ++ tests/test_mediaconnect/__init__.py | 0 tests/test_mediaconnect/test_mediaconnect.py | 152 +++++++++++++++ tests/test_mediaconnect/test_server.py | 21 ++ tests/test_medialive/__init__.py | 0 12 files changed, 491 insertions(+), 9 deletions(-) create mode 100644 moto/mediaconnect/__init__.py create mode 100644 moto/mediaconnect/exceptions.py create mode 100644 moto/mediaconnect/models.py create mode 100644 moto/mediaconnect/responses.py create mode 100644 moto/mediaconnect/urls.py create mode 100644 tests/test_mediaconnect/__init__.py create mode 100644 tests/test_mediaconnect/test_mediaconnect.py create mode 100644 tests/test_mediaconnect/test_server.py create mode 100644 tests/test_medialive/__init__.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 93db3f403..df35c6b12 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -6738,30 +6738,30 @@ ## mediaconnect
-0% implemented +30% implemented - [ ] add_flow_outputs - [ ] add_flow_sources - [ ] add_flow_vpc_interfaces -- [ ] create_flow -- [ ] delete_flow -- [ ] describe_flow +- [x] create_flow +- [x] delete_flow +- [x] describe_flow - [ ] describe_offering - [ ] describe_reservation - [ ] grant_flow_entitlements - [ ] list_entitlements -- [ ] list_flows +- [x] list_flows - [ ] list_offerings - [ ] list_reservations -- [ ] list_tags_for_resource +- [x] list_tags_for_resource - [ ] purchase_offering - [ ] remove_flow_output - [ ] remove_flow_source - [ ] remove_flow_vpc_interface - [ ] revoke_flow_entitlement -- [ ] start_flow -- [ ] stop_flow -- [ ] tag_resource +- [x] start_flow +- [x] stop_flow +- [x] tag_resource - [ ] untag_resource - [ ] update_flow - [ ] update_flow_entitlement diff --git a/moto/__init__.py b/moto/__init__.py index 046453566..70f10306c 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -121,6 +121,7 @@ mock_kinesisvideoarchivedmedia = lazy_load( ) mock_medialive = lazy_load(".medialive", "mock_medialive") mock_support = lazy_load(".support", "mock_support") +mock_mediaconnect = lazy_load(".mediaconnect", "mock_mediaconnect") # import logging # logging.getLogger('boto').setLevel(logging.CRITICAL) diff --git a/moto/backends.py b/moto/backends.py index 7e6de162e..b25c45cf3 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -78,6 +78,7 @@ BACKENDS = { ), "forecast": ("forecast", "forecast_backends"), "support": ("support", "support_backends"), + "mediaconnect": ("mediaconnect", "mediaconnect_backends"), } diff --git a/moto/mediaconnect/__init__.py b/moto/mediaconnect/__init__.py new file mode 100644 index 000000000..0528d8c20 --- /dev/null +++ b/moto/mediaconnect/__init__.py @@ -0,0 +1,6 @@ +from __future__ import unicode_literals +from .models import mediaconnect_backends +from ..core.models import base_decorator + +mediaconnect_backend = mediaconnect_backends["us-east-1"] +mock_mediaconnect = base_decorator(mediaconnect_backends) diff --git a/moto/mediaconnect/exceptions.py b/moto/mediaconnect/exceptions.py new file mode 100644 index 000000000..2272827fc --- /dev/null +++ b/moto/mediaconnect/exceptions.py @@ -0,0 +1,9 @@ +from __future__ import unicode_literals +from moto.core.exceptions import JsonRESTError + + +class NotFoundException(JsonRESTError): + code = 400 + + def __init__(self, message): + super(NotFoundException, self).__init__("NotFoundException", message) diff --git a/moto/mediaconnect/models.py b/moto/mediaconnect/models.py new file mode 100644 index 000000000..ead3dbf7e --- /dev/null +++ b/moto/mediaconnect/models.py @@ -0,0 +1,191 @@ +from __future__ import unicode_literals + +from collections import OrderedDict +from uuid import uuid4 + +from boto3 import Session +from moto.core import BaseBackend, BaseModel +from moto.mediaconnect.exceptions import NotFoundException + + +class Flow(BaseModel): + def __init__(self, *args, **kwargs): + self.availability_zone = kwargs.get("availability_zone") + self.entitlements = kwargs.get("entitlements", []) + self.name = kwargs.get("name") + self.outputs = kwargs.get("outputs", []) + self.source = kwargs.get("source", {}) + self.source_failover_config = kwargs.get("source_failover_config", {}) + self.sources = kwargs.get("sources", []) + self.vpc_interfaces = kwargs.get("vpc_interfaces", []) + self.status = "STANDBY" # one of 'STANDBY'|'ACTIVE'|'UPDATING'|'DELETING'|'STARTING'|'STOPPING'|'ERROR' + self._previous_status = None + self.description = None + self.flow_arn = None + self.egress_ip = None + + def to_dict(self, include=None): + data = { + "availabilityZone": self.availability_zone, + "description": self.description, + "egressIp": self.egress_ip, + "entitlements": self.entitlements, + "flowArn": self.flow_arn, + "name": self.name, + "outputs": self.outputs, + "source": self.source, + "sourceFailoverConfig": self.source_failover_config, + "sources": self.sources, + "status": self.status, + "vpcInterfaces": self.vpc_interfaces, + } + if include: + new_data = {k: v for k, v in data.items() if k in include} + if "sourceType" in include: + new_data["sourceType"] = "OWNED" + return new_data + return data + + def resolve_transient_states(self): + if self.status in ["STARTING"]: + self.status = "ACTIVE" + if self.status in ["STOPPING"]: + self.status = "STANDBY" + if self.status in ["UPDATING"]: + self.status = self._previous_status + self._previous_status = None + + +class Resource(BaseModel): + def __init__(self, *args, **kwargs): + self.resource_arn = kwargs.get("resource_arn") + self.tags = OrderedDict() + + def to_dict(self): + data = { + "resourceArn": self.resource_arn, + "tags": self.tags, + } + return data + + +class MediaConnectBackend(BaseBackend): + def __init__(self, region_name=None): + super(MediaConnectBackend, self).__init__() + self.region_name = region_name + self._flows = OrderedDict() + self._resources = OrderedDict() + + def reset(self): + region_name = self.region_name + self.__dict__ = {} + self.__init__(region_name) + + def create_flow( + self, + availability_zone, + entitlements, + name, + outputs, + source, + source_failover_config, + sources, + vpc_interfaces, + ): + flow = Flow( + availability_zone=availability_zone, + entitlements=entitlements, + name=name, + outputs=outputs, + source=source, + source_failover_config=source_failover_config, + sources=sources, + vpc_interfaces=vpc_interfaces, + ) + flow.description = "A Moto test flow" + flow.egress_ip = "127.0.0.1" + flow_id = uuid4().hex + flow.flow_arn = "arn:aws:mediaconnect:flow:{}".format(flow_id) + self._flows[flow.flow_arn] = flow + return flow + + def list_flows(self, max_results, next_token): + flows = list(self._flows.values()) + if max_results is not None: + flows = flows[:max_results] + response_flows = [ + fl.to_dict( + include=[ + "availabilityZone", + "description", + "flowArn", + "name", + "sourceType", + "status", + ] + ) + for fl in flows + ] + return response_flows, next_token + + def describe_flow(self, flow_arn=None): + messages = {} + if flow_arn in self._flows: + flow = self._flows[flow_arn] + flow.resolve_transient_states() + else: + raise NotFoundException(message="Flow not found.") + return flow.to_dict(), messages + + def delete_flow(self, flow_arn): + if flow_arn in self._flows: + flow = self._flows[flow_arn] + del self._flows[flow_arn] + else: + raise NotFoundException(message="Flow not found.") + return flow_arn, flow.status + + def start_flow(self, flow_arn): + if flow_arn in self._flows: + flow = self._flows[flow_arn] + flow.status = "STARTING" + else: + raise NotFoundException(message="Flow not found.") + return flow_arn, flow.status + + def stop_flow(self, flow_arn): + if flow_arn in self._flows: + flow = self._flows[flow_arn] + flow.status = "STOPPING" + else: + raise NotFoundException(message="Flow not found.") + return flow_arn, flow.status + + def tag_resource(self, resource_arn, tags): + if resource_arn in self._resources: + resource = self._resources[resource_arn] + else: + resource = Resource(resource_arn=resource_arn) + resource.tags.update(tags) + self._resources[resource_arn] = resource + return None + + def list_tags_for_resource(self, resource_arn): + if resource_arn in self._resources: + resource = self._resources[resource_arn] + else: + raise NotFoundException(message="Resource not found.") + return resource.tags + + # add methods from here + + +mediaconnect_backends = {} +for region in Session().get_available_regions("mediaconnect"): + mediaconnect_backends[region] = MediaConnectBackend() +for region in Session().get_available_regions( + "mediaconnect", partition_name="aws-us-gov" +): + mediaconnect_backends[region] = MediaConnectBackend() +for region in Session().get_available_regions("mediaconnect", partition_name="aws-cn"): + mediaconnect_backends[region] = MediaConnectBackend() diff --git a/moto/mediaconnect/responses.py b/moto/mediaconnect/responses.py new file mode 100644 index 000000000..9b6e2c155 --- /dev/null +++ b/moto/mediaconnect/responses.py @@ -0,0 +1,83 @@ +from __future__ import unicode_literals + +import json + +from moto.core.responses import BaseResponse +from .models import mediaconnect_backends + +try: + from urllib import unquote +except ImportError: + from urllib.parse import unquote + + +class MediaConnectResponse(BaseResponse): + SERVICE_NAME = "mediaconnect" + + @property + def mediaconnect_backend(self): + return mediaconnect_backends[self.region] + + def create_flow(self): + availability_zone = self._get_param("availabilityZone") + entitlements = self._get_param("entitlements") + name = self._get_param("name") + outputs = self._get_param("outputs") + source = self._get_param("source") + source_failover_config = self._get_param("sourceFailoverConfig") + sources = self._get_param("sources") + vpc_interfaces = self._get_param("vpcInterfaces") + flow = self.mediaconnect_backend.create_flow( + availability_zone=availability_zone, + entitlements=entitlements, + name=name, + outputs=outputs, + source=source, + source_failover_config=source_failover_config, + sources=sources, + vpc_interfaces=vpc_interfaces, + ) + return json.dumps(dict(flow=flow.to_dict())) + + def list_flows(self): + max_results = self._get_int_param("maxResults") + next_token = self._get_param("nextToken") + flows, next_token = self.mediaconnect_backend.list_flows( + max_results=max_results, next_token=next_token, + ) + return json.dumps(dict(flows=flows, nextToken=next_token)) + + def describe_flow(self): + flow_arn = unquote(self._get_param("flowArn")) + flow, messages = self.mediaconnect_backend.describe_flow(flow_arn=flow_arn,) + return json.dumps(dict(flow=flow, messages=messages)) + + def delete_flow(self): + flow_arn = unquote(self._get_param("flowArn")) + flow_arn, status = self.mediaconnect_backend.delete_flow(flow_arn=flow_arn,) + return json.dumps(dict(flowArn=flow_arn, status=status)) + + def start_flow(self): + flow_arn = unquote(self._get_param("flowArn")) + flow_arn, status = self.mediaconnect_backend.start_flow(flow_arn=flow_arn,) + return json.dumps(dict(flowArn=flow_arn, status=status)) + + def stop_flow(self): + flow_arn = unquote(self._get_param("flowArn")) + flow_arn, status = self.mediaconnect_backend.stop_flow(flow_arn=flow_arn,) + return json.dumps(dict(flowArn=flow_arn, status=status)) + + def tag_resource(self): + resource_arn = unquote(self._get_param("resourceArn")) + tags = self._get_param("tags") + self.mediaconnect_backend.tag_resource( + resource_arn=resource_arn, tags=tags, + ) + return json.dumps(dict()) + + def list_tags_for_resource(self): + resource_arn = unquote(self._get_param("resourceArn")) + tags = self.mediaconnect_backend.list_tags_for_resource( + resource_arn=resource_arn, + ) + return json.dumps(dict(tags=tags)) diff --git a/moto/mediaconnect/urls.py b/moto/mediaconnect/urls.py new file mode 100644 index 000000000..dc66635b7 --- /dev/null +++ b/moto/mediaconnect/urls.py @@ -0,0 +1,18 @@ +from __future__ import unicode_literals +from .responses import MediaConnectResponse + +url_bases = [ + "https?://mediaconnect.(.+).amazonaws.com", +] + + +response = MediaConnectResponse() + + +url_paths = { + "{0}/v1/flows": response.dispatch, + "{0}/v1/flows/(?P[^/.]+)": response.dispatch, + "{0}/v1/flows/start/(?P[^/.]+)": response.dispatch, + "{0}/v1/flows/stop/(?P[^/.]+)": response.dispatch, + "{0}/tags/(?P[^/.]+)": response.dispatch, +} diff --git a/tests/test_mediaconnect/__init__.py b/tests/test_mediaconnect/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_mediaconnect/test_mediaconnect.py b/tests/test_mediaconnect/test_mediaconnect.py new file mode 100644 index 000000000..194ee9e24 --- /dev/null +++ b/tests/test_mediaconnect/test_mediaconnect.py @@ -0,0 +1,152 @@ +from __future__ import unicode_literals + +import boto3 +import sure # noqa +from moto import mock_mediaconnect + + +region = "eu-west-1" + + +def _create_flow_config(name, **kwargs): + availability_zone = kwargs.get("availability_zone", "AZ1") + entitlements = kwargs.get( + "entitlements", + [ + { + "DataTransferSubscriberFeePercent": 12, + "Description": "An entitlement", + "Encryption": {"Algorithm": "aes256", "RoleArn": "some:role"}, + "EntitlementStatus": "ENABLED", + "Name": "Entitlement A", + "Subscribers": [], + } + ], + ) + outputs = kwargs.get("outputs", [{"Name": "Output 1", "Protocol": "zixi-push"}]) + source = kwargs.get( + "source", + { + "Decryption": {"Algorithm": "aes256", "RoleArn": "some:role"}, + "Description": "A source", + "Name": "Source A", + }, + ) + source_failover_config = kwargs.get("source_failover_config", {}) + sources = kwargs.get("sources", []) + vpc_interfaces = kwargs.get("vpc_interfaces", []) + flow_config = dict( + AvailabilityZone=availability_zone, + Entitlements=entitlements, + Name=name, + Outputs=outputs, + Source=source, + SourceFailoverConfig=source_failover_config, + Sources=sources, + VpcInterfaces=vpc_interfaces, + ) + return flow_config + + +@mock_mediaconnect +def test_create_flow_succeeds(): + client = boto3.client("mediaconnect", region_name=region) + channel_config = _create_flow_config("test Flow 1") + + response = client.create_flow(**channel_config) + + response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + response["Flow"]["FlowArn"][:26].should.equal("arn:aws:mediaconnect:flow:") + response["Flow"]["Name"].should.equal("test Flow 1") + response["Flow"]["Status"].should.equal("STANDBY") + + +@mock_mediaconnect +def test_list_flows_succeeds(): + client = boto3.client("mediaconnect", region_name=region) + flow_1_config = _create_flow_config("test Flow 1") + flow_2_config = _create_flow_config("test Flow 2") + + client.create_flow(**flow_1_config) + client.create_flow(**flow_2_config) + + response = client.list_flows() + len(response["Flows"]).should.equal(2) + + response["Flows"][0]["Name"].should.equal("test Flow 1") + response["Flows"][0]["AvailabilityZone"].should.equal("AZ1") + response["Flows"][0]["SourceType"].should.equal("OWNED") + response["Flows"][0]["Status"].should.equal("STANDBY") + + response["Flows"][1]["Name"].should.equal("test Flow 2") + response["Flows"][1]["AvailabilityZone"].should.equal("AZ1") + response["Flows"][1]["SourceType"].should.equal("OWNED") + response["Flows"][1]["Status"].should.equal("STANDBY") + + +@mock_mediaconnect +def test_describe_flow_succeeds(): + client = boto3.client("mediaconnect", region_name=region) + channel_config = _create_flow_config("test Flow 1") + + create_response = client.create_flow(**channel_config) + create_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + flow_arn = create_response["Flow"]["FlowArn"] + describe_response = client.describe_flow(FlowArn=flow_arn) + describe_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + describe_response["Flow"]["Name"].should.equal("test Flow 1") + + +@mock_mediaconnect +def test_delete_flow_succeeds(): + client = boto3.client("mediaconnect", region_name=region) + channel_config = _create_flow_config("test Flow 1") + + create_response = client.create_flow(**channel_config) + create_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + flow_arn = create_response["Flow"]["FlowArn"] + delete_response = client.delete_flow(FlowArn=flow_arn) + delete_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + delete_response["FlowArn"].should.equal(flow_arn) + delete_response["Status"].should.equal("STANDBY") + + +@mock_mediaconnect +def test_start_stop_flow_succeeds(): + client = boto3.client("mediaconnect", region_name=region) + channel_config = _create_flow_config("test Flow 1") + + create_response = client.create_flow(**channel_config) + create_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + create_response["Flow"]["Status"].should.equal("STANDBY") + flow_arn = create_response["Flow"]["FlowArn"] + + start_response = client.start_flow(FlowArn=flow_arn) + start_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + start_response["FlowArn"].should.equal(flow_arn) + start_response["Status"].should.equal("STARTING") + + describe_response = client.describe_flow(FlowArn=flow_arn) + describe_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + describe_response["Flow"]["Status"].should.equal("ACTIVE") + + stop_response = client.stop_flow(FlowArn=flow_arn) + stop_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + stop_response["FlowArn"].should.equal(flow_arn) + stop_response["Status"].should.equal("STOPPING") + + describe_response = client.describe_flow(FlowArn=flow_arn) + describe_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + describe_response["Flow"]["Status"].should.equal("STANDBY") + + +@mock_mediaconnect +def test_tag_resource_succeeds(): + client = boto3.client("mediaconnect", region_name=region) + + tag_response = client.tag_resource(ResourceArn="some-arn", Tags={"Tag1": "Value1"}) + tag_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + + list_response = client.list_tags_for_resource(ResourceArn="some-arn") + list_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + list_response["Tags"].should.equal({"Tag1": "Value1"}) diff --git a/tests/test_mediaconnect/test_server.py b/tests/test_mediaconnect/test_server.py new file mode 100644 index 000000000..86b27b886 --- /dev/null +++ b/tests/test_mediaconnect/test_server.py @@ -0,0 +1,21 @@ +from __future__ import unicode_literals + +import sure # noqa + +import moto.server as server +from moto import mock_mediaconnect + +""" +Test the different server responses +""" + + +@mock_mediaconnect +def test_mediaconnect_list_flows(): + backend = server.create_backend_app("mediaconnect") + test_client = backend.test_client() + + res = test_client.get("/v1/flows") + + result = res.data.decode("utf-8") + result.should.contain('"flows": []') diff --git a/tests/test_medialive/__init__.py b/tests/test_medialive/__init__.py new file mode 100644 index 000000000..e69de29bb