Added event subscriptions: create_event_subscription, delete_event_subscription, describe_event_subscriptions (#4807)

This commit is contained in:
Dmytro Kazanzhy 2022-01-29 14:24:28 +02:00 committed by GitHub
parent 3dfda9c1c9
commit cb60761510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 314 additions and 3 deletions

View File

@ -177,3 +177,19 @@ class InvalidExportSourceStateError(RDSClientError):
status
),
)
class SubscriptionAlreadyExistError(RDSClientError):
def __init__(self, subscription_name):
super().__init__(
"SubscriptionAlreadyExistFault",
"Subscription {} already exists.".format(subscription_name),
)
class SubscriptionNotFoundError(RDSClientError):
def __init__(self, subscription_name):
super().__init__(
"SubscriptionNotFoundFault",
"Subscription {} not found.".format(subscription_name),
)

View File

@ -33,6 +33,8 @@ from .exceptions import (
ExportTaskNotFoundError,
ExportTaskAlreadyExistsError,
InvalidExportSourceStateError,
SubscriptionNotFoundError,
SubscriptionAlreadyExistError,
)
from .utils import FilterDef, apply_filter, merge_filters, validate_filters
@ -957,6 +959,72 @@ class ExportTask(BaseModel):
return template.render(task=self, snapshot=self.snapshot)
class EventSubscription(BaseModel):
def __init__(self, kwargs):
self.subscription_name = kwargs.get("subscription_name")
self.sns_topic_arn = kwargs.get("sns_topic_arn")
self.source_type = kwargs.get("source_type")
self.event_categories = kwargs.get("event_categories", [])
self.source_ids = kwargs.get("source_ids", [])
self.enabled = kwargs.get("enabled", True)
self.tags = kwargs.get("tags", True)
self.region = ""
self.customer_aws_id = copy.copy(ACCOUNT_ID)
self.status = "available"
self.created_at = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
@property
def es_arn(self):
return "arn:aws:rds:{0}:{1}:es:{2}".format(
self.region, ACCOUNT_ID, self.subscription_name
)
def to_xml(self):
template = Template(
"""
<EventSubscription>
<CustomerAwsId>{{ subscription.customer_aws_id }}</CustomerAwsId>
<CustSubscriptionId>{{ subscription.subscription_name }}</CustSubscriptionId>
<SnsTopicArn>{{ subscription.sns_topic_arn }}</SnsTopicArn>
<SubscriptionCreationTime>{{ subscription.created_at }}</SubscriptionCreationTime>
<SourceType>{{ subscription.source_type }}</SourceType>
<SourceIdsList>
{%- for source_id in subscription.source_ids -%}
<SourceId>{{ source_id }}</SourceId>
{%- endfor -%}
</SourceIdsList>
<EventCategoriesList>
{%- for category in subscription.event_categories -%}
<EventCategory>{{ category }}</EventCategory>
{%- endfor -%}
</EventCategoriesList>
<Status>{{ subscription.status }}</Status>
<Enabled>{{ subscription.enabled }}</Enabled>
<EventSubscriptionArn>{{ subscription.es_arn }}</EventSubscriptionArn>
<TagList>
{%- for tag in subscription.tags -%}
<Tag><Key>{{ tag['Key'] }}</Key><Value>{{ tag['Value'] }}</Value></Tag>
{%- endfor -%}
</TagList>
</EventSubscription>
"""
)
return template.render(subscription=self)
def get_tags(self):
return self.tags
def add_tags(self, tags):
new_keys = [tag_set["Key"] for tag_set in tags]
self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in new_keys]
self.tags.extend(tags)
return self.tags
def remove_tags(self, tag_keys):
self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in tag_keys]
class SecurityGroup(CloudFormationModel):
def __init__(self, group_name, description, tags):
self.group_name = group_name
@ -1183,6 +1251,7 @@ class RDS2Backend(BaseBackend):
self.database_snapshots = OrderedDict()
self.cluster_snapshots = OrderedDict()
self.export_tasks = OrderedDict()
self.event_subscriptions = OrderedDict()
self.db_parameter_groups = {}
self.option_groups = {}
self.security_groups = {}
@ -1844,6 +1913,30 @@ class RDS2Backend(BaseBackend):
raise ExportTaskNotFoundError(export_task_identifier)
return self.export_tasks.values()
def create_event_subscription(self, kwargs):
subscription_name = kwargs["subscription_name"]
if subscription_name in self.event_subscriptions:
raise SubscriptionAlreadyExistError(subscription_name)
subscription = EventSubscription(kwargs)
self.event_subscriptions[subscription_name] = subscription
return subscription
def delete_event_subscription(self, subscription_name):
if subscription_name in self.event_subscriptions:
return self.event_subscriptions.pop(subscription_name)
raise SubscriptionNotFoundError(subscription_name)
def describe_event_subscriptions(self, subscription_name):
if subscription_name:
if subscription_name in self.event_subscriptions:
return [self.event_subscriptions[subscription_name]]
else:
raise SubscriptionNotFoundError(subscription_name)
return self.event_subscriptions.values()
def list_tags_for_resource(self, arn):
if self.arn_regex.match(arn):
arn_breakdown = arn.split(":")
@ -1856,9 +1949,8 @@ class RDS2Backend(BaseBackend):
if resource_name in self.clusters:
return self.clusters[resource_name].get_tags()
elif resource_type == "es": # Event Subscription
# TODO: Complete call to tags on resource type Event
# Subscription
return []
if resource_name in self.event_subscriptions:
return self.event_subscriptions[resource_name].get_tags()
elif resource_type == "og": # Option Group
if resource_name in self.option_groups:
return self.option_groups[resource_name].get_tags()

View File

@ -122,6 +122,19 @@ class RDS2Response(BaseResponse):
"export_only": self.unpack_list_params("ExportOnly.member"),
}
def _get_event_subscription_kwargs(self):
return {
"subscription_name": self._get_param("SubscriptionName"),
"sns_topic_arn": self._get_param("SnsTopicArn"),
"source_type": self._get_param("SourceType"),
"event_categories": self.unpack_list_params(
"EventCategories.EventCategory"
),
"source_ids": self.unpack_list_params("SourceIds.SourceId"),
"enabled": self._get_param("Enabled"),
"tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")),
}
def unpack_complex_list_params(self, label, names):
unpacked_list = list()
count = 1
@ -577,6 +590,24 @@ class RDS2Response(BaseResponse):
template = self.response_template(DESCRIBE_EXPORT_TASKS_TEMPLATE)
return template.render(tasks=tasks)
def create_event_subscription(self):
kwargs = self._get_event_subscription_kwargs()
subscription = self.backend.create_event_subscription(kwargs)
template = self.response_template(CREATE_EVENT_SUBSCRIPTION_TEMPLATE)
return template.render(subscription=subscription)
def delete_event_subscription(self):
subscription_name = self._get_param("SubscriptionName")
subscription = self.backend.delete_event_subscription(subscription_name)
template = self.response_template(DELETE_EVENT_SUBSCRIPTION_TEMPLATE)
return template.render(subscription=subscription)
def describe_event_subscriptions(self):
subscription_name = self._get_param("SubscriptionName")
subscriptions = self.backend.describe_event_subscriptions(subscription_name)
template = self.response_template(DESCRIBE_EVENT_SUBSCRIPTIONS_TEMPLATE)
return template.render(subscriptions=subscriptions)
CREATE_DATABASE_TEMPLATE = """<CreateDBInstanceResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBInstanceResult>
@ -1063,3 +1094,37 @@ DESCRIBE_EXPORT_TASKS_TEMPLATE = """<DescribeExportTasksResponse xmlns="http://r
</ResponseMetadata>
</DescribeExportTasksResponse>
"""
CREATE_EVENT_SUBSCRIPTION_TEMPLATE = """<CreateEventSubscriptionResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateEventSubscriptionResult>
{{ subscription.to_xml() }}
</CreateEventSubscriptionResult>
<ResponseMetadata>
<RequestId>523e3218-afc7-11c3-90f5-f90431260ab4</RequestId>
</ResponseMetadata>
</CreateEventSubscriptionResponse>
"""
DELETE_EVENT_SUBSCRIPTION_TEMPLATE = """<DeleteEventSubscriptionResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DeleteEventSubscriptionResult>
{{ subscription.to_xml() }}
</DeleteEventSubscriptionResult>
<ResponseMetadata>
<RequestId>523e3218-afc7-11c3-90f5-f90431260ab4</RequestId>
</ResponseMetadata>
</DeleteEventSubscriptionResponse>
"""
DESCRIBE_EVENT_SUBSCRIPTIONS_TEMPLATE = """<DescribeEventSubscriptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DescribeEventSubscriptionsResult>
<EventSubscriptionsList>
{%- for subscription in subscriptions -%}
{{ subscription.to_xml() }}
{%- endfor -%}
</EventSubscriptionsList>
</DescribeEventSubscriptionsResult>
<ResponseMetadata>
<RequestId>523e3218-afc7-11c3-90f5-f90431260ab4</RequestId>
</ResponseMetadata>
</DescribeEventSubscriptionsResponse>
"""

View File

@ -0,0 +1,138 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_rds2
from moto.core import ACCOUNT_ID
DB_INSTANCE_IDENTIFIER = "db-primary-1"
def _prepare_db_instance(client):
resp = client.create_db_instance(
DBInstanceIdentifier=DB_INSTANCE_IDENTIFIER,
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
return resp["DBInstance"]["DBInstanceIdentifier"]
@mock_rds2
def test_create_event_subscription():
client = boto3.client("rds", region_name="us-west-2")
db_identifier = _prepare_db_instance(client)
es = client.create_event_subscription(
SubscriptionName=f"{db_identifier}-events",
SnsTopicArn=f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic",
SourceType="db-instance",
EventCategories=[
"Backup",
"Creation",
"Deletion",
"Failure",
"Recovery",
"Restoration",
],
SourceIds=[db_identifier],
).get("EventSubscription")
es["CustSubscriptionId"].should.equal(f"{db_identifier}-events")
es["SnsTopicArn"].should.equal(
f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic"
)
es["SourceType"].should.equal("db-instance")
es["EventCategoriesList"].should.equal(
["Backup", "Creation", "Deletion", "Failure", "Recovery", "Restoration"]
)
es["SourceIdsList"].should.equal([db_identifier])
es["Enabled"].should.equal(False)
@mock_rds2
def test_create_event_fail_already_exists():
client = boto3.client("rds", region_name="us-west-2")
db_identifier = _prepare_db_instance(client)
client.create_event_subscription(
SubscriptionName=f"{db_identifier}-events",
SnsTopicArn=f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic",
)
with pytest.raises(ClientError) as ex:
client.create_event_subscription(
SubscriptionName=f"{db_identifier}-events",
SnsTopicArn=f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic",
Enabled=True,
)
err = ex.value.response["Error"]
err["Code"].should.equal("SubscriptionAlreadyExistFault")
err["Message"].should.equal("Subscription db-primary-1-events already exists.")
@mock_rds2
def test_delete_event_subscription_fails_unknown_subscription():
client = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as ex:
client.delete_event_subscription(SubscriptionName="my-db-events")
err = ex.value.response["Error"]
err["Code"].should.equal("SubscriptionNotFoundFault")
err["Message"].should.equal("Subscription my-db-events not found.")
@mock_rds2
def test_delete_event_subscription():
client = boto3.client("rds", region_name="us-west-2")
db_identifier = _prepare_db_instance(client)
client.create_event_subscription(
SubscriptionName=f"{db_identifier}-events",
SnsTopicArn=f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic",
)
es = client.delete_event_subscription(
SubscriptionName=f"{db_identifier}-events",
).get("EventSubscription")
es["CustSubscriptionId"].should.equal(f"{db_identifier}-events")
es["SnsTopicArn"].should.equal(
f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic"
)
@mock_rds2
def test_describe_event_subscriptions():
client = boto3.client("rds", region_name="us-west-2")
db_identifier = _prepare_db_instance(client)
client.create_event_subscription(
SubscriptionName=f"{db_identifier}-events",
SnsTopicArn=f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic",
)
subscriptions = client.describe_event_subscriptions().get("EventSubscriptionsList")
subscriptions.should.have.length_of(1)
subscriptions[0]["CustSubscriptionId"].should.equal(f"{db_identifier}-events")
@mock_rds2
def test_describe_event_subscriptions_fails_unknown_subscription():
client = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as ex:
client.describe_event_subscriptions(SubscriptionName="my-db-events")
err = ex.value.response["Error"]
err["Code"].should.equal("SubscriptionNotFoundFault")
err["Message"].should.equal("Subscription my-db-events not found.")