import json from datetime import datetime from typing import Any, Dict, List, Tuple from moto.core.utils import iso_8601_datetime_with_milliseconds from moto.iam.exceptions import IAMNotFoundException from moto.iam.models import iam_backends, IAMBackend from moto.codepipeline.exceptions import ( InvalidStructureException, PipelineNotFoundException, ResourceNotFoundException, InvalidTagsException, TooManyTagsException, ) from moto.core import BaseBackend, BackendDict, BaseModel class CodePipeline(BaseModel): def __init__(self, account_id: str, region: str, pipeline: Dict[str, Any]): # the version number for a new pipeline is always 1 pipeline["version"] = 1 self.pipeline = self.add_default_values(pipeline) self.tags: Dict[str, str] = {} self._arn = f"arn:aws:codepipeline:{region}:{account_id}:{pipeline['name']}" self._created = datetime.utcnow() self._updated = datetime.utcnow() @property def metadata(self) -> Dict[str, str]: return { "pipelineArn": self._arn, "created": iso_8601_datetime_with_milliseconds(self._created), "updated": iso_8601_datetime_with_milliseconds(self._updated), } def add_default_values(self, pipeline: Dict[str, Any]) -> Dict[str, Any]: for stage in pipeline["stages"]: for action in stage["actions"]: if "runOrder" not in action: action["runOrder"] = 1 if "configuration" not in action: action["configuration"] = {} if "outputArtifacts" not in action: action["outputArtifacts"] = [] if "inputArtifacts" not in action: action["inputArtifacts"] = [] return pipeline def validate_tags(self, tags: List[Dict[str, str]]) -> None: for tag in tags: if tag["key"].startswith("aws:"): raise InvalidTagsException( "Not allowed to modify system tags. " "System tags start with 'aws:'. " "msg=[Caller is an end user and not allowed to mutate system tags]" ) if (len(self.tags) + len(tags)) > 50: raise TooManyTagsException(self._arn) class CodePipelineBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.pipelines: Dict[str, CodePipeline] = {} @staticmethod def default_vpc_endpoint_service( service_region: str, zones: List[str] ) -> List[Dict[str, str]]: """Default VPC endpoint service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "codepipeline", policy_supported=False ) @property def iam_backend(self) -> IAMBackend: return iam_backends[self.account_id]["global"] def create_pipeline( self, pipeline: Dict[str, Any], tags: List[Dict[str, str]] ) -> Tuple[Dict[str, Any], List[Dict[str, str]]]: name = pipeline["name"] if name in self.pipelines: raise InvalidStructureException( f"A pipeline with the name '{name}' already exists in account '{self.account_id}'" ) try: role = self.iam_backend.get_role_by_arn(pipeline["roleArn"]) trust_policy_statements = json.loads(role.assume_role_policy_document)[ "Statement" ] trusted_service_principals = [ i["Principal"]["Service"] for i in trust_policy_statements ] if "codepipeline.amazonaws.com" not in trusted_service_principals: raise IAMNotFoundException("") except IAMNotFoundException: raise InvalidStructureException( f"CodePipeline is not authorized to perform AssumeRole on role {pipeline['roleArn']}" ) if len(pipeline["stages"]) < 2: raise InvalidStructureException( "Pipeline has only 1 stage(s). There should be a minimum of 2 stages in a pipeline" ) self.pipelines[pipeline["name"]] = CodePipeline( self.account_id, self.region_name, pipeline ) if tags is not None: self.pipelines[pipeline["name"]].validate_tags(tags) new_tags = {tag["key"]: tag["value"] for tag in tags} self.pipelines[pipeline["name"]].tags.update(new_tags) else: tags = [] return pipeline, sorted(tags, key=lambda i: i["key"]) def get_pipeline(self, name: str) -> Tuple[Dict[str, Any], Dict[str, str]]: codepipeline = self.pipelines.get(name) if not codepipeline: raise PipelineNotFoundException( f"Account '{self.account_id}' does not have a pipeline with name '{name}'" ) return codepipeline.pipeline, codepipeline.metadata def update_pipeline(self, pipeline: Dict[str, Any]) -> Dict[str, Any]: codepipeline = self.pipelines.get(pipeline["name"]) if not codepipeline: raise ResourceNotFoundException( f"The account with id '{self.account_id}' does not include a pipeline with the name '{pipeline['name']}'" ) # version number is auto incremented pipeline["version"] = codepipeline.pipeline["version"] + 1 codepipeline._updated = datetime.utcnow() codepipeline.pipeline = codepipeline.add_default_values(pipeline) return codepipeline.pipeline def list_pipelines(self) -> List[Dict[str, str]]: pipelines = [] for name, codepipeline in self.pipelines.items(): pipelines.append( { "name": name, "version": codepipeline.pipeline["version"], "created": codepipeline.metadata["created"], "updated": codepipeline.metadata["updated"], } ) return sorted(pipelines, key=lambda i: i["name"]) def delete_pipeline(self, name: str) -> None: self.pipelines.pop(name, None) def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]: name = arn.split(":")[-1] pipeline = self.pipelines.get(name) if not pipeline: raise ResourceNotFoundException( f"The account with id '{self.account_id}' does not include a pipeline with the name '{name}'" ) tags = [{"key": key, "value": value} for key, value in pipeline.tags.items()] return sorted(tags, key=lambda i: i["key"]) def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None: name = arn.split(":")[-1] pipeline = self.pipelines.get(name) if not pipeline: raise ResourceNotFoundException( f"The account with id '{self.account_id}' does not include a pipeline with the name '{name}'" ) pipeline.validate_tags(tags) for tag in tags: pipeline.tags.update({tag["key"]: tag["value"]}) def untag_resource(self, arn: str, tag_keys: List[str]) -> None: name = arn.split(":")[-1] pipeline = self.pipelines.get(name) if not pipeline: raise ResourceNotFoundException( f"The account with id '{self.account_id}' does not include a pipeline with the name '{name}'" ) for key in tag_keys: pipeline.tags.pop(key, None) codepipeline_backends = BackendDict(CodePipelineBackend, "codepipeline")