Add cloudwatch tags (#4179)

* Update cloudwatch.put_metric_alarm to accept TreatMissingData and Tags parameter
* Add parameter ExtendedStatistic and EvaluateLowSampleCountPercentile to cloudwatch.put_metric_alarm
* Add parameter ThresholdMetricId to cloudwatch.put_metric_alarm
This commit is contained in:
Anton Grübel 2021-08-16 16:52:19 +09:00 committed by GitHub
parent 1800733162
commit 4df099c724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 736 additions and 101 deletions

View File

@ -0,0 +1,36 @@
from moto.core.exceptions import RESTError
class InvalidFormat(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)
class InvalidParameterValue(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)
class ResourceNotFound(RESTError):
code = 404
def __init__(self):
super().__init__(__class__.__name__, "Unknown")
class ResourceNotFoundException(RESTError):
code = 404
def __init__(self):
super().__init__(__class__.__name__, "Unknown")
class ValidationError(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)

View File

@ -2,17 +2,28 @@ import json
from boto3 import Session
from moto.core.utils import iso_8601_datetime_without_milliseconds
from moto.core.utils import (
iso_8601_datetime_without_milliseconds,
iso_8601_datetime_with_nanoseconds,
)
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.exceptions import RESTError
from moto.logs import logs_backends
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from uuid import uuid4
from .exceptions import (
InvalidFormat,
ResourceNotFound,
ValidationError,
InvalidParameterValue,
ResourceNotFoundException,
)
from .utils import make_arn_for_dashboard, make_arn_for_alarm
from dateutil import parser
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from ..utilities.tagging_service import TaggingService
_EMPTY_LIST = tuple()
@ -89,6 +100,7 @@ def daterange(start, stop, step=timedelta(days=1), inclusive=False):
class FakeAlarm(BaseModel):
def __init__(
self,
region_name,
name,
namespace,
metric_name,
@ -99,6 +111,7 @@ class FakeAlarm(BaseModel):
period,
threshold,
statistic,
extended_statistic,
description,
dimensions,
alarm_actions,
@ -106,11 +119,14 @@ class FakeAlarm(BaseModel):
insufficient_data_actions,
unit,
actions_enabled,
region="us-east-1",
treat_missing_data,
evaluate_low_sample_count_percentile,
threshold_metric_id,
rule=None,
):
self.region_name = region_name
self.name = name
self.alarm_arn = make_arn_for_alarm(region, DEFAULT_ACCOUNT_ID, name)
self.alarm_arn = make_arn_for_alarm(region_name, DEFAULT_ACCOUNT_ID, name)
self.namespace = namespace
self.metric_name = metric_name
self.metric_data_queries = metric_data_queries
@ -120,6 +136,7 @@ class FakeAlarm(BaseModel):
self.period = period
self.threshold = threshold
self.statistic = statistic
self.extended_statistic = extended_statistic
self.description = description
self.dimensions = [
Dimension(dimension["name"], dimension["value"]) for dimension in dimensions
@ -129,14 +146,21 @@ class FakeAlarm(BaseModel):
self.ok_actions = ok_actions
self.insufficient_data_actions = insufficient_data_actions
self.unit = unit
self.configuration_updated_timestamp = datetime.utcnow()
self.configuration_updated_timestamp = iso_8601_datetime_with_nanoseconds(
datetime.now(tz=tzutc())
)
self.treat_missing_data = treat_missing_data
self.evaluate_low_sample_count_percentile = evaluate_low_sample_count_percentile
self.threshold_metric_id = threshold_metric_id
self.history = []
self.state_reason = ""
self.state_reason = "Unchecked: Initial alarm creation"
self.state_reason_data = "{}"
self.state_value = "OK"
self.state_updated_timestamp = datetime.utcnow()
self.state_updated_timestamp = iso_8601_datetime_with_nanoseconds(
datetime.now(tz=tzutc())
)
# only used for composite alarms
self.rule = rule
@ -156,7 +180,9 @@ class FakeAlarm(BaseModel):
self.state_reason = reason
self.state_reason_data = reason_data
self.state_value = state_value
self.state_updated_timestamp = datetime.utcnow()
self.state_updated_timestamp = iso_8601_datetime_with_nanoseconds(
datetime.now(tz=tzutc())
)
def are_dimensions_same(metric_dimensions, dimensions):
@ -273,11 +299,18 @@ class Statistics:
class CloudWatchBackend(BaseBackend):
def __init__(self):
def __init__(self, region_name):
self.region_name = region_name
self.alarms = {}
self.dashboards = {}
self.metric_data = []
self.paged_metric_data = {}
self.tagger = TaggingService()
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
@property
# Retrieve a list of all OOTB metrics that are provided by metrics providers
@ -300,6 +333,7 @@ class CloudWatchBackend(BaseBackend):
period,
threshold,
statistic,
extended_statistic,
description,
dimensions,
alarm_actions,
@ -307,32 +341,54 @@ class CloudWatchBackend(BaseBackend):
insufficient_data_actions,
unit,
actions_enabled,
region="us-east-1",
treat_missing_data,
evaluate_low_sample_count_percentile,
threshold_metric_id,
rule=None,
tags=None,
):
if extended_statistic and not extended_statistic.startswith("p"):
raise InvalidParameterValue(
f"The value {extended_statistic} for parameter ExtendedStatistic is not supported."
)
if (
evaluate_low_sample_count_percentile
and evaluate_low_sample_count_percentile not in ("evaluate", "ignore")
):
raise ValidationError(
f"Option {evaluate_low_sample_count_percentile} is not supported. "
"Supported options for parameter EvaluateLowSampleCountPercentile are evaluate and ignore."
)
alarm = FakeAlarm(
name,
namespace,
metric_name,
metric_data_queries,
comparison_operator,
evaluation_periods,
datapoints_to_alarm,
period,
threshold,
statistic,
description,
dimensions,
alarm_actions,
ok_actions,
insufficient_data_actions,
unit,
actions_enabled,
region,
region_name=self.region_name,
name=name,
namespace=namespace,
metric_name=metric_name,
metric_data_queries=metric_data_queries,
comparison_operator=comparison_operator,
evaluation_periods=evaluation_periods,
datapoints_to_alarm=datapoints_to_alarm,
period=period,
threshold=threshold,
statistic=statistic,
extended_statistic=extended_statistic,
description=description,
dimensions=dimensions,
alarm_actions=alarm_actions,
ok_actions=ok_actions,
insufficient_data_actions=insufficient_data_actions,
unit=unit,
actions_enabled=actions_enabled,
treat_missing_data=treat_missing_data,
evaluate_low_sample_count_percentile=evaluate_low_sample_count_percentile,
threshold_metric_id=threshold_metric_id,
rule=rule,
)
self.alarms[name] = alarm
self.tagger.tag_resource(alarm.alarm_arn, tags)
return alarm
def get_all_alarms(self):
@ -371,13 +427,6 @@ class CloudWatchBackend(BaseBackend):
)
def delete_alarms(self, alarm_names):
for alarm_name in alarm_names:
if alarm_name not in self.alarms:
raise RESTError(
"ResourceNotFound",
"Alarm {0} not found".format(alarm_name),
status=404,
)
for alarm_name in alarm_names:
self.alarms.pop(alarm_name, None)
@ -549,17 +598,16 @@ class CloudWatchBackend(BaseBackend):
if reason_data is not None:
json.loads(reason_data)
except ValueError:
raise RESTError("InvalidFormat", "StateReasonData is invalid JSON")
raise InvalidFormat("Unknown")
if alarm_name not in self.alarms:
raise RESTError(
"ResourceNotFound", "Alarm {0} not found".format(alarm_name), status=404
)
raise ResourceNotFound
if state_value not in ("OK", "ALARM", "INSUFFICIENT_DATA"):
raise RESTError(
"InvalidParameterValue",
"StateValue is not one of OK | ALARM | INSUFFICIENT_DATA",
raise ValidationError(
"1 validation error detected: "
f"Value '{state_value}' at 'stateValue' failed to satisfy constraint: "
"Member must satisfy enum value set: [INSUFFICIENT_DATA, ALARM, OK]"
)
self.alarms[alarm_name].update_state(reason, reason_data, state_value)
@ -567,9 +615,7 @@ class CloudWatchBackend(BaseBackend):
def list_metrics(self, next_token, namespace, metric_name, dimensions):
if next_token:
if next_token not in self.paged_metric_data:
raise RESTError(
"PaginationException", "Request parameter NextToken is invalid"
)
raise InvalidParameterValue("Request parameter NextToken is invalid")
else:
metrics = self.paged_metric_data[next_token]
del self.paged_metric_data[next_token] # Cant reuse same token twice
@ -591,6 +637,21 @@ class CloudWatchBackend(BaseBackend):
new_metrics.append(md)
return new_metrics
def list_tags_for_resource(self, arn):
return self.tagger.get_tag_dict_for_resource(arn)
def tag_resource(self, arn, tags):
if arn not in self.tagger.tags.keys():
raise ResourceNotFoundException
self.tagger.tag_resource(arn, tags)
def untag_resource(self, arn, tag_keys):
if arn not in self.tagger.tags.keys():
raise ResourceNotFoundException
self.tagger.untag_resource_using_names(arn, tag_keys)
def _get_paginated(self, metrics):
if len(metrics) > 500:
next_token = str(uuid4())
@ -629,13 +690,13 @@ class LogGroup(CloudFormationModel):
cloudwatch_backends = {}
for region in Session().get_available_regions("cloudwatch"):
cloudwatch_backends[region] = CloudWatchBackend()
cloudwatch_backends[region] = CloudWatchBackend(region)
for region in Session().get_available_regions(
"cloudwatch", partition_name="aws-us-gov"
):
cloudwatch_backends[region] = CloudWatchBackend()
cloudwatch_backends[region] = CloudWatchBackend(region)
for region in Session().get_available_regions("cloudwatch", partition_name="aws-cn"):
cloudwatch_backends[region] = CloudWatchBackend()
cloudwatch_backends[region] = CloudWatchBackend(region)
# List of services that provide OOTB CW metrics
# See the S3Backend constructor for an example

View File

@ -70,6 +70,7 @@ class CloudWatchResponse(BaseResponse):
period = self._get_param("Period")
threshold = self._get_param("Threshold")
statistic = self._get_param("Statistic")
extended_statistic = self._get_param("ExtendedStatistic")
description = self._get_param("AlarmDescription")
dimensions = self._get_list_prefix("Dimensions.member")
alarm_actions = self._get_multi_param("AlarmActions.member")
@ -79,28 +80,38 @@ class CloudWatchResponse(BaseResponse):
"InsufficientDataActions.member"
)
unit = self._get_param("Unit")
treat_missing_data = self._get_param("TreatMissingData")
evaluate_low_sample_count_percentile = self._get_param(
"EvaluateLowSampleCountPercentile"
)
threshold_metric_id = self._get_param("ThresholdMetricId")
# fetch AlarmRule to re-use this method for composite alarms as well
rule = self._get_param("AlarmRule")
tags = self._get_multi_param("Tags.member")
alarm = self.cloudwatch_backend.put_metric_alarm(
name,
namespace,
metric_name,
metric_data_queries,
comparison_operator,
evaluation_periods,
datapoints_to_alarm,
period,
threshold,
statistic,
description,
dimensions,
alarm_actions,
ok_actions,
insufficient_data_actions,
unit,
actions_enabled,
self.region,
name=name,
namespace=namespace,
metric_name=metric_name,
metric_data_queries=metric_data_queries,
comparison_operator=comparison_operator,
evaluation_periods=evaluation_periods,
datapoints_to_alarm=datapoints_to_alarm,
period=period,
threshold=threshold,
statistic=statistic,
extended_statistic=extended_statistic,
description=description,
dimensions=dimensions,
alarm_actions=alarm_actions,
ok_actions=ok_actions,
insufficient_data_actions=insufficient_data_actions,
unit=unit,
actions_enabled=actions_enabled,
treat_missing_data=treat_missing_data,
evaluate_low_sample_count_percentile=evaluate_low_sample_count_percentile,
threshold_metric_id=threshold_metric_id,
rule=rule,
tags=tags,
)
template = self.response_template(PUT_METRIC_ALARM_TEMPLATE)
return template.render(alarm=alarm)
@ -301,6 +312,35 @@ class CloudWatchResponse(BaseResponse):
template = self.response_template(SET_ALARM_STATE_TEMPLATE)
return template.render()
@amzn_request_id
def list_tags_for_resource(self):
resource_arn = self._get_param("ResourceARN")
tags = self.cloudwatch_backend.list_tags_for_resource(resource_arn)
template = self.response_template(LIST_TAGS_FOR_RESOURCE_TEMPLATE)
return template.render(tags=tags)
@amzn_request_id
def tag_resource(self):
resource_arn = self._get_param("ResourceARN")
tags = self._get_multi_param("Tags.member")
self.cloudwatch_backend.tag_resource(resource_arn, tags)
template = self.response_template(TAG_RESOURCE_TEMPLATE)
return template.render()
@amzn_request_id
def untag_resource(self):
resource_arn = self._get_param("ResourceARN")
tag_keys = self._get_multi_param("TagKeys.member")
self.cloudwatch_backend.untag_resource(resource_arn, tag_keys)
template = self.response_template(UNTAG_RESOURCE_TEMPLATE)
return template.render()
PUT_METRIC_ALARM_TEMPLATE = """<PutMetricAlarmResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
<ResponseMetadata>
@ -409,12 +449,24 @@ DESCRIBE_ALARMS_TEMPLATE = """<DescribeAlarmsResponse xmlns="http://monitoring.a
{% if alarm.statistic is not none %}
<Statistic>{{ alarm.statistic }}</Statistic>
{% endif %}
{% if alarm.extended_statistic is not none %}
<ExtendedStatistic>{{ alarm.extended_statistic }}</ExtendedStatistic>
{% endif %}
{% if alarm.threshold is not none %}
<Threshold>{{ alarm.threshold }}</Threshold>
{% endif %}
{% if alarm.unit is not none %}
<Unit>{{ alarm.unit }}</Unit>
{% endif %}
{% if alarm.treat_missing_data is not none %}
<TreatMissingData>{{ alarm.treat_missing_data }}</TreatMissingData>
{% endif %}
{% if alarm.evaluate_low_sample_count_percentile is not none %}
<EvaluateLowSampleCountPercentile>{{ alarm.evaluate_low_sample_count_percentile }}</EvaluateLowSampleCountPercentile>
{% endif %}
{% if alarm.threshold_metric_id is not none %}
<ThresholdMetricId>{{ alarm.threshold_metric_id }}</ThresholdMetricId>
{% endif %}
{% if alarm.rule is not none %}
<AlarmRule>{{ alarm.rule }}</AlarmRule>
{% endif %}
@ -654,3 +706,34 @@ ERROR_RESPONSE_TEMPLATE = """<ErrorResponse xmlns="http://monitoring.amazonaws.c
</Error>
<RequestId>{{ request_id }}</RequestId>
</ErrorResponse>"""
LIST_TAGS_FOR_RESOURCE_TEMPLATE = """<ListTagsForResourceResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
<ListTagsForResourceResult>
<Tags>
{% for key, value in tags.items() %}
<member>
<Key>{{ key }}</Key>
<Value>{{ value }}</Value>
</member>
{% endfor %}
</Tags>
</ListTagsForResourceResult>
<ResponseMetadata>
<RequestId>{{ request_id }}</RequestId>
</ResponseMetadata>
</ListTagsForResourceResponse>
"""
TAG_RESOURCE_TEMPLATE = """<TagResourceResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
<TagResourceResult/>
<ResponseMetadata>
<RequestId>{{ request_id }}</RequestId>
</ResponseMetadata>
</TagResourceResponse>"""
UNTAG_RESOURCE_TEMPLATE = """<UntagResourceResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
<UntagResourceResult/>
<ResponseMetadata>
<RequestId>{{ request_id }}</RequestId>
</ResponseMetadata>
</UntagResourceResponse>"""

View File

@ -188,14 +188,17 @@ def iso_8601_datetime_with_milliseconds(datetime):
return datetime.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
# Even Python does not support nanoseconds, other languages like Go do (needed for Terraform)
def iso_8601_datetime_with_nanoseconds(datetime):
return datetime.strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
def iso_8601_datetime_without_milliseconds(datetime):
return None if datetime is None else datetime.strftime("%Y-%m-%dT%H:%M:%S") + "Z"
return None if datetime is None else datetime.strftime("%Y-%m-%dT%H:%M:%SZ")
def iso_8601_datetime_without_milliseconds_s3(datetime):
return (
None if datetime is None else datetime.strftime("%Y-%m-%dT%H:%M:%S.000") + "Z"
)
return None if datetime is None else datetime.strftime("%Y-%m-%dT%H:%M:%S.000Z")
RFC1123 = "%a, %d %b %Y %H:%M:%S GMT"

View File

@ -627,5 +627,22 @@
"name": "amzn-ami-vpc-nat-2018.03.0.20210721.0-x86_64-ebs",
"virtualization_type": "hvm",
"hypervisor": "xen"
},
{
"ami_id": "ami-000c540e28953ace2",
"state": "available",
"public": true,
"owner_id": "137112412989",
"image_location": "amazon/amzn-ami-minimal-hvm-2018.03.0.20181129-x86_64-ebs",
"sriov": "simple",
"root_device_type": "ebs",
"root_device_name": "/dev/xvda",
"description": "Amazon Linux AMI 2018.03.0.20181129 x86_64 Minimal HVM ebs",
"image_type": "machine",
"platform": "Linux/UNIX",
"architecture": "x86_64",
"name": "amzn-ami-minimal-hvm-2018.03.0.20181129-x86_64-ebs",
"virtualization_type": "hvm",
"hypervisor": "xen"
}
]

View File

@ -12,6 +12,7 @@ TestAccAWSCloudWatchEventConnection
TestAccAWSCloudWatchEventPermission
TestAccAWSCloudWatchEventRule
TestAccAWSCloudwatchLogGroupDataSource
TestAccAWSCloudWatchMetricAlarm
TestAccAWSDataSourceCloudwatch
TestAccAWSDataSourceElasticBeanstalkHostedZone
TestAccAWSDataSourceIAMGroup

View File

@ -1,8 +1,9 @@
# from __future__ import unicode_literals
from datetime import datetime, timedelta
from operator import itemgetter
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from freezegun import freeze_time
import pytest
from uuid import uuid4
@ -95,34 +96,12 @@ def test_get_dashboard_fail():
@mock_cloudwatch
def test_delete_invalid_alarm():
def test_delete_alarms_without_error():
# given
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
cloudwatch.put_metric_alarm(
AlarmName="testalarm1",
MetricName="cpu",
Namespace="blah",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
# trying to delete an alarm which is not created along with valid alarm.
with pytest.raises(ClientError) as e:
cloudwatch.delete_alarms(AlarmNames=["InvalidAlarmName", "testalarm1"])
e.value.response["Error"]["Code"].should.equal("ResourceNotFound")
resp = cloudwatch.describe_alarms(AlarmNames=["testalarm1"])
# making sure other alarms are not deleted in case of an error.
len(resp["MetricAlarms"]).should.equal(1)
# test to check if the error raises if only one invalid alarm is tried to delete.
with pytest.raises(ClientError) as e:
cloudwatch.delete_alarms(AlarmNames=["InvalidAlarmName"])
e.value.response["Error"]["Code"].should.equal("ResourceNotFound")
# when/then
cloudwatch.delete_alarms(AlarmNames=["not-exists"])
@mock_cloudwatch
@ -917,3 +896,458 @@ def test_get_metric_data_for_multiple_metrics():
res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0]
res2["Values"].should.equal([25.0])
@mock_cloudwatch
def test_put_metric_alarm():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
alarm_name = "test-alarm"
sns_topic_arn = f"arn:aws:sns:${region_name}:${ACCOUNT_ID}:test-topic"
# when
client.put_metric_alarm(
AlarmName=alarm_name,
AlarmDescription="test alarm",
ActionsEnabled=True,
OKActions=[sns_topic_arn],
AlarmActions=[sns_topic_arn],
InsufficientDataActions=[sns_topic_arn],
MetricName="5XXError",
Namespace="AWS/ApiGateway",
Statistic="Sum",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
TreatMissingData="notBreaching",
Tags=[{"Key": "key-1", "Value": "value-1"}],
)
# then
alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
alarms.should.have.length_of(1)
alarm = alarms[0]
alarm["AlarmName"].should.equal(alarm_name)
alarm["AlarmArn"].should.equal(
f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}"
)
alarm["AlarmDescription"].should.equal("test alarm")
alarm["AlarmConfigurationUpdatedTimestamp"].should.be.a(datetime)
alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo.should.equal(tzutc())
alarm["ActionsEnabled"].should.be.ok
alarm["OKActions"].should.equal([sns_topic_arn])
alarm["AlarmActions"].should.equal([sns_topic_arn])
alarm["InsufficientDataActions"].should.equal([sns_topic_arn])
alarm["StateValue"].should.equal("OK")
alarm["StateReason"].should.equal("Unchecked: Initial alarm creation")
alarm["StateUpdatedTimestamp"].should.be.a(datetime)
alarm["StateUpdatedTimestamp"].tzinfo.should.equal(tzutc())
alarm["MetricName"].should.equal("5XXError")
alarm["Namespace"].should.equal("AWS/ApiGateway")
alarm["Statistic"].should.equal("Sum")
sorted(alarm["Dimensions"], key=itemgetter("Name")).should.equal(
sorted(
[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
key=itemgetter("Name"),
)
)
alarm["Period"].should.equal(60)
alarm["Unit"].should.equal("Seconds")
alarm["EvaluationPeriods"].should.equal(1)
alarm["DatapointsToAlarm"].should.equal(1)
alarm["Threshold"].should.equal(1.0)
alarm["ComparisonOperator"].should.equal("GreaterThanOrEqualToThreshold")
alarm["TreatMissingData"].should.equal("notBreaching")
@mock_cloudwatch
def test_put_metric_alarm_with_percentile():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
alarm_name = "test-alarm"
# when
client.put_metric_alarm(
AlarmName=alarm_name,
AlarmDescription="test alarm",
ActionsEnabled=True,
MetricName="5XXError",
Namespace="AWS/ApiGateway",
ExtendedStatistic="p90",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
TreatMissingData="notBreaching",
EvaluateLowSampleCountPercentile="ignore",
)
# then
alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
alarms.should.have.length_of(1)
alarm = alarms[0]
alarm["AlarmName"].should.equal(alarm_name)
alarm["AlarmArn"].should.equal(
f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}"
)
alarm["AlarmDescription"].should.equal("test alarm")
alarm["AlarmConfigurationUpdatedTimestamp"].should.be.a(datetime)
alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo.should.equal(tzutc())
alarm["ActionsEnabled"].should.be.ok
alarm["StateValue"].should.equal("OK")
alarm["StateReason"].should.equal("Unchecked: Initial alarm creation")
alarm["StateUpdatedTimestamp"].should.be.a(datetime)
alarm["StateUpdatedTimestamp"].tzinfo.should.equal(tzutc())
alarm["MetricName"].should.equal("5XXError")
alarm["Namespace"].should.equal("AWS/ApiGateway")
alarm["ExtendedStatistic"].should.equal("p90")
sorted(alarm["Dimensions"], key=itemgetter("Name")).should.equal(
sorted(
[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
key=itemgetter("Name"),
)
)
alarm["Period"].should.equal(60)
alarm["Unit"].should.equal("Seconds")
alarm["EvaluationPeriods"].should.equal(1)
alarm["DatapointsToAlarm"].should.equal(1)
alarm["Threshold"].should.equal(1.0)
alarm["ComparisonOperator"].should.equal("GreaterThanOrEqualToThreshold")
alarm["TreatMissingData"].should.equal("notBreaching")
alarm["EvaluateLowSampleCountPercentile"].should.equal("ignore")
@mock_cloudwatch
def test_put_metric_alarm_with_anomaly_detection():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
alarm_name = "test-alarm"
metrics = [
{
"Id": "m1",
"ReturnData": True,
"MetricStat": {
"Metric": {
"MetricName": "CPUUtilization",
"Namespace": "AWS/EC2",
"Dimensions": [
{"Name": "instanceId", "Value": "i-1234567890abcdef0"}
],
},
"Stat": "Average",
"Period": 60,
},
},
{
"Id": "t1",
"ReturnData": False,
"Expression": "ANOMALY_DETECTION_BAND(m1, 3)",
},
]
# when
client.put_metric_alarm(
AlarmName=alarm_name,
ActionsEnabled=True,
Metrics=metrics,
EvaluationPeriods=2,
ComparisonOperator="GreaterThanOrEqualToThreshold",
ThresholdMetricId="t1",
)
# then
alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
alarms.should.have.length_of(1)
alarm = alarms[0]
alarm["AlarmName"].should.equal(alarm_name)
alarm["AlarmArn"].should.equal(
f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}"
)
alarm["AlarmConfigurationUpdatedTimestamp"].should.be.a(datetime)
alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo.should.equal(tzutc())
alarm["StateValue"].should.equal("OK")
alarm["StateReason"].should.equal("Unchecked: Initial alarm creation")
alarm["StateUpdatedTimestamp"].should.be.a(datetime)
alarm["StateUpdatedTimestamp"].tzinfo.should.equal(tzutc())
alarm["EvaluationPeriods"].should.equal(2)
alarm["ComparisonOperator"].should.equal("GreaterThanOrEqualToThreshold")
alarm["Metrics"].should.equal(metrics)
alarm["ThresholdMetricId"].should.equal("t1")
@mock_cloudwatch
def test_put_metric_alarm_error_extended_statistic():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
alarm_name = "test-alarm"
# when
with pytest.raises(ClientError) as e:
client.put_metric_alarm(
AlarmName=alarm_name,
ActionsEnabled=True,
MetricName="5XXError",
Namespace="AWS/ApiGateway",
ExtendedStatistic="90",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
TreatMissingData="notBreaching",
)
# then
ex = e.value
ex.operation_name.should.equal("PutMetricAlarm")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterValue")
ex.response["Error"]["Message"].should.equal(
"The value 90 for parameter ExtendedStatistic is not supported."
)
@mock_cloudwatch
def test_put_metric_alarm_error_evaluate_low_sample_count_percentile():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
alarm_name = "test-alarm"
# when
with pytest.raises(ClientError) as e:
client.put_metric_alarm(
AlarmName=alarm_name,
ActionsEnabled=True,
MetricName="5XXError",
Namespace="AWS/ApiGateway",
ExtendedStatistic="p90",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
TreatMissingData="notBreaching",
EvaluateLowSampleCountPercentile="unknown",
)
# then
ex = e.value
ex.operation_name.should.equal("PutMetricAlarm")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ValidationError")
ex.response["Error"]["Message"].should.equal(
"Option unknown is not supported. "
"Supported options for parameter EvaluateLowSampleCountPercentile are evaluate and ignore."
)
@mock_cloudwatch
def test_list_tags_for_resource():
# given
client = boto3.client("cloudwatch", region_name="eu-central-1")
alarm_name = "test-alarm"
client.put_metric_alarm(
AlarmName=alarm_name,
AlarmDescription="test alarm",
ActionsEnabled=True,
MetricName="5XXError",
Namespace="AWS/ApiGateway",
Statistic="Sum",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
Tags=[{"Key": "key-1", "Value": "value-1"}],
)
arn = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"][0]["AlarmArn"]
# when
response = client.list_tags_for_resource(ResourceARN=arn)
# then
response["Tags"].should.equal([{"Key": "key-1", "Value": "value-1"}])
@mock_cloudwatch
def test_list_tags_for_resource_with_unknown_resource():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
# when
response = client.list_tags_for_resource(
ResourceARN=make_arn_for_alarm(
region=region_name, account_id=ACCOUNT_ID, alarm_name="unknown"
)
)
# then
response["Tags"].should.be.empty
@mock_cloudwatch
def test_tag_resource():
# given
client = boto3.client("cloudwatch", region_name="eu-central-1")
alarm_name = "test-alarm"
client.put_metric_alarm(
AlarmName=alarm_name,
AlarmDescription="test alarm",
ActionsEnabled=True,
MetricName="5XXError",
Namespace="AWS/ApiGateway",
Statistic="Sum",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
Tags=[{"Key": "key-1", "Value": "value-1"}],
)
arn = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"][0]["AlarmArn"]
# when
client.tag_resource(ResourceARN=arn, Tags=[{"Key": "key-2", "Value": "value-2"}])
# then
response = client.list_tags_for_resource(ResourceARN=arn)
sorted(response["Tags"], key=itemgetter("Key")).should.equal(
sorted(
[
{"Key": "key-1", "Value": "value-1"},
{"Key": "key-2", "Value": "value-2"},
],
key=itemgetter("Key"),
)
)
@mock_cloudwatch
def test_tag_resource_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
# when
with pytest.raises(ClientError) as e:
client.tag_resource(
ResourceARN=make_arn_for_alarm(
region=region_name, account_id=ACCOUNT_ID, alarm_name="unknown"
),
Tags=[{"Key": "key-1", "Value": "value-1"},],
)
# then
ex = e.value
ex.operation_name.should.equal("TagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal("Unknown")
@mock_cloudwatch
def test_untag_resource():
# given
client = boto3.client("cloudwatch", region_name="eu-central-1")
alarm_name = "test-alarm"
client.put_metric_alarm(
AlarmName=alarm_name,
AlarmDescription="test alarm",
ActionsEnabled=True,
MetricName="5XXError",
Namespace="AWS/ApiGateway",
Statistic="Sum",
Dimensions=[
{"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"},
],
Period=60,
Unit="Seconds",
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=1.0,
ComparisonOperator="GreaterThanOrEqualToThreshold",
Tags=[
{"Key": "key-1", "Value": "value-1"},
{"Key": "key-2", "Value": "value-2"},
],
)
arn = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"][0]["AlarmArn"]
# when
client.untag_resource(ResourceARN=arn, TagKeys=["key-2"])
# then
response = client.list_tags_for_resource(ResourceARN=arn)
response["Tags"].should.equal([{"Key": "key-1", "Value": "value-1"}])
@mock_cloudwatch
def test_untag_resource_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("cloudwatch", region_name=region_name)
# when
with pytest.raises(ClientError) as e:
client.untag_resource(
ResourceARN=make_arn_for_alarm(
region=region_name, account_id=ACCOUNT_ID, alarm_name="unknown"
),
TagKeys=["key-1"],
)
# then
ex = e.value
ex.operation_name.should.equal("UntagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal("Unknown")

View File

@ -290,7 +290,7 @@ def test_ami_filters():
amis_by_architecture = conn.get_all_images(filters={"architecture": "x86_64"})
set([ami.id for ami in amis_by_architecture]).should.contain(imageB.id)
len(amis_by_architecture).should.equal(38)
len(amis_by_architecture).should.equal(39)
amis_by_kernel = conn.get_all_images(filters={"kernel-id": "k-abcd1234"})
set([ami.id for ami in amis_by_kernel]).should.equal(set([imageB.id]))
@ -312,14 +312,14 @@ def test_ami_filters():
ami_ids_by_state = [ami.id for ami in amis_by_state]
ami_ids_by_state.should.contain(imageA.id)
ami_ids_by_state.should.contain(imageB.id)
len(amis_by_state).should.equal(39)
len(amis_by_state).should.equal(40)
amis_by_name = conn.get_all_images(filters={"name": imageA.name})
set([ami.id for ami in amis_by_name]).should.equal(set([imageA.id]))
amis_by_public = conn.get_all_images(filters={"is-public": "true"})
set([ami.id for ami in amis_by_public]).should.contain(imageB.id)
len(amis_by_public).should.equal(38)
len(amis_by_public).should.equal(39)
amis_by_nonpublic = conn.get_all_images(filters={"is-public": "false"})
set([ami.id for ami in amis_by_nonpublic]).should.contain(imageA.id)