Techdebt: MyPy Events, EMR (#6012)

This commit is contained in:
Bert Blommers 2023-03-05 09:27:17 -01:00 committed by GitHub
parent 57205668ed
commit 101cee8360
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 575 additions and 586 deletions

View File

@ -5,5 +5,5 @@ from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
code = 400
def __init__(self, resource):
def __init__(self, resource: str):
super().__init__("ResourceNotFoundException", resource)

View File

@ -1,6 +1,7 @@
"""EMRContainersBackend class with methods for supported APIs."""
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Iterator
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds
@ -22,14 +23,14 @@ JOB_STATUS = "RUNNING"
class FakeCluster(BaseModel):
def __init__(
self,
name,
container_provider,
client_token,
account_id,
region_name,
aws_partition,
tags=None,
virtual_cluster_id=None,
name: str,
container_provider: Dict[str, Any],
client_token: str,
account_id: str,
region_name: str,
aws_partition: str,
tags: Optional[Dict[str, str]] = None,
virtual_cluster_id: Optional[str] = None,
):
self.id = virtual_cluster_id or random_cluster_id()
@ -50,7 +51,7 @@ class FakeCluster(BaseModel):
)
self.tags = tags
def __iter__(self):
def __iter__(self) -> Iterator[Tuple[str, Any]]:
yield "id", self.id
yield "name", self.name
yield "arn", self.arn
@ -59,7 +60,7 @@ class FakeCluster(BaseModel):
yield "createdAt", self.creation_date
yield "tags", self.tags
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
# Format for summary https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeVirtualCluster.html
# (response syntax section)
return {
@ -76,17 +77,17 @@ class FakeCluster(BaseModel):
class FakeJob(BaseModel):
def __init__(
self,
name,
virtual_cluster_id,
client_token,
execution_role_arn,
release_label,
job_driver,
configuration_overrides,
account_id,
region_name,
aws_partition,
tags,
name: str,
virtual_cluster_id: str,
client_token: str,
execution_role_arn: str,
release_label: str,
job_driver: str,
configuration_overrides: Dict[str, Any],
account_id: str,
region_name: str,
aws_partition: str,
tags: Optional[Dict[str, str]],
):
self.id = random_job_id()
self.name = name
@ -108,12 +109,12 @@ class FakeJob(BaseModel):
datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
)
self.created_by = None
self.finished_at = None
self.state_details = None
self.finished_at: Optional[str] = None
self.state_details: Optional[str] = None
self.failure_reason = None
self.tags = tags
def __iter__(self):
def __iter__(self) -> Iterator[Tuple[str, Any]]:
yield "id", self.id
yield "name", self.name
yield "virtualClusterId", self.virtual_cluster_id
@ -131,7 +132,7 @@ class FakeJob(BaseModel):
yield "failureReason", self.failure_reason
yield "tags", self.tags
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
# Format for summary https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DescribeJobRun.html
# (response syntax section)
return {
@ -157,15 +158,21 @@ class FakeJob(BaseModel):
class EMRContainersBackend(BaseBackend):
"""Implementation of EMRContainers APIs."""
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.virtual_clusters = dict()
self.virtual_clusters: Dict[str, FakeCluster] = dict()
self.virtual_cluster_count = 0
self.jobs = dict()
self.jobs: Dict[str, FakeJob] = dict()
self.job_count = 0
self.partition = get_partition(region_name)
def create_virtual_cluster(self, name, container_provider, client_token, tags=None):
def create_virtual_cluster(
self,
name: str,
container_provider: Dict[str, Any],
client_token: str,
tags: Optional[Dict[str, str]] = None,
) -> FakeCluster:
occupied_namespaces = [
virtual_cluster.namespace
for virtual_cluster in self.virtual_clusters.values()
@ -190,14 +197,14 @@ class EMRContainersBackend(BaseBackend):
self.virtual_cluster_count += 1
return virtual_cluster
def delete_virtual_cluster(self, cluster_id):
def delete_virtual_cluster(self, cluster_id: str) -> FakeCluster:
if cluster_id not in self.virtual_clusters:
raise ValidationException("VirtualCluster does not exist")
self.virtual_clusters[cluster_id].state = "TERMINATED"
return self.virtual_clusters[cluster_id]
def describe_virtual_cluster(self, cluster_id):
def describe_virtual_cluster(self, cluster_id: str) -> Dict[str, Any]:
if cluster_id not in self.virtual_clusters:
raise ValidationException(f"Virtual cluster {cluster_id} doesn't exist.")
@ -205,14 +212,14 @@ class EMRContainersBackend(BaseBackend):
def list_virtual_clusters(
self,
container_provider_id,
container_provider_type,
created_after,
created_before,
states,
max_results,
next_token,
):
container_provider_id: str,
container_provider_type: str,
created_after: str,
created_before: str,
states: Optional[List[str]],
max_results: int,
next_token: Optional[str],
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
virtual_clusters = [
virtual_cluster.to_dict()
for virtual_cluster in self.virtual_clusters.values()
@ -258,15 +265,15 @@ class EMRContainersBackend(BaseBackend):
def start_job_run(
self,
name,
virtual_cluster_id,
client_token,
execution_role_arn,
release_label,
job_driver,
configuration_overrides,
tags,
):
name: str,
virtual_cluster_id: str,
client_token: str,
execution_role_arn: str,
release_label: str,
job_driver: str,
configuration_overrides: Dict[str, Any],
tags: Dict[str, str],
) -> FakeJob:
if virtual_cluster_id not in self.virtual_clusters.keys():
raise ResourceNotFoundException(
@ -296,7 +303,7 @@ class EMRContainersBackend(BaseBackend):
self.job_count += 1
return job
def cancel_job_run(self, job_id, virtual_cluster_id):
def cancel_job_run(self, job_id: str, virtual_cluster_id: str) -> FakeJob:
if not re.match(r"[a-z,A-Z,0-9]{19}", job_id):
raise ValidationException("Invalid job run short id")
@ -326,14 +333,14 @@ class EMRContainersBackend(BaseBackend):
def list_job_runs(
self,
virtual_cluster_id,
created_before,
created_after,
name,
states,
max_results,
next_token,
):
virtual_cluster_id: str,
created_before: str,
created_after: str,
name: str,
states: Optional[List[str]],
max_results: int,
next_token: Optional[str],
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
jobs = [job.to_dict() for job in self.jobs.values()]
jobs = [job for job in jobs if job["virtualClusterId"] == virtual_cluster_id]
@ -353,7 +360,7 @@ class EMRContainersBackend(BaseBackend):
sort_key = "id"
return paginated_list(jobs, sort_key, max_results, next_token)
def describe_job_run(self, job_id, virtual_cluster_id):
def describe_job_run(self, job_id: str, virtual_cluster_id: str) -> Dict[str, Any]:
if not re.match(r"[a-z,A-Z,0-9]{19}", job_id):
raise ValidationException("Invalid job run short id")

View File

@ -1,8 +1,8 @@
"""Handles incoming emrcontainers requests, invokes methods, returns responses."""
import json
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import emrcontainers_backends
from .models import emrcontainers_backends, EMRContainersBackend
DEFAULT_MAX_RESULTS = 100
DEFAULT_NEXT_TOKEN = ""
@ -12,15 +12,15 @@ DEFAULT_CONTAINER_PROVIDER_TYPE = "EKS"
class EMRContainersResponse(BaseResponse):
"""Handler for EMRContainers requests and responses."""
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="emr-containers")
@property
def emrcontainers_backend(self):
def emrcontainers_backend(self) -> EMRContainersBackend:
"""Return backend instance specific for this region."""
return emrcontainers_backends[self.current_account][self.region]
def create_virtual_cluster(self):
def create_virtual_cluster(self) -> TYPE_RESPONSE:
name = self._get_param("name")
container_provider = self._get_param("containerProvider")
client_token = self._get_param("clientToken")
@ -34,7 +34,7 @@ class EMRContainersResponse(BaseResponse):
)
return 200, {}, json.dumps(dict(virtual_cluster))
def delete_virtual_cluster(self):
def delete_virtual_cluster(self) -> TYPE_RESPONSE:
cluster_id = self._get_param("virtualClusterId")
virtual_cluster = self.emrcontainers_backend.delete_virtual_cluster(
@ -42,7 +42,7 @@ class EMRContainersResponse(BaseResponse):
)
return 200, {}, json.dumps(dict(virtual_cluster))
def describe_virtual_cluster(self):
def describe_virtual_cluster(self) -> TYPE_RESPONSE:
cluster_id = self._get_param("virtualClusterId")
virtual_cluster = self.emrcontainers_backend.describe_virtual_cluster(
@ -51,7 +51,7 @@ class EMRContainersResponse(BaseResponse):
response = {"virtualCluster": virtual_cluster}
return 200, {}, json.dumps(response)
def list_virtual_clusters(self):
def list_virtual_clusters(self) -> TYPE_RESPONSE:
container_provider_id = self._get_param("containerProviderId")
container_provider_type = self._get_param(
"containerProviderType", DEFAULT_CONTAINER_PROVIDER_TYPE
@ -75,7 +75,7 @@ class EMRContainersResponse(BaseResponse):
response = {"virtualClusters": virtual_clusters, "nextToken": next_token}
return 200, {}, json.dumps(response)
def start_job_run(self):
def start_job_run(self) -> TYPE_RESPONSE:
name = self._get_param("name")
virtual_cluster_id = self._get_param("virtualClusterId")
client_token = self._get_param("clientToken")
@ -97,7 +97,7 @@ class EMRContainersResponse(BaseResponse):
)
return 200, {}, json.dumps(dict(job))
def cancel_job_run(self):
def cancel_job_run(self) -> TYPE_RESPONSE:
job_id = self._get_param("jobRunId")
virtual_cluster_id = self._get_param("virtualClusterId")
@ -106,7 +106,7 @@ class EMRContainersResponse(BaseResponse):
)
return 200, {}, json.dumps(dict(job))
def list_job_runs(self):
def list_job_runs(self) -> TYPE_RESPONSE:
virtual_cluster_id = self._get_param("virtualClusterId")
created_before = self._get_param("createdBefore")
created_after = self._get_param("createdAfter")
@ -128,7 +128,7 @@ class EMRContainersResponse(BaseResponse):
response = {"jobRuns": job_runs, "nextToken": next_token}
return 200, {}, json.dumps(response)
def describe_job_run(self):
def describe_job_run(self) -> TYPE_RESPONSE:
job_id = self._get_param("jobRunId")
virtual_cluster_id = self._get_param("virtualClusterId")

View File

@ -1,9 +1,10 @@
# import json
import string
from typing import Any, List, Optional, Tuple
from moto.moto_api._internal import mock_random as random
def get_partition(region):
def get_partition(region: str) -> str:
valid_matches = [
# (region prefix, aws partition)
("cn-", "aws-cn"),
@ -18,33 +19,35 @@ def get_partition(region):
return "aws"
def random_id(size=13):
def random_id(size: int = 13) -> str:
chars = list(range(10)) + list(string.ascii_lowercase)
return "".join(str(random.choice(chars)) for x in range(size))
def random_cluster_id():
def random_cluster_id() -> str:
return random_id(size=25)
def random_job_id():
def random_job_id() -> str:
return random_id(size=19)
def paginated_list(full_list, sort_key, max_results, next_token):
def paginated_list(
full_list: List[Any], sort_key: str, max_results: int, next_token: Optional[str]
) -> Tuple[List[Any], Optional[str]]:
"""
Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results
number of elements, and the new next_token which can be passed back in for the next segment of the full list.
"""
if next_token is None or not next_token:
next_token = 0
next_token = int(next_token)
next_token = 0 # type: ignore
next_token = int(next_token) # type: ignore
sorted_list = sorted(full_list, key=lambda d: d[sort_key])
values = sorted_list[next_token : next_token + max_results]
values = sorted_list[next_token : next_token + max_results] # type: ignore
if len(values) == max_results:
new_next_token = str(next_token + max_results)
new_next_token = str(next_token + max_results) # type: ignore
else:
new_next_token = None
return values, new_next_token

View File

@ -5,7 +5,7 @@ from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
code = 400
def __init__(self, resource):
def __init__(self, resource: str):
super().__init__(
"ResourceNotFoundException", f"Application {resource} does not exist"
)
@ -14,5 +14,5 @@ class ResourceNotFoundException(JsonRESTError):
class ValidationException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ValidationException", message)

View File

@ -1,17 +1,16 @@
"""EMRServerlessBackend class with methods for supported APIs."""
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Iterator
import inspect
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds
from moto.emrcontainers.utils import get_partition, paginated_list
from .utils import (
default_auto_start_configuration,
default_auto_stop_configuration,
get_partition,
paginated_list,
random_appplication_id,
# random_job_id,
)
from .exceptions import ResourceNotFoundException, ValidationException
@ -26,18 +25,18 @@ JOB_STATUS = "RUNNING"
class FakeApplication(BaseModel):
def __init__(
self,
name,
release_label,
application_type,
client_token,
account_id,
region_name,
initial_capacity,
maximum_capacity,
tags,
auto_start_configuration,
auto_stop_configuration,
network_configuration,
name: str,
release_label: str,
application_type: str,
client_token: str,
account_id: str,
region_name: str,
initial_capacity: str,
maximum_capacity: str,
tags: Dict[str, str],
auto_start_configuration: str,
auto_stop_configuration: str,
network_configuration: str,
):
# Provided parameters
self.name = name
@ -53,7 +52,7 @@ class FakeApplication(BaseModel):
auto_stop_configuration or default_auto_stop_configuration()
)
self.network_configuration = network_configuration
self.tags = tags or {}
self.tags: Dict[str, str] = tags or {}
# Service-generated-parameters
self.id = random_appplication_id()
@ -70,14 +69,14 @@ class FakeApplication(BaseModel):
)
self.updated_at = self.created_at
def __iter__(self):
def __iter__(self) -> Iterator[Tuple[str, Any]]:
yield "applicationId", self.id
yield "name", self.name
yield "arn", self.arn
yield "autoStartConfig", self.auto_start_configuration,
yield "autoStopConfig", self.auto_stop_configuration,
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
"""
Dictionary representation of an EMR Serverless Application.
When used in `list-applications`, capacity, auto-start/stop configs, and tags are not returned. https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_ListApplications.html
@ -127,26 +126,25 @@ class FakeApplication(BaseModel):
class EMRServerlessBackend(BaseBackend):
"""Implementation of EMRServerless APIs."""
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.region_name = region_name
self.applications = dict()
self.jobs = dict()
self.applications: Dict[str, FakeApplication] = dict()
self.partition = get_partition(region_name)
def create_application(
self,
name,
release_label,
application_type,
client_token,
initial_capacity,
maximum_capacity,
tags,
auto_start_configuration,
auto_stop_configuration,
network_configuration,
):
name: str,
release_label: str,
application_type: str,
client_token: str,
initial_capacity: str,
maximum_capacity: str,
tags: Dict[str, str],
auto_start_configuration: str,
auto_stop_configuration: str,
network_configuration: str,
) -> FakeApplication:
if application_type not in ["HIVE", "SPARK"]:
raise ValidationException(f"Unsupported engine {application_type}")
@ -173,7 +171,7 @@ class EMRServerlessBackend(BaseBackend):
self.applications[application.id] = application
return application
def delete_application(self, application_id):
def delete_application(self, application_id: str) -> None:
if application_id not in self.applications.keys():
raise ResourceNotFoundException(application_id)
@ -184,13 +182,15 @@ class EMRServerlessBackend(BaseBackend):
)
self.applications[application_id].state = "TERMINATED"
def get_application(self, application_id):
def get_application(self, application_id: str) -> Dict[str, Any]:
if application_id not in self.applications.keys():
raise ResourceNotFoundException(application_id)
return self.applications[application_id].to_dict()
def list_applications(self, next_token, max_results, states):
def list_applications(
self, next_token: Optional[str], max_results: int, states: Optional[List[str]]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
applications = [
application.to_dict() for application in self.applications.values()
]
@ -203,25 +203,25 @@ class EMRServerlessBackend(BaseBackend):
sort_key = "name"
return paginated_list(applications, sort_key, max_results, next_token)
def start_application(self, application_id):
def start_application(self, application_id: str) -> None:
if application_id not in self.applications.keys():
raise ResourceNotFoundException(application_id)
self.applications[application_id].state = "STARTED"
def stop_application(self, application_id):
def stop_application(self, application_id: str) -> None:
if application_id not in self.applications.keys():
raise ResourceNotFoundException(application_id)
self.applications[application_id].state = "STOPPED"
def update_application(
self,
application_id,
initial_capacity,
maximum_capacity,
auto_start_configuration,
auto_stop_configuration,
network_configuration,
):
application_id: str,
initial_capacity: Optional[str],
maximum_capacity: Optional[str],
auto_start_configuration: Optional[str],
auto_stop_configuration: Optional[str],
network_configuration: Optional[str],
) -> Dict[str, Any]:
if application_id not in self.applications.keys():
raise ResourceNotFoundException(application_id)

View File

@ -1,8 +1,9 @@
"""Handles incoming emrserverless requests, invokes methods, returns responses."""
import json
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import emrserverless_backends
from .models import emrserverless_backends, EMRServerlessBackend
DEFAULT_MAX_RESULTS = 100
DEFAULT_NEXT_TOKEN = ""
@ -33,15 +34,15 @@ These are the available methos:
class EMRServerlessResponse(BaseResponse):
"""Handler for EMRServerless requests and responses."""
def __init__(self):
def __init__(self) -> None:
super().__init__("emr-serverless")
@property
def emrserverless_backend(self):
def emrserverless_backend(self) -> EMRServerlessBackend:
"""Return backend instance specific for this region."""
return emrserverless_backends[self.current_account][self.region]
def create_application(self):
def create_application(self) -> TYPE_RESPONSE:
name = self._get_param("name")
release_label = self._get_param("releaseLabel")
application_type = self._get_param("type")
@ -65,15 +66,15 @@ class EMRServerlessResponse(BaseResponse):
auto_stop_configuration=auto_stop_configuration,
network_configuration=network_configuration,
)
return (200, {}, json.dumps(dict(application)))
return 200, {}, json.dumps(dict(application))
def delete_application(self):
def delete_application(self) -> TYPE_RESPONSE:
application_id = self._get_param("applicationId")
self.emrserverless_backend.delete_application(application_id=application_id)
return (200, {}, None)
return 200, {}, ""
def get_application(self):
def get_application(self) -> TYPE_RESPONSE:
application_id = self._get_param("applicationId")
application = self.emrserverless_backend.get_application(
@ -82,7 +83,7 @@ class EMRServerlessResponse(BaseResponse):
response = {"application": application}
return 200, {}, json.dumps(response)
def list_applications(self):
def list_applications(self) -> TYPE_RESPONSE:
states = self.querystring.get("states", [])
max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
@ -95,19 +96,19 @@ class EMRServerlessResponse(BaseResponse):
response = {"applications": applications, "nextToken": next_token}
return 200, {}, json.dumps(response)
def start_application(self):
def start_application(self) -> TYPE_RESPONSE:
application_id = self._get_param("applicationId")
self.emrserverless_backend.start_application(application_id=application_id)
return (200, {}, None)
return 200, {}, ""
def stop_application(self):
def stop_application(self) -> TYPE_RESPONSE:
application_id = self._get_param("applicationId")
self.emrserverless_backend.stop_application(application_id=application_id)
return (200, {}, None)
return 200, {}, ""
def update_application(self):
def update_application(self) -> TYPE_RESPONSE:
application_id = self._get_param("applicationId")
initial_capacity = self._get_param("initialCapacity")
maximum_capacity = self._get_param("maximumCapacity")

View File

@ -1,57 +1,24 @@
from moto.moto_api._internal import mock_random as random
import string
from typing import Any, Dict
def get_partition(region):
valid_matches = [
# (region prefix, aws partition)
("cn-", "aws-cn"),
("us-gov-", "aws-us-gov"),
("us-gov-iso-", "aws-iso"),
("us-gov-iso-b-", "aws-iso-b"),
]
for prefix, partition in valid_matches:
if region.startswith(prefix):
return partition
return "aws"
def random_id(size=13):
def random_id(size: int = 13) -> str:
chars = list(range(10)) + list(string.ascii_lowercase)
return "".join(str(random.choice(chars)) for x in range(size))
def random_appplication_id():
def random_appplication_id() -> str:
return random_id(size=16)
def random_job_id():
def random_job_id() -> str:
return random_id(size=16)
def default_auto_start_configuration():
def default_auto_start_configuration() -> Dict[str, bool]:
return {"enabled": True}
def default_auto_stop_configuration():
def default_auto_stop_configuration() -> Dict[str, Any]:
return {"enabled": True, "idleTimeoutMinutes": 15}
def paginated_list(full_list, sort_key, max_results, next_token):
"""
Returns a tuple containing a slice of the full list starting at next_token and ending with at most the max_results
number of elements, and the new next_token which can be passed back in for the next segment of the full list.
"""
if next_token is None or not next_token:
next_token = 0
next_token = int(next_token)
sorted_list = sorted(full_list, key=lambda d: d[sort_key])
values = sorted_list[next_token : next_token + max_results]
if len(values) == max_results:
new_next_token = str(next_token + max_results)
else:
new_next_token = None
return values, new_next_token

View File

@ -1,17 +1,18 @@
from typing import Optional
from moto.core.exceptions import JsonRESTError
class IllegalStatusException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("IllegalStatusException", message)
class InvalidEventPatternException(JsonRESTError):
code = 400
def __init__(self, reason=None):
def __init__(self, reason: Optional[str] = None):
msg = "Event pattern is not valid. "
if reason:
msg += f"Reason: {reason}"
@ -22,19 +23,19 @@ class InvalidEventPatternException(JsonRESTError):
class ResourceNotFoundException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ResourceNotFoundException", message)
class ResourceAlreadyExistsException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ResourceAlreadyExistsException", message)
class ValidationException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ValidationException", message)

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,8 @@
import json
from typing import Any, Dict
_EVENT_S3_OBJECT_CREATED = {
_EVENT_S3_OBJECT_CREATED: Dict[str, Any] = {
"version": "0",
"id": "17793124-05d4-b198-2fde-7ededc63b103",
"detail-type": "Object Created",
@ -14,7 +15,9 @@ _EVENT_S3_OBJECT_CREATED = {
}
def send_notification(source, event_name, region, resources, detail):
def send_notification(
source: str, event_name: str, region: str, resources: Any, detail: Any
) -> None:
try:
_send_safe_notification(source, event_name, region, resources, detail)
except: # noqa
@ -22,7 +25,9 @@ def send_notification(source, event_name, region, resources, detail):
pass
def _send_safe_notification(source, event_name, region, resources, detail):
def _send_safe_notification(
source: str, event_name: str, region: str, resources: Any, detail: Any
) -> None:
from .models import events_backends
event = None
@ -51,7 +56,7 @@ def _send_safe_notification(source, event_name, region, resources, detail):
_invoke_lambda(account_id, target.get("Arn"), event=event)
def _invoke_lambda(account_id, fn_arn, event):
def _invoke_lambda(account_id: str, fn_arn: str, event: Any) -> None:
from moto.awslambda import lambda_backends
lmbda_region = fn_arn.split(":")[3]

View File

@ -1,36 +1,18 @@
import json
from typing import Any, Dict, Tuple
from moto.core.responses import BaseResponse
from moto.events import events_backends
from moto.events.models import events_backends, EventsBackend
class EventsHandler(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="events")
@property
def events_backend(self):
"""
Events Backend
:return: Events Backend object
:rtype: moto.events.models.EventsBackend
"""
def events_backend(self) -> EventsBackend:
return events_backends[self.current_account][self.region]
@property
def request_params(self):
if not hasattr(self, "_json_body"):
try:
self._json_body = json.loads(self.body)
except ValueError:
self._json_body = {}
return self._json_body
def _get_param(self, param_name, if_none=None):
return self.request_params.get(param_name, if_none)
def _create_response(self, result):
def _create_response(self, result: Any) -> Tuple[str, Dict[str, Any]]:
"""
Creates a proper response for the API.
@ -44,12 +26,14 @@ class EventsHandler(BaseResponse):
"""
return json.dumps(result), self.response_headers
def error(self, type_, message="", status=400):
headers = self.response_headers
def error(
self, type_: str, message: str = "", status: int = 400
) -> Tuple[str, Dict[str, Any]]:
headers: Dict[str, Any] = self.response_headers
headers["status"] = status
return json.dumps({"__type": type_, "message": message}), headers
def put_rule(self):
def put_rule(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
event_pattern = self._get_param("EventPattern")
scheduled_expression = self._get_param("ScheduleExpression")
@ -72,7 +56,7 @@ class EventsHandler(BaseResponse):
result = {"RuleArn": rule.arn}
return self._create_response(result)
def delete_rule(self):
def delete_rule(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
if not name:
@ -81,7 +65,7 @@ class EventsHandler(BaseResponse):
return "", self.response_headers
def describe_rule(self):
def describe_rule(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
if not name:
@ -92,7 +76,7 @@ class EventsHandler(BaseResponse):
result = rule.describe()
return self._create_response(result)
def disable_rule(self):
def disable_rule(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
if not name:
@ -105,7 +89,7 @@ class EventsHandler(BaseResponse):
return "", self.response_headers
def enable_rule(self):
def enable_rule(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
if not name:
@ -118,10 +102,10 @@ class EventsHandler(BaseResponse):
return "", self.response_headers
def generate_presigned_url(self):
def generate_presigned_url(self) -> None:
pass
def list_rule_names_by_target(self):
def list_rule_names_by_target(self) -> Tuple[str, Dict[str, Any]]:
target_arn = self._get_param("TargetArn")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
@ -137,7 +121,7 @@ class EventsHandler(BaseResponse):
return json.dumps(res), self.response_headers
def list_rules(self):
def list_rules(self) -> Tuple[str, Dict[str, Any]]:
prefix = self._get_param("NamePrefix")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
@ -152,7 +136,7 @@ class EventsHandler(BaseResponse):
return json.dumps(rules_obj), self.response_headers
def list_targets_by_rule(self):
def list_targets_by_rule(self) -> Tuple[str, Dict[str, Any]]:
rule_name = self._get_param("Rule")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
@ -171,7 +155,7 @@ class EventsHandler(BaseResponse):
return json.dumps(targets), self.response_headers
def put_events(self):
def put_events(self) -> str:
events = self._get_param("Entries")
entries = self.events_backend.put_events(events)
@ -184,7 +168,7 @@ class EventsHandler(BaseResponse):
return json.dumps(response)
def put_targets(self):
def put_targets(self) -> Tuple[str, Dict[str, Any]]:
rule_name = self._get_param("Rule")
event_bus_name = self._get_param("EventBusName", "default")
targets = self._get_param("Targets")
@ -196,7 +180,7 @@ class EventsHandler(BaseResponse):
self.response_headers,
)
def remove_targets(self):
def remove_targets(self) -> Tuple[str, Dict[str, Any]]:
rule_name = self._get_param("Rule")
event_bus_name = self._get_param("EventBusName", "default")
ids = self._get_param("Ids")
@ -208,10 +192,10 @@ class EventsHandler(BaseResponse):
self.response_headers,
)
def test_event_pattern(self):
def test_event_pattern(self) -> None:
pass
def put_permission(self):
def put_permission(self) -> str:
event_bus_name = self._get_param("EventBusName")
action = self._get_param("Action")
principal = self._get_param("Principal")
@ -225,7 +209,7 @@ class EventsHandler(BaseResponse):
return ""
def remove_permission(self):
def remove_permission(self) -> str:
event_bus_name = self._get_param("EventBusName")
statement_id = self._get_param("StatementId")
remove_all_permissions = self._get_param("RemoveAllPermissions")
@ -236,7 +220,7 @@ class EventsHandler(BaseResponse):
return ""
def describe_event_bus(self):
def describe_event_bus(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
event_bus = self.events_backend.describe_event_bus(name)
@ -247,7 +231,7 @@ class EventsHandler(BaseResponse):
return json.dumps(response), self.response_headers
def create_event_bus(self):
def create_event_bus(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
event_source_name = self._get_param("EventSourceName")
tags = self._get_param("Tags")
@ -255,7 +239,7 @@ class EventsHandler(BaseResponse):
event_bus = self.events_backend.create_event_bus(name, event_source_name, tags)
return json.dumps({"EventBusArn": event_bus.arn}), self.response_headers
def list_event_buses(self):
def list_event_buses(self) -> Tuple[str, Dict[str, Any]]:
name_prefix = self._get_param("NamePrefix")
# ToDo: add 'NextToken' & 'Limit' parameters
@ -270,37 +254,37 @@ class EventsHandler(BaseResponse):
return json.dumps({"EventBuses": response}), self.response_headers
def delete_event_bus(self):
def delete_event_bus(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
self.events_backend.delete_event_bus(name)
return "", self.response_headers
def list_tags_for_resource(self):
def list_tags_for_resource(self) -> Tuple[str, Dict[str, Any]]:
arn = self._get_param("ResourceARN")
result = self.events_backend.list_tags_for_resource(arn)
return json.dumps(result), self.response_headers
def tag_resource(self):
def tag_resource(self) -> Tuple[str, Dict[str, Any]]:
arn = self._get_param("ResourceARN")
tags = self._get_param("Tags")
result = self.events_backend.tag_resource(arn, tags)
self.events_backend.tag_resource(arn, tags)
return json.dumps(result), self.response_headers
return "{}", self.response_headers
def untag_resource(self):
def untag_resource(self) -> Tuple[str, Dict[str, Any]]:
arn = self._get_param("ResourceARN")
tags = self._get_param("TagKeys")
result = self.events_backend.untag_resource(arn, tags)
self.events_backend.untag_resource(arn, tags)
return json.dumps(result), self.response_headers
return "{}", self.response_headers
def create_archive(self):
def create_archive(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ArchiveName")
source_arn = self._get_param("EventSourceArn")
description = self._get_param("Description")
@ -322,14 +306,14 @@ class EventsHandler(BaseResponse):
self.response_headers,
)
def describe_archive(self):
def describe_archive(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ArchiveName")
result = self.events_backend.describe_archive(name)
return json.dumps(result), self.response_headers
def list_archives(self):
def list_archives(self) -> Tuple[str, Dict[str, Any]]:
name_prefix = self._get_param("NamePrefix")
source_arn = self._get_param("EventSourceArn")
state = self._get_param("State")
@ -338,7 +322,7 @@ class EventsHandler(BaseResponse):
return json.dumps({"Archives": result}), self.response_headers
def update_archive(self):
def update_archive(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ArchiveName")
description = self._get_param("Description")
event_pattern = self._get_param("EventPattern")
@ -350,14 +334,14 @@ class EventsHandler(BaseResponse):
return json.dumps(result), self.response_headers
def delete_archive(self):
def delete_archive(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ArchiveName")
self.events_backend.delete_archive(name)
return "", self.response_headers
def start_replay(self):
def start_replay(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ReplayName")
description = self._get_param("Description")
source_arn = self._get_param("EventSourceArn")
@ -371,14 +355,14 @@ class EventsHandler(BaseResponse):
return json.dumps(result), self.response_headers
def describe_replay(self):
def describe_replay(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ReplayName")
result = self.events_backend.describe_replay(name)
return json.dumps(result), self.response_headers
def list_replays(self):
def list_replays(self) -> Tuple[str, Dict[str, Any]]:
name_prefix = self._get_param("NamePrefix")
source_arn = self._get_param("EventSourceArn")
state = self._get_param("State")
@ -387,14 +371,14 @@ class EventsHandler(BaseResponse):
return json.dumps({"Replays": result}), self.response_headers
def cancel_replay(self):
def cancel_replay(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("ReplayName")
result = self.events_backend.cancel_replay(name)
return json.dumps(result), self.response_headers
def create_connection(self):
def create_connection(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
description = self._get_param("Description")
authorization_type = self._get_param("AuthorizationType")
@ -416,7 +400,7 @@ class EventsHandler(BaseResponse):
self.response_headers,
)
def list_connections(self):
def list_connections(self) -> Tuple[str, Dict[str, Any]]:
connections = self.events_backend.list_connections()
result = []
for connection in connections:
@ -432,12 +416,12 @@ class EventsHandler(BaseResponse):
return json.dumps({"Connections": result}), self.response_headers
def describe_connection(self):
def describe_connection(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
result = self.events_backend.describe_connection(name)
return json.dumps(result), self.response_headers
def update_connection(self):
def update_connection(self) -> Tuple[str, Dict[str, Any]]:
updates = dict(
name=self._get_param("Name"),
description=self._get_param("Description"),
@ -447,12 +431,12 @@ class EventsHandler(BaseResponse):
result = self.events_backend.update_connection(**updates)
return self._create_response(result)
def delete_connection(self):
def delete_connection(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
result = self.events_backend.delete_connection(name)
return json.dumps(result), self.response_headers
def create_api_destination(self):
def create_api_destination(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
description = self._get_param("Description")
connection_arn = self._get_param("ConnectionArn")
@ -472,7 +456,7 @@ class EventsHandler(BaseResponse):
)
return self._create_response(result)
def list_api_destinations(self):
def list_api_destinations(self) -> Tuple[str, Dict[str, Any]]:
destinations = self.events_backend.list_api_destinations()
result = []
for destination in destinations:
@ -491,12 +475,12 @@ class EventsHandler(BaseResponse):
return json.dumps({"ApiDestinations": result}), self.response_headers
def describe_api_destination(self):
def describe_api_destination(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
result = self.events_backend.describe_api_destination(name)
return self._create_response(result)
def update_api_destination(self):
def update_api_destination(self) -> Tuple[str, Dict[str, Any]]:
updates = dict(
connection_arn=self._get_param("ConnectionArn"),
description=self._get_param("Description"),
@ -511,7 +495,7 @@ class EventsHandler(BaseResponse):
result = self.events_backend.update_api_destination(**updates)
return self._create_response(result)
def delete_api_destination(self):
def delete_api_destination(self) -> Tuple[str, Dict[str, Any]]:
name = self._get_param("Name")
result = self.events_backend.delete_api_destination(name)
return self._create_response(result)
self.events_backend.delete_api_destination(name)
return self._create_response({})

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/elasticache,moto/elasticbeanstalk,moto/elastictranscoder,moto/elb,moto/elbv2,moto/emr,moto/es,moto/moto_api,moto/neptune
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/moto_api,moto/neptune
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract