TechDebt: MyPy CodeCommit & CodePipeline (#5624)
This commit is contained in:
parent
15891efcef
commit
ba1919cc6f
@ -4,7 +4,7 @@ from moto.core.exceptions import JsonRESTError
|
||||
class RepositoryNameExistsException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, repository_name):
|
||||
def __init__(self, repository_name: str):
|
||||
super().__init__(
|
||||
"RepositoryNameExistsException",
|
||||
"Repository named {0} already exists".format(repository_name),
|
||||
@ -14,7 +14,7 @@ class RepositoryNameExistsException(JsonRESTError):
|
||||
class RepositoryDoesNotExistException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, repository_name):
|
||||
def __init__(self, repository_name: str):
|
||||
super().__init__(
|
||||
"RepositoryDoesNotExistException",
|
||||
"{0} does not exist".format(repository_name),
|
||||
@ -24,7 +24,7 @@ class RepositoryDoesNotExistException(JsonRESTError):
|
||||
class InvalidRepositoryNameException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
"InvalidRepositoryNameException",
|
||||
"The repository name is not valid. Repository names can be any valid "
|
||||
|
@ -2,11 +2,18 @@ from moto.core import BaseBackend, BaseModel
|
||||
from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict
|
||||
from moto.moto_api._internal import mock_random
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional
|
||||
from .exceptions import RepositoryDoesNotExistException, RepositoryNameExistsException
|
||||
|
||||
|
||||
class CodeCommit(BaseModel):
|
||||
def __init__(self, account_id, region, repository_description, repository_name):
|
||||
def __init__(
|
||||
self,
|
||||
account_id: str,
|
||||
region: str,
|
||||
repository_description: str,
|
||||
repository_name: str,
|
||||
):
|
||||
current_date = iso_8601_datetime_with_milliseconds(datetime.utcnow())
|
||||
self.repository_metadata = dict()
|
||||
self.repository_metadata["repositoryName"] = repository_name
|
||||
@ -31,18 +38,22 @@ class CodeCommit(BaseModel):
|
||||
|
||||
|
||||
class CodeCommitBackend(BaseBackend):
|
||||
def __init__(self, region_name, account_id):
|
||||
def __init__(self, region_name: str, account_id: str):
|
||||
super().__init__(region_name, account_id)
|
||||
self.repositories = {}
|
||||
self.repositories: Dict[str, CodeCommit] = {}
|
||||
|
||||
@staticmethod
|
||||
def default_vpc_endpoint_service(service_region, zones):
|
||||
def default_vpc_endpoint_service(
|
||||
service_region: str, zones: List[str]
|
||||
) -> List[Dict[str, str]]:
|
||||
"""Default VPC endpoint service."""
|
||||
return BaseBackend.default_vpc_endpoint_service_factory(
|
||||
service_region, zones, "codecommit"
|
||||
)
|
||||
|
||||
def create_repository(self, repository_name, repository_description):
|
||||
def create_repository(
|
||||
self, repository_name: str, repository_description: str
|
||||
) -> Dict[str, str]:
|
||||
repository = self.repositories.get(repository_name)
|
||||
if repository:
|
||||
raise RepositoryNameExistsException(repository_name)
|
||||
@ -53,14 +64,14 @@ class CodeCommitBackend(BaseBackend):
|
||||
|
||||
return self.repositories[repository_name].repository_metadata
|
||||
|
||||
def get_repository(self, repository_name):
|
||||
def get_repository(self, repository_name: str) -> Dict[str, str]:
|
||||
repository = self.repositories.get(repository_name)
|
||||
if not repository:
|
||||
raise RepositoryDoesNotExistException(repository_name)
|
||||
|
||||
return repository.repository_metadata
|
||||
|
||||
def delete_repository(self, repository_name):
|
||||
def delete_repository(self, repository_name: str) -> Optional[str]:
|
||||
repository = self.repositories.get(repository_name)
|
||||
|
||||
if repository:
|
||||
|
@ -2,11 +2,11 @@ import json
|
||||
import re
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import codecommit_backends
|
||||
from .models import codecommit_backends, CodeCommitBackend
|
||||
from .exceptions import InvalidRepositoryNameException
|
||||
|
||||
|
||||
def _is_repository_name_valid(repository_name):
|
||||
def _is_repository_name_valid(repository_name: str) -> bool:
|
||||
name_regex = re.compile(r"[\w\.-]+")
|
||||
result = name_regex.split(repository_name)
|
||||
if len(result) > 0:
|
||||
@ -17,14 +17,14 @@ def _is_repository_name_valid(repository_name):
|
||||
|
||||
|
||||
class CodeCommitResponse(BaseResponse):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(service_name="codecommit")
|
||||
|
||||
@property
|
||||
def codecommit_backend(self):
|
||||
def codecommit_backend(self) -> CodeCommitBackend:
|
||||
return codecommit_backends[self.current_account][self.region]
|
||||
|
||||
def create_repository(self):
|
||||
def create_repository(self) -> str:
|
||||
if not _is_repository_name_valid(self._get_param("repositoryName")):
|
||||
raise InvalidRepositoryNameException()
|
||||
|
||||
@ -35,7 +35,7 @@ class CodeCommitResponse(BaseResponse):
|
||||
|
||||
return json.dumps({"repositoryMetadata": repository_metadata})
|
||||
|
||||
def get_repository(self):
|
||||
def get_repository(self) -> str:
|
||||
if not _is_repository_name_valid(self._get_param("repositoryName")):
|
||||
raise InvalidRepositoryNameException()
|
||||
|
||||
@ -45,7 +45,7 @@ class CodeCommitResponse(BaseResponse):
|
||||
|
||||
return json.dumps({"repositoryMetadata": repository_metadata})
|
||||
|
||||
def delete_repository(self):
|
||||
def delete_repository(self) -> str:
|
||||
if not _is_repository_name_valid(self._get_param("repositoryName")):
|
||||
raise InvalidRepositoryNameException()
|
||||
|
||||
|
@ -4,35 +4,35 @@ from moto.core.exceptions import JsonRESTError
|
||||
class InvalidStructureException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, message):
|
||||
def __init__(self, message: str):
|
||||
super().__init__("InvalidStructureException", message)
|
||||
|
||||
|
||||
class PipelineNotFoundException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, message):
|
||||
def __init__(self, message: str):
|
||||
super().__init__("PipelineNotFoundException", message)
|
||||
|
||||
|
||||
class ResourceNotFoundException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, message):
|
||||
def __init__(self, message: str):
|
||||
super().__init__("ResourceNotFoundException", message)
|
||||
|
||||
|
||||
class InvalidTagsException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, message):
|
||||
def __init__(self, message: str):
|
||||
super().__init__("InvalidTagsException", message)
|
||||
|
||||
|
||||
class TooManyTagsException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, arn):
|
||||
def __init__(self, arn: str):
|
||||
super().__init__(
|
||||
"TooManyTagsException", "Tag limit exceeded for resource [{}].".format(arn)
|
||||
)
|
||||
|
@ -1,11 +1,9 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict
|
||||
|
||||
from moto.iam.exceptions import IAMNotFoundException
|
||||
|
||||
from moto.iam import iam_backends
|
||||
from moto.iam.models import iam_backends, IAMBackend
|
||||
|
||||
from moto.codepipeline.exceptions import (
|
||||
InvalidStructureException,
|
||||
@ -18,26 +16,26 @@ from moto.core import BaseBackend, BaseModel
|
||||
|
||||
|
||||
class CodePipeline(BaseModel):
|
||||
def __init__(self, account_id, region, pipeline):
|
||||
def __init__(self, account_id: str, region: str, pipeline: Dict[str, Any]):
|
||||
# the version number for a new pipeline is always 1
|
||||
pipeline["version"] = 1
|
||||
|
||||
self.pipeline = self.add_default_values(pipeline)
|
||||
self.tags = {}
|
||||
self.tags: Dict[str, str] = {}
|
||||
|
||||
self._arn = f"arn:aws:codepipeline:{region}:{account_id}:{pipeline['name']}"
|
||||
self._created = datetime.utcnow()
|
||||
self._updated = datetime.utcnow()
|
||||
|
||||
@property
|
||||
def metadata(self):
|
||||
def metadata(self) -> Dict[str, str]:
|
||||
return {
|
||||
"pipelineArn": self._arn,
|
||||
"created": iso_8601_datetime_with_milliseconds(self._created),
|
||||
"updated": iso_8601_datetime_with_milliseconds(self._updated),
|
||||
}
|
||||
|
||||
def add_default_values(self, pipeline):
|
||||
def add_default_values(self, pipeline: Dict[str, Any]) -> Dict[str, Any]:
|
||||
for stage in pipeline["stages"]:
|
||||
for action in stage["actions"]:
|
||||
if "runOrder" not in action:
|
||||
@ -51,7 +49,7 @@ class CodePipeline(BaseModel):
|
||||
|
||||
return pipeline
|
||||
|
||||
def validate_tags(self, tags):
|
||||
def validate_tags(self, tags: List[Dict[str, str]]) -> None:
|
||||
for tag in tags:
|
||||
if tag["key"].startswith("aws:"):
|
||||
raise InvalidTagsException(
|
||||
@ -65,22 +63,26 @@ class CodePipeline(BaseModel):
|
||||
|
||||
|
||||
class CodePipelineBackend(BaseBackend):
|
||||
def __init__(self, region_name, account_id):
|
||||
def __init__(self, region_name: str, account_id: str):
|
||||
super().__init__(region_name, account_id)
|
||||
self.pipelines = {}
|
||||
self.pipelines: Dict[str, CodePipeline] = {}
|
||||
|
||||
@staticmethod
|
||||
def default_vpc_endpoint_service(service_region, zones):
|
||||
def default_vpc_endpoint_service(
|
||||
service_region: str, zones: List[str]
|
||||
) -> List[Dict[str, str]]:
|
||||
"""Default VPC endpoint service."""
|
||||
return BaseBackend.default_vpc_endpoint_service_factory(
|
||||
service_region, zones, "codepipeline", policy_supported=False
|
||||
)
|
||||
|
||||
@property
|
||||
def iam_backend(self):
|
||||
def iam_backend(self) -> IAMBackend:
|
||||
return iam_backends[self.account_id]["global"]
|
||||
|
||||
def create_pipeline(self, pipeline, tags):
|
||||
def create_pipeline(
|
||||
self, pipeline: Dict[str, Any], tags: List[Dict[str, str]]
|
||||
) -> Tuple[Dict[str, Any], List[Dict[str, str]]]:
|
||||
name = pipeline["name"]
|
||||
if name in self.pipelines:
|
||||
raise InvalidStructureException(
|
||||
@ -123,7 +125,7 @@ class CodePipelineBackend(BaseBackend):
|
||||
|
||||
return pipeline, sorted(tags, key=lambda i: i["key"])
|
||||
|
||||
def get_pipeline(self, name):
|
||||
def get_pipeline(self, name: str) -> Tuple[Dict[str, Any], Dict[str, str]]:
|
||||
codepipeline = self.pipelines.get(name)
|
||||
|
||||
if not codepipeline:
|
||||
@ -133,7 +135,7 @@ class CodePipelineBackend(BaseBackend):
|
||||
|
||||
return codepipeline.pipeline, codepipeline.metadata
|
||||
|
||||
def update_pipeline(self, pipeline):
|
||||
def update_pipeline(self, pipeline: Dict[str, Any]) -> Dict[str, Any]:
|
||||
codepipeline = self.pipelines.get(pipeline["name"])
|
||||
|
||||
if not codepipeline:
|
||||
@ -148,7 +150,7 @@ class CodePipelineBackend(BaseBackend):
|
||||
|
||||
return codepipeline.pipeline
|
||||
|
||||
def list_pipelines(self):
|
||||
def list_pipelines(self) -> List[Dict[str, str]]:
|
||||
pipelines = []
|
||||
|
||||
for name, codepipeline in self.pipelines.items():
|
||||
@ -163,10 +165,10 @@ class CodePipelineBackend(BaseBackend):
|
||||
|
||||
return sorted(pipelines, key=lambda i: i["name"])
|
||||
|
||||
def delete_pipeline(self, name):
|
||||
def delete_pipeline(self, name: str) -> None:
|
||||
self.pipelines.pop(name, None)
|
||||
|
||||
def list_tags_for_resource(self, arn):
|
||||
def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]:
|
||||
name = arn.split(":")[-1]
|
||||
pipeline = self.pipelines.get(name)
|
||||
|
||||
@ -179,7 +181,7 @@ class CodePipelineBackend(BaseBackend):
|
||||
|
||||
return sorted(tags, key=lambda i: i["key"])
|
||||
|
||||
def tag_resource(self, arn, tags):
|
||||
def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
|
||||
name = arn.split(":")[-1]
|
||||
pipeline = self.pipelines.get(name)
|
||||
|
||||
@ -193,7 +195,7 @@ class CodePipelineBackend(BaseBackend):
|
||||
for tag in tags:
|
||||
pipeline.tags.update({tag["key"]: tag["value"]})
|
||||
|
||||
def untag_resource(self, arn, tag_keys):
|
||||
def untag_resource(self, arn: str, tag_keys: List[str]) -> None:
|
||||
name = arn.split(":")[-1]
|
||||
pipeline = self.pipelines.get(name)
|
||||
|
||||
|
@ -1,63 +1,63 @@
|
||||
import json
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import codepipeline_backends
|
||||
from .models import codepipeline_backends, CodePipelineBackend
|
||||
|
||||
|
||||
class CodePipelineResponse(BaseResponse):
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(service_name="codepipeline")
|
||||
|
||||
@property
|
||||
def codepipeline_backend(self):
|
||||
def codepipeline_backend(self) -> CodePipelineBackend:
|
||||
return codepipeline_backends[self.current_account][self.region]
|
||||
|
||||
def create_pipeline(self):
|
||||
def create_pipeline(self) -> str:
|
||||
pipeline, tags = self.codepipeline_backend.create_pipeline(
|
||||
self._get_param("pipeline"), self._get_param("tags")
|
||||
)
|
||||
|
||||
return json.dumps({"pipeline": pipeline, "tags": tags})
|
||||
|
||||
def get_pipeline(self):
|
||||
def get_pipeline(self) -> str:
|
||||
pipeline, metadata = self.codepipeline_backend.get_pipeline(
|
||||
self._get_param("name")
|
||||
)
|
||||
|
||||
return json.dumps({"pipeline": pipeline, "metadata": metadata})
|
||||
|
||||
def update_pipeline(self):
|
||||
def update_pipeline(self) -> str:
|
||||
pipeline = self.codepipeline_backend.update_pipeline(
|
||||
self._get_param("pipeline")
|
||||
)
|
||||
|
||||
return json.dumps({"pipeline": pipeline})
|
||||
|
||||
def list_pipelines(self):
|
||||
def list_pipelines(self) -> str:
|
||||
pipelines = self.codepipeline_backend.list_pipelines()
|
||||
|
||||
return json.dumps({"pipelines": pipelines})
|
||||
|
||||
def delete_pipeline(self):
|
||||
def delete_pipeline(self) -> str:
|
||||
self.codepipeline_backend.delete_pipeline(self._get_param("name"))
|
||||
|
||||
return ""
|
||||
|
||||
def list_tags_for_resource(self):
|
||||
def list_tags_for_resource(self) -> str:
|
||||
tags = self.codepipeline_backend.list_tags_for_resource(
|
||||
self._get_param("resourceArn")
|
||||
)
|
||||
|
||||
return json.dumps({"tags": tags})
|
||||
|
||||
def tag_resource(self):
|
||||
def tag_resource(self) -> str:
|
||||
self.codepipeline_backend.tag_resource(
|
||||
self._get_param("resourceArn"), self._get_param("tags")
|
||||
)
|
||||
|
||||
return ""
|
||||
|
||||
def untag_resource(self):
|
||||
def untag_resource(self) -> str:
|
||||
self.codepipeline_backend.untag_resource(
|
||||
self._get_param("resourceArn"), self._get_param("tagKeys")
|
||||
)
|
||||
|
@ -6,7 +6,7 @@ XMLNS_IAM = "https://iam.amazonaws.com/doc/2010-05-08/"
|
||||
class IAMNotFoundException(RESTError):
|
||||
code = 404
|
||||
|
||||
def __init__(self, message):
|
||||
def __init__(self, message: str):
|
||||
super().__init__(
|
||||
"NoSuchEntity", message, xmlns=XMLNS_IAM, template="wrapped_single_error"
|
||||
)
|
||||
|
@ -18,7 +18,7 @@ disable = W,C,R,E
|
||||
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
|
||||
|
||||
[mypy]
|
||||
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch
|
||||
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit
|
||||
show_column_numbers=True
|
||||
show_error_codes = True
|
||||
disable_error_code=abstract
|
||||
|
Loading…
Reference in New Issue
Block a user