Techdebt: Replace sure with regular assertions in CloudWatch (#6493)

This commit is contained in:
Bert Blommers 2023-07-07 14:24:54 +00:00 committed by GitHub
parent 89f2a3c538
commit 0cad3b54f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 375 additions and 408 deletions

View File

@ -1,5 +1,4 @@
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_cloudwatch from moto import mock_cloudwatch
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -29,29 +28,25 @@ def test_create_alarm():
) )
alarms = cloudwatch.describe_alarms()["MetricAlarms"] alarms = cloudwatch.describe_alarms()["MetricAlarms"]
alarms.should.have.length_of(1) assert len(alarms) == 1
alarm = alarms[0] alarm = alarms[0]
alarm.should.have.key("AlarmName").equal("tester") assert alarm["AlarmName"] == "tester"
alarm.should.have.key("Namespace").equal("tester_namespace") assert alarm["Namespace"] == "tester_namespace"
alarm.should.have.key("MetricName").equal("tester_metric") assert alarm["MetricName"] == "tester_metric"
alarm.should.have.key("ComparisonOperator").equal("GreaterThanOrEqualToThreshold") assert alarm["ComparisonOperator"] == "GreaterThanOrEqualToThreshold"
alarm.should.have.key("Threshold").equal(2.0) assert alarm["Threshold"] == 2.0
alarm.should.have.key("Period").equal(60) assert alarm["Period"] == 60
alarm.should.have.key("EvaluationPeriods").equal(5) assert alarm["EvaluationPeriods"] == 5
alarm.should.have.key("Statistic").should.equal("Average") assert alarm["Statistic"] == "Average"
alarm.should.have.key("AlarmDescription").equal("A test") assert alarm["AlarmDescription"] == "A test"
alarm.should.have.key("Dimensions").equal( assert alarm["Dimensions"] == [{"Name": "InstanceId", "Value": "i-0123457"}]
[{"Name": "InstanceId", "Value": "i-0123457"}] assert alarm["AlarmActions"] == ["arn:alarm"]
) assert alarm["OKActions"] == ["arn:ok"]
alarm.should.have.key("AlarmActions").equal(["arn:alarm"]) assert alarm["InsufficientDataActions"] == ["arn:insufficient"]
alarm.should.have.key("OKActions").equal(["arn:ok"]) assert alarm["Unit"] == "Seconds"
alarm.should.have.key("InsufficientDataActions").equal(["arn:insufficient"]) assert alarm["AlarmArn"] == f"arn:aws:cloudwatch:{region}:{ACCOUNT_ID}:alarm:{name}"
alarm.should.have.key("Unit").equal("Seconds")
alarm.should.have.key("AlarmArn").equal(
f"arn:aws:cloudwatch:{region}:{ACCOUNT_ID}:alarm:{name}"
)
# default value should be True # default value should be True
alarm.should.have.key("ActionsEnabled").equal(True) assert alarm["ActionsEnabled"] is True
@mock_cloudwatch @mock_cloudwatch
@ -59,7 +54,7 @@ def test_delete_alarm():
cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1") cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")
alarms = cloudwatch.describe_alarms()["MetricAlarms"] alarms = cloudwatch.describe_alarms()["MetricAlarms"]
alarms.should.have.length_of(0) assert len(alarms) == 0
name = "tester" name = "tester"
cloudwatch.put_metric_alarm( cloudwatch.put_metric_alarm(
@ -80,12 +75,12 @@ def test_delete_alarm():
) )
alarms = cloudwatch.describe_alarms()["MetricAlarms"] alarms = cloudwatch.describe_alarms()["MetricAlarms"]
alarms.should.have.length_of(1) assert len(alarms) == 1
cloudwatch.delete_alarms(AlarmNames=[name]) cloudwatch.delete_alarms(AlarmNames=[name])
alarms = cloudwatch.describe_alarms()["MetricAlarms"] alarms = cloudwatch.describe_alarms()["MetricAlarms"]
alarms.should.have.length_of(0) assert len(alarms) == 0
@mock_cloudwatch @mock_cloudwatch
@ -112,10 +107,10 @@ def test_describe_alarms_for_metric():
ActionsEnabled=True, ActionsEnabled=True,
) )
alarms = conn.describe_alarms_for_metric(MetricName="cpu", Namespace="blah") alarms = conn.describe_alarms_for_metric(MetricName="cpu", Namespace="blah")
alarms.get("MetricAlarms").should.have.length_of(1) assert len(alarms.get("MetricAlarms")) == 1
alarm = alarms.get("MetricAlarms")[0] alarm = alarms.get("MetricAlarms")[0]
assert "testalarm1" in alarm.get("AlarmArn") assert "testalarm1" in alarm.get("AlarmArn")
alarm.should.have.key("ActionsEnabled").equal(True) assert alarm["ActionsEnabled"] is True
@mock_cloudwatch @mock_cloudwatch
@ -177,7 +172,7 @@ def test_describe_alarms():
) )
alarms = conn.describe_alarms() alarms = conn.describe_alarms()
metric_alarms = alarms.get("MetricAlarms") metric_alarms = alarms.get("MetricAlarms")
metric_alarms.should.have.length_of(2) assert len(metric_alarms) == 2
single_metric_alarm = [ single_metric_alarm = [
alarm for alarm in metric_alarms if alarm["AlarmName"] == "testalarm1" alarm for alarm in metric_alarms if alarm["AlarmName"] == "testalarm1"
][0] ][0]
@ -185,23 +180,23 @@ def test_describe_alarms():
alarm for alarm in metric_alarms if alarm["AlarmName"] == "testalarm2" alarm for alarm in metric_alarms if alarm["AlarmName"] == "testalarm2"
][0] ][0]
single_metric_alarm["MetricName"].should.equal("cpu") assert single_metric_alarm["MetricName"] == "cpu"
single_metric_alarm.shouldnt.have.property("Metrics") assert "Metrics" not in single_metric_alarm
single_metric_alarm["Namespace"].should.equal("blah") assert single_metric_alarm["Namespace"] == "blah"
single_metric_alarm["Period"].should.equal(10) assert single_metric_alarm["Period"] == 10
single_metric_alarm["EvaluationPeriods"].should.equal(5) assert single_metric_alarm["EvaluationPeriods"] == 5
single_metric_alarm["Statistic"].should.equal("Average") assert single_metric_alarm["Statistic"] == "Average"
single_metric_alarm["ComparisonOperator"].should.equal("GreaterThanThreshold") assert single_metric_alarm["ComparisonOperator"] == "GreaterThanThreshold"
single_metric_alarm["Threshold"].should.equal(2) assert single_metric_alarm["Threshold"] == 2
single_metric_alarm["ActionsEnabled"].should.equal(False) assert single_metric_alarm["ActionsEnabled"] is False
multiple_metric_alarm.shouldnt.have.property("MetricName") assert "MetricName" not in multiple_metric_alarm
multiple_metric_alarm["EvaluationPeriods"].should.equal(1) assert multiple_metric_alarm["EvaluationPeriods"] == 1
multiple_metric_alarm["DatapointsToAlarm"].should.equal(1) assert multiple_metric_alarm["DatapointsToAlarm"] == 1
multiple_metric_alarm["Metrics"].should.equal(metric_data_queries) assert multiple_metric_alarm["Metrics"] == metric_data_queries
multiple_metric_alarm["ComparisonOperator"].should.equal("GreaterThanThreshold") assert multiple_metric_alarm["ComparisonOperator"] == "GreaterThanThreshold"
multiple_metric_alarm["Threshold"].should.equal(1.0) assert multiple_metric_alarm["Threshold"] == 1.0
multiple_metric_alarm["ActionsEnabled"].should.equal(True) assert multiple_metric_alarm["ActionsEnabled"] is True
@mock_cloudwatch @mock_cloudwatch
@ -239,17 +234,17 @@ def test_alarm_state():
) )
resp = client.describe_alarms(StateValue="ALARM") resp = client.describe_alarms(StateValue="ALARM")
len(resp["MetricAlarms"]).should.equal(1) assert len(resp["MetricAlarms"]) == 1
resp["MetricAlarms"][0]["AlarmName"].should.equal("testalarm1") assert resp["MetricAlarms"][0]["AlarmName"] == "testalarm1"
resp["MetricAlarms"][0]["StateValue"].should.equal("ALARM") assert resp["MetricAlarms"][0]["StateValue"] == "ALARM"
resp["MetricAlarms"][0]["ActionsEnabled"].should.equal(True) assert resp["MetricAlarms"][0]["ActionsEnabled"] is True
resp = client.describe_alarms(StateValue="OK") resp = client.describe_alarms(StateValue="OK")
len(resp["MetricAlarms"]).should.equal(1) assert len(resp["MetricAlarms"]) == 1
resp["MetricAlarms"][0]["AlarmName"].should.equal("testalarm2") assert resp["MetricAlarms"][0]["AlarmName"] == "testalarm2"
resp["MetricAlarms"][0]["StateValue"].should.equal("OK") assert resp["MetricAlarms"][0]["StateValue"] == "OK"
resp["MetricAlarms"][0]["ActionsEnabled"].should.equal(True) assert resp["MetricAlarms"][0]["ActionsEnabled"] is True
# Just for sanity # Just for sanity
resp = client.describe_alarms() resp = client.describe_alarms()
len(resp["MetricAlarms"]).should.equal(2) assert len(resp["MetricAlarms"]) == 2

View File

@ -1,6 +1,5 @@
import boto3 import boto3
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@ -23,9 +22,7 @@ def test_put_metric_data_no_dimensions():
) )
metrics = conn.list_metrics()["Metrics"] metrics = conn.list_metrics()["Metrics"]
metrics.should.contain( assert {"Namespace": "tester", "MetricName": "metric", "Dimensions": []} in metrics
{"Namespace": "tester", "MetricName": "metric", "Dimensions": []}
)
@mock_cloudwatch @mock_cloudwatch
@ -45,9 +42,10 @@ def test_put_metric_data_can_not_have_nan():
], ],
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert (
"The value NaN for parameter MetricData.member.1.Value is invalid." err["Message"]
== "The value NaN for parameter MetricData.member.1.Value is invalid."
) )
@ -69,9 +67,10 @@ def test_put_metric_data_can_not_have_value_and_values():
], ],
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert (
"The parameters MetricData.member.1.Value and MetricData.member.1.Values are mutually exclusive and you have specified both." err["Message"]
== "The parameters MetricData.member.1.Value and MetricData.member.1.Values are mutually exclusive and you have specified both."
) )
@ -93,9 +92,10 @@ def test_put_metric_data_can_not_have_and_values_mismatched_counts():
], ],
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert (
"The parameters MetricData.member.1.Values and MetricData.member.1.Counts must be of the same size." err["Message"]
== "The parameters MetricData.member.1.Values and MetricData.member.1.Counts must be of the same size."
) )
@ -125,9 +125,9 @@ def test_put_metric_data_values_and_counts():
Statistics=["SampleCount", "Sum", "Maximum"], Statistics=["SampleCount", "Sum", "Maximum"],
) )
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(6.0) assert datapoint["SampleCount"] == 6.0
datapoint["Sum"].should.equal(42.0) assert datapoint["Sum"] == 42.0
datapoint["Maximum"].should.equal(10.0) assert datapoint["Maximum"] == 10.0
@mock_cloudwatch @mock_cloudwatch
@ -155,9 +155,9 @@ def test_put_metric_data_values_without_counts():
Statistics=["SampleCount", "Sum", "Maximum"], Statistics=["SampleCount", "Sum", "Maximum"],
) )
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(3.0) assert datapoint["SampleCount"] == 3.0
datapoint["Sum"].should.equal(34.45) assert datapoint["Sum"] == 34.45
datapoint["Maximum"].should.equal(23.45) assert datapoint["Maximum"] == 23.45
@mock_cloudwatch @mock_cloudwatch
@ -177,9 +177,10 @@ def test_put_metric_data_value_and_statistics():
], ],
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterCombination") assert err["Code"] == "InvalidParameterCombination"
err["Message"].should.equal( assert (
"The parameters MetricData.member.1.Value and MetricData.member.1.StatisticValues are mutually exclusive and you have specified both." err["Message"]
== "The parameters MetricData.member.1.Value and MetricData.member.1.StatisticValues are mutually exclusive and you have specified both."
) )
@ -205,9 +206,11 @@ def test_put_metric_data_with_statistics():
) )
metrics = conn.list_metrics()["Metrics"] metrics = conn.list_metrics()["Metrics"]
metrics.should.contain( assert {
{"Namespace": "tester", "MetricName": "statmetric", "Dimensions": []} "Namespace": "tester",
) "MetricName": "statmetric",
"Dimensions": [],
} in metrics
stats = conn.get_metric_statistics( stats = conn.get_metric_statistics(
Namespace="tester", Namespace="tester",
@ -218,13 +221,13 @@ def test_put_metric_data_with_statistics():
Statistics=["SampleCount", "Sum", "Maximum", "Minimum", "Average"], Statistics=["SampleCount", "Sum", "Maximum", "Minimum", "Average"],
) )
stats["Datapoints"].should.have.length_of(1) assert len(stats["Datapoints"]) == 1
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(3.0) assert datapoint["SampleCount"] == 3.0
datapoint["Sum"].should.equal(123.0) assert datapoint["Sum"] == 123.0
datapoint["Minimum"].should.equal(12.0) assert datapoint["Minimum"] == 12.0
datapoint["Maximum"].should.equal(100.0) assert datapoint["Maximum"] == 100.0
datapoint["Average"].should.equal(41.0) assert datapoint["Average"] == 41.0
# add single value # add single value
conn.put_metric_data( conn.put_metric_data(
@ -248,13 +251,13 @@ def test_put_metric_data_with_statistics():
Statistics=["SampleCount", "Sum", "Maximum", "Minimum", "Average"], Statistics=["SampleCount", "Sum", "Maximum", "Minimum", "Average"],
) )
stats["Datapoints"].should.have.length_of(1) assert len(stats["Datapoints"]) == 1
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(4.0) assert datapoint["SampleCount"] == 4.0
datapoint["Sum"].should.equal(224.0) assert datapoint["Sum"] == 224.0
datapoint["Minimum"].should.equal(12.0) assert datapoint["Minimum"] == 12.0
datapoint["Maximum"].should.equal(101.0) assert datapoint["Maximum"] == 101.0
datapoint["Average"].should.equal(56.0) assert datapoint["Average"] == 56.0
@mock_cloudwatch @mock_cloudwatch
@ -276,10 +279,10 @@ def test_get_metric_statistics():
Statistics=["SampleCount", "Sum"], Statistics=["SampleCount", "Sum"],
) )
stats["Datapoints"].should.have.length_of(1) assert len(stats["Datapoints"]) == 1
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(1.0) assert datapoint["SampleCount"] == 1.0
datapoint["Sum"].should.equal(1.5) assert datapoint["Sum"] == 1.5
@mock_cloudwatch @mock_cloudwatch
@ -303,8 +306,8 @@ def test_get_metric_invalid_parameter_combination():
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterCombination") assert err["Code"] == "InvalidParameterCombination"
err["Message"].should.equal("Must specify either Statistics or ExtendedStatistics") assert err["Message"] == "Must specify either Statistics or ExtendedStatistics"
@mock_cloudwatch @mock_cloudwatch
@ -370,10 +373,10 @@ def test_get_metric_statistics_dimensions():
Statistics=["Average", "Sum"], Statistics=["Average", "Sum"],
**params[0], **params[0],
) )
stats["Datapoints"].should.have.length_of(1) assert len(stats["Datapoints"]) == 1
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["Sum"].should.equal(params[1]) assert datapoint["Sum"] == params[1]
datapoint["Average"].should.equal(params[2]) assert datapoint["Average"] == params[2]
@mock_cloudwatch @mock_cloudwatch
@ -396,11 +399,12 @@ def test_get_metric_statistics_endtime_sooner_than_starttime():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("GetMetricStatistics") assert ex.operation_name == "GetMetricStatistics"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("InvalidParameterValue") assert ex.response["Error"]["Code"] == "InvalidParameterValue"
ex.response["Error"]["Message"].should.equal( assert (
"The parameter StartTime must be less than the parameter EndTime." ex.response["Error"]["Message"]
== "The parameter StartTime must be less than the parameter EndTime."
) )
@ -424,11 +428,12 @@ def test_get_metric_statistics_starttime_endtime_equals():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("GetMetricStatistics") assert ex.operation_name == "GetMetricStatistics"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("InvalidParameterValue") assert ex.response["Error"]["Code"] == "InvalidParameterValue"
ex.response["Error"]["Message"].should.equal( assert (
"The parameter StartTime must be less than the parameter EndTime." ex.response["Error"]["Message"]
== "The parameter StartTime must be less than the parameter EndTime."
) )
@ -452,11 +457,12 @@ def test_get_metric_statistics_starttime_endtime_within_1_second():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("GetMetricStatistics") assert ex.operation_name == "GetMetricStatistics"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("InvalidParameterValue") assert ex.response["Error"]["Code"] == "InvalidParameterValue"
ex.response["Error"]["Message"].should.equal( assert (
"The parameter StartTime must be less than the parameter EndTime." ex.response["Error"]["Message"]
== "The parameter StartTime must be less than the parameter EndTime."
) )
@ -485,10 +491,10 @@ def test_get_metric_statistics_starttime_endtime_ignore_miliseconds():
Statistics=["SampleCount", "Sum"], Statistics=["SampleCount", "Sum"],
) )
stats["Datapoints"].should.have.length_of(1) assert len(stats["Datapoints"]) == 1
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(1.0) assert datapoint["SampleCount"] == 1.0
datapoint["Sum"].should.equal(1.5) assert datapoint["Sum"] == 1.5
@mock_cloudwatch @mock_cloudwatch
@ -511,7 +517,7 @@ def test_duplicate_put_metric_data():
result = conn.list_metrics( result = conn.list_metrics(
Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}] Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}]
)["Metrics"] )["Metrics"]
len(result).should.equal(1) assert len(result) == 1
conn.put_metric_data( conn.put_metric_data(
Namespace="tester", Namespace="tester",
@ -528,16 +534,14 @@ def test_duplicate_put_metric_data():
result = conn.list_metrics( result = conn.list_metrics(
Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}] Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}]
)["Metrics"] )["Metrics"]
len(result).should.equal(1) assert len(result) == 1
result.should.equal( assert result == [
[
{ {
"Namespace": "tester", "Namespace": "tester",
"MetricName": "metric", "MetricName": "metric",
"Dimensions": [{"Name": "Name", "Value": "B"}], "Dimensions": [{"Name": "Name", "Value": "B"}],
} }
] ]
)
conn.put_metric_data( conn.put_metric_data(
Namespace="tester", Namespace="tester",
@ -557,8 +561,7 @@ def test_duplicate_put_metric_data():
result = conn.list_metrics( result = conn.list_metrics(
Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}] Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}]
)["Metrics"] )["Metrics"]
result.should.equal( assert result == [
[
{ {
"Namespace": "tester", "Namespace": "tester",
"MetricName": "metric", "MetricName": "metric",
@ -573,13 +576,11 @@ def test_duplicate_put_metric_data():
], ],
}, },
] ]
)
result = conn.list_metrics( result = conn.list_metrics(
Namespace="tester", Dimensions=[{"Name": "Name", "Value": "C"}] Namespace="tester", Dimensions=[{"Name": "Name", "Value": "C"}]
)["Metrics"] )["Metrics"]
result.should.equal( assert result == [
[
{ {
"Namespace": "tester", "Namespace": "tester",
"MetricName": "metric", "MetricName": "metric",
@ -589,7 +590,6 @@ def test_duplicate_put_metric_data():
], ],
} }
] ]
)
@mock_cloudwatch @mock_cloudwatch
@ -627,7 +627,7 @@ def test_list_metrics():
cloudwatch = boto3.client("cloudwatch", "eu-west-1") cloudwatch = boto3.client("cloudwatch", "eu-west-1")
# Verify namespace has to exist # Verify namespace has to exist
res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"] res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
res.should.equal([]) assert res == []
# Create some metrics to filter on # Create some metrics to filter on
create_metrics(cloudwatch, namespace="list_test_1/", metrics=4, data_points=2) create_metrics(cloudwatch, namespace="list_test_1/", metrics=4, data_points=2)
create_metrics(cloudwatch, namespace="list_test_2/", metrics=4, data_points=2) create_metrics(cloudwatch, namespace="list_test_2/", metrics=4, data_points=2)
@ -636,67 +636,67 @@ def test_list_metrics():
assert len(res) >= 16 # 2 namespaces * 4 metrics * 2 data points assert len(res) >= 16 # 2 namespaces * 4 metrics * 2 data points
# Verify we can filter by namespace/metric name # Verify we can filter by namespace/metric name
res = cloudwatch.list_metrics(Namespace="list_test_1/")["Metrics"] res = cloudwatch.list_metrics(Namespace="list_test_1/")["Metrics"]
res.should.have.length_of(8) # 1 namespace * 4 metrics * 2 data points assert len(res) == 8 # 1 namespace * 4 metrics * 2 data points
res = cloudwatch.list_metrics(Namespace="list_test_1/", MetricName="metric1")[ res = cloudwatch.list_metrics(Namespace="list_test_1/", MetricName="metric1")[
"Metrics" "Metrics"
] ]
res.should.have.length_of(2) # 1 namespace * 1 metrics * 2 data points assert len(res) == 2 # 1 namespace * 1 metrics * 2 data points
# Verify format # Verify format
res.should.equal( assert res == [
[
{"Namespace": "list_test_1/", "Dimensions": [], "MetricName": "metric1"}, {"Namespace": "list_test_1/", "Dimensions": [], "MetricName": "metric1"},
{"Namespace": "list_test_1/", "Dimensions": [], "MetricName": "metric1"}, {"Namespace": "list_test_1/", "Dimensions": [], "MetricName": "metric1"},
] ]
)
# Verify unknown namespace still has no results # Verify unknown namespace still has no results
res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"] res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
res.should.equal([]) assert res == []
@mock_cloudwatch @mock_cloudwatch
def test_list_metrics_paginated(): def test_list_metrics_paginated():
cloudwatch = boto3.client("cloudwatch", "eu-west-1") cloudwatch = boto3.client("cloudwatch", "eu-west-1")
# Verify that only a single page of metrics is returned # Verify that only a single page of metrics is returned
cloudwatch.list_metrics().shouldnt.have.key("NextToken") assert "NextToken" not in cloudwatch.list_metrics()
# Verify we can't pass a random NextToken # Verify we can't pass a random NextToken
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
cloudwatch.list_metrics(NextToken=str(uuid4())) cloudwatch.list_metrics(NextToken=str(uuid4()))
e.value.response["Error"]["Message"].should.equal( assert (
"Request parameter NextToken is invalid" e.value.response["Error"]["Message"] == "Request parameter NextToken is invalid"
) )
# Add a boatload of metrics # Add a boatload of metrics
create_metrics(cloudwatch, namespace="test", metrics=100, data_points=1) create_metrics(cloudwatch, namespace="test", metrics=100, data_points=1)
# Verify that a single page is returned until we've reached 500 # Verify that a single page is returned until we've reached 500
first_page = cloudwatch.list_metrics(Namespace="test") first_page = cloudwatch.list_metrics(Namespace="test")
first_page["Metrics"].should.have.length_of(100) assert len(first_page["Metrics"]) == 100
len(first_page["Metrics"]).should.equal(100) assert len(first_page["Metrics"]) == 100
create_metrics(cloudwatch, namespace="test", metrics=200, data_points=2) create_metrics(cloudwatch, namespace="test", metrics=200, data_points=2)
first_page = cloudwatch.list_metrics(Namespace="test") first_page = cloudwatch.list_metrics(Namespace="test")
len(first_page["Metrics"]).should.equal(500) assert len(first_page["Metrics"]) == 500
first_page.shouldnt.contain("NextToken") assert "NextToken" not in first_page
# Verify that adding more data points results in pagination # Verify that adding more data points results in pagination
create_metrics(cloudwatch, namespace="test", metrics=60, data_points=10) create_metrics(cloudwatch, namespace="test", metrics=60, data_points=10)
first_page = cloudwatch.list_metrics(Namespace="test") first_page = cloudwatch.list_metrics(Namespace="test")
len(first_page["Metrics"]).should.equal(500) assert len(first_page["Metrics"]) == 500
first_page["NextToken"].shouldnt.equal(None)
# Retrieve second page - and verify there's more where that came from # Retrieve second page - and verify there's more where that came from
second_page = cloudwatch.list_metrics( second_page = cloudwatch.list_metrics(
Namespace="test", NextToken=first_page["NextToken"] Namespace="test", NextToken=first_page["NextToken"]
) )
len(second_page["Metrics"]).should.equal(500) assert len(second_page["Metrics"]) == 500
second_page.should.contain("NextToken")
# Last page should only have the last 100 results, and no NextToken (indicating that pagination is finished) # Last page should only have the last 100 results, and no NextToken (indicating that pagination is finished)
third_page = cloudwatch.list_metrics( third_page = cloudwatch.list_metrics(
Namespace="test", NextToken=second_page["NextToken"] Namespace="test", NextToken=second_page["NextToken"]
) )
len(third_page["Metrics"]).should.equal(100) assert len(third_page["Metrics"]) == 100
third_page.shouldnt.contain("NextToken") assert "NextToken" not in third_page
# Verify that we can't reuse an existing token # Verify that we can't reuse an existing token
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
cloudwatch.list_metrics(Namespace="test", NextToken=first_page["NextToken"]) cloudwatch.list_metrics(Namespace="test", NextToken=first_page["NextToken"])
e.value.response["Error"]["Message"].should.equal( assert (
"Request parameter NextToken is invalid" e.value.response["Error"]["Message"] == "Request parameter NextToken is invalid"
) )
@ -707,16 +707,16 @@ def test_list_metrics_without_value():
create_metrics_with_dimensions(cloudwatch, namespace="MyNamespace", data_points=3) create_metrics_with_dimensions(cloudwatch, namespace="MyNamespace", data_points=3)
# Verify we can filter by namespace/metric name # Verify we can filter by namespace/metric name
res = cloudwatch.list_metrics(Namespace="MyNamespace")["Metrics"] res = cloudwatch.list_metrics(Namespace="MyNamespace")["Metrics"]
res.should.have.length_of(3) assert len(res) == 3
# Verify we can filter by Dimension without value # Verify we can filter by Dimension without value
results = cloudwatch.list_metrics( results = cloudwatch.list_metrics(
Namespace="MyNamespace", MetricName="MyMetric", Dimensions=[{"Name": "D1"}] Namespace="MyNamespace", MetricName="MyMetric", Dimensions=[{"Name": "D1"}]
)["Metrics"] )["Metrics"]
results.should.have.length_of(1) assert len(results) == 1
results[0]["Namespace"].should.equals("MyNamespace") assert results[0]["Namespace"] == "MyNamespace"
results[0]["MetricName"].should.equal("MyMetric") assert results[0]["MetricName"] == "MyMetric"
results[0]["Dimensions"].should.equal([{"Name": "D1", "Value": "V1"}]) assert results[0]["Dimensions"] == [{"Name": "D1", "Value": "V1"}]
@mock_cloudwatch @mock_cloudwatch
@ -747,7 +747,7 @@ def test_list_metrics_with_same_dimensions_different_metric_name():
) )
results = cloudwatch.list_metrics(Namespace="unique/")["Metrics"] results = cloudwatch.list_metrics(Namespace="unique/")["Metrics"]
results.should.have.length_of(2) assert len(results) == 2
# duplicating existing metric # duplicating existing metric
cloudwatch.put_metric_data( cloudwatch.put_metric_data(
@ -763,7 +763,7 @@ def test_list_metrics_with_same_dimensions_different_metric_name():
# asserting only unique values are returned # asserting only unique values are returned
results = cloudwatch.list_metrics(Namespace="unique/")["Metrics"] results = cloudwatch.list_metrics(Namespace="unique/")["Metrics"]
results.should.have.length_of(2) assert len(results) == 2
def create_metrics(cloudwatch, namespace, metrics=5, data_points=5): def create_metrics(cloudwatch, namespace, metrics=5, data_points=5):
@ -831,10 +831,10 @@ def test_get_metric_data_for_multiple_metrics_w_same_dimensions():
EndTime=utc_now + timedelta(seconds=60), EndTime=utc_now + timedelta(seconds=60),
) )
# #
len(response1["MetricDataResults"]).should.equal(1) assert len(response1["MetricDataResults"]) == 1
res1 = response1["MetricDataResults"][0] res1 = response1["MetricDataResults"][0]
res1["Values"].should.equal([50.0]) assert res1["Values"] == [50.0]
# get_metric_data 2 # get_metric_data 2
response2 = cloudwatch.get_metric_data( response2 = cloudwatch.get_metric_data(
@ -856,10 +856,10 @@ def test_get_metric_data_for_multiple_metrics_w_same_dimensions():
EndTime=utc_now + timedelta(seconds=60), EndTime=utc_now + timedelta(seconds=60),
) )
# #
len(response2["MetricDataResults"]).should.equal(1) assert len(response2["MetricDataResults"]) == 1
res2 = response2["MetricDataResults"][0] res2 = response2["MetricDataResults"][0]
res2["Values"].should.equal([25.0]) assert res2["Values"] == [25.0]
@mock_cloudwatch @mock_cloudwatch
@ -897,30 +897,30 @@ def test_get_metric_data_within_timeframe():
avg = [ avg = [
res for res in response["MetricDataResults"] if res["Id"] == "result_Average" res for res in response["MetricDataResults"] if res["Id"] == "result_Average"
][0] ][0]
avg["Label"].should.equal("metric1 Average") assert avg["Label"] == "metric1 Average"
avg["StatusCode"].should.equal("Complete") assert avg["StatusCode"] == "Complete"
[int(val) for val in avg["Values"]].should.equal([19]) assert [int(val) for val in avg["Values"]] == [19]
sum_ = [res for res in response["MetricDataResults"] if res["Id"] == "result_Sum"][ sum_ = [res for res in response["MetricDataResults"] if res["Id"] == "result_Sum"][
0 0
] ]
sum_["Label"].should.equal("metric1 Sum") assert sum_["Label"] == "metric1 Sum"
sum_["StatusCode"].should.equal("Complete") assert sum_["StatusCode"] == "Complete"
[val for val in sum_["Values"]].should.equal([sum(values)]) assert [val for val in sum_["Values"]] == [sum(values)]
min_ = [ min_ = [
res for res in response["MetricDataResults"] if res["Id"] == "result_Minimum" res for res in response["MetricDataResults"] if res["Id"] == "result_Minimum"
][0] ][0]
min_["Label"].should.equal("metric1 Minimum") assert min_["Label"] == "metric1 Minimum"
min_["StatusCode"].should.equal("Complete") assert min_["StatusCode"] == "Complete"
[int(val) for val in min_["Values"]].should.equal([0]) assert [int(val) for val in min_["Values"]] == [0]
max_ = [ max_ = [
res for res in response["MetricDataResults"] if res["Id"] == "result_Maximum" res for res in response["MetricDataResults"] if res["Id"] == "result_Maximum"
][0] ][0]
max_["Label"].should.equal("metric1 Maximum") assert max_["Label"] == "metric1 Maximum"
max_["StatusCode"].should.equal("Complete") assert max_["StatusCode"] == "Complete"
[int(val) for val in max_["Values"]].should.equal([100]) assert [int(val) for val in max_["Values"]] == [100]
@mock_cloudwatch @mock_cloudwatch
@ -1010,17 +1010,17 @@ def test_get_metric_data_partially_within_timeframe():
) )
# Assert Last week's data is not returned # Assert Last week's data is not returned
len(response["MetricDataResults"]).should.equal(1) assert len(response["MetricDataResults"]) == 1
sum_ = response["MetricDataResults"][0] sum_ = response["MetricDataResults"][0]
sum_["Label"].should.equal("metric1 Sum") assert sum_["Label"] == "metric1 Sum"
sum_["StatusCode"].should.equal("Complete") assert sum_["StatusCode"] == "Complete"
sum_["Values"].should.equal([20.0, 10.0]) assert sum_["Values"] == [20.0, 10.0]
response = get_data( response = get_data(
start=yesterday - timedelta(seconds=60), start=yesterday - timedelta(seconds=60),
end=utc_now + timedelta(seconds=60), end=utc_now + timedelta(seconds=60),
scanBy="TimestampDescending", scanBy="TimestampDescending",
) )
response["MetricDataResults"][0]["Values"].should.equal([10.0, 20.0]) assert response["MetricDataResults"][0]["Values"] == [10.0, 20.0]
response = get_data( response = get_data(
start=last_week - timedelta(seconds=1), start=last_week - timedelta(seconds=1),
@ -1028,7 +1028,7 @@ def test_get_metric_data_partially_within_timeframe():
stat="Average", stat="Average",
) )
# assert average # assert average
response["MetricDataResults"][0]["Values"].should.equal([30.0, 20.0, 10.0]) assert response["MetricDataResults"][0]["Values"] == [30.0, 20.0, 10.0]
response = get_data( response = get_data(
start=last_week - timedelta(seconds=1), start=last_week - timedelta(seconds=1),
@ -1036,7 +1036,7 @@ def test_get_metric_data_partially_within_timeframe():
stat="Maximum", stat="Maximum",
) )
# assert maximum # assert maximum
response["MetricDataResults"][0]["Values"].should.equal([50.0, 20.0, 10.0]) assert response["MetricDataResults"][0]["Values"] == [50.0, 20.0, 10.0]
response = get_data( response = get_data(
start=last_week - timedelta(seconds=1), start=last_week - timedelta(seconds=1),
@ -1044,7 +1044,7 @@ def test_get_metric_data_partially_within_timeframe():
stat="Minimum", stat="Minimum",
) )
# assert minimum # assert minimum
response["MetricDataResults"][0]["Values"].should.equal([10.0, 20.0, 10.0]) assert response["MetricDataResults"][0]["Values"] == [10.0, 20.0, 10.0]
@mock_cloudwatch @mock_cloudwatch
@ -1082,10 +1082,10 @@ def test_get_metric_data_outside_timeframe():
) )
# #
# Assert Last week's data is not returned # Assert Last week's data is not returned
len(response["MetricDataResults"]).should.equal(1) assert len(response["MetricDataResults"]) == 1
response["MetricDataResults"][0]["Id"].should.equal("result") assert response["MetricDataResults"][0]["Id"] == "result"
response["MetricDataResults"][0]["StatusCode"].should.equal("Complete") assert response["MetricDataResults"][0]["StatusCode"] == "Complete"
response["MetricDataResults"][0]["Values"].should.equal([]) assert response["MetricDataResults"][0]["Values"] == []
@mock_cloudwatch @mock_cloudwatch
@ -1140,13 +1140,13 @@ def test_get_metric_data_for_multiple_metrics():
EndTime=utc_now + timedelta(seconds=60), EndTime=utc_now + timedelta(seconds=60),
) )
# #
len(response["MetricDataResults"]).should.equal(2) assert len(response["MetricDataResults"]) == 2
res1 = [res for res in response["MetricDataResults"] if res["Id"] == "result1"][0] res1 = [res for res in response["MetricDataResults"] if res["Id"] == "result1"][0]
res1["Values"].should.equal([50.0]) assert res1["Values"] == [50.0]
res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0] res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0]
res2["Values"].should.equal([25.0]) assert res2["Values"] == [25.0]
@mock_cloudwatch @mock_cloudwatch
@ -1238,23 +1238,23 @@ def test_get_metric_data_for_dimensions():
EndTime=utc_now + timedelta(seconds=60), EndTime=utc_now + timedelta(seconds=60),
) )
# #
len(response["MetricDataResults"]).should.equal(4) assert len(response["MetricDataResults"]) == 4
res1 = [res for res in response["MetricDataResults"] if res["Id"] == "result1"][0] res1 = [res for res in response["MetricDataResults"] if res["Id"] == "result1"][0]
# expect sample count for dimension_frankfurt # expect sample count for dimension_frankfurt
res1["Values"].should.equal([1.0]) assert res1["Values"] == [1.0]
res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0] res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0]
# expect sum for dimension_berlin # expect sum for dimension_berlin
res2["Values"].should.equal([50.0]) assert res2["Values"] == [50.0]
res3 = [res for res in response["MetricDataResults"] if res["Id"] == "result3"][0] res3 = [res for res in response["MetricDataResults"] if res["Id"] == "result3"][0]
# expect no result, as server_prod is only a part of other dimensions, e.g. there is no match # expect no result, as server_prod is only a part of other dimensions, e.g. there is no match
res3["Values"].should.equal([]) assert res3["Values"] == []
res4 = [res for res in response["MetricDataResults"] if res["Id"] == "result4"][0] res4 = [res for res in response["MetricDataResults"] if res["Id"] == "result4"][0]
# expect sum of both metrics, as we did not filter for dimensions # expect sum of both metrics, as we did not filter for dimensions
res4["Values"].should.equal([75.0]) assert res4["Values"] == [75.0]
@mock_cloudwatch @mock_cloudwatch
@ -1325,8 +1325,8 @@ def test_get_metric_data_for_unit():
response["MetricDataResults"], response["MetricDataResults"],
) )
) )
len(metric_result_data).should.equal(1) assert len(metric_result_data) == 1
metric_result_data[0]["Values"][0].should.equal(expected_value) assert metric_result_data[0]["Values"][0] == expected_value
@mock_cloudwatch @mock_cloudwatch
@ -1358,12 +1358,11 @@ def test_get_metric_data_endtime_sooner_than_starttime():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("GetMetricData") assert ex.operation_name == "GetMetricData"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("ValidationError") err = ex.response["Error"]
ex.response["Error"]["Message"].should.equal( assert err["Code"] == "ValidationError"
"The parameter EndTime must be greater than StartTime." assert err["Message"] == "The parameter EndTime must be greater than StartTime."
)
@mock_cloudwatch @mock_cloudwatch
@ -1395,12 +1394,11 @@ def test_get_metric_data_starttime_endtime_equals():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("GetMetricData") assert ex.operation_name == "GetMetricData"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("ValidationError") err = ex.response["Error"]
ex.response["Error"]["Message"].should.equal( assert err["Code"] == "ValidationError"
"The parameter StartTime must not equal parameter EndTime." assert err["Message"] == "The parameter StartTime must not equal parameter EndTime."
)
@mock_cloudwatch @mock_cloudwatch
@ -1432,11 +1430,12 @@ def test_get_metric_data_starttime_endtime_within_1_second():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("GetMetricData") assert ex.operation_name == "GetMetricData"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("ValidationError") assert ex.response["Error"]["Code"] == "ValidationError"
ex.response["Error"]["Message"].should.equal( assert (
"The parameter StartTime must not equal parameter EndTime." ex.response["Error"]["Message"]
== "The parameter StartTime must not equal parameter EndTime."
) )
@ -1476,9 +1475,9 @@ def test_get_metric_data_starttime_endtime_ignore_miliseconds():
EndTime=(utc_now + timedelta(seconds=1)).replace(microsecond=0), EndTime=(utc_now + timedelta(seconds=1)).replace(microsecond=0),
) )
len(response["MetricDataResults"]).should.equal(1) assert len(response["MetricDataResults"]) == 1
response["MetricDataResults"][0]["Id"].should.equal("test") assert response["MetricDataResults"][0]["Id"] == "test"
response["MetricDataResults"][0]["Values"][0].should.equal(1.0) assert response["MetricDataResults"][0]["Values"][0] == 1.0
@mock_cloudwatch @mock_cloudwatch
@ -1501,27 +1500,23 @@ def test_cloudwatch_return_s3_metrics():
)["Metrics"] )["Metrics"]
# then # then
metrics.should.have.length_of(2) assert len(metrics) == 2
metrics.should.contain( assert {
{
"Namespace": "AWS/S3", "Namespace": "AWS/S3",
"MetricName": "NumberOfObjects", "MetricName": "NumberOfObjects",
"Dimensions": [ "Dimensions": [
{"Name": "StorageType", "Value": "AllStorageTypes"}, {"Name": "StorageType", "Value": "AllStorageTypes"},
{"Name": "BucketName", "Value": bucket_name}, {"Name": "BucketName", "Value": bucket_name},
], ],
} } in metrics
) assert {
metrics.should.contain(
{
"Namespace": "AWS/S3", "Namespace": "AWS/S3",
"MetricName": "BucketSizeBytes", "MetricName": "BucketSizeBytes",
"Dimensions": [ "Dimensions": [
{"Name": "StorageType", "Value": "StandardStorage"}, {"Name": "StorageType", "Value": "StandardStorage"},
{"Name": "BucketName", "Value": bucket_name}, {"Name": "BucketName", "Value": bucket_name},
], ],
} } in metrics
)
# when # when
stats = cloudwatch.get_metric_statistics( stats = cloudwatch.get_metric_statistics(
@ -1539,11 +1534,11 @@ def test_cloudwatch_return_s3_metrics():
) )
# then # then
stats.should.have.key("Label").equal("BucketSizeBytes") assert stats["Label"] == "BucketSizeBytes"
stats.should.have.key("Datapoints").length_of(1) assert len(stats["Datapoints"]) == 1
data_point = stats["Datapoints"][0] data_point = stats["Datapoints"][0]
data_point.should.have.key("Average").being.above(0) assert data_point["Average"] > 0
data_point.should.have.key("Unit").being.equal("Bytes") assert data_point["Unit"] == "Bytes"
# when # when
stats = cloudwatch.get_metric_statistics( stats = cloudwatch.get_metric_statistics(
@ -1560,11 +1555,11 @@ def test_cloudwatch_return_s3_metrics():
) )
# then # then
stats.should.have.key("Label").equal("NumberOfObjects") assert stats["Label"] == "NumberOfObjects"
stats.should.have.key("Datapoints").length_of(1) assert len(stats["Datapoints"]) == 1
data_point = stats["Datapoints"][0] data_point = stats["Datapoints"][0]
data_point.should.have.key("Average").being.equal(1) assert data_point["Average"] == 1
data_point.should.have.key("Unit").being.equal("Count") assert data_point["Unit"] == "Count"
s3_client.delete_object(Bucket=bucket_name, Key="file.txt") s3_client.delete_object(Bucket=bucket_name, Key="file.txt")
s3_client.delete_bucket(Bucket=bucket_name) s3_client.delete_bucket(Bucket=bucket_name)
@ -1605,43 +1600,37 @@ def test_put_metric_alarm():
# then # then
alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"] alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
alarms.should.have.length_of(1) assert len(alarms) == 1
alarm = alarms[0] alarm = alarms[0]
alarm["AlarmName"].should.equal(alarm_name) assert alarm["AlarmName"] == alarm_name
alarm["AlarmArn"].should.equal( assert (
f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}" alarm["AlarmArn"]
== f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}"
) )
alarm["AlarmDescription"].should.equal("test alarm") assert alarm["AlarmDescription"] == "test alarm"
alarm["AlarmConfigurationUpdatedTimestamp"].should.be.a(datetime) assert alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo == tzutc()
alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo.should.equal(tzutc()) assert alarm["ActionsEnabled"] is True
alarm["ActionsEnabled"].should.equal(True) assert alarm["OKActions"] == [sns_topic_arn]
alarm["OKActions"].should.equal([sns_topic_arn]) assert alarm["AlarmActions"] == [sns_topic_arn]
alarm["AlarmActions"].should.equal([sns_topic_arn]) assert alarm["InsufficientDataActions"] == [sns_topic_arn]
alarm["InsufficientDataActions"].should.equal([sns_topic_arn]) assert alarm["StateValue"] == "OK"
alarm["StateValue"].should.equal("OK") assert alarm["StateReason"] == "Unchecked: Initial alarm creation"
alarm["StateReason"].should.equal("Unchecked: Initial alarm creation") assert alarm["StateUpdatedTimestamp"].tzinfo == tzutc()
alarm["StateUpdatedTimestamp"].should.be.a(datetime) assert alarm["MetricName"] == "5XXError"
alarm["StateUpdatedTimestamp"].tzinfo.should.equal(tzutc()) assert alarm["Namespace"] == "AWS/ApiGateway"
alarm["MetricName"].should.equal("5XXError") assert alarm["Statistic"] == "Sum"
alarm["Namespace"].should.equal("AWS/ApiGateway") assert sorted(alarm["Dimensions"], key=itemgetter("Name")) == [
alarm["Statistic"].should.equal("Sum")
sorted(alarm["Dimensions"], key=itemgetter("Name")).should.equal(
sorted(
[
{"Name": "ApiName", "Value": "test-api"}, {"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"}, {"Name": "Stage", "Value": "default"},
], ]
key=itemgetter("Name"), assert alarm["Period"] == 60
) assert alarm["Unit"] == "Seconds"
) assert alarm["EvaluationPeriods"] == 1
alarm["Period"].should.equal(60) assert alarm["DatapointsToAlarm"] == 1
alarm["Unit"].should.equal("Seconds") assert alarm["Threshold"] == 1.0
alarm["EvaluationPeriods"].should.equal(1) assert alarm["ComparisonOperator"] == "GreaterThanOrEqualToThreshold"
alarm["DatapointsToAlarm"].should.equal(1) assert alarm["TreatMissingData"] == "notBreaching"
alarm["Threshold"].should.equal(1.0)
alarm["ComparisonOperator"].should.equal("GreaterThanOrEqualToThreshold")
alarm["TreatMissingData"].should.equal("notBreaching")
@mock_cloudwatch @mock_cloudwatch
@ -1675,41 +1664,35 @@ def test_put_metric_alarm_with_percentile():
# then # then
alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"] alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
alarms.should.have.length_of(1) assert len(alarms) == 1
alarm = alarms[0] alarm = alarms[0]
alarm["AlarmName"].should.equal(alarm_name) assert alarm["AlarmName"] == alarm_name
alarm["AlarmArn"].should.equal( assert (
f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}" alarm["AlarmArn"]
== f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}"
) )
alarm["AlarmDescription"].should.equal("test alarm") assert alarm["AlarmDescription"] == "test alarm"
alarm["AlarmConfigurationUpdatedTimestamp"].should.be.a(datetime) assert alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo == tzutc()
alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo.should.equal(tzutc()) assert alarm["ActionsEnabled"] is True
alarm["ActionsEnabled"].should.equal(True) assert alarm["StateValue"] == "OK"
alarm["StateValue"].should.equal("OK") assert alarm["StateReason"] == "Unchecked: Initial alarm creation"
alarm["StateReason"].should.equal("Unchecked: Initial alarm creation") assert alarm["StateUpdatedTimestamp"].tzinfo == tzutc()
alarm["StateUpdatedTimestamp"].should.be.a(datetime) assert alarm["MetricName"] == "5XXError"
alarm["StateUpdatedTimestamp"].tzinfo.should.equal(tzutc()) assert alarm["Namespace"] == "AWS/ApiGateway"
alarm["MetricName"].should.equal("5XXError") assert alarm["ExtendedStatistic"] == "p90"
alarm["Namespace"].should.equal("AWS/ApiGateway") assert sorted(alarm["Dimensions"], key=itemgetter("Name")) == [
alarm["ExtendedStatistic"].should.equal("p90")
sorted(alarm["Dimensions"], key=itemgetter("Name")).should.equal(
sorted(
[
{"Name": "ApiName", "Value": "test-api"}, {"Name": "ApiName", "Value": "test-api"},
{"Name": "Stage", "Value": "default"}, {"Name": "Stage", "Value": "default"},
], ]
key=itemgetter("Name"), assert alarm["Period"] == 60
) assert alarm["Unit"] == "Seconds"
) assert alarm["EvaluationPeriods"] == 1
alarm["Period"].should.equal(60) assert alarm["DatapointsToAlarm"] == 1
alarm["Unit"].should.equal("Seconds") assert alarm["Threshold"] == 1.0
alarm["EvaluationPeriods"].should.equal(1) assert alarm["ComparisonOperator"] == "GreaterThanOrEqualToThreshold"
alarm["DatapointsToAlarm"].should.equal(1) assert alarm["TreatMissingData"] == "notBreaching"
alarm["Threshold"].should.equal(1.0) assert alarm["EvaluateLowSampleCountPercentile"] == "ignore"
alarm["ComparisonOperator"].should.equal("GreaterThanOrEqualToThreshold")
alarm["TreatMissingData"].should.equal("notBreaching")
alarm["EvaluateLowSampleCountPercentile"].should.equal("ignore")
@mock_cloudwatch @mock_cloudwatch
@ -1753,23 +1736,22 @@ def test_put_metric_alarm_with_anomaly_detection():
# then # then
alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"] alarms = client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
alarms.should.have.length_of(1) assert len(alarms) == 1
alarm = alarms[0] alarm = alarms[0]
alarm["AlarmName"].should.equal(alarm_name) assert alarm["AlarmName"] == alarm_name
alarm["AlarmArn"].should.equal( assert (
f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}" alarm["AlarmArn"]
== f"arn:aws:cloudwatch:{region_name}:{ACCOUNT_ID}:alarm:{alarm_name}"
) )
alarm["AlarmConfigurationUpdatedTimestamp"].should.be.a(datetime) assert alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo == tzutc()
alarm["AlarmConfigurationUpdatedTimestamp"].tzinfo.should.equal(tzutc()) assert alarm["StateValue"] == "OK"
alarm["StateValue"].should.equal("OK") assert alarm["StateReason"] == "Unchecked: Initial alarm creation"
alarm["StateReason"].should.equal("Unchecked: Initial alarm creation") assert alarm["StateUpdatedTimestamp"].tzinfo == tzutc()
alarm["StateUpdatedTimestamp"].should.be.a(datetime) assert alarm["EvaluationPeriods"] == 2
alarm["StateUpdatedTimestamp"].tzinfo.should.equal(tzutc()) assert alarm["ComparisonOperator"] == "GreaterThanOrEqualToThreshold"
alarm["EvaluationPeriods"].should.equal(2) assert alarm["Metrics"] == metrics
alarm["ComparisonOperator"].should.equal("GreaterThanOrEqualToThreshold") assert alarm["ThresholdMetricId"] == "t1"
alarm["Metrics"].should.equal(metrics)
alarm["ThresholdMetricId"].should.equal("t1")
@mock_cloudwatch @mock_cloudwatch
@ -1802,11 +1784,12 @@ def test_put_metric_alarm_error_extended_statistic():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("PutMetricAlarm") assert ex.operation_name == "PutMetricAlarm"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("InvalidParameterValue") assert ex.response["Error"]["Code"] == "InvalidParameterValue"
ex.response["Error"]["Message"].should.equal( assert (
"The value 90 for parameter ExtendedStatistic is not supported." ex.response["Error"]["Message"]
== "The value 90 for parameter ExtendedStatistic is not supported."
) )
@ -1841,11 +1824,12 @@ def test_put_metric_alarm_error_evaluate_low_sample_count_percentile():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("PutMetricAlarm") assert ex.operation_name == "PutMetricAlarm"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("ValidationError") err = ex.response["Error"]
ex.response["Error"]["Message"].should.equal( assert err["Code"] == "ValidationError"
"Option unknown is not supported. " assert (
err["Message"] == "Option unknown is not supported. "
"Supported options for parameter EvaluateLowSampleCountPercentile are evaluate and ignore." "Supported options for parameter EvaluateLowSampleCountPercentile are evaluate and ignore."
) )
@ -1917,5 +1901,5 @@ def test_get_metric_data_with_custom_label():
response["MetricDataResults"], response["MetricDataResults"],
) )
) )
len(metric_result_data).should.equal(1) assert len(metric_result_data) == 1
metric_result_data[0]["Label"].should.equal(expected_value) assert metric_result_data[0]["Label"] == expected_value

View File

@ -1,6 +1,6 @@
import boto3 import boto3
import pytest
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import sure # noqa # pylint: disable=unused-import
from moto import mock_cloudwatch from moto import mock_cloudwatch
@ -13,7 +13,7 @@ def test_put_list_dashboard():
client.put_dashboard(DashboardName="test1", DashboardBody=widget) client.put_dashboard(DashboardName="test1", DashboardBody=widget)
resp = client.list_dashboards() resp = client.list_dashboards()
len(resp["DashboardEntries"]).should.equal(1) assert len(resp["DashboardEntries"]) == 1
@mock_cloudwatch @mock_cloudwatch
@ -24,7 +24,7 @@ def test_put_list_prefix_nomatch_dashboard():
client.put_dashboard(DashboardName="test1", DashboardBody=widget) client.put_dashboard(DashboardName="test1", DashboardBody=widget)
resp = client.list_dashboards(DashboardNamePrefix="nomatch") resp = client.list_dashboards(DashboardNamePrefix="nomatch")
len(resp["DashboardEntries"]).should.equal(0) assert len(resp["DashboardEntries"]) == 0
@mock_cloudwatch @mock_cloudwatch
@ -38,7 +38,7 @@ def test_delete_dashboard():
client.delete_dashboards(DashboardNames=["test2", "test1"]) client.delete_dashboards(DashboardNames=["test2", "test1"])
resp = client.list_dashboards(DashboardNamePrefix="test3") resp = client.list_dashboards(DashboardNamePrefix="test3")
len(resp["DashboardEntries"]).should.equal(1) assert len(resp["DashboardEntries"]) == 1
@mock_cloudwatch @mock_cloudwatch
@ -49,16 +49,13 @@ def test_delete_dashboard_fail():
client.put_dashboard(DashboardName="test1", DashboardBody=widget) client.put_dashboard(DashboardName="test1", DashboardBody=widget)
client.put_dashboard(DashboardName="test2", DashboardBody=widget) client.put_dashboard(DashboardName="test2", DashboardBody=widget)
client.put_dashboard(DashboardName="test3", DashboardBody=widget) client.put_dashboard(DashboardName="test3", DashboardBody=widget)
# Doesnt delete anything if all dashboards to be deleted do not exist # Doesn't delete anything if some dashboards to be deleted do not exist
try: with pytest.raises(ClientError) as exc:
client.delete_dashboards(DashboardNames=["test2", "test1", "test_no_match"]) client.delete_dashboards(DashboardNames=["test2", "test1", "test_no_match"])
except ClientError as err: assert exc.value.response["Error"]["Code"] == "ResourceNotFound"
err.response["Error"]["Code"].should.equal("ResourceNotFound")
else:
raise RuntimeError("Should have raised error")
resp = client.list_dashboards() resp = client.list_dashboards()
len(resp["DashboardEntries"]).should.equal(3) assert len(resp["DashboardEntries"]) == 3
@mock_cloudwatch @mock_cloudwatch
@ -68,18 +65,15 @@ def test_get_dashboard():
client.put_dashboard(DashboardName="test1", DashboardBody=widget) client.put_dashboard(DashboardName="test1", DashboardBody=widget)
resp = client.get_dashboard(DashboardName="test1") resp = client.get_dashboard(DashboardName="test1")
resp.should.contain("DashboardArn") assert "DashboardArn" in resp
resp.should.contain("DashboardBody") assert "DashboardBody" in resp
resp["DashboardName"].should.equal("test1") assert resp["DashboardName"] == "test1"
@mock_cloudwatch @mock_cloudwatch
def test_get_dashboard_fail(): def test_get_dashboard_fail():
client = boto3.client("cloudwatch", region_name="eu-central-1") client = boto3.client("cloudwatch", region_name="eu-central-1")
try: with pytest.raises(ClientError) as exc:
client.get_dashboard(DashboardName="test1") client.get_dashboard(DashboardName="test1")
except ClientError as err: assert exc.value.response["Error"]["Code"] == "ResourceNotFound"
err.response["Error"]["Code"].should.equal("ResourceNotFound")
else:
raise RuntimeError("Should have raised error")

View File

@ -3,7 +3,6 @@ from operator import itemgetter
import boto3 import boto3
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from moto import mock_cloudwatch from moto import mock_cloudwatch
from moto.cloudwatch.utils import make_arn_for_alarm from moto.cloudwatch.utils import make_arn_for_alarm
@ -40,7 +39,7 @@ def test_list_tags_for_resource():
response = client.list_tags_for_resource(ResourceARN=arn) response = client.list_tags_for_resource(ResourceARN=arn)
# then # then
response["Tags"].should.equal([{"Key": "key-1", "Value": "value-1"}]) assert response["Tags"] == [{"Key": "key-1", "Value": "value-1"}]
@mock_cloudwatch @mock_cloudwatch
@ -57,7 +56,7 @@ def test_list_tags_for_resource_with_unknown_resource():
) )
# then # then
response["Tags"].should.equal([]) assert response["Tags"] == []
@mock_cloudwatch @mock_cloudwatch
@ -91,15 +90,10 @@ def test_tag_resource():
# then # then
response = client.list_tags_for_resource(ResourceARN=arn) response = client.list_tags_for_resource(ResourceARN=arn)
sorted(response["Tags"], key=itemgetter("Key")).should.equal( assert sorted(response["Tags"], key=itemgetter("Key")) == [
sorted(
[
{"Key": "key-1", "Value": "value-1"}, {"Key": "key-1", "Value": "value-1"},
{"Key": "key-2", "Value": "value-2"}, {"Key": "key-2", "Value": "value-2"},
], ]
key=itemgetter("Key"),
)
)
@mock_cloudwatch @mock_cloudwatch
@ -117,7 +111,7 @@ def test_tag_resource_on_resource_without_tags():
alarm_arn = alarms["MetricAlarms"][0]["AlarmArn"] alarm_arn = alarms["MetricAlarms"][0]["AlarmArn"]
# List 0 tags - none have been added # List 0 tags - none have been added
cw.list_tags_for_resource(ResourceARN=alarm_arn)["Tags"].should.equal([]) assert cw.list_tags_for_resource(ResourceARN=alarm_arn)["Tags"] == []
# Tag the Alarm for the first time # Tag the Alarm for the first time
cw.tag_resource(ResourceARN=alarm_arn, Tags=[{"Key": "tk", "Value": "tv"}]) cw.tag_resource(ResourceARN=alarm_arn, Tags=[{"Key": "tk", "Value": "tv"}])
@ -141,10 +135,10 @@ def test_tag_resource_error_not_exists():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("TagResource") assert ex.operation_name == "TagResource"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 404
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") assert ex.response["Error"]["Code"] == "ResourceNotFoundException"
ex.response["Error"]["Message"].should.equal("Unknown") assert ex.response["Error"]["Message"] == "Unknown"
@mock_cloudwatch @mock_cloudwatch
@ -181,7 +175,7 @@ def test_untag_resource():
# then # then
response = client.list_tags_for_resource(ResourceARN=arn) response = client.list_tags_for_resource(ResourceARN=arn)
response["Tags"].should.equal([{"Key": "key-1", "Value": "value-1"}]) assert response["Tags"] == [{"Key": "key-1", "Value": "value-1"}]
@mock_cloudwatch @mock_cloudwatch
@ -201,7 +195,7 @@ def test_untag_resource_error_not_exists():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("UntagResource") assert ex.operation_name == "UntagResource"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 404
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException") assert ex.response["Error"]["Code"] == "ResourceNotFoundException"
ex.response["Error"]["Message"].should.equal("Unknown") assert ex.response["Error"]["Message"] == "Unknown"