CloudWatch: Add unit filter to query get_metric_data (#5722)

This commit is contained in:
Bogdan Girman 2022-11-30 16:02:59 +01:00 committed by GitHub
parent 5ea02ec1b6
commit fda02e659e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 0 deletions

View File

@ -553,6 +553,7 @@ class CloudWatchBackend(BaseBackend):
query_name = query["metric_stat._metric._metric_name"]
delta = timedelta(seconds=int(query["metric_stat._period"]))
dimensions = self._extract_dimensions_from_get_metric_data_query(query)
unit = query.get("metric_stat._unit")
result_vals: List[SupportsFloat] = []
timestamps: List[str] = []
stat = query["metric_stat._stat"]
@ -576,6 +577,11 @@ class CloudWatchBackend(BaseBackend):
if sorted(md.dimensions) == sorted(dimensions)
and md.name == query_name
]
# Filter based on unit value
if unit:
query_period_data = [
md for md in query_period_data if md.unit == unit
]
metric_values = [m.value for m in query_period_data]

View File

@ -1074,6 +1074,78 @@ def test_get_metric_data_for_dimensions():
res4["Values"].should.equal([75.0])
@mock_cloudwatch
def test_get_metric_data_for_unit():
utc_now = datetime.now(tz=pytz.utc)
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
namespace = "my_namespace/"
unit = "Seconds"
# put metric data
cloudwatch.put_metric_data(
Namespace=namespace,
MetricData=[
{
"MetricName": "metric1",
"Value": 50,
"Unit": unit,
"Timestamp": utc_now,
},
{
"MetricName": "metric1",
"Value": -50,
"Timestamp": utc_now,
},
],
)
# get_metric_data
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
"Id": "result_without_unit",
"MetricStat": {
"Metric": {
"Namespace": namespace,
"MetricName": "metric1",
},
"Period": 60,
"Stat": "SampleCount",
},
},
{
"Id": "result_with_unit",
"MetricStat": {
"Metric": {
"Namespace": namespace,
"MetricName": "metric1",
},
"Period": 60,
"Stat": "SampleCount",
"Unit": unit,
},
},
],
StartTime=utc_now - timedelta(seconds=60),
EndTime=utc_now + timedelta(seconds=60),
)
expected_values = {
"result_without_unit": 2.0,
"result_with_unit": 1.0,
}
for id_, expected_value in expected_values.items():
metric_result_data = list(
filter(
lambda result_data: result_data["Id"] == id_,
response["MetricDataResults"],
)
)
len(metric_result_data).should.equal(1)
metric_result_data[0]["Values"][0].should.equal(expected_value)
@mock_cloudwatch
@mock_s3
def test_cloudwatch_return_s3_metrics():