Codepipeline - handling iam trust policies and absence of tags. (#5324)

This commit is contained in:
pvbouwel 2022-07-26 21:21:07 +02:00 committed by GitHub
parent 42b11bbe14
commit 7b43d81aab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 132 additions and 126 deletions

View File

@ -92,10 +92,13 @@ class CodePipelineBackend(BaseBackend):
try:
role = self.iam_backend.get_role_by_arn(pipeline["roleArn"])
service_principal = json.loads(role.assume_role_policy_document)[
trust_policy_statements = json.loads(role.assume_role_policy_document)[
"Statement"
][0]["Principal"]["Service"]
if "codepipeline.amazonaws.com" not in service_principal:
]
trusted_service_principals = [
i["Principal"]["Service"] for i in trust_policy_statements
]
if "codepipeline.amazonaws.com" not in trusted_service_principals:
raise IAMNotFoundException("")
except IAMNotFoundException:
raise InvalidStructureException(
@ -111,11 +114,13 @@ class CodePipelineBackend(BaseBackend):
self.pipelines[pipeline["name"]] = CodePipeline(self.region_name, pipeline)
if tags:
if tags is not None:
self.pipelines[pipeline["name"]].validate_tags(tags)
new_tags = {tag["key"]: tag["value"] for tag in tags}
self.pipelines[pipeline["name"]].tags.update(new_tags)
else:
tags = []
return pipeline, sorted(tags, key=lambda i: i["key"])

View File

View File

@ -1,4 +1,5 @@
import json
from copy import deepcopy
from datetime import datetime
import boto3
@ -9,64 +10,65 @@ import pytest
from moto import mock_codepipeline, mock_iam
expected_pipeline_details = {
"name": "test-pipeline",
"roleArn": "arn:aws:iam::123456789012:role/test-role",
"artifactStore": {
"type": "S3",
"location": "codepipeline-us-east-1-123456789012",
},
"stages": [
{
"name": "Stage-1",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Source",
"owner": "AWS",
"provider": "S3",
"version": "1",
},
"runOrder": 1,
"configuration": {
"S3Bucket": "test-bucket",
"S3ObjectKey": "test-object",
},
"outputArtifacts": [{"name": "artifact"}],
"inputArtifacts": [],
}
],
},
{
"name": "Stage-2",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Approval",
"owner": "AWS",
"provider": "Manual",
"version": "1",
},
"runOrder": 1,
"configuration": {},
"outputArtifacts": [],
"inputArtifacts": [],
}
],
},
],
"version": 1,
}
@mock_codepipeline
def test_create_pipeline():
client = boto3.client("codepipeline", region_name="us-east-1")
response = create_basic_codepipeline(client, "test-pipeline")
response["pipeline"].should.equal(
{
"name": "test-pipeline",
"roleArn": "arn:aws:iam::123456789012:role/test-role",
"artifactStore": {
"type": "S3",
"location": "codepipeline-us-east-1-123456789012",
},
"stages": [
{
"name": "Stage-1",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Source",
"owner": "AWS",
"provider": "S3",
"version": "1",
},
"runOrder": 1,
"configuration": {
"S3Bucket": "test-bucket",
"S3ObjectKey": "test-object",
},
"outputArtifacts": [{"name": "artifact"}],
"inputArtifacts": [],
}
],
},
{
"name": "Stage-2",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Approval",
"owner": "AWS",
"provider": "Manual",
"version": "1",
},
"runOrder": 1,
"configuration": {},
"outputArtifacts": [],
"inputArtifacts": [],
}
],
},
],
"version": 1,
}
)
response["pipeline"].should.equal(expected_pipeline_details)
response["tags"].should.equal([{"key": "key", "value": "value"}])
@ -219,58 +221,7 @@ def test_get_pipeline():
response = client.get_pipeline(name="test-pipeline")
response["pipeline"].should.equal(
{
"name": "test-pipeline",
"roleArn": "arn:aws:iam::123456789012:role/test-role",
"artifactStore": {
"type": "S3",
"location": "codepipeline-us-east-1-123456789012",
},
"stages": [
{
"name": "Stage-1",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Source",
"owner": "AWS",
"provider": "S3",
"version": "1",
},
"runOrder": 1,
"configuration": {
"S3Bucket": "test-bucket",
"S3ObjectKey": "test-object",
},
"outputArtifacts": [{"name": "artifact"}],
"inputArtifacts": [],
}
],
},
{
"name": "Stage-2",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Approval",
"owner": "AWS",
"provider": "Manual",
"version": "1",
},
"runOrder": 1,
"configuration": {},
"outputArtifacts": [],
"inputArtifacts": [],
}
],
},
],
"version": 1,
}
)
response["pipeline"].should.equal(expected_pipeline_details)
response["metadata"]["pipelineArn"].should.equal(
"arn:aws:codepipeline:us-east-1:123456789012:test-pipeline"
)
@ -648,34 +599,41 @@ def test_untag_resource_errors():
)
simple_trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "codepipeline.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
@mock_iam
def get_role_arn():
def get_role_arn(name="test-role", trust_policy=None):
client = boto3.client("iam", region_name="us-east-1")
try:
return client.get_role(RoleName="test-role")["Role"]["Arn"]
return client.get_role(RoleName=name)["Role"]["Arn"]
except ClientError:
if trust_policy is None:
trust_policy = simple_trust_policy
return client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "codepipeline.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
),
RoleName=name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
)["Role"]["Arn"]
def create_basic_codepipeline(client, name):
def create_basic_codepipeline(client, name, role_arn=None, tags=None):
if role_arn is None:
role_arn = get_role_arn()
if tags is None:
tags = [{"key": "key", "value": "value"}]
return client.create_pipeline(
pipeline={
"name": name,
"roleArn": get_role_arn(),
"roleArn": role_arn,
"artifactStore": {
"type": "S3",
"location": "codepipeline-us-east-1-123456789012",
@ -716,5 +674,48 @@ def create_basic_codepipeline(client, name):
},
],
},
tags=[{"key": "key", "value": "value"}],
tags=tags,
)
extended_trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "codebuild.amazonaws.com"},
"Action": "sts:AssumeRole",
},
{
"Effect": "Allow",
"Principal": {"Service": "codepipeline.amazonaws.com"},
"Action": "sts:AssumeRole",
},
],
}
@mock_codepipeline
def test_create_pipeline_with_extended_trust_policy():
client = boto3.client("codepipeline", region_name="us-east-1")
role_arn = get_role_arn(
name="test-role-extended", trust_policy=extended_trust_policy
)
response = create_basic_codepipeline(client, "test-pipeline", role_arn=role_arn)
extended_pipeline_details = deepcopy(expected_pipeline_details)
extended_pipeline_details["roleArn"] = role_arn
response["pipeline"].should.equal(extended_pipeline_details)
response["tags"].should.equal([{"key": "key", "value": "value"}])
@mock_codepipeline
def test_create_pipeline_without_tags():
client = boto3.client("codepipeline", region_name="us-east-1")
response = create_basic_codepipeline(client, "test-pipeline", tags=[])
response["pipeline"].should.equal(expected_pipeline_details)
response["tags"].should.equal([])