TechDebt: MyPy CloudWatch (#5623)

This commit is contained in:
Bert Blommers 2022-10-31 09:17:05 -01:00 committed by GitHub
parent 0f9a907af0
commit 15891efcef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 232 additions and 174 deletions

View File

@ -4,40 +4,40 @@ from moto.core.exceptions import RESTError
class InvalidFormat(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)
def __init__(self, message: str):
super().__init__(InvalidFormat.__name__, message)
class InvalidParameterValue(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)
def __init__(self, message: str):
super().__init__(InvalidParameterValue.__name__, message)
class InvalidParameterCombination(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)
def __init__(self, message: str):
super().__init__(InvalidParameterCombination.__name__, message)
class ResourceNotFound(RESTError):
code = 404
def __init__(self):
super().__init__(__class__.__name__, "Unknown")
def __init__(self) -> None:
super().__init__(ResourceNotFound.__name__, "Unknown")
class ResourceNotFoundException(RESTError):
code = 404
def __init__(self):
super().__init__(__class__.__name__, "Unknown")
def __init__(self) -> None:
super().__init__(ResourceNotFoundException.__name__, "Unknown")
class ValidationError(RESTError):
code = 400
def __init__(self, message):
super().__init__(__class__.__name__, message)
def __init__(self, message: str):
super().__init__(ValidationError.__name__, message)

View File

@ -1,4 +1,5 @@
import json
import statistics
from moto.core import BaseBackend, BaseModel, CloudWatchMetricProvider
from moto.core.utils import (
@ -19,37 +20,37 @@ from .exceptions import (
)
from .utils import make_arn_for_dashboard, make_arn_for_alarm
from dateutil import parser
from typing import Tuple, Optional, List, Iterable, Dict, Any, SupportsFloat
from ..utilities.tagging_service import TaggingService
_EMPTY_LIST = tuple()
_EMPTY_LIST: Any = tuple()
class Dimension(object):
def __init__(self, name, value):
def __init__(self, name: Optional[str], value: Optional[str]):
self.name = name
self.value = value
def __eq__(self, item):
def __eq__(self, item: Any) -> bool:
if isinstance(item, Dimension):
return self.name == item.name and (
self.value is None or item.value is None or self.value == item.value
)
return False
def __lt__(self, other):
return self.name < other.name and self.value < other.name
def __lt__(self, other: "Dimension") -> bool:
return self.name < other.name and self.value < other.name # type: ignore[operator]
class Metric(object):
def __init__(self, metric_name, namespace, dimensions):
def __init__(self, metric_name: str, namespace: str, dimensions: List[Dimension]):
self.metric_name = metric_name
self.namespace = namespace
self.dimensions = dimensions
class MetricStat(object):
def __init__(self, metric, period, stat, unit):
def __init__(self, metric: Metric, period: str, stat: str, unit: str):
self.metric = metric
self.period = period
self.stat = stat
@ -58,7 +59,13 @@ class MetricStat(object):
class MetricDataQuery(object):
def __init__(
self, query_id, label, period, return_data, expression=None, metric_stat=None
self,
query_id: str,
label: str,
period: str,
return_data: str,
expression: Optional[str] = None,
metric_stat: Optional[MetricStat] = None,
):
self.id = query_id
self.label = label
@ -68,7 +75,12 @@ class MetricDataQuery(object):
self.metric_stat = metric_stat
def daterange(start, stop, step=timedelta(days=1), inclusive=False):
def daterange(
start: datetime,
stop: datetime,
step: timedelta = timedelta(days=1),
inclusive: bool = False,
) -> Iterable[datetime]:
"""
This method will iterate from `start` to `stop` datetimes with a timedelta step of `step`
(supports iteration forwards or backwards in time)
@ -99,30 +111,30 @@ def daterange(start, stop, step=timedelta(days=1), inclusive=False):
class FakeAlarm(BaseModel):
def __init__(
self,
account_id,
region_name,
name,
namespace,
metric_name,
metric_data_queries,
comparison_operator,
evaluation_periods,
datapoints_to_alarm,
period,
threshold,
statistic,
extended_statistic,
description,
dimensions,
alarm_actions,
ok_actions,
insufficient_data_actions,
unit,
actions_enabled,
treat_missing_data,
evaluate_low_sample_count_percentile,
threshold_metric_id,
rule=None,
account_id: str,
region_name: str,
name: str,
namespace: str,
metric_name: str,
metric_data_queries: List[MetricDataQuery],
comparison_operator: str,
evaluation_periods: int,
datapoints_to_alarm: int,
period: int,
threshold: float,
statistic: str,
extended_statistic: str,
description: str,
dimensions: List[Dict[str, str]],
alarm_actions: List[str],
ok_actions: List[str],
insufficient_data_actions: List[str],
unit: str,
actions_enabled: bool,
treat_missing_data: str,
evaluate_low_sample_count_percentile: str,
threshold_metric_id: str,
rule: str,
):
self.region_name = region_name
self.name = name
@ -153,7 +165,7 @@ class FakeAlarm(BaseModel):
self.evaluate_low_sample_count_percentile = evaluate_low_sample_count_percentile
self.threshold_metric_id = threshold_metric_id
self.history = []
self.history: List[Any] = []
self.state_reason = "Unchecked: Initial alarm creation"
self.state_reason_data = "{}"
@ -165,7 +177,7 @@ class FakeAlarm(BaseModel):
# only used for composite alarms
self.rule = rule
def update_state(self, reason, reason_data, state_value):
def update_state(self, reason: str, reason_data: str, state_value: str) -> None:
# History type, that then decides what the rest of the items are, can be one of ConfigurationUpdate | StateUpdate | Action
self.history.append(
(
@ -185,7 +197,9 @@ class FakeAlarm(BaseModel):
)
def are_dimensions_same(metric_dimensions, dimensions):
def are_dimensions_same(
metric_dimensions: List[Dimension], dimensions: List[Dimension]
) -> bool:
if len(metric_dimensions) != len(dimensions):
return False
for dimension in metric_dimensions:
@ -199,7 +213,15 @@ def are_dimensions_same(metric_dimensions, dimensions):
class MetricDatum(BaseModel):
def __init__(self, namespace, name, value, dimensions, timestamp, unit=None):
def __init__(
self,
namespace: str,
name: str,
value: float,
dimensions: List[Dict[str, str]],
timestamp: datetime,
unit: Any = None,
):
self.namespace = namespace
self.name = name
self.value = value
@ -209,7 +231,13 @@ class MetricDatum(BaseModel):
]
self.unit = unit
def filter(self, namespace, name, dimensions, already_present_metrics=None):
def filter(
self,
namespace: Optional[str],
name: Optional[str],
dimensions: List[Dict[str, str]],
already_present_metrics: Optional[List["MetricDatum"]] = None,
) -> bool:
if namespace and namespace != self.namespace:
return False
if name and name != self.name:
@ -235,7 +263,7 @@ class MetricDatum(BaseModel):
class Dashboard(BaseModel):
def __init__(self, account_id, name, body):
def __init__(self, account_id: str, name: str, body: str):
# Guaranteed to be unique for now as the name is also the key of a dictionary where they are stored
self.arn = make_arn_for_dashboard(account_id, name)
self.name = name
@ -243,75 +271,76 @@ class Dashboard(BaseModel):
self.last_modified = datetime.now()
@property
def last_modified_iso(self):
def last_modified_iso(self) -> str:
return self.last_modified.isoformat()
@property
def size(self):
def size(self) -> int:
return len(self)
def __len__(self):
def __len__(self) -> int:
return len(self.body)
def __repr__(self):
def __repr__(self) -> str:
return "<CloudWatchDashboard {0}>".format(self.name)
class Statistics:
def __init__(self, stats, dt):
def __init__(self, stats: List[str], dt: datetime):
self.timestamp = iso_8601_datetime_without_milliseconds(dt)
self.values = []
self.values: List[float] = []
self.stats = stats
self.unit = None
@property
def sample_count(self):
def sample_count(self) -> Optional[SupportsFloat]:
if "SampleCount" not in self.stats:
return None
return len(self.values)
@property
def sum(self):
def sum(self) -> Optional[SupportsFloat]:
if "Sum" not in self.stats:
return None
return sum(self.values)
@property
def minimum(self):
def minimum(self) -> Optional[SupportsFloat]:
if "Minimum" not in self.stats:
return None
return min(self.values)
@property
def maximum(self):
def maximum(self) -> Optional[SupportsFloat]:
if "Maximum" not in self.stats:
return None
return max(self.values)
@property
def average(self):
def average(self) -> Optional[SupportsFloat]:
if "Average" not in self.stats:
return None
# when moto is 3.4+ we can switch to the statistics module
return sum(self.values) / len(self.values)
return statistics.mean(self.values)
class CloudWatchBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.alarms = {}
self.dashboards = {}
self.metric_data = []
self.paged_metric_data = {}
self.alarms: Dict[str, FakeAlarm] = {}
self.dashboards: Dict[str, Dashboard] = {}
self.metric_data: List[MetricDatum] = []
self.paged_metric_data: Dict[str, List[MetricDatum]] = {}
self.tagger = TaggingService()
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
def default_vpc_endpoint_service(
service_region: str, zones: List[str]
) -> List[Dict[str, str]]:
"""Default VPC endpoint service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "monitoring"
@ -320,7 +349,7 @@ class CloudWatchBackend(BaseBackend):
@property
# Retrieve a list of all OOTB metrics that are provided by metrics providers
# Computed on the fly
def aws_metric_data(self):
def aws_metric_data(self) -> List[MetricDatum]:
providers = CloudWatchMetricProvider.__subclasses__()
md = []
for provider in providers:
@ -329,30 +358,30 @@ class CloudWatchBackend(BaseBackend):
def put_metric_alarm(
self,
name,
namespace,
metric_name,
metric_data_queries,
comparison_operator,
evaluation_periods,
datapoints_to_alarm,
period,
threshold,
statistic,
extended_statistic,
description,
dimensions,
alarm_actions,
ok_actions,
insufficient_data_actions,
unit,
actions_enabled,
treat_missing_data,
evaluate_low_sample_count_percentile,
threshold_metric_id,
rule=None,
tags=None,
):
name: str,
namespace: str,
metric_name: str,
metric_data_queries: List[MetricDataQuery],
comparison_operator: str,
evaluation_periods: int,
datapoints_to_alarm: int,
period: int,
threshold: float,
statistic: str,
extended_statistic: str,
description: str,
dimensions: List[Dict[str, str]],
alarm_actions: List[str],
ok_actions: List[str],
insufficient_data_actions: List[str],
unit: str,
actions_enabled: bool,
treat_missing_data: str,
evaluate_low_sample_count_percentile: str,
threshold_metric_id: str,
rule: str,
tags: List[Dict[str, str]],
) -> FakeAlarm:
if extended_statistic and not extended_statistic.startswith("p"):
raise InvalidParameterValue(
f"The value {extended_statistic} for parameter ExtendedStatistic is not supported."
@ -398,18 +427,18 @@ class CloudWatchBackend(BaseBackend):
return alarm
def get_all_alarms(self):
def get_all_alarms(self) -> Iterable[FakeAlarm]:
return self.alarms.values()
@staticmethod
def _list_element_starts_with(items, needle):
def _list_element_starts_with(items: List[str], needle: str) -> bool:
"""True of any of the list elements starts with needle"""
for item in items:
if item.startswith(needle):
return True
return False
def get_alarms_by_action_prefix(self, action_prefix):
def get_alarms_by_action_prefix(self, action_prefix: str) -> Iterable[FakeAlarm]:
return [
alarm
for alarm in self.alarms.values()
@ -418,26 +447,28 @@ class CloudWatchBackend(BaseBackend):
)
]
def get_alarms_by_alarm_name_prefix(self, name_prefix):
def get_alarms_by_alarm_name_prefix(self, name_prefix: str) -> Iterable[FakeAlarm]:
return [
alarm
for alarm in self.alarms.values()
if alarm.name.startswith(name_prefix)
]
def get_alarms_by_alarm_names(self, alarm_names):
def get_alarms_by_alarm_names(self, alarm_names: List[str]) -> Iterable[FakeAlarm]:
return [alarm for alarm in self.alarms.values() if alarm.name in alarm_names]
def get_alarms_by_state_value(self, target_state):
def get_alarms_by_state_value(self, target_state: str) -> Iterable[FakeAlarm]:
return filter(
lambda alarm: alarm.state_value == target_state, self.alarms.values()
)
def delete_alarms(self, alarm_names):
def delete_alarms(self, alarm_names: List[str]) -> None:
for alarm_name in alarm_names:
self.alarms.pop(alarm_name, None)
def put_metric_data(self, namespace, metric_data):
def put_metric_data(
self, namespace: str, metric_data: List[Dict[str, Any]]
) -> None:
for i, metric in enumerate(metric_data):
if metric.get("Value") == "NaN":
raise InvalidParameterValue(
@ -461,8 +492,12 @@ class CloudWatchBackend(BaseBackend):
)
def get_metric_data(
self, queries, start_time, end_time, scan_by="TimestampAscending"
):
self,
queries: List[Dict[str, Any]],
start_time: datetime,
end_time: datetime,
scan_by: str = "TimestampAscending",
) -> List[Dict[str, Any]]:
period_data = [
md for md in self.metric_data if start_time <= md.timestamp <= end_time
@ -475,8 +510,8 @@ class CloudWatchBackend(BaseBackend):
query_name = query["metric_stat._metric._metric_name"]
delta = timedelta(seconds=int(query["metric_stat._period"]))
dimensions = self._extract_dimensions_from_get_metric_data_query(query)
result_vals = []
timestamps = []
result_vals: List[SupportsFloat] = []
timestamps: List[str] = []
stat = query["metric_stat._stat"]
while period_start_time <= end_time:
period_end_time = period_start_time + delta
@ -513,7 +548,7 @@ class CloudWatchBackend(BaseBackend):
elif stat == "Sum":
result_vals.append(sum(metric_values))
timestamps.append(
iso_8601_datetime_without_milliseconds(period_start_time)
iso_8601_datetime_without_milliseconds(period_start_time) # type: ignore[arg-type]
)
period_start_time += delta
if scan_by == "TimestampDescending" and len(timestamps) > 0:
@ -532,15 +567,15 @@ class CloudWatchBackend(BaseBackend):
def get_metric_statistics(
self,
namespace,
metric_name,
start_time,
end_time,
period,
stats,
dimensions,
unit=None,
):
namespace: str,
metric_name: str,
start_time: datetime,
end_time: datetime,
period: int,
stats: List[str],
dimensions: List[Dict[str, str]],
unit: Optional[str] = None,
) -> List[Statistics]:
period_delta = timedelta(seconds=period)
filtered_data = [
md
@ -563,7 +598,7 @@ class CloudWatchBackend(BaseBackend):
return []
idx = 0
data = list()
data: List[Statistics] = list()
for dt in daterange(
filtered_data[0].timestamp,
filtered_data[-1].timestamp + period_delta,
@ -584,40 +619,38 @@ class CloudWatchBackend(BaseBackend):
return data
def get_all_metrics(self):
def get_all_metrics(self) -> List[MetricDatum]:
return self.metric_data + self.aws_metric_data
def put_dashboard(self, name, body):
def put_dashboard(self, name: str, body: str) -> None:
self.dashboards[name] = Dashboard(self.account_id, name, body)
def list_dashboards(self, prefix=""):
def list_dashboards(self, prefix: str = "") -> Iterable[Dashboard]:
for key, value in self.dashboards.items():
if key.startswith(prefix):
yield value
def delete_dashboards(self, dashboards):
def delete_dashboards(self, dashboards: List[str]) -> Optional[str]:
to_delete = set(dashboards)
all_dashboards = set(self.dashboards.keys())
left_over = to_delete - all_dashboards
if len(left_over) > 0:
# Some dashboards are not found
return (
False,
"The specified dashboard does not exist. [{0}]".format(
", ".join(left_over)
),
)
db_list = ", ".join(left_over)
return f"The specified dashboard does not exist. [{db_list}]"
for dashboard in to_delete:
del self.dashboards[dashboard]
return True, None
return None
def get_dashboard(self, dashboard):
def get_dashboard(self, dashboard: str) -> Optional[Dashboard]:
return self.dashboards.get(dashboard)
def set_alarm_state(self, alarm_name, reason, reason_data, state_value):
def set_alarm_state(
self, alarm_name: str, reason: str, reason_data: str, state_value: str
) -> None:
try:
if reason_data is not None:
json.loads(reason_data)
@ -636,7 +669,13 @@ class CloudWatchBackend(BaseBackend):
self.alarms[alarm_name].update_state(reason, reason_data, state_value)
def list_metrics(self, next_token, namespace, metric_name, dimensions):
def list_metrics(
self,
next_token: Optional[str],
namespace: str,
metric_name: str,
dimensions: List[Dict[str, str]],
) -> Tuple[Optional[str], List[MetricDatum]]:
if next_token:
if next_token not in self.paged_metric_data:
raise InvalidParameterValue("Request parameter NextToken is invalid")
@ -648,9 +687,11 @@ class CloudWatchBackend(BaseBackend):
metrics = self.get_filtered_metrics(metric_name, namespace, dimensions)
return self._get_paginated(metrics)
def get_filtered_metrics(self, metric_name, namespace, dimensions):
def get_filtered_metrics(
self, metric_name: str, namespace: str, dimensions: List[Dict[str, str]]
) -> List[MetricDatum]:
metrics = self.get_all_metrics()
new_metrics = []
new_metrics: List[MetricDatum] = []
for md in metrics:
if md.filter(
namespace=namespace,
@ -661,10 +702,10 @@ class CloudWatchBackend(BaseBackend):
new_metrics.append(md)
return new_metrics
def list_tags_for_resource(self, arn):
def list_tags_for_resource(self, arn: str) -> Dict[str, str]:
return self.tagger.get_tag_dict_for_resource(arn)
def tag_resource(self, arn, tags):
def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
# From boto3:
# Currently, the only CloudWatch resources that can be tagged are alarms and Contributor Insights rules.
all_arns = [alarm.alarm_arn for alarm in self.get_all_alarms()]
@ -673,13 +714,15 @@ class CloudWatchBackend(BaseBackend):
self.tagger.tag_resource(arn, tags)
def untag_resource(self, arn, tag_keys):
def untag_resource(self, arn: str, tag_keys: List[str]) -> None:
if arn not in self.tagger.tags.keys():
raise ResourceNotFoundException
self.tagger.untag_resource_using_names(arn, tag_keys)
def _get_paginated(self, metrics):
def _get_paginated(
self, metrics: List[MetricDatum]
) -> Tuple[Optional[str], List[MetricDatum]]:
if len(metrics) > 500:
next_token = str(mock_random.uuid4())
self.paged_metric_data[next_token] = metrics[500:]
@ -687,7 +730,9 @@ class CloudWatchBackend(BaseBackend):
else:
return None, metrics
def _extract_dimensions_from_get_metric_data_query(self, query):
def _extract_dimensions_from_get_metric_data_query(
self, query: Dict[str, str]
) -> List[Dimension]:
dimensions = []
prefix = "metric_stat._metric._dimensions.member."
suffix_name = "._name"

View File

@ -1,27 +1,38 @@
import json
from dateutil.parser import parse as dtparse
from typing import Dict, List, Iterable, Tuple, Union
from moto.core.responses import BaseResponse
from moto.utilities.aws_headers import amzn_request_id
from .models import cloudwatch_backends, MetricDataQuery, MetricStat, Metric, Dimension
from .models import (
cloudwatch_backends,
CloudWatchBackend,
MetricDataQuery,
MetricStat,
Metric,
Dimension,
FakeAlarm,
)
from .exceptions import InvalidParameterCombination
ERROR_RESPONSE = Tuple[str, Dict[str, int]]
class CloudWatchResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="cloudwatch")
@property
def cloudwatch_backend(self):
def cloudwatch_backend(self) -> CloudWatchBackend:
return cloudwatch_backends[self.current_account][self.region]
def _error(self, code, message, status=400):
def _error(self, code: str, message: str, status: int = 400) -> ERROR_RESPONSE:
template = self.response_template(ERROR_RESPONSE_TEMPLATE)
return template.render(code=code, message=message), dict(status=status)
@amzn_request_id
def put_metric_alarm(self):
def put_metric_alarm(self) -> str:
name = self._get_param("AlarmName")
namespace = self._get_param("Namespace")
metric_name = self._get_param("MetricName")
@ -30,14 +41,14 @@ class CloudWatchResponse(BaseResponse):
if metrics:
metric_data_queries = []
for metric in metrics:
dimensions = []
metric_dimensions = []
dims = (
metric.get("MetricStat", {})
.get("Metric", {})
.get("Dimensions.member", [])
)
for dim in dims:
dimensions.append(
metric_dimensions.append(
Dimension(name=dim.get("Name"), value=dim.get("Value"))
)
metric_stat = None
@ -51,7 +62,7 @@ class CloudWatchResponse(BaseResponse):
metric=Metric(
metric_name=stat_metric_name,
namespace=stat_metric_ns,
dimensions=dimensions,
dimensions=metric_dimensions,
),
period=stat_details.get("Period"),
stat=stat_details.get("Stat"),
@ -121,7 +132,7 @@ class CloudWatchResponse(BaseResponse):
return template.render(alarm=alarm)
@amzn_request_id
def describe_alarms(self):
def describe_alarms(self) -> str:
action_prefix = self._get_param("ActionPrefix")
alarm_name_prefix = self._get_param("AlarmNamePrefix")
alarm_names = self._get_multi_param("AlarmNames.member")
@ -149,14 +160,14 @@ class CloudWatchResponse(BaseResponse):
)
@amzn_request_id
def delete_alarms(self):
def delete_alarms(self) -> str:
alarm_names = self._get_multi_param("AlarmNames.member")
self.cloudwatch_backend.delete_alarms(alarm_names)
template = self.response_template(DELETE_METRIC_ALARMS_TEMPLATE)
return template.render()
@amzn_request_id
def put_metric_data(self):
def put_metric_data(self) -> str:
namespace = self._get_param("Namespace")
metric_data = self._get_multi_param("MetricData.member")
self.cloudwatch_backend.put_metric_data(namespace, metric_data)
@ -164,7 +175,7 @@ class CloudWatchResponse(BaseResponse):
return template.render()
@amzn_request_id
def get_metric_data(self):
def get_metric_data(self) -> str:
start = dtparse(self._get_param("StartTime"))
end = dtparse(self._get_param("EndTime"))
scan_by = self._get_param("ScanBy")
@ -178,7 +189,7 @@ class CloudWatchResponse(BaseResponse):
return template.render(results=results)
@amzn_request_id
def get_metric_statistics(self):
def get_metric_statistics(self) -> str:
namespace = self._get_param("Namespace")
metric_name = self._get_param("MetricName")
start_time = dtparse(self._get_param("StartTime"))
@ -210,7 +221,7 @@ class CloudWatchResponse(BaseResponse):
return template.render(label=metric_name, datapoints=datapoints)
@amzn_request_id
def list_metrics(self):
def list_metrics(self) -> str:
namespace = self._get_param("Namespace")
metric_name = self._get_param("MetricName")
dimensions = self._get_params().get("Dimensions", [])
@ -222,24 +233,26 @@ class CloudWatchResponse(BaseResponse):
return template.render(metrics=metrics, next_token=next_token)
@amzn_request_id
def delete_dashboards(self):
def delete_dashboards(self) -> Union[str, ERROR_RESPONSE]:
dashboards = self._get_multi_param("DashboardNames.member")
if dashboards is None:
return self._error("InvalidParameterValue", "Need at least 1 dashboard")
status, error = self.cloudwatch_backend.delete_dashboards(dashboards)
if not status:
error = self.cloudwatch_backend.delete_dashboards(dashboards)
if error is not None:
return self._error("ResourceNotFound", error)
template = self.response_template(DELETE_DASHBOARD_TEMPLATE)
return template.render()
@amzn_request_id
def describe_alarm_history(self):
def describe_alarm_history(self) -> None:
raise NotImplementedError()
@staticmethod
def filter_alarms(alarms, metric_name, namespace):
def filter_alarms(
alarms: Iterable[FakeAlarm], metric_name: str, namespace: str
) -> List[FakeAlarm]:
metric_filtered_alarms = []
for alarm in alarms:
@ -248,7 +261,7 @@ class CloudWatchResponse(BaseResponse):
return metric_filtered_alarms
@amzn_request_id
def describe_alarms_for_metric(self):
def describe_alarms_for_metric(self) -> str:
alarms = self.cloudwatch_backend.get_all_alarms()
namespace = self._get_param("Namespace")
metric_name = self._get_param("MetricName")
@ -257,15 +270,15 @@ class CloudWatchResponse(BaseResponse):
return template.render(alarms=filtered_alarms)
@amzn_request_id
def disable_alarm_actions(self):
def disable_alarm_actions(self) -> str:
raise NotImplementedError()
@amzn_request_id
def enable_alarm_actions(self):
def enable_alarm_actions(self) -> str:
raise NotImplementedError()
@amzn_request_id
def get_dashboard(self):
def get_dashboard(self) -> Union[str, ERROR_RESPONSE]:
dashboard_name = self._get_param("DashboardName")
dashboard = self.cloudwatch_backend.get_dashboard(dashboard_name)
@ -276,7 +289,7 @@ class CloudWatchResponse(BaseResponse):
return template.render(dashboard=dashboard)
@amzn_request_id
def list_dashboards(self):
def list_dashboards(self) -> str:
prefix = self._get_param("DashboardNamePrefix", "")
dashboards = self.cloudwatch_backend.list_dashboards(prefix)
@ -285,7 +298,7 @@ class CloudWatchResponse(BaseResponse):
return template.render(dashboards=dashboards)
@amzn_request_id
def put_dashboard(self):
def put_dashboard(self) -> Union[str, ERROR_RESPONSE]:
name = self._get_param("DashboardName")
body = self._get_param("DashboardBody")
@ -300,7 +313,7 @@ class CloudWatchResponse(BaseResponse):
return template.render()
@amzn_request_id
def set_alarm_state(self):
def set_alarm_state(self) -> str:
alarm_name = self._get_param("AlarmName")
reason = self._get_param("StateReason")
reason_data = self._get_param("StateReasonData")
@ -314,7 +327,7 @@ class CloudWatchResponse(BaseResponse):
return template.render()
@amzn_request_id
def list_tags_for_resource(self):
def list_tags_for_resource(self) -> str:
resource_arn = self._get_param("ResourceARN")
tags = self.cloudwatch_backend.list_tags_for_resource(resource_arn)
@ -323,7 +336,7 @@ class CloudWatchResponse(BaseResponse):
return template.render(tags=tags)
@amzn_request_id
def tag_resource(self):
def tag_resource(self) -> str:
resource_arn = self._get_param("ResourceARN")
tags = self._get_multi_param("Tags.member")
@ -333,7 +346,7 @@ class CloudWatchResponse(BaseResponse):
return template.render()
@amzn_request_id
def untag_resource(self):
def untag_resource(self) -> str:
resource_arn = self._get_param("ResourceARN")
tag_keys = self._get_multi_param("TagKeys.member")

View File

@ -1,6 +1,6 @@
def make_arn_for_dashboard(account_id, name):
def make_arn_for_dashboard(account_id: str, name: str) -> str:
return "arn:aws:cloudwatch::{0}dashboard/{1}".format(account_id, name)
def make_arn_for_alarm(region, account_id, alarm_name):
def make_arn_for_alarm(region: str, account_id: str, alarm_name: str) -> str:
return "arn:aws:cloudwatch:{0}:{1}:alarm:{2}".format(region, account_id, alarm_name)

View File

@ -195,5 +195,5 @@ class ConfigQueryModel:
class CloudWatchMetricProvider(object):
@staticmethod
@abstractmethod
def get_cloudwatch_metrics(account_id):
def get_cloudwatch_metrics(account_id: str) -> Any:
pass

View File

@ -18,7 +18,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract