Add RDS methods: start_export_task, cancel_export_task, describe_export_tasks (#4803)

This commit is contained in:
Dmytro Kazanzhy 2022-01-29 00:40:30 +02:00 committed by GitHub
parent 0da889f641
commit f158f0e985
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 363 additions and 10 deletions

View File

@ -3800,7 +3800,7 @@
## rds ## rds
<details> <details>
<summary>17% implemented</summary> <summary>19% implemented</summary>
- [ ] add_role_to_db_cluster - [ ] add_role_to_db_cluster
- [ ] add_role_to_db_instance - [ ] add_role_to_db_instance
@ -3809,7 +3809,7 @@
- [ ] apply_pending_maintenance_action - [ ] apply_pending_maintenance_action
- [ ] authorize_db_security_group_ingress - [ ] authorize_db_security_group_ingress
- [ ] backtrack_db_cluster - [ ] backtrack_db_cluster
- [ ] cancel_export_task - [X] cancel_export_task
- [ ] copy_db_cluster_parameter_group - [ ] copy_db_cluster_parameter_group
- [ ] copy_db_cluster_snapshot - [ ] copy_db_cluster_snapshot
- [ ] copy_db_parameter_group - [ ] copy_db_parameter_group
@ -3880,7 +3880,7 @@
- [ ] describe_event_categories - [ ] describe_event_categories
- [ ] describe_event_subscriptions - [ ] describe_event_subscriptions
- [ ] describe_events - [ ] describe_events
- [ ] describe_export_tasks - [X] describe_export_tasks
- [ ] describe_global_clusters - [ ] describe_global_clusters
- [ ] describe_installation_media - [ ] describe_installation_media
- [X] describe_option_group_options - [X] describe_option_group_options
@ -3938,7 +3938,7 @@
- [X] start_db_cluster - [X] start_db_cluster
- [ ] start_db_instance - [ ] start_db_instance
- [ ] start_db_instance_automated_backups_replication - [ ] start_db_instance_automated_backups_replication
- [ ] start_export_task - [X] start_export_task
- [ ] stop_activity_stream - [ ] stop_activity_stream
- [X] stop_db_cluster - [X] stop_db_cluster
- [ ] stop_db_instance - [ ] stop_db_instance

View File

@ -147,3 +147,33 @@ class DBClusterSnapshotAlreadyExistsError(RDSClientError):
database_snapshot_identifier database_snapshot_identifier
), ),
) )
class ExportTaskAlreadyExistsError(RDSClientError):
def __init__(self, export_task_identifier):
super().__init__(
"ExportTaskAlreadyExistsFault",
"Cannot start export task because a task with the identifier {} already exists.".format(
export_task_identifier
),
)
class ExportTaskNotFoundError(RDSClientError):
def __init__(self, export_task_identifier):
super().__init__(
"ExportTaskNotFoundFault",
"Cannot cancel export task because a task with the identifier {} is not exist.".format(
export_task_identifier
),
)
class InvalidExportSourceStateError(RDSClientError):
def __init__(self, status):
super().__init__(
"InvalidExportSourceStateFault",
"Export source should be 'available' but current status is {}.".format(
status
),
)

View File

@ -30,6 +30,9 @@ from .exceptions import (
InvalidParameterValue, InvalidParameterValue,
InvalidParameterCombination, InvalidParameterCombination,
InvalidDBClusterStateFault, InvalidDBClusterStateFault,
ExportTaskNotFoundError,
ExportTaskAlreadyExistsError,
InvalidExportSourceStateError,
) )
from .utils import FilterDef, apply_filter, merge_filters, validate_filters from .utils import FilterDef, apply_filter, merge_filters, validate_filters
@ -260,20 +263,20 @@ class ClusterSnapshot(BaseModel):
"db-cluster-snapshot-id": FilterDef( "db-cluster-snapshot-id": FilterDef(
["snapshot_id"], "DB Cluster Snapshot Identifiers" ["snapshot_id"], "DB Cluster Snapshot Identifiers"
), ),
"dbi-resource-id": FilterDef(["database.dbi_resource_id"], "Dbi Resource Ids"),
"snapshot-type": FilterDef(None, "Snapshot Types"), "snapshot-type": FilterDef(None, "Snapshot Types"),
"engine": FilterDef(["database.engine"], "Engine Names"), "engine": FilterDef(["cluster.engine"], "Engine Names"),
} }
def __init__(self, cluster, snapshot_id, tags): def __init__(self, cluster, snapshot_id, tags):
self.cluster = cluster self.cluster = cluster
self.snapshot_id = snapshot_id self.snapshot_id = snapshot_id
self.tags = tags self.tags = tags
self.status = "available"
self.created_at = iso_8601_datetime_with_milliseconds(datetime.datetime.now()) self.created_at = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
@property @property
def snapshot_arn(self): def snapshot_arn(self):
return "arn:aws:rds:{0}:{1}:snapshot:{2}".format( return "arn:aws:rds:{0}:{1}:cluster-snapshot:{2}".format(
self.cluster.region, ACCOUNT_ID, self.snapshot_id self.cluster.region, ACCOUNT_ID, self.snapshot_id
) )
@ -290,7 +293,7 @@ class ClusterSnapshot(BaseModel):
<MasterUsername>{{ cluster.master_username }}</MasterUsername> <MasterUsername>{{ cluster.master_username }}</MasterUsername>
<Port>{{ cluster.port }}</Port> <Port>{{ cluster.port }}</Port>
<Engine>{{ cluster.engine }}</Engine> <Engine>{{ cluster.engine }}</Engine>
<Status>available</Status> <Status>{{ snapshot.status }}</Status>
<SnapshotType>manual</SnapshotType> <SnapshotType>manual</SnapshotType>
<DBClusterSnapshotArn>{{ snapshot.snapshot_arn }}</DBClusterSnapshotArn> <DBClusterSnapshotArn>{{ snapshot.snapshot_arn }}</DBClusterSnapshotArn>
<SourceRegion>{{ cluster.region }}</SourceRegion> <SourceRegion>{{ cluster.region }}</SourceRegion>
@ -850,6 +853,7 @@ class DatabaseSnapshot(BaseModel):
self.database = database self.database = database
self.snapshot_id = snapshot_id self.snapshot_id = snapshot_id
self.tags = tags self.tags = tags
self.status = "available"
self.created_at = iso_8601_datetime_with_milliseconds(datetime.datetime.now()) self.created_at = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
@property @property
@ -867,7 +871,7 @@ class DatabaseSnapshot(BaseModel):
<SnapshotCreateTime>{{ snapshot.created_at }}</SnapshotCreateTime> <SnapshotCreateTime>{{ snapshot.created_at }}</SnapshotCreateTime>
<Engine>{{ database.engine }}</Engine> <Engine>{{ database.engine }}</Engine>
<AllocatedStorage>{{ database.allocated_storage }}</AllocatedStorage> <AllocatedStorage>{{ database.allocated_storage }}</AllocatedStorage>
<Status>available</Status> <Status>{{ snapshot.status }}</Status>
<Port>{{ database.port }}</Port> <Port>{{ database.port }}</Port>
<AvailabilityZone>{{ database.availability_zone }}</AvailabilityZone> <AvailabilityZone>{{ database.availability_zone }}</AvailabilityZone>
<VpcId>{{ database.db_subnet_group.vpc_id }}</VpcId> <VpcId>{{ database.db_subnet_group.vpc_id }}</VpcId>
@ -909,6 +913,50 @@ class DatabaseSnapshot(BaseModel):
self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in tag_keys] self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in tag_keys]
class ExportTask(BaseModel):
def __init__(self, snapshot, kwargs):
self.snapshot = snapshot
self.export_task_identifier = kwargs.get("export_task_identifier")
self.kms_key_id = kwargs.get("kms_key_id", "default_kms_key_id")
self.source_arn = kwargs.get("source_arn")
self.iam_role_arn = kwargs.get("iam_role_arn")
self.s3_bucket_name = kwargs.get("s3_bucket_name")
self.s3_prefix = kwargs.get("s3_prefix", "")
self.export_only = kwargs.get("export_only", [])
self.status = "available"
self.created_at = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
def to_xml(self):
template = Template(
"""
<ExportTaskIdentifier>{{ task.export_task_identifier }}</ExportTaskIdentifier>
<SourceArn>{{ snapshot.snapshot_arn }}</SourceArn>
<TaskStartTime>{{ task.created_at }}</TaskStartTime>
<TaskEndTime>{{ task.created_at }}</TaskEndTime>
<SnapshotTime>{{ snapshot.created_at }}</SnapshotTime>
<S3Bucket>{{ task.s3_bucket_name }}</S3Bucket>
<S3Prefix>{{ task.s3_prefix }}</S3Prefix>
<IamRoleArn>{{ task.iam_role_arn }}</IamRoleArn>
<KmsKeyId>{{ task.kms_key_id }}</KmsKeyId>
{%- if task.export_only -%}
<ExportOnly>
{%- for table in task.export_only -%}
<member>{{ table }}</member>
{%- endfor -%}
</ExportOnly>
{%- endif -%}
<Status>{{ task.status }}</Status>
<PercentProgress>{{ 100 }}</PercentProgress>
<TotalExtractedDataInGB>{{ 1 }}</TotalExtractedDataInGB>
<FailureCause></FailureCause>
<WarningMessage></WarningMessage>
"""
)
return template.render(task=self, snapshot=self.snapshot)
class SecurityGroup(CloudFormationModel): class SecurityGroup(CloudFormationModel):
def __init__(self, group_name, description, tags): def __init__(self, group_name, description, tags):
self.group_name = group_name self.group_name = group_name
@ -1128,12 +1176,13 @@ class RDS2Backend(BaseBackend):
def __init__(self, region): def __init__(self, region):
self.region = region self.region = region
self.arn_regex = re_compile( self.arn_regex = re_compile(
r"^arn:aws:rds:.*:[0-9]*:(db|cluster|es|og|pg|ri|secgrp|snapshot|subgrp):.*$" r"^arn:aws:rds:.*:[0-9]*:(db|cluster|es|og|pg|ri|secgrp|snapshot|cluster-snapshot|subgrp):.*$"
) )
self.clusters = OrderedDict() self.clusters = OrderedDict()
self.databases = OrderedDict() self.databases = OrderedDict()
self.database_snapshots = OrderedDict() self.database_snapshots = OrderedDict()
self.cluster_snapshots = OrderedDict() self.cluster_snapshots = OrderedDict()
self.export_tasks = OrderedDict()
self.db_parameter_groups = {} self.db_parameter_groups = {}
self.option_groups = {} self.option_groups = {}
self.security_groups = {} self.security_groups = {}
@ -1750,6 +1799,51 @@ class RDS2Backend(BaseBackend):
cluster.status = "stopped" cluster.status = "stopped"
return previous_state return previous_state
def start_export_task(self, kwargs):
export_task_id = kwargs["export_task_identifier"]
source_arn = kwargs["source_arn"]
snapshot_id = source_arn.split(":")[-1]
snapshot_type = source_arn.split(":")[-2]
if export_task_id in self.export_tasks:
raise ExportTaskAlreadyExistsError(export_task_id)
if snapshot_type == "snapshot" and snapshot_id not in self.database_snapshots:
raise DBSnapshotNotFoundError(snapshot_id)
elif (
snapshot_type == "cluster-snapshot"
and snapshot_id not in self.cluster_snapshots
):
raise DBClusterSnapshotNotFoundError(snapshot_id)
if snapshot_type == "snapshot":
snapshot = self.database_snapshots[snapshot_id]
else:
snapshot = self.cluster_snapshots[snapshot_id]
if snapshot.status not in ["available"]:
raise InvalidExportSourceStateError(snapshot.status)
export_task = ExportTask(snapshot, kwargs)
self.export_tasks[export_task_id] = export_task
return export_task
def cancel_export_task(self, export_task_identifier):
if export_task_identifier in self.export_tasks:
export_task = self.export_tasks[export_task_identifier]
export_task.status = "canceled"
self.export_tasks[export_task_identifier] = export_task
return export_task
raise ExportTaskNotFoundError(export_task_identifier)
def describe_export_tasks(self, export_task_identifier):
if export_task_identifier:
if export_task_identifier in self.export_tasks:
return [self.export_tasks[export_task_identifier]]
else:
raise ExportTaskNotFoundError(export_task_identifier)
return self.export_tasks.values()
def list_tags_for_resource(self, arn): def list_tags_for_resource(self, arn):
if self.arn_regex.match(arn): if self.arn_regex.match(arn):
arn_breakdown = arn.split(":") arn_breakdown = arn.split(":")
@ -1781,6 +1875,7 @@ class RDS2Backend(BaseBackend):
elif resource_type == "snapshot": # DB Snapshot elif resource_type == "snapshot": # DB Snapshot
if resource_name in self.database_snapshots: if resource_name in self.database_snapshots:
return self.database_snapshots[resource_name].get_tags() return self.database_snapshots[resource_name].get_tags()
elif resource_type == "cluster-snapshot": # DB Cluster Snapshot
if resource_name in self.cluster_snapshots: if resource_name in self.cluster_snapshots:
return self.cluster_snapshots[resource_name].get_tags() return self.cluster_snapshots[resource_name].get_tags()
elif resource_type == "subgrp": # DB subnet group elif resource_type == "subgrp": # DB subnet group
@ -1815,6 +1910,7 @@ class RDS2Backend(BaseBackend):
elif resource_type == "snapshot": # DB Snapshot elif resource_type == "snapshot": # DB Snapshot
if resource_name in self.database_snapshots: if resource_name in self.database_snapshots:
return self.database_snapshots[resource_name].remove_tags(tag_keys) return self.database_snapshots[resource_name].remove_tags(tag_keys)
elif resource_type == "cluster-snapshot": # DB Cluster Snapshot
if resource_name in self.cluster_snapshots: if resource_name in self.cluster_snapshots:
return self.cluster_snapshots[resource_name].remove_tags(tag_keys) return self.cluster_snapshots[resource_name].remove_tags(tag_keys)
elif resource_type == "subgrp": # DB subnet group elif resource_type == "subgrp": # DB subnet group
@ -1848,6 +1944,7 @@ class RDS2Backend(BaseBackend):
elif resource_type == "snapshot": # DB Snapshot elif resource_type == "snapshot": # DB Snapshot
if resource_name in self.database_snapshots: if resource_name in self.database_snapshots:
return self.database_snapshots[resource_name].add_tags(tags) return self.database_snapshots[resource_name].add_tags(tags)
elif resource_type == "cluster-snapshot": # DB Cluster Snapshot
if resource_name in self.cluster_snapshots: if resource_name in self.cluster_snapshots:
return self.cluster_snapshots[resource_name].add_tags(tags) return self.cluster_snapshots[resource_name].add_tags(tags)
elif resource_type == "subgrp": # DB subnet group elif resource_type == "subgrp": # DB subnet group

View File

@ -111,6 +111,17 @@ class RDS2Response(BaseResponse):
"tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")), "tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")),
} }
def _get_export_task_kwargs(self):
return {
"export_task_identifier": self._get_param("ExportTaskIdentifier"),
"source_arn": self._get_param("SourceArn"),
"s3_bucket_name": self._get_param("S3BucketName"),
"iam_role_arn": self._get_param("IamRoleArn"),
"kms_key_id": self._get_param("KmsKeyId"),
"s3_prefix": self._get_param("S3Prefix"),
"export_only": self.unpack_list_params("ExportOnly.member"),
}
def unpack_complex_list_params(self, label, names): def unpack_complex_list_params(self, label, names):
unpacked_list = list() unpacked_list = list()
count = 1 count = 1
@ -548,6 +559,24 @@ class RDS2Response(BaseResponse):
template = self.response_template(RESTORE_CLUSTER_FROM_SNAPSHOT_TEMPLATE) template = self.response_template(RESTORE_CLUSTER_FROM_SNAPSHOT_TEMPLATE)
return template.render(cluster=new_cluster) return template.render(cluster=new_cluster)
def start_export_task(self):
kwargs = self._get_export_task_kwargs()
export_task = self.backend.start_export_task(kwargs)
template = self.response_template(START_EXPORT_TASK_TEMPLATE)
return template.render(task=export_task)
def cancel_export_task(self):
export_task_identifier = self._get_param("ExportTaskIdentifier")
export_task = self.backend.cancel_export_task(export_task_identifier)
template = self.response_template(CANCEL_EXPORT_TASK_TEMPLATE)
return template.render(task=export_task)
def describe_export_tasks(self):
export_task_identifier = self._get_param("ExportTaskIdentifier")
tasks = self.backend.describe_export_tasks(export_task_identifier,)
template = self.response_template(DESCRIBE_EXPORT_TASKS_TEMPLATE)
return template.render(tasks=tasks)
CREATE_DATABASE_TEMPLATE = """<CreateDBInstanceResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/"> CREATE_DATABASE_TEMPLATE = """<CreateDBInstanceResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBInstanceResult> <CreateDBInstanceResult>
@ -997,3 +1026,40 @@ DELETE_CLUSTER_SNAPSHOT_TEMPLATE = """<DeleteDBClusterSnapshotResponse xmlns="ht
</ResponseMetadata> </ResponseMetadata>
</DeleteDBClusterSnapshotResponse> </DeleteDBClusterSnapshotResponse>
""" """
START_EXPORT_TASK_TEMPLATE = """<StartExportTaskResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<StartExportTaskResult>
{{ task.to_xml() }}
</StartExportTaskResult>
<ResponseMetadata>
<RequestId>523e3218-afc7-11c3-90f5-f90431260ab4</RequestId>
</ResponseMetadata>
</StartExportTaskResponse>
"""
CANCEL_EXPORT_TASK_TEMPLATE = """<CancelExportTaskResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CancelExportTaskResult>
{{ task.to_xml() }}
</CancelExportTaskResult>
<ResponseMetadata>
<RequestId>523e3218-afc7-11c3-90f5-f90431260ab4</RequestId>
</ResponseMetadata>
</CancelExportTaskResponse>
"""
DESCRIBE_EXPORT_TASKS_TEMPLATE = """<DescribeExportTasksResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DescribeExportTasksResult>
<ExportTasks>
{%- for task in tasks -%}
<ExportTask>{{ task.to_xml() }}</ExportTask>
{%- endfor -%}
</ExportTasks>
{% if marker %}
<Marker>{{ marker }}</Marker>
{% endif %}
</DescribeExportTasksResult>
<ResponseMetadata>
<RequestId>523e3218-afc7-11c3-90f5-f90431260ab4</RequestId>
</ResponseMetadata>
</DescribeExportTasksResponse>
"""

View File

@ -0,0 +1,160 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_rds2
from moto.core import ACCOUNT_ID
def _prepare_db_snapshot(client, snapshot_name="snapshot-1"):
client.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
resp = client.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier=snapshot_name
)
return resp["DBSnapshot"]["DBSnapshotArn"]
@mock_rds2
def test_start_export_task_fails_unknown_snapshot():
client = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as ex:
client.start_export_task(
ExportTaskIdentifier="export-snapshot-1",
SourceArn=f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:snapshot:snapshot-1",
S3BucketName="export-bucket",
IamRoleArn="",
KmsKeyId="",
)
err = ex.value.response["Error"]
err["Code"].should.equal("DBSnapshotNotFound")
err["Message"].should.equal("DBSnapshot snapshot-1 not found.")
@mock_rds2
def test_start_export_task():
client = boto3.client("rds", region_name="us-west-2")
source_arn = _prepare_db_snapshot(client)
export = client.start_export_task(
ExportTaskIdentifier="export-snapshot-1",
SourceArn=source_arn,
S3BucketName="export-bucket",
S3Prefix="snaps/",
IamRoleArn="arn:aws:iam:::role/export-role",
KmsKeyId="arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE",
ExportOnly=["schema.table"],
)
export["ExportTaskIdentifier"].should.equal("export-snapshot-1")
export["SourceArn"].should.equal(source_arn)
export["S3Bucket"].should.equal("export-bucket")
export["S3Prefix"].should.equal("snaps/")
export["IamRoleArn"].should.equal("arn:aws:iam:::role/export-role")
export["KmsKeyId"].should.equal(
"arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE"
)
export["ExportOnly"].should.equal(["schema.table"])
@mock_rds2
def test_start_export_task_fail_already_exists():
client = boto3.client("rds", region_name="us-west-2")
source_arn = _prepare_db_snapshot(client)
client.start_export_task(
ExportTaskIdentifier="export-snapshot-1",
SourceArn=source_arn,
S3BucketName="export-bucket",
IamRoleArn="",
KmsKeyId="",
)
with pytest.raises(ClientError) as ex:
client.start_export_task(
ExportTaskIdentifier="export-snapshot-1",
SourceArn=source_arn,
S3BucketName="export-bucket",
IamRoleArn="",
KmsKeyId="",
)
err = ex.value.response["Error"]
err["Code"].should.equal("ExportTaskAlreadyExistsFault")
err["Message"].should.equal(
"Cannot start export task because a task with the identifier export-snapshot-1 already exists."
)
@mock_rds2
def test_cancel_export_task_fails_unknown_task():
client = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as ex:
client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1")
err = ex.value.response["Error"]
err["Code"].should.equal("ExportTaskNotFoundFault")
err["Message"].should.equal(
"Cannot cancel export task because a task with the identifier export-snapshot-1 is not exist."
)
@mock_rds2
def test_cancel_export_task():
client = boto3.client("rds", region_name="us-west-2")
source_arn = _prepare_db_snapshot(client)
client.start_export_task(
ExportTaskIdentifier="export-snapshot-1",
SourceArn=source_arn,
S3BucketName="export-bucket",
IamRoleArn="",
KmsKeyId="",
)
export = client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1")
export["ExportTaskIdentifier"].should.equal("export-snapshot-1")
export["Status"].should.equal("canceled")
@mock_rds2
def test_describe_export_tasks():
client = boto3.client("rds", region_name="us-west-2")
source_arn = _prepare_db_snapshot(client)
client.start_export_task(
ExportTaskIdentifier="export-snapshot-1",
SourceArn=source_arn,
S3BucketName="export-bucket",
IamRoleArn="",
KmsKeyId="",
)
exports = client.describe_export_tasks().get("ExportTasks")
exports.should.have.length_of(1)
exports[0]["ExportTaskIdentifier"].should.equal("export-snapshot-1")
@mock_rds2
def test_describe_export_tasks_fails_unknown_task():
client = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as ex:
client.describe_export_tasks(ExportTaskIdentifier="export-snapshot-1")
err = ex.value.response["Error"]
err["Code"].should.equal("ExportTaskNotFoundFault")
err["Message"].should.equal(
"Cannot cancel export task because a task with the identifier export-snapshot-1 is not exist."
)