From cf3cf8b1346b4e04f73df244f8af5478aa64e713 Mon Sep 17 00:00:00 2001 From: Maria Date: Thu, 6 May 2021 19:33:48 +0200 Subject: [PATCH] DMS - basic task replication methods implementation (#3900) * DMS - basic task replication methods implementation * Add ResourceNotFoundFault tests --- IMPLEMENTATION_COVERAGE.md | 10 +- docs/index.rst | 2 + moto/__init__.py | 1 + moto/backends.py | 1 + moto/dms/__init__.py | 6 ++ moto/dms/exceptions.py | 25 +++++ moto/dms/models.py | 199 ++++++++++++++++++++++++++++++++++++ moto/dms/responses.py | 95 +++++++++++++++++ moto/dms/urls.py | 11 ++ moto/dms/utils.py | 49 +++++++++ tests/test_dms/__init__ | 0 tests/test_dms/test_dms.py | 202 +++++++++++++++++++++++++++++++++++++ 12 files changed, 596 insertions(+), 5 deletions(-) create mode 100644 moto/dms/__init__.py create mode 100644 moto/dms/exceptions.py create mode 100644 moto/dms/models.py create mode 100644 moto/dms/responses.py create mode 100644 moto/dms/urls.py create mode 100644 moto/dms/utils.py create mode 100644 tests/test_dms/__init__ create mode 100644 tests/test_dms/test_dms.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index b8837a469..0b8a15345 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -2960,14 +2960,14 @@ - [ ] create_event_subscription - [ ] create_replication_instance - [ ] create_replication_subnet_group -- [ ] create_replication_task +- [X] create_replication_task - [ ] delete_certificate - [ ] delete_connection - [ ] delete_endpoint - [ ] delete_event_subscription - [ ] delete_replication_instance - [ ] delete_replication_subnet_group -- [ ] delete_replication_task +- [X] delete_replication_task - [ ] delete_replication_task_assessment_run - [ ] describe_account_attributes - [ ] describe_applicable_individual_assessments @@ -2987,7 +2987,7 @@ - [ ] describe_replication_task_assessment_results - [ ] describe_replication_task_assessment_runs - [ ] describe_replication_task_individual_assessments -- [ ] describe_replication_tasks +- [X] describe_replication_tasks - [ ] describe_schemas - [ ] describe_table_statistics - [ ] import_certificate @@ -3002,10 +3002,10 @@ - [ ] refresh_schemas - [ ] reload_tables - [ ] remove_tags_from_resource -- [ ] start_replication_task +- [X] start_replication_task - [ ] start_replication_task_assessment - [ ] start_replication_task_assessment_run -- [ ] stop_replication_task +- [X] stop_replication_task - [ ] test_connection diff --git a/docs/index.rst b/docs/index.rst index 4f2d7e090..a5fc2ad8b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,6 +40,8 @@ Currently implemented Services: +---------------------------+-----------------------+------------------------------------+ | Data Pipeline | @mock_datapipeline | basic endpoints done | +---------------------------+-----------------------+------------------------------------+ +| DMS | @mock_dms | basic endpoints done | ++---------------------------+-----------------------+------------------------------------+ | DynamoDB | - @mock_dynamodb | - core endpoints done | | DynamoDB2 | - @mock_dynamodb2 | - core endpoints + partial indexes | +---------------------------+-----------------------+------------------------------------+ diff --git a/moto/__init__.py b/moto/__init__.py index 5d45e3d10..7c69d337d 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -44,6 +44,7 @@ mock_datapipeline_deprecated = lazy_load( ".datapipeline", "mock_datapipeline_deprecated" ) mock_datasync = lazy_load(".datasync", "mock_datasync") +mock_dms = lazy_load(".dms", "mock_dms") mock_dynamodb = lazy_load(".dynamodb", "mock_dynamodb") mock_dynamodb_deprecated = lazy_load(".dynamodb", "mock_dynamodb_deprecated") mock_dynamodb2 = lazy_load(".dynamodb2", "mock_dynamodb2") diff --git a/moto/backends.py b/moto/backends.py index 33f4b2973..ac2725330 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -21,6 +21,7 @@ BACKENDS = { "config": ("config", "config_backends"), "datapipeline": ("datapipeline", "datapipeline_backends"), "datasync": ("datasync", "datasync_backends"), + "dms": ("dms", "dms_backends"), "dynamodb": ("dynamodb", "dynamodb_backends"), "dynamodb2": ("dynamodb2", "dynamodb_backends2"), "dynamodbstreams": ("dynamodbstreams", "dynamodbstreams_backends"), diff --git a/moto/dms/__init__.py b/moto/dms/__init__.py new file mode 100644 index 000000000..7af2baf89 --- /dev/null +++ b/moto/dms/__init__.py @@ -0,0 +1,6 @@ +from __future__ import unicode_literals +from .models import dms_backends +from ..core.models import base_decorator + +dms_backend = dms_backends["us-east-1"] +mock_dms = base_decorator(dms_backends) diff --git a/moto/dms/exceptions.py b/moto/dms/exceptions.py new file mode 100644 index 000000000..6ae7a088b --- /dev/null +++ b/moto/dms/exceptions.py @@ -0,0 +1,25 @@ +from __future__ import unicode_literals +from moto.core.exceptions import JsonRESTError + + +class DmsClientError(JsonRESTError): + code = 400 + + +class ResourceNotFoundFault(DmsClientError): + def __init__(self, message): + super(ResourceNotFoundFault, self).__init__("ResourceNotFoundFault", message) + + +class InvalidResourceStateFault(DmsClientError): + def __init__(self, message): + super(InvalidResourceStateFault, self).__init__( + "InvalidResourceStateFault", message + ) + + +class ResourceAlreadyExistsFault(DmsClientError): + def __init__(self, message): + super(ResourceAlreadyExistsFault, self).__init__( + "ResourceAlreadyExistsFault", message + ) diff --git a/moto/dms/models.py b/moto/dms/models.py new file mode 100644 index 000000000..de857a464 --- /dev/null +++ b/moto/dms/models.py @@ -0,0 +1,199 @@ +from __future__ import unicode_literals + +import json + +from boto3 import Session +from datetime import datetime +from moto.core import ACCOUNT_ID, BaseBackend, BaseModel + +from .exceptions import ( + InvalidResourceStateFault, + ResourceAlreadyExistsFault, + ResourceNotFoundFault, +) +from .utils import filter_tasks + + +class DatabaseMigrationServiceBackend(BaseBackend): + def __init__(self, region_name=None): + super(DatabaseMigrationServiceBackend, self).__init__() + self.region_name = region_name + self.replication_tasks = {} + + def reset(self): + region_name = self.region_name + self.__dict__ = {} + self.__init__(region_name) + + def create_replication_task( + self, + replication_task_identifier, + source_endpoint_arn, + target_endpoint_arn, + replication_instance_arn, + migration_type, + table_mappings, + replication_task_settings, + cdc_start_time, + cdc_start_position, + cdc_stop_position, + tags, + task_data, + resource_identifier, + ): + replication_task = FakeReplicationTask( + replication_task_identifier=replication_task_identifier, + source_endpoint_arn=source_endpoint_arn, + target_endpoint_arn=target_endpoint_arn, + replication_instance_arn=replication_instance_arn, + migration_type=migration_type, + table_mappings=table_mappings, + replication_task_settings=replication_task_settings, + region_name=self.region_name, + ) + + if self.replication_tasks.get(replication_task.arn): + raise ResourceAlreadyExistsFault( + "The resource you are attempting to create already exists." + ) + + self.replication_tasks[replication_task.arn] = replication_task + + return replication_task + + def start_replication_task( + self, + replication_task_arn, + start_replication_task_type, + cdc_start_time, + cdc_start_position, + cdc_stop_position, + ): + if not self.replication_tasks.get(replication_task_arn): + raise ResourceNotFoundFault("Replication task could not be found.") + + return self.replication_tasks[replication_task_arn].start() + + def stop_replication_task(self, replication_task_arn): + if not self.replication_tasks.get(replication_task_arn): + raise ResourceNotFoundFault("Replication task could not be found.") + + return self.replication_tasks[replication_task_arn].stop() + + def delete_replication_task(self, replication_task_arn): + if not self.replication_tasks.get(replication_task_arn): + raise ResourceNotFoundFault("Replication task could not be found.") + + task = self.replication_tasks[replication_task_arn] + task.delete() + self.replication_tasks.pop(replication_task_arn) + + return task + + def describe_replication_tasks(self, filters, max_records, without_settings): + replication_tasks = filter_tasks(self.replication_tasks.values(), filters) + + if max_records and max_records > 0: + replication_tasks = replication_tasks[:max_records] + + return None, replication_tasks + + +class FakeReplicationTask(BaseModel): + def __init__( + self, + replication_task_identifier, + migration_type, + replication_instance_arn, + source_endpoint_arn, + target_endpoint_arn, + table_mappings, + replication_task_settings, + region_name, + ): + self.id = replication_task_identifier + self.region = region_name + self.migration_type = migration_type + self.replication_instance_arn = replication_instance_arn + self.source_endpoint_arn = source_endpoint_arn + self.target_endpoint_arn = target_endpoint_arn + self.table_mappings = table_mappings + self.replication_task_settings = replication_task_settings + + self.status = "creating" + + self.creation_date = datetime.utcnow() + self.start_date = None + self.stop_date = None + + @property + def arn(self): + return "arn:aws:dms:{region}:{account_id}:task:{task_id}".format( + region=self.region, account_id=ACCOUNT_ID, task_id=self.id + ) + + def to_dict(self): + start_date = self.start_date.isoformat() if self.start_date else None + stop_date = self.stop_date.isoformat() if self.stop_date else None + + return { + "ReplicationTaskIdentifier": self.id, + "SourceEndpointArn": self.source_endpoint_arn, + "TargetEndpointArn": self.target_endpoint_arn, + "ReplicationInstanceArn": self.replication_instance_arn, + "MigrationType": self.migration_type, + "TableMappings": json.dumps(self.table_mappings), + "ReplicationTaskSettings": json.dumps(self.replication_task_settings), + "Status": self.status, + "ReplicationTaskCreationDate": self.creation_date.isoformat(), + "ReplicationTaskStartDate": start_date, + "ReplicationTaskArn": self.arn, + "ReplicationTaskStats": { + "FullLoadProgressPercent": 100, + "ElapsedTimeMillis": 100, + "TablesLoaded": 1, + "TablesLoading": 0, + "TablesQueued": 0, + "TablesErrored": 0, + "FreshStartDate": start_date, + "StartDate": start_date, + "StopDate": stop_date, + "FullLoadStartDate": start_date, + "FullLoadFinishDate": stop_date, + }, + } + + def ready(self): + self.status = "ready" + return self + + def start(self): + self.status = "starting" + self.start_date = datetime.utcnow() + self.run() + return self + + def stop(self): + if self.status != "running": + raise InvalidResourceStateFault("Replication task is not running") + + self.status = "stopped" + self.stop_date = datetime.utcnow() + return self + + def delete(self): + self.status = "deleting" + return self + + def run(self): + self.status = "running" + return self + + +dms_backends = {} +for region in Session().get_available_regions("dms"): + dms_backends[region] = DatabaseMigrationServiceBackend() +for region in Session().get_available_regions("dms", partition_name="aws-us-gov"): + dms_backends[region] = DatabaseMigrationServiceBackend() +for region in Session().get_available_regions("dms", partition_name="aws-cn"): + dms_backends[region] = DatabaseMigrationServiceBackend() diff --git a/moto/dms/responses.py b/moto/dms/responses.py new file mode 100644 index 000000000..d94e43b87 --- /dev/null +++ b/moto/dms/responses.py @@ -0,0 +1,95 @@ +from __future__ import unicode_literals +from moto.core.responses import BaseResponse +from .models import dms_backends +import json + + +class DatabaseMigrationServiceResponse(BaseResponse): + SERVICE_NAME = "dms" + + @property + def dms_backend(self): + return dms_backends[self.region] + + # add methods from here + + def create_replication_task(self): + replication_task_identifier = self._get_param("ReplicationTaskIdentifier") + source_endpoint_arn = self._get_param("SourceEndpointArn") + target_endpoint_arn = self._get_param("TargetEndpointArn") + replication_instance_arn = self._get_param("ReplicationInstanceArn") + migration_type = self._get_param("MigrationType") + table_mappings = self._get_param("TableMappings") + replication_task_settings = self._get_param("ReplicationTaskSettings") + cdc_start_time = self._get_param("CdcStartTime") + cdc_start_position = self._get_param("CdcStartPosition") + cdc_stop_position = self._get_param("CdcStopPosition") + tags = self._get_list_prefix("Tags.member") + task_data = self._get_param("TaskData") + resource_identifier = self._get_param("ResourceIdentifier") + replication_task = self.dms_backend.create_replication_task( + replication_task_identifier=replication_task_identifier, + source_endpoint_arn=source_endpoint_arn, + target_endpoint_arn=target_endpoint_arn, + replication_instance_arn=replication_instance_arn, + migration_type=migration_type, + table_mappings=table_mappings, + replication_task_settings=replication_task_settings, + cdc_start_time=cdc_start_time, + cdc_start_position=cdc_start_position, + cdc_stop_position=cdc_stop_position, + tags=tags, + task_data=task_data, + resource_identifier=resource_identifier, + ) + + return json.dumps({"ReplicationTask": replication_task.to_dict()}) + + def start_replication_task(self): + replication_task_arn = self._get_param("ReplicationTaskArn") + start_replication_task_type = self._get_param("StartReplicationTaskType") + cdc_start_time = self._get_param("CdcStartTime") + cdc_start_position = self._get_param("CdcStartPosition") + cdc_stop_position = self._get_param("CdcStopPosition") + replication_task = self.dms_backend.start_replication_task( + replication_task_arn=replication_task_arn, + start_replication_task_type=start_replication_task_type, + cdc_start_time=cdc_start_time, + cdc_start_position=cdc_start_position, + cdc_stop_position=cdc_stop_position, + ) + + return json.dumps({"ReplicationTask": replication_task.to_dict()}) + + # add templates from here + + def stop_replication_task(self): + replication_task_arn = self._get_param("ReplicationTaskArn") + replication_task = self.dms_backend.stop_replication_task( + replication_task_arn=replication_task_arn, + ) + + return json.dumps({"ReplicationTask": replication_task.to_dict()}) + + def delete_replication_task(self): + replication_task_arn = self._get_param("ReplicationTaskArn") + replication_task = self.dms_backend.delete_replication_task( + replication_task_arn=replication_task_arn, + ) + + return json.dumps({"ReplicationTask": replication_task.to_dict()}) + + def describe_replication_tasks(self): + filters = self._get_list_prefix("Filters.member") + max_records = self._get_int_param("MaxRecords") + marker = self._get_param("Marker") + without_settings = self._get_param("WithoutSettings") + marker, replication_tasks = self.dms_backend.describe_replication_tasks( + filters=filters, max_records=max_records, without_settings=without_settings, + ) + + return json.dumps( + dict( + marker=marker, ReplicationTasks=[t.to_dict() for t in replication_tasks] + ) + ) diff --git a/moto/dms/urls.py b/moto/dms/urls.py new file mode 100644 index 000000000..74041ad97 --- /dev/null +++ b/moto/dms/urls.py @@ -0,0 +1,11 @@ +from __future__ import unicode_literals +from .responses import DatabaseMigrationServiceResponse + +url_bases = [ + "https?://dms.(.+).amazonaws.com", +] + + +url_paths = { + "{0}/$": DatabaseMigrationServiceResponse.dispatch, +} diff --git a/moto/dms/utils.py b/moto/dms/utils.py new file mode 100644 index 000000000..9272e5306 --- /dev/null +++ b/moto/dms/utils.py @@ -0,0 +1,49 @@ +from __future__ import unicode_literals + + +def match_task_arn(task, arns): + return task["ReplicationTaskArn"] in arns + + +def match_task_id(task, ids): + return task["ReplicationTaskIdentifier"] in ids + + +def match_task_migration_type(task, migration_types): + return task["MigrationType"] in migration_types + + +def match_task_endpoint_arn(task, endpoint_arns): + return ( + task["SourceEndpointArn"] in endpoint_arns + or task["TargetEndpointArn"] in endpoint_arns + ) + + +def match_task_replication_instance_arn(task, replication_instance_arns): + return task["ReplicationInstanceArn"] in replication_instance_arns + + +task_filter_functions = { + "replication-task-arn": match_task_arn, + "replication-task-id": match_task_id, + "migration-type": match_task_migration_type, + "endpoint-arn": match_task_endpoint_arn, + "replication-instance-arn": match_task_replication_instance_arn, +} + + +def filter_tasks(tasks, filters): + matching_tasks = tasks + + for f in filters: + filter_function = task_filter_functions[f["Name"]] + + if not filter_function: + continue + + matching_tasks = filter( + lambda task: filter_function(task, f["Values"]), matching_tasks + ) + + return matching_tasks diff --git a/tests/test_dms/__init__ b/tests/test_dms/__init__ new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_dms/test_dms.py b/tests/test_dms/test_dms.py new file mode 100644 index 000000000..bbbf2125a --- /dev/null +++ b/tests/test_dms/test_dms.py @@ -0,0 +1,202 @@ +from __future__ import unicode_literals + +from botocore.exceptions import ClientError +import boto3 +import sure # noqa +import pytest + +from moto import mock_dms + + +@mock_dms +def test_create_and_get_replication_task(): + client = boto3.client("dms", region_name="us-east-1") + + client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + + tasks = client.describe_replication_tasks( + Filters=[{"Name": "replication-task-id", "Values": ["test"]}] + ) + + tasks["ReplicationTasks"].should.have.length_of(1) + task = tasks["ReplicationTasks"][0] + task["ReplicationTaskIdentifier"].should.equal("test") + task["SourceEndpointArn"].should.equal("source-endpoint-arn") + task["TargetEndpointArn"].should.equal("target-endpoint-arn") + task["ReplicationInstanceArn"].should.equal("replication-instance-arn") + task["MigrationType"].should.equal("full-load") + task["Status"].should.equal("creating") + + +@mock_dms +def test_create_existing_replication_task_throws_error(): + client = boto3.client("dms", region_name="us-east-1") + + client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + + with pytest.raises(ClientError) as ex: + client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + + ex.value.operation_name.should.equal("CreateReplicationTask") + ex.value.response["Error"]["Code"].should.equal("ResourceAlreadyExistsFault") + ex.value.response["Error"]["Message"].should.equal( + "The resource you are attempting to create already exists." + ) + + +@mock_dms +def test_start_replication_task(): + client = boto3.client("dms", region_name="us-east-1") + + response = client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + task_arn = response["ReplicationTask"]["ReplicationTaskArn"] + client.start_replication_task( + ReplicationTaskArn=task_arn, StartReplicationTaskType="start-replication" + ) + tasks = client.describe_replication_tasks( + Filters=[{"Name": "replication-task-arn", "Values": [task_arn],}] + ) + + tasks["ReplicationTasks"][0]["Status"].should.equal("running") + + +@mock_dms +def test_start_replication_task_throws_resource_not_found_error(): + client = boto3.client("dms", region_name="us-east-1") + + with pytest.raises(ClientError) as ex: + client.start_replication_task( + ReplicationTaskArn="does-not-exist", + StartReplicationTaskType="start-replication", + ) + + ex.value.operation_name.should.equal("StartReplicationTask") + ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundFault") + ex.value.response["Error"]["Message"].should.equal( + "Replication task could not be found." + ) + + +@mock_dms +def test_stop_replication_task_throws_invalid_state_error(): + client = boto3.client("dms", region_name="us-east-1") + + response = client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + task_arn = response["ReplicationTask"]["ReplicationTaskArn"] + + with pytest.raises(ClientError) as ex: + client.stop_replication_task(ReplicationTaskArn=task_arn) + + ex.value.operation_name.should.equal("StopReplicationTask") + ex.value.response["Error"]["Code"].should.equal("InvalidResourceStateFault") + ex.value.response["Error"]["Message"].should.equal( + "Replication task is not running" + ) + + +@mock_dms +def test_stop_replication_task_throws_resource_not_found_error(): + client = boto3.client("dms", region_name="us-east-1") + + with pytest.raises(ClientError) as ex: + client.stop_replication_task(ReplicationTaskArn="does-not-exist") + + ex.value.operation_name.should.equal("StopReplicationTask") + ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundFault") + ex.value.response["Error"]["Message"].should.equal( + "Replication task could not be found." + ) + + +@mock_dms +def test_stop_replication_task(): + client = boto3.client("dms", region_name="us-east-1") + + response = client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + task_arn = response["ReplicationTask"]["ReplicationTaskArn"] + client.start_replication_task( + ReplicationTaskArn=task_arn, StartReplicationTaskType="start-replication" + ) + client.stop_replication_task(ReplicationTaskArn=task_arn) + tasks = client.describe_replication_tasks( + Filters=[{"Name": "replication-task-arn", "Values": [task_arn],}] + ) + + tasks["ReplicationTasks"][0]["Status"].should.equal("stopped") + + +@mock_dms +def test_delete_replication_task(): + client = boto3.client("dms", region_name="us-east-1") + + response = client.create_replication_task( + ReplicationTaskIdentifier="test", + SourceEndpointArn="source-endpoint-arn", + TargetEndpointArn="target-endpoint-arn", + ReplicationInstanceArn="replication-instance-arn", + MigrationType="full-load", + TableMappings='{"rules":[]}', + ) + task_arn = response["ReplicationTask"]["ReplicationTaskArn"] + client.delete_replication_task(ReplicationTaskArn=task_arn) + tasks = client.describe_replication_tasks( + Filters=[{"Name": "replication-task-arn", "Values": [task_arn],}] + ) + + tasks["ReplicationTasks"].should.have.length_of(0) + + +@mock_dms +def test_delete_replication_task_throws_resource_not_found_error(): + client = boto3.client("dms", region_name="us-east-1") + + with pytest.raises(ClientError) as ex: + client.delete_replication_task(ReplicationTaskArn="does-not-exist") + + ex.value.operation_name.should.equal("DeleteReplicationTask") + ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundFault") + ex.value.response["Error"]["Message"].should.equal( + "Replication task could not be found." + )