Techdebt: MyPy I (#6092)

This commit is contained in:
Bert Blommers 2023-03-20 13:58:49 -01:00 committed by GitHub
parent 2b9c98895c
commit 6087a203fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 535 additions and 417 deletions

View File

@ -1,19 +1,19 @@
"""IdentityStoreBackend class with methods for supported APIs."""
from moto.moto_api._internal import mock_random
from typing import Dict, Tuple
from moto.moto_api._internal import mock_random
from moto.core import BaseBackend, BackendDict
class IdentityStoreBackend(BaseBackend):
"""Implementation of IdentityStore APIs."""
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.groups = {}
self.groups: Dict[str, Dict[str, str]] = {}
# add methods from here
def create_group(self, identity_store_id, display_name, description):
def create_group(
self, identity_store_id: str, display_name: str, description: str
) -> Tuple[str, str]:
group_id = str(mock_random.uuid4())
group_dict = {
"GroupId": group_id,

View File

@ -2,33 +2,27 @@
import json
from moto.core.responses import BaseResponse
from .models import identitystore_backends
from .models import identitystore_backends, IdentityStoreBackend
class IdentityStoreResponse(BaseResponse):
"""Handler for IdentityStore requests and responses."""
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="identitystore")
@property
def identitystore_backend(self):
def identitystore_backend(self) -> IdentityStoreBackend:
"""Return backend instance specific for this region."""
return identitystore_backends[self.current_account][self.region]
# add methods from here
def create_group(self):
params = self._get_params()
identity_store_id = params.get("IdentityStoreId")
display_name = params.get("DisplayName")
description = params.get("Description")
def create_group(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
display_name = self._get_param("DisplayName")
description = self._get_param("Description")
group_id, identity_store_id = self.identitystore_backend.create_group(
identity_store_id=identity_store_id,
display_name=display_name,
description=description,
)
return json.dumps(dict(GroupId=group_id, IdentityStoreId=identity_store_id))
def _get_params(self):
return json.loads(self.body)

View File

@ -1,20 +1,25 @@
import datetime
import json
from typing import Any
from urllib.parse import urlparse
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
class InstanceMetadataResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name=None)
def backends(self):
def backends(self) -> None:
pass
def metadata_response(
self, request, full_url, headers
): # pylint: disable=unused-argument
self,
request: Any, # pylint: disable=unused-argument
full_url: str,
headers: Any,
) -> TYPE_RESPONSE:
"""
Mock response for localhost metadata

View File

@ -1,4 +1,5 @@
import json
from typing import Optional
from moto.core.exceptions import JsonRESTError
@ -8,7 +9,7 @@ class IoTClientError(JsonRESTError):
class ResourceNotFoundException(IoTClientError):
def __init__(self, msg=None):
def __init__(self, msg: Optional[str] = None):
self.code = 404
super().__init__(
"ResourceNotFoundException", msg or "The specified resource does not exist"
@ -16,13 +17,13 @@ class ResourceNotFoundException(IoTClientError):
class InvalidRequestException(IoTClientError):
def __init__(self, msg=None):
def __init__(self, msg: Optional[str] = None):
self.code = 400
super().__init__("InvalidRequestException", msg or "The request is not valid.")
class InvalidStateTransitionException(IoTClientError):
def __init__(self, msg=None):
def __init__(self, msg: Optional[str] = None):
self.code = 409
super().__init__(
"InvalidStateTransitionException",
@ -31,7 +32,7 @@ class InvalidStateTransitionException(IoTClientError):
class VersionConflictException(IoTClientError):
def __init__(self, name):
def __init__(self, name: str):
self.code = 409
super().__init__(
"VersionConflictException",
@ -40,19 +41,19 @@ class VersionConflictException(IoTClientError):
class CertificateStateException(IoTClientError):
def __init__(self, msg, cert_id):
def __init__(self, msg: str, cert_id: str):
self.code = 406
super().__init__("CertificateStateException", f"{msg} Id: {cert_id}")
class DeleteConflictException(IoTClientError):
def __init__(self, msg):
def __init__(self, msg: str):
self.code = 409
super().__init__("DeleteConflictException", msg)
class ResourceAlreadyExistsException(IoTClientError):
def __init__(self, msg, resource_id, resource_arn):
def __init__(self, msg: str, resource_id: str, resource_arn: str):
self.code = 409
super().__init__(
"ResourceAlreadyExistsException", msg or "The resource already exists."
@ -67,7 +68,7 @@ class ResourceAlreadyExistsException(IoTClientError):
class VersionsLimitExceededException(IoTClientError):
def __init__(self, name):
def __init__(self, name: str):
self.code = 409
super().__init__(
"VersionsLimitExceededException",
@ -76,7 +77,7 @@ class VersionsLimitExceededException(IoTClientError):
class ThingStillAttached(IoTClientError):
def __init__(self, name):
def __init__(self, name: str):
self.code = 409
super().__init__(
"InvalidRequestException",

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +1,21 @@
import json
from typing import Any
from urllib.parse import unquote
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import iot_backends
from .models import iot_backends, IoTBackend
class IoTResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="iot")
@property
def iot_backend(self):
def iot_backend(self) -> IoTBackend:
return iot_backends[self.current_account][self.region]
def create_certificate_from_csr(self):
def create_certificate_from_csr(self) -> str:
certificate_signing_request = self._get_param("certificateSigningRequest")
set_as_active = self._get_param("setAsActive")
cert = self.iot_backend.create_certificate_from_csr(
@ -27,7 +29,7 @@ class IoTResponse(BaseResponse):
}
)
def create_thing(self):
def create_thing(self) -> str:
thing_name = self._get_param("thingName")
thing_type_name = self._get_param("thingTypeName")
attribute_payload = self._get_param("attributePayload")
@ -38,7 +40,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict(thingName=thing_name, thingArn=thing_arn))
def create_thing_type(self):
def create_thing_type(self) -> str:
thing_type_name = self._get_param("thingTypeName")
thing_type_properties = self._get_param("thingTypeProperties")
thing_type_name, thing_type_arn = self.iot_backend.create_thing_type(
@ -48,7 +50,7 @@ class IoTResponse(BaseResponse):
dict(thingTypeName=thing_type_name, thingTypeArn=thing_type_arn)
)
def list_thing_types(self):
def list_thing_types(self) -> str:
previous_next_token = self._get_param("nextToken")
max_results = self._get_int_param(
"maxResults", 50
@ -71,7 +73,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(thingTypes=result, nextToken=next_token))
def list_things(self):
def list_things(self) -> str:
previous_next_token = self._get_param("nextToken")
max_results = self._get_int_param(
"maxResults", 50
@ -89,34 +91,34 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(things=things, nextToken=next_token))
def describe_thing(self):
def describe_thing(self) -> str:
thing_name = self._get_param("thingName")
thing = self.iot_backend.describe_thing(thing_name=thing_name)
return json.dumps(thing.to_dict(include_default_client_id=True))
def describe_thing_type(self):
def describe_thing_type(self) -> str:
thing_type_name = self._get_param("thingTypeName")
thing_type = self.iot_backend.describe_thing_type(
thing_type_name=thing_type_name
)
return json.dumps(thing_type.to_dict())
def describe_endpoint(self):
def describe_endpoint(self) -> str:
endpoint_type = self._get_param("endpointType", "iot:Data-ATS")
endpoint = self.iot_backend.describe_endpoint(endpoint_type=endpoint_type)
return json.dumps(endpoint.to_dict())
def delete_thing(self):
def delete_thing(self) -> str:
thing_name = self._get_param("thingName")
self.iot_backend.delete_thing(thing_name=thing_name)
return json.dumps(dict())
def delete_thing_type(self):
def delete_thing_type(self) -> str:
thing_type_name = self._get_param("thingTypeName")
self.iot_backend.delete_thing_type(thing_type_name=thing_type_name)
return json.dumps(dict())
def deprecate_thing_type(self):
def deprecate_thing_type(self) -> str:
thing_type_name = self._get_param("thingTypeName")
undo_deprecate = self._get_param("undoDeprecate")
thing_type = self.iot_backend.deprecate_thing_type(
@ -124,7 +126,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(thing_type.to_dict())
def update_thing(self):
def update_thing(self) -> str:
thing_name = self._get_param("thingName")
thing_type_name = self._get_param("thingTypeName")
attribute_payload = self._get_param("attributePayload")
@ -137,7 +139,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def create_job(self):
def create_job(self) -> str:
job_arn, job_id, description = self.iot_backend.create_job(
job_id=self._get_param("jobId"),
targets=self._get_param("targets"),
@ -152,7 +154,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(jobArn=job_arn, jobId=job_id, description=description))
def describe_job(self):
def describe_job(self) -> str:
job = self.iot_backend.describe_job(job_id=self._get_param("jobId"))
return json.dumps(
dict(
@ -178,7 +180,7 @@ class IoTResponse(BaseResponse):
)
)
def delete_job(self):
def delete_job(self) -> str:
job_id = self._get_param("jobId")
force = self._get_bool_param("force")
@ -186,7 +188,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict())
def cancel_job(self):
def cancel_job(self) -> str:
job_id = self._get_param("jobId")
reason_code = self._get_param("reasonCode")
comment = self._get_param("comment")
@ -198,7 +200,7 @@ class IoTResponse(BaseResponse):
return json.dumps(job.to_dict())
def get_job_document(self):
def get_job_document(self) -> str:
job = self.iot_backend.get_job_document(job_id=self._get_param("jobId"))
if job.document is not None:
@ -208,7 +210,7 @@ class IoTResponse(BaseResponse):
# TODO: needs to be implemented to get document_source's content from S3
return json.dumps({"document": ""})
def list_jobs(self):
def list_jobs(self) -> str:
# not the default, but makes testing easier
max_results = self._get_int_param("maxResults", 50)
previous_next_token = self._get_param("nextToken")
@ -218,7 +220,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(jobs=jobs, nextToken=next_token))
def describe_job_execution(self):
def describe_job_execution(self) -> str:
job_id = self._get_param("jobId")
thing_name = self._get_param("thingName")
execution_number = self._get_int_param("executionNumber")
@ -228,7 +230,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(execution=job_execution.to_get_dict()))
def cancel_job_execution(self):
def cancel_job_execution(self) -> str:
job_id = self._get_param("jobId")
thing_name = self._get_param("thingName")
force = self._get_bool_param("force")
@ -239,7 +241,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict())
def delete_job_execution(self):
def delete_job_execution(self) -> str:
job_id = self._get_param("jobId")
thing_name = self._get_param("thingName")
execution_number = self._get_int_param("executionNumber")
@ -254,7 +256,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict())
def list_job_executions_for_job(self):
def list_job_executions_for_job(self) -> str:
job_id = self._get_param("jobId")
status = self._get_param("status")
max_results = self._get_int_param(
@ -262,12 +264,12 @@ class IoTResponse(BaseResponse):
) # not the default, but makes testing easier
next_token = self._get_param("nextToken")
job_executions, next_token = self.iot_backend.list_job_executions_for_job(
job_id=job_id, status=status, max_results=max_results, next_token=next_token
job_id=job_id, status=status, max_results=max_results, token=next_token
)
return json.dumps(dict(executionSummaries=job_executions, nextToken=next_token))
def list_job_executions_for_thing(self):
def list_job_executions_for_thing(self) -> str:
thing_name = self._get_param("thingName")
status = self._get_param("status")
max_results = self._get_int_param(
@ -283,7 +285,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(executionSummaries=job_executions, nextToken=next_token))
def create_keys_and_certificate(self):
def create_keys_and_certificate(self) -> str:
set_as_active = self._get_bool_param("setAsActive")
cert, key_pair = self.iot_backend.create_keys_and_certificate(
set_as_active=set_as_active
@ -297,18 +299,18 @@ class IoTResponse(BaseResponse):
)
)
def delete_ca_certificate(self):
def delete_ca_certificate(self) -> str:
certificate_id = self.path.split("/")[-1]
self.iot_backend.delete_ca_certificate(certificate_id=certificate_id)
return json.dumps(dict())
def delete_certificate(self):
def delete_certificate(self) -> str:
certificate_id = self._get_param("certificateId")
force_delete = self._get_bool_param("forceDelete", False)
self.iot_backend.delete_certificate(certificate_id, force_delete)
return json.dumps(dict())
def describe_ca_certificate(self):
def describe_ca_certificate(self) -> str:
certificate_id = self.path.split("/")[-1]
certificate = self.iot_backend.describe_ca_certificate(
certificate_id=certificate_id
@ -320,7 +322,7 @@ class IoTResponse(BaseResponse):
}
)
def describe_certificate(self):
def describe_certificate(self) -> str:
certificate_id = self._get_param("certificateId")
certificate = self.iot_backend.describe_certificate(
certificate_id=certificate_id
@ -329,23 +331,23 @@ class IoTResponse(BaseResponse):
dict(certificateDescription=certificate.to_description_dict())
)
def get_registration_code(self):
def get_registration_code(self) -> str:
code = self.iot_backend.get_registration_code()
return json.dumps(dict(registrationCode=code))
def list_certificates(self):
def list_certificates(self) -> str:
# page_size = self._get_int_param("pageSize")
# marker = self._get_param("marker")
# ascending_order = self._get_param("ascendingOrder")
certificates = self.iot_backend.list_certificates()
return json.dumps(dict(certificates=[_.to_dict() for _ in certificates]))
def list_certificates_by_ca(self):
def list_certificates_by_ca(self) -> str:
ca_certificate_id = self._get_param("caCertificateId")
certificates = self.iot_backend.list_certificates_by_ca(ca_certificate_id)
return json.dumps(dict(certificates=[_.to_dict() for _ in certificates]))
def register_ca_certificate(self):
def register_ca_certificate(self) -> str:
ca_certificate = self._get_param("caCertificate")
set_as_active = self._get_bool_param("setAsActive")
registration_config = self._get_param("registrationConfig")
@ -359,7 +361,7 @@ class IoTResponse(BaseResponse):
dict(certificateId=cert.certificate_id, certificateArn=cert.arn)
)
def register_certificate(self):
def register_certificate(self) -> str:
certificate_pem = self._get_param("certificatePem")
ca_certificate_pem = self._get_param("caCertificatePem")
set_as_active = self._get_bool_param("setAsActive")
@ -375,7 +377,7 @@ class IoTResponse(BaseResponse):
dict(certificateId=cert.certificate_id, certificateArn=cert.arn)
)
def register_certificate_without_ca(self):
def register_certificate_without_ca(self) -> str:
certificate_pem = self._get_param("certificatePem")
status = self._get_param("status")
@ -386,7 +388,7 @@ class IoTResponse(BaseResponse):
dict(certificateId=cert.certificate_id, certificateArn=cert.arn)
)
def update_ca_certificate(self):
def update_ca_certificate(self) -> str:
certificate_id = self.path.split("/")[-1]
new_status = self._get_param("newStatus")
config = self._get_param("registrationConfig")
@ -395,7 +397,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def update_certificate(self):
def update_certificate(self) -> str:
certificate_id = self._get_param("certificateId")
new_status = self._get_param("newStatus")
self.iot_backend.update_certificate(
@ -403,7 +405,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def create_policy(self):
def create_policy(self) -> str:
policy_name = self._get_param("policyName")
policy_document = self._get_param("policyDocument")
policy = self.iot_backend.create_policy(
@ -411,7 +413,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(policy.to_dict_at_creation())
def list_policies(self):
def list_policies(self) -> str:
# marker = self._get_param("marker")
# page_size = self._get_int_param("pageSize")
# ascending_order = self._get_param("ascendingOrder")
@ -420,17 +422,17 @@ class IoTResponse(BaseResponse):
# TODO: implement pagination in the future
return json.dumps(dict(policies=[_.to_dict() for _ in policies]))
def get_policy(self):
def get_policy(self) -> str:
policy_name = self._get_param("policyName")
policy = self.iot_backend.get_policy(policy_name=policy_name)
return json.dumps(policy.to_get_dict())
def delete_policy(self):
def delete_policy(self) -> str:
policy_name = self._get_param("policyName")
self.iot_backend.delete_policy(policy_name=policy_name)
return json.dumps(dict())
def create_policy_version(self):
def create_policy_version(self) -> str:
policy_name = self._get_param("policyName")
policy_document = self._get_param("policyDocument")
set_as_default = self._get_bool_param("setAsDefault")
@ -440,20 +442,20 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(policy_version.to_dict_at_creation()))
def set_default_policy_version(self):
def set_default_policy_version(self) -> str:
policy_name = self._get_param("policyName")
version_id = self._get_param("policyVersionId")
self.iot_backend.set_default_policy_version(policy_name, version_id)
return json.dumps(dict())
def get_policy_version(self):
def get_policy_version(self) -> str:
policy_name = self._get_param("policyName")
version_id = self._get_param("policyVersionId")
policy_version = self.iot_backend.get_policy_version(policy_name, version_id)
return json.dumps(dict(policy_version.to_get_dict()))
def list_policy_versions(self):
def list_policy_versions(self) -> str:
policy_name = self._get_param("policyName")
policiy_versions = self.iot_backend.list_policy_versions(
policy_name=policy_name
@ -461,20 +463,22 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(policyVersions=[_.to_dict() for _ in policiy_versions]))
def delete_policy_version(self):
def delete_policy_version(self) -> str:
policy_name = self._get_param("policyName")
version_id = self._get_param("policyVersionId")
self.iot_backend.delete_policy_version(policy_name, version_id)
return json.dumps(dict())
def attach_policy(self):
def attach_policy(self) -> str:
policy_name = self._get_param("policyName")
target = self._get_param("target")
self.iot_backend.attach_policy(policy_name=policy_name, target=target)
return json.dumps(dict())
def dispatch_attached_policies(self, request, full_url, headers):
def dispatch_attached_policies(
self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE:
# This endpoint requires specialized handling because it has
# a uri parameter containing forward slashes that is not
# correctly url encoded when we're running in server mode.
@ -485,7 +489,7 @@ class IoTResponse(BaseResponse):
self.querystring["target"] = [unquote(target)] if "%" in target else [target]
return self.call_action()
def list_attached_policies(self):
def list_attached_policies(self) -> str:
principal = self._get_param("target")
# marker = self._get_param("marker")
# page_size = self._get_int_param("pageSize")
@ -496,7 +500,7 @@ class IoTResponse(BaseResponse):
dict(policies=[_.to_dict() for _ in policies], nextMarker=next_marker)
)
def attach_principal_policy(self):
def attach_principal_policy(self) -> str:
policy_name = self._get_param("policyName")
principal = self.headers.get("x-amzn-iot-principal")
self.iot_backend.attach_principal_policy(
@ -504,13 +508,13 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def detach_policy(self):
def detach_policy(self) -> str:
policy_name = self._get_param("policyName")
target = self._get_param("target")
self.iot_backend.detach_policy(policy_name=policy_name, target=target)
return json.dumps(dict())
def detach_principal_policy(self):
def detach_principal_policy(self) -> str:
policy_name = self._get_param("policyName")
principal = self.headers.get("x-amzn-iot-principal")
self.iot_backend.detach_principal_policy(
@ -518,7 +522,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def list_principal_policies(self):
def list_principal_policies(self) -> str:
principal = self.headers.get("x-amzn-iot-principal")
# marker = self._get_param("marker")
# page_size = self._get_int_param("pageSize")
@ -530,7 +534,7 @@ class IoTResponse(BaseResponse):
dict(policies=[_.to_dict() for _ in policies], nextMarker=next_marker)
)
def list_policy_principals(self):
def list_policy_principals(self) -> str:
policy_name = self.headers.get("x-amzn-iot-policy")
# marker = self._get_param("marker")
# page_size = self._get_int_param("pageSize")
@ -540,13 +544,13 @@ class IoTResponse(BaseResponse):
next_marker = None
return json.dumps(dict(principals=principals, nextMarker=next_marker))
def list_targets_for_policy(self):
def list_targets_for_policy(self) -> str:
"""https://docs.aws.amazon.com/iot/latest/apireference/API_ListTargetsForPolicy.html"""
policy_name = self._get_param("policyName")
principals = self.iot_backend.list_targets_for_policy(policy_name=policy_name)
return json.dumps(dict(targets=principals, nextMarker=None))
def attach_thing_principal(self):
def attach_thing_principal(self) -> str:
thing_name = self._get_param("thingName")
principal = self.headers.get("x-amzn-principal")
self.iot_backend.attach_thing_principal(
@ -554,7 +558,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def detach_thing_principal(self):
def detach_thing_principal(self) -> str:
thing_name = self._get_param("thingName")
principal = self.headers.get("x-amzn-principal")
self.iot_backend.detach_thing_principal(
@ -562,7 +566,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def list_principal_things(self):
def list_principal_things(self) -> str:
next_token = self._get_param("nextToken")
# max_results = self._get_int_param("maxResults")
principal = self.headers.get("x-amzn-principal")
@ -571,19 +575,19 @@ class IoTResponse(BaseResponse):
next_token = None
return json.dumps(dict(things=things, nextToken=next_token))
def list_thing_principals(self):
def list_thing_principals(self) -> str:
thing_name = self._get_param("thingName")
principals = self.iot_backend.list_thing_principals(thing_name=thing_name)
return json.dumps(dict(principals=principals))
def describe_thing_group(self):
def describe_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
thing_group = self.iot_backend.describe_thing_group(
thing_group_name=thing_group_name
)
return json.dumps(thing_group.to_dict())
def create_thing_group(self):
def create_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
parent_group_name = self._get_param("parentGroupName")
thing_group_properties = self._get_param("thingGroupProperties")
@ -604,12 +608,12 @@ class IoTResponse(BaseResponse):
)
)
def delete_thing_group(self):
def delete_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
self.iot_backend.delete_thing_group(thing_group_name=thing_group_name)
return json.dumps(dict())
def list_thing_groups(self):
def list_thing_groups(self) -> str:
# next_token = self._get_param("nextToken")
# max_results = self._get_int_param("maxResults")
parent_group = self._get_param("parentGroup")
@ -627,7 +631,7 @@ class IoTResponse(BaseResponse):
# TODO: implement pagination in the future
return json.dumps(dict(thingGroups=rets, nextToken=next_token))
def update_thing_group(self):
def update_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
thing_group_properties = self._get_param("thingGroupProperties")
expected_version = self._get_param("expectedVersion")
@ -638,7 +642,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict(version=version))
def add_thing_to_thing_group(self):
def add_thing_to_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
thing_group_arn = self._get_param("thingGroupArn")
thing_name = self._get_param("thingName")
@ -651,7 +655,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def remove_thing_from_thing_group(self):
def remove_thing_from_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
thing_group_arn = self._get_param("thingGroupArn")
thing_name = self._get_param("thingName")
@ -664,7 +668,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def list_things_in_thing_group(self):
def list_things_in_thing_group(self) -> str:
thing_group_name = self._get_param("thingGroupName")
things = self.iot_backend.list_things_in_thing_group(
thing_group_name=thing_group_name
@ -673,7 +677,7 @@ class IoTResponse(BaseResponse):
thing_names = [_.thing_name for _ in things]
return json.dumps(dict(things=thing_names, nextToken=next_token))
def list_thing_groups_for_thing(self):
def list_thing_groups_for_thing(self) -> str:
thing_name = self._get_param("thingName")
# next_token = self._get_param("nextToken")
# max_results = self._get_int_param("maxResults")
@ -683,7 +687,7 @@ class IoTResponse(BaseResponse):
next_token = None
return json.dumps(dict(thingGroups=thing_groups, nextToken=next_token))
def update_thing_groups_for_thing(self):
def update_thing_groups_for_thing(self) -> str:
thing_name = self._get_param("thingName")
thing_groups_to_add = self._get_param("thingGroupsToAdd") or []
thing_groups_to_remove = self._get_param("thingGroupsToRemove") or []
@ -694,15 +698,15 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def list_topic_rules(self):
def list_topic_rules(self) -> str:
return json.dumps(dict(rules=self.iot_backend.list_topic_rules()))
def get_topic_rule(self):
def get_topic_rule(self) -> str:
return json.dumps(
self.iot_backend.get_topic_rule(rule_name=self._get_param("ruleName"))
)
def create_topic_rule(self):
def create_topic_rule(self) -> str:
self.iot_backend.create_topic_rule(
rule_name=self._get_param("ruleName"),
description=self._get_param("description"),
@ -714,7 +718,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def replace_topic_rule(self):
def replace_topic_rule(self) -> str:
self.iot_backend.replace_topic_rule(
rule_name=self._get_param("ruleName"),
description=self._get_param("description"),
@ -726,19 +730,19 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def delete_topic_rule(self):
def delete_topic_rule(self) -> str:
self.iot_backend.delete_topic_rule(rule_name=self._get_param("ruleName"))
return json.dumps(dict())
def enable_topic_rule(self):
def enable_topic_rule(self) -> str:
self.iot_backend.enable_topic_rule(rule_name=self._get_param("ruleName"))
return json.dumps(dict())
def disable_topic_rule(self):
def disable_topic_rule(self) -> str:
self.iot_backend.disable_topic_rule(rule_name=self._get_param("ruleName"))
return json.dumps(dict())
def create_domain_configuration(self):
def create_domain_configuration(self) -> str:
domain_configuration = self.iot_backend.create_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName"),
domain_name=self._get_param("domainName"),
@ -748,24 +752,24 @@ class IoTResponse(BaseResponse):
)
return json.dumps(domain_configuration.to_dict())
def delete_domain_configuration(self):
def delete_domain_configuration(self) -> str:
self.iot_backend.delete_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName")
)
return json.dumps(dict())
def describe_domain_configuration(self):
def describe_domain_configuration(self) -> str:
domain_configuration = self.iot_backend.describe_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName")
)
return json.dumps(domain_configuration.to_description_dict())
def list_domain_configurations(self):
def list_domain_configurations(self) -> str:
return json.dumps(
dict(domainConfigurations=self.iot_backend.list_domain_configurations())
)
def update_domain_configuration(self):
def update_domain_configuration(self) -> str:
domain_configuration = self.iot_backend.update_domain_configuration(
domain_configuration_name=self._get_param("domainConfigurationName"),
authorizer_config=self._get_param("authorizerConfig"),
@ -774,7 +778,7 @@ class IoTResponse(BaseResponse):
)
return json.dumps(domain_configuration.to_dict())
def search_index(self):
def search_index(self) -> str:
query = self._get_param("queryString")
things, groups = self.iot_backend.search_index(query)
return json.dumps({"things": things, "thingGroups": groups})
things = self.iot_backend.search_index(query)
return json.dumps({"things": things, "thingGroups": []})

View File

@ -6,7 +6,7 @@ class IoTDataPlaneClientError(JsonRESTError):
class ResourceNotFoundException(IoTDataPlaneClientError):
def __init__(self):
def __init__(self) -> None:
self.code = 404
super().__init__(
"ResourceNotFoundException", "The specified resource does not exist"
@ -14,12 +14,12 @@ class ResourceNotFoundException(IoTDataPlaneClientError):
class InvalidRequestException(IoTDataPlaneClientError):
def __init__(self, message):
def __init__(self, message: str):
self.code = 400
super().__init__("InvalidRequestException", message)
class ConflictException(IoTDataPlaneClientError):
def __init__(self, message):
def __init__(self, message: str):
self.code = 409
super().__init__("ConflictException", message)

View File

@ -1,10 +1,11 @@
import json
import time
import jsondiff
from typing import Any, Dict, List, Tuple, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import merge_dicts
from moto.iot import iot_backends
from moto.iot.models import iot_backends, IoTBackend
from .exceptions import (
ConflictException,
ResourceNotFoundException,
@ -17,7 +18,14 @@ class FakeShadow(BaseModel):
http://docs.aws.amazon.com/iot/latest/developerguide/thing-shadow-document-syntax.html
"""
def __init__(self, desired, reported, requested_payload, version, deleted=False):
def __init__(
self,
desired: Optional[str],
reported: Optional[str],
requested_payload: Optional[Dict[str, Any]],
version: int,
deleted: bool = False,
):
self.desired = desired
self.reported = reported
self.requested_payload = requested_payload
@ -33,7 +41,7 @@ class FakeShadow(BaseModel):
)
@classmethod
def create_from_previous_version(cls, previous_shadow, payload):
def create_from_previous_version(cls, previous_shadow: Optional["FakeShadow"], payload: Optional[Dict[str, Any]]) -> "FakeShadow": # type: ignore[misc]
"""
set None to payload when you want to delete shadow
"""
@ -55,11 +63,10 @@ class FakeShadow(BaseModel):
merge_dicts(state_document, payload, remove_nulls=True)
desired = state_document.get("state", {}).get("desired")
reported = state_document.get("state", {}).get("reported")
shadow = FakeShadow(desired, reported, payload, version)
return shadow
return FakeShadow(desired, reported, payload, version)
@classmethod
def parse_payload(cls, desired, reported):
def parse_payload(cls, desired: Optional[str], reported: Optional[str]) -> Any: # type: ignore[misc]
if desired is None:
delta = reported
elif reported is None:
@ -68,15 +75,15 @@ class FakeShadow(BaseModel):
delta = jsondiff.diff(desired, reported)
return delta
def _create_metadata_from_state(self, state, ts):
def _create_metadata_from_state(self, state: Any, ts: Any) -> Any:
"""
state must be disired or reported stype dict object
replces primitive type with {"timestamp": ts} in dict
state must be desired or reported stype dict object
replaces primitive type with {"timestamp": ts} in dict
"""
if state is None:
return None
def _f(elem, ts):
def _f(elem: Any, ts: Any) -> Any:
if isinstance(elem, dict):
return {_: _f(elem[_], ts) for _ in elem.keys()}
if isinstance(elem, list):
@ -85,9 +92,9 @@ class FakeShadow(BaseModel):
return _f(state, ts)
def to_response_dict(self):
desired = self.requested_payload["state"].get("desired", None)
reported = self.requested_payload["state"].get("reported", None)
def to_response_dict(self) -> Dict[str, Any]:
desired = self.requested_payload["state"].get("desired", None) # type: ignore
reported = self.requested_payload["state"].get("reported", None) # type: ignore
payload = {}
if desired is not None:
@ -111,7 +118,7 @@ class FakeShadow(BaseModel):
"version": self.version,
}
def to_dict(self, include_delta=True):
def to_dict(self, include_delta: bool = True) -> Dict[str, Any]:
"""returning nothing except for just top-level keys for now."""
if self.deleted:
return {"timestamp": self.timestamp, "version": self.version}
@ -139,15 +146,15 @@ class FakeShadow(BaseModel):
class IoTDataPlaneBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.published_payloads = list()
self.published_payloads: List[Tuple[str, str]] = list()
@property
def iot_backend(self):
def iot_backend(self) -> IoTBackend:
return iot_backends[self.account_id][self.region_name]
def update_thing_shadow(self, thing_name, payload):
def update_thing_shadow(self, thing_name: str, payload: str) -> FakeShadow:
"""
spec of payload:
- need node `state`
@ -158,32 +165,32 @@ class IoTDataPlaneBackend(BaseBackend):
# validate
try:
payload = json.loads(payload)
_payload = json.loads(payload)
except ValueError:
raise InvalidRequestException("invalid json")
if "state" not in payload:
if "state" not in _payload:
raise InvalidRequestException("need node `state`")
if not isinstance(payload["state"], dict):
if not isinstance(_payload["state"], dict):
raise InvalidRequestException("state node must be an Object")
if any(_ for _ in payload["state"].keys() if _ not in ["desired", "reported"]):
if any(_ for _ in _payload["state"].keys() if _ not in ["desired", "reported"]):
raise InvalidRequestException("State contains an invalid node")
if "version" in payload and thing.thing_shadow.version != payload["version"]:
if "version" in _payload and thing.thing_shadow.version != _payload["version"]:
raise ConflictException("Version conflict")
new_shadow = FakeShadow.create_from_previous_version(
thing.thing_shadow, payload
thing.thing_shadow, _payload
)
thing.thing_shadow = new_shadow
return thing.thing_shadow
def get_thing_shadow(self, thing_name):
def get_thing_shadow(self, thing_name: str) -> FakeShadow:
thing = self.iot_backend.describe_thing(thing_name)
if thing.thing_shadow is None or thing.thing_shadow.deleted:
raise ResourceNotFoundException()
return thing.thing_shadow
def delete_thing_shadow(self, thing_name):
def delete_thing_shadow(self, thing_name: str) -> FakeShadow:
thing = self.iot_backend.describe_thing(thing_name)
if thing.thing_shadow is None:
raise ResourceNotFoundException()
@ -194,7 +201,7 @@ class IoTDataPlaneBackend(BaseBackend):
thing.thing_shadow = new_shadow
return thing.thing_shadow
def publish(self, topic, payload):
def publish(self, topic: str, payload: str) -> None:
self.published_payloads.append((topic, payload))

View File

@ -1,11 +1,11 @@
from moto.core.responses import BaseResponse
from .models import iotdata_backends
from .models import iotdata_backends, IoTDataPlaneBackend
import json
from urllib.parse import unquote
class IoTDataPlaneResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="iot-data")
def _get_action(self) -> str:
@ -15,10 +15,10 @@ class IoTDataPlaneResponse(BaseResponse):
return super()._get_action()
@property
def iotdata_backend(self):
def iotdata_backend(self) -> IoTDataPlaneBackend:
return iotdata_backends[self.current_account][self.region]
def update_thing_shadow(self):
def update_thing_shadow(self) -> str:
thing_name = self._get_param("thingName")
payload = self.body
payload = self.iotdata_backend.update_thing_shadow(
@ -26,17 +26,17 @@ class IoTDataPlaneResponse(BaseResponse):
)
return json.dumps(payload.to_response_dict())
def get_thing_shadow(self):
def get_thing_shadow(self) -> str:
thing_name = self._get_param("thingName")
payload = self.iotdata_backend.get_thing_shadow(thing_name=thing_name)
return json.dumps(payload.to_dict())
def delete_thing_shadow(self):
def delete_thing_shadow(self) -> str:
thing_name = self._get_param("thingName")
payload = self.iotdata_backend.delete_thing_shadow(thing_name=thing_name)
return json.dumps(payload.to_dict())
def publish(self):
def publish(self) -> str:
topic = self.path.split("/topics/")[-1]
# a uri parameter containing forward slashes is not correctly url encoded when we're running in server mode.
# https://github.com/pallets/flask/issues/900

View File

@ -230,7 +230,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/iam,moto/moto_api,moto/neptune
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/moto_api,moto/neptune
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract