* fix OPTIONS requests on non-existing API GW integrations * add cloudformation models for API Gateway deployments * bump version * add backdoor to return CloudWatch metrics * Updating implementation coverage * Updating implementation coverage * add cloudformation models for API Gateway deployments * Updating implementation coverage * Updating implementation coverage * Implemented get-caller-identity returning real data depending on the access key used. * bump version * minor fixes * fix Number data_type for SQS message attribute * fix handling of encoding errors * bump version * make CF stack queryable before starting to initialize its resources * bump version * fix integration_method for API GW method integrations * fix undefined status in CF FakeStack * Fix apigateway issues with terraform v0.12.21 * resource_methods -> add handle for "DELETE" method * integrations -> fix issue that "httpMethod" wasn't included in body request (this value was set as the value from refer method resource) * bump version * Fix setting http method for API gateway integrations (#6) * bump version * remove duplicate methods * add storage class to S3 Key when completing multipart upload (#7) * fix SQS performance issues; bump version * add pagination to SecretsManager list-secrets (#9) * fix default parameter groups in RDS * fix adding S3 metadata headers with names containing dots (#13) * Updating implementation coverage * Updating implementation coverage * add cloudformation models for API Gateway deployments * Updating implementation coverage * Updating implementation coverage * Implemented get-caller-identity returning real data depending on the access key used. * make CF stack queryable before starting to initialize its resources * bump version * remove duplicate methods * fix adding S3 metadata headers with names containing dots (#13) * Update amis.json to support EKS AMI mocks (#15) * fix PascalCase for boolean value in ListMultipartUploads response (#17); fix _get_multi_param to parse nested list/dict query params * determine non-zero container exit code in Batch API * support filtering by dimensions in CW get_metric_statistics * fix storing attributes for ELBv2 Route entities; API GW refactorings for TF tests * add missing fields for API GW resources * fix error messages for Route53 (TF-compat) * various fixes for IAM resources (tf-compat) * minor fixes for API GW models (tf-compat) * minor fixes for API GW responses (tf-compat) * add s3 exception for bucket notification filter rule validation * change the way RESTErrors generate the response body and content-type header * fix lint errors and disable "black" syntax enforcement * remove return type hint in RESTError.get_body * add RESTError XML template for IAM exceptions * add support for API GW minimumCompressionSize * fix casing getting PrivateDnsEnabled API GW attribute * minor fixes for error responses * fix escaping special chars for IAM role descriptions (tf-compat) * minor fixes and tagging support for API GW and ELB v2 (tf-compat) * Merge branch 'master' into localstack * add "AlarmRule" attribute to enable support for composite CloudWatch metrics * fix recursive parsing of complex/nested query params * bump version * add API to delete S3 website configurations (#18) * use dict copy to allow parallelism and avoid concurrent modification exceptions in S3 * fix precondition check for etags in S3 (#19) * minor fix for user filtering in Cognito * fix API Gateway error response; avoid returning empty response templates (tf-compat) * support tags and tracingEnabled attribute for API GW stages * fix boolean value in S3 encryption response (#20) * fix connection arn structure * fix api destination arn structure * black format * release 2.0.3.37 * fix s3 exception tests see botocore/parsers.py:1002 where RequestId is removed from parsed * remove python 2 from build action * add test failure annotations in build action * fix events test arn comparisons * fix s3 encryption response test * return default value "0" if EC2 availableIpAddressCount is empty * fix extracting SecurityGroupIds for EC2 VPC endpoints * support deleting/updating API Gateway DomainNames * fix(events): Return empty string instead of null when no pattern is specified in EventPattern (tf-compat) (#22) * fix logic and revert CF changes to get tests running again (#21) * add support for EC2 customer gateway API (#25) * add support for EC2 Transit Gateway APIs (#24) * feat(logs): add `kmsKeyId` into `LogGroup` entity (#23) * minor change in ELBv2 logic to fix tests * feat(events): add APIs to describe and delete CloudWatch Events connections (#26) * add support for EC2 transit gateway route tables (#27) * pass transit gateway route table ID in Describe API, minor refactoring (#29) * add support for EC2 Transit Gateway Routes (#28) * fix region on ACM certificate import (#31) * add support for EC2 transit gateway attachments (#30) * add support for EC2 Transit Gateway VPN attachments (#32) * fix account ID for logs API * add support for DeleteOrganization API * feat(events): store raw filter representation for CloudWatch events patterns (tf-compat) (#36) * feat(events): add support to describe/update/delete CloudWatch API destinations (#35) * add Cognito UpdateIdentityPool, CW Logs PutResourcePolicy * feat(events): add support for tags in EventBus API (#38) * fix parameter validation for Batch compute environments (tf-compat) * revert merge conflicts in IMPLEMENTATION_COVERAGE.md * format code using black * restore original README; re-enable and fix CloudFormation tests * restore tests and old logic for CF stack parameters from SSM * parameterize RequestId/RequestID in response messages and revert related test changes * undo LocalStack-specific adaptations * minor fix * Update CodeCov config to reflect removal of Py2 * undo change related to CW metric filtering; add additional test for CW metric statistics with dimensions * Terraform - Extend whitelist of running tests Co-authored-by: acsbendi <acsbendi28@gmail.com> Co-authored-by: Phan Duong <duongpv@outlook.com> Co-authored-by: Thomas Rausch <thomas@thrau.at> Co-authored-by: Macwan Nevil <macnev2013@gmail.com> Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com> Co-authored-by: Gonzalo Saad <saad.gonzalo.ale@gmail.com> Co-authored-by: Mohit Alonja <monty16597@users.noreply.github.com> Co-authored-by: Miguel Gagliardo <migag9@gmail.com> Co-authored-by: Bert Blommers <info@bertblommers.nl>
		
			
				
	
	
		
			920 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			920 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# from __future__ import unicode_literals
 | 
						|
 | 
						|
import boto3
 | 
						|
from botocore.exceptions import ClientError
 | 
						|
from datetime import datetime, timedelta
 | 
						|
from freezegun import freeze_time
 | 
						|
import pytest
 | 
						|
from uuid import uuid4
 | 
						|
import pytz
 | 
						|
import sure  # noqa
 | 
						|
 | 
						|
from moto import mock_cloudwatch
 | 
						|
from moto.cloudwatch.utils import make_arn_for_alarm
 | 
						|
from moto.core import ACCOUNT_ID
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_put_list_dashboard():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
 | 
						|
 | 
						|
    client.put_dashboard(DashboardName="test1", DashboardBody=widget)
 | 
						|
    resp = client.list_dashboards()
 | 
						|
 | 
						|
    len(resp["DashboardEntries"]).should.equal(1)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_put_list_prefix_nomatch_dashboard():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
 | 
						|
 | 
						|
    client.put_dashboard(DashboardName="test1", DashboardBody=widget)
 | 
						|
    resp = client.list_dashboards(DashboardNamePrefix="nomatch")
 | 
						|
 | 
						|
    len(resp["DashboardEntries"]).should.equal(0)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_delete_dashboard():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
 | 
						|
 | 
						|
    client.put_dashboard(DashboardName="test1", DashboardBody=widget)
 | 
						|
    client.put_dashboard(DashboardName="test2", DashboardBody=widget)
 | 
						|
    client.put_dashboard(DashboardName="test3", DashboardBody=widget)
 | 
						|
    client.delete_dashboards(DashboardNames=["test2", "test1"])
 | 
						|
 | 
						|
    resp = client.list_dashboards(DashboardNamePrefix="test3")
 | 
						|
    len(resp["DashboardEntries"]).should.equal(1)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_delete_dashboard_fail():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
 | 
						|
 | 
						|
    client.put_dashboard(DashboardName="test1", DashboardBody=widget)
 | 
						|
    client.put_dashboard(DashboardName="test2", DashboardBody=widget)
 | 
						|
    client.put_dashboard(DashboardName="test3", DashboardBody=widget)
 | 
						|
    # Doesnt delete anything if all dashboards to be deleted do not exist
 | 
						|
    try:
 | 
						|
        client.delete_dashboards(DashboardNames=["test2", "test1", "test_no_match"])
 | 
						|
    except ClientError as err:
 | 
						|
        err.response["Error"]["Code"].should.equal("ResourceNotFound")
 | 
						|
    else:
 | 
						|
        raise RuntimeError("Should of raised error")
 | 
						|
 | 
						|
    resp = client.list_dashboards()
 | 
						|
    len(resp["DashboardEntries"]).should.equal(3)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_dashboard():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
 | 
						|
    client.put_dashboard(DashboardName="test1", DashboardBody=widget)
 | 
						|
 | 
						|
    resp = client.get_dashboard(DashboardName="test1")
 | 
						|
    resp.should.contain("DashboardArn")
 | 
						|
    resp.should.contain("DashboardBody")
 | 
						|
    resp["DashboardName"].should.equal("test1")
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_dashboard_fail():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
 | 
						|
    try:
 | 
						|
        client.get_dashboard(DashboardName="test1")
 | 
						|
    except ClientError as err:
 | 
						|
        err.response["Error"]["Code"].should.equal("ResourceNotFound")
 | 
						|
    else:
 | 
						|
        raise RuntimeError("Should of raised error")
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_delete_invalid_alarm():
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
 | 
						|
    cloudwatch.put_metric_alarm(
 | 
						|
        AlarmName="testalarm1",
 | 
						|
        MetricName="cpu",
 | 
						|
        Namespace="blah",
 | 
						|
        Period=10,
 | 
						|
        EvaluationPeriods=5,
 | 
						|
        Statistic="Average",
 | 
						|
        Threshold=2,
 | 
						|
        ComparisonOperator="GreaterThanThreshold",
 | 
						|
        ActionsEnabled=True,
 | 
						|
    )
 | 
						|
 | 
						|
    # trying to delete an alarm which is not created along with valid alarm.
 | 
						|
    with pytest.raises(ClientError) as e:
 | 
						|
        cloudwatch.delete_alarms(AlarmNames=["InvalidAlarmName", "testalarm1"])
 | 
						|
    e.value.response["Error"]["Code"].should.equal("ResourceNotFound")
 | 
						|
 | 
						|
    resp = cloudwatch.describe_alarms(AlarmNames=["testalarm1"])
 | 
						|
    # making sure other alarms are not deleted in case of an error.
 | 
						|
    len(resp["MetricAlarms"]).should.equal(1)
 | 
						|
 | 
						|
    # test to check if the error raises if only one invalid alarm is tried to delete.
 | 
						|
    with pytest.raises(ClientError) as e:
 | 
						|
        cloudwatch.delete_alarms(AlarmNames=["InvalidAlarmName"])
 | 
						|
    e.value.response["Error"]["Code"].should.equal("ResourceNotFound")
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_describe_alarms_for_metric():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    conn.put_metric_alarm(
 | 
						|
        AlarmName="testalarm1",
 | 
						|
        MetricName="cpu",
 | 
						|
        Namespace="blah",
 | 
						|
        Period=10,
 | 
						|
        EvaluationPeriods=5,
 | 
						|
        Statistic="Average",
 | 
						|
        Threshold=2,
 | 
						|
        ComparisonOperator="GreaterThanThreshold",
 | 
						|
        ActionsEnabled=True,
 | 
						|
    )
 | 
						|
    alarms = conn.describe_alarms_for_metric(MetricName="cpu", Namespace="blah")
 | 
						|
    alarms.get("MetricAlarms").should.have.length_of(1)
 | 
						|
 | 
						|
    assert "testalarm1" in alarms.get("MetricAlarms")[0].get("AlarmArn")
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_describe_alarms():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
    conn.put_metric_alarm(
 | 
						|
        AlarmName="testalarm1",
 | 
						|
        MetricName="cpu",
 | 
						|
        Namespace="blah",
 | 
						|
        Period=10,
 | 
						|
        EvaluationPeriods=5,
 | 
						|
        Statistic="Average",
 | 
						|
        Threshold=2,
 | 
						|
        ComparisonOperator="GreaterThanThreshold",
 | 
						|
        ActionsEnabled=True,
 | 
						|
    )
 | 
						|
    metric_data_queries = [
 | 
						|
        {
 | 
						|
            "Id": "metricA",
 | 
						|
            "Expression": "metricB + metricC",
 | 
						|
            "Label": "metricA",
 | 
						|
            "ReturnData": True,
 | 
						|
        },
 | 
						|
        {
 | 
						|
            "Id": "metricB",
 | 
						|
            "MetricStat": {
 | 
						|
                "Metric": {
 | 
						|
                    "Namespace": "ns",
 | 
						|
                    "MetricName": "metricB",
 | 
						|
                    "Dimensions": [{"Name": "Name", "Value": "B"}],
 | 
						|
                },
 | 
						|
                "Period": 60,
 | 
						|
                "Stat": "Sum",
 | 
						|
            },
 | 
						|
            "ReturnData": False,
 | 
						|
        },
 | 
						|
        {
 | 
						|
            "Id": "metricC",
 | 
						|
            "MetricStat": {
 | 
						|
                "Metric": {
 | 
						|
                    "Namespace": "AWS/Lambda",
 | 
						|
                    "MetricName": "metricC",
 | 
						|
                    "Dimensions": [{"Name": "Name", "Value": "C"}],
 | 
						|
                },
 | 
						|
                "Period": 60,
 | 
						|
                "Stat": "Sum",
 | 
						|
                "Unit": "Seconds",
 | 
						|
            },
 | 
						|
            "ReturnData": False,
 | 
						|
        },
 | 
						|
    ]
 | 
						|
    conn.put_metric_alarm(
 | 
						|
        AlarmName="testalarm2",
 | 
						|
        EvaluationPeriods=1,
 | 
						|
        DatapointsToAlarm=1,
 | 
						|
        Metrics=metric_data_queries,
 | 
						|
        ComparisonOperator="GreaterThanThreshold",
 | 
						|
        Threshold=1.0,
 | 
						|
    )
 | 
						|
    alarms = conn.describe_alarms()
 | 
						|
    metric_alarms = alarms.get("MetricAlarms")
 | 
						|
    metric_alarms.should.have.length_of(2)
 | 
						|
    single_metric_alarm = [
 | 
						|
        alarm for alarm in metric_alarms if alarm["AlarmName"] == "testalarm1"
 | 
						|
    ][0]
 | 
						|
    multiple_metric_alarm = [
 | 
						|
        alarm for alarm in metric_alarms if alarm["AlarmName"] == "testalarm2"
 | 
						|
    ][0]
 | 
						|
 | 
						|
    single_metric_alarm["MetricName"].should.equal("cpu")
 | 
						|
    single_metric_alarm.shouldnt.have.property("Metrics")
 | 
						|
    single_metric_alarm["Namespace"].should.equal("blah")
 | 
						|
    single_metric_alarm["Period"].should.equal(10)
 | 
						|
    single_metric_alarm["EvaluationPeriods"].should.equal(5)
 | 
						|
    single_metric_alarm["Statistic"].should.equal("Average")
 | 
						|
    single_metric_alarm["ComparisonOperator"].should.equal("GreaterThanThreshold")
 | 
						|
    single_metric_alarm["Threshold"].should.equal(2)
 | 
						|
 | 
						|
    multiple_metric_alarm.shouldnt.have.property("MetricName")
 | 
						|
    multiple_metric_alarm["EvaluationPeriods"].should.equal(1)
 | 
						|
    multiple_metric_alarm["DatapointsToAlarm"].should.equal(1)
 | 
						|
    multiple_metric_alarm["Metrics"].should.equal(metric_data_queries)
 | 
						|
    multiple_metric_alarm["ComparisonOperator"].should.equal("GreaterThanThreshold")
 | 
						|
    multiple_metric_alarm["Threshold"].should.equal(1.0)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_alarm_state():
 | 
						|
    client = boto3.client("cloudwatch", region_name="eu-central-1")
 | 
						|
 | 
						|
    client.put_metric_alarm(
 | 
						|
        AlarmName="testalarm1",
 | 
						|
        MetricName="cpu",
 | 
						|
        Namespace="blah",
 | 
						|
        Period=10,
 | 
						|
        EvaluationPeriods=5,
 | 
						|
        Statistic="Average",
 | 
						|
        Threshold=2,
 | 
						|
        ComparisonOperator="GreaterThanThreshold",
 | 
						|
        ActionsEnabled=True,
 | 
						|
    )
 | 
						|
    client.put_metric_alarm(
 | 
						|
        AlarmName="testalarm2",
 | 
						|
        MetricName="cpu",
 | 
						|
        Namespace="blah",
 | 
						|
        Period=10,
 | 
						|
        EvaluationPeriods=5,
 | 
						|
        Statistic="Average",
 | 
						|
        Threshold=2,
 | 
						|
        ComparisonOperator="GreaterThanThreshold",
 | 
						|
    )
 | 
						|
 | 
						|
    # This is tested implicitly as if it doesnt work the rest will die
 | 
						|
    client.set_alarm_state(
 | 
						|
        AlarmName="testalarm1",
 | 
						|
        StateValue="ALARM",
 | 
						|
        StateReason="testreason",
 | 
						|
        StateReasonData='{"some": "json_data"}',
 | 
						|
    )
 | 
						|
 | 
						|
    resp = client.describe_alarms(StateValue="ALARM")
 | 
						|
    len(resp["MetricAlarms"]).should.equal(1)
 | 
						|
    resp["MetricAlarms"][0]["AlarmName"].should.equal("testalarm1")
 | 
						|
    resp["MetricAlarms"][0]["StateValue"].should.equal("ALARM")
 | 
						|
    resp["MetricAlarms"][0]["ActionsEnabled"].should.equal(True)
 | 
						|
 | 
						|
    resp = client.describe_alarms(StateValue="OK")
 | 
						|
    len(resp["MetricAlarms"]).should.equal(1)
 | 
						|
    resp["MetricAlarms"][0]["AlarmName"].should.equal("testalarm2")
 | 
						|
    resp["MetricAlarms"][0]["StateValue"].should.equal("OK")
 | 
						|
    resp["MetricAlarms"][0]["ActionsEnabled"].should.equal(False)
 | 
						|
 | 
						|
    # Just for sanity
 | 
						|
    resp = client.describe_alarms()
 | 
						|
    len(resp["MetricAlarms"]).should.equal(2)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_put_metric_data_no_dimensions():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="us-east-1")
 | 
						|
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester", MetricData=[dict(MetricName="metric", Value=1.5)]
 | 
						|
    )
 | 
						|
 | 
						|
    metrics = conn.list_metrics()["Metrics"]
 | 
						|
    metrics.should.have.length_of(1)
 | 
						|
    metric = metrics[0]
 | 
						|
    metric["Namespace"].should.equal("tester")
 | 
						|
    metric["MetricName"].should.equal("metric")
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_put_metric_data_with_statistics():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="us-east-1")
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName="statmetric",
 | 
						|
                Timestamp=utc_now,
 | 
						|
                # no Value to test  https://github.com/spulec/moto/issues/1615
 | 
						|
                StatisticValues=dict(
 | 
						|
                    SampleCount=123.0, Sum=123.0, Minimum=123.0, Maximum=123.0
 | 
						|
                ),
 | 
						|
                Unit="Milliseconds",
 | 
						|
                StorageResolution=123,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    metrics = conn.list_metrics()["Metrics"]
 | 
						|
    metrics.should.have.length_of(1)
 | 
						|
    metric = metrics[0]
 | 
						|
    metric["Namespace"].should.equal("tester")
 | 
						|
    metric["MetricName"].should.equal("statmetric")
 | 
						|
    # TODO: test statistics - https://github.com/spulec/moto/issues/1615
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_metric_statistics():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="us-east-1")
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[dict(MetricName="metric", Value=1.5, Timestamp=utc_now)],
 | 
						|
    )
 | 
						|
 | 
						|
    stats = conn.get_metric_statistics(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricName="metric",
 | 
						|
        StartTime=utc_now - timedelta(seconds=60),
 | 
						|
        EndTime=utc_now + timedelta(seconds=60),
 | 
						|
        Period=60,
 | 
						|
        Statistics=["SampleCount", "Sum"],
 | 
						|
    )
 | 
						|
 | 
						|
    stats["Datapoints"].should.have.length_of(1)
 | 
						|
    datapoint = stats["Datapoints"][0]
 | 
						|
    datapoint["SampleCount"].should.equal(1.0)
 | 
						|
    datapoint["Sum"].should.equal(1.5)
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_metric_statistics_dimensions():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="us-east-1")
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
 | 
						|
    # put metric data with different dimensions
 | 
						|
    dimensions1 = [{"Name": "dim1", "Value": "v1"}]
 | 
						|
    dimensions2 = dimensions1 + [{"Name": "dim2", "Value": "v2"}]
 | 
						|
    metric_name = "metr-stats-dims"
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName=metric_name,
 | 
						|
                Value=1,
 | 
						|
                Timestamp=utc_now,
 | 
						|
                Dimensions=dimensions1,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName=metric_name,
 | 
						|
                Value=2,
 | 
						|
                Timestamp=utc_now,
 | 
						|
                Dimensions=dimensions1,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName=metric_name,
 | 
						|
                Value=6,
 | 
						|
                Timestamp=utc_now,
 | 
						|
                Dimensions=dimensions2,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    # list of (<kwargs>, <expectedSum>, <expectedAverage>)
 | 
						|
    params_list = (
 | 
						|
        # get metric stats with no restriction on dimensions
 | 
						|
        ({}, 9, 3),
 | 
						|
        # get metric stats for dimensions1 (should also cover dimensions2)
 | 
						|
        ({"Dimensions": dimensions1}, 9, 3),
 | 
						|
        # get metric stats for dimensions2 only
 | 
						|
        ({"Dimensions": dimensions2}, 6, 6),
 | 
						|
    )
 | 
						|
 | 
						|
    for params in params_list:
 | 
						|
        stats = conn.get_metric_statistics(
 | 
						|
            Namespace="tester",
 | 
						|
            MetricName=metric_name,
 | 
						|
            StartTime=utc_now - timedelta(seconds=60),
 | 
						|
            EndTime=utc_now + timedelta(seconds=60),
 | 
						|
            Period=60,
 | 
						|
            Statistics=["Average", "Sum"],
 | 
						|
            **params[0],
 | 
						|
        )
 | 
						|
        stats["Datapoints"].should.have.length_of(1)
 | 
						|
        datapoint = stats["Datapoints"][0]
 | 
						|
        datapoint["Sum"].should.equal(params[1])
 | 
						|
        datapoint["Average"].should.equal(params[2])
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_duplicate_put_metric_data():
 | 
						|
    conn = boto3.client("cloudwatch", region_name="us-east-1")
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName="metric",
 | 
						|
                Dimensions=[{"Name": "Name", "Value": "B"}],
 | 
						|
                Value=1.5,
 | 
						|
                Timestamp=utc_now,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    result = conn.list_metrics(
 | 
						|
        Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}]
 | 
						|
    )["Metrics"]
 | 
						|
    len(result).should.equal(1)
 | 
						|
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName="metric",
 | 
						|
                Dimensions=[{"Name": "Name", "Value": "B"}],
 | 
						|
                Value=1.5,
 | 
						|
                Timestamp=utc_now,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    result = conn.list_metrics(
 | 
						|
        Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}]
 | 
						|
    )["Metrics"]
 | 
						|
    len(result).should.equal(1)
 | 
						|
    result.should.equal(
 | 
						|
        [
 | 
						|
            {
 | 
						|
                "Namespace": "tester",
 | 
						|
                "MetricName": "metric",
 | 
						|
                "Dimensions": [{"Name": "Name", "Value": "B"}],
 | 
						|
            }
 | 
						|
        ]
 | 
						|
    )
 | 
						|
 | 
						|
    conn.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(
 | 
						|
                MetricName="metric",
 | 
						|
                Dimensions=[
 | 
						|
                    {"Name": "Name", "Value": "B"},
 | 
						|
                    {"Name": "Name", "Value": "C"},
 | 
						|
                ],
 | 
						|
                Value=1.5,
 | 
						|
                Timestamp=utc_now,
 | 
						|
            )
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    result = conn.list_metrics(
 | 
						|
        Namespace="tester", Dimensions=[{"Name": "Name", "Value": "B"}]
 | 
						|
    )["Metrics"]
 | 
						|
    result.should.equal(
 | 
						|
        [
 | 
						|
            {
 | 
						|
                "Namespace": "tester",
 | 
						|
                "MetricName": "metric",
 | 
						|
                "Dimensions": [{"Name": "Name", "Value": "B"}],
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "Namespace": "tester",
 | 
						|
                "MetricName": "metric",
 | 
						|
                "Dimensions": [
 | 
						|
                    {"Name": "Name", "Value": "B"},
 | 
						|
                    {"Name": "Name", "Value": "C"},
 | 
						|
                ],
 | 
						|
            },
 | 
						|
        ]
 | 
						|
    )
 | 
						|
 | 
						|
    result = conn.list_metrics(
 | 
						|
        Namespace="tester", Dimensions=[{"Name": "Name", "Value": "C"}]
 | 
						|
    )["Metrics"]
 | 
						|
    result.should.equal(
 | 
						|
        [
 | 
						|
            {
 | 
						|
                "Namespace": "tester",
 | 
						|
                "MetricName": "metric",
 | 
						|
                "Dimensions": [
 | 
						|
                    {"Name": "Name", "Value": "B"},
 | 
						|
                    {"Name": "Name", "Value": "C"},
 | 
						|
                ],
 | 
						|
            }
 | 
						|
        ]
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
@freeze_time("2020-02-10 18:44:05")
 | 
						|
def test_custom_timestamp():
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
    time = "2020-02-10T18:44:09Z"
 | 
						|
    cw = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
 | 
						|
    cw.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[dict(MetricName="metric1", Value=1.5, Timestamp=time)],
 | 
						|
    )
 | 
						|
 | 
						|
    cw.put_metric_data(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricData=[
 | 
						|
            dict(MetricName="metric2", Value=1.5, Timestamp=datetime(2020, 2, 10))
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    stats = cw.get_metric_statistics(
 | 
						|
        Namespace="tester",
 | 
						|
        MetricName="metric",
 | 
						|
        StartTime=utc_now - timedelta(seconds=60),
 | 
						|
        EndTime=utc_now + timedelta(seconds=60),
 | 
						|
        Period=60,
 | 
						|
        Statistics=["SampleCount", "Sum"],
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_list_metrics():
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
    # Verify namespace has to exist
 | 
						|
    res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
 | 
						|
    res.should.be.empty
 | 
						|
    # Create some metrics to filter on
 | 
						|
    create_metrics(cloudwatch, namespace="list_test_1/", metrics=4, data_points=2)
 | 
						|
    create_metrics(cloudwatch, namespace="list_test_2/", metrics=4, data_points=2)
 | 
						|
    # Verify we can retrieve everything
 | 
						|
    res = cloudwatch.list_metrics()["Metrics"]
 | 
						|
    len(res).should.equal(16)  # 2 namespaces * 4 metrics * 2 data points
 | 
						|
    # Verify we can filter by namespace/metric name
 | 
						|
    res = cloudwatch.list_metrics(Namespace="list_test_1/")["Metrics"]
 | 
						|
    len(res).should.equal(8)  # 1 namespace * 4 metrics * 2 data points
 | 
						|
    res = cloudwatch.list_metrics(Namespace="list_test_1/", MetricName="metric1")[
 | 
						|
        "Metrics"
 | 
						|
    ]
 | 
						|
    len(res).should.equal(2)  # 1 namespace * 1 metrics * 2 data points
 | 
						|
    # Verify format
 | 
						|
    res.should.equal(
 | 
						|
        [
 | 
						|
            {"Namespace": "list_test_1/", "Dimensions": [], "MetricName": "metric1",},
 | 
						|
            {"Namespace": "list_test_1/", "Dimensions": [], "MetricName": "metric1",},
 | 
						|
        ]
 | 
						|
    )
 | 
						|
    # Verify unknown namespace still has no results
 | 
						|
    res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
 | 
						|
    res.should.be.empty
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_list_metrics_paginated():
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
    # Verify that only a single page of metrics is returned
 | 
						|
    cloudwatch.list_metrics()["Metrics"].should.be.empty
 | 
						|
    # Verify we can't pass a random NextToken
 | 
						|
    with pytest.raises(ClientError) as e:
 | 
						|
        cloudwatch.list_metrics(NextToken=str(uuid4()))
 | 
						|
    e.value.response["Error"]["Message"].should.equal(
 | 
						|
        "Request parameter NextToken is invalid"
 | 
						|
    )
 | 
						|
    # Add a boatload of metrics
 | 
						|
    create_metrics(cloudwatch, namespace="test", metrics=100, data_points=1)
 | 
						|
    # Verify that a single page is returned until we've reached 500
 | 
						|
    first_page = cloudwatch.list_metrics()
 | 
						|
    first_page["Metrics"].shouldnt.be.empty
 | 
						|
    len(first_page["Metrics"]).should.equal(100)
 | 
						|
    create_metrics(cloudwatch, namespace="test", metrics=200, data_points=2)
 | 
						|
    first_page = cloudwatch.list_metrics()
 | 
						|
    len(first_page["Metrics"]).should.equal(500)
 | 
						|
    first_page.shouldnt.contain("NextToken")
 | 
						|
    # Verify that adding more data points results in pagination
 | 
						|
    create_metrics(cloudwatch, namespace="test", metrics=60, data_points=10)
 | 
						|
    first_page = cloudwatch.list_metrics()
 | 
						|
    len(first_page["Metrics"]).should.equal(500)
 | 
						|
    first_page["NextToken"].shouldnt.be.empty
 | 
						|
    # Retrieve second page - and verify there's more where that came from
 | 
						|
    second_page = cloudwatch.list_metrics(NextToken=first_page["NextToken"])
 | 
						|
    len(second_page["Metrics"]).should.equal(500)
 | 
						|
    second_page.should.contain("NextToken")
 | 
						|
    # Last page should only have the last 100 results, and no NextToken (indicating that pagination is finished)
 | 
						|
    third_page = cloudwatch.list_metrics(NextToken=second_page["NextToken"])
 | 
						|
    len(third_page["Metrics"]).should.equal(100)
 | 
						|
    third_page.shouldnt.contain("NextToken")
 | 
						|
    # Verify that we can't reuse an existing token
 | 
						|
    with pytest.raises(ClientError) as e:
 | 
						|
        cloudwatch.list_metrics(NextToken=first_page["NextToken"])
 | 
						|
    e.value.response["Error"]["Message"].should.equal(
 | 
						|
        "Request parameter NextToken is invalid"
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def create_metrics(cloudwatch, namespace, metrics=5, data_points=5):
 | 
						|
    for i in range(0, metrics):
 | 
						|
        metric_name = "metric" + str(i)
 | 
						|
        for j in range(0, data_points):
 | 
						|
            cloudwatch.put_metric_data(
 | 
						|
                Namespace=namespace,
 | 
						|
                MetricData=[{"MetricName": metric_name, "Value": j, "Unit": "Seconds"}],
 | 
						|
            )
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_metric_data_within_timeframe():
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
    namespace1 = "my_namespace/"
 | 
						|
    # put metric data
 | 
						|
    values = [0, 2, 4, 3.5, 7, 100]
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace1,
 | 
						|
        MetricData=[
 | 
						|
            {"MetricName": "metric1", "Value": val, "Unit": "Seconds"} for val in values
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    # get_metric_data
 | 
						|
    stats = ["Average", "Sum", "Minimum", "Maximum"]
 | 
						|
    response = cloudwatch.get_metric_data(
 | 
						|
        MetricDataQueries=[
 | 
						|
            {
 | 
						|
                "Id": "result_" + stat,
 | 
						|
                "MetricStat": {
 | 
						|
                    "Metric": {"Namespace": namespace1, "MetricName": "metric1"},
 | 
						|
                    "Period": 60,
 | 
						|
                    "Stat": stat,
 | 
						|
                },
 | 
						|
            }
 | 
						|
            for stat in stats
 | 
						|
        ],
 | 
						|
        StartTime=utc_now - timedelta(seconds=60),
 | 
						|
        EndTime=utc_now + timedelta(seconds=60),
 | 
						|
    )
 | 
						|
    #
 | 
						|
    # Assert Average/Min/Max/Sum is returned as expected
 | 
						|
    avg = [
 | 
						|
        res for res in response["MetricDataResults"] if res["Id"] == "result_Average"
 | 
						|
    ][0]
 | 
						|
    avg["Label"].should.equal("metric1 Average")
 | 
						|
    avg["StatusCode"].should.equal("Complete")
 | 
						|
    [int(val) for val in avg["Values"]].should.equal([19])
 | 
						|
 | 
						|
    sum_ = [res for res in response["MetricDataResults"] if res["Id"] == "result_Sum"][
 | 
						|
        0
 | 
						|
    ]
 | 
						|
    sum_["Label"].should.equal("metric1 Sum")
 | 
						|
    sum_["StatusCode"].should.equal("Complete")
 | 
						|
    [val for val in sum_["Values"]].should.equal([sum(values)])
 | 
						|
 | 
						|
    min_ = [
 | 
						|
        res for res in response["MetricDataResults"] if res["Id"] == "result_Minimum"
 | 
						|
    ][0]
 | 
						|
    min_["Label"].should.equal("metric1 Minimum")
 | 
						|
    min_["StatusCode"].should.equal("Complete")
 | 
						|
    [int(val) for val in min_["Values"]].should.equal([0])
 | 
						|
 | 
						|
    max_ = [
 | 
						|
        res for res in response["MetricDataResults"] if res["Id"] == "result_Maximum"
 | 
						|
    ][0]
 | 
						|
    max_["Label"].should.equal("metric1 Maximum")
 | 
						|
    max_["StatusCode"].should.equal("Complete")
 | 
						|
    [int(val) for val in max_["Values"]].should.equal([100])
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_metric_data_partially_within_timeframe():
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
    yesterday = utc_now - timedelta(days=1)
 | 
						|
    last_week = utc_now - timedelta(days=7)
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
    namespace1 = "my_namespace/"
 | 
						|
    # put metric data
 | 
						|
    values = [0, 2, 4, 3.5, 7, 100]
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace1,
 | 
						|
        MetricData=[
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 10,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": utc_now,
 | 
						|
            }
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace1,
 | 
						|
        MetricData=[
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 20,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": yesterday,
 | 
						|
            }
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace1,
 | 
						|
        MetricData=[
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 50,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": last_week,
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 10,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": last_week + timedelta(seconds=10),
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 20,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": last_week + timedelta(seconds=15),
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 40,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": last_week + timedelta(seconds=30),
 | 
						|
            },
 | 
						|
        ],
 | 
						|
    )
 | 
						|
 | 
						|
    # data for average, min, max
 | 
						|
 | 
						|
    def get_data(start, end, stat="Sum", scanBy="TimestampAscending"):
 | 
						|
        # get_metric_data
 | 
						|
        response = cloudwatch.get_metric_data(
 | 
						|
            MetricDataQueries=[
 | 
						|
                {
 | 
						|
                    "Id": "result",
 | 
						|
                    "MetricStat": {
 | 
						|
                        "Metric": {"Namespace": namespace1, "MetricName": "metric1"},
 | 
						|
                        "Period": 60,
 | 
						|
                        "Stat": stat,
 | 
						|
                    },
 | 
						|
                }
 | 
						|
            ],
 | 
						|
            StartTime=start,
 | 
						|
            EndTime=end,
 | 
						|
            ScanBy=scanBy,
 | 
						|
        )
 | 
						|
        return response
 | 
						|
 | 
						|
    response = get_data(
 | 
						|
        start=yesterday - timedelta(seconds=60), end=utc_now + timedelta(seconds=60),
 | 
						|
    )
 | 
						|
 | 
						|
    # Assert Last week's data is not returned
 | 
						|
    len(response["MetricDataResults"]).should.equal(1)
 | 
						|
    sum_ = response["MetricDataResults"][0]
 | 
						|
    sum_["Label"].should.equal("metric1 Sum")
 | 
						|
    sum_["StatusCode"].should.equal("Complete")
 | 
						|
    sum_["Values"].should.equal([20.0, 10.0])
 | 
						|
    response = get_data(
 | 
						|
        start=yesterday - timedelta(seconds=60),
 | 
						|
        end=utc_now + timedelta(seconds=60),
 | 
						|
        scanBy="TimestampDescending",
 | 
						|
    )
 | 
						|
    response["MetricDataResults"][0]["Values"].should.equal([10.0, 20.0])
 | 
						|
 | 
						|
    response = get_data(
 | 
						|
        start=last_week - timedelta(seconds=1),
 | 
						|
        end=utc_now + timedelta(seconds=60),
 | 
						|
        stat="Average",
 | 
						|
    )
 | 
						|
    # assert average
 | 
						|
    response["MetricDataResults"][0]["Values"].should.equal([30.0, 20.0, 10.0])
 | 
						|
 | 
						|
    response = get_data(
 | 
						|
        start=last_week - timedelta(seconds=1),
 | 
						|
        end=utc_now + timedelta(seconds=60),
 | 
						|
        stat="Maximum",
 | 
						|
    )
 | 
						|
    # assert maximum
 | 
						|
    response["MetricDataResults"][0]["Values"].should.equal([50.0, 20.0, 10.0])
 | 
						|
 | 
						|
    response = get_data(
 | 
						|
        start=last_week - timedelta(seconds=1),
 | 
						|
        end=utc_now + timedelta(seconds=60),
 | 
						|
        stat="Minimum",
 | 
						|
    )
 | 
						|
    # assert minimum
 | 
						|
    response["MetricDataResults"][0]["Values"].should.equal([10.0, 20.0, 10.0])
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_metric_data_outside_timeframe():
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
    last_week = utc_now - timedelta(days=7)
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
    namespace1 = "my_namespace/"
 | 
						|
    # put metric data
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace1,
 | 
						|
        MetricData=[
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 50,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": last_week,
 | 
						|
            }
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    # get_metric_data
 | 
						|
    response = cloudwatch.get_metric_data(
 | 
						|
        MetricDataQueries=[
 | 
						|
            {
 | 
						|
                "Id": "result",
 | 
						|
                "MetricStat": {
 | 
						|
                    "Metric": {"Namespace": namespace1, "MetricName": "metric1"},
 | 
						|
                    "Period": 60,
 | 
						|
                    "Stat": "Sum",
 | 
						|
                },
 | 
						|
            }
 | 
						|
        ],
 | 
						|
        StartTime=utc_now - timedelta(seconds=60),
 | 
						|
        EndTime=utc_now + timedelta(seconds=60),
 | 
						|
    )
 | 
						|
    #
 | 
						|
    # Assert Last week's data is not returned
 | 
						|
    len(response["MetricDataResults"]).should.equal(1)
 | 
						|
    response["MetricDataResults"][0]["Id"].should.equal("result")
 | 
						|
    response["MetricDataResults"][0]["StatusCode"].should.equal("Complete")
 | 
						|
    response["MetricDataResults"][0]["Values"].should.equal([])
 | 
						|
 | 
						|
 | 
						|
@mock_cloudwatch
 | 
						|
def test_get_metric_data_for_multiple_metrics():
 | 
						|
    utc_now = datetime.now(tz=pytz.utc)
 | 
						|
    cloudwatch = boto3.client("cloudwatch", "eu-west-1")
 | 
						|
    namespace = "my_namespace/"
 | 
						|
    # put metric data
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace,
 | 
						|
        MetricData=[
 | 
						|
            {
 | 
						|
                "MetricName": "metric1",
 | 
						|
                "Value": 50,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": utc_now,
 | 
						|
            }
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    cloudwatch.put_metric_data(
 | 
						|
        Namespace=namespace,
 | 
						|
        MetricData=[
 | 
						|
            {
 | 
						|
                "MetricName": "metric2",
 | 
						|
                "Value": 25,
 | 
						|
                "Unit": "Seconds",
 | 
						|
                "Timestamp": utc_now,
 | 
						|
            }
 | 
						|
        ],
 | 
						|
    )
 | 
						|
    # get_metric_data
 | 
						|
    response = cloudwatch.get_metric_data(
 | 
						|
        MetricDataQueries=[
 | 
						|
            {
 | 
						|
                "Id": "result1",
 | 
						|
                "MetricStat": {
 | 
						|
                    "Metric": {"Namespace": namespace, "MetricName": "metric1"},
 | 
						|
                    "Period": 60,
 | 
						|
                    "Stat": "Sum",
 | 
						|
                },
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "Id": "result2",
 | 
						|
                "MetricStat": {
 | 
						|
                    "Metric": {"Namespace": namespace, "MetricName": "metric2"},
 | 
						|
                    "Period": 60,
 | 
						|
                    "Stat": "Sum",
 | 
						|
                },
 | 
						|
            },
 | 
						|
        ],
 | 
						|
        StartTime=utc_now - timedelta(seconds=60),
 | 
						|
        EndTime=utc_now + timedelta(seconds=60),
 | 
						|
    )
 | 
						|
    #
 | 
						|
    len(response["MetricDataResults"]).should.equal(2)
 | 
						|
 | 
						|
    res1 = [res for res in response["MetricDataResults"] if res["Id"] == "result1"][0]
 | 
						|
    res1["Values"].should.equal([50.0])
 | 
						|
 | 
						|
    res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0]
 | 
						|
    res2["Values"].should.equal([25.0])
 |