From 101cee83607f8dfc92ca7627d99a64a4c7b0bf30 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 5 Mar 2023 09:27:17 -0100 Subject: [PATCH] Techdebt: MyPy Events, EMR (#6012) --- moto/emrcontainers/exceptions.py | 2 +- moto/emrcontainers/models.py | 123 +++--- moto/emrcontainers/responses.py | 24 +- moto/emrcontainers/utils.py | 21 +- moto/emrserverless/exceptions.py | 4 +- moto/emrserverless/models.py | 88 ++-- moto/emrserverless/responses.py | 29 +- moto/emrserverless/utils.py | 45 +-- moto/events/exceptions.py | 11 +- moto/events/models.py | 671 ++++++++++++++++--------------- moto/events/notifications.py | 13 +- moto/events/responses.py | 128 +++--- setup.cfg | 2 +- 13 files changed, 575 insertions(+), 586 deletions(-) diff --git a/moto/emrcontainers/exceptions.py b/moto/emrcontainers/exceptions.py index 202541bb5..492fff856 100644 --- a/moto/emrcontainers/exceptions.py +++ b/moto/emrcontainers/exceptions.py @@ -5,5 +5,5 @@ from moto.core.exceptions import JsonRESTError class ResourceNotFoundException(JsonRESTError): code = 400 - def __init__(self, resource): + def __init__(self, resource: str): super().__init__("ResourceNotFoundException", resource) diff --git a/moto/emrcontainers/models.py b/moto/emrcontainers/models.py index 98fd31937..ac2c091c0 100644 --- a/moto/emrcontainers/models.py +++ b/moto/emrcontainers/models.py @@ -1,6 +1,7 @@ """EMRContainersBackend class with methods for supported APIs.""" import re from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple, Iterator from moto.core import BaseBackend, BackendDict, BaseModel from moto.core.utils import iso_8601_datetime_without_milliseconds @@ -22,14 +23,14 @@ JOB_STATUS = "RUNNING" class FakeCluster(BaseModel): def __init__( self, - name, - container_provider, - client_token, - account_id, - region_name, - aws_partition, - tags=None, - virtual_cluster_id=None, + name: str, + container_provider: Dict[str, Any], + client_token: str, + account_id: str, + region_name: str, + aws_partition: str, + tags: Optional[Dict[str, str]] = None, + virtual_cluster_id: Optional[str] = None, ): self.id = virtual_cluster_id or random_cluster_id() @@ -50,7 +51,7 @@ class FakeCluster(BaseModel): ) self.tags = tags - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: yield "id", self.id yield "name", self.name yield "arn", self.arn @@ -59,7 +60,7 @@ class FakeCluster(BaseModel): yield "createdAt", self.creation_date yield "tags", self.tags - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: # Format for summary https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeVirtualCluster.html # (response syntax section) return { @@ -76,17 +77,17 @@ class FakeCluster(BaseModel): class FakeJob(BaseModel): def __init__( self, - name, - virtual_cluster_id, - client_token, - execution_role_arn, - release_label, - job_driver, - configuration_overrides, - account_id, - region_name, - aws_partition, - tags, + name: str, + virtual_cluster_id: str, + client_token: str, + execution_role_arn: str, + release_label: str, + job_driver: str, + configuration_overrides: Dict[str, Any], + account_id: str, + region_name: str, + aws_partition: str, + tags: Optional[Dict[str, str]], ): self.id = random_job_id() self.name = name @@ -108,12 +109,12 @@ class FakeJob(BaseModel): datetime.today().replace(hour=0, minute=0, second=0, microsecond=0) ) self.created_by = None - self.finished_at = None - self.state_details = None + self.finished_at: Optional[str] = None + self.state_details: Optional[str] = None self.failure_reason = None self.tags = tags - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: yield "id", self.id yield "name", self.name yield "virtualClusterId", self.virtual_cluster_id @@ -131,7 +132,7 @@ class FakeJob(BaseModel): yield "failureReason", self.failure_reason yield "tags", self.tags - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: # Format for summary https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeJobRun.html # (response syntax section) return { @@ -157,15 +158,21 @@ class FakeJob(BaseModel): class EMRContainersBackend(BaseBackend): """Implementation of EMRContainers APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.virtual_clusters = dict() + self.virtual_clusters: Dict[str, FakeCluster] = dict() self.virtual_cluster_count = 0 - self.jobs = dict() + self.jobs: Dict[str, FakeJob] = dict() self.job_count = 0 self.partition = get_partition(region_name) - def create_virtual_cluster(self, name, container_provider, client_token, tags=None): + def create_virtual_cluster( + self, + name: str, + container_provider: Dict[str, Any], + client_token: str, + tags: Optional[Dict[str, str]] = None, + ) -> FakeCluster: occupied_namespaces = [ virtual_cluster.namespace for virtual_cluster in self.virtual_clusters.values() @@ -190,14 +197,14 @@ class EMRContainersBackend(BaseBackend): self.virtual_cluster_count += 1 return virtual_cluster - def delete_virtual_cluster(self, cluster_id): + def delete_virtual_cluster(self, cluster_id: str) -> FakeCluster: if cluster_id not in self.virtual_clusters: raise ValidationException("VirtualCluster does not exist") self.virtual_clusters[cluster_id].state = "TERMINATED" return self.virtual_clusters[cluster_id] - def describe_virtual_cluster(self, cluster_id): + def describe_virtual_cluster(self, cluster_id: str) -> Dict[str, Any]: if cluster_id not in self.virtual_clusters: raise ValidationException(f"Virtual cluster {cluster_id} doesn't exist.") @@ -205,14 +212,14 @@ class EMRContainersBackend(BaseBackend): def list_virtual_clusters( self, - container_provider_id, - container_provider_type, - created_after, - created_before, - states, - max_results, - next_token, - ): + container_provider_id: str, + container_provider_type: str, + created_after: str, + created_before: str, + states: Optional[List[str]], + max_results: int, + next_token: Optional[str], + ) -> Tuple[List[Dict[str, Any]], Optional[str]]: virtual_clusters = [ virtual_cluster.to_dict() for virtual_cluster in self.virtual_clusters.values() @@ -258,15 +265,15 @@ class EMRContainersBackend(BaseBackend): def start_job_run( self, - name, - virtual_cluster_id, - client_token, - execution_role_arn, - release_label, - job_driver, - configuration_overrides, - tags, - ): + name: str, + virtual_cluster_id: str, + client_token: str, + execution_role_arn: str, + release_label: str, + job_driver: str, + configuration_overrides: Dict[str, Any], + tags: Dict[str, str], + ) -> FakeJob: if virtual_cluster_id not in self.virtual_clusters.keys(): raise ResourceNotFoundException( @@ -296,7 +303,7 @@ class EMRContainersBackend(BaseBackend): self.job_count += 1 return job - def cancel_job_run(self, job_id, virtual_cluster_id): + def cancel_job_run(self, job_id: str, virtual_cluster_id: str) -> FakeJob: if not re.match(r"[a-z,A-Z,0-9]{19}", job_id): raise ValidationException("Invalid job run short id") @@ -326,14 +333,14 @@ class EMRContainersBackend(BaseBackend): def list_job_runs( self, - virtual_cluster_id, - created_before, - created_after, - name, - states, - max_results, - next_token, - ): + virtual_cluster_id: str, + created_before: str, + created_after: str, + name: str, + states: Optional[List[str]], + max_results: int, + next_token: Optional[str], + ) -> Tuple[List[Dict[str, Any]], Optional[str]]: jobs = [job.to_dict() for job in self.jobs.values()] jobs = [job for job in jobs if job["virtualClusterId"] == virtual_cluster_id] @@ -353,7 +360,7 @@ class EMRContainersBackend(BaseBackend): sort_key = "id" return paginated_list(jobs, sort_key, max_results, next_token) - def describe_job_run(self, job_id, virtual_cluster_id): + def describe_job_run(self, job_id: str, virtual_cluster_id: str) -> Dict[str, Any]: if not re.match(r"[a-z,A-Z,0-9]{19}", job_id): raise ValidationException("Invalid job run short id") diff --git a/moto/emrcontainers/responses.py b/moto/emrcontainers/responses.py index 601ab1295..09ba85e29 100644 --- a/moto/emrcontainers/responses.py +++ b/moto/emrcontainers/responses.py @@ -1,8 +1,8 @@ """Handles incoming emrcontainers requests, invokes methods, returns responses.""" import json - +from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse -from .models import emrcontainers_backends +from .models import emrcontainers_backends, EMRContainersBackend DEFAULT_MAX_RESULTS = 100 DEFAULT_NEXT_TOKEN = "" @@ -12,15 +12,15 @@ DEFAULT_CONTAINER_PROVIDER_TYPE = "EKS" class EMRContainersResponse(BaseResponse): """Handler for EMRContainers requests and responses.""" - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="emr-containers") @property - def emrcontainers_backend(self): + def emrcontainers_backend(self) -> EMRContainersBackend: """Return backend instance specific for this region.""" return emrcontainers_backends[self.current_account][self.region] - def create_virtual_cluster(self): + def create_virtual_cluster(self) -> TYPE_RESPONSE: name = self._get_param("name") container_provider = self._get_param("containerProvider") client_token = self._get_param("clientToken") @@ -34,7 +34,7 @@ class EMRContainersResponse(BaseResponse): ) return 200, {}, json.dumps(dict(virtual_cluster)) - def delete_virtual_cluster(self): + def delete_virtual_cluster(self) -> TYPE_RESPONSE: cluster_id = self._get_param("virtualClusterId") virtual_cluster = self.emrcontainers_backend.delete_virtual_cluster( @@ -42,7 +42,7 @@ class EMRContainersResponse(BaseResponse): ) return 200, {}, json.dumps(dict(virtual_cluster)) - def describe_virtual_cluster(self): + def describe_virtual_cluster(self) -> TYPE_RESPONSE: cluster_id = self._get_param("virtualClusterId") virtual_cluster = self.emrcontainers_backend.describe_virtual_cluster( @@ -51,7 +51,7 @@ class EMRContainersResponse(BaseResponse): response = {"virtualCluster": virtual_cluster} return 200, {}, json.dumps(response) - def list_virtual_clusters(self): + def list_virtual_clusters(self) -> TYPE_RESPONSE: container_provider_id = self._get_param("containerProviderId") container_provider_type = self._get_param( "containerProviderType", DEFAULT_CONTAINER_PROVIDER_TYPE @@ -75,7 +75,7 @@ class EMRContainersResponse(BaseResponse): response = {"virtualClusters": virtual_clusters, "nextToken": next_token} return 200, {}, json.dumps(response) - def start_job_run(self): + def start_job_run(self) -> TYPE_RESPONSE: name = self._get_param("name") virtual_cluster_id = self._get_param("virtualClusterId") client_token = self._get_param("clientToken") @@ -97,7 +97,7 @@ class EMRContainersResponse(BaseResponse): ) return 200, {}, json.dumps(dict(job)) - def cancel_job_run(self): + def cancel_job_run(self) -> TYPE_RESPONSE: job_id = self._get_param("jobRunId") virtual_cluster_id = self._get_param("virtualClusterId") @@ -106,7 +106,7 @@ class EMRContainersResponse(BaseResponse): ) return 200, {}, json.dumps(dict(job)) - def list_job_runs(self): + def list_job_runs(self) -> TYPE_RESPONSE: virtual_cluster_id = self._get_param("virtualClusterId") created_before = self._get_param("createdBefore") created_after = self._get_param("createdAfter") @@ -128,7 +128,7 @@ class EMRContainersResponse(BaseResponse): response = {"jobRuns": job_runs, "nextToken": next_token} return 200, {}, json.dumps(response) - def describe_job_run(self): + def describe_job_run(self) -> TYPE_RESPONSE: job_id = self._get_param("jobRunId") virtual_cluster_id = self._get_param("virtualClusterId") diff --git a/moto/emrcontainers/utils.py b/moto/emrcontainers/utils.py index 09c3cc600..5844326fc 100644 --- a/moto/emrcontainers/utils.py +++ b/moto/emrcontainers/utils.py @@ -1,9 +1,10 @@ # import json import string +from typing import Any, List, Optional, Tuple from moto.moto_api._internal import mock_random as random -def get_partition(region): +def get_partition(region: str) -> str: valid_matches = [ # (region prefix, aws partition) ("cn-", "aws-cn"), @@ -18,33 +19,35 @@ def get_partition(region): return "aws" -def random_id(size=13): +def random_id(size: int = 13) -> str: chars = list(range(10)) + list(string.ascii_lowercase) return "".join(str(random.choice(chars)) for x in range(size)) -def random_cluster_id(): +def random_cluster_id() -> str: return random_id(size=25) -def random_job_id(): +def random_job_id() -> str: return random_id(size=19) -def paginated_list(full_list, sort_key, max_results, next_token): +def paginated_list( + full_list: List[Any], sort_key: str, max_results: int, next_token: Optional[str] +) -> Tuple[List[Any], Optional[str]]: """ Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results number of elements, and the new next_token which can be passed back in for the next segment of the full list. """ if next_token is None or not next_token: - next_token = 0 - next_token = int(next_token) + next_token = 0 # type: ignore + next_token = int(next_token) # type: ignore sorted_list = sorted(full_list, key=lambda d: d[sort_key]) - values = sorted_list[next_token : next_token + max_results] + values = sorted_list[next_token : next_token + max_results] # type: ignore if len(values) == max_results: - new_next_token = str(next_token + max_results) + new_next_token = str(next_token + max_results) # type: ignore else: new_next_token = None return values, new_next_token diff --git a/moto/emrserverless/exceptions.py b/moto/emrserverless/exceptions.py index 68bae4c29..a91d44f71 100644 --- a/moto/emrserverless/exceptions.py +++ b/moto/emrserverless/exceptions.py @@ -5,7 +5,7 @@ from moto.core.exceptions import JsonRESTError class ResourceNotFoundException(JsonRESTError): code = 400 - def __init__(self, resource): + def __init__(self, resource: str): super().__init__( "ResourceNotFoundException", f"Application {resource} does not exist" ) @@ -14,5 +14,5 @@ class ResourceNotFoundException(JsonRESTError): class ValidationException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ValidationException", message) diff --git a/moto/emrserverless/models.py b/moto/emrserverless/models.py index d962f5097..efdd4e245 100644 --- a/moto/emrserverless/models.py +++ b/moto/emrserverless/models.py @@ -1,17 +1,16 @@ """EMRServerlessBackend class with methods for supported APIs.""" import re from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple, Iterator import inspect from moto.core import BaseBackend, BackendDict, BaseModel from moto.core.utils import iso_8601_datetime_without_milliseconds +from moto.emrcontainers.utils import get_partition, paginated_list from .utils import ( default_auto_start_configuration, default_auto_stop_configuration, - get_partition, - paginated_list, random_appplication_id, - # random_job_id, ) from .exceptions import ResourceNotFoundException, ValidationException @@ -26,18 +25,18 @@ JOB_STATUS = "RUNNING" class FakeApplication(BaseModel): def __init__( self, - name, - release_label, - application_type, - client_token, - account_id, - region_name, - initial_capacity, - maximum_capacity, - tags, - auto_start_configuration, - auto_stop_configuration, - network_configuration, + name: str, + release_label: str, + application_type: str, + client_token: str, + account_id: str, + region_name: str, + initial_capacity: str, + maximum_capacity: str, + tags: Dict[str, str], + auto_start_configuration: str, + auto_stop_configuration: str, + network_configuration: str, ): # Provided parameters self.name = name @@ -53,7 +52,7 @@ class FakeApplication(BaseModel): auto_stop_configuration or default_auto_stop_configuration() ) self.network_configuration = network_configuration - self.tags = tags or {} + self.tags: Dict[str, str] = tags or {} # Service-generated-parameters self.id = random_appplication_id() @@ -70,14 +69,14 @@ class FakeApplication(BaseModel): ) self.updated_at = self.created_at - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: yield "applicationId", self.id yield "name", self.name yield "arn", self.arn yield "autoStartConfig", self.auto_start_configuration, yield "autoStopConfig", self.auto_stop_configuration, - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: """ Dictionary representation of an EMR Serverless Application. When used in `list-applications`, capacity, auto-start/stop configs, and tags are not returned. https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_ListApplications.html @@ -127,26 +126,25 @@ class FakeApplication(BaseModel): class EMRServerlessBackend(BaseBackend): """Implementation of EMRServerless APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.region_name = region_name - self.applications = dict() - self.jobs = dict() + self.applications: Dict[str, FakeApplication] = dict() self.partition = get_partition(region_name) def create_application( self, - name, - release_label, - application_type, - client_token, - initial_capacity, - maximum_capacity, - tags, - auto_start_configuration, - auto_stop_configuration, - network_configuration, - ): + name: str, + release_label: str, + application_type: str, + client_token: str, + initial_capacity: str, + maximum_capacity: str, + tags: Dict[str, str], + auto_start_configuration: str, + auto_stop_configuration: str, + network_configuration: str, + ) -> FakeApplication: if application_type not in ["HIVE", "SPARK"]: raise ValidationException(f"Unsupported engine {application_type}") @@ -173,7 +171,7 @@ class EMRServerlessBackend(BaseBackend): self.applications[application.id] = application return application - def delete_application(self, application_id): + def delete_application(self, application_id: str) -> None: if application_id not in self.applications.keys(): raise ResourceNotFoundException(application_id) @@ -184,13 +182,15 @@ class EMRServerlessBackend(BaseBackend): ) self.applications[application_id].state = "TERMINATED" - def get_application(self, application_id): + def get_application(self, application_id: str) -> Dict[str, Any]: if application_id not in self.applications.keys(): raise ResourceNotFoundException(application_id) return self.applications[application_id].to_dict() - def list_applications(self, next_token, max_results, states): + def list_applications( + self, next_token: Optional[str], max_results: int, states: Optional[List[str]] + ) -> Tuple[List[Dict[str, Any]], Optional[str]]: applications = [ application.to_dict() for application in self.applications.values() ] @@ -203,25 +203,25 @@ class EMRServerlessBackend(BaseBackend): sort_key = "name" return paginated_list(applications, sort_key, max_results, next_token) - def start_application(self, application_id): + def start_application(self, application_id: str) -> None: if application_id not in self.applications.keys(): raise ResourceNotFoundException(application_id) self.applications[application_id].state = "STARTED" - def stop_application(self, application_id): + def stop_application(self, application_id: str) -> None: if application_id not in self.applications.keys(): raise ResourceNotFoundException(application_id) self.applications[application_id].state = "STOPPED" def update_application( self, - application_id, - initial_capacity, - maximum_capacity, - auto_start_configuration, - auto_stop_configuration, - network_configuration, - ): + application_id: str, + initial_capacity: Optional[str], + maximum_capacity: Optional[str], + auto_start_configuration: Optional[str], + auto_stop_configuration: Optional[str], + network_configuration: Optional[str], + ) -> Dict[str, Any]: if application_id not in self.applications.keys(): raise ResourceNotFoundException(application_id) diff --git a/moto/emrserverless/responses.py b/moto/emrserverless/responses.py index af8edad5f..0e3bbc5e5 100644 --- a/moto/emrserverless/responses.py +++ b/moto/emrserverless/responses.py @@ -1,8 +1,9 @@ """Handles incoming emrserverless requests, invokes methods, returns responses.""" import json +from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse -from .models import emrserverless_backends +from .models import emrserverless_backends, EMRServerlessBackend DEFAULT_MAX_RESULTS = 100 DEFAULT_NEXT_TOKEN = "" @@ -33,15 +34,15 @@ These are the available methos: class EMRServerlessResponse(BaseResponse): """Handler for EMRServerless requests and responses.""" - def __init__(self): + def __init__(self) -> None: super().__init__("emr-serverless") @property - def emrserverless_backend(self): + def emrserverless_backend(self) -> EMRServerlessBackend: """Return backend instance specific for this region.""" return emrserverless_backends[self.current_account][self.region] - def create_application(self): + def create_application(self) -> TYPE_RESPONSE: name = self._get_param("name") release_label = self._get_param("releaseLabel") application_type = self._get_param("type") @@ -65,15 +66,15 @@ class EMRServerlessResponse(BaseResponse): auto_stop_configuration=auto_stop_configuration, network_configuration=network_configuration, ) - return (200, {}, json.dumps(dict(application))) + return 200, {}, json.dumps(dict(application)) - def delete_application(self): + def delete_application(self) -> TYPE_RESPONSE: application_id = self._get_param("applicationId") self.emrserverless_backend.delete_application(application_id=application_id) - return (200, {}, None) + return 200, {}, "" - def get_application(self): + def get_application(self) -> TYPE_RESPONSE: application_id = self._get_param("applicationId") application = self.emrserverless_backend.get_application( @@ -82,7 +83,7 @@ class EMRServerlessResponse(BaseResponse): response = {"application": application} return 200, {}, json.dumps(response) - def list_applications(self): + def list_applications(self) -> TYPE_RESPONSE: states = self.querystring.get("states", []) max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) @@ -95,19 +96,19 @@ class EMRServerlessResponse(BaseResponse): response = {"applications": applications, "nextToken": next_token} return 200, {}, json.dumps(response) - def start_application(self): + def start_application(self) -> TYPE_RESPONSE: application_id = self._get_param("applicationId") self.emrserverless_backend.start_application(application_id=application_id) - return (200, {}, None) + return 200, {}, "" - def stop_application(self): + def stop_application(self) -> TYPE_RESPONSE: application_id = self._get_param("applicationId") self.emrserverless_backend.stop_application(application_id=application_id) - return (200, {}, None) + return 200, {}, "" - def update_application(self): + def update_application(self) -> TYPE_RESPONSE: application_id = self._get_param("applicationId") initial_capacity = self._get_param("initialCapacity") maximum_capacity = self._get_param("maximumCapacity") diff --git a/moto/emrserverless/utils.py b/moto/emrserverless/utils.py index afa8153aa..fd3bc3de0 100644 --- a/moto/emrserverless/utils.py +++ b/moto/emrserverless/utils.py @@ -1,57 +1,24 @@ from moto.moto_api._internal import mock_random as random import string +from typing import Any, Dict -def get_partition(region): - valid_matches = [ - # (region prefix, aws partition) - ("cn-", "aws-cn"), - ("us-gov-", "aws-us-gov"), - ("us-gov-iso-", "aws-iso"), - ("us-gov-iso-b-", "aws-iso-b"), - ] - - for prefix, partition in valid_matches: - if region.startswith(prefix): - return partition - return "aws" - - -def random_id(size=13): +def random_id(size: int = 13) -> str: chars = list(range(10)) + list(string.ascii_lowercase) return "".join(str(random.choice(chars)) for x in range(size)) -def random_appplication_id(): +def random_appplication_id() -> str: return random_id(size=16) -def random_job_id(): +def random_job_id() -> str: return random_id(size=16) -def default_auto_start_configuration(): +def default_auto_start_configuration() -> Dict[str, bool]: return {"enabled": True} -def default_auto_stop_configuration(): +def default_auto_stop_configuration() -> Dict[str, Any]: return {"enabled": True, "idleTimeoutMinutes": 15} - - -def paginated_list(full_list, sort_key, max_results, next_token): - """ - Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results - number of elements, and the new next_token which can be passed back in for the next segment of the full list. - """ - if next_token is None or not next_token: - next_token = 0 - next_token = int(next_token) - - sorted_list = sorted(full_list, key=lambda d: d[sort_key]) - - values = sorted_list[next_token : next_token + max_results] - if len(values) == max_results: - new_next_token = str(next_token + max_results) - else: - new_next_token = None - return values, new_next_token diff --git a/moto/events/exceptions.py b/moto/events/exceptions.py index 2412dee08..eb5e6f795 100644 --- a/moto/events/exceptions.py +++ b/moto/events/exceptions.py @@ -1,17 +1,18 @@ +from typing import Optional from moto.core.exceptions import JsonRESTError class IllegalStatusException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("IllegalStatusException", message) class InvalidEventPatternException(JsonRESTError): code = 400 - def __init__(self, reason=None): + def __init__(self, reason: Optional[str] = None): msg = "Event pattern is not valid. " if reason: msg += f"Reason: {reason}" @@ -22,19 +23,19 @@ class InvalidEventPatternException(JsonRESTError): class ResourceNotFoundException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ResourceNotFoundException", message) class ResourceAlreadyExistsException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ResourceAlreadyExistsException", message) class ValidationException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ValidationException", message) diff --git a/moto/events/models.py b/moto/events/models.py index dad78538a..9ef96c647 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -9,6 +9,7 @@ from datetime import datetime from enum import Enum, unique from json import JSONDecodeError from operator import lt, le, eq, ge, gt +from typing import Any, Dict, List, Optional, Tuple from collections import OrderedDict from moto.core.exceptions import JsonRESTError @@ -42,17 +43,17 @@ class Rule(CloudFormationModel): def __init__( self, - name, - account_id, - region_name, - description, - event_pattern, - schedule_exp, - role_arn, - event_bus_name, - state, - managed_by=None, - targets=None, + name: str, + account_id: str, + region_name: str, + description: Optional[str], + event_pattern: Optional[str], + schedule_exp: Optional[str], + role_arn: Optional[str], + event_bus_name: str, + state: Optional[str], + managed_by: Optional[str] = None, + targets: Optional[List[Dict[str, Any]]] = None, ): self.name = name self.account_id = account_id @@ -68,7 +69,7 @@ class Rule(CloudFormationModel): self.targets = targets or [] @property - def arn(self): + def arn(self) -> str: event_bus_name = ( "" if self.event_bus_name == "default" else f"{self.event_bus_name}/" ) @@ -76,28 +77,28 @@ class Rule(CloudFormationModel): return f"arn:aws:events:{self.region_name}:{self.account_id}:rule/{event_bus_name}{self.name}" @property - def physical_resource_id(self): + def physical_resource_id(self) -> str: return self.name # This song and dance for targets is because we need order for Limits and NextTokens, but can't use OrderedDicts # with Python 2.6, so tracking it with an array it is. - def _check_target_exists(self, target_id): + def _check_target_exists(self, target_id: str) -> Optional[int]: for i in range(0, len(self.targets)): if target_id == self.targets[i]["Id"]: return i return None - def enable(self): + def enable(self) -> None: self.state = "ENABLED" - def disable(self): + def disable(self) -> None: self.state = "DISABLED" - def delete(self, account_id, region_name): + def delete(self, account_id: str, region_name: str) -> None: event_backend = events_backends[account_id][region_name] event_backend.delete_rule(name=self.name) - def put_targets(self, targets): + def put_targets(self, targets: List[Dict[str, Any]]) -> None: # Not testing for valid ARNs. for target in targets: index = self._check_target_exists(target["Id"]) @@ -106,13 +107,13 @@ class Rule(CloudFormationModel): else: self.targets.append(target) - def remove_targets(self, ids): + def remove_targets(self, ids: List[str]) -> None: for target_id in ids: index = self._check_target_exists(target_id) if index is not None: self.targets.pop(index) - def send_to_targets(self, event_bus_name, event): + def send_to_targets(self, event_bus_name: str, event: Dict[str, Any]) -> None: event_bus_name = event_bus_name.split("/")[-1] if event_bus_name != self.event_bus_name.split("/")[-1]: return @@ -179,7 +180,7 @@ class Rule(CloudFormationModel): resource_id=resource_id, ) - def _send_to_cw_log_group(self, name, event): + def _send_to_cw_log_group(self, name: str, event: Dict[str, Any]) -> None: from moto.logs import logs_backends event_copy = copy.deepcopy(event) @@ -196,7 +197,7 @@ class Rule(CloudFormationModel): log_backend.create_log_stream(name, log_stream_name) log_backend.put_log_events(name, log_stream_name, log_events) - def _send_to_events_archive(self, resource_id, event): + def _send_to_events_archive(self, resource_id: str, event: Dict[str, Any]) -> None: archive_name, archive_uuid = resource_id.split(":") archive = events_backends[self.account_id][self.region_name].archives.get( archive_name @@ -204,7 +205,9 @@ class Rule(CloudFormationModel): if archive.uuid == archive_uuid: archive.events.append(event) - def _send_to_sqs_queue(self, resource_id, event, group_id=None): + def _send_to_sqs_queue( + self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None + ) -> None: from moto.sqs import sqs_backends event_copy = copy.deepcopy(event) @@ -232,10 +235,10 @@ class Rule(CloudFormationModel): ) @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["Arn"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> str: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": @@ -244,18 +247,23 @@ class Rule(CloudFormationModel): raise UnformattedGetAttTemplateException() @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "Name" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-rule.html return "AWS::Events::Rule" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "Rule": properties = cloudformation_json["Properties"] properties.setdefault("EventBusName", "default") @@ -285,27 +293,31 @@ class Rule(CloudFormationModel): ) @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> "Rule": original_resource.delete(account_id, region_name) return cls.create_from_cloudformation_json( new_resource_name, cloudformation_json, account_id, region_name ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: event_backend = events_backends[account_id][region_name] event_backend.delete_rule(resource_name) - def describe(self): + def describe(self) -> Dict[str, Any]: attributes = { "Arn": self.arn, "CreatedBy": self.created_by, @@ -325,17 +337,23 @@ class Rule(CloudFormationModel): class EventBus(CloudFormationModel): - def __init__(self, account_id, region_name, name, tags=None): + def __init__( + self, + account_id: str, + region_name: str, + name: str, + tags: Optional[List[Dict[str, str]]] = None, + ): self.account_id = account_id self.region = region_name self.name = name self.arn = f"arn:aws:events:{self.region}:{account_id}:event-bus/{name}" self.tags = tags or [] - self._statements = {} + self._statements: Dict[str, EventBusPolicyStatement] = {} @property - def policy(self): + def policy(self) -> Optional[str]: if self._statements: policy = { "Version": "2012-10-17", @@ -344,18 +362,18 @@ class EventBus(CloudFormationModel): return json.dumps(policy) return None - def has_permissions(self): + def has_permissions(self) -> bool: return len(self._statements) > 0 - def delete(self, account_id, region_name): + def delete(self, account_id: str, region_name: str) -> None: event_backend = events_backends[account_id][region_name] event_backend.delete_event_bus(name=self.name) @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["Arn", "Name", "Policy"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": @@ -368,18 +386,23 @@ class EventBus(CloudFormationModel): raise UnformattedGetAttTemplateException() @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "Name" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-eventbus.html return "AWS::Events::EventBus" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "EventBus": properties = cloudformation_json["Properties"] event_backend = events_backends[account_id][region_name] event_name = resource_name @@ -389,28 +412,32 @@ class EventBus(CloudFormationModel): ) @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> "EventBus": original_resource.delete(account_id, region_name) return cls.create_from_cloudformation_json( new_resource_name, cloudformation_json, account_id, region_name ) @classmethod - def delete_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name - ): + def delete_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> None: event_backend = events_backends[account_id][region_name] event_bus_name = resource_name event_backend.delete_event_bus(event_bus_name) - def _remove_principals_statements(self, *principals): + def _remove_principals_statements(self, *principals: Any) -> None: statements_to_delete = set() for principal in principals: @@ -423,7 +450,13 @@ class EventBus(CloudFormationModel): for sid in statements_to_delete: del self._statements[sid] - def add_permission(self, statement_id, action, principal, condition): + def add_permission( + self, + statement_id: str, + action: str, + principal: Dict[str, str], + condition: Optional[Dict[str, Any]], + ) -> None: self._remove_principals_statements(principal) statement = EventBusPolicyStatement( sid=statement_id, @@ -434,7 +467,7 @@ class EventBus(CloudFormationModel): ) self._statements[statement_id] = statement - def add_policy(self, policy): + def add_policy(self, policy: Dict[str, Any]) -> None: policy_statements = policy["Statement"] principals = [stmt["Principal"] for stmt in policy_statements] @@ -444,16 +477,22 @@ class EventBus(CloudFormationModel): sid = new_statement["Sid"] self._statements[sid] = EventBusPolicyStatement.from_dict(new_statement) - def remove_statement(self, sid): + def remove_statement(self, sid: str) -> Optional["EventBusPolicyStatement"]: return self._statements.pop(sid, None) - def remove_statements(self): + def remove_statements(self) -> None: self._statements.clear() class EventBusPolicyStatement: def __init__( - self, sid, principal, action, resource, effect="Allow", condition=None + self, + sid: str, + principal: Dict[str, str], + action: str, + resource: str, + effect: str = "Allow", + condition: Optional[Dict[str, Any]] = None, ): self.sid = sid self.principal = principal @@ -462,8 +501,8 @@ class EventBusPolicyStatement: self.effect = effect self.condition = condition - def describe(self): - statement = dict( + def describe(self) -> Dict[str, Any]: + statement: Dict[str, Any] = dict( Sid=self.sid, Effect=self.effect, Principal=self.principal, @@ -476,7 +515,7 @@ class EventBusPolicyStatement: return statement @classmethod - def from_dict(cls, statement_dict): + def from_dict(cls, statement_dict: Dict[str, Any]) -> "EventBusPolicyStatement": # type: ignore[misc] params = dict( sid=statement_dict["Sid"], effect=statement_dict["Effect"], @@ -504,13 +543,13 @@ class Archive(CloudFormationModel): def __init__( self, - account_id, - region_name, - name, - source_arn, - description, - event_pattern, - retention, + account_id: str, + region_name: str, + name: str, + source_arn: str, + description: str, + event_pattern: str, + retention: str, ): self.region = region_name self.name = name @@ -524,10 +563,10 @@ class Archive(CloudFormationModel): self.state = "ENABLED" self.uuid = str(random.uuid4()) - self.events = [] + self.events: List[str] = [] self.event_bus_name = source_arn.split("/")[-1] - def describe_short(self): + def describe_short(self) -> Dict[str, Any]: return { "ArchiveName": self.name, "EventSourceArn": self.source_arn, @@ -538,7 +577,7 @@ class Archive(CloudFormationModel): "CreationTime": self.creation_time, } - def describe(self): + def describe(self) -> Dict[str, Any]: result = { "ArchiveArn": self.arn, "Description": self.description, @@ -548,7 +587,12 @@ class Archive(CloudFormationModel): return result - def update(self, description, event_pattern, retention): + def update( + self, + description: Optional[str], + event_pattern: Optional[str], + retention: Optional[str], + ) -> None: if description: self.description = description if event_pattern: @@ -556,15 +600,15 @@ class Archive(CloudFormationModel): if retention: self.retention = retention - def delete(self, account_id, region_name): + def delete(self, account_id: str, region_name: str) -> None: event_backend = events_backends[account_id][region_name] event_backend.archives.pop(self.name) @classmethod - def has_cfn_attr(cls, attr): + def has_cfn_attr(cls, attr: str) -> bool: return attr in ["Arn", "ArchiveName"] - def get_cfn_attribute(self, attribute_name): + def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "ArchiveName": @@ -575,18 +619,23 @@ class Archive(CloudFormationModel): raise UnformattedGetAttTemplateException() @staticmethod - def cloudformation_name_type(): + def cloudformation_name_type() -> str: return "ArchiveName" @staticmethod - def cloudformation_type(): + def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-archive.html return "AWS::Events::Archive" @classmethod - def create_from_cloudformation_json( - cls, resource_name, cloudformation_json, account_id, region_name, **kwargs - ): + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "Archive": properties = cloudformation_json["Properties"] event_backend = events_backends[account_id][region_name] @@ -600,14 +649,14 @@ class Archive(CloudFormationModel): ) @classmethod - def update_from_cloudformation_json( + def update_from_cloudformation_json( # type: ignore[misc] cls, - original_resource, - new_resource_name, - cloudformation_json, - account_id, - region_name, - ): + original_resource: Any, + new_resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + ) -> "Archive": if new_resource_name == original_resource.name: properties = cloudformation_json["Properties"] @@ -639,14 +688,14 @@ class ReplayState(Enum): class Replay(BaseModel): def __init__( self, - account_id, - region_name, - name, - description, - source_arn, - start_time, - end_time, - destination, + account_id: str, + region_name: str, + name: str, + description: str, + source_arn: str, + start_time: str, + end_time: str, + destination: Dict[str, Any], ): self.account_id = account_id self.region = region_name @@ -660,9 +709,9 @@ class Replay(BaseModel): self.arn = f"arn:aws:events:{region_name}:{account_id}:replay/{name}" self.state = ReplayState.STARTING self.start_time = unix_time(datetime.utcnow()) - self.end_time = None + self.end_time: Optional[float] = None - def describe_short(self): + def describe_short(self) -> Dict[str, Any]: return { "ReplayName": self.name, "EventSourceArn": self.source_arn, @@ -673,7 +722,7 @@ class Replay(BaseModel): "ReplayEndTime": self.end_time, } - def describe(self): + def describe(self) -> Dict[str, Any]: result = { "ReplayArn": self.arn, "Description": self.description, @@ -684,7 +733,7 @@ class Replay(BaseModel): return result - def replay_events(self, archive): + def replay_events(self, archive: Archive) -> None: event_bus_name = self.destination["Arn"].split("/")[-1] for event in archive.events: @@ -693,7 +742,7 @@ class Replay(BaseModel): rule.send_to_targets( event_bus_name, dict( - event, **{"id": str(random.uuid4()), "replay-name": self.name} + event, **{"id": str(random.uuid4()), "replay-name": self.name} # type: ignore ), ) @@ -704,12 +753,12 @@ class Replay(BaseModel): class Connection(BaseModel): def __init__( self, - name, - account_id, - region_name, - description, - authorization_type, - auth_parameters, + name: str, + account_id: str, + region_name: str, + description: str, + authorization_type: str, + auth_parameters: Dict[str, Any], ): self.uuid = random.uuid4() self.name = name @@ -722,7 +771,7 @@ class Connection(BaseModel): self.arn = f"arn:aws:events:{region_name}:{account_id}:connection/{self.name}/{self.uuid}" - def describe_short(self): + def describe_short(self) -> Dict[str, Any]: """ Create the short description for the Connection object. @@ -745,7 +794,7 @@ class Connection(BaseModel): "CreationTime": self.creation_time, } - def describe(self): + def describe(self) -> Dict[str, Any]: """ Create a complete description for the Connection object. @@ -778,14 +827,14 @@ class Connection(BaseModel): class Destination(BaseModel): def __init__( self, - name, - account_id, - region_name, - description, - connection_arn, - invocation_endpoint, - invocation_rate_limit_per_second, - http_method, + name: str, + account_id: str, + region_name: str, + description: str, + connection_arn: str, + invocation_endpoint: str, + invocation_rate_limit_per_second: str, + http_method: str, ): self.uuid = random.uuid4() self.name = name @@ -799,21 +848,7 @@ class Destination(BaseModel): self.state = "ACTIVE" self.arn = f"arn:aws:events:{region_name}:{account_id}:api-destination/{name}/{self.uuid}" - def describe(self): - """ - Describes the Destination object as a dict - - Docs: - Response Syntax in - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_DescribeApiDestination.html - - Something to consider: - - The response also has [InvocationRateLimitPerSecond] which was not - available when implementing this method - - Returns: - dict - """ + def describe(self) -> Dict[str, Any]: return { "ApiDestinationArn": self.arn, "ApiDestinationState": self.state, @@ -827,7 +862,7 @@ class Destination(BaseModel): "Name": self.name, } - def describe_short(self): + def describe_short(self) -> Dict[str, Any]: return { "ApiDestinationArn": self.arn, "ApiDestinationState": self.state, @@ -837,20 +872,20 @@ class Destination(BaseModel): class EventPattern: - def __init__(self, raw_pattern, pattern): + def __init__(self, raw_pattern: Optional[str], pattern: Dict[str, Any]): self._raw_pattern = raw_pattern self._pattern = pattern - def get_pattern(self): + def get_pattern(self) -> Dict[str, Any]: return self._pattern - def matches_event(self, event): + def matches_event(self, event: Dict[str, Any]) -> bool: if not self._pattern: return True event = json.loads(json.dumps(event)) return self._does_event_match(event, self._pattern) - def _does_event_match(self, event, pattern): + def _does_event_match(self, event: Dict[str, Any], pattern: Dict[str, str]) -> bool: items_and_filters = [(event.get(k, UNDEFINED), v) for k, v in pattern.items()] nested_filter_matches = [ self._does_event_match(item, nested_filter) @@ -864,7 +899,7 @@ class EventPattern: ] return all(nested_filter_matches + filter_list_matches) - def _does_item_match_filters(self, item, filters): + def _does_item_match_filters(self, item: Any, filters: Any) -> bool: allowed_values = [value for value in filters if isinstance(value, str)] allowed_values_match = item in allowed_values if allowed_values else True full_match = isinstance(item, list) and item == allowed_values @@ -876,7 +911,7 @@ class EventPattern: return (full_match or allowed_values_match) and all(named_filter_matches) @staticmethod - def _does_item_match_named_filter(item, pattern): + def _does_item_match_named_filter(item: Any, pattern: Any) -> bool: # type: ignore[misc] filter_name, filter_value = list(pattern.items())[0] if filter_name == "exists": is_leaf_node = not isinstance(item, dict) @@ -901,20 +936,20 @@ class EventPattern: return True @classmethod - def load(cls, raw_pattern): + def load(cls, raw_pattern: Optional[str]) -> "EventPattern": parser = EventPatternParser(raw_pattern) pattern = parser.parse() return cls(raw_pattern, pattern) - def dump(self): + def dump(self) -> Optional[str]: return self._raw_pattern class EventPatternParser: - def __init__(self, pattern): + def __init__(self, pattern: Optional[str]): self.pattern = pattern - def _validate_event_pattern(self, pattern): + def _validate_event_pattern(self, pattern: Dict[str, Any]) -> None: # values in the event pattern have to be either a dict or an array for attr, value in pattern.items(): if isinstance(value, dict): @@ -929,7 +964,7 @@ class EventPatternParser: reason=f"'{attr}' must be an object or an array" ) - def parse(self): + def parse(self) -> Dict[str, Any]: try: parsed_pattern = json.loads(self.pattern) if self.pattern else dict() self._validate_event_pattern(parsed_pattern) @@ -959,41 +994,48 @@ class EventsBackend(BaseBackend): _CRON_REGEX = re.compile(r"^cron\(.*\)") _RATE_REGEX = re.compile(r"^rate\(\d*\s(minute|minutes|hour|hours|day|days)\)") - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.rules = OrderedDict() - self.next_tokens = {} - self.event_buses = {} - self.event_sources = {} - self.archives = {} - self.replays = {} + self.rules: Dict[str, Rule] = OrderedDict() + self.next_tokens: Dict[str, int] = {} + self.event_buses: Dict[str, EventBus] = {} + self.event_sources: Dict[str, str] = {} + self.archives: Dict[str, Archive] = {} + self.replays: Dict[str, Replay] = {} self.tagger = TaggingService() self._add_default_event_bus() - self.connections = {} - self.destinations = {} + self.connections: Dict[str, Connection] = {} + self.destinations: Dict[str, Destination] = {} @staticmethod - def default_vpc_endpoint_service(service_region, zones): + def default_vpc_endpoint_service( + service_region: str, zones: List[str] + ) -> List[Dict[str, str]]: """Default VPC endpoint service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "events" ) - def _add_default_event_bus(self): + def _add_default_event_bus(self) -> None: self.event_buses["default"] = EventBus( self.account_id, self.region_name, "default" ) - def _gen_next_token(self, index): - token = os.urandom(128).encode("base64") + def _gen_next_token(self, index: int) -> str: + token = os.urandom(128).encode("base64") # type: ignore self.next_tokens[token] = index return token - def _process_token_and_limits(self, array_len, next_token=None, limit=None): + def _process_token_and_limits( + self, + array_len: int, + next_token: Optional[str] = None, + limit: Optional[str] = None, + ) -> Tuple[int, int, Optional[str]]: start_index = 0 end_index = array_len - new_next_token = None + new_next_token: Optional[str] = None if next_token: start_index = self.next_tokens.pop(next_token, 0) @@ -1006,7 +1048,7 @@ class EventsBackend(BaseBackend): return start_index, end_index, new_next_token - def _get_event_bus(self, name): + def _get_event_bus(self, name: str) -> EventBus: event_bus_name = name.split("/")[-1] event_bus = self.event_buses.get(event_bus_name) @@ -1017,7 +1059,7 @@ class EventsBackend(BaseBackend): return event_bus - def _get_replay(self, name): + def _get_replay(self, name: str) -> Replay: replay = self.replays.get(name) if not replay: raise ResourceNotFoundException(f"Replay {name} does not exist.") @@ -1026,17 +1068,16 @@ class EventsBackend(BaseBackend): def put_rule( self, - name, - *, - description=None, - event_bus_name=None, - event_pattern=None, - role_arn=None, - scheduled_expression=None, - state=None, - managed_by=None, - tags=None, - ): + name: str, + description: Optional[str] = None, + event_bus_name: Optional[str] = None, + event_pattern: Optional[str] = None, + role_arn: Optional[str] = None, + scheduled_expression: Optional[str] = None, + state: Optional[str] = None, + managed_by: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + ) -> Rule: event_bus_name = event_bus_name or "default" if not event_pattern and not scheduled_expression: @@ -1097,14 +1138,14 @@ class EventsBackend(BaseBackend): raise ResourceNotFoundException(f"Rule {name} does not exist.") return rule - def disable_rule(self, name): + def disable_rule(self, name: str) -> bool: if name in self.rules: self.rules[name].disable() return True return False - def enable_rule(self, name): + def enable_rule(self, name: str) -> bool: if name in self.rules: self.rules[name].enable() return True @@ -1112,7 +1153,7 @@ class EventsBackend(BaseBackend): return False @paginate(pagination_model=PAGINATION_MODEL) - def list_rule_names_by_target(self, target_arn): + def list_rule_names_by_target(self, target_arn: str) -> List[Rule]: # type: ignore[misc] matching_rules = [] for _, rule in self.rules.items(): @@ -1123,7 +1164,7 @@ class EventsBackend(BaseBackend): return matching_rules @paginate(pagination_model=PAGINATION_MODEL) - def list_rules(self, prefix=None): + def list_rules(self, prefix: Optional[str] = None) -> List[Rule]: # type: ignore[misc] match_string = ".*" if prefix is not None: match_string = "^" + prefix + match_string @@ -1138,17 +1179,22 @@ class EventsBackend(BaseBackend): return matching_rules - def list_targets_by_rule(self, rule, next_token=None, limit=None): + def list_targets_by_rule( + self, + rule_id: str, + next_token: Optional[str] = None, + limit: Optional[str] = None, + ) -> Dict[str, Any]: # We'll let a KeyError exception be thrown for response to handle if # rule doesn't exist. - rule = self.rules[rule] + rule = self.rules[rule_id] start_index, end_index, new_next_token = self._process_token_and_limits( len(rule.targets), next_token, limit ) - returned_targets = [] - return_obj = {} + returned_targets: List[Dict[str, Any]] = [] + return_obj: Dict[str, Any] = {} for i in range(start_index, end_index): returned_targets.append(rule.targets[i]) @@ -1159,7 +1205,9 @@ class EventsBackend(BaseBackend): return return_obj - def put_targets(self, name, event_bus_name, targets): + def put_targets( + self, name: str, event_bus_name: str, targets: List[Dict[str, Any]] + ) -> None: # super simple ARN check invalid_arn = next( ( @@ -1195,7 +1243,7 @@ class EventsBackend(BaseBackend): rule.put_targets(targets) - def put_events(self, events): + def put_events(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ The following targets are supported at the moment: @@ -1273,7 +1321,7 @@ class EventsBackend(BaseBackend): return entries - def remove_targets(self, name, event_bus_name, ids): + def remove_targets(self, name: str, event_bus_name: str, ids: List[str]) -> None: rule = self.rules.get(name) if not rule: @@ -1283,11 +1331,11 @@ class EventsBackend(BaseBackend): rule.remove_targets(ids) - def test_event_pattern(self): + def test_event_pattern(self) -> None: raise NotImplementedError() @staticmethod - def _put_permission_from_policy(event_bus, policy): + def _put_permission_from_policy(event_bus: EventBus, policy: str) -> None: try: policy_doc = json.loads(policy) event_bus.add_policy(policy_doc) @@ -1297,7 +1345,7 @@ class EventsBackend(BaseBackend): ) @staticmethod - def _condition_param_to_stmt_condition(condition): + def _condition_param_to_stmt_condition(condition: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: # type: ignore[misc] if condition: key = condition["Key"] value = condition["Value"] @@ -1306,8 +1354,13 @@ class EventsBackend(BaseBackend): return None def _put_permission_from_params( - self, event_bus, action, principal, statement_id, condition - ): + self, + event_bus: EventBus, + action: Optional[str], + principal: str, + statement_id: str, + condition: Dict[str, str], + ) -> None: if principal is None: raise JsonRESTError( "ValidationException", "Parameter Principal must be specified." @@ -1337,13 +1390,19 @@ class EventsBackend(BaseBackend): "InvalidParameterValue", r"StatementId must match ^[a-zA-Z0-9-_]{1,64}$" ) - principal = {"AWS": f"arn:aws:iam::{principal}:root"} + principal_arn = {"AWS": f"arn:aws:iam::{principal}:root"} stmt_condition = self._condition_param_to_stmt_condition(condition) - event_bus.add_permission(statement_id, action, principal, stmt_condition) + event_bus.add_permission(statement_id, action, principal_arn, stmt_condition) def put_permission( - self, event_bus_name, action, principal, statement_id, condition, policy - ): + self, + event_bus_name: str, + action: str, + principal: str, + statement_id: str, + condition: Dict[str, str], + policy: str, + ) -> None: if not event_bus_name: event_bus_name = "default" @@ -1356,7 +1415,12 @@ class EventsBackend(BaseBackend): event_bus, action, principal, statement_id, condition ) - def remove_permission(self, event_bus_name, statement_id, remove_all_permissions): + def remove_permission( + self, + event_bus_name: Optional[str], + statement_id: str, + remove_all_permissions: bool, + ) -> None: if not event_bus_name: event_bus_name = "default" @@ -1377,7 +1441,7 @@ class EventsBackend(BaseBackend): "Statement with the provided id does not exist.", ) - def describe_event_bus(self, name): + def describe_event_bus(self, name: str) -> EventBus: if not name: name = "default" @@ -1385,7 +1449,12 @@ class EventsBackend(BaseBackend): return event_bus - def create_event_bus(self, name, event_source_name=None, tags=None): + def create_event_bus( + self, + name: str, + event_source_name: Optional[str] = None, + tags: Optional[List[Dict[str, str]]] = None, + ) -> EventBus: if name in self.event_buses: raise JsonRESTError( "ResourceAlreadyExistsException", f"Event bus {name} already exists." @@ -1409,7 +1478,7 @@ class EventsBackend(BaseBackend): return self.event_buses[name] - def list_event_buses(self, name_prefix): + def list_event_buses(self, name_prefix: Optional[str]) -> List[EventBus]: if name_prefix: return [ event_bus @@ -1419,7 +1488,7 @@ class EventsBackend(BaseBackend): return list(self.event_buses.values()) - def delete_event_bus(self, name): + def delete_event_bus(self, name: str) -> None: if name == "default": raise JsonRESTError( "ValidationException", "Cannot delete event bus default." @@ -1428,39 +1497,46 @@ class EventsBackend(BaseBackend): if event_bus: self.tagger.delete_all_tags_for_resource(event_bus.arn) - def list_tags_for_resource(self, arn): + def list_tags_for_resource(self, arn: str) -> Dict[str, List[Dict[str, str]]]: name = arn.split("/")[-1] registries = [self.rules, self.event_buses] for registry in registries: - if name in registry: - return self.tagger.list_tags_for_resource(registry[name].arn) + if name in registry: # type: ignore + return self.tagger.list_tags_for_resource(registry[name].arn) # type: ignore raise ResourceNotFoundException( f"Rule {name} does not exist on EventBus default." ) - def tag_resource(self, arn, tags): + def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None: name = arn.split("/")[-1] registries = [self.rules, self.event_buses] for registry in registries: - if name in registry: - self.tagger.tag_resource(registry[name].arn, tags) - return {} + if name in registry: # type: ignore + self.tagger.tag_resource(registry[name].arn, tags) # type: ignore + return raise ResourceNotFoundException( f"Rule {name} does not exist on EventBus default." ) - def untag_resource(self, arn, tag_names): + def untag_resource(self, arn: str, tag_names: List[str]) -> None: name = arn.split("/")[-1] registries = [self.rules, self.event_buses] for registry in registries: - if name in registry: - self.tagger.untag_resource_using_names(registry[name].arn, tag_names) - return {} + if name in registry: # type: ignore + self.tagger.untag_resource_using_names(registry[name].arn, tag_names) # type: ignore + return raise ResourceNotFoundException( f"Rule {name} does not exist on EventBus default." ) - def create_archive(self, name, source_arn, description, event_pattern, retention): + def create_archive( + self, + name: str, + source_arn: str, + description: str, + event_pattern: str, + retention: str, + ) -> Archive: if len(name) > 48: raise ValidationException( " 1 validation error detected: " @@ -1518,7 +1594,7 @@ class EventsBackend(BaseBackend): return archive - def describe_archive(self, name): + def describe_archive(self, name: str) -> Dict[str, Any]: archive = self.archives.get(name) if not archive: @@ -1526,7 +1602,12 @@ class EventsBackend(BaseBackend): return archive.describe() - def list_archives(self, name_prefix, source_arn, state): + def list_archives( + self, + name_prefix: Optional[str], + source_arn: Optional[str], + state: Optional[str], + ) -> List[Dict[str, Any]]: if [name_prefix, source_arn, state].count(None) < 2: raise ValidationException( "At most one filter is allowed for ListArchives. " @@ -1556,7 +1637,9 @@ class EventsBackend(BaseBackend): return result - def update_archive(self, name, description, event_pattern, retention): + def update_archive( + self, name: str, description: str, event_pattern: str, retention: str + ) -> Dict[str, Any]: archive = self.archives.get(name) if not archive: @@ -1570,7 +1653,7 @@ class EventsBackend(BaseBackend): "State": archive.state, } - def delete_archive(self, name): + def delete_archive(self, name: str) -> None: archive = self.archives.get(name) if not archive: @@ -1579,8 +1662,14 @@ class EventsBackend(BaseBackend): archive.delete(self.account_id, self.region_name) def start_replay( - self, name, description, source_arn, start_time, end_time, destination - ): + self, + name: str, + description: str, + source_arn: str, + start_time: str, + end_time: str, + destination: Dict[str, Any], + ) -> Dict[str, Any]: event_bus_arn = destination["Arn"] event_bus_arn_pattern = r"^arn:aws:events:[a-zA-Z0-9-]+:\d{12}:event-bus/" if not re.match(event_bus_arn_pattern, event_bus_arn): @@ -1633,13 +1722,15 @@ class EventsBackend(BaseBackend): "State": ReplayState.STARTING.value, # the replay will be done before returning the response } - def describe_replay(self, name): + def describe_replay(self, name: str) -> Dict[str, Any]: replay = self._get_replay(name) return replay.describe() - def list_replays(self, name_prefix, source_arn, state): - if [name_prefix, source_arn, state].count(None) < 2: + def list_replays( + self, name_prefix: str, source_arn: str, state: str + ) -> List[Dict[str, Any]]: + if [name_prefix, source_arn, state].count(None) < 2: # type: ignore raise ValidationException( "At most one filter is allowed for ListReplays. " "Use either : State, EventSourceArn, or NamePrefix." @@ -1652,7 +1743,7 @@ class EventsBackend(BaseBackend): f"1 validation error detected: Value '{state}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [{all_states}]" ) - if [name_prefix, source_arn, state].count(None) == 3: + if [name_prefix, source_arn, state].count(None) == 3: # type: ignore return [replay.describe_short() for replay in self.replays.values()] result = [] @@ -1662,12 +1753,12 @@ class EventsBackend(BaseBackend): result.append(replay.describe_short()) elif source_arn and replay.source_arn == source_arn: result.append(replay.describe_short()) - elif state and replay.state == state: + elif state and replay.state == state: # type: ignore result.append(replay.describe_short()) return result - def cancel_replay(self, name): + def cancel_replay(self, name: str) -> Dict[str, str]: replay = self._get_replay(name) # replays in the state 'COMPLETED' can't be canceled, @@ -1686,7 +1777,13 @@ class EventsBackend(BaseBackend): return {"ReplayArn": replay.arn, "State": ReplayState.CANCELLING.value} - def create_connection(self, name, description, authorization_type, auth_parameters): + def create_connection( + self, + name: str, + description: str, + authorization_type: str, + auth_parameters: Dict[str, Any], + ) -> Connection: connection = Connection( name, self.account_id, @@ -1698,7 +1795,7 @@ class EventsBackend(BaseBackend): self.connections[name] = connection return connection - def update_connection(self, *, name, **kwargs): + def update_connection(self, name: str, **kwargs: Any) -> Dict[str, Any]: connection = self.connections.get(name) if not connection: raise ResourceNotFoundException(f"Connection '{name}' does not exist.") @@ -1708,47 +1805,17 @@ class EventsBackend(BaseBackend): setattr(connection, attr, value) return connection.describe_short() - def list_connections(self): - return self.connections.values() + def list_connections(self) -> List[Connection]: + return list(self.connections.values()) - def describe_connection(self, name): - """ - Retrieves details about a connection. - - Docs: - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_DescribeConnection.html - - Args: - name: The name of the connection to retrieve. - - Raises: - ResourceNotFoundException: When the connection is not present. - - Returns: - dict - """ + def describe_connection(self, name: str) -> Dict[str, Any]: connection = self.connections.get(name) if not connection: raise ResourceNotFoundException(f"Connection '{name}' does not exist.") return connection.describe() - def delete_connection(self, name): - """ - Deletes a connection. - - Docs: - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_DeleteConnection.html - - Args: - name: The name of the connection to delete. - - Raises: - ResourceNotFoundException: When the connection is not present. - - Returns: - dict - """ + def delete_connection(self, name: str) -> Dict[str, Any]: connection = self.connections.pop(name, None) if not connection: raise ResourceNotFoundException(f"Connection '{name}' does not exist.") @@ -1757,22 +1824,13 @@ class EventsBackend(BaseBackend): def create_api_destination( self, - name, - description, - connection_arn, - invocation_endpoint, - invocation_rate_limit_per_second, - http_method, - ): - """ - Creates an API destination, which is an HTTP invocation endpoint configured as a target for events. - - Docs: - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_CreateApiDestination.html - - Returns: - dict - """ + name: str, + description: str, + connection_arn: str, + invocation_endpoint: str, + invocation_rate_limit_per_second: str, + http_method: str, + ) -> Dict[str, Any]: destination = Destination( name=name, account_id=self.account_id, @@ -1787,21 +1845,10 @@ class EventsBackend(BaseBackend): self.destinations[name] = destination return destination.describe_short() - def list_api_destinations(self): - return self.destinations.values() + def list_api_destinations(self) -> List[Destination]: + return list(self.destinations.values()) - def describe_api_destination(self, name): - """ - Retrieves details about an API destination. - - Docs: - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_DescribeApiDestination.html - Args: - name: The name of the API destination to retrieve. - - Returns: - dict - """ + def describe_api_destination(self, name: str) -> Dict[str, Any]: destination = self.destinations.get(name) if not destination: raise ResourceNotFoundException( @@ -1809,16 +1856,7 @@ class EventsBackend(BaseBackend): ) return destination.describe() - def update_api_destination(self, *, name, **kwargs): - """ - Creates an API destination, which is an HTTP invocation endpoint configured as a target for events. - - Docs: - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_UpdateApiDestination.html - - Returns: - dict - """ + def update_api_destination(self, name: str, **kwargs: Any) -> Dict[str, Any]: destination = self.destinations.get(name) if not destination: raise ResourceNotFoundException( @@ -1830,29 +1868,12 @@ class EventsBackend(BaseBackend): setattr(destination, attr, value) return destination.describe_short() - def delete_api_destination(self, name): - """ - Deletes the specified API destination. - - Docs: - https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_DeleteApiDestination.html - - Args: - name: The name of the destination to delete. - - Raises: - ResourceNotFoundException: When the destination is not present. - - Returns: - dict - - """ + def delete_api_destination(self, name: str) -> None: destination = self.destinations.pop(name, None) if not destination: raise ResourceNotFoundException( f"An api-destination '{name}' does not exist." ) - return {} events_backends = BackendDict(EventsBackend, "events") diff --git a/moto/events/notifications.py b/moto/events/notifications.py index 3f380b416..dad88d376 100644 --- a/moto/events/notifications.py +++ b/moto/events/notifications.py @@ -1,7 +1,8 @@ import json +from typing import Any, Dict -_EVENT_S3_OBJECT_CREATED = { +_EVENT_S3_OBJECT_CREATED: Dict[str, Any] = { "version": "0", "id": "17793124-05d4-b198-2fde-7ededc63b103", "detail-type": "Object Created", @@ -14,7 +15,9 @@ _EVENT_S3_OBJECT_CREATED = { } -def send_notification(source, event_name, region, resources, detail): +def send_notification( + source: str, event_name: str, region: str, resources: Any, detail: Any +) -> None: try: _send_safe_notification(source, event_name, region, resources, detail) except: # noqa @@ -22,7 +25,9 @@ def send_notification(source, event_name, region, resources, detail): pass -def _send_safe_notification(source, event_name, region, resources, detail): +def _send_safe_notification( + source: str, event_name: str, region: str, resources: Any, detail: Any +) -> None: from .models import events_backends event = None @@ -51,7 +56,7 @@ def _send_safe_notification(source, event_name, region, resources, detail): _invoke_lambda(account_id, target.get("Arn"), event=event) -def _invoke_lambda(account_id, fn_arn, event): +def _invoke_lambda(account_id: str, fn_arn: str, event: Any) -> None: from moto.awslambda import lambda_backends lmbda_region = fn_arn.split(":")[3] diff --git a/moto/events/responses.py b/moto/events/responses.py index d6cc88a93..c1b25c9ba 100644 --- a/moto/events/responses.py +++ b/moto/events/responses.py @@ -1,36 +1,18 @@ import json - +from typing import Any, Dict, Tuple from moto.core.responses import BaseResponse -from moto.events import events_backends +from moto.events.models import events_backends, EventsBackend class EventsHandler(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="events") @property - def events_backend(self): - """ - Events Backend - - :return: Events Backend object - :rtype: moto.events.models.EventsBackend - """ + def events_backend(self) -> EventsBackend: return events_backends[self.current_account][self.region] - @property - def request_params(self): - if not hasattr(self, "_json_body"): - try: - self._json_body = json.loads(self.body) - except ValueError: - self._json_body = {} - return self._json_body - - def _get_param(self, param_name, if_none=None): - return self.request_params.get(param_name, if_none) - - def _create_response(self, result): + def _create_response(self, result: Any) -> Tuple[str, Dict[str, Any]]: """ Creates a proper response for the API. @@ -44,12 +26,14 @@ class EventsHandler(BaseResponse): """ return json.dumps(result), self.response_headers - def error(self, type_, message="", status=400): - headers = self.response_headers + def error( + self, type_: str, message: str = "", status: int = 400 + ) -> Tuple[str, Dict[str, Any]]: + headers: Dict[str, Any] = self.response_headers headers["status"] = status return json.dumps({"__type": type_, "message": message}), headers - def put_rule(self): + def put_rule(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") event_pattern = self._get_param("EventPattern") scheduled_expression = self._get_param("ScheduleExpression") @@ -72,7 +56,7 @@ class EventsHandler(BaseResponse): result = {"RuleArn": rule.arn} return self._create_response(result) - def delete_rule(self): + def delete_rule(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") if not name: @@ -81,7 +65,7 @@ class EventsHandler(BaseResponse): return "", self.response_headers - def describe_rule(self): + def describe_rule(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") if not name: @@ -92,7 +76,7 @@ class EventsHandler(BaseResponse): result = rule.describe() return self._create_response(result) - def disable_rule(self): + def disable_rule(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") if not name: @@ -105,7 +89,7 @@ class EventsHandler(BaseResponse): return "", self.response_headers - def enable_rule(self): + def enable_rule(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") if not name: @@ -118,10 +102,10 @@ class EventsHandler(BaseResponse): return "", self.response_headers - def generate_presigned_url(self): + def generate_presigned_url(self) -> None: pass - def list_rule_names_by_target(self): + def list_rule_names_by_target(self) -> Tuple[str, Dict[str, Any]]: target_arn = self._get_param("TargetArn") next_token = self._get_param("NextToken") limit = self._get_param("Limit") @@ -137,7 +121,7 @@ class EventsHandler(BaseResponse): return json.dumps(res), self.response_headers - def list_rules(self): + def list_rules(self) -> Tuple[str, Dict[str, Any]]: prefix = self._get_param("NamePrefix") next_token = self._get_param("NextToken") limit = self._get_param("Limit") @@ -152,7 +136,7 @@ class EventsHandler(BaseResponse): return json.dumps(rules_obj), self.response_headers - def list_targets_by_rule(self): + def list_targets_by_rule(self) -> Tuple[str, Dict[str, Any]]: rule_name = self._get_param("Rule") next_token = self._get_param("NextToken") limit = self._get_param("Limit") @@ -171,7 +155,7 @@ class EventsHandler(BaseResponse): return json.dumps(targets), self.response_headers - def put_events(self): + def put_events(self) -> str: events = self._get_param("Entries") entries = self.events_backend.put_events(events) @@ -184,7 +168,7 @@ class EventsHandler(BaseResponse): return json.dumps(response) - def put_targets(self): + def put_targets(self) -> Tuple[str, Dict[str, Any]]: rule_name = self._get_param("Rule") event_bus_name = self._get_param("EventBusName", "default") targets = self._get_param("Targets") @@ -196,7 +180,7 @@ class EventsHandler(BaseResponse): self.response_headers, ) - def remove_targets(self): + def remove_targets(self) -> Tuple[str, Dict[str, Any]]: rule_name = self._get_param("Rule") event_bus_name = self._get_param("EventBusName", "default") ids = self._get_param("Ids") @@ -208,10 +192,10 @@ class EventsHandler(BaseResponse): self.response_headers, ) - def test_event_pattern(self): + def test_event_pattern(self) -> None: pass - def put_permission(self): + def put_permission(self) -> str: event_bus_name = self._get_param("EventBusName") action = self._get_param("Action") principal = self._get_param("Principal") @@ -225,7 +209,7 @@ class EventsHandler(BaseResponse): return "" - def remove_permission(self): + def remove_permission(self) -> str: event_bus_name = self._get_param("EventBusName") statement_id = self._get_param("StatementId") remove_all_permissions = self._get_param("RemoveAllPermissions") @@ -236,7 +220,7 @@ class EventsHandler(BaseResponse): return "" - def describe_event_bus(self): + def describe_event_bus(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") event_bus = self.events_backend.describe_event_bus(name) @@ -247,7 +231,7 @@ class EventsHandler(BaseResponse): return json.dumps(response), self.response_headers - def create_event_bus(self): + def create_event_bus(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") event_source_name = self._get_param("EventSourceName") tags = self._get_param("Tags") @@ -255,7 +239,7 @@ class EventsHandler(BaseResponse): event_bus = self.events_backend.create_event_bus(name, event_source_name, tags) return json.dumps({"EventBusArn": event_bus.arn}), self.response_headers - def list_event_buses(self): + def list_event_buses(self) -> Tuple[str, Dict[str, Any]]: name_prefix = self._get_param("NamePrefix") # ToDo: add 'NextToken' & 'Limit' parameters @@ -270,37 +254,37 @@ class EventsHandler(BaseResponse): return json.dumps({"EventBuses": response}), self.response_headers - def delete_event_bus(self): + def delete_event_bus(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") self.events_backend.delete_event_bus(name) return "", self.response_headers - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> Tuple[str, Dict[str, Any]]: arn = self._get_param("ResourceARN") result = self.events_backend.list_tags_for_resource(arn) return json.dumps(result), self.response_headers - def tag_resource(self): + def tag_resource(self) -> Tuple[str, Dict[str, Any]]: arn = self._get_param("ResourceARN") tags = self._get_param("Tags") - result = self.events_backend.tag_resource(arn, tags) + self.events_backend.tag_resource(arn, tags) - return json.dumps(result), self.response_headers + return "{}", self.response_headers - def untag_resource(self): + def untag_resource(self) -> Tuple[str, Dict[str, Any]]: arn = self._get_param("ResourceARN") tags = self._get_param("TagKeys") - result = self.events_backend.untag_resource(arn, tags) + self.events_backend.untag_resource(arn, tags) - return json.dumps(result), self.response_headers + return "{}", self.response_headers - def create_archive(self): + def create_archive(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ArchiveName") source_arn = self._get_param("EventSourceArn") description = self._get_param("Description") @@ -322,14 +306,14 @@ class EventsHandler(BaseResponse): self.response_headers, ) - def describe_archive(self): + def describe_archive(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ArchiveName") result = self.events_backend.describe_archive(name) return json.dumps(result), self.response_headers - def list_archives(self): + def list_archives(self) -> Tuple[str, Dict[str, Any]]: name_prefix = self._get_param("NamePrefix") source_arn = self._get_param("EventSourceArn") state = self._get_param("State") @@ -338,7 +322,7 @@ class EventsHandler(BaseResponse): return json.dumps({"Archives": result}), self.response_headers - def update_archive(self): + def update_archive(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ArchiveName") description = self._get_param("Description") event_pattern = self._get_param("EventPattern") @@ -350,14 +334,14 @@ class EventsHandler(BaseResponse): return json.dumps(result), self.response_headers - def delete_archive(self): + def delete_archive(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ArchiveName") self.events_backend.delete_archive(name) return "", self.response_headers - def start_replay(self): + def start_replay(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ReplayName") description = self._get_param("Description") source_arn = self._get_param("EventSourceArn") @@ -371,14 +355,14 @@ class EventsHandler(BaseResponse): return json.dumps(result), self.response_headers - def describe_replay(self): + def describe_replay(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ReplayName") result = self.events_backend.describe_replay(name) return json.dumps(result), self.response_headers - def list_replays(self): + def list_replays(self) -> Tuple[str, Dict[str, Any]]: name_prefix = self._get_param("NamePrefix") source_arn = self._get_param("EventSourceArn") state = self._get_param("State") @@ -387,14 +371,14 @@ class EventsHandler(BaseResponse): return json.dumps({"Replays": result}), self.response_headers - def cancel_replay(self): + def cancel_replay(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("ReplayName") result = self.events_backend.cancel_replay(name) return json.dumps(result), self.response_headers - def create_connection(self): + def create_connection(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") description = self._get_param("Description") authorization_type = self._get_param("AuthorizationType") @@ -416,7 +400,7 @@ class EventsHandler(BaseResponse): self.response_headers, ) - def list_connections(self): + def list_connections(self) -> Tuple[str, Dict[str, Any]]: connections = self.events_backend.list_connections() result = [] for connection in connections: @@ -432,12 +416,12 @@ class EventsHandler(BaseResponse): return json.dumps({"Connections": result}), self.response_headers - def describe_connection(self): + def describe_connection(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") result = self.events_backend.describe_connection(name) return json.dumps(result), self.response_headers - def update_connection(self): + def update_connection(self) -> Tuple[str, Dict[str, Any]]: updates = dict( name=self._get_param("Name"), description=self._get_param("Description"), @@ -447,12 +431,12 @@ class EventsHandler(BaseResponse): result = self.events_backend.update_connection(**updates) return self._create_response(result) - def delete_connection(self): + def delete_connection(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") result = self.events_backend.delete_connection(name) return json.dumps(result), self.response_headers - def create_api_destination(self): + def create_api_destination(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") description = self._get_param("Description") connection_arn = self._get_param("ConnectionArn") @@ -472,7 +456,7 @@ class EventsHandler(BaseResponse): ) return self._create_response(result) - def list_api_destinations(self): + def list_api_destinations(self) -> Tuple[str, Dict[str, Any]]: destinations = self.events_backend.list_api_destinations() result = [] for destination in destinations: @@ -491,12 +475,12 @@ class EventsHandler(BaseResponse): return json.dumps({"ApiDestinations": result}), self.response_headers - def describe_api_destination(self): + def describe_api_destination(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") result = self.events_backend.describe_api_destination(name) return self._create_response(result) - def update_api_destination(self): + def update_api_destination(self) -> Tuple[str, Dict[str, Any]]: updates = dict( connection_arn=self._get_param("ConnectionArn"), description=self._get_param("Description"), @@ -511,7 +495,7 @@ class EventsHandler(BaseResponse): result = self.events_backend.update_api_destination(**updates) return self._create_response(result) - def delete_api_destination(self): + def delete_api_destination(self) -> Tuple[str, Dict[str, Any]]: name = self._get_param("Name") - result = self.events_backend.delete_api_destination(name) - return self._create_response(result) + self.events_backend.delete_api_destination(name) + return self._create_response({}) diff --git a/setup.cfg b/setup.cfg index e75a9295a..985b192b8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/elb,moto/elbv2,moto/emr,moto/es,moto/moto_api,moto/neptune +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/moto_api,moto/neptune show_column_numbers=True show_error_codes = True disable_error_code=abstract