moto/tests/test_cloudwatch/test_cloudwatch_boto3.py

859 lines
28 KiB
Python
Raw Normal View History

# from __future__ import unicode_literals
2017-09-22 15:38:20 +00:00
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
from freezegun import freeze_time
import pytest
from uuid import uuid4
import pytz
2019-10-31 15:44:26 +00:00
import sure # noqa
2017-09-22 15:38:20 +00:00
from moto import mock_cloudwatch
from moto.cloudwatch.utils import make_arn_for_alarm
from moto.core import ACCOUNT_ID
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_put_list_dashboard():
    """A dashboard that was just put appears in an unfiltered listing."""
    cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")
    dashboard_body = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'

    cloudwatch.put_dashboard(DashboardName="test1", DashboardBody=dashboard_body)

    entries = cloudwatch.list_dashboards()["DashboardEntries"]
    len(entries).should.equal(1)
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_put_list_prefix_nomatch_dashboard():
    """Listing with a name prefix that matches no dashboard yields no entries."""
    cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")
    dashboard_body = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'

    cloudwatch.put_dashboard(DashboardName="test1", DashboardBody=dashboard_body)

    listing = cloudwatch.list_dashboards(DashboardNamePrefix="nomatch")
    len(listing["DashboardEntries"]).should.equal(0)
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_delete_dashboard():
    """Deleting two of three dashboards leaves the third one listable."""
    cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")
    dashboard_body = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'

    for dashboard_name in ("test1", "test2", "test3"):
        cloudwatch.put_dashboard(
            DashboardName=dashboard_name, DashboardBody=dashboard_body
        )
    cloudwatch.delete_dashboards(DashboardNames=["test2", "test1"])

    remaining = cloudwatch.list_dashboards(DashboardNamePrefix="test3")
    len(remaining["DashboardEntries"]).should.equal(1)
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_delete_dashboard_fail():
    """A delete request naming an unknown dashboard fails and deletes nothing.

    Three dashboards are created; the delete call names two of them plus one
    name that was never created. The call must raise ``ClientError`` with code
    ``ResourceNotFound``, and all three dashboards must still be listed.
    """
    client = boto3.client("cloudwatch", region_name="eu-central-1")
    widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'

    client.put_dashboard(DashboardName="test1", DashboardBody=widget)
    client.put_dashboard(DashboardName="test2", DashboardBody=widget)
    client.put_dashboard(DashboardName="test3", DashboardBody=widget)

    # Nothing is deleted if any one of the requested dashboards does not exist.
    with pytest.raises(ClientError) as exc:
        client.delete_dashboards(DashboardNames=["test2", "test1", "test_no_match"])
    exc.value.response["Error"]["Code"].should.equal("ResourceNotFound")

    resp = client.list_dashboards()
    len(resp["DashboardEntries"]).should.equal(3)
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_get_dashboard():
    """Fetching a stored dashboard returns its ARN, body and name."""
    cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")
    dashboard_body = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
    cloudwatch.put_dashboard(DashboardName="test1", DashboardBody=dashboard_body)

    dashboard = cloudwatch.get_dashboard(DashboardName="test1")

    dashboard.should.contain("DashboardArn")
    dashboard.should.contain("DashboardBody")
    dashboard["DashboardName"].should.equal("test1")
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_get_dashboard_fail():
    """Fetching a dashboard that was never created raises ``ResourceNotFound``."""
    client = boto3.client("cloudwatch", region_name="eu-central-1")

    # No dashboard has been put, so the lookup must fail.
    with pytest.raises(ClientError) as exc:
        client.get_dashboard(DashboardName="test1")
    exc.value.response["Error"]["Code"].should.equal("ResourceNotFound")
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_delete_invalid_alarm():
    """Deleting an unknown alarm fails and leaves existing alarms in place."""
    client = boto3.client("cloudwatch", region_name="eu-west-1")
    alarm_definition = {
        "AlarmName": "testalarm1",
        "MetricName": "cpu",
        "Namespace": "blah",
        "Period": 10,
        "EvaluationPeriods": 5,
        "Statistic": "Average",
        "Threshold": 2,
        "ComparisonOperator": "GreaterThanThreshold",
        "ActionsEnabled": True,
    }
    client.put_metric_alarm(**alarm_definition)

    # A request mixing a never-created alarm name with a valid one must fail.
    with pytest.raises(ClientError) as mixed_request:
        client.delete_alarms(AlarmNames=["InvalidAlarmName", "testalarm1"])
    mixed_request.value.response["Error"]["Code"].should.equal("ResourceNotFound")

    # The valid alarm must not have been deleted by the failed request.
    surviving = client.describe_alarms(AlarmNames=["testalarm1"])
    len(surviving["MetricAlarms"]).should.equal(1)

    # A request naming only the unknown alarm must fail the same way.
    with pytest.raises(ClientError) as unknown_only:
        client.delete_alarms(AlarmNames=["InvalidAlarmName"])
    unknown_only.value.response["Error"]["Code"].should.equal("ResourceNotFound")
@mock_cloudwatch
def test_describe_alarms_for_metric():
    """An alarm can be looked up through the metric name and namespace it watches."""
    client = boto3.client("cloudwatch", region_name="eu-central-1")
    alarm_definition = {
        "AlarmName": "testalarm1",
        "MetricName": "cpu",
        "Namespace": "blah",
        "Period": 10,
        "EvaluationPeriods": 5,
        "Statistic": "Average",
        "Threshold": 2,
        "ComparisonOperator": "GreaterThanThreshold",
        "ActionsEnabled": True,
    }
    client.put_metric_alarm(**alarm_definition)

    response = client.describe_alarms_for_metric(MetricName="cpu", Namespace="blah")

    response.get("MetricAlarms").should.have.length_of(1)
    # The alarm name is expected to be embedded in the returned ARN.
    first_alarm = response.get("MetricAlarms")[0]
    assert "testalarm1" in first_alarm.get("AlarmArn")
@mock_cloudwatch
def test_describe_alarms():
    """Single-metric and metric-math alarms are both described with their own fields.

    ``testalarm1`` is defined on one metric; ``testalarm2`` is defined through
    a ``Metrics`` list of data queries. Each must come back with the fields it
    was created with and without the fields belonging to the other form.
    """
    client = boto3.client("cloudwatch", region_name="eu-central-1")

    # Alarm on a single metric.
    client.put_metric_alarm(
        AlarmName="testalarm1",
        MetricName="cpu",
        Namespace="blah",
        Period=10,
        EvaluationPeriods=5,
        Statistic="Average",
        Threshold=2,
        ComparisonOperator="GreaterThanThreshold",
        ActionsEnabled=True,
    )

    # Alarm on an expression over two other metric queries.
    metric_data_queries = [
        {
            "Id": "metricA",
            "Expression": "metricB + metricC",
            "Label": "metricA",
            "ReturnData": True,
        },
        {
            "Id": "metricB",
            "MetricStat": {
                "Metric": {
                    "Namespace": "ns",
                    "MetricName": "metricB",
                    "Dimensions": [{"Name": "Name", "Value": "B"}],
                },
                "Period": 60,
                "Stat": "Sum",
            },
            "ReturnData": False,
        },
        {
            "Id": "metricC",
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/Lambda",
                    "MetricName": "metricC",
                    "Dimensions": [{"Name": "Name", "Value": "C"}],
                },
                "Period": 60,
                "Stat": "Sum",
                "Unit": "Seconds",
            },
            "ReturnData": False,
        },
    ]
    client.put_metric_alarm(
        AlarmName="testalarm2",
        EvaluationPeriods=1,
        DatapointsToAlarm=1,
        Metrics=metric_data_queries,
        ComparisonOperator="GreaterThanThreshold",
        Threshold=1.0,
    )

    described = client.describe_alarms().get("MetricAlarms")
    described.should.have.length_of(2)

    alarms_by_name = {alarm["AlarmName"]: alarm for alarm in described}
    single = alarms_by_name["testalarm1"]
    multiple = alarms_by_name["testalarm2"]

    single["MetricName"].should.equal("cpu")
    single.shouldnt.have.property("Metrics")
    single["Namespace"].should.equal("blah")
    single["Period"].should.equal(10)
    single["EvaluationPeriods"].should.equal(5)
    single["Statistic"].should.equal("Average")
    single["ComparisonOperator"].should.equal("GreaterThanThreshold")
    single["Threshold"].should.equal(2)

    multiple.shouldnt.have.property("MetricName")
    multiple["EvaluationPeriods"].should.equal(1)
    multiple["DatapointsToAlarm"].should.equal(1)
    multiple["Metrics"].should.equal(metric_data_queries)
    multiple["ComparisonOperator"].should.equal("GreaterThanThreshold")
    multiple["Threshold"].should.equal(1.0)
@mock_cloudwatch
def test_alarm_state():
    """Setting one alarm to ALARM lets ``describe_alarms`` filter by state.

    Two alarms are created; only ``testalarm1`` passes ``ActionsEnabled=True``
    and only it is moved to the ``ALARM`` state.
    """
    cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")

    shared_settings = {
        "MetricName": "cpu",
        "Namespace": "blah",
        "Period": 10,
        "EvaluationPeriods": 5,
        "Statistic": "Average",
        "Threshold": 2,
        "ComparisonOperator": "GreaterThanThreshold",
    }
    cloudwatch.put_metric_alarm(
        AlarmName="testalarm1", ActionsEnabled=True, **shared_settings
    )
    cloudwatch.put_metric_alarm(AlarmName="testalarm2", **shared_settings)

    # Not asserted directly: the state filters below only pass if this worked.
    cloudwatch.set_alarm_state(
        AlarmName="testalarm1",
        StateValue="ALARM",
        StateReason="testreason",
        StateReasonData='{"some": "json_data"}',
    )

    in_alarm = cloudwatch.describe_alarms(StateValue="ALARM")["MetricAlarms"]
    len(in_alarm).should.equal(1)
    in_alarm[0]["AlarmName"].should.equal("testalarm1")
    in_alarm[0]["StateValue"].should.equal("ALARM")
    in_alarm[0]["ActionsEnabled"].should.equal(True)

    in_ok = cloudwatch.describe_alarms(StateValue="OK")["MetricAlarms"]
    len(in_ok).should.equal(1)
    in_ok[0]["AlarmName"].should.equal("testalarm2")
    in_ok[0]["StateValue"].should.equal("OK")
    in_ok[0]["ActionsEnabled"].should.equal(False)

    # Sanity check: without a state filter both alarms are returned.
    everything = cloudwatch.describe_alarms()
    len(everything["MetricAlarms"]).should.equal(2)
2017-09-22 15:38:20 +00:00
2020-03-16 20:58:50 +00:00
@mock_cloudwatch
def test_put_metric_data_no_dimensions():
    """A datapoint put without dimensions is listed as exactly one metric."""
    client = boto3.client("cloudwatch", region_name="us-east-1")
    datapoint = {"MetricName": "metric", "Value": 1.5}
    client.put_metric_data(Namespace="tester", MetricData=[datapoint])

    listed = client.list_metrics()["Metrics"]

    listed.should.have.length_of(1)
    only_metric = listed[0]
    only_metric["Namespace"].should.equal("tester")
    only_metric["MetricName"].should.equal("metric")
2017-09-22 15:38:20 +00:00
@mock_cloudwatch
def test_put_metric_data_with_statistics():
    """A datapoint supplied only as ``StatisticValues`` still registers its metric."""
    client = boto3.client("cloudwatch", region_name="us-east-1")
    now = datetime.now(tz=pytz.utc)

    # Deliberately no "Value" key, to cover https://github.com/spulec/moto/issues/1615
    statistic_datapoint = {
        "MetricName": "statmetric",
        "Timestamp": now,
        "StatisticValues": {
            "SampleCount": 123.0,
            "Sum": 123.0,
            "Minimum": 123.0,
            "Maximum": 123.0,
        },
        "Unit": "Milliseconds",
        "StorageResolution": 123,
    }
    client.put_metric_data(Namespace="tester", MetricData=[statistic_datapoint])

    listed = client.list_metrics()["Metrics"]

    listed.should.have.length_of(1)
    only_metric = listed[0]
    only_metric["Namespace"].should.equal("tester")
    only_metric["MetricName"].should.equal("statmetric")
    # TODO: test statistics - https://github.com/spulec/moto/issues/1615
2019-10-31 15:44:26 +00:00
@mock_cloudwatch
def test_get_metric_statistics():
    """One stored datapoint is reported as one datapoint with matching count and sum."""
    client = boto3.client("cloudwatch", region_name="us-east-1")
    now = datetime.now(tz=pytz.utc)
    one_minute = timedelta(seconds=60)

    client.put_metric_data(
        Namespace="tester",
        MetricData=[{"MetricName": "metric", "Value": 1.5, "Timestamp": now}],
    )

    # Query a window of one minute on either side of the stored timestamp.
    response = client.get_metric_statistics(
        Namespace="tester",
        MetricName="metric",
        StartTime=now - one_minute,
        EndTime=now + one_minute,
        Period=60,
        Statistics=["SampleCount", "Sum"],
    )

    response["Datapoints"].should.have.length_of(1)
    point = response["Datapoints"][0]
    point["SampleCount"].should.equal(1.0)
    point["Sum"].should.equal(1.5)
@mock_cloudwatch
def test_duplicate_put_metric_data():
    """Re-putting an identical datapoint does not add a second metric listing.

    The same metric/dimension pair is put twice and must be listed once. A
    third put with an extra dimension must then be listed as a separate
    metric, and must be findable through either of its dimensions.
    """
    client = boto3.client("cloudwatch", region_name="us-east-1")
    now = datetime.now(tz=pytz.utc)
    dim_b = {"Name": "Name", "Value": "B"}
    dim_c = {"Name": "Name", "Value": "C"}

    def put_datapoint(dimensions):
        # Same namespace, name, value and timestamp every time; only dimensions vary.
        client.put_metric_data(
            Namespace="tester",
            MetricData=[
                {
                    "MetricName": "metric",
                    "Dimensions": dimensions,
                    "Value": 1.5,
                    "Timestamp": now,
                }
            ],
        )

    def metrics_filtered_by(dimension):
        listing = client.list_metrics(Namespace="tester", Dimensions=[dimension])
        return listing["Metrics"]

    def expected_metric(dimensions):
        return {
            "Namespace": "tester",
            "MetricName": "metric",
            "Dimensions": dimensions,
        }

    put_datapoint([dim_b])
    found = metrics_filtered_by(dim_b)
    len(found).should.equal(1)

    # Putting the identical datapoint again must not create a second entry.
    put_datapoint([dim_b])
    found = metrics_filtered_by(dim_b)
    len(found).should.equal(1)
    found.should.equal([expected_metric([dim_b])])

    # A datapoint with an additional dimension is listed as its own metric.
    put_datapoint([dim_b, dim_c])
    found = metrics_filtered_by(dim_b)
    found.should.equal(
        [expected_metric([dim_b]), expected_metric([dim_b, dim_c])]
    )

    found = metrics_filtered_by(dim_c)
    found.should.equal([expected_metric([dim_b, dim_c])])
@mock_cloudwatch
@freeze_time("2020-02-10 18:44:05")
def test_custom_timestamp():
    """Smoke test: datapoints with caller-supplied timestamps are accepted.

    One datapoint carries its timestamp as an ISO-8601 string, the other as a
    naive ``datetime``. The test makes no assertion on any returned data; it
    passes as long as none of the calls raises.
    """
    utc_now = datetime.now(tz=pytz.utc)
    iso_timestamp = "2020-02-10T18:44:09Z"
    cw = boto3.client("cloudwatch", "eu-west-1")

    cw.put_metric_data(
        Namespace="tester",
        MetricData=[dict(MetricName="metric1", Value=1.5, Timestamp=iso_timestamp)],
    )

    cw.put_metric_data(
        Namespace="tester",
        MetricData=[
            dict(MetricName="metric2", Value=1.5, Timestamp=datetime(2020, 2, 10))
        ],
    )

    # NOTE(review): this queries "metric", while only "metric1" and "metric2"
    # were stored above, and the result was never inspected - confirm which
    # metric and which assertions were intended.
    cw.get_metric_statistics(
        Namespace="tester",
        MetricName="metric",
        StartTime=utc_now - timedelta(seconds=60),
        EndTime=utc_now + timedelta(seconds=60),
        Period=60,
        Statistics=["SampleCount", "Sum"],
    )
@mock_cloudwatch
def test_list_metrics():
    """``list_metrics`` can be filtered by namespace and by metric name."""
    client = boto3.client("cloudwatch", region_name="eu-west-1")

    # A namespace nothing was written to has no metrics.
    found = client.list_metrics(Namespace="unknown/")["Metrics"]
    found.should.be.empty

    # Populate two namespaces to filter on.
    for namespace in ("list_test_1/", "list_test_2/"):
        create_metrics(client, namespace=namespace, metrics=4, data_points=2)

    # No filter: everything comes back.
    found = client.list_metrics()["Metrics"]
    len(found).should.equal(16)  # 2 namespaces * 4 metrics * 2 data points

    # Filter by namespace only.
    found = client.list_metrics(Namespace="list_test_1/")["Metrics"]
    len(found).should.equal(8)  # 1 namespace * 4 metrics * 2 data points

    # Filter by namespace and metric name.
    listing = client.list_metrics(Namespace="list_test_1/", MetricName="metric1")
    found = listing["Metrics"]
    len(found).should.equal(2)  # 1 namespace * 1 metrics * 2 data points

    # Check the shape of each returned entry.
    expected_entry = {
        "Namespace": "list_test_1/",
        "Dimensions": [],
        "MetricName": "metric1",
    }
    found.should.equal([expected_entry, dict(expected_entry)])

    # The unknown namespace is still empty after data was added elsewhere.
    found = client.list_metrics(Namespace="unknown/")["Metrics"]
    found.should.be.empty
@mock_cloudwatch
def test_list_metrics_paginated():
    """``list_metrics`` pages its results at 500 entries via ``NextToken``.

    The expected counts below treat every stored data point as one listed
    entry, as the assertions themselves require.
    """
    client = boto3.client("cloudwatch", region_name="eu-west-1")
    invalid_token_message = "Request parameter NextToken is invalid"

    # Nothing stored yet: the listing is empty.
    client.list_metrics()["Metrics"].should.be.empty

    # An arbitrary token that was never handed out is rejected.
    with pytest.raises(ClientError) as random_token:
        client.list_metrics(NextToken=str(uuid4()))
    random_token.value.response["Error"]["Message"].should.equal(
        invalid_token_message
    )

    # 100 entries: still a single page.
    create_metrics(client, namespace="test", metrics=100, data_points=1)
    page_one = client.list_metrics()
    page_one["Metrics"].shouldnt.be.empty
    len(page_one["Metrics"]).should.equal(100)

    # 400 more entries brings the total to exactly 500: still no NextToken.
    create_metrics(client, namespace="test", metrics=200, data_points=2)
    page_one = client.list_metrics()
    len(page_one["Metrics"]).should.equal(500)
    page_one.shouldnt.contain("NextToken")

    # 600 more entries (1100 in total): the first page is now capped at 500.
    create_metrics(client, namespace="test", metrics=60, data_points=10)
    page_one = client.list_metrics()
    len(page_one["Metrics"]).should.equal(500)
    page_one["NextToken"].shouldnt.be.empty

    # Second page is full as well and points at a further page.
    page_two = client.list_metrics(NextToken=page_one["NextToken"])
    len(page_two["Metrics"]).should.equal(500)
    page_two.should.contain("NextToken")

    # Final page holds the last 100 entries; no NextToken means pagination is done.
    page_three = client.list_metrics(NextToken=page_two["NextToken"])
    len(page_three["Metrics"]).should.equal(100)
    page_three.shouldnt.contain("NextToken")

    # A token that has already been consumed cannot be used again.
    with pytest.raises(ClientError) as reused_token:
        client.list_metrics(NextToken=page_one["NextToken"])
    reused_token.value.response["Error"]["Message"].should.equal(
        invalid_token_message
    )
def create_metrics(cloudwatch, namespace, metrics=5, data_points=5):
    """Put ``data_points`` datapoints for each of ``metrics`` metrics.

    Metrics are named ``metric0`` .. ``metric<metrics - 1>``. For every metric,
    one ``put_metric_data`` call is made per data point, with values
    ``0`` .. ``data_points - 1`` and unit ``"Seconds"``.

    :param cloudwatch: client object exposing ``put_metric_data``
    :param namespace: namespace passed to every ``put_metric_data`` call
    :param metrics: number of distinct metric names to create
    :param data_points: number of datapoints to put per metric
    """
    for metric_index in range(metrics):
        name = "metric" + str(metric_index)
        for value in range(data_points):
            datapoint = {"MetricName": name, "Value": value, "Unit": "Seconds"}
            cloudwatch.put_metric_data(Namespace=namespace, MetricData=[datapoint])
@mock_cloudwatch
def test_get_metric_data_within_timeframe():
    """Average, Sum, Minimum and Maximum are computed over datapoints in the window."""
    now = datetime.now(tz=pytz.utc)
    client = boto3.client("cloudwatch", region_name="eu-west-1")
    namespace = "my_namespace/"

    # Store six datapoints for one metric; no explicit timestamps are given.
    values = [0, 2, 4, 3.5, 7, 100]
    datapoints = []
    for value in values:
        datapoints.append({"MetricName": "metric1", "Value": value, "Unit": "Seconds"})
    client.put_metric_data(Namespace=namespace, MetricData=datapoints)

    # One query per statistic, each with its own result id.
    queries = []
    for stat in ["Average", "Sum", "Minimum", "Maximum"]:
        queries.append(
            {
                "Id": "result_" + stat,
                "MetricStat": {
                    "Metric": {"Namespace": namespace, "MetricName": "metric1"},
                    "Period": 60,
                    "Stat": stat,
                },
            }
        )
    response = client.get_metric_data(
        MetricDataQueries=queries,
        StartTime=now - timedelta(seconds=60),
        EndTime=now + timedelta(seconds=60),
    )

    results_by_id = {res["Id"]: res for res in response["MetricDataResults"]}

    average = results_by_id["result_Average"]
    average["Label"].should.equal("metric1 Average")
    average["StatusCode"].should.equal("Complete")
    # Compared after truncation to int, so only the integer part is checked.
    [int(val) for val in average["Values"]].should.equal([19])

    total = results_by_id["result_Sum"]
    total["Label"].should.equal("metric1 Sum")
    total["StatusCode"].should.equal("Complete")
    list(total["Values"]).should.equal([sum(values)])

    minimum = results_by_id["result_Minimum"]
    minimum["Label"].should.equal("metric1 Minimum")
    minimum["StatusCode"].should.equal("Complete")
    [int(val) for val in minimum["Values"]].should.equal([0])

    maximum = results_by_id["result_Maximum"]
    maximum["Label"].should.equal("metric1 Maximum")
    maximum["StatusCode"].should.equal("Complete")
    [int(val) for val in maximum["Values"]].should.equal([100])
@mock_cloudwatch
def test_get_metric_data_partially_within_timeframe():
    """Only datapoints inside the queried window contribute to the results.

    Datapoints are stored at three moments: now, one day ago, and a cluster of
    four within 30 seconds of each other one week ago. Queries starting
    yesterday must not see last week's cluster; queries starting last week
    must see all three moments, in the requested scan order.
    """
    utc_now = datetime.now(tz=pytz.utc)
    yesterday = utc_now - timedelta(days=1)
    last_week = utc_now - timedelta(days=7)
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
    namespace1 = "my_namespace/"

    # One datapoint now.
    cloudwatch.put_metric_data(
        Namespace=namespace1,
        MetricData=[
            {
                "MetricName": "metric1",
                "Value": 10,
                "Unit": "Seconds",
                "Timestamp": utc_now,
            }
        ],
    )

    # One datapoint a day ago.
    cloudwatch.put_metric_data(
        Namespace=namespace1,
        MetricData=[
            {
                "MetricName": "metric1",
                "Value": 20,
                "Unit": "Seconds",
                "Timestamp": yesterday,
            }
        ],
    )

    # Four datapoints a week ago, spread over 30 seconds.
    cloudwatch.put_metric_data(
        Namespace=namespace1,
        MetricData=[
            {
                "MetricName": "metric1",
                "Value": 50,
                "Unit": "Seconds",
                "Timestamp": last_week,
            },
            {
                "MetricName": "metric1",
                "Value": 10,
                "Unit": "Seconds",
                "Timestamp": last_week + timedelta(seconds=10),
            },
            {
                "MetricName": "metric1",
                "Value": 20,
                "Unit": "Seconds",
                "Timestamp": last_week + timedelta(seconds=15),
            },
            {
                "MetricName": "metric1",
                "Value": 40,
                "Unit": "Seconds",
                "Timestamp": last_week + timedelta(seconds=30),
            },
        ],
    )

    def get_data(start, end, stat="Sum", scanBy="TimestampAscending"):
        """Query ``metric1`` for one statistic over ``[start, end]`` with a 60s period."""
        response = cloudwatch.get_metric_data(
            MetricDataQueries=[
                {
                    "Id": "result",
                    "MetricStat": {
                        "Metric": {"Namespace": namespace1, "MetricName": "metric1"},
                        "Period": 60,
                        "Stat": stat,
                    },
                }
            ],
            StartTime=start,
            EndTime=end,
            ScanBy=scanBy,
        )
        return response

    response = get_data(
        start=yesterday - timedelta(seconds=60), end=utc_now + timedelta(seconds=60),
    )

    # Assert last week's data is not returned
    len(response["MetricDataResults"]).should.equal(1)
    sum_ = response["MetricDataResults"][0]
    sum_["Label"].should.equal("metric1 Sum")
    sum_["StatusCode"].should.equal("Complete")
    sum_["Values"].should.equal([20.0, 10.0])

    # Same window, newest first.
    response = get_data(
        start=yesterday - timedelta(seconds=60),
        end=utc_now + timedelta(seconds=60),
        scanBy="TimestampDescending",
    )
    response["MetricDataResults"][0]["Values"].should.equal([10.0, 20.0])

    # Widen the window to include last week's cluster.
    response = get_data(
        start=last_week - timedelta(seconds=1),
        end=utc_now + timedelta(seconds=60),
        stat="Average",
    )
    # assert average
    response["MetricDataResults"][0]["Values"].should.equal([30.0, 20.0, 10.0])

    response = get_data(
        start=last_week - timedelta(seconds=1),
        end=utc_now + timedelta(seconds=60),
        stat="Maximum",
    )
    # assert maximum
    response["MetricDataResults"][0]["Values"].should.equal([50.0, 20.0, 10.0])

    response = get_data(
        start=last_week - timedelta(seconds=1),
        end=utc_now + timedelta(seconds=60),
        stat="Minimum",
    )
    # assert minimum
    response["MetricDataResults"][0]["Values"].should.equal([10.0, 20.0, 10.0])
@mock_cloudwatch
def test_get_metric_data_outside_timeframe():
    """A datapoint older than the queried window yields a result with no values."""
    now = datetime.now(tz=pytz.utc)
    week_ago = now - timedelta(days=7)
    client = boto3.client("cloudwatch", region_name="eu-west-1")
    namespace = "my_namespace/"

    # The only datapoint is a week old.
    old_datapoint = {
        "MetricName": "metric1",
        "Value": 50,
        "Unit": "Seconds",
        "Timestamp": week_ago,
    }
    client.put_metric_data(Namespace=namespace, MetricData=[old_datapoint])

    # Query a window of one minute either side of now.
    sum_query = {
        "Id": "result",
        "MetricStat": {
            "Metric": {"Namespace": namespace, "MetricName": "metric1"},
            "Period": 60,
            "Stat": "Sum",
        },
    }
    response = client.get_metric_data(
        MetricDataQueries=[sum_query],
        StartTime=now - timedelta(seconds=60),
        EndTime=now + timedelta(seconds=60),
    )

    # The query itself is answered, but last week's value must not be in it.
    results = response["MetricDataResults"]
    len(results).should.equal(1)
    results[0]["Id"].should.equal("result")
    results[0]["StatusCode"].should.equal("Complete")
    results[0]["Values"].should.equal([])
@mock_cloudwatch
def test_get_metric_data_for_multiple_metrics():
    """Two queries in one ``get_metric_data`` call each return their own metric's value."""
    now = datetime.now(tz=pytz.utc)
    client = boto3.client("cloudwatch", region_name="eu-west-1")
    metric_namespace = "my_namespace/"

    # One datapoint per metric, each put in its own call.
    for metric_name, value in (("metric1", 50), ("metric2", 25)):
        client.put_metric_data(
            Namespace=metric_namespace,
            MetricData=[
                {
                    "MetricName": metric_name,
                    "Value": value,
                    "Unit": "Seconds",
                    "Timestamp": now,
                }
            ],
        )

    def sum_query(query_id, metric_name):
        return {
            "Id": query_id,
            "MetricStat": {
                "Metric": {"Namespace": metric_namespace, "MetricName": metric_name},
                "Period": 60,
                "Stat": "Sum",
            },
        }

    response = client.get_metric_data(
        MetricDataQueries=[
            sum_query("result1", "metric1"),
            sum_query("result2", "metric2"),
        ],
        StartTime=now - timedelta(seconds=60),
        EndTime=now + timedelta(seconds=60),
    )

    results = response["MetricDataResults"]
    len(results).should.equal(2)

    # Match results to queries by id rather than by position.
    results_by_id = {res["Id"]: res for res in results}
    results_by_id["result1"]["Values"].should.equal([50.0])
    results_by_id["result2"]["Values"].should.equal([25.0])