DMS - basic task replication methods implementation (#3900)

* DMS - basic task replication methods implementation

* Add ResourceNotFoundFault tests
This commit is contained in:
Maria 2021-05-06 19:33:48 +02:00 committed by GitHub
parent f76571199f
commit cf3cf8b134
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 596 additions and 5 deletions

View File

@ -2960,14 +2960,14 @@
- [ ] create_event_subscription
- [ ] create_replication_instance
- [ ] create_replication_subnet_group
- [ ] create_replication_task
- [X] create_replication_task
- [ ] delete_certificate
- [ ] delete_connection
- [ ] delete_endpoint
- [ ] delete_event_subscription
- [ ] delete_replication_instance
- [ ] delete_replication_subnet_group
- [ ] delete_replication_task
- [X] delete_replication_task
- [ ] delete_replication_task_assessment_run
- [ ] describe_account_attributes
- [ ] describe_applicable_individual_assessments
@ -2987,7 +2987,7 @@
- [ ] describe_replication_task_assessment_results
- [ ] describe_replication_task_assessment_runs
- [ ] describe_replication_task_individual_assessments
- [ ] describe_replication_tasks
- [X] describe_replication_tasks
- [ ] describe_schemas
- [ ] describe_table_statistics
- [ ] import_certificate
@ -3002,10 +3002,10 @@
- [ ] refresh_schemas
- [ ] reload_tables
- [ ] remove_tags_from_resource
- [ ] start_replication_task
- [X] start_replication_task
- [ ] start_replication_task_assessment
- [ ] start_replication_task_assessment_run
- [ ] stop_replication_task
- [X] stop_replication_task
- [ ] test_connection
</details>

View File

@ -40,6 +40,8 @@ Currently implemented Services:
+---------------------------+-----------------------+------------------------------------+
| Data Pipeline | @mock_datapipeline | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| DMS | @mock_dms | basic endpoints done |
+---------------------------+-----------------------+------------------------------------+
| DynamoDB | - @mock_dynamodb | - core endpoints done |
| DynamoDB2 | - @mock_dynamodb2 | - core endpoints + partial indexes |
+---------------------------+-----------------------+------------------------------------+

View File

@ -44,6 +44,7 @@ mock_datapipeline_deprecated = lazy_load(
".datapipeline", "mock_datapipeline_deprecated"
)
mock_datasync = lazy_load(".datasync", "mock_datasync")
mock_dms = lazy_load(".dms", "mock_dms")
mock_dynamodb = lazy_load(".dynamodb", "mock_dynamodb")
mock_dynamodb_deprecated = lazy_load(".dynamodb", "mock_dynamodb_deprecated")
mock_dynamodb2 = lazy_load(".dynamodb2", "mock_dynamodb2")

View File

@ -21,6 +21,7 @@ BACKENDS = {
"config": ("config", "config_backends"),
"datapipeline": ("datapipeline", "datapipeline_backends"),
"datasync": ("datasync", "datasync_backends"),
"dms": ("dms", "dms_backends"),
"dynamodb": ("dynamodb", "dynamodb_backends"),
"dynamodb2": ("dynamodb2", "dynamodb_backends2"),
"dynamodbstreams": ("dynamodbstreams", "dynamodbstreams_backends"),

6
moto/dms/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import dms_backends
from ..core.models import base_decorator
dms_backend = dms_backends["us-east-1"]
mock_dms = base_decorator(dms_backends)

25
moto/dms/exceptions.py Normal file
View File

@ -0,0 +1,25 @@
from __future__ import unicode_literals
from moto.core.exceptions import JsonRESTError
class DmsClientError(JsonRESTError):
code = 400
class ResourceNotFoundFault(DmsClientError):
def __init__(self, message):
super(ResourceNotFoundFault, self).__init__("ResourceNotFoundFault", message)
class InvalidResourceStateFault(DmsClientError):
def __init__(self, message):
super(InvalidResourceStateFault, self).__init__(
"InvalidResourceStateFault", message
)
class ResourceAlreadyExistsFault(DmsClientError):
def __init__(self, message):
super(ResourceAlreadyExistsFault, self).__init__(
"ResourceAlreadyExistsFault", message
)

199
moto/dms/models.py Normal file
View File

@ -0,0 +1,199 @@
from __future__ import unicode_literals
import json
from boto3 import Session
from datetime import datetime
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from .exceptions import (
InvalidResourceStateFault,
ResourceAlreadyExistsFault,
ResourceNotFoundFault,
)
from .utils import filter_tasks
class DatabaseMigrationServiceBackend(BaseBackend):
def __init__(self, region_name=None):
super(DatabaseMigrationServiceBackend, self).__init__()
self.region_name = region_name
self.replication_tasks = {}
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_replication_task(
self,
replication_task_identifier,
source_endpoint_arn,
target_endpoint_arn,
replication_instance_arn,
migration_type,
table_mappings,
replication_task_settings,
cdc_start_time,
cdc_start_position,
cdc_stop_position,
tags,
task_data,
resource_identifier,
):
replication_task = FakeReplicationTask(
replication_task_identifier=replication_task_identifier,
source_endpoint_arn=source_endpoint_arn,
target_endpoint_arn=target_endpoint_arn,
replication_instance_arn=replication_instance_arn,
migration_type=migration_type,
table_mappings=table_mappings,
replication_task_settings=replication_task_settings,
region_name=self.region_name,
)
if self.replication_tasks.get(replication_task.arn):
raise ResourceAlreadyExistsFault(
"The resource you are attempting to create already exists."
)
self.replication_tasks[replication_task.arn] = replication_task
return replication_task
def start_replication_task(
self,
replication_task_arn,
start_replication_task_type,
cdc_start_time,
cdc_start_position,
cdc_stop_position,
):
if not self.replication_tasks.get(replication_task_arn):
raise ResourceNotFoundFault("Replication task could not be found.")
return self.replication_tasks[replication_task_arn].start()
def stop_replication_task(self, replication_task_arn):
if not self.replication_tasks.get(replication_task_arn):
raise ResourceNotFoundFault("Replication task could not be found.")
return self.replication_tasks[replication_task_arn].stop()
def delete_replication_task(self, replication_task_arn):
if not self.replication_tasks.get(replication_task_arn):
raise ResourceNotFoundFault("Replication task could not be found.")
task = self.replication_tasks[replication_task_arn]
task.delete()
self.replication_tasks.pop(replication_task_arn)
return task
def describe_replication_tasks(self, filters, max_records, without_settings):
replication_tasks = filter_tasks(self.replication_tasks.values(), filters)
if max_records and max_records > 0:
replication_tasks = replication_tasks[:max_records]
return None, replication_tasks
class FakeReplicationTask(BaseModel):
def __init__(
self,
replication_task_identifier,
migration_type,
replication_instance_arn,
source_endpoint_arn,
target_endpoint_arn,
table_mappings,
replication_task_settings,
region_name,
):
self.id = replication_task_identifier
self.region = region_name
self.migration_type = migration_type
self.replication_instance_arn = replication_instance_arn
self.source_endpoint_arn = source_endpoint_arn
self.target_endpoint_arn = target_endpoint_arn
self.table_mappings = table_mappings
self.replication_task_settings = replication_task_settings
self.status = "creating"
self.creation_date = datetime.utcnow()
self.start_date = None
self.stop_date = None
@property
def arn(self):
return "arn:aws:dms:{region}:{account_id}:task:{task_id}".format(
region=self.region, account_id=ACCOUNT_ID, task_id=self.id
)
def to_dict(self):
start_date = self.start_date.isoformat() if self.start_date else None
stop_date = self.stop_date.isoformat() if self.stop_date else None
return {
"ReplicationTaskIdentifier": self.id,
"SourceEndpointArn": self.source_endpoint_arn,
"TargetEndpointArn": self.target_endpoint_arn,
"ReplicationInstanceArn": self.replication_instance_arn,
"MigrationType": self.migration_type,
"TableMappings": json.dumps(self.table_mappings),
"ReplicationTaskSettings": json.dumps(self.replication_task_settings),
"Status": self.status,
"ReplicationTaskCreationDate": self.creation_date.isoformat(),
"ReplicationTaskStartDate": start_date,
"ReplicationTaskArn": self.arn,
"ReplicationTaskStats": {
"FullLoadProgressPercent": 100,
"ElapsedTimeMillis": 100,
"TablesLoaded": 1,
"TablesLoading": 0,
"TablesQueued": 0,
"TablesErrored": 0,
"FreshStartDate": start_date,
"StartDate": start_date,
"StopDate": stop_date,
"FullLoadStartDate": start_date,
"FullLoadFinishDate": stop_date,
},
}
def ready(self):
self.status = "ready"
return self
def start(self):
self.status = "starting"
self.start_date = datetime.utcnow()
self.run()
return self
def stop(self):
if self.status != "running":
raise InvalidResourceStateFault("Replication task is not running")
self.status = "stopped"
self.stop_date = datetime.utcnow()
return self
def delete(self):
self.status = "deleting"
return self
def run(self):
self.status = "running"
return self
dms_backends = {}
for region in Session().get_available_regions("dms"):
dms_backends[region] = DatabaseMigrationServiceBackend()
for region in Session().get_available_regions("dms", partition_name="aws-us-gov"):
dms_backends[region] = DatabaseMigrationServiceBackend()
for region in Session().get_available_regions("dms", partition_name="aws-cn"):
dms_backends[region] = DatabaseMigrationServiceBackend()

95
moto/dms/responses.py Normal file
View File

@ -0,0 +1,95 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import dms_backends
import json
class DatabaseMigrationServiceResponse(BaseResponse):
SERVICE_NAME = "dms"
@property
def dms_backend(self):
return dms_backends[self.region]
# add methods from here
def create_replication_task(self):
replication_task_identifier = self._get_param("ReplicationTaskIdentifier")
source_endpoint_arn = self._get_param("SourceEndpointArn")
target_endpoint_arn = self._get_param("TargetEndpointArn")
replication_instance_arn = self._get_param("ReplicationInstanceArn")
migration_type = self._get_param("MigrationType")
table_mappings = self._get_param("TableMappings")
replication_task_settings = self._get_param("ReplicationTaskSettings")
cdc_start_time = self._get_param("CdcStartTime")
cdc_start_position = self._get_param("CdcStartPosition")
cdc_stop_position = self._get_param("CdcStopPosition")
tags = self._get_list_prefix("Tags.member")
task_data = self._get_param("TaskData")
resource_identifier = self._get_param("ResourceIdentifier")
replication_task = self.dms_backend.create_replication_task(
replication_task_identifier=replication_task_identifier,
source_endpoint_arn=source_endpoint_arn,
target_endpoint_arn=target_endpoint_arn,
replication_instance_arn=replication_instance_arn,
migration_type=migration_type,
table_mappings=table_mappings,
replication_task_settings=replication_task_settings,
cdc_start_time=cdc_start_time,
cdc_start_position=cdc_start_position,
cdc_stop_position=cdc_stop_position,
tags=tags,
task_data=task_data,
resource_identifier=resource_identifier,
)
return json.dumps({"ReplicationTask": replication_task.to_dict()})
def start_replication_task(self):
replication_task_arn = self._get_param("ReplicationTaskArn")
start_replication_task_type = self._get_param("StartReplicationTaskType")
cdc_start_time = self._get_param("CdcStartTime")
cdc_start_position = self._get_param("CdcStartPosition")
cdc_stop_position = self._get_param("CdcStopPosition")
replication_task = self.dms_backend.start_replication_task(
replication_task_arn=replication_task_arn,
start_replication_task_type=start_replication_task_type,
cdc_start_time=cdc_start_time,
cdc_start_position=cdc_start_position,
cdc_stop_position=cdc_stop_position,
)
return json.dumps({"ReplicationTask": replication_task.to_dict()})
# add templates from here
def stop_replication_task(self):
replication_task_arn = self._get_param("ReplicationTaskArn")
replication_task = self.dms_backend.stop_replication_task(
replication_task_arn=replication_task_arn,
)
return json.dumps({"ReplicationTask": replication_task.to_dict()})
def delete_replication_task(self):
replication_task_arn = self._get_param("ReplicationTaskArn")
replication_task = self.dms_backend.delete_replication_task(
replication_task_arn=replication_task_arn,
)
return json.dumps({"ReplicationTask": replication_task.to_dict()})
def describe_replication_tasks(self):
filters = self._get_list_prefix("Filters.member")
max_records = self._get_int_param("MaxRecords")
marker = self._get_param("Marker")
without_settings = self._get_param("WithoutSettings")
marker, replication_tasks = self.dms_backend.describe_replication_tasks(
filters=filters, max_records=max_records, without_settings=without_settings,
)
return json.dumps(
dict(
marker=marker, ReplicationTasks=[t.to_dict() for t in replication_tasks]
)
)

11
moto/dms/urls.py Normal file
View File

@ -0,0 +1,11 @@
from __future__ import unicode_literals
from .responses import DatabaseMigrationServiceResponse
url_bases = [
"https?://dms.(.+).amazonaws.com",
]
url_paths = {
"{0}/$": DatabaseMigrationServiceResponse.dispatch,
}

49
moto/dms/utils.py Normal file
View File

@ -0,0 +1,49 @@
from __future__ import unicode_literals
def match_task_arn(task, arns):
return task["ReplicationTaskArn"] in arns
def match_task_id(task, ids):
return task["ReplicationTaskIdentifier"] in ids
def match_task_migration_type(task, migration_types):
return task["MigrationType"] in migration_types
def match_task_endpoint_arn(task, endpoint_arns):
return (
task["SourceEndpointArn"] in endpoint_arns
or task["TargetEndpointArn"] in endpoint_arns
)
def match_task_replication_instance_arn(task, replication_instance_arns):
return task["ReplicationInstanceArn"] in replication_instance_arns
task_filter_functions = {
"replication-task-arn": match_task_arn,
"replication-task-id": match_task_id,
"migration-type": match_task_migration_type,
"endpoint-arn": match_task_endpoint_arn,
"replication-instance-arn": match_task_replication_instance_arn,
}
def filter_tasks(tasks, filters):
matching_tasks = tasks
for f in filters:
filter_function = task_filter_functions[f["Name"]]
if not filter_function:
continue
matching_tasks = filter(
lambda task: filter_function(task, f["Values"]), matching_tasks
)
return matching_tasks

0
tests/test_dms/__init__ Normal file
View File

202
tests/test_dms/test_dms.py Normal file
View File

@ -0,0 +1,202 @@
from __future__ import unicode_literals
from botocore.exceptions import ClientError
import boto3
import sure # noqa
import pytest
from moto import mock_dms
@mock_dms
def test_create_and_get_replication_task():
client = boto3.client("dms", region_name="us-east-1")
client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
tasks = client.describe_replication_tasks(
Filters=[{"Name": "replication-task-id", "Values": ["test"]}]
)
tasks["ReplicationTasks"].should.have.length_of(1)
task = tasks["ReplicationTasks"][0]
task["ReplicationTaskIdentifier"].should.equal("test")
task["SourceEndpointArn"].should.equal("source-endpoint-arn")
task["TargetEndpointArn"].should.equal("target-endpoint-arn")
task["ReplicationInstanceArn"].should.equal("replication-instance-arn")
task["MigrationType"].should.equal("full-load")
task["Status"].should.equal("creating")
@mock_dms
def test_create_existing_replication_task_throws_error():
client = boto3.client("dms", region_name="us-east-1")
client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
with pytest.raises(ClientError) as ex:
client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
ex.value.operation_name.should.equal("CreateReplicationTask")
ex.value.response["Error"]["Code"].should.equal("ResourceAlreadyExistsFault")
ex.value.response["Error"]["Message"].should.equal(
"The resource you are attempting to create already exists."
)
@mock_dms
def test_start_replication_task():
client = boto3.client("dms", region_name="us-east-1")
response = client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
task_arn = response["ReplicationTask"]["ReplicationTaskArn"]
client.start_replication_task(
ReplicationTaskArn=task_arn, StartReplicationTaskType="start-replication"
)
tasks = client.describe_replication_tasks(
Filters=[{"Name": "replication-task-arn", "Values": [task_arn],}]
)
tasks["ReplicationTasks"][0]["Status"].should.equal("running")
@mock_dms
def test_start_replication_task_throws_resource_not_found_error():
client = boto3.client("dms", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.start_replication_task(
ReplicationTaskArn="does-not-exist",
StartReplicationTaskType="start-replication",
)
ex.value.operation_name.should.equal("StartReplicationTask")
ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundFault")
ex.value.response["Error"]["Message"].should.equal(
"Replication task could not be found."
)
@mock_dms
def test_stop_replication_task_throws_invalid_state_error():
client = boto3.client("dms", region_name="us-east-1")
response = client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
task_arn = response["ReplicationTask"]["ReplicationTaskArn"]
with pytest.raises(ClientError) as ex:
client.stop_replication_task(ReplicationTaskArn=task_arn)
ex.value.operation_name.should.equal("StopReplicationTask")
ex.value.response["Error"]["Code"].should.equal("InvalidResourceStateFault")
ex.value.response["Error"]["Message"].should.equal(
"Replication task is not running"
)
@mock_dms
def test_stop_replication_task_throws_resource_not_found_error():
client = boto3.client("dms", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.stop_replication_task(ReplicationTaskArn="does-not-exist")
ex.value.operation_name.should.equal("StopReplicationTask")
ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundFault")
ex.value.response["Error"]["Message"].should.equal(
"Replication task could not be found."
)
@mock_dms
def test_stop_replication_task():
client = boto3.client("dms", region_name="us-east-1")
response = client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
task_arn = response["ReplicationTask"]["ReplicationTaskArn"]
client.start_replication_task(
ReplicationTaskArn=task_arn, StartReplicationTaskType="start-replication"
)
client.stop_replication_task(ReplicationTaskArn=task_arn)
tasks = client.describe_replication_tasks(
Filters=[{"Name": "replication-task-arn", "Values": [task_arn],}]
)
tasks["ReplicationTasks"][0]["Status"].should.equal("stopped")
@mock_dms
def test_delete_replication_task():
client = boto3.client("dms", region_name="us-east-1")
response = client.create_replication_task(
ReplicationTaskIdentifier="test",
SourceEndpointArn="source-endpoint-arn",
TargetEndpointArn="target-endpoint-arn",
ReplicationInstanceArn="replication-instance-arn",
MigrationType="full-load",
TableMappings='{"rules":[]}',
)
task_arn = response["ReplicationTask"]["ReplicationTaskArn"]
client.delete_replication_task(ReplicationTaskArn=task_arn)
tasks = client.describe_replication_tasks(
Filters=[{"Name": "replication-task-arn", "Values": [task_arn],}]
)
tasks["ReplicationTasks"].should.have.length_of(0)
@mock_dms
def test_delete_replication_task_throws_resource_not_found_error():
client = boto3.client("dms", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.delete_replication_task(ReplicationTaskArn="does-not-exist")
ex.value.operation_name.should.equal("DeleteReplicationTask")
ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundFault")
ex.value.response["Error"]["Message"].should.equal(
"Replication task could not be found."
)