Merge pull request #3314 from bblommers/dependency-test

List dependencies for services - add integration test
This commit is contained in:
Steve Pulec 2020-09-23 07:07:31 -05:00 committed by GitHub
commit ce60e9e3b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1434 additions and 1246 deletions

View File

@ -1,13 +1,17 @@
Moto Changelog
===================
Unreleased
-----
* Reduced dependency overhead.
It is now possible to install dependencies for only specific services using:
pip install moto[service1,service1].
See the README for more information.
1.3.16
-----
* Undoing dependency cleanup until we cut a larger release
1.3.15
-----
Full list of PRs merged in this release:
https://github.com/spulec/moto/pulls?q=is%3Apr+is%3Aclosed+merged%3A2019-11-14..2020-09-07
@ -15,9 +19,6 @@ https://github.com/spulec/moto/pulls?q=is%3Apr+is%3Aclosed+merged%3A2019-11-14..
General Changes:
* The scaffold.py-script has been fixed to make it easier to scaffold new services.
See the README for an introduction.
* Reduced dependency overhead.
It is now possible to install dependencies for only a specific service using pip install moto[service].
Available services: all, acm, awslambda, batch, cloudformation, cognitoidp, ec2, iotdata, iam, xray
New Services:
* Application Autoscaling
@ -213,6 +214,10 @@ https://github.com/spulec/moto/pulls?q=is%3Apr+is%3Aclosed+merged%3A2019-11-14..
* SNS - Now supports sending a message directly to a phone number
* SQS - MessageAttributes now support labeled DataTypes
1.3.15
-----
This release broke dependency management for a lot of services - please upgrade to 1.3.16.
1.3.14
-----

View File

@ -57,3 +57,6 @@ implementation_coverage:
scaffold:
@pip install -r requirements-dev.txt > /dev/null
exec python scripts/scaffold.py
int_test:
@./scripts/int_test.sh

View File

@ -9,6 +9,25 @@
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/moto.svg)
![PyPI - Downloads](https://img.shields.io/pypi/dw/moto.svg) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
## Install
To install moto for a specific service:
```console
$ pip install moto[ec2,s3]
```
This will install Moto, and the dependencies required for that specific service.
If you don't care about the number of dependencies, or if you want to mock many AWS services:
```console
$ pip install moto[all]
```
Not all services might be covered, in which case you might see a warning:
`moto 1.3.16 does not provide the extra 'service'`.
You can ignore the warning, or simply install moto as is:
```console
$ pip install moto
```
## In a nutshell
Moto is a library that allows your tests to easily mock out AWS Services.
@ -459,15 +478,7 @@ require that you update your hosts file for your code to work properly:
1. `s3-control`
For the above services, this is required because the hostname is in the form of `AWS_ACCOUNT_ID.localhost`.
As a result, you need to add that entry to your host file for your tests to function properly.
## Install
```console
$ pip install moto
```
As a result, you need to add that entry to your host file for your tests to function properly.
## Releases

View File

@ -14,10 +14,9 @@ try:
except ImportError:
from urllib.parse import urlparse
import responses
from moto.core import BaseBackend, BaseModel
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from .utils import create_id
from moto.core.utils import path_url
from moto.sts.models import ACCOUNT_ID
from .exceptions import (
ApiKeyNotFoundException,
UsagePlanNotFoundException,

View File

@ -2,7 +2,7 @@ from boto3 import Session
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from datetime import datetime
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
from .exceptions import RepositoryDoesNotExistException, RepositoryNameExistsException
import uuid

View File

@ -15,9 +15,7 @@ from moto.codepipeline.exceptions import (
InvalidTagsException,
TooManyTagsException,
)
from moto.core import BaseBackend, BaseModel
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
class CodePipeline(BaseModel):

View File

@ -27,7 +27,7 @@ from moto.core.utils import (
iso_8601_datetime_with_milliseconds,
camelcase_to_underscores,
)
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
from .exceptions import (
CidrLimitExceeded,
DependencyViolationError,

View File

@ -11,7 +11,7 @@ from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
EC2_RESOURCE_TO_PREFIX = {
"customer-gateway": "cgw",

View File

@ -4,8 +4,7 @@ import json
from boto3 import Session
from moto.core.exceptions import JsonRESTError
from moto.core import BaseBackend, CloudFormationModel
from moto.sts.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
from moto.utilities.tagging_service import TaggingService
from uuid import uuid4

View File

@ -6,11 +6,10 @@ from datetime import datetime, timedelta
from boto3 import Session
from moto.core import BaseBackend, CloudFormationModel
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
from moto.core.utils import unix_time
from moto.utilities.tagging_service import TaggingService
from moto.core.exceptions import JsonRESTError
from moto.iam.models import ACCOUNT_ID
from .utils import decrypt, encrypt, generate_key_id, generate_master_key

View File

@ -1,14 +1,13 @@
from __future__ import unicode_literals
import os
from boto3 import Session
from copy import deepcopy
from datetime import datetime
from moto.core import BaseBackend, BaseModel
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.exceptions import RESTError
from moto.ec2 import ec2_backends
from moto.sagemaker import validators
from moto.sts.models import ACCOUNT_ID
from .exceptions import MissingModel
@ -909,5 +908,9 @@ class SageMakerModelBackend(BaseBackend):
sagemaker_backends = {}
for region, ec2_backend in ec2_backends.items():
for region in Session().get_available_regions("sagemaker"):
sagemaker_backends[region] = SageMakerModelBackend(region)
for region in Session().get_available_regions("sagemaker", partition_name="aws-us-gov"):
sagemaker_backends[region] = SageMakerModelBackend(region)
for region in Session().get_available_regions("sagemaker", partition_name="aws-cn"):
sagemaker_backends[region] = SageMakerModelBackend(region)

View File

@ -4,9 +4,8 @@ from datetime import datetime
from boto3 import Session
from moto.core import BaseBackend
from moto.core import ACCOUNT_ID, BaseBackend
from moto.core.utils import iso_8601_datetime_without_milliseconds
from moto.sts.models import ACCOUNT_ID
from uuid import uuid4
from .exceptions import (
ExecutionAlreadyExists,

View File

@ -1,11 +1,11 @@
from __future__ import unicode_literals
import bisect
from boto3 import Session
import datetime
from collections import defaultdict
import json
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
from .exceptions import BadSegmentException, AWSError
@ -287,5 +287,9 @@ class XRayBackend(BaseBackend):
xray_backends = {}
for region, ec2_backend in ec2_backends.items():
for region in Session().get_available_regions("xray"):
xray_backends[region] = XRayBackend()
for region in Session().get_available_regions("xray", partition_name="aws-us-gov"):
xray_backends[region] = XRayBackend()
for region in Session().get_available_regions("xray", partition_name="aws-cn"):
xray_backends[region] = XRayBackend()

View File

@ -1,18 +1,16 @@
-r requirements.txt
nose
-r requirements-tests.txt
black==19.10b0; python_version >= '3.6'
regex==2019.11.1; python_version >= '3.6' # Needed for black
sure==1.4.11
coverage==4.5.4
flake8==3.7.8
freezegun
flask
flask-cors
boto>=2.45.0
boto3>=1.4.4
botocore>=1.15.13
six>=1.9
parameterized>=0.7.0
prompt-toolkit==2.0.10 # 3.x is not available with python2
click==6.7
inflection==0.3.1

4
requirements-tests.txt Normal file
View File

@ -0,0 +1,4 @@
nose
sure==1.4.11
freezegun
parameterized>=0.7.0

87
scripts/int_test.sh Executable file
View File

@ -0,0 +1,87 @@
#
# Dependency Integration Test script
#
# Runs a test to verify whether each service has the correct dependencies listed in setup.py
#
# ::Algorithm::
# For each valid service:
# - Create a virtual environment
# - Install only the necessary dependencies
# - Run the tests for that service
# - If the tests fail:
# - This service is probably missing a dependency
# - A log file with the test results will be created (test_results_service.log)
# - Delete the virtual environment
#
# Note:
# Only tested on Linux
# Parallelized to test 4 services at the time.
# Could take some time to run - around 20 minutes on the author's machine
overwrite() { echo -e "\r\033[1A\033[0K$@"; }
contains() {
[[ $1 =~ (^|[[:space:]])$2($|[[:space:]]) ]] && return 0 || return 1
}
valid_service() {
# Verify whether this is a valid service
# We'll ignore metadata folders, and folders that test generic Moto behaviour
# We'll also ignore CloudFormation, as it will always depend on other services
local ignore_moto_folders="core instance_metadata __pycache__ templates cloudformation"
if echo $ignore_moto_folders | grep -q "$1"; then
return 1
else
return 0
fi
}
test_service() {
service=$1
path_to_test_file=$2
venv_path="test_venv_${service}"
overwrite "Running tests for ${service}.."
virtualenv ${venv_path} -p `which python3` > /dev/null
source ${venv_path}/bin/activate > /dev/null
# Can't just install requirements-file, as it points to all dependencies
pip install -r requirements-tests.txt > /dev/null
pip install .[$service] > /dev/null 2>&1
# Restart venv - ensure these deps are loaded
deactivate
source ${venv_path}/bin/activate > /dev/null
# Run tests for this service
test_result_filename="test_results_${service}.log"
touch $test_result_filename
nosetests -qxs --ignore-files="test_server\.py" --ignore-files="test_${service}_cloudformation\.py" --ignore-files="test_integration\.py" $path_to_test_file >$test_result_filename 2>&1
RESULT=$?
if [[ $RESULT != 0 ]]; then
echo -e "Tests for ${service} have failed!\n"
else
rm $test_result_filename
fi
deactivate
rm -rf ${venv_path}
}
echo "Running Dependency tests..."
ITER=0
for file in moto/*
do
if [[ -d $file ]]; then
service=${file:5}
path_to_test_file="tests/test_${service}"
if valid_service $service && [[ -d $path_to_test_file ]]; then
test_service $service $path_to_test_file &
elif valid_service $service; then
echo -e "No tests for ${service} can be found on ${path_to_test_file}!\n"
fi
if (( $ITER % 4 == 0 )); then
# Ensure we're only processing 4 services at the time
wait
fi
fi
ITER=$(expr $ITER + 1)
done
wait

View File

@ -33,6 +33,7 @@ install_requires = [
"boto>=2.36.0",
"boto3>=1.9.201",
"botocore>=1.12.201",
"cryptography>=2.3.0",
"requests>=2.5",
"xmltodict",
"six>1.9",
@ -74,7 +75,6 @@ else:
"zipp",
]
_dep_cryptography = "cryptography>=2.3.0"
_dep_PyYAML = "PyYAML>=5.1"
_dep_python_jose = "python-jose[cryptography]>=3.1.0,<4.0.0"
_dep_python_jose_ecdsa_pin = "ecdsa<0.15" # https://github.com/spulec/moto/pull/3263#discussion_r477404984
@ -87,7 +87,6 @@ _dep_sshpubkeys_py2 = "sshpubkeys>=3.1.0,<4.0; python_version<'3'"
_dep_sshpubkeys_py3 = "sshpubkeys>=3.1.0; python_version>'3'"
all_extra_deps = [
_dep_cryptography,
_dep_PyYAML,
_dep_python_jose,
_dep_python_jose_ecdsa_pin,
@ -105,18 +104,22 @@ all_server_deps = all_extra_deps + ['flask', 'flask-cors']
# i.e. even those without extra dependencies.
# Would be good for future-compatibility, I guess.
extras_per_service = {
'acm': [_dep_cryptography],
'apigateway': [_dep_python_jose, _dep_python_jose_ecdsa_pin],
'awslambda': [_dep_docker],
'batch': [_dep_docker],
'cloudformation': [_dep_PyYAML, _dep_cfn_lint],
'cloudformation': [_dep_docker, _dep_PyYAML, _dep_cfn_lint],
'cognitoidp': [_dep_python_jose, _dep_python_jose_ecdsa_pin],
"ec2": [_dep_cryptography, _dep_sshpubkeys_py2, _dep_sshpubkeys_py3],
'iam': [_dep_cryptography],
'dynamodb2': [_dep_docker],
'dynamodbstreams': [_dep_docker],
"ec2": [_dep_docker, _dep_sshpubkeys_py2, _dep_sshpubkeys_py3],
'iotdata': [_dep_jsondiff],
's3': [_dep_cryptography],
's3': [_dep_PyYAML],
'ses': [_dep_docker],
'sns': [_dep_docker],
'sqs': [_dep_docker],
'ssm': [_dep_docker, _dep_PyYAML, _dep_cfn_lint],
'xray': [_dep_aws_xray_sdk],
}
extras_require = {
'all': all_extra_deps,
'server': all_server_deps,
@ -148,8 +151,7 @@ setup(
],
},
packages=find_packages(exclude=("tests", "tests.*")),
# Addding all requirements for now until we cut a larger release
install_requires=install_requires + all_extra_deps,
install_requires=install_requires,
extras_require=extras_require,
include_package_data=True,
license="Apache",

View File

View File

@ -2,7 +2,7 @@ import boto3
import sure # noqa
from moto import mock_codecommit
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
from botocore.exceptions import ClientError
from nose.tools import assert_raises

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import boto3
import sure # noqa
from botocore.exceptions import ClientError
from nose.tools import assert_raises

View File

@ -12,7 +12,7 @@ import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from moto.ec2.models import AMIS, OWNER_ID
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
from tests.helpers import requires_boto_gte

View File

@ -0,0 +1,100 @@
from moto import mock_cloudformation_deprecated, mock_ec2_deprecated
from moto import mock_cloudformation, mock_ec2
from tests.test_cloudformation.fixtures import vpc_eni
import boto
import boto3
import json
import sure # noqa
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
eni.private_ip_addresses.should.have.length_of(1)
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eni = [
resource
for resource in resources
if resource.resource_type == "AWS::EC2::NetworkInterface"
][0]
cfn_eni.physical_resource_id.should.equal(eni.id)
outputs = {output.key: output.value for output in stack.outputs}
outputs["ENIIpAddress"].should.equal(eni.private_ip_addresses[0].private_ip_address)
@mock_ec2
@mock_cloudformation
def test_volume_size_through_cloudformation():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf = boto3.client("cloudformation", region_name="us-east-1")
volume_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testInstance": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-d3adb33f",
"KeyName": "dummy",
"InstanceType": "t2.micro",
"BlockDeviceMappings": [
{"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": "50"}}
],
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
template_json = json.dumps(volume_template)
cf.create_stack(StackName="test_stack", TemplateBody=template_json)
instances = ec2.describe_instances()
volume = instances["Reservations"][0]["Instances"][0]["BlockDeviceMappings"][0][
"Ebs"
]
volumes = ec2.describe_volumes(VolumeIds=[volume["VolumeId"]])
volumes["Volumes"][0]["Size"].should.equal(50)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_subnet_tags_through_cloudformation():
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testSubnet": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"VpcId": vpc.id,
"CidrBlock": "10.0.0.0/24",
"AvailabilityZone": "us-west-1b",
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack("test_stack", template_body=template_json)
subnet = vpc_conn.get_all_subnets(filters={"cidrBlock": "10.0.0.0/24"})[0]
subnet.tags["foo"].should.equal("bar")
subnet.tags["blah"].should.equal("baz")

View File

@ -7,15 +7,12 @@ from nose.tools import assert_raises
import boto3
from botocore.exceptions import ClientError
import boto
import boto.cloudformation
import boto.ec2
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_cloudformation_deprecated, mock_ec2_deprecated
from moto import mock_ec2, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
from tests.test_cloudformation.fixtures import vpc_eni
import json
@mock_ec2_deprecated
@ -501,27 +498,3 @@ def test_elastic_network_interfaces_describe_network_interfaces_with_filter():
eni1.private_ip_address
)
response["NetworkInterfaces"][0]["Description"].should.equal(eni1.description)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
eni.private_ip_addresses.should.have.length_of(1)
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eni = [
resource
for resource in resources
if resource.resource_type == "AWS::EC2::NetworkInterface"
][0]
cfn_eni.physical_resource_id.should.equal(eni.id)
outputs = {output.key: output.value for output in stack.outputs}
outputs["ENIIpAddress"].should.equal(eni.private_ip_addresses[0].private_ip_address)

View File

@ -7,19 +7,17 @@ import tests.backport_assert_raises
from nose.tools import assert_raises
import base64
import datetime
import ipaddress
import json
import six
import boto
import boto3
from boto.ec2.instance import Reservation, InstanceAttribute
from boto.exception import EC2ResponseError, EC2ResponseError
from boto.exception import EC2ResponseError
from freezegun import freeze_time
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2, mock_cloudformation
from moto import mock_ec2_deprecated, mock_ec2
from tests.helpers import requires_boto_gte
@ -1673,40 +1671,3 @@ def test_describe_instance_attribute():
invalid_instance_attribute=invalid_instance_attribute
)
ex.exception.response["Error"]["Message"].should.equal(message)
@mock_ec2
@mock_cloudformation
def test_volume_size_through_cloudformation():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf = boto3.client("cloudformation", region_name="us-east-1")
volume_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testInstance": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-d3adb33f",
"KeyName": "dummy",
"InstanceType": "t2.micro",
"BlockDeviceMappings": [
{"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": "50"}}
],
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
template_json = json.dumps(volume_template)
cf.create_stack(StackName="test_stack", TemplateBody=template_json)
instances = ec2.describe_instances()
volume = instances["Reservations"][0]["Instances"][0]["BlockDeviceMappings"][0][
"Ebs"
]
volumes = ec2.describe_volumes(VolumeIds=[volume["VolumeId"]])
volumes["Volumes"][0]["Size"].should.equal(50)

View File

@ -9,8 +9,8 @@ from botocore.exceptions import ClientError
import pytz
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from moto.backends import get_model
from moto import mock_ec2, mock_ec2_deprecated, settings
from moto.ec2.models import ec2_backends
from moto.core.utils import iso_8601_datetime_with_milliseconds
@ -184,13 +184,14 @@ def test_request_spot_instances_fulfilled():
request.state.should.equal("open")
get_model("SpotInstanceRequest", "us-east-1")[0].state = "active"
if not settings.TEST_SERVER_MODE:
ec2_backends["us-east-1"].spot_instance_requests[request.id].state = "active"
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
request.state.should.equal("active")
request.state.should.equal("active")
@mock_ec2_deprecated
@ -247,10 +248,11 @@ def test_request_spot_instances_setting_instance_id():
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(price=0.5, image_id="ami-abcd1234")
req = get_model("SpotInstanceRequest", "us-east-1")[0]
req.state = "active"
req.instance_id = "i-12345678"
if not settings.TEST_SERVER_MODE:
req = ec2_backends["us-east-1"].spot_instance_requests[request[0].id]
req.state = "active"
req.instance_id = "i-12345678"
request = conn.get_all_spot_instance_requests()[0]
assert request.state == "active"
assert request.instance_id == "i-12345678"
request = conn.get_all_spot_instance_requests()[0]
assert request.state == "active"
assert request.instance_id == "i-12345678"

View File

@ -9,11 +9,10 @@ import boto
import boto.vpc
from boto.exception import EC2ResponseError
from botocore.exceptions import ParamValidationError, ClientError
import json
import sure # noqa
import random
from moto import mock_cloudformation_deprecated, mock_ec2, mock_ec2_deprecated
from moto import mock_ec2, mock_ec2_deprecated
@mock_ec2_deprecated
@ -311,38 +310,6 @@ def test_get_subnets_filtering():
).should.throw(NotImplementedError)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_subnet_tags_through_cloudformation():
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testSubnet": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"VpcId": vpc.id,
"CidrBlock": "10.0.0.0/24",
"AvailabilityZone": "us-west-1b",
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack("test_stack", template_body=template_json)
subnet = vpc_conn.get_all_subnets(filters={"cidrBlock": "10.0.0.0/24"})[0]
subnet.tags["foo"].should.equal("bar")
subnet.tags["blah"].should.equal("baz")
@mock_ec2
def test_create_subnet_response_fields():
ec2 = boto3.resource("ec2", region_name="us-west-1")

View File

@ -1,8 +1,6 @@
from __future__ import unicode_literals
from datetime import datetime
from copy import deepcopy
from botocore.exceptions import ClientError
import boto3
import sure # noqa
@ -10,7 +8,6 @@ import json
from moto.ec2 import utils as ec2_utils
from uuid import UUID
from moto import mock_cloudformation, mock_elbv2
from moto import mock_ecs
from moto import mock_ec2
from nose.tools import assert_raises
@ -1649,120 +1646,6 @@ def test_resource_reservation_and_release_memory_reservation():
container_instance_description["runningTasksCount"].should.equal(0)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
}
},
}
template_json = json.dumps(template)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(0)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation_no_name():
# cloudformation should create a cluster name for you if you do not provide it
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-cluster.html#cfn-ecs-cluster-clustername
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {"testCluster": {"Type": "AWS::ECS::Cluster"}},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_cluster_name_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster1"},
}
},
}
template2 = deepcopy(template1)
template2["Resources"]["testCluster"]["Properties"]["ClusterName"] = "testcluster2"
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_resp = cfn_conn.create_stack(
StackName="test_stack", TemplateBody=template1_json
)
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName=stack_resp["StackId"], TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
resp["clusterArns"][0].endswith("testcluster2").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_task_definition_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_name = "test_stack"
cfn_conn.create_stack(StackName=stack_name, TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions()
len(resp["taskDefinitionArns"]).should.equal(1)
task_definition_arn = resp["taskDefinitionArns"][0]
task_definition_details = cfn_conn.describe_stack_resource(
StackName=stack_name, LogicalResourceId="testTaskDefinition"
)["StackResourceDetail"]
task_definition_details["PhysicalResourceId"].should.equal(task_definition_arn)
@mock_ec2
@mock_ecs
def test_task_definitions_unable_to_be_placed():
@ -1877,142 +1760,6 @@ def test_task_definitions_with_port_clash():
response["tasks"][0]["stoppedReason"].should.equal("")
@mock_ecs
@mock_cloudformation
def test_update_task_definition_family_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"Family": "testTaskDefinition1",
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template1_json)
template2 = deepcopy(template1)
template2["Resources"]["testTaskDefinition"]["Properties"][
"Family"
] = "testTaskDefinition2"
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions(familyPrefix="testTaskDefinition2")
len(resp["taskDefinitionArns"]).should.equal(1)
resp["taskDefinitionArns"][0].endswith("testTaskDefinition2:1").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_service_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"DesiredCount": 10,
"TaskDefinition": {"Ref": "testTaskDefinition"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_service_through_cloudformation_should_trigger_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"TaskDefinition": {"Ref": "testTaskDefinition"},
"DesiredCount": 10,
},
},
},
}
template_json1 = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json1)
template2 = deepcopy(template1)
template2["Resources"]["testService"]["Properties"]["DesiredCount"] = 5
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ec2
@mock_ecs
def test_attributes():

View File

@ -0,0 +1,254 @@
import boto3
import json
from copy import deepcopy
from moto import mock_cloudformation, mock_ecs
@mock_ecs
@mock_cloudformation
def test_update_task_definition_family_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"Family": "testTaskDefinition1",
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template1_json)
template2 = deepcopy(template1)
template2["Resources"]["testTaskDefinition"]["Properties"][
"Family"
] = "testTaskDefinition2"
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions(familyPrefix="testTaskDefinition2")
len(resp["taskDefinitionArns"]).should.equal(1)
resp["taskDefinitionArns"][0].endswith("testTaskDefinition2:1").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_service_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"DesiredCount": 10,
"TaskDefinition": {"Ref": "testTaskDefinition"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_service_through_cloudformation_should_trigger_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"TaskDefinition": {"Ref": "testTaskDefinition"},
"DesiredCount": 10,
},
},
},
}
template_json1 = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json1)
template2 = deepcopy(template1)
template2["Resources"]["testService"]["Properties"]["DesiredCount"] = 5
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
}
},
}
template_json = json.dumps(template)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(0)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation_no_name():
# cloudformation should create a cluster name for you if you do not provide it
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-cluster.html#cfn-ecs-cluster-clustername
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {"testCluster": {"Type": "AWS::ECS::Cluster"}},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_cluster_name_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster1"},
}
},
}
template2 = deepcopy(template1)
template2["Resources"]["testCluster"]["Properties"]["ClusterName"] = "testcluster2"
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_resp = cfn_conn.create_stack(
StackName="test_stack", TemplateBody=template1_json
)
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName=stack_resp["StackId"], TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
resp["clusterArns"][0].endswith("testcluster2").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_task_definition_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_name = "test_stack"
cfn_conn.create_stack(StackName=stack_name, TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions()
len(resp["taskDefinitionArns"]).should.equal(1)
task_definition_arn = resp["taskDefinitionArns"][0]
task_definition_details = cfn_conn.describe_stack_resource(
StackName=stack_name, LogicalResourceId="testTaskDefinition"
)["StackResourceDetail"]
task_definition_details["PhysicalResourceId"].should.equal(task_definition_arn)

View File

@ -1,6 +1,5 @@
from __future__ import unicode_literals
import json
import os
import boto3
import botocore
@ -8,7 +7,7 @@ from botocore.exceptions import ClientError, ParamValidationError
from nose.tools import assert_raises
import sure # noqa
from moto import mock_elbv2, mock_ec2, mock_acm, mock_cloudformation
from moto import mock_elbv2, mock_ec2, mock_acm
from moto.elbv2 import elbv2_backends
from moto.core import ACCOUNT_ID
@ -1667,82 +1666,6 @@ def test_modify_listener_http_to_https():
)
@mock_ec2
@mock_elbv2
@mock_cloudformation
def test_create_target_groups_through_cloudformation():
cfn_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
# test that setting a name manually as well as letting cloudformation create a name both work
# this is a special case because test groups have a name length limit of 22 characters, and must be unique
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-targetgroup.html#cfn-elasticloadbalancingv2-targetgroup-name
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"testGroup1": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 80,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup2": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 90,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup3": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Name": "MyTargetGroup",
"Port": 70,
"Protocol": "HTTPS",
"VpcId": {"Ref": "testVPC"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_target_groups_response = elbv2_client.describe_target_groups()
target_group_dicts = describe_target_groups_response["TargetGroups"]
assert len(target_group_dicts) == 3
# there should be 2 target groups with the same prefix of 10 characters (since the random suffix is 12)
# and one named MyTargetGroup
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"] == "MyTargetGroup"
]
)
== 1
)
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"].startswith("test-stack")
]
)
== 2
)
@mock_elbv2
@mock_ec2
def test_redirect_action_listener_rule():
@ -1816,95 +1739,6 @@ def test_redirect_action_listener_rule():
modify_listener_actions.should.equal(expected_default_actions)
@mock_elbv2
@mock_cloudformation
def test_redirect_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
describe_load_balancers_response["LoadBalancers"].should.have.length_of(1)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
]
)
@mock_elbv2
@mock_ec2
def test_cognito_action_listener_rule():
@ -1962,97 +1796,6 @@ def test_cognito_action_listener_rule():
describe_listener_actions.should.equal(action)
@mock_elbv2
@mock_cloudformation
def test_cognito_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
]
)
@mock_elbv2
@mock_ec2
def test_fixed_response_action_listener_rule():
@ -2108,93 +1851,6 @@ def test_fixed_response_action_listener_rule():
describe_listener_actions.should.equal(action)
@mock_elbv2
@mock_cloudformation
def test_fixed_response_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
]
)
@mock_elbv2
@mock_ec2
def test_fixed_response_action_listener_rule_validates_status_code():

View File

@ -0,0 +1,348 @@
import boto3
import json
from moto import mock_elbv2, mock_ec2, mock_cloudformation
from moto.core import ACCOUNT_ID
@mock_elbv2
@mock_cloudformation
def test_redirect_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
describe_load_balancers_response["LoadBalancers"].should.have.length_of(1)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
]
)
@mock_elbv2
@mock_cloudformation
def test_cognito_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
]
)
@mock_ec2
@mock_elbv2
@mock_cloudformation
def test_create_target_groups_through_cloudformation():
cfn_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
# test that setting a name manually as well as letting cloudformation create a name both work
# this is a special case because test groups have a name length limit of 22 characters, and must be unique
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-targetgroup.html#cfn-elasticloadbalancingv2-targetgroup-name
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"testGroup1": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 80,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup2": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 90,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup3": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Name": "MyTargetGroup",
"Port": 70,
"Protocol": "HTTPS",
"VpcId": {"Ref": "testVPC"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_target_groups_response = elbv2_client.describe_target_groups()
target_group_dicts = describe_target_groups_response["TargetGroups"]
assert len(target_group_dicts) == 3
# there should be 2 target groups with the same prefix of 10 characters (since the random suffix is 12)
# and one named MyTargetGroup
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"] == "MyTargetGroup"
]
)
== 1
)
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"].startswith("test-stack")
]
)
== 2
)
@mock_elbv2
@mock_cloudformation
def test_fixed_response_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
]
)

View File

@ -0,0 +1,383 @@
import base64
import boto3
import json
import time
import zlib
from botocore.exceptions import ClientError
from io import BytesIO
from moto import mock_logs, mock_lambda, mock_iam
from nose.tools import assert_raises
from zipfile import ZipFile, ZIP_DEFLATED
@mock_lambda
@mock_logs
def test_put_subscription_filter_update():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
creation_time = filter["creationTime"]
creation_time.should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
# to update an existing subscription filter the 'filerName' must be identical
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="[]",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.equal(creation_time)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = "[]"
# when
# only one subscription filter can be associated with a log group
with assert_raises(ClientError) as e:
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test-2",
filterPattern="",
destinationArn=function_arn,
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("LimitExceededException")
ex.response["Error"]["Message"].should.equal("Resource limit exceeded.")
@mock_lambda
@mock_logs
def test_put_subscription_filter_with_lambda():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
client_logs.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{"timestamp": 0, "message": "test"},
{"timestamp": 0, "message": "test 2"},
],
)
# then
msg_showed_up, received_message = _wait_for_log_msg(
client_logs, "/aws/lambda/test", "awslogs"
)
assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format(
received_message
)
data = json.loads(received_message)["awslogs"]["data"]
response = json.loads(
zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8")
)
response["messageType"].should.equal("DATA_MESSAGE")
response["owner"].should.equal("123456789012")
response["logGroup"].should.equal("/test")
response["logStream"].should.equal("stream")
response["subscriptionFilters"].should.equal(["test"])
log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
log_events.should.have.length_of(2)
log_events[0]["id"].should.be.a(int)
log_events[0]["message"].should.equal("test")
log_events[0]["timestamp"].should.equal(0)
log_events[1]["id"].should.be.a(int)
log_events[1]["message"].should.equal("test 2")
log_events[1]["timestamp"].should.equal(0)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="test",
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(0)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="not-existing-log-group", filterName="test",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="wrong-filter-name",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified subscription filter does not exist."
)
@mock_logs
def test_put_subscription_filter_errors():
# given
client = boto3.client("logs", "us-east-1")
log_group_name = "/test"
client.create_log_group(logGroupName=log_group_name)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="not-existing-log-group",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:test",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
def _get_role_name(region_name):
with mock_iam():
iam = boto3.client("iam", region_name=region_name)
try:
return iam.get_role(RoleName="test-role")["Role"]["Arn"]
except ClientError:
return iam.create_role(
RoleName="test-role", AssumeRolePolicyDocument="test policy", Path="/",
)["Role"]["Arn"]
def _get_test_zip_file():
func_str = """
def lambda_handler(event, context):
return event
"""
zip_output = BytesIO()
zip_file = ZipFile(zip_output, "w", ZIP_DEFLATED)
zip_file.writestr("lambda_function.py", func_str)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
def _wait_for_log_msg(client, log_group_name, expected_msg_part):
received_messages = []
start = time.time()
while (time.time() - start) < 10:
result = client.describe_log_streams(logGroupName=log_group_name)
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
for log_stream in log_streams:
result = client.get_log_events(
logGroupName=log_group_name, logStreamName=log_stream["logStreamName"],
)
received_messages.extend(
[event["message"] for event in result.get("events")]
)
for message in received_messages:
if expected_msg_part in message:
return True, message
time.sleep(1)
return False, received_messages

View File

@ -1,17 +1,10 @@
import base64
import json
import time
import zlib
from io import BytesIO
from zipfile import ZipFile, ZIP_DEFLATED
import boto3
import os
import sure # noqa
import six
from botocore.exceptions import ClientError
from moto import mock_logs, settings, mock_lambda, mock_iam
from moto import mock_logs, settings
from nose.tools import assert_raises
from nose import SkipTest
@ -465,375 +458,3 @@ def test_describe_subscription_filters_errors():
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
@mock_lambda
@mock_logs
def test_put_subscription_filter_update():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
creation_time = filter["creationTime"]
creation_time.should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
# to update an existing subscription filter the 'filerName' must be identical
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="[]",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.equal(creation_time)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = "[]"
# when
# only one subscription filter can be associated with a log group
with assert_raises(ClientError) as e:
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test-2",
filterPattern="",
destinationArn=function_arn,
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("LimitExceededException")
ex.response["Error"]["Message"].should.equal("Resource limit exceeded.")
@mock_lambda
@mock_logs
def test_put_subscription_filter_with_lambda():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
client_logs.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{"timestamp": 0, "message": "test"},
{"timestamp": 0, "message": "test 2"},
],
)
# then
msg_showed_up, received_message = _wait_for_log_msg(
client_logs, "/aws/lambda/test", "awslogs"
)
assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format(
received_message
)
data = json.loads(received_message)["awslogs"]["data"]
response = json.loads(
zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8")
)
response["messageType"].should.equal("DATA_MESSAGE")
response["owner"].should.equal("123456789012")
response["logGroup"].should.equal("/test")
response["logStream"].should.equal("stream")
response["subscriptionFilters"].should.equal(["test"])
log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
log_events.should.have.length_of(2)
log_events[0]["id"].should.be.a(int)
log_events[0]["message"].should.equal("test")
log_events[0]["timestamp"].should.equal(0)
log_events[1]["id"].should.be.a(int)
log_events[1]["message"].should.equal("test 2")
log_events[1]["timestamp"].should.equal(0)
@mock_logs
def test_put_subscription_filter_errors():
# given
client = boto3.client("logs", "us-east-1")
log_group_name = "/test"
client.create_log_group(logGroupName=log_group_name)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="not-existing-log-group",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:test",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="test",
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(0)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="not-existing-log-group", filterName="test",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="wrong-filter-name",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified subscription filter does not exist."
)
def _get_role_name(region_name):
with mock_iam():
iam = boto3.client("iam", region_name=region_name)
try:
return iam.get_role(RoleName="test-role")["Role"]["Arn"]
except ClientError:
return iam.create_role(
RoleName="test-role", AssumeRolePolicyDocument="test policy", Path="/",
)["Role"]["Arn"]
def _get_test_zip_file():
func_str = """
def lambda_handler(event, context):
return event
"""
zip_output = BytesIO()
zip_file = ZipFile(zip_output, "w", ZIP_DEFLATED)
zip_file.writestr("lambda_function.py", func_str)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
def _wait_for_log_msg(client, log_group_name, expected_msg_part):
received_messages = []
start = time.time()
while (time.time() - start) < 10:
result = client.describe_log_streams(logGroupName=log_group_name)
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
for log_stream in log_streams:
result = client.get_log_events(
logGroupName=log_group_name, logStreamName=log_stream["logStreamName"],
)
received_messages.extend(
[event["message"] for event in result.get("events")]
)
for message in received_messages:
if expected_msg_part in message:
return True, message
time.sleep(1)
return False, received_messages

View File

@ -3,7 +3,6 @@ from __future__ import unicode_literals
import base64
import json
import os
import time
import uuid
@ -17,34 +16,13 @@ from boto.exception import SQSError
from boto.sqs.message import Message, RawMessage
from botocore.exceptions import ClientError
from freezegun import freeze_time
from moto import mock_sqs, mock_sqs_deprecated, mock_cloudformation, settings
from moto import mock_sqs, mock_sqs_deprecated, mock_lambda, mock_logs, settings
from nose import SkipTest
from nose.tools import assert_raises
from tests.helpers import requires_boto_gte
from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name
from moto.core import ACCOUNT_ID
sqs_template_with_tags = """
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"SQSQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"Tags" : [
{
"Key" : "keyname1",
"Value" : "value1"
},
{
"Key" : "keyname2",
"Value" : "value2"
}
]
}
}
}
}"""
TEST_POLICY = """
{
"Version":"2012-10-17",
@ -2042,15 +2020,59 @@ def test_send_messages_to_fifo_without_message_group_id():
)
@mock_logs
@mock_lambda
@mock_sqs
@mock_cloudformation
def test_create_from_cloudformation_json_with_tags():
cf = boto3.client("cloudformation", region_name="us-east-1")
client = boto3.client("sqs", region_name="us-east-1")
def test_invoke_function_from_sqs_exception():
logs_conn = boto3.client("logs", region_name="us-east-1")
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(QueueName="test-sqs-queue1")
cf.create_stack(StackName="test-sqs", TemplateBody=sqs_template_with_tags)
conn = boto3.client("lambda", region_name="us-east-1")
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file1()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
queue_url = client.list_queues()["QueueUrls"][0]
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
)
queue_tags = client.list_queue_tags(QueueUrl=queue_url)["Tags"]
queue_tags.should.equal({"keyname1": "value1", "keyname2": "value2"})
assert response["EventSourceArn"] == queue.attributes["QueueArn"]
assert response["State"] == "Enabled"
entries = [
{
"Id": "1",
"MessageBody": json.dumps({"uuid": str(uuid.uuid4()), "test": "test"}),
}
]
queue.send_messages(Entries=entries)
start = time.time()
while (time.time() - start) < 30:
result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
assert len(log_streams) >= 1
result = logs_conn.get_log_events(
logGroupName="/aws/lambda/testFunction",
logStreamName=log_streams[0]["logStreamName"],
)
for event in result.get("events"):
if "custom log event" in event["message"]:
return
time.sleep(1)
assert False, "Test Failed"

View File

@ -0,0 +1,38 @@
import boto3
from moto import mock_sqs, mock_cloudformation
sqs_template_with_tags = """
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"SQSQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"Tags" : [
{
"Key" : "keyname1",
"Value" : "value1"
},
{
"Key" : "keyname2",
"Value" : "value2"
}
]
}
}
}
}"""
@mock_sqs
@mock_cloudformation
def test_create_from_cloudformation_json_with_tags():
cf = boto3.client("cloudformation", region_name="us-east-1")
client = boto3.client("sqs", region_name="us-east-1")
cf.create_stack(StackName="test-sqs", TemplateBody=sqs_template_with_tags)
queue_url = client.list_queues()["QueueUrls"][0]
queue_tags = client.list_queue_tags(QueueUrl=queue_url)["Tags"]
queue_tags.should.equal({"keyname1": "value1", "keyname2": "value2"})

View File

@ -7,12 +7,11 @@ import botocore.exceptions
import sure # noqa
import datetime
import uuid
import json
from botocore.exceptions import ClientError, ParamValidationError
from nose.tools import assert_raises
from moto import mock_ssm, mock_cloudformation
from moto import mock_ssm
@mock_ssm
@ -1714,68 +1713,3 @@ def test_get_command_invocation():
invocation_response = client.get_command_invocation(
CommandId=cmd_id, InstanceId=instance_id, PluginName="FAKE"
)
@mock_ssm
@mock_cloudformation
def test_get_command_invocations_from_stack():
stack_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test Stack",
"Resources": {
"EC2Instance1": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-test-image-id",
"KeyName": "test",
"InstanceType": "t2.micro",
"Tags": [
{"Key": "Test Description", "Value": "Test tag"},
{"Key": "Test Name", "Value": "Name tag for tests"},
],
},
}
},
"Outputs": {
"test": {
"Description": "Test Output",
"Value": "Test output value",
"Export": {"Name": "Test value to export"},
},
"PublicIP": {"Value": "Test public ip"},
},
}
cloudformation_client = boto3.client("cloudformation", region_name="us-east-1")
stack_template_str = json.dumps(stack_template)
response = cloudformation_client.create_stack(
StackName="test_stack",
TemplateBody=stack_template_str,
Capabilities=("CAPABILITY_IAM",),
)
client = boto3.client("ssm", region_name="us-east-1")
ssm_document = "AWS-RunShellScript"
params = {"commands": ["#!/bin/bash\necho 'hello world'"]}
response = client.send_command(
Targets=[
{"Key": "tag:aws:cloudformation:stack-name", "Values": ("test_stack",)}
],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region="us-east-2",
OutputS3BucketName="the-bucket",
OutputS3KeyPrefix="pref",
)
cmd = response["Command"]
cmd_id = cmd["CommandId"]
instance_ids = cmd["InstanceIds"]
invocation_response = client.get_command_invocation(
CommandId=cmd_id, InstanceId=instance_ids[0], PluginName="aws:runShellScript"
)

View File

@ -0,0 +1,70 @@
import boto3
import json
from moto import mock_ssm, mock_cloudformation
@mock_ssm
@mock_cloudformation
def test_get_command_invocations_from_stack():
stack_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test Stack",
"Resources": {
"EC2Instance1": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-test-image-id",
"KeyName": "test",
"InstanceType": "t2.micro",
"Tags": [
{"Key": "Test Description", "Value": "Test tag"},
{"Key": "Test Name", "Value": "Name tag for tests"},
],
},
}
},
"Outputs": {
"test": {
"Description": "Test Output",
"Value": "Test output value",
"Export": {"Name": "Test value to export"},
},
"PublicIP": {"Value": "Test public ip"},
},
}
cloudformation_client = boto3.client("cloudformation", region_name="us-east-1")
stack_template_str = json.dumps(stack_template)
response = cloudformation_client.create_stack(
StackName="test_stack",
TemplateBody=stack_template_str,
Capabilities=("CAPABILITY_IAM",),
)
client = boto3.client("ssm", region_name="us-east-1")
ssm_document = "AWS-RunShellScript"
params = {"commands": ["#!/bin/bash\necho 'hello world'"]}
response = client.send_command(
Targets=[
{"Key": "tag:aws:cloudformation:stack-name", "Values": ("test_stack",)}
],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region="us-east-2",
OutputS3BucketName="the-bucket",
OutputS3KeyPrefix="pref",
)
cmd = response["Command"]
cmd_id = cmd["CommandId"]
instance_ids = cmd["InstanceIds"]
invocation_response = client.get_command_invocation(
CommandId=cmd_id, InstanceId=instance_ids[0], PluginName="aws:runShellScript"
)