From 7b43d81aab22167c97ff7ccf80b72f31a37acc03 Mon Sep 17 00:00:00 2001 From: pvbouwel Date: Tue, 26 Jul 2022 21:21:07 +0200 Subject: [PATCH] Codepipeline - handling iam trust policies and absence of tags. (#5324) --- moto/codepipeline/models.py | 13 +- tests/test_codepipeline/__init__.py | 0 tests/test_codepipeline/test_codepipeline.py | 245 ++++++++++--------- 3 files changed, 132 insertions(+), 126 deletions(-) create mode 100644 tests/test_codepipeline/__init__.py diff --git a/moto/codepipeline/models.py b/moto/codepipeline/models.py index 5b73afc3f..72574e4b8 100644 --- a/moto/codepipeline/models.py +++ b/moto/codepipeline/models.py @@ -92,10 +92,13 @@ class CodePipelineBackend(BaseBackend): try: role = self.iam_backend.get_role_by_arn(pipeline["roleArn"]) - service_principal = json.loads(role.assume_role_policy_document)[ + trust_policy_statements = json.loads(role.assume_role_policy_document)[ "Statement" - ][0]["Principal"]["Service"] - if "codepipeline.amazonaws.com" not in service_principal: + ] + trusted_service_principals = [ + i["Principal"]["Service"] for i in trust_policy_statements + ] + if "codepipeline.amazonaws.com" not in trusted_service_principals: raise IAMNotFoundException("") except IAMNotFoundException: raise InvalidStructureException( @@ -111,11 +114,13 @@ class CodePipelineBackend(BaseBackend): self.pipelines[pipeline["name"]] = CodePipeline(self.region_name, pipeline) - if tags: + if tags is not None: self.pipelines[pipeline["name"]].validate_tags(tags) new_tags = {tag["key"]: tag["value"] for tag in tags} self.pipelines[pipeline["name"]].tags.update(new_tags) + else: + tags = [] return pipeline, sorted(tags, key=lambda i: i["key"]) diff --git a/tests/test_codepipeline/__init__.py b/tests/test_codepipeline/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_codepipeline/test_codepipeline.py b/tests/test_codepipeline/test_codepipeline.py index 45a8ac24a..fc2698a06 100644 --- a/tests/test_codepipeline/test_codepipeline.py +++ b/tests/test_codepipeline/test_codepipeline.py @@ -1,4 +1,5 @@ import json +from copy import deepcopy from datetime import datetime import boto3 @@ -9,64 +10,65 @@ import pytest from moto import mock_codepipeline, mock_iam +expected_pipeline_details = { + "name": "test-pipeline", + "roleArn": "arn:aws:iam::123456789012:role/test-role", + "artifactStore": { + "type": "S3", + "location": "codepipeline-us-east-1-123456789012", + }, + "stages": [ + { + "name": "Stage-1", + "actions": [ + { + "name": "Action-1", + "actionTypeId": { + "category": "Source", + "owner": "AWS", + "provider": "S3", + "version": "1", + }, + "runOrder": 1, + "configuration": { + "S3Bucket": "test-bucket", + "S3ObjectKey": "test-object", + }, + "outputArtifacts": [{"name": "artifact"}], + "inputArtifacts": [], + } + ], + }, + { + "name": "Stage-2", + "actions": [ + { + "name": "Action-1", + "actionTypeId": { + "category": "Approval", + "owner": "AWS", + "provider": "Manual", + "version": "1", + }, + "runOrder": 1, + "configuration": {}, + "outputArtifacts": [], + "inputArtifacts": [], + } + ], + }, + ], + "version": 1, +} + + @mock_codepipeline def test_create_pipeline(): client = boto3.client("codepipeline", region_name="us-east-1") response = create_basic_codepipeline(client, "test-pipeline") - response["pipeline"].should.equal( - { - "name": "test-pipeline", - "roleArn": "arn:aws:iam::123456789012:role/test-role", - "artifactStore": { - "type": "S3", - "location": "codepipeline-us-east-1-123456789012", - }, - "stages": [ - { - "name": "Stage-1", - "actions": [ - { - "name": "Action-1", - "actionTypeId": { - "category": "Source", - "owner": "AWS", - "provider": "S3", - "version": "1", - }, - "runOrder": 1, - "configuration": { - "S3Bucket": "test-bucket", - "S3ObjectKey": "test-object", - }, - "outputArtifacts": [{"name": "artifact"}], - "inputArtifacts": [], - } - ], - }, - { - "name": "Stage-2", - "actions": [ - { - "name": "Action-1", - "actionTypeId": { - "category": "Approval", - "owner": "AWS", - "provider": "Manual", - "version": "1", - }, - "runOrder": 1, - "configuration": {}, - "outputArtifacts": [], - "inputArtifacts": [], - } - ], - }, - ], - "version": 1, - } - ) + response["pipeline"].should.equal(expected_pipeline_details) response["tags"].should.equal([{"key": "key", "value": "value"}]) @@ -219,58 +221,7 @@ def test_get_pipeline(): response = client.get_pipeline(name="test-pipeline") - response["pipeline"].should.equal( - { - "name": "test-pipeline", - "roleArn": "arn:aws:iam::123456789012:role/test-role", - "artifactStore": { - "type": "S3", - "location": "codepipeline-us-east-1-123456789012", - }, - "stages": [ - { - "name": "Stage-1", - "actions": [ - { - "name": "Action-1", - "actionTypeId": { - "category": "Source", - "owner": "AWS", - "provider": "S3", - "version": "1", - }, - "runOrder": 1, - "configuration": { - "S3Bucket": "test-bucket", - "S3ObjectKey": "test-object", - }, - "outputArtifacts": [{"name": "artifact"}], - "inputArtifacts": [], - } - ], - }, - { - "name": "Stage-2", - "actions": [ - { - "name": "Action-1", - "actionTypeId": { - "category": "Approval", - "owner": "AWS", - "provider": "Manual", - "version": "1", - }, - "runOrder": 1, - "configuration": {}, - "outputArtifacts": [], - "inputArtifacts": [], - } - ], - }, - ], - "version": 1, - } - ) + response["pipeline"].should.equal(expected_pipeline_details) response["metadata"]["pipelineArn"].should.equal( "arn:aws:codepipeline:us-east-1:123456789012:test-pipeline" ) @@ -648,34 +599,41 @@ def test_untag_resource_errors(): ) +simple_trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "codepipeline.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], +} + + @mock_iam -def get_role_arn(): +def get_role_arn(name="test-role", trust_policy=None): client = boto3.client("iam", region_name="us-east-1") try: - return client.get_role(RoleName="test-role")["Role"]["Arn"] + return client.get_role(RoleName=name)["Role"]["Arn"] except ClientError: + if trust_policy is None: + trust_policy = simple_trust_policy return client.create_role( - RoleName="test-role", - AssumeRolePolicyDocument=json.dumps( - { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": {"Service": "codepipeline.amazonaws.com"}, - "Action": "sts:AssumeRole", - } - ], - } - ), + RoleName=name, + AssumeRolePolicyDocument=json.dumps(trust_policy), )["Role"]["Arn"] -def create_basic_codepipeline(client, name): +def create_basic_codepipeline(client, name, role_arn=None, tags=None): + if role_arn is None: + role_arn = get_role_arn() + if tags is None: + tags = [{"key": "key", "value": "value"}] return client.create_pipeline( pipeline={ "name": name, - "roleArn": get_role_arn(), + "roleArn": role_arn, "artifactStore": { "type": "S3", "location": "codepipeline-us-east-1-123456789012", @@ -716,5 +674,48 @@ def create_basic_codepipeline(client, name): }, ], }, - tags=[{"key": "key", "value": "value"}], + tags=tags, ) + + +extended_trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "codebuild.amazonaws.com"}, + "Action": "sts:AssumeRole", + }, + { + "Effect": "Allow", + "Principal": {"Service": "codepipeline.amazonaws.com"}, + "Action": "sts:AssumeRole", + }, + ], +} + + +@mock_codepipeline +def test_create_pipeline_with_extended_trust_policy(): + client = boto3.client("codepipeline", region_name="us-east-1") + + role_arn = get_role_arn( + name="test-role-extended", trust_policy=extended_trust_policy + ) + response = create_basic_codepipeline(client, "test-pipeline", role_arn=role_arn) + + extended_pipeline_details = deepcopy(expected_pipeline_details) + extended_pipeline_details["roleArn"] = role_arn + + response["pipeline"].should.equal(extended_pipeline_details) + response["tags"].should.equal([{"key": "key", "value": "value"}]) + + +@mock_codepipeline +def test_create_pipeline_without_tags(): + client = boto3.client("codepipeline", region_name="us-east-1") + + response = create_basic_codepipeline(client, "test-pipeline", tags=[]) + + response["pipeline"].should.equal(expected_pipeline_details) + response["tags"].should.equal([])