Merge pull request #2599 from bblommers/feature/#2010

CloudWatch - Implement list_metrics pagination
This commit is contained in:
Mike Grima 2019-12-09 14:11:05 -08:00 committed by GitHub
commit 1c5ea4b545
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 130 additions and 8 deletions

View File

@ -5,6 +5,7 @@ from moto.core.exceptions import RESTError
import boto.ec2.cloudwatch import boto.ec2.cloudwatch
from datetime import datetime, timedelta from datetime import datetime, timedelta
from dateutil.tz import tzutc from dateutil.tz import tzutc
from uuid import uuid4
from .utils import make_arn_for_dashboard from .utils import make_arn_for_dashboard
DEFAULT_ACCOUNT_ID = 123456789012 DEFAULT_ACCOUNT_ID = 123456789012
@ -193,6 +194,7 @@ class CloudWatchBackend(BaseBackend):
self.alarms = {} self.alarms = {}
self.dashboards = {} self.dashboards = {}
self.metric_data = [] self.metric_data = []
self.paged_metric_data = {}
def put_metric_alarm( def put_metric_alarm(
self, self,
@ -377,6 +379,36 @@ class CloudWatchBackend(BaseBackend):
self.alarms[alarm_name].update_state(reason, reason_data, state_value) self.alarms[alarm_name].update_state(reason, reason_data, state_value)
def list_metrics(self, next_token, namespace, metric_name):
if next_token:
if next_token not in self.paged_metric_data:
raise RESTError(
"PaginationException", "Request parameter NextToken is invalid"
)
else:
metrics = self.paged_metric_data[next_token]
del self.paged_metric_data[next_token] # Cant reuse same token twice
return self._get_paginated(metrics)
else:
metrics = self.get_filtered_metrics(metric_name, namespace)
return self._get_paginated(metrics)
def get_filtered_metrics(self, metric_name, namespace):
metrics = self.get_all_metrics()
if namespace:
metrics = [md for md in metrics if md.namespace == namespace]
if metric_name:
metrics = [md for md in metrics if md.name == metric_name]
return metrics
def _get_paginated(self, metrics):
if len(metrics) > 500:
next_token = str(uuid4())
self.paged_metric_data[next_token] = metrics[500:]
return next_token, metrics[0:500]
else:
return None, metrics
class LogGroup(BaseModel): class LogGroup(BaseModel):
def __init__(self, spec): def __init__(self, spec):

View File

@ -120,9 +120,14 @@ class CloudWatchResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def list_metrics(self): def list_metrics(self):
metrics = self.cloudwatch_backend.get_all_metrics() namespace = self._get_param("Namespace")
metric_name = self._get_param("MetricName")
next_token = self._get_param("NextToken")
next_token, metrics = self.cloudwatch_backend.list_metrics(
next_token, namespace, metric_name
)
template = self.response_template(LIST_METRICS_TEMPLATE) template = self.response_template(LIST_METRICS_TEMPLATE)
return template.render(metrics=metrics) return template.render(metrics=metrics, next_token=next_token)
@amzn_request_id @amzn_request_id
def delete_dashboards(self): def delete_dashboards(self):
@ -340,9 +345,11 @@ LIST_METRICS_TEMPLATE = """<ListMetricsResponse xmlns="http://monitoring.amazona
</member> </member>
{% endfor %} {% endfor %}
</Metrics> </Metrics>
{% if next_token is not none %}
<NextToken> <NextToken>
96e88479-4662-450b-8a13-239ded6ce9fe {{ next_token }}
</NextToken> </NextToken>
{% endif %}
</ListMetricsResult> </ListMetricsResult>
</ListMetricsResponse>""" </ListMetricsResponse>"""

View File

@ -1,8 +1,5 @@
import boto import boto
from boto.ec2.cloudwatch.alarm import MetricAlarm from boto.ec2.cloudwatch.alarm import MetricAlarm
import boto3
from datetime import datetime, timedelta
import pytz
import sure # noqa import sure # noqa
from moto import mock_cloudwatch_deprecated from moto import mock_cloudwatch_deprecated

90
tests/test_cloudwatch/test_cloudwatch_boto3.py Executable file → Normal file
View File

@ -1,8 +1,10 @@
from __future__ import unicode_literals # from __future__ import unicode_literals
import boto3 import boto3
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from datetime import datetime, timedelta from datetime import datetime, timedelta
from nose.tools import assert_raises
from uuid import uuid4
import pytz import pytz
import sure # noqa import sure # noqa
@ -155,13 +157,14 @@ def test_put_metric_data_no_dimensions():
@mock_cloudwatch @mock_cloudwatch
def test_put_metric_data_with_statistics(): def test_put_metric_data_with_statistics():
conn = boto3.client("cloudwatch", region_name="us-east-1") conn = boto3.client("cloudwatch", region_name="us-east-1")
utc_now = datetime.now(tz=pytz.utc)
conn.put_metric_data( conn.put_metric_data(
Namespace="tester", Namespace="tester",
MetricData=[ MetricData=[
dict( dict(
MetricName="statmetric", MetricName="statmetric",
Timestamp=datetime(2015, 1, 1), Timestamp=utc_now,
# no Value to test https://github.com/spulec/moto/issues/1615 # no Value to test https://github.com/spulec/moto/issues/1615
StatisticValues=dict( StatisticValues=dict(
SampleCount=123.0, Sum=123.0, Minimum=123.0, Maximum=123.0 SampleCount=123.0, Sum=123.0, Minimum=123.0, Maximum=123.0
@ -203,3 +206,86 @@ def test_get_metric_statistics():
datapoint = stats["Datapoints"][0] datapoint = stats["Datapoints"][0]
datapoint["SampleCount"].should.equal(1.0) datapoint["SampleCount"].should.equal(1.0)
datapoint["Sum"].should.equal(1.5) datapoint["Sum"].should.equal(1.5)
@mock_cloudwatch
def test_list_metrics():
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
# Verify namespace has to exist
res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
res.should.be.empty
# Create some metrics to filter on
create_metrics(cloudwatch, namespace="list_test_1/", metrics=4, data_points=2)
create_metrics(cloudwatch, namespace="list_test_2/", metrics=4, data_points=2)
# Verify we can retrieve everything
res = cloudwatch.list_metrics()["Metrics"]
len(res).should.equal(16) # 2 namespaces * 4 metrics * 2 data points
# Verify we can filter by namespace/metric name
res = cloudwatch.list_metrics(Namespace="list_test_1/")["Metrics"]
len(res).should.equal(8) # 1 namespace * 4 metrics * 2 data points
res = cloudwatch.list_metrics(Namespace="list_test_1/", MetricName="metric1")[
"Metrics"
]
len(res).should.equal(2) # 1 namespace * 1 metrics * 2 data points
# Verify format
res.should.equal(
[
{u"Namespace": "list_test_1/", u"Dimensions": [], u"MetricName": "metric1"},
{u"Namespace": "list_test_1/", u"Dimensions": [], u"MetricName": "metric1"},
]
)
# Verify unknown namespace still has no results
res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
res.should.be.empty
@mock_cloudwatch
def test_list_metrics_paginated():
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
# Verify that only a single page of metrics is returned
cloudwatch.list_metrics()["Metrics"].should.be.empty
# Verify we can't pass a random NextToken
with assert_raises(ClientError) as e:
cloudwatch.list_metrics(NextToken=str(uuid4()))
e.exception.response["Error"]["Message"].should.equal(
"Request parameter NextToken is invalid"
)
# Add a boatload of metrics
create_metrics(cloudwatch, namespace="test", metrics=100, data_points=1)
# Verify that a single page is returned until we've reached 500
first_page = cloudwatch.list_metrics()
first_page["Metrics"].shouldnt.be.empty
len(first_page["Metrics"]).should.equal(100)
create_metrics(cloudwatch, namespace="test", metrics=200, data_points=2)
first_page = cloudwatch.list_metrics()
len(first_page["Metrics"]).should.equal(500)
first_page.shouldnt.contain("NextToken")
# Verify that adding more data points results in pagination
create_metrics(cloudwatch, namespace="test", metrics=60, data_points=10)
first_page = cloudwatch.list_metrics()
len(first_page["Metrics"]).should.equal(500)
first_page["NextToken"].shouldnt.be.empty
# Retrieve second page - and verify there's more where that came from
second_page = cloudwatch.list_metrics(NextToken=first_page["NextToken"])
len(second_page["Metrics"]).should.equal(500)
second_page.should.contain("NextToken")
# Last page should only have the last 100 results, and no NextToken (indicating that pagination is finished)
third_page = cloudwatch.list_metrics(NextToken=second_page["NextToken"])
len(third_page["Metrics"]).should.equal(100)
third_page.shouldnt.contain("NextToken")
# Verify that we can't reuse an existing token
with assert_raises(ClientError) as e:
cloudwatch.list_metrics(NextToken=first_page["NextToken"])
e.exception.response["Error"]["Message"].should.equal(
"Request parameter NextToken is invalid"
)
def create_metrics(cloudwatch, namespace, metrics=5, data_points=5):
for i in range(0, metrics):
metric_name = "metric" + str(i)
for j in range(0, data_points):
cloudwatch.put_metric_data(
Namespace=namespace,
MetricData=[{"MetricName": metric_name, "Value": j, "Unit": "Seconds"}],
)