diff --git a/moto/cloudwatch/exceptions.py b/moto/cloudwatch/exceptions.py index 4196cc994..466d14723 100644 --- a/moto/cloudwatch/exceptions.py +++ b/moto/cloudwatch/exceptions.py @@ -4,40 +4,40 @@ from moto.core.exceptions import RESTError class InvalidFormat(RESTError): code = 400 - def __init__(self, message): - super().__init__(__class__.__name__, message) + def __init__(self, message: str): + super().__init__(InvalidFormat.__name__, message) class InvalidParameterValue(RESTError): code = 400 - def __init__(self, message): - super().__init__(__class__.__name__, message) + def __init__(self, message: str): + super().__init__(InvalidParameterValue.__name__, message) class InvalidParameterCombination(RESTError): code = 400 - def __init__(self, message): - super().__init__(__class__.__name__, message) + def __init__(self, message: str): + super().__init__(InvalidParameterCombination.__name__, message) class ResourceNotFound(RESTError): code = 404 - def __init__(self): - super().__init__(__class__.__name__, "Unknown") + def __init__(self) -> None: + super().__init__(ResourceNotFound.__name__, "Unknown") class ResourceNotFoundException(RESTError): code = 404 - def __init__(self): - super().__init__(__class__.__name__, "Unknown") + def __init__(self) -> None: + super().__init__(ResourceNotFoundException.__name__, "Unknown") class ValidationError(RESTError): code = 400 - def __init__(self, message): - super().__init__(__class__.__name__, message) + def __init__(self, message: str): + super().__init__(ValidationError.__name__, message) diff --git a/moto/cloudwatch/models.py b/moto/cloudwatch/models.py index 7f78edf95..3bd509b25 100644 --- a/moto/cloudwatch/models.py +++ b/moto/cloudwatch/models.py @@ -1,4 +1,5 @@ import json +import statistics from moto.core import BaseBackend, BaseModel, CloudWatchMetricProvider from moto.core.utils import ( @@ -19,37 +20,37 @@ from .exceptions import ( ) from .utils import make_arn_for_dashboard, make_arn_for_alarm from dateutil import parser - +from typing import Tuple, Optional, List, Iterable, Dict, Any, SupportsFloat from ..utilities.tagging_service import TaggingService -_EMPTY_LIST = tuple() +_EMPTY_LIST: Any = tuple() class Dimension(object): - def __init__(self, name, value): + def __init__(self, name: Optional[str], value: Optional[str]): self.name = name self.value = value - def __eq__(self, item): + def __eq__(self, item: Any) -> bool: if isinstance(item, Dimension): return self.name == item.name and ( self.value is None or item.value is None or self.value == item.value ) return False - def __lt__(self, other): - return self.name < other.name and self.value < other.name + def __lt__(self, other: "Dimension") -> bool: + return self.name < other.name and self.value < other.name # type: ignore[operator] class Metric(object): - def __init__(self, metric_name, namespace, dimensions): + def __init__(self, metric_name: str, namespace: str, dimensions: List[Dimension]): self.metric_name = metric_name self.namespace = namespace self.dimensions = dimensions class MetricStat(object): - def __init__(self, metric, period, stat, unit): + def __init__(self, metric: Metric, period: str, stat: str, unit: str): self.metric = metric self.period = period self.stat = stat @@ -58,7 +59,13 @@ class MetricStat(object): class MetricDataQuery(object): def __init__( - self, query_id, label, period, return_data, expression=None, metric_stat=None + self, + query_id: str, + label: str, + period: str, + return_data: str, + expression: Optional[str] = None, + metric_stat: Optional[MetricStat] = None, ): self.id = query_id self.label = label @@ -68,7 +75,12 @@ class MetricDataQuery(object): self.metric_stat = metric_stat -def daterange(start, stop, step=timedelta(days=1), inclusive=False): +def daterange( + start: datetime, + stop: datetime, + step: timedelta = timedelta(days=1), + inclusive: bool = False, +) -> Iterable[datetime]: """ This method will iterate from `start` to `stop` datetimes with a timedelta step of `step` (supports iteration forwards or backwards in time) @@ -99,30 +111,30 @@ def daterange(start, stop, step=timedelta(days=1), inclusive=False): class FakeAlarm(BaseModel): def __init__( self, - account_id, - region_name, - name, - namespace, - metric_name, - metric_data_queries, - comparison_operator, - evaluation_periods, - datapoints_to_alarm, - period, - threshold, - statistic, - extended_statistic, - description, - dimensions, - alarm_actions, - ok_actions, - insufficient_data_actions, - unit, - actions_enabled, - treat_missing_data, - evaluate_low_sample_count_percentile, - threshold_metric_id, - rule=None, + account_id: str, + region_name: str, + name: str, + namespace: str, + metric_name: str, + metric_data_queries: List[MetricDataQuery], + comparison_operator: str, + evaluation_periods: int, + datapoints_to_alarm: int, + period: int, + threshold: float, + statistic: str, + extended_statistic: str, + description: str, + dimensions: List[Dict[str, str]], + alarm_actions: List[str], + ok_actions: List[str], + insufficient_data_actions: List[str], + unit: str, + actions_enabled: bool, + treat_missing_data: str, + evaluate_low_sample_count_percentile: str, + threshold_metric_id: str, + rule: str, ): self.region_name = region_name self.name = name @@ -153,7 +165,7 @@ class FakeAlarm(BaseModel): self.evaluate_low_sample_count_percentile = evaluate_low_sample_count_percentile self.threshold_metric_id = threshold_metric_id - self.history = [] + self.history: List[Any] = [] self.state_reason = "Unchecked: Initial alarm creation" self.state_reason_data = "{}" @@ -165,7 +177,7 @@ class FakeAlarm(BaseModel): # only used for composite alarms self.rule = rule - def update_state(self, reason, reason_data, state_value): + def update_state(self, reason: str, reason_data: str, state_value: str) -> None: # History type, that then decides what the rest of the items are, can be one of ConfigurationUpdate | StateUpdate | Action self.history.append( ( @@ -185,7 +197,9 @@ class FakeAlarm(BaseModel): ) -def are_dimensions_same(metric_dimensions, dimensions): +def are_dimensions_same( + metric_dimensions: List[Dimension], dimensions: List[Dimension] +) -> bool: if len(metric_dimensions) != len(dimensions): return False for dimension in metric_dimensions: @@ -199,7 +213,15 @@ def are_dimensions_same(metric_dimensions, dimensions): class MetricDatum(BaseModel): - def __init__(self, namespace, name, value, dimensions, timestamp, unit=None): + def __init__( + self, + namespace: str, + name: str, + value: float, + dimensions: List[Dict[str, str]], + timestamp: datetime, + unit: Any = None, + ): self.namespace = namespace self.name = name self.value = value @@ -209,7 +231,13 @@ class MetricDatum(BaseModel): ] self.unit = unit - def filter(self, namespace, name, dimensions, already_present_metrics=None): + def filter( + self, + namespace: Optional[str], + name: Optional[str], + dimensions: List[Dict[str, str]], + already_present_metrics: Optional[List["MetricDatum"]] = None, + ) -> bool: if namespace and namespace != self.namespace: return False if name and name != self.name: @@ -235,7 +263,7 @@ class MetricDatum(BaseModel): class Dashboard(BaseModel): - def __init__(self, account_id, name, body): + def __init__(self, account_id: str, name: str, body: str): # Guaranteed to be unique for now as the name is also the key of a dictionary where they are stored self.arn = make_arn_for_dashboard(account_id, name) self.name = name @@ -243,75 +271,76 @@ class Dashboard(BaseModel): self.last_modified = datetime.now() @property - def last_modified_iso(self): + def last_modified_iso(self) -> str: return self.last_modified.isoformat() @property - def size(self): + def size(self) -> int: return len(self) - def __len__(self): + def __len__(self) -> int: return len(self.body) - def __repr__(self): + def __repr__(self) -> str: return "".format(self.name) class Statistics: - def __init__(self, stats, dt): + def __init__(self, stats: List[str], dt: datetime): self.timestamp = iso_8601_datetime_without_milliseconds(dt) - self.values = [] + self.values: List[float] = [] self.stats = stats self.unit = None @property - def sample_count(self): + def sample_count(self) -> Optional[SupportsFloat]: if "SampleCount" not in self.stats: return None return len(self.values) @property - def sum(self): + def sum(self) -> Optional[SupportsFloat]: if "Sum" not in self.stats: return None return sum(self.values) @property - def minimum(self): + def minimum(self) -> Optional[SupportsFloat]: if "Minimum" not in self.stats: return None return min(self.values) @property - def maximum(self): + def maximum(self) -> Optional[SupportsFloat]: if "Maximum" not in self.stats: return None return max(self.values) @property - def average(self): + def average(self) -> Optional[SupportsFloat]: if "Average" not in self.stats: return None - # when moto is 3.4+ we can switch to the statistics module - return sum(self.values) / len(self.values) + return statistics.mean(self.values) class CloudWatchBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.alarms = {} - self.dashboards = {} - self.metric_data = [] - self.paged_metric_data = {} + self.alarms: Dict[str, FakeAlarm] = {} + self.dashboards: Dict[str, Dashboard] = {} + self.metric_data: List[MetricDatum] = [] + self.paged_metric_data: Dict[str, List[MetricDatum]] = {} self.tagger = TaggingService() @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service( + service_region: str, zones: List[str] + ) -> List[Dict[str, str]]: """Default VPC endpoint service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "monitoring" @@ -320,7 +349,7 @@ class CloudWatchBackend(BaseBackend): @property # Retrieve a list of all OOTB metrics that are provided by metrics providers # Computed on the fly - def aws_metric_data(self): + def aws_metric_data(self) -> List[MetricDatum]: providers = CloudWatchMetricProvider.__subclasses__() md = [] for provider in providers: @@ -329,30 +358,30 @@ class CloudWatchBackend(BaseBackend): def put_metric_alarm( self, - name, - namespace, - metric_name, - metric_data_queries, - comparison_operator, - evaluation_periods, - datapoints_to_alarm, - period, - threshold, - statistic, - extended_statistic, - description, - dimensions, - alarm_actions, - ok_actions, - insufficient_data_actions, - unit, - actions_enabled, - treat_missing_data, - evaluate_low_sample_count_percentile, - threshold_metric_id, - rule=None, - tags=None, - ): + name: str, + namespace: str, + metric_name: str, + metric_data_queries: List[MetricDataQuery], + comparison_operator: str, + evaluation_periods: int, + datapoints_to_alarm: int, + period: int, + threshold: float, + statistic: str, + extended_statistic: str, + description: str, + dimensions: List[Dict[str, str]], + alarm_actions: List[str], + ok_actions: List[str], + insufficient_data_actions: List[str], + unit: str, + actions_enabled: bool, + treat_missing_data: str, + evaluate_low_sample_count_percentile: str, + threshold_metric_id: str, + rule: str, + tags: List[Dict[str, str]], + ) -> FakeAlarm: if extended_statistic and not extended_statistic.startswith("p"): raise InvalidParameterValue( f"The value {extended_statistic} for parameter ExtendedStatistic is not supported." @@ -398,18 +427,18 @@ class CloudWatchBackend(BaseBackend): return alarm - def get_all_alarms(self): + def get_all_alarms(self) -> Iterable[FakeAlarm]: return self.alarms.values() @staticmethod - def _list_element_starts_with(items, needle): + def _list_element_starts_with(items: List[str], needle: str) -> bool: """True of any of the list elements starts with needle""" for item in items: if item.startswith(needle): return True return False - def get_alarms_by_action_prefix(self, action_prefix): + def get_alarms_by_action_prefix(self, action_prefix: str) -> Iterable[FakeAlarm]: return [ alarm for alarm in self.alarms.values() @@ -418,26 +447,28 @@ class CloudWatchBackend(BaseBackend): ) ] - def get_alarms_by_alarm_name_prefix(self, name_prefix): + def get_alarms_by_alarm_name_prefix(self, name_prefix: str) -> Iterable[FakeAlarm]: return [ alarm for alarm in self.alarms.values() if alarm.name.startswith(name_prefix) ] - def get_alarms_by_alarm_names(self, alarm_names): + def get_alarms_by_alarm_names(self, alarm_names: List[str]) -> Iterable[FakeAlarm]: return [alarm for alarm in self.alarms.values() if alarm.name in alarm_names] - def get_alarms_by_state_value(self, target_state): + def get_alarms_by_state_value(self, target_state: str) -> Iterable[FakeAlarm]: return filter( lambda alarm: alarm.state_value == target_state, self.alarms.values() ) - def delete_alarms(self, alarm_names): + def delete_alarms(self, alarm_names: List[str]) -> None: for alarm_name in alarm_names: self.alarms.pop(alarm_name, None) - def put_metric_data(self, namespace, metric_data): + def put_metric_data( + self, namespace: str, metric_data: List[Dict[str, Any]] + ) -> None: for i, metric in enumerate(metric_data): if metric.get("Value") == "NaN": raise InvalidParameterValue( @@ -461,8 +492,12 @@ class CloudWatchBackend(BaseBackend): ) def get_metric_data( - self, queries, start_time, end_time, scan_by="TimestampAscending" - ): + self, + queries: List[Dict[str, Any]], + start_time: datetime, + end_time: datetime, + scan_by: str = "TimestampAscending", + ) -> List[Dict[str, Any]]: period_data = [ md for md in self.metric_data if start_time <= md.timestamp <= end_time @@ -475,8 +510,8 @@ class CloudWatchBackend(BaseBackend): query_name = query["metric_stat._metric._metric_name"] delta = timedelta(seconds=int(query["metric_stat._period"])) dimensions = self._extract_dimensions_from_get_metric_data_query(query) - result_vals = [] - timestamps = [] + result_vals: List[SupportsFloat] = [] + timestamps: List[str] = [] stat = query["metric_stat._stat"] while period_start_time <= end_time: period_end_time = period_start_time + delta @@ -513,7 +548,7 @@ class CloudWatchBackend(BaseBackend): elif stat == "Sum": result_vals.append(sum(metric_values)) timestamps.append( - iso_8601_datetime_without_milliseconds(period_start_time) + iso_8601_datetime_without_milliseconds(period_start_time) # type: ignore[arg-type] ) period_start_time += delta if scan_by == "TimestampDescending" and len(timestamps) > 0: @@ -532,15 +567,15 @@ class CloudWatchBackend(BaseBackend): def get_metric_statistics( self, - namespace, - metric_name, - start_time, - end_time, - period, - stats, - dimensions, - unit=None, - ): + namespace: str, + metric_name: str, + start_time: datetime, + end_time: datetime, + period: int, + stats: List[str], + dimensions: List[Dict[str, str]], + unit: Optional[str] = None, + ) -> List[Statistics]: period_delta = timedelta(seconds=period) filtered_data = [ md @@ -563,7 +598,7 @@ class CloudWatchBackend(BaseBackend): return [] idx = 0 - data = list() + data: List[Statistics] = list() for dt in daterange( filtered_data[0].timestamp, filtered_data[-1].timestamp + period_delta, @@ -584,40 +619,38 @@ class CloudWatchBackend(BaseBackend): return data - def get_all_metrics(self): + def get_all_metrics(self) -> List[MetricDatum]: return self.metric_data + self.aws_metric_data - def put_dashboard(self, name, body): + def put_dashboard(self, name: str, body: str) -> None: self.dashboards[name] = Dashboard(self.account_id, name, body) - def list_dashboards(self, prefix=""): + def list_dashboards(self, prefix: str = "") -> Iterable[Dashboard]: for key, value in self.dashboards.items(): if key.startswith(prefix): yield value - def delete_dashboards(self, dashboards): + def delete_dashboards(self, dashboards: List[str]) -> Optional[str]: to_delete = set(dashboards) all_dashboards = set(self.dashboards.keys()) left_over = to_delete - all_dashboards if len(left_over) > 0: # Some dashboards are not found - return ( - False, - "The specified dashboard does not exist. [{0}]".format( - ", ".join(left_over) - ), - ) + db_list = ", ".join(left_over) + return f"The specified dashboard does not exist. [{db_list}]" for dashboard in to_delete: del self.dashboards[dashboard] - return True, None + return None - def get_dashboard(self, dashboard): + def get_dashboard(self, dashboard: str) -> Optional[Dashboard]: return self.dashboards.get(dashboard) - def set_alarm_state(self, alarm_name, reason, reason_data, state_value): + def set_alarm_state( + self, alarm_name: str, reason: str, reason_data: str, state_value: str + ) -> None: try: if reason_data is not None: json.loads(reason_data) @@ -636,7 +669,13 @@ class CloudWatchBackend(BaseBackend): self.alarms[alarm_name].update_state(reason, reason_data, state_value) - def list_metrics(self, next_token, namespace, metric_name, dimensions): + def list_metrics( + self, + next_token: Optional[str], + namespace: str, + metric_name: str, + dimensions: List[Dict[str, str]], + ) -> Tuple[Optional[str], List[MetricDatum]]: if next_token: if next_token not in self.paged_metric_data: raise InvalidParameterValue("Request parameter NextToken is invalid") @@ -648,9 +687,11 @@ class CloudWatchBackend(BaseBackend): metrics = self.get_filtered_metrics(metric_name, namespace, dimensions) return self._get_paginated(metrics) - def get_filtered_metrics(self, metric_name, namespace, dimensions): + def get_filtered_metrics( + self, metric_name: str, namespace: str, dimensions: List[Dict[str, str]] + ) -> List[MetricDatum]: metrics = self.get_all_metrics() - new_metrics = [] + new_metrics: List[MetricDatum] = [] for md in metrics: if md.filter( namespace=namespace, @@ -661,10 +702,10 @@ class CloudWatchBackend(BaseBackend): new_metrics.append(md) return new_metrics - def list_tags_for_resource(self, arn): + def list_tags_for_resource(self, arn: str) -> Dict[str, str]: return self.tagger.get_tag_dict_for_resource(arn) - def tag_resource(self, arn, tags): + def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None: # From boto3: # Currently, the only CloudWatch resources that can be tagged are alarms and Contributor Insights rules. all_arns = [alarm.alarm_arn for alarm in self.get_all_alarms()] @@ -673,13 +714,15 @@ class CloudWatchBackend(BaseBackend): self.tagger.tag_resource(arn, tags) - def untag_resource(self, arn, tag_keys): + def untag_resource(self, arn: str, tag_keys: List[str]) -> None: if arn not in self.tagger.tags.keys(): raise ResourceNotFoundException self.tagger.untag_resource_using_names(arn, tag_keys) - def _get_paginated(self, metrics): + def _get_paginated( + self, metrics: List[MetricDatum] + ) -> Tuple[Optional[str], List[MetricDatum]]: if len(metrics) > 500: next_token = str(mock_random.uuid4()) self.paged_metric_data[next_token] = metrics[500:] @@ -687,7 +730,9 @@ class CloudWatchBackend(BaseBackend): else: return None, metrics - def _extract_dimensions_from_get_metric_data_query(self, query): + def _extract_dimensions_from_get_metric_data_query( + self, query: Dict[str, str] + ) -> List[Dimension]: dimensions = [] prefix = "metric_stat._metric._dimensions.member." suffix_name = "._name" diff --git a/moto/cloudwatch/responses.py b/moto/cloudwatch/responses.py index e340a66e3..ce728e869 100644 --- a/moto/cloudwatch/responses.py +++ b/moto/cloudwatch/responses.py @@ -1,27 +1,38 @@ import json from dateutil.parser import parse as dtparse - +from typing import Dict, List, Iterable, Tuple, Union from moto.core.responses import BaseResponse from moto.utilities.aws_headers import amzn_request_id -from .models import cloudwatch_backends, MetricDataQuery, MetricStat, Metric, Dimension +from .models import ( + cloudwatch_backends, + CloudWatchBackend, + MetricDataQuery, + MetricStat, + Metric, + Dimension, + FakeAlarm, +) from .exceptions import InvalidParameterCombination +ERROR_RESPONSE = Tuple[str, Dict[str, int]] + + class CloudWatchResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="cloudwatch") @property - def cloudwatch_backend(self): + def cloudwatch_backend(self) -> CloudWatchBackend: return cloudwatch_backends[self.current_account][self.region] - def _error(self, code, message, status=400): + def _error(self, code: str, message: str, status: int = 400) -> ERROR_RESPONSE: template = self.response_template(ERROR_RESPONSE_TEMPLATE) return template.render(code=code, message=message), dict(status=status) @amzn_request_id - def put_metric_alarm(self): + def put_metric_alarm(self) -> str: name = self._get_param("AlarmName") namespace = self._get_param("Namespace") metric_name = self._get_param("MetricName") @@ -30,14 +41,14 @@ class CloudWatchResponse(BaseResponse): if metrics: metric_data_queries = [] for metric in metrics: - dimensions = [] + metric_dimensions = [] dims = ( metric.get("MetricStat", {}) .get("Metric", {}) .get("Dimensions.member", []) ) for dim in dims: - dimensions.append( + metric_dimensions.append( Dimension(name=dim.get("Name"), value=dim.get("Value")) ) metric_stat = None @@ -51,7 +62,7 @@ class CloudWatchResponse(BaseResponse): metric=Metric( metric_name=stat_metric_name, namespace=stat_metric_ns, - dimensions=dimensions, + dimensions=metric_dimensions, ), period=stat_details.get("Period"), stat=stat_details.get("Stat"), @@ -121,7 +132,7 @@ class CloudWatchResponse(BaseResponse): return template.render(alarm=alarm) @amzn_request_id - def describe_alarms(self): + def describe_alarms(self) -> str: action_prefix = self._get_param("ActionPrefix") alarm_name_prefix = self._get_param("AlarmNamePrefix") alarm_names = self._get_multi_param("AlarmNames.member") @@ -149,14 +160,14 @@ class CloudWatchResponse(BaseResponse): ) @amzn_request_id - def delete_alarms(self): + def delete_alarms(self) -> str: alarm_names = self._get_multi_param("AlarmNames.member") self.cloudwatch_backend.delete_alarms(alarm_names) template = self.response_template(DELETE_METRIC_ALARMS_TEMPLATE) return template.render() @amzn_request_id - def put_metric_data(self): + def put_metric_data(self) -> str: namespace = self._get_param("Namespace") metric_data = self._get_multi_param("MetricData.member") self.cloudwatch_backend.put_metric_data(namespace, metric_data) @@ -164,7 +175,7 @@ class CloudWatchResponse(BaseResponse): return template.render() @amzn_request_id - def get_metric_data(self): + def get_metric_data(self) -> str: start = dtparse(self._get_param("StartTime")) end = dtparse(self._get_param("EndTime")) scan_by = self._get_param("ScanBy") @@ -178,7 +189,7 @@ class CloudWatchResponse(BaseResponse): return template.render(results=results) @amzn_request_id - def get_metric_statistics(self): + def get_metric_statistics(self) -> str: namespace = self._get_param("Namespace") metric_name = self._get_param("MetricName") start_time = dtparse(self._get_param("StartTime")) @@ -210,7 +221,7 @@ class CloudWatchResponse(BaseResponse): return template.render(label=metric_name, datapoints=datapoints) @amzn_request_id - def list_metrics(self): + def list_metrics(self) -> str: namespace = self._get_param("Namespace") metric_name = self._get_param("MetricName") dimensions = self._get_params().get("Dimensions", []) @@ -222,24 +233,26 @@ class CloudWatchResponse(BaseResponse): return template.render(metrics=metrics, next_token=next_token) @amzn_request_id - def delete_dashboards(self): + def delete_dashboards(self) -> Union[str, ERROR_RESPONSE]: dashboards = self._get_multi_param("DashboardNames.member") if dashboards is None: return self._error("InvalidParameterValue", "Need at least 1 dashboard") - status, error = self.cloudwatch_backend.delete_dashboards(dashboards) - if not status: + error = self.cloudwatch_backend.delete_dashboards(dashboards) + if error is not None: return self._error("ResourceNotFound", error) template = self.response_template(DELETE_DASHBOARD_TEMPLATE) return template.render() @amzn_request_id - def describe_alarm_history(self): + def describe_alarm_history(self) -> None: raise NotImplementedError() @staticmethod - def filter_alarms(alarms, metric_name, namespace): + def filter_alarms( + alarms: Iterable[FakeAlarm], metric_name: str, namespace: str + ) -> List[FakeAlarm]: metric_filtered_alarms = [] for alarm in alarms: @@ -248,7 +261,7 @@ class CloudWatchResponse(BaseResponse): return metric_filtered_alarms @amzn_request_id - def describe_alarms_for_metric(self): + def describe_alarms_for_metric(self) -> str: alarms = self.cloudwatch_backend.get_all_alarms() namespace = self._get_param("Namespace") metric_name = self._get_param("MetricName") @@ -257,15 +270,15 @@ class CloudWatchResponse(BaseResponse): return template.render(alarms=filtered_alarms) @amzn_request_id - def disable_alarm_actions(self): + def disable_alarm_actions(self) -> str: raise NotImplementedError() @amzn_request_id - def enable_alarm_actions(self): + def enable_alarm_actions(self) -> str: raise NotImplementedError() @amzn_request_id - def get_dashboard(self): + def get_dashboard(self) -> Union[str, ERROR_RESPONSE]: dashboard_name = self._get_param("DashboardName") dashboard = self.cloudwatch_backend.get_dashboard(dashboard_name) @@ -276,7 +289,7 @@ class CloudWatchResponse(BaseResponse): return template.render(dashboard=dashboard) @amzn_request_id - def list_dashboards(self): + def list_dashboards(self) -> str: prefix = self._get_param("DashboardNamePrefix", "") dashboards = self.cloudwatch_backend.list_dashboards(prefix) @@ -285,7 +298,7 @@ class CloudWatchResponse(BaseResponse): return template.render(dashboards=dashboards) @amzn_request_id - def put_dashboard(self): + def put_dashboard(self) -> Union[str, ERROR_RESPONSE]: name = self._get_param("DashboardName") body = self._get_param("DashboardBody") @@ -300,7 +313,7 @@ class CloudWatchResponse(BaseResponse): return template.render() @amzn_request_id - def set_alarm_state(self): + def set_alarm_state(self) -> str: alarm_name = self._get_param("AlarmName") reason = self._get_param("StateReason") reason_data = self._get_param("StateReasonData") @@ -314,7 +327,7 @@ class CloudWatchResponse(BaseResponse): return template.render() @amzn_request_id - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> str: resource_arn = self._get_param("ResourceARN") tags = self.cloudwatch_backend.list_tags_for_resource(resource_arn) @@ -323,7 +336,7 @@ class CloudWatchResponse(BaseResponse): return template.render(tags=tags) @amzn_request_id - def tag_resource(self): + def tag_resource(self) -> str: resource_arn = self._get_param("ResourceARN") tags = self._get_multi_param("Tags.member") @@ -333,7 +346,7 @@ class CloudWatchResponse(BaseResponse): return template.render() @amzn_request_id - def untag_resource(self): + def untag_resource(self) -> str: resource_arn = self._get_param("ResourceARN") tag_keys = self._get_multi_param("TagKeys.member") diff --git a/moto/cloudwatch/utils.py b/moto/cloudwatch/utils.py index da9d93ac0..30ed3ed1a 100644 --- a/moto/cloudwatch/utils.py +++ b/moto/cloudwatch/utils.py @@ -1,6 +1,6 @@ -def make_arn_for_dashboard(account_id, name): +def make_arn_for_dashboard(account_id: str, name: str) -> str: return "arn:aws:cloudwatch::{0}dashboard/{1}".format(account_id, name) -def make_arn_for_alarm(region, account_id, alarm_name): +def make_arn_for_alarm(region: str, account_id: str, alarm_name: str) -> str: return "arn:aws:cloudwatch:{0}:{1}:alarm:{2}".format(region, account_id, alarm_name) diff --git a/moto/core/common_models.py b/moto/core/common_models.py index 950a56019..6d85627dd 100644 --- a/moto/core/common_models.py +++ b/moto/core/common_models.py @@ -195,5 +195,5 @@ class ConfigQueryModel: class CloudWatchMetricProvider(object): @staticmethod @abstractmethod - def get_cloudwatch_metrics(account_id): + def get_cloudwatch_metrics(account_id: str) -> Any: pass diff --git a/setup.cfg b/setup.cfg index 8eabe4793..726758df7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild +files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch show_column_numbers=True show_error_codes = True disable_error_code=abstract