Merge pull request #50 from spulec/master

Merge upstream
Bert Blommers 2020-07-11 09:03:53 +01:00 committed by GitHub
commit 920d074bb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 2549 additions and 43 deletions


@ -28,7 +28,27 @@ How to teach Moto to support a new AWS endpoint:
* If one doesn't already exist, create a new issue describing what's missing. This is where we'll all talk about the new addition and help you get it done.
* Create a [pull request](https://help.github.com/articles/using-pull-requests/) and mention the issue # in the PR description.
* Try to add a failing test case. For example, if you're trying to implement `boto3.client('acm').import_certificate()` you'll want to add a new method called `def test_import_certificate` to `tests/test_acm/test_acm.py`.
* Implementing the feature itself can be done by creating a method called `import_certificate` in `moto/acm/responses.py`. It's considered good practice to deal with input/output formatting and validation in `responses.py`, and create a method `import_certificate` in `moto/acm/models.py` that handles the actual import logic.
* If you can also implement the code that gets that test passing then great! If not, just ask the community for a hand and somebody will assist you.
## Before pushing changes to GitHub
1. Run `black moto/ tests/` over your code to ensure that it is properly formatted
1. Run `make test` to ensure your tests are passing
## Python versions
moto currently supports both Python 2 and 3, so make sure your tests pass against both major versions of Python.
## Missing services
Implementing a new service from scratch is more work, but still quite straightforward. All the code that intercepts network requests to `*.amazonaws.com` is already handled for you in `moto/core` - all that's necessary for new services to be recognized is to create a new decorator and determine which URLs should be intercepted.
See this PR for an example of what's involved in creating a new service: https://github.com/spulec/moto/pull/2409/files
Note the `urls.py` that redirects all incoming URL requests to a generic `dispatch` method, which in turn will call the appropriate method in `responses.py`.
If you want more control over incoming requests or their bodies, it is possible to redirect specific requests to a custom method. See this PR for an example: https://github.com/spulec/moto/pull/2957/files
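To make the dispatch pattern concrete, here is a minimal standalone sketch. The service name, class, and action are made up for illustration; moto's real `BaseResponse.dispatch` additionally parses the incoming request to work out which action was called.

```python
# Illustrative sketch of the urls.py -> dispatch -> responses.py flow
# described above. "FakeServiceResponse" and "list_widgets" are
# hypothetical names, not part of moto.
class FakeServiceResponse:
    def list_widgets(self):
        # A concrete action handler; real handlers build an AWS-shaped
        # JSON or XML body from the backend's state.
        return '{"Widgets": []}'

    def dispatch(self, action):
        # Generic dispatch: map the action name taken from the request
        # onto a method of the same name on this class.
        handler = getattr(self, action, None)
        if handler is None:
            raise NotImplementedError(action)
        return handler()


# Every request to the service's domain is routed to dispatch.
url_bases = ["https?://fakeservice.(.+).amazonaws.com"]
url_paths = {"{0}/$": FakeServiceResponse().dispatch}

print(FakeServiceResponse().dispatch("list_widgets"))
```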
## Maintainers


@ -459,18 +459,18 @@
## application-autoscaling
<details>
<summary>20% implemented</summary>
- [ ] delete_scaling_policy
- [ ] delete_scheduled_action
- [ ] deregister_scalable_target
- [x] describe_scalable_targets
- [ ] describe_scaling_activities
- [ ] describe_scaling_policies
- [ ] describe_scheduled_actions
- [ ] put_scaling_policy
- [ ] put_scheduled_action
- [x] register_scalable_target
</details>
## application-insights
@ -7093,7 +7093,7 @@
- [X] delete_bucket_tagging
- [ ] delete_bucket_website
- [X] delete_object
- [x] delete_object_tagging
- [ ] delete_objects
- [ ] delete_public_access_block
- [ ] get_bucket_accelerate_configuration


@ -65,6 +65,8 @@ It gets even better! Moto isn't just for Python code and it isn't just for S3.
|-------------------------------------------------------------------------------------|
| API Gateway | @mock_apigateway | core endpoints done |
|-------------------------------------------------------------------------------------|
| Application Autoscaling | @mock_applicationautoscaling | basic endpoints done |
|-------------------------------------------------------------------------------------|
| Autoscaling | @mock_autoscaling | core endpoints done |
|-------------------------------------------------------------------------------------|
| Cloudformation | @mock_cloudformation | core endpoints done |


@ -15,6 +15,9 @@ mock_acm = lazy_load(".acm", "mock_acm")
mock_apigateway = lazy_load(".apigateway", "mock_apigateway")
mock_apigateway_deprecated = lazy_load(".apigateway", "mock_apigateway_deprecated")
mock_athena = lazy_load(".athena", "mock_athena")
mock_applicationautoscaling = lazy_load(
    ".applicationautoscaling", "mock_applicationautoscaling"
)
mock_autoscaling = lazy_load(".autoscaling", "mock_autoscaling")
mock_autoscaling_deprecated = lazy_load(".autoscaling", "mock_autoscaling_deprecated")
mock_lambda = lazy_load(".awslambda", "mock_lambda")


@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import applicationautoscaling_backends
from ..core.models import base_decorator
applicationautoscaling_backend = applicationautoscaling_backends["us-east-1"]
mock_applicationautoscaling = base_decorator(applicationautoscaling_backends)


@ -0,0 +1,22 @@
from __future__ import unicode_literals
import json


class AWSError(Exception):
    """ Copied from acm/models.py; this class now exists in >5 locations,
    maybe this should be centralised for use by any module?
    """

    TYPE = None
    STATUS = 400

    def __init__(self, message):
        self.message = message

    def response(self):
        resp = {"__type": self.TYPE, "message": self.message}
        return json.dumps(resp), dict(status=self.STATUS)


class AWSValidationException(AWSError):
    TYPE = "ValidationException"
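As a quick illustration of the pattern, the snippet below re-creates the class standalone and shows the `(body, status)` pair that a handler in `responses.py` can return directly:

```python
import json


# Standalone copy of the AWSError pattern above, so the example is
# self-contained; in moto this lives in the service's exceptions.py.
class AWSError(Exception):
    TYPE = None
    STATUS = 400

    def __init__(self, message):
        self.message = message

    def response(self):
        resp = {"__type": self.TYPE, "message": self.message}
        return json.dumps(resp), dict(status=self.STATUS)


class AWSValidationException(AWSError):
    TYPE = "ValidationException"


body, headers = AWSValidationException("bad input").response()
print(body)     # JSON body with __type and message
print(headers)  # {'status': 400}
```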


@ -0,0 +1,179 @@
from __future__ import unicode_literals
from moto.core import BaseBackend, BaseModel
from moto.ecs import ecs_backends
from .exceptions import AWSValidationException
from collections import OrderedDict
from enum import Enum, unique
import time


@unique
class ServiceNamespaceValueSet(Enum):
    APPSTREAM = "appstream"
    RDS = "rds"
    LAMBDA = "lambda"
    CASSANDRA = "cassandra"
    DYNAMODB = "dynamodb"
    CUSTOM_RESOURCE = "custom-resource"
    ELASTICMAPREDUCE = "elasticmapreduce"
    EC2 = "ec2"
    COMPREHEND = "comprehend"
    ECS = "ecs"
    SAGEMAKER = "sagemaker"


@unique
class ScalableDimensionValueSet(Enum):
    CASSANDRA_TABLE_READ_CAPACITY_UNITS = "cassandra:table:ReadCapacityUnits"
    CASSANDRA_TABLE_WRITE_CAPACITY_UNITS = "cassandra:table:WriteCapacityUnits"
    DYNAMODB_INDEX_READ_CAPACITY_UNITS = "dynamodb:index:ReadCapacityUnits"
    DYNAMODB_INDEX_WRITE_CAPACITY_UNITS = "dynamodb:index:WriteCapacityUnits"
    DYNAMODB_TABLE_READ_CAPACITY_UNITS = "dynamodb:table:ReadCapacityUnits"
    DYNAMODB_TABLE_WRITE_CAPACITY_UNITS = "dynamodb:table:WriteCapacityUnits"
    RDS_CLUSTER_READ_REPLICA_COUNT = "rds:cluster:ReadReplicaCount"
    RDS_CLUSTER_CAPACITY = "rds:cluster:Capacity"
    COMPREHEND_DOCUMENT_CLASSIFIER_ENDPOINT_DESIRED_INFERENCE_UNITS = (
        "comprehend:document-classifier-endpoint:DesiredInferenceUnits"
    )
    ELASTICMAPREDUCE_INSTANCE_FLEET_ON_DEMAND_CAPACITY = (
        "elasticmapreduce:instancefleet:OnDemandCapacity"
    )
    ELASTICMAPREDUCE_INSTANCE_FLEET_SPOT_CAPACITY = (
        "elasticmapreduce:instancefleet:SpotCapacity"
    )
    ELASTICMAPREDUCE_INSTANCE_GROUP_INSTANCE_COUNT = (
        "elasticmapreduce:instancegroup:InstanceCount"
    )
    LAMBDA_FUNCTION_PROVISIONED_CONCURRENCY = "lambda:function:ProvisionedConcurrency"
    APPSTREAM_FLEET_DESIRED_CAPACITY = "appstream:fleet:DesiredCapacity"
    CUSTOM_RESOURCE_RESOURCE_TYPE_PROPERTY = "custom-resource:ResourceType:Property"
    SAGEMAKER_VARIANT_DESIRED_INSTANCE_COUNT = "sagemaker:variant:DesiredInstanceCount"
    EC2_SPOT_FLEET_REQUEST_TARGET_CAPACITY = "ec2:spot-fleet-request:TargetCapacity"
    ECS_SERVICE_DESIRED_COUNT = "ecs:service:DesiredCount"


class ApplicationAutoscalingBackend(BaseBackend):
    def __init__(self, region, ecs):
        super(ApplicationAutoscalingBackend, self).__init__()
        self.region = region
        self.ecs_backend = ecs
        self.targets = OrderedDict()

    def reset(self):
        region = self.region
        ecs = self.ecs_backend
        self.__dict__ = {}
        self.__init__(region, ecs)

    @property
    def applicationautoscaling_backend(self):
        return applicationautoscaling_backends[self.region]

    def describe_scalable_targets(self, namespace, r_ids=None, dimension=None):
        """ Describe scalable targets. """
        if r_ids is None:
            r_ids = []
        targets = self._flatten_scalable_targets(namespace)
        if dimension is not None:
            targets = [t for t in targets if t.scalable_dimension == dimension]
        if len(r_ids) > 0:
            targets = [t for t in targets if t.resource_id in r_ids]
        return targets

    def _flatten_scalable_targets(self, namespace):
        """ Flatten scalable targets for a given service namespace down to a list. """
        targets = []
        for dimension in self.targets.keys():
            for resource_id in self.targets[dimension].keys():
                targets.append(self.targets[dimension][resource_id])
        targets = [t for t in targets if t.service_namespace == namespace]
        return targets

    def register_scalable_target(self, namespace, r_id, dimension, **kwargs):
        """ Registers or updates a scalable target. """
        _ = _target_params_are_valid(namespace, r_id, dimension)
        if namespace == ServiceNamespaceValueSet.ECS.value:
            _ = self._ecs_service_exists_for_target(r_id)
        if self._scalable_target_exists(r_id, dimension):
            target = self.targets[dimension][r_id]
            target.update(kwargs)
        else:
            target = FakeScalableTarget(self, namespace, r_id, dimension, **kwargs)
            self._add_scalable_target(target)
        return target

    def _scalable_target_exists(self, r_id, dimension):
        return r_id in self.targets.get(dimension, [])

    def _ecs_service_exists_for_target(self, r_id):
        """ Raises a ValidationException if an ECS service does not exist
        for the specified resource ID.
        """
        resource_type, cluster, service = r_id.split("/")
        result = self.ecs_backend.describe_services(cluster, [service])
        if len(result) != 1:
            raise AWSValidationException("ECS service doesn't exist: {}".format(r_id))
        return True

    def _add_scalable_target(self, target):
        if target.scalable_dimension not in self.targets:
            self.targets[target.scalable_dimension] = OrderedDict()
        if target.resource_id not in self.targets[target.scalable_dimension]:
            self.targets[target.scalable_dimension][target.resource_id] = target
        return target


def _target_params_are_valid(namespace, r_id, dimension):
    """ Check whether namespace, resource_id and dimension are valid and consistent with each other. """
    is_valid = True
    valid_namespaces = [n.value for n in ServiceNamespaceValueSet]
    if namespace not in valid_namespaces:
        is_valid = False
    if dimension is not None:
        try:
            valid_dimensions = [d.value for d in ScalableDimensionValueSet]
            d_namespace, d_resource_type, scaling_property = dimension.split(":")
            resource_type, cluster, service = r_id.split("/")
            if (
                dimension not in valid_dimensions
                or d_namespace != namespace
                or resource_type != d_resource_type
            ):
                is_valid = False
        except ValueError:
            is_valid = False
    if not is_valid:
        raise AWSValidationException(
            "Unsupported service namespace, resource type or scalable dimension"
        )
    return is_valid
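The consistency rule enforced above (the ScalableDimension and ResourceId must agree with each other and with the ServiceNamespace) can be illustrated with a standalone helper. `params_are_consistent` is a simplified stand-in for `_target_params_are_valid`, without the enum membership checks:

```python
# Simplified, standalone version of the consistency check: the dimension
# "ecs:service:DesiredCount" pairs with a resource id of the form
# "service/<cluster>/<service>", and both must name the same namespace
# and resource type.
def params_are_consistent(namespace, resource_id, dimension):
    try:
        d_namespace, d_resource_type, _scaling_property = dimension.split(":")
        resource_type, _cluster, _service = resource_id.split("/")
    except ValueError:
        # Wrong number of components in either identifier.
        return False
    return d_namespace == namespace and resource_type == d_resource_type


print(params_are_consistent("ecs", "service/default/web", "ecs:service:DesiredCount"))
print(params_are_consistent("ecs", "service/default/web", "dynamodb:table:ReadCapacityUnits"))
```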
class FakeScalableTarget(BaseModel):
    def __init__(
        self, backend, service_namespace, resource_id, scalable_dimension, **kwargs
    ):
        self.applicationautoscaling_backend = backend
        self.service_namespace = service_namespace
        self.resource_id = resource_id
        self.scalable_dimension = scalable_dimension
        self.min_capacity = kwargs["min_capacity"]
        self.max_capacity = kwargs["max_capacity"]
        self.role_arn = kwargs["role_arn"]
        self.suspended_state = kwargs["suspended_state"]
        self.creation_time = time.time()

    def update(self, **kwargs):
        if kwargs["min_capacity"] is not None:
            self.min_capacity = kwargs["min_capacity"]
        if kwargs["max_capacity"] is not None:
            self.max_capacity = kwargs["max_capacity"]


applicationautoscaling_backends = {}
for region_name, ecs_backend in ecs_backends.items():
    applicationautoscaling_backends[region_name] = ApplicationAutoscalingBackend(
        region_name, ecs_backend
    )


@ -0,0 +1,97 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
import json
from .models import (
    applicationautoscaling_backends,
    ScalableDimensionValueSet,
    ServiceNamespaceValueSet,
)
from .exceptions import AWSValidationException


class ApplicationAutoScalingResponse(BaseResponse):
    @property
    def applicationautoscaling_backend(self):
        return applicationautoscaling_backends[self.region]

    def describe_scalable_targets(self):
        try:
            self._validate_params()
        except AWSValidationException as e:
            return e.response()
        service_namespace = self._get_param("ServiceNamespace")
        resource_ids = self._get_param("ResourceIds")
        scalable_dimension = self._get_param("ScalableDimension")
        max_results = self._get_int_param("MaxResults", 50)
        marker = self._get_param("NextToken")
        all_scalable_targets = self.applicationautoscaling_backend.describe_scalable_targets(
            service_namespace, resource_ids, scalable_dimension
        )
        start = int(marker) + 1 if marker else 0
        next_token = None
        scalable_targets_resp = all_scalable_targets[start : start + max_results]
        if len(all_scalable_targets) > start + max_results:
            next_token = str(len(scalable_targets_resp) - 1)
        targets = [_build_target(t) for t in scalable_targets_resp]
        return json.dumps({"ScalableTargets": targets, "NextToken": next_token})
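The MaxResults/NextToken paging in `describe_scalable_targets` can be isolated into a standalone sketch; `paginate` is an illustrative name that simply mirrors the slicing shown above:

```python
# Standalone sketch of the paging above: the marker is an index into the
# full result list, and a new token is only returned when more targets
# remain beyond the current page.
def paginate(all_targets, max_results=50, marker=None):
    start = int(marker) + 1 if marker else 0
    page = all_targets[start : start + max_results]
    next_token = None
    if len(all_targets) > start + max_results:
        next_token = str(len(page) - 1)
    return page, next_token


print(paginate(list(range(5)), max_results=2))  # first page plus a token
print(paginate(list(range(2)), max_results=5))  # everything fits, no token
```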
    def register_scalable_target(self):
        """ Registers or updates a scalable target. """
        try:
            self._validate_params()
            self.applicationautoscaling_backend.register_scalable_target(
                self._get_param("ServiceNamespace"),
                self._get_param("ResourceId"),
                self._get_param("ScalableDimension"),
                min_capacity=self._get_int_param("MinCapacity"),
                max_capacity=self._get_int_param("MaxCapacity"),
                role_arn=self._get_param("RoleARN"),
                suspended_state=self._get_param("SuspendedState"),
            )
        except AWSValidationException as e:
            return e.response()
        return json.dumps({})

    def _validate_params(self):
        """ Validate parameters.
        TODO Integrate this validation with the validation in models.py
        """
        namespace = self._get_param("ServiceNamespace")
        dimension = self._get_param("ScalableDimension")
        messages = []
        dimensions = [d.value for d in ScalableDimensionValueSet]
        message = None
        if dimension is not None and dimension not in dimensions:
            messages.append(
                "Value '{}' at 'scalableDimension' "
                "failed to satisfy constraint: Member must satisfy enum value set: "
                "{}".format(dimension, dimensions)
            )
        namespaces = [n.value for n in ServiceNamespaceValueSet]
        if namespace is not None and namespace not in namespaces:
            messages.append(
                "Value '{}' at 'serviceNamespace' "
                "failed to satisfy constraint: Member must satisfy enum value set: "
                "{}".format(namespace, namespaces)
            )
        if len(messages) == 1:
            message = "1 validation error detected: {}".format(messages[0])
        elif len(messages) > 1:
            message = "{} validation errors detected: {}".format(
                len(messages), "; ".join(messages)
            )
        if message:
            raise AWSValidationException(message)


def _build_target(t):
    return {
        "CreationTime": t.creation_time,
        "ServiceNamespace": t.service_namespace,
        "ResourceId": t.resource_id,
        "RoleARN": t.role_arn,
        "ScalableDimension": t.scalable_dimension,
        "MaxCapacity": t.max_capacity,
        "MinCapacity": t.min_capacity,
        "SuspendedState": t.suspended_state,
    }


@ -0,0 +1,8 @@
from __future__ import unicode_literals
from .responses import ApplicationAutoScalingResponse

url_bases = ["https?://application-autoscaling.(.+).amazonaws.com"]

url_paths = {
    "{0}/$": ApplicationAutoScalingResponse.dispatch,
}


@ -0,0 +1,10 @@
from six.moves.urllib.parse import urlparse


def region_from_applicationautoscaling_url(url):
    domain = urlparse(url).netloc
    if "." in domain:
        return domain.split(".")[1]
    else:
        return "us-east-1"
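For clarity, here is an equivalent Python 3 version using the stdlib `urllib.parse` (the `six` shim is only needed for Python 2 support): the region is the second dot-separated label of the hostname, defaulting to `us-east-1` when the URL has no region component.

```python
from urllib.parse import urlparse


# Python 3 re-statement of region_from_applicationautoscaling_url above.
def region_from_url(url):
    domain = urlparse(url).netloc
    if "." in domain:
        return domain.split(".")[1]
    return "us-east-1"


print(region_from_url("https://application-autoscaling.eu-west-1.amazonaws.com/"))
print(region_from_url("http://localhost"))
```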


@ -6,6 +6,10 @@ BACKENDS = {
"acm": ("acm", "acm_backends"),
"apigateway": ("apigateway", "apigateway_backends"),
"athena": ("athena", "athena_backends"),
"applicationautoscaling": (
    "applicationautoscaling",
    "applicationautoscaling_backends",
),
"autoscaling": ("autoscaling", "autoscaling_backends"),
"batch": ("batch", "batch_backends"),
"cloudformation": ("cloudformation", "cloudformation_backends"),


@ -560,6 +560,23 @@ class ResourceMap(collections_abc.Mapping):
if value_type == "CommaDelimitedList" or value_type.startswith("List"):
    value = value.split(",")

def _parse_number_parameter(num_string):
    """CloudFormation NUMBER types can be an int or float.
    Try int first and then fall back to float if that fails
    """
    try:
        return int(num_string)
    except ValueError:
        return float(num_string)

if value_type == "List<Number>":
    # The if statement directly above already converted
    # to a list. Now we convert each element to a number
    value = [_parse_number_parameter(v) for v in value]

if value_type == "Number":
    value = _parse_number_parameter(value)

if parameter_slot.get("NoEcho"):
    self.no_echo_parameter_keys.append(key)


@ -365,8 +365,8 @@ class CloudFormationResponse(BaseResponse):
except (ValueError, KeyError):
    pass
try:
    description = yaml.load(template_body, Loader=yaml.Loader)["Description"]
except (yaml.parser.ParserError, yaml.scanner.ScannerError, KeyError):
    pass
template = self.response_template(VALIDATE_STACK_RESPONSE_TEMPLATE)
return template.render(description=description)


@ -128,8 +128,12 @@ class CognitoIdpUserPool(BaseModel):
    "exp": now + expires_in,
}
payload.update(extra_data)
headers = {"kid": "dummy"}  # KID as present in jwks-public.json
return (
    jws.sign(payload, self.json_web_key, headers, algorithm="RS256"),
    expires_in,
)

def create_id_token(self, client_id, username):
    extra_data = self.get_user_extra_data_by_client_id(client_id, username)


@ -5,5 +5,5 @@ url_bases = ["https?://cognito-idp.(.+).amazonaws.com"]
url_paths = {
    "{0}/$": CognitoIdpResponse.dispatch,
    "{0}/(?P<user_pool_id>[^/]+)/.well-known/jwks.json$": CognitoIdpJsonWebKeyResponse().serve_json_web_key,
}


@ -272,7 +272,24 @@ class StreamShard(BaseModel):
return [i.to_json() for i in self.items[start:end]]


class SecondaryIndex(BaseModel):
    def project(self, item):
        """
        Enforces the ProjectionType of this Index (LSI/GSI)
        Removes any non-wanted attributes from the item
        :param item:
        :return:
        """
        if self.projection:
            if self.projection.get("ProjectionType", None) == "KEYS_ONLY":
                allowed_attributes = ",".join(
                    [key["AttributeName"] for key in self.schema]
                )
                item.filter(allowed_attributes)
        return item
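A standalone sketch of the KEYS_ONLY behaviour, using a plain dict instead of moto's Item class (whose `filter` method takes the comma-joined attribute list built above):

```python
# Simplified stand-in for SecondaryIndex.project: with ProjectionType
# KEYS_ONLY, only the index's key attributes survive; everything else
# is stripped from the returned item.
def project_keys_only(item, schema, projection):
    if projection.get("ProjectionType") == "KEYS_ONLY":
        allowed = {key["AttributeName"] for key in schema}
        return {k: v for k, v in item.items() if k in allowed}
    return item


item = {"pk": "user#1", "sk": "2020-07-11", "body": "not projected"}
schema = [{"AttributeName": "pk"}, {"AttributeName": "sk"}]
print(project_keys_only(item, schema, {"ProjectionType": "KEYS_ONLY"}))
print(project_keys_only(item, schema, {}))  # no projection type: unchanged
```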
class LocalSecondaryIndex(SecondaryIndex):
    def __init__(self, index_name, schema, projection):
        self.name = index_name
        self.schema = schema
@ -294,7 +311,7 @@ class LocalSecondaryIndex(BaseModel):
)


class GlobalSecondaryIndex(SecondaryIndex):
    def __init__(
        self, index_name, schema, projection, status="ACTIVE", throughput=None
    ):
@ -719,6 +736,10 @@ class Table(BaseModel):
results = [item for item in results if filter_expression.expr(item)]
results = copy.deepcopy(results)

if index_name:
    index = self.get_index(index_name)
    for result in results:
        index.project(result)

if projection_expression:
    for result in results:
        result.filter(projection_expression)
@ -739,11 +760,16 @@ class Table(BaseModel):
def all_indexes(self):
    return (self.global_indexes or []) + (self.indexes or [])

def get_index(self, index_name, err=None):
    all_indexes = self.all_indexes()
    indexes_by_name = dict((i.name, i) for i in all_indexes)
    if err and index_name not in indexes_by_name:
        raise err
    return indexes_by_name[index_name]

def has_idx_items(self, index_name):
    idx = self.get_index(index_name)
    idx_col_set = set([i["AttributeName"] for i in idx.schema])
    for hash_set in self.items.values():
@ -766,14 +792,12 @@ class Table(BaseModel):
):
    results = []
    scanned_count = 0
    if index_name:
        err = InvalidIndexNameError(
            "The table does not have the specified index: %s" % index_name
        )
        self.get_index(index_name, err)
        items = self.has_idx_items(index_name)
    else:
        items = self.all_items()
@ -847,9 +871,7 @@ class Table(BaseModel):
last_evaluated_key[self.range_key_attr] = results[-1].range_key

if scanned_index:
    idx = self.get_index(scanned_index)
    idx_col_list = [i["AttributeName"] for i in idx.schema]
    for col in idx_col_list:
        last_evaluated_key[col] = results[-1].attrs[col]


@ -3547,6 +3547,7 @@ class Route(object):
self,
route_table,
destination_cidr_block,
destination_ipv6_cidr_block,
local=False,
gateway=None,
instance=None,
@ -3554,9 +3555,12 @@ class Route(object):
interface=None,
vpc_pcx=None,
):
    self.id = generate_route_id(
        route_table.id, destination_cidr_block, destination_ipv6_cidr_block
    )
    self.route_table = route_table
    self.destination_cidr_block = destination_cidr_block
    self.destination_ipv6_cidr_block = destination_ipv6_cidr_block
    self.local = local
    self.gateway = gateway
    self.instance = instance
@ -3632,6 +3636,7 @@ class RouteBackend(object):
self,
route_table_id,
destination_cidr_block,
destination_ipv6_cidr_block=None,
local=False,
gateway_id=None,
instance_id=None,
@ -3656,9 +3661,10 @@ class RouteBackend(object):
gateway = self.get_internet_gateway(gateway_id)

try:
    if destination_cidr_block:
        ipaddress.IPv4Network(
            six.text_type(destination_cidr_block), strict=False
        )
except ValueError:
    raise InvalidDestinationCIDRBlockParameterError(destination_cidr_block)
@ -3668,6 +3674,7 @@ class RouteBackend(object):
route = Route(
    route_table,
    destination_cidr_block,
    destination_ipv6_cidr_block,
    local=local,
    gateway=gateway,
    instance=self.get_instance(instance_id) if instance_id else None,


@ -16,6 +16,7 @@ class RouteTables(BaseResponse):
def create_route(self):
    route_table_id = self._get_param("RouteTableId")
    destination_cidr_block = self._get_param("DestinationCidrBlock")
    destination_ipv6_cidr_block = self._get_param("DestinationIpv6CidrBlock")
    gateway_id = self._get_param("GatewayId")
    instance_id = self._get_param("InstanceId")
    nat_gateway_id = self._get_param("NatGatewayId")
@ -25,6 +26,7 @@ class RouteTables(BaseResponse):
self.ec2_backend.create_route(
    route_table_id,
    destination_cidr_block,
    destination_ipv6_cidr_block,
    gateway_id=gateway_id,
    instance_id=instance_id,
    nat_gateway_id=nat_gateway_id,


@ -86,6 +86,7 @@ DESCRIBE_VPC_PEERING_CONNECTIONS_RESPONSE = (
<ownerId>777788889999</ownerId>
<vpcId>{{ vpc_pcx.vpc.id }}</vpcId>
<cidrBlock>{{ vpc_pcx.vpc.cidr_block }}</cidrBlock>
<region>{{ vpc_pcx.vpc.ec2_backend.region_name }}</region>
</requesterVpcInfo>
<accepterVpcInfo>
<ownerId>"""
@ -98,6 +99,7 @@ DESCRIBE_VPC_PEERING_CONNECTIONS_RESPONSE = (
<allowEgressFromLocalVpcToRemoteClassicLink>true</allowEgressFromLocalVpcToRemoteClassicLink>
<allowDnsResolutionFromRemoteVpc>false</allowDnsResolutionFromRemoteVpc>
</peeringOptions>
<region>{{ vpc_pcx.peer_vpc.ec2_backend.region_name }}</region>
</accepterVpcInfo>
<status>
<code>{{ vpc_pcx._status.code }}</code>
@ -128,6 +130,7 @@ ACCEPT_VPC_PEERING_CONNECTION_RESPONSE = (
<ownerId>777788889999</ownerId>
<vpcId>{{ vpc_pcx.vpc.id }}</vpcId>
<cidrBlock>{{ vpc_pcx.vpc.cidr_block }}</cidrBlock>
<region>{{ vpc_pcx.vpc.ec2_backend.region_name }}</region>
</requesterVpcInfo>
<accepterVpcInfo>
<ownerId>"""
@ -140,6 +143,7 @@ ACCEPT_VPC_PEERING_CONNECTION_RESPONSE = (
<allowEgressFromLocalVpcToRemoteClassicLink>false</allowEgressFromLocalVpcToRemoteClassicLink>
<allowDnsResolutionFromRemoteVpc>false</allowDnsResolutionFromRemoteVpc>
</peeringOptions>
<region>{{ vpc_pcx.peer_vpc.ec2_backend.region_name }}</region>
</accepterVpcInfo>
<status>
<code>{{ vpc_pcx._status.code }}</code>


@ -189,7 +189,9 @@ def random_ipv6_cidr():
return "2400:6500:{}:{}::/56".format(random_resource_id(4), random_resource_id(4))


def generate_route_id(route_table_id, cidr_block, ipv6_cidr_block=None):
    if ipv6_cidr_block and not cidr_block:
        cidr_block = ipv6_cidr_block
    return "%s~%s" % (route_table_id, cidr_block)
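The fallback behaviour is easy to see with a standalone copy of the helper: an IPv6-only route uses the IPv6 CIDR when building the route id.

```python
# Standalone copy of generate_route_id as shown above.
def generate_route_id(route_table_id, cidr_block, ipv6_cidr_block=None):
    if ipv6_cidr_block and not cidr_block:
        cidr_block = ipv6_cidr_block
    return "%s~%s" % (route_table_id, cidr_block)


print(generate_route_id("rtb-123", "10.0.0.0/16"))           # IPv4 route
print(generate_route_id("rtb-123", None, "2400:6500::/56"))  # IPv6-only route
```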


@ -36,3 +36,13 @@ class DBSubnetGroupNotFoundError(RDSClientError):
    "DBSubnetGroupNotFound",
    "Subnet Group {0} not found.".format(subnet_group_name),
)
class UnformattedGetAttTemplateException(Exception):
    """Duplicated from CloudFormation to prevent circular deps."""

    description = (
        "Template error: resource {0} does not support attribute type {1} in Fn::GetAtt"
    )
    status_code = 400


@ -3,10 +3,10 @@ from __future__ import unicode_literals
import boto.rds
from jinja2 import Template

from moto.core import BaseBackend, BaseModel
from moto.core.utils import get_random_hex
from moto.ec2.models import ec2_backends
from moto.rds.exceptions import UnformattedGetAttTemplateException
from moto.rds2.models import rds2_backends


@ -1566,6 +1566,10 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
bucket.keys[key_name] = FakeDeleteMarker(key=bucket.keys[key_name])

def delete_object_tagging(self, bucket_name, key_name, version_id=None):
    key = self.get_object(bucket_name, key_name, version_id=version_id)
    self.tagger.delete_all_tags_for_resource(key.arn)

def delete_object(self, bucket_name, key_name, version_id=None):
    key_name = clean_key_name(key_name)
    bucket = self.get_bucket(bucket_name)

View File

@@ -1618,6 +1618,12 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
             self.backend.cancel_multipart(bucket_name, upload_id)
             return 204, {}, ""
         version_id = query.get("versionId", [None])[0]
+        if "tagging" in query:
+            self.backend.delete_object_tagging(
+                bucket_name, key_name, version_id=version_id
+            )
+            template = self.response_template(S3_DELETE_KEY_TAGGING_RESPONSE)
+            return 204, {}, template.render(version_id=version_id)
         self.backend.delete_object(bucket_name, key_name, version_id=version_id)
         return 204, {}, ""
@@ -1935,6 +1941,12 @@ S3_DELETE_KEYS_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
 {% endfor %}
 </DeleteResult>"""

+S3_DELETE_KEY_TAGGING_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
+<DeleteObjectTaggingResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">
+<VersionId>{{version_id}}</VersionId>
+</DeleteObjectTaggingResult>
+"""
+
 S3_OBJECT_ACL_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
 <AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
   <Owner>
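The routing the hunk above adds can be sketched without moto: a DELETE with a `?tagging` query parameter clears the key's tags and renders the new XML template, while a plain DELETE falls through to object deletion. `handle_key_delete` is a hypothetical stand-in for the response handler, and plain `str.format` stands in for the Jinja template:

```python
S3_DELETE_KEY_TAGGING_RESPONSE = (
    '<?xml version="1.0" encoding="UTF-8"?>\n'
    '<DeleteObjectTaggingResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">\n'
    "<VersionId>{version_id}</VersionId>\n"
    "</DeleteObjectTaggingResult>"
)


def handle_key_delete(query, tags):
    # "tagging" in the query string routes to tag deletion, not object deletion.
    version_id = query.get("versionId", [None])[0]
    if "tagging" in query:
        tags.clear()  # stand-in for tagger.delete_all_tags_for_resource(key.arn)
        return 204, S3_DELETE_KEY_TAGGING_RESPONSE.format(version_id=version_id)
    return 204, ""  # plain object delete path


tags = {"env": "prod"}
status, body = handle_key_delete({"tagging": [""], "versionId": ["abc"]}, tags)
```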

View File

@@ -53,3 +53,58 @@ class ValidationException(JsonRESTError):
     def __init__(self, message):
         super(ValidationException, self).__init__("ValidationException", message)
class DocumentAlreadyExists(JsonRESTError):
code = 400
def __init__(self, message):
super(DocumentAlreadyExists, self).__init__("DocumentAlreadyExists", message)
class InvalidDocument(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocument, self).__init__("InvalidDocument", message)
class InvalidDocumentOperation(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocumentOperation, self).__init__(
"InvalidDocumentOperation", message
)
class InvalidDocumentContent(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocumentContent, self).__init__("InvalidDocumentContent", message)
class InvalidDocumentVersion(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocumentVersion, self).__init__("InvalidDocumentVersion", message)
class DuplicateDocumentVersionName(JsonRESTError):
code = 400
def __init__(self, message):
super(DuplicateDocumentVersionName, self).__init__(
"DuplicateDocumentVersionName", message
)
class DuplicateDocumentContent(JsonRESTError):
code = 400
def __init__(self, message):
super(DuplicateDocumentContent, self).__init__(
"DuplicateDocumentContent", message
)

View File

@@ -1,17 +1,20 @@
 from __future__ import unicode_literals

 import re
+from boto3 import Session
 from collections import defaultdict

-from moto.core import BaseBackend, BaseModel
+from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
 from moto.core.exceptions import RESTError
-from moto.ec2 import ec2_backends
 from moto.cloudformation import cloudformation_backends

 import datetime
 import time
 import uuid
 import itertools
+import json
+import yaml
+import hashlib

 from .utils import parameter_arn
 from .exceptions import (
@@ -22,6 +25,13 @@ from .exceptions import (
     ParameterVersionLabelLimitExceeded,
     ParameterVersionNotFound,
     ParameterNotFound,
+    DocumentAlreadyExists,
+    InvalidDocumentOperation,
+    InvalidDocument,
+    InvalidDocumentContent,
+    InvalidDocumentVersion,
+    DuplicateDocumentVersionName,
+    DuplicateDocumentContent,
 )
@@ -102,6 +112,108 @@ class Parameter(BaseModel):

 MAX_TIMEOUT_SECONDS = 3600
def generate_ssm_doc_param_list(parameters):
if not parameters:
return None
param_list = []
for param_name, param_info in parameters.items():
final_dict = {}
final_dict["Name"] = param_name
final_dict["Type"] = param_info["type"]
final_dict["Description"] = param_info["description"]
if (
param_info["type"] == "StringList"
or param_info["type"] == "StringMap"
or param_info["type"] == "MapList"
):
final_dict["DefaultValue"] = json.dumps(param_info["default"])
else:
final_dict["DefaultValue"] = str(param_info["default"])
param_list.append(final_dict)
return param_list
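The helper above can be replicated standalone for illustration; list- and map-typed defaults are JSON-encoded, while scalar defaults are stringified:

```python
import json


def generate_ssm_doc_param_list(parameters):
    # Mirrors the helper in the diff: returns None for empty input,
    # JSON-encodes compound defaults, stringifies scalar defaults.
    if not parameters:
        return None
    param_list = []
    for param_name, param_info in parameters.items():
        entry = {
            "Name": param_name,
            "Type": param_info["type"],
            "Description": param_info["description"],
        }
        if param_info["type"] in ("StringList", "StringMap", "MapList"):
            entry["DefaultValue"] = json.dumps(param_info["default"])
        else:
            entry["DefaultValue"] = str(param_info["default"])
        param_list.append(entry)
    return param_list
```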
class Document(BaseModel):
def __init__(
self,
name,
version_name,
content,
document_type,
document_format,
requires,
attachments,
target_type,
tags,
document_version="1",
):
self.name = name
self.version_name = version_name
self.content = content
self.document_type = document_type
self.document_format = document_format
self.requires = requires
self.attachments = attachments
self.target_type = target_type
self.tags = tags
self.status = "Active"
self.document_version = document_version
self.owner = ACCOUNT_ID
self.created_date = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
if document_format == "JSON":
try:
content_json = json.loads(content)
except ValueError:
# Python2
raise InvalidDocumentContent(
"The content for the document is not valid."
)
except json.decoder.JSONDecodeError:
raise InvalidDocumentContent(
"The content for the document is not valid."
)
elif document_format == "YAML":
try:
content_json = yaml.safe_load(content)
except yaml.YAMLError:
raise InvalidDocumentContent(
"The content for the document is not valid."
)
else:
raise ValidationException("Invalid document format " + str(document_format))
self.content_json = content_json
try:
self.schema_version = str(content_json["schemaVersion"])
self.description = content_json.get("description")
self.outputs = content_json.get("outputs")
self.files = content_json.get("files")
# TODO: add platformType (requires mapping the SSM actions to OSes; this isn't well documented)
self.platform_types = ["Not Implemented (moto)"]
self.parameter_list = generate_ssm_doc_param_list(
content_json.get("parameters")
)
if (
self.schema_version == "0.3"
or self.schema_version == "2.0"
or self.schema_version == "2.2"
):
self.mainSteps = content_json["mainSteps"]
elif self.schema_version == "1.2":
self.runtimeConfig = content_json.get("runtimeConfig")
except KeyError:
raise InvalidDocumentContent("The content for the document is not valid.")
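The parsing in `Document.__init__` above boils down to: the content must both parse and declare a `schemaVersion`, and either failure maps to `InvalidDocumentContent`. A minimal sketch of the JSON branch (`parse_document_content` is an illustrative name, and the YAML branch is omitted; note `json.JSONDecodeError` subclasses `ValueError` on Python 3, so one except clause covers both):

```python
import json


class InvalidDocumentContent(Exception):
    pass


def parse_document_content(content):
    # Unparseable content and a missing schemaVersion both surface as
    # InvalidDocumentContent, matching the backend's behaviour.
    try:
        content_json = json.loads(content)
        schema_version = str(content_json["schemaVersion"])
    except (ValueError, KeyError):
        raise InvalidDocumentContent("The content for the document is not valid.")
    return schema_version, content_json
```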
 class Command(BaseModel):
     def __init__(
         self,

@@ -269,8 +381,96 @@
         return invocation
def _validate_document_format(document_format):
aws_doc_formats = ["JSON", "YAML"]
if document_format not in aws_doc_formats:
raise ValidationException("Invalid document format " + str(document_format))
def _validate_document_info(content, name, document_type, document_format, strict=True):
aws_ssm_name_regex = r"^[a-zA-Z0-9_\-.]{3,128}$"
aws_name_reject_list = ["aws-", "amazon", "amzn"]
aws_doc_types = [
"Command",
"Policy",
"Automation",
"Session",
"Package",
"ApplicationConfiguration",
"ApplicationConfigurationSchema",
"DeploymentStrategy",
"ChangeCalendar",
]
_validate_document_format(document_format)
if not content:
raise ValidationException("Content is required")
if list(filter(name.startswith, aws_name_reject_list)):
raise ValidationException("Invalid document name " + str(name))
ssm_name_pattern = re.compile(aws_ssm_name_regex)
if not ssm_name_pattern.match(name):
raise ValidationException("Invalid document name " + str(name))
if strict and document_type not in aws_doc_types:
# Update document doesn't use document type
raise ValidationException("Invalid document type " + str(document_type))
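The name checks in `_validate_document_info` can be demonstrated on their own: reserved AWS prefixes are rejected first, then the 3-128 character pattern is enforced (`validate_document_name` is an illustrative extraction, not the backend's API):

```python
import re


class ValidationException(Exception):
    pass


AWS_SSM_NAME_REGEX = r"^[a-zA-Z0-9_\-.]{3,128}$"
AWS_NAME_REJECT_LIST = ["aws-", "amazon", "amzn"]


def validate_document_name(name):
    # Reserved prefixes and names outside the allowed pattern both fail.
    if list(filter(name.startswith, AWS_NAME_REJECT_LIST)):
        raise ValidationException("Invalid document name " + str(name))
    if not re.match(AWS_SSM_NAME_REGEX, name):
        raise ValidationException("Invalid document name " + str(name))
```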
def _document_filter_equal_comparator(keyed_value, filter):
for v in filter["Values"]:
if keyed_value == v:
return True
return False
def _document_filter_list_includes_comparator(keyed_value_list, filter):
for v in filter["Values"]:
if v in keyed_value_list:
return True
return False
def _document_filter_match(filters, ssm_doc):
for filter in filters:
if filter["Key"] == "Name" and not _document_filter_equal_comparator(
ssm_doc.name, filter
):
return False
elif filter["Key"] == "Owner":
if len(filter["Values"]) != 1:
raise ValidationException("Owner filter can only have one value.")
if filter["Values"][0] == "Self":
# Update to running account ID
filter["Values"][0] = ACCOUNT_ID
if not _document_filter_equal_comparator(ssm_doc.owner, filter):
return False
elif filter[
"Key"
] == "PlatformTypes" and not _document_filter_list_includes_comparator(
ssm_doc.platform_types, filter
):
return False
elif filter["Key"] == "DocumentType" and not _document_filter_equal_comparator(
ssm_doc.document_type, filter
):
return False
elif filter["Key"] == "TargetType" and not _document_filter_equal_comparator(
ssm_doc.target_type, filter
):
return False
return True
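The filter logic above is conjunctive: a document is returned only if it satisfies every filter, where satisfying means its value for that filter's `Key` appears in the filter's `Values`. A simplified sketch over plain dicts (covering a subset of the keys the backend handles):

```python
def document_filter_match(filters, doc):
    # Every filter must match; any miss excludes the document.
    for f in filters:
        if f["Key"] in ("Name", "Owner", "DocumentType", "TargetType"):
            if doc.get(f["Key"]) not in f["Values"]:
                return False
    return True
```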
 class SimpleSystemManagerBackend(BaseBackend):
-    def __init__(self):
+    def __init__(self, region_name=None):
+        super(SimpleSystemManagerBackend, self).__init__()
         # each value is a list of all of the versions for a parameter
         # to get the current value, grab the last item of the list
         self._parameters = defaultdict(list)

@@ -278,11 +478,356 @@ class SimpleSystemManagerBackend(BaseBackend):
         self._resource_tags = defaultdict(lambda: defaultdict(dict))
         self._commands = []
         self._errors = []
+        self._documents = defaultdict(dict)

-        # figure out what region we're in
-        for region, backend in ssm_backends.items():
-            if backend == self:
-                self._region = region
+        self._region = region_name
+
+    def reset(self):
+        region_name = self._region
+        self.__dict__ = {}
+        self.__init__(region_name)
def _generate_document_description(self, document):
latest = self._documents[document.name]["latest_version"]
default_version = self._documents[document.name]["default_version"]
base = {
"Hash": hashlib.sha256(document.content.encode("utf-8")).hexdigest(),
"HashType": "Sha256",
"Name": document.name,
"Owner": document.owner,
"CreatedDate": document.created_date,
"Status": document.status,
"DocumentVersion": document.document_version,
"Description": document.description,
"Parameters": document.parameter_list,
"PlatformTypes": document.platform_types,
"DocumentType": document.document_type,
"SchemaVersion": document.schema_version,
"LatestVersion": latest,
"DefaultVersion": default_version,
"DocumentFormat": document.document_format,
}
if document.version_name:
base["VersionName"] = document.version_name
if document.target_type:
base["TargetType"] = document.target_type
if document.tags:
base["Tags"] = document.tags
return base
def _generate_document_information(self, ssm_document, document_format):
base = {
"Name": ssm_document.name,
"DocumentVersion": ssm_document.document_version,
"Status": ssm_document.status,
"Content": ssm_document.content,
"DocumentType": ssm_document.document_type,
"DocumentFormat": document_format,
}
if document_format == "JSON":
base["Content"] = json.dumps(ssm_document.content_json)
elif document_format == "YAML":
base["Content"] = yaml.dump(ssm_document.content_json)
else:
raise ValidationException("Invalid document format " + str(document_format))
if ssm_document.version_name:
base["VersionName"] = ssm_document.version_name
if ssm_document.requires:
base["Requires"] = ssm_document.requires
if ssm_document.attachments:
base["AttachmentsContent"] = ssm_document.attachments
return base
def _generate_document_list_information(self, ssm_document):
base = {
"Name": ssm_document.name,
"Owner": ssm_document.owner,
"DocumentVersion": ssm_document.document_version,
"DocumentType": ssm_document.document_type,
"SchemaVersion": ssm_document.schema_version,
"DocumentFormat": ssm_document.document_format,
}
if ssm_document.version_name:
base["VersionName"] = ssm_document.version_name
if ssm_document.platform_types:
base["PlatformTypes"] = ssm_document.platform_types
if ssm_document.target_type:
base["TargetType"] = ssm_document.target_type
if ssm_document.tags:
base["Tags"] = ssm_document.tags
if ssm_document.requires:
base["Requires"] = ssm_document.requires
return base
def create_document(
self,
content,
requires,
attachments,
name,
version_name,
document_type,
document_format,
target_type,
tags,
):
ssm_document = Document(
name=name,
version_name=version_name,
content=content,
document_type=document_type,
document_format=document_format,
requires=requires,
attachments=attachments,
target_type=target_type,
tags=tags,
)
_validate_document_info(
content=content,
name=name,
document_type=document_type,
document_format=document_format,
)
if self._documents.get(ssm_document.name):
raise DocumentAlreadyExists("The specified document already exists.")
self._documents[ssm_document.name] = {
"documents": {ssm_document.document_version: ssm_document},
"default_version": ssm_document.document_version,
"latest_version": ssm_document.document_version,
}
return self._generate_document_description(ssm_document)
def delete_document(self, name, document_version, version_name, force):
documents = self._documents.get(name, {}).get("documents", {})
keys_to_delete = set()
if documents:
default_version = self._documents[name]["default_version"]
if (
documents[default_version].document_type
== "ApplicationConfigurationSchema"
and not force
):
raise InvalidDocumentOperation(
"You attempted to delete a document while it is still shared. "
"You must stop sharing the document before you can delete it."
)
if document_version and document_version == default_version:
raise InvalidDocumentOperation(
"Default version of the document can't be deleted."
)
if document_version or version_name:
# We delete only a specific version
delete_doc = self._find_document(name, document_version, version_name)
# we can't delete only the default version
if (
delete_doc
and delete_doc.document_version == default_version
and len(documents) != 1
):
raise InvalidDocumentOperation(
"Default version of the document can't be deleted."
)
if delete_doc:
keys_to_delete.add(delete_doc.document_version)
else:
raise InvalidDocument("The specified document does not exist.")
else:
# We are deleting all versions
keys_to_delete = set(documents.keys())
for key in keys_to_delete:
del self._documents[name]["documents"][key]
if len(self._documents[name]["documents"].keys()) == 0:
del self._documents[name]
else:
old_latest = self._documents[name]["latest_version"]
if old_latest not in self._documents[name]["documents"].keys():
leftover_keys = self._documents[name]["documents"].keys()
int_keys = []
for key in leftover_keys:
int_keys.append(int(key))
self._documents[name]["latest_version"] = str(sorted(int_keys)[-1])
else:
raise InvalidDocument("The specified document does not exist.")
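After versions are deleted above, `latest_version` is recomputed by comparing the remaining version keys numerically, since they are stored as strings. A one-function sketch of that step (`recompute_latest_version` is an illustrative name):

```python
def recompute_latest_version(remaining_versions):
    # Version keys are strings ("1", "2", ...), so compare as integers:
    # lexically "9" > "10", but numerically 10 is the later version.
    return str(max(int(v) for v in remaining_versions))
```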
def _find_document(
self, name, document_version=None, version_name=None, strict=True
):
if not self._documents.get(name):
raise InvalidDocument("The specified document does not exist.")
documents = self._documents[name]["documents"]
ssm_document = None
if not version_name and not document_version:
# Retrieve default version
default_version = self._documents[name]["default_version"]
ssm_document = documents.get(default_version)
elif version_name and document_version:
for doc_version, document in documents.items():
if (
doc_version == document_version
and document.version_name == version_name
):
ssm_document = document
break
else:
for doc_version, document in documents.items():
if document_version and doc_version == document_version:
ssm_document = document
break
if version_name and document.version_name == version_name:
ssm_document = document
break
if strict and not ssm_document:
raise InvalidDocument("The specified document does not exist.")
return ssm_document
def get_document(self, name, document_version, version_name, document_format):
ssm_document = self._find_document(name, document_version, version_name)
if not document_format:
document_format = ssm_document.document_format
else:
_validate_document_format(document_format=document_format)
return self._generate_document_information(ssm_document, document_format)
def update_document_default_version(self, name, document_version):
ssm_document = self._find_document(name, document_version=document_version)
self._documents[name]["default_version"] = document_version
base = {
"Name": ssm_document.name,
"DefaultVersion": document_version,
}
if ssm_document.version_name:
base["DefaultVersionName"] = ssm_document.version_name
return base
def update_document(
self,
content,
attachments,
name,
version_name,
document_version,
document_format,
target_type,
):
_validate_document_info(
content=content,
name=name,
document_type=None,
document_format=document_format,
strict=False,
)
if not self._documents.get(name):
raise InvalidDocument("The specified document does not exist.")
if (
self._documents[name]["latest_version"] != document_version
and document_version != "$LATEST"
):
raise InvalidDocumentVersion(
"The document version is not valid or does not exist."
)
if version_name and self._find_document(
name, version_name=version_name, strict=False
):
raise DuplicateDocumentVersionName(
"The specified version name is a duplicate."
)
old_ssm_document = self._find_document(name)
new_ssm_document = Document(
name=name,
version_name=version_name,
content=content,
document_type=old_ssm_document.document_type,
document_format=document_format,
requires=old_ssm_document.requires,
attachments=attachments,
target_type=target_type,
tags=old_ssm_document.tags,
document_version=str(int(self._documents[name]["latest_version"]) + 1),
)
for doc_version, document in self._documents[name]["documents"].items():
if document.content == new_ssm_document.content:
raise DuplicateDocumentContent(
"The content of the association document matches another document. "
"Change the content of the document and try again."
)
self._documents[name]["latest_version"] = str(
int(self._documents[name]["latest_version"]) + 1
)
self._documents[name]["documents"][
new_ssm_document.document_version
] = new_ssm_document
return self._generate_document_description(new_ssm_document)
def describe_document(self, name, document_version, version_name):
ssm_document = self._find_document(name, document_version, version_name)
return self._generate_document_description(ssm_document)
def list_documents(
self, document_filter_list, filters, max_results=10, next_token="0"
):
if document_filter_list:
raise ValidationException(
"DocumentFilterList is deprecated. Instead use Filters."
)
next_token = int(next_token)
results = []
dummy_token_tracker = 0
# Sort to maintain next token adjacency
for document_name, document_bundle in sorted(self._documents.items()):
if len(results) == max_results:
# There's still more to go so we need a next token
return results, str(next_token + len(results))
if dummy_token_tracker < next_token:
dummy_token_tracker = dummy_token_tracker + 1
continue
default_version = document_bundle["default_version"]
ssm_doc = self._documents[document_name]["documents"][default_version]
if filters and not _document_filter_match(filters, ssm_doc):
# If we have filters enabled and we don't match them, skip this document
continue
else:
results.append(self._generate_document_list_information(ssm_doc))
# If we've fallen out of the loop, there are no more documents. No next token.
return results, ""
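The pagination in `list_documents` treats `next_token` as the count of documents already returned, and sorting keeps token positions stable between calls. A simplified sketch over a list of names (it omits the filter step, and returns an empty token once the last page is reached):

```python
def paginate_documents(names, max_results=10, next_token="0"):
    # next_token is the number of documents already handed out; sorted()
    # keeps positions stable across calls, as in list_documents above.
    start = int(next_token)
    ordered = sorted(names)
    page = ordered[start:start + max_results]
    end = start + len(page)
    return page, (str(end) if end < len(ordered) else "")
```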
     def delete_parameter(self, name):
         return self._parameters.pop(name, None)

@@ -804,5 +1349,9 @@
 ssm_backends = {}
-for region, ec2_backend in ec2_backends.items():
-    ssm_backends[region] = SimpleSystemManagerBackend()
+for region in Session().get_available_regions("ssm"):
+    ssm_backends[region] = SimpleSystemManagerBackend(region)
+for region in Session().get_available_regions("ssm", partition_name="aws-us-gov"):
+    ssm_backends[region] = SimpleSystemManagerBackend(region)
+for region in Session().get_available_regions("ssm", partition_name="aws-cn"):
+    ssm_backends[region] = SimpleSystemManagerBackend(region)
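The region-aware pattern above (each backend remembers its region, and `reset()` preserves it while wiping all other state) can be sketched offline; the static `REGIONS` list is a hypothetical stand-in for boto3's `Session().get_available_regions("ssm")`:

```python
class SimpleSystemManagerBackend(object):
    def __init__(self, region_name=None):
        self._region = region_name

    def reset(self):
        # Wipe all state but remember which region this backend serves.
        region_name = self._region
        self.__dict__ = {}
        self.__init__(region_name)


# Stand-in for Session().get_available_regions("ssm") across partitions.
REGIONS = ["us-east-1", "eu-west-1", "cn-north-1"]
ssm_backends = {region: SimpleSystemManagerBackend(region) for region in REGIONS}
```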

View File

@@ -17,6 +17,116 @@ class SimpleSystemManagerResponse(BaseResponse):
         except ValueError:
             return {}
def create_document(self):
content = self._get_param("Content")
requires = self._get_param("Requires")
attachments = self._get_param("Attachments")
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_type = self._get_param("DocumentType")
document_format = self._get_param("DocumentFormat", "JSON")
target_type = self._get_param("TargetType")
tags = self._get_param("Tags")
result = self.ssm_backend.create_document(
content=content,
requires=requires,
attachments=attachments,
name=name,
version_name=version_name,
document_type=document_type,
document_format=document_format,
target_type=target_type,
tags=tags,
)
return json.dumps({"DocumentDescription": result})
def delete_document(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
version_name = self._get_param("VersionName")
force = self._get_param("Force", False)
self.ssm_backend.delete_document(
name=name,
document_version=document_version,
version_name=version_name,
force=force,
)
return json.dumps({})
def get_document(self):
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_version = self._get_param("DocumentVersion")
document_format = self._get_param("DocumentFormat", "JSON")
document = self.ssm_backend.get_document(
name=name,
document_version=document_version,
document_format=document_format,
version_name=version_name,
)
return json.dumps(document)
def describe_document(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
version_name = self._get_param("VersionName")
result = self.ssm_backend.describe_document(
name=name, document_version=document_version, version_name=version_name
)
return json.dumps({"Document": result})
def update_document(self):
content = self._get_param("Content")
attachments = self._get_param("Attachments")
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_version = self._get_param("DocumentVersion")
document_format = self._get_param("DocumentFormat", "JSON")
target_type = self._get_param("TargetType")
result = self.ssm_backend.update_document(
content=content,
attachments=attachments,
name=name,
version_name=version_name,
document_version=document_version,
document_format=document_format,
target_type=target_type,
)
return json.dumps({"DocumentDescription": result})
def update_document_default_version(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
result = self.ssm_backend.update_document_default_version(
name=name, document_version=document_version
)
return json.dumps({"Description": result})
def list_documents(self):
document_filter_list = self._get_param("DocumentFilterList")
filters = self._get_param("Filters")
max_results = self._get_param("MaxResults", 10)
next_token = self._get_param("NextToken", "0")
documents, token = self.ssm_backend.list_documents(
document_filter_list=document_filter_list,
filters=filters,
max_results=max_results,
next_token=next_token,
)
return json.dumps({"DocumentIdentifiers": documents, "NextToken": token})
     def _get_param(self, param, default=None):
         return self.request_params.get(param, default)

View File

@@ -0,0 +1 @@
from __future__ import unicode_literals

View File

@@ -0,0 +1,189 @@
from __future__ import unicode_literals
import boto3
from moto import mock_applicationautoscaling, mock_ecs
import sure # noqa
from nose.tools import with_setup
DEFAULT_REGION = "us-east-1"
DEFAULT_ECS_CLUSTER = "default"
DEFAULT_ECS_TASK = "test_ecs_task"
DEFAULT_ECS_SERVICE = "sample-webapp"
DEFAULT_SERVICE_NAMESPACE = "ecs"
DEFAULT_RESOURCE_ID = "service/{}/{}".format(DEFAULT_ECS_CLUSTER, DEFAULT_ECS_SERVICE)
DEFAULT_SCALABLE_DIMENSION = "ecs:service:DesiredCount"
DEFAULT_MIN_CAPACITY = 1
DEFAULT_MAX_CAPACITY = 1
DEFAULT_ROLE_ARN = "test:arn"
DEFAULT_SUSPENDED_STATE = {
"DynamicScalingInSuspended": True,
"DynamicScalingOutSuspended": True,
"ScheduledScalingSuspended": True,
}
def _create_ecs_defaults(ecs, create_service=True):
_ = ecs.create_cluster(clusterName=DEFAULT_ECS_CLUSTER)
_ = ecs.register_task_definition(
family=DEFAULT_ECS_TASK,
containerDefinitions=[
{
"name": "hello_world",
"image": "docker/hello-world:latest",
"cpu": 1024,
"memory": 400,
"essential": True,
"environment": [
{"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
],
"logConfiguration": {"logDriver": "json-file"},
}
],
)
if create_service:
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName=DEFAULT_ECS_SERVICE,
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_one_basic_ecs_success():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
client.register_scalable_target(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE,
ResourceId=DEFAULT_RESOURCE_ID,
ScalableDimension=DEFAULT_SCALABLE_DIMENSION,
)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(1)
t = response["ScalableTargets"][0]
t.should.have.key("ServiceNamespace").which.should.equal(DEFAULT_SERVICE_NAMESPACE)
t.should.have.key("ResourceId").which.should.equal(DEFAULT_RESOURCE_ID)
t.should.have.key("ScalableDimension").which.should.equal(
DEFAULT_SCALABLE_DIMENSION
)
t.should.have.key("CreationTime").which.should.be.a("datetime.datetime")
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_one_full_ecs_success():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
register_scalable_target(client)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(1)
t = response["ScalableTargets"][0]
t.should.have.key("ServiceNamespace").which.should.equal(DEFAULT_SERVICE_NAMESPACE)
t.should.have.key("ResourceId").which.should.equal(DEFAULT_RESOURCE_ID)
t.should.have.key("ScalableDimension").which.should.equal(
DEFAULT_SCALABLE_DIMENSION
)
t.should.have.key("MinCapacity").which.should.equal(DEFAULT_MIN_CAPACITY)
t.should.have.key("MaxCapacity").which.should.equal(DEFAULT_MAX_CAPACITY)
t.should.have.key("RoleARN").which.should.equal(DEFAULT_ROLE_ARN)
t.should.have.key("CreationTime").which.should.be.a("datetime.datetime")
t.should.have.key("SuspendedState")
t["SuspendedState"]["DynamicScalingInSuspended"].should.equal(
DEFAULT_SUSPENDED_STATE["DynamicScalingInSuspended"]
)
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_only_return_ecs_targets():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs, create_service=False)
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName="test1",
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName="test2",
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
register_scalable_target(
client,
ServiceNamespace="ecs",
ResourceId="service/{}/test1".format(DEFAULT_ECS_CLUSTER),
)
register_scalable_target(
client,
ServiceNamespace="ecs",
ResourceId="service/{}/test2".format(DEFAULT_ECS_CLUSTER),
)
register_scalable_target(
client,
ServiceNamespace="elasticmapreduce",
ResourceId="instancegroup/j-2EEZNYKUA1NTV/ig-1791Y4E1L8YI0",
ScalableDimension="elasticmapreduce:instancegroup:InstanceCount",
)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(2)
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_next_token_success():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs, create_service=False)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
for i in range(0, 100):
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName=str(i),
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
register_scalable_target(
client,
ServiceNamespace="ecs",
ResourceId="service/{}/{}".format(DEFAULT_ECS_CLUSTER, i),
)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(50)
response["ScalableTargets"][0]["ResourceId"].should.equal("service/default/0")
response.should.have.key("NextToken").which.should.equal("49")
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE, NextToken=str(response["NextToken"])
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(50)
response["ScalableTargets"][0]["ResourceId"].should.equal("service/default/50")
response.should_not.have.key("NextToken")
def register_scalable_target(client, **kwargs):
"""Register a scalable target with default values, overridable via kwargs."""
return client.register_scalable_target(
ServiceNamespace=kwargs.get("ServiceNamespace", DEFAULT_SERVICE_NAMESPACE),
ResourceId=kwargs.get("ResourceId", DEFAULT_RESOURCE_ID),
ScalableDimension=kwargs.get("ScalableDimension", DEFAULT_SCALABLE_DIMENSION),
MinCapacity=kwargs.get("MinCapacity", DEFAULT_MIN_CAPACITY),
MaxCapacity=kwargs.get("MaxCapacity", DEFAULT_MAX_CAPACITY),
RoleARN=kwargs.get("RoleARN", DEFAULT_ROLE_ARN),
SuspendedState=kwargs.get("SuspendedState", DEFAULT_SUSPENDED_STATE),
)
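The `kwargs.get(key, default)` pattern in the helper above lets a test override any single field while inheriting the rest. A minimal sketch of the same pattern over a plain dict (`build_request` and `DEFAULTS` are illustrative names, not moto API):

```python
DEFAULTS = {
    "ServiceNamespace": "ecs",
    "MinCapacity": 1,
    "MaxCapacity": 1,
}


def build_request(**kwargs):
    # Each field falls back to its default unless explicitly overridden,
    # matching how register_scalable_target builds its call.
    return {key: kwargs.get(key, default) for key, default in DEFAULTS.items()}
```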

View File

@@ -0,0 +1,123 @@
from __future__ import unicode_literals
import boto3
from moto import mock_applicationautoscaling, mock_ecs
from moto.applicationautoscaling import models
from moto.applicationautoscaling.exceptions import AWSValidationException
from botocore.exceptions import ParamValidationError
from nose.tools import assert_raises
import sure # noqa
from botocore.exceptions import ClientError
from parameterized import parameterized
from .test_applicationautoscaling import register_scalable_target
DEFAULT_REGION = "us-east-1"
DEFAULT_ECS_CLUSTER = "default"
DEFAULT_ECS_TASK = "test_ecs_task"
DEFAULT_ECS_SERVICE = "sample-webapp"
DEFAULT_SERVICE_NAMESPACE = "ecs"
DEFAULT_RESOURCE_ID = "service/{}/{}".format(DEFAULT_ECS_CLUSTER, DEFAULT_ECS_SERVICE)
DEFAULT_SCALABLE_DIMENSION = "ecs:service:DesiredCount"
DEFAULT_MIN_CAPACITY = 1
DEFAULT_MAX_CAPACITY = 1
DEFAULT_ROLE_ARN = "test:arn"
@mock_applicationautoscaling
def test_describe_scalable_targets_no_params_should_raise_param_validation_errors():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ParamValidationError):
client.describe_scalable_targets()
@mock_applicationautoscaling
def test_register_scalable_target_no_params_should_raise_param_validation_errors():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ParamValidationError):
client.register_scalable_target()
@mock_applicationautoscaling
def test_register_scalable_target_with_none_service_namespace_should_raise_param_validation_errors():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ParamValidationError):
register_scalable_target(client, ServiceNamespace=None)
@mock_applicationautoscaling
def test_describe_scalable_targets_with_invalid_scalable_dimension_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ClientError) as err:
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE, ScalableDimension="foo",
)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].split(":")[0].should.look_like(
"1 validation error detected"
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_applicationautoscaling
def test_describe_scalable_targets_with_invalid_service_namespace_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ClientError) as err:
response = client.describe_scalable_targets(
ServiceNamespace="foo", ScalableDimension=DEFAULT_SCALABLE_DIMENSION,
)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].split(":")[0].should.look_like(
"1 validation error detected"
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_applicationautoscaling
def test_describe_scalable_targets_with_multiple_invalid_parameters_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ClientError) as err:
response = client.describe_scalable_targets(
ServiceNamespace="foo", ScalableDimension="bar",
)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].split(":")[0].should.look_like(
"2 validation errors detected"
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_ecs
@mock_applicationautoscaling
def test_register_scalable_target_ecs_with_non_existent_service_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
resource_id = "service/{}/foo".format(DEFAULT_ECS_CLUSTER)
with assert_raises(ClientError) as err:
register_scalable_target(client, ServiceNamespace="ecs", ResourceId=resource_id)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].should.equal(
"ECS service doesn't exist: {}".format(resource_id)
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@parameterized(
[
("ecs", "service/default/test-svc", "ecs:service:DesiredCount", True),
("ecs", "banana/default/test-svc", "ecs:service:DesiredCount", False),
("rds", "service/default/test-svc", "ecs:service:DesiredCount", False),
]
)
def test_target_params_are_valid_success(namespace, r_id, dimension, expected):
if expected is True:
models._target_params_are_valid(namespace, r_id, dimension).should.equal(
expected
)
else:
with assert_raises(AWSValidationException):
models._target_params_are_valid(namespace, r_id, dimension)
# TODO add a test for not-supplied MinCapacity or MaxCapacity (ValidationException)
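One shape the TODO's missing-capacity validation could take — a hypothetical helper sketching the "<n> validation errors detected" style of message, not moto's actual implementation:

```python
def validate_capacity_params(min_capacity=None, max_capacity=None):
    """Hypothetical sketch: collect validation errors when MinCapacity or
    MaxCapacity is missing, then raise with an AWS-style summary message."""
    errors = []
    if min_capacity is None:
        errors.append("MinCapacity is a required parameter")
    if max_capacity is None:
        errors.append("MaxCapacity is a required parameter")
    if errors:
        raise ValueError(
            "{} validation error{} detected: {}".format(
                len(errors), "s" if len(errors) > 1 else "", "; ".join(errors)
            )
        )
```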


@ -67,6 +67,8 @@ get_availability_zones_output = {"Outputs": {"Output1": {"Value": {"Fn::GetAZs":
parameters = {
"Parameters": {
"Param": {"Type": "String"},
"NumberParam": {"Type": "Number"},
"NumberListParam": {"Type": "List<Number>"},
"NoEchoParam": {"Type": "String", "NoEcho": True},
}
}
@ -303,12 +305,23 @@ def test_parse_stack_with_parameters():
stack_id="test_id",
name="test_stack",
template=parameters_template_json,
parameters={
"Param": "visible value",
"NumberParam": "42",
"NumberListParam": "42,3.14159",
"NoEchoParam": "hidden value",
},
region_name="us-west-1",
)
stack.resource_map.no_echo_parameter_keys.should.have("NoEchoParam")
stack.resource_map.no_echo_parameter_keys.should_not.have("Param")
stack.resource_map.no_echo_parameter_keys.should_not.have("NumberParam")
stack.resource_map.no_echo_parameter_keys.should_not.have("NumberListParam")
stack.resource_map.resolved_parameters["NumberParam"].should.equal(42)
stack.resource_map.resolved_parameters["NumberListParam"].should.equal(
[42, 3.14159]
)
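The `NumberParam`/`NumberListParam` assertions expect `"42"` to resolve to the integer `42` and `"42,3.14159"` to `[42, 3.14159]`. The coercion can be sketched as follows (assumed behavior, not moto's exact code):

```python
def parse_number(value):
    """Coerce a CloudFormation Number parameter string to int when whole, else float."""
    num = float(value)
    return int(num) if num.is_integer() else num


def parse_number_list(value):
    """Coerce a List<Number> parameter string ("42,3.14159") to a list of numbers."""
    return [parse_number(part) for part in value.split(",")]
```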
def test_parse_equals_condition():


@ -40,6 +40,16 @@ json_template = {
},
}
json_valid_template_with_tabs = """
{
\t"AWSTemplateFormatVersion": "2010-09-09",
\t"Description": "Stack 2",
\t"Resources": {
\t\t"Queue": {"Type": "AWS::SQS::Queue", "Properties": {"VisibilityTimeout": 60}}
\t}
}
"""
# One resource is required
json_bad_template = {"AWSTemplateFormatVersion": "2010-09-09", "Description": "Stack 1"}
@ -56,6 +66,15 @@ def test_boto3_json_validate_successful():
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
@mock_cloudformation
def test_boto3_json_with_tabs_validate_successful():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
response = cf_conn.validate_template(TemplateBody=json_valid_template_with_tabs)
assert response["Description"] == "Stack 2"
assert response["Parameters"] == []
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
@mock_cloudformation
def test_boto3_json_invalid_missing_resource():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")


@ -3,6 +3,7 @@ from __future__ import unicode_literals
import json
import os
import random
import requests
import uuid
import boto3
@ -10,10 +11,10 @@ import boto3
# noinspection PyUnresolvedReferences
import sure  # noqa
from botocore.exceptions import ClientError
from jose import jws, jwk, jwt
from nose.tools import assert_raises
from moto import mock_cognitoidp, settings
from moto.core import ACCOUNT_ID
@ -1341,3 +1342,80 @@ def test_admin_update_user_attributes():
val.should.equal("Doe") val.should.equal("Doe")
elif attr["Name"] == "given_name": elif attr["Name"] == "given_name":
val.should.equal("Jane") val.should.equal("Jane")
# Test will retrieve the public key from cognito-idp.<region>.amazonaws.com/<pool-id>/.well-known/jwks.json,
# which isn't mocked in ServerMode
if not settings.TEST_SERVER_MODE:
@mock_cognitoidp
def test_idtoken_contains_kid_header():
# https://github.com/spulec/moto/issues/3078
# Setup
cognito = boto3.client("cognito-idp", "us-west-2")
user_pool_id = cognito.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"][
"Id"
]
client = cognito.create_user_pool_client(
UserPoolId=user_pool_id,
ExplicitAuthFlows=[
"ALLOW_ADMIN_USER_PASSWORD_AUTH",
"ALLOW_REFRESH_TOKEN_AUTH",
"ALLOW_ADMIN_NO_SRP_AUTH",
],
AllowedOAuthFlows=["code", "implicit"],
ClientName=str(uuid.uuid4()),
CallbackURLs=["https://example.com"],
)
client_id = client["UserPoolClient"]["ClientId"]
username = str(uuid.uuid4())
temporary_password = "1TemporaryP@ssword"
cognito.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
TemporaryPassword=temporary_password,
)
result = cognito.admin_initiate_auth(
UserPoolId=user_pool_id,
ClientId=client_id,
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={"USERNAME": username, "PASSWORD": temporary_password},
)
# A newly created user is forced to set a new password
# This sets a new password and logs the user in (creates tokens)
password = "1F@kePassword"
result = cognito.respond_to_auth_challenge(
Session=result["Session"],
ClientId=client_id,
ChallengeName="NEW_PASSWORD_REQUIRED",
ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password},
)
#
id_token = result["AuthenticationResult"]["IdToken"]
# Verify the kid header is present in the token and corresponds to a kid in the public JWKS
verify_kid_header(id_token)
def verify_kid_header(token):
"""Verifies the kid-header is corresponds with the public key"""
headers = jwt.get_unverified_headers(token)
kid = headers["kid"]
key_index = -1
keys = fetch_public_keys()
for i in range(len(keys)):
if kid == keys[i]["kid"]:
key_index = i
break
if key_index == -1:
raise Exception("Public key (kid) not found in jwks.json")
def fetch_public_keys():
keys_url = "https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json".format(
"us-west-2", "someuserpoolid"
)
response = requests.get(keys_url).json()
return response["keys"]


@ -5316,3 +5316,88 @@ def test_transact_write_items_fails_with_transaction_canceled_exception():
ex.exception.response["Error"]["Message"].should.equal(
"Transaction cancelled, please refer cancellation reasons for specific reasons [None, ConditionalCheckFailed]"
)
@mock_dynamodb2
def test_gsi_projection_type_keys_only():
table_schema = {
"KeySchema": [{"AttributeName": "partitionKey", "KeyType": "HASH"}],
"GlobalSecondaryIndexes": [
{
"IndexName": "GSI-K1",
"KeySchema": [
{"AttributeName": "gsiK1PartitionKey", "KeyType": "HASH"},
{"AttributeName": "gsiK1SortKey", "KeyType": "RANGE"},
],
"Projection": {"ProjectionType": "KEYS_ONLY",},
}
],
"AttributeDefinitions": [
{"AttributeName": "partitionKey", "AttributeType": "S"},
{"AttributeName": "gsiK1PartitionKey", "AttributeType": "S"},
{"AttributeName": "gsiK1SortKey", "AttributeType": "S"},
],
}
item = {
"partitionKey": "pk-1",
"gsiK1PartitionKey": "gsi-pk",
"gsiK1SortKey": "gsi-sk",
"someAttribute": "lore ipsum",
}
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
table = dynamodb.Table("test-table")
table.put_item(Item=item)
items = table.query(
KeyConditionExpression=Key("gsiK1PartitionKey").eq("gsi-pk"),
IndexName="GSI-K1",
)["Items"]
items.should.have.length_of(1)
# Item should only include GSI Keys, as per the ProjectionType
items[0].should.equal({"gsiK1PartitionKey": "gsi-pk", "gsiK1SortKey": "gsi-sk"})
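A KEYS_ONLY projection stores only key attributes in the index, so only those attributes come back from a query. The filtering the assertion above expects can be sketched with a hypothetical helper (not moto's code):

```python
def project_keys_only(item, index_key_names):
    """Keep only the index key attributes of an item, dropping everything else."""
    return {name: value for name, value in item.items() if name in index_key_names}
```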
@mock_dynamodb2
def test_lsi_projection_type_keys_only():
table_schema = {
"KeySchema": [{"AttributeName": "partitionKey", "KeyType": "HASH"}],
"LocalSecondaryIndexes": [
{
"IndexName": "LSI",
"KeySchema": [
{"AttributeName": "partitionKey", "KeyType": "HASH"},
{"AttributeName": "lsiK1SortKey", "KeyType": "RANGE"},
],
"Projection": {"ProjectionType": "KEYS_ONLY",},
}
],
"AttributeDefinitions": [
{"AttributeName": "partitionKey", "AttributeType": "S"},
{"AttributeName": "lsiK1SortKey", "AttributeType": "S"},
],
}
item = {
"partitionKey": "pk-1",
"lsiK1SortKey": "lsi-sk",
"someAttribute": "lore ipsum",
}
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
table = dynamodb.Table("test-table")
table.put_item(Item=item)
items = table.query(
KeyConditionExpression=Key("partitionKey").eq("pk-1"), IndexName="LSI",
)["Items"]
items.should.have.length_of(1)
# Item should only include LSI keys, as per the ProjectionType
items[0].should.equal({"partitionKey": "pk-1", "lsiK1SortKey": "lsi-sk"})


@ -582,6 +582,17 @@ def test_create_route_with_invalid_destination_cidr_block_parameter():
)
)
route_table.create_route(
DestinationIpv6CidrBlock="2001:db8::/125", GatewayId=internet_gateway.id
)
new_routes = [
route
for route in route_table.routes
if route.destination_cidr_block != vpc.cidr_block
]
new_routes.should.have.length_of(1)
new_routes[0].route_table_id.shouldnt.be.equal(None)
@mock_ec2
def test_create_route_with_network_interface_id():


@ -160,8 +160,26 @@ def test_vpc_peering_connections_cross_region_accept():
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
acp_pcx_apn1["VpcPeeringConnection"]["Status"]["Code"].should.equal("active")
acp_pcx_apn1["VpcPeeringConnection"]["AccepterVpcInfo"]["Region"].should.equal(
"ap-northeast-1"
)
acp_pcx_apn1["VpcPeeringConnection"]["RequesterVpcInfo"]["Region"].should.equal(
"us-west-1"
)
des_pcx_apn1["VpcPeeringConnections"][0]["Status"]["Code"].should.equal("active") des_pcx_apn1["VpcPeeringConnections"][0]["Status"]["Code"].should.equal("active")
des_pcx_apn1["VpcPeeringConnections"][0]["AccepterVpcInfo"]["Region"].should.equal(
"ap-northeast-1"
)
des_pcx_apn1["VpcPeeringConnections"][0]["RequesterVpcInfo"]["Region"].should.equal(
"us-west-1"
)
des_pcx_usw1["VpcPeeringConnections"][0]["Status"]["Code"].should.equal("active") des_pcx_usw1["VpcPeeringConnections"][0]["Status"]["Code"].should.equal("active")
des_pcx_usw1["VpcPeeringConnections"][0]["AccepterVpcInfo"]["Region"].should.equal(
"ap-northeast-1"
)
des_pcx_usw1["VpcPeeringConnections"][0]["RequesterVpcInfo"]["Region"].should.equal(
"us-west-1"
)
@mock_ec2


@ -2424,9 +2424,13 @@ def test_boto3_put_object_with_tagging():
s3.put_object(Bucket=bucket_name, Key=key, Body="test", Tagging="foo=bar")
s3.get_object_tagging(Bucket=bucket_name, Key=key)["TagSet"].should.contain(
{"Key": "foo", "Value": "bar"}
)
s3.delete_object_tagging(Bucket=bucket_name, Key=key)
s3.get_object_tagging(Bucket=bucket_name, Key=key)["TagSet"].should.equal([])
@mock_s3


@ -0,0 +1,769 @@
from __future__ import unicode_literals
import boto3
import botocore.exceptions
import sure # noqa
import datetime
import json
import pkg_resources
import yaml
import hashlib
import copy
from moto.core import ACCOUNT_ID
from moto import mock_ssm
def _get_yaml_template():
template_path = "/".join(["test_ssm", "test_templates", "good.yaml"])
resource_path = pkg_resources.resource_string("tests", template_path)
return resource_path
def _validate_document_description(
doc_name,
doc_description,
json_doc,
expected_document_version,
expected_latest_version,
expected_default_version,
expected_format,
):
if expected_format == "JSON":
doc_description["Hash"].should.equal(
hashlib.sha256(json.dumps(json_doc).encode("utf-8")).hexdigest()
)
else:
doc_description["Hash"].should.equal(
hashlib.sha256(yaml.dump(json_doc).encode("utf-8")).hexdigest()
)
doc_description["HashType"].should.equal("Sha256")
doc_description["Name"].should.equal(doc_name)
doc_description["Owner"].should.equal(ACCOUNT_ID)
difference = datetime.datetime.utcnow() - doc_description["CreatedDate"]
if difference > datetime.timedelta(minutes=1):
assert False
doc_description["Status"].should.equal("Active")
doc_description["DocumentVersion"].should.equal(expected_document_version)
doc_description["Description"].should.equal(json_doc["description"])
doc_description["Parameters"] = sorted(
doc_description["Parameters"], key=lambda doc: doc["Name"]
)
doc_description["Parameters"][0]["Name"].should.equal("Parameter1")
doc_description["Parameters"][0]["Type"].should.equal("Integer")
doc_description["Parameters"][0]["Description"].should.equal("Command Duration.")
doc_description["Parameters"][0]["DefaultValue"].should.equal("3")
doc_description["Parameters"][1]["Name"].should.equal("Parameter2")
doc_description["Parameters"][1]["Type"].should.equal("String")
doc_description["Parameters"][1]["DefaultValue"].should.equal("def")
doc_description["Parameters"][2]["Name"].should.equal("Parameter3")
doc_description["Parameters"][2]["Type"].should.equal("Boolean")
doc_description["Parameters"][2]["Description"].should.equal("A boolean")
doc_description["Parameters"][2]["DefaultValue"].should.equal("False")
doc_description["Parameters"][3]["Name"].should.equal("Parameter4")
doc_description["Parameters"][3]["Type"].should.equal("StringList")
doc_description["Parameters"][3]["Description"].should.equal("A string list")
doc_description["Parameters"][3]["DefaultValue"].should.equal('["abc", "def"]')
doc_description["Parameters"][4]["Name"].should.equal("Parameter5")
doc_description["Parameters"][4]["Type"].should.equal("StringMap")
doc_description["Parameters"][5]["Name"].should.equal("Parameter6")
doc_description["Parameters"][5]["Type"].should.equal("MapList")
if expected_format == "JSON":
# DefaultValue is returned as a JSON string, so parse it back before comparing
json.loads(doc_description["Parameters"][4]["DefaultValue"]).should.equal(
{
"NotificationArn": "$dependency.topicArn",
"NotificationEvents": ["Failed"],
"NotificationType": "Command",
}
)
json.loads(doc_description["Parameters"][5]["DefaultValue"]).should.equal(
[
{"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": "50"}},
{"DeviceName": "/dev/sdm", "Ebs": {"VolumeSize": "100"}},
]
)
else:
yaml.safe_load(doc_description["Parameters"][4]["DefaultValue"]).should.equal(
{
"NotificationArn": "$dependency.topicArn",
"NotificationEvents": ["Failed"],
"NotificationType": "Command",
}
)
yaml.safe_load(doc_description["Parameters"][5]["DefaultValue"]).should.equal(
[
{"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": "50"}},
{"DeviceName": "/dev/sdm", "Ebs": {"VolumeSize": "100"}},
]
)
doc_description["DocumentType"].should.equal("Command")
doc_description["SchemaVersion"].should.equal("2.2")
doc_description["LatestVersion"].should.equal(expected_latest_version)
doc_description["DefaultVersion"].should.equal(expected_default_version)
doc_description["DocumentFormat"].should.equal(expected_format)
def _get_doc_validator(
response, version_name, doc_version, json_doc_content, document_format
):
response["Name"].should.equal("TestDocument3")
if version_name:
response["VersionName"].should.equal(version_name)
response["DocumentVersion"].should.equal(doc_version)
response["Status"].should.equal("Active")
if document_format == "JSON":
json.loads(response["Content"]).should.equal(json_doc_content)
else:
yaml.safe_load(response["Content"]).should.equal(json_doc_content)
response["DocumentType"].should.equal("Command")
response["DocumentFormat"].should.equal(document_format)
@mock_ssm
def test_create_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
response = client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="YAML",
)
doc_description = response["DocumentDescription"]
_validate_document_description(
"TestDocument", doc_description, json_doc, "1", "1", "1", "YAML"
)
response = client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument2",
DocumentType="Command",
DocumentFormat="JSON",
)
doc_description = response["DocumentDescription"]
_validate_document_description(
"TestDocument2", doc_description, json_doc, "1", "1", "1", "JSON"
)
response = client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="JSON",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
Tags=[{"Key": "testing", "Value": "testingValue"}],
)
doc_description = response["DocumentDescription"]
doc_description["VersionName"].should.equal("Base")
doc_description["TargetType"].should.equal("/AWS::EC2::Instance")
doc_description["Tags"].should.equal([{"Key": "testing", "Value": "testingValue"}])
_validate_document_description(
"TestDocument3", doc_description, json_doc, "1", "1", "1", "JSON"
)
try:
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("CreateDocument")
err.response["Error"]["Message"].should.equal(
"The specified document already exists."
)
try:
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument4",
DocumentType="Command",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("CreateDocument")
err.response["Error"]["Message"].should.equal(
"The content for the document is not valid."
)
del json_doc["parameters"]
response = client.create_document(
Content=yaml.dump(json_doc),
Name="EmptyParamDoc",
DocumentType="Command",
DocumentFormat="YAML",
)
doc_description = response["DocumentDescription"]
doc_description["Hash"].should.equal(
hashlib.sha256(yaml.dump(json_doc).encode("utf-8")).hexdigest()
)
doc_description["HashType"].should.equal("Sha256")
doc_description["Name"].should.equal("EmptyParamDoc")
doc_description["Owner"].should.equal(ACCOUNT_ID)
difference = datetime.datetime.utcnow() - doc_description["CreatedDate"]
if difference > datetime.timedelta(minutes=1):
assert False
doc_description["Status"].should.equal("Active")
doc_description["DocumentVersion"].should.equal("1")
doc_description["Description"].should.equal(json_doc["description"])
doc_description["DocumentType"].should.equal("Command")
doc_description["SchemaVersion"].should.equal("2.2")
doc_description["LatestVersion"].should.equal("1")
doc_description["DefaultVersion"].should.equal("1")
doc_description["DocumentFormat"].should.equal("YAML")
@mock_ssm
def test_get_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.get_document(Name="DNE")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
)
new_json_doc = copy.copy(json_doc)
new_json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
VersionName="NewBase",
)
response = client.get_document(Name="TestDocument3")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", DocumentFormat="YAML")
_get_doc_validator(response, "Base", "1", json_doc, "YAML")
response = client.get_document(Name="TestDocument3", DocumentFormat="JSON")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", VersionName="Base")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", DocumentVersion="1")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", DocumentVersion="2")
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
response = client.get_document(Name="TestDocument3", VersionName="NewBase")
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
response = client.get_document(
Name="TestDocument3", VersionName="NewBase", DocumentVersion="2"
)
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
try:
response = client.get_document(
Name="TestDocument3", VersionName="BadName", DocumentVersion="2"
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
try:
response = client.get_document(Name="TestDocument3", DocumentVersion="3")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
# Updating default should update normal get
client.update_document_default_version(Name="TestDocument3", DocumentVersion="2")
response = client.get_document(Name="TestDocument3", DocumentFormat="JSON")
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
@mock_ssm
def test_delete_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.delete_document(Name="DNE")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DeleteDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
# Test simple
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
)
client.delete_document(Name="TestDocument3")
try:
client.get_document(Name="TestDocument3")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
# Delete default version with other version is bad
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
)
new_json_doc = copy.copy(json_doc)
new_json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
VersionName="NewBase",
)
new_json_doc["description"] = "a new description2"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
)
new_json_doc["description"] = "a new description3"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
)
new_json_doc["description"] = "a new description4"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
)
try:
client.delete_document(Name="TestDocument3", DocumentVersion="1")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DeleteDocument")
err.response["Error"]["Message"].should.equal(
"Default version of the document can't be deleted."
)
try:
client.delete_document(Name="TestDocument3", VersionName="Base")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DeleteDocument")
err.response["Error"]["Message"].should.equal(
"Default version of the document can't be deleted."
)
# Make sure no ill side effects
response = client.get_document(Name="TestDocument3")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
client.delete_document(Name="TestDocument3", DocumentVersion="5")
# Check that latest version is changed
response = client.describe_document(Name="TestDocument3")
response["Document"]["LatestVersion"].should.equal("4")
client.delete_document(Name="TestDocument3", VersionName="NewBase")
# Make sure other versions okay
client.get_document(Name="TestDocument3", DocumentVersion="1")
client.get_document(Name="TestDocument3", DocumentVersion="3")
client.get_document(Name="TestDocument3", DocumentVersion="4")
client.delete_document(Name="TestDocument3")
try:
client.get_document(Name="TestDocument3", DocumentVersion="1")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
try:
client.get_document(Name="TestDocument3", DocumentVersion="3")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
try:
client.get_document(Name="TestDocument3", DocumentVersion="4")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
response = client.list_documents()
len(response["DocumentIdentifiers"]).should.equal(0)
@mock_ssm
def test_update_document_default_version():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.update_document_default_version(Name="DNE", DocumentVersion="1")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocumentDefaultVersion")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentType="Command",
VersionName="Base",
)
json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
)
json_doc["description"] = "a new description2"
client.update_document(
Content=json.dumps(json_doc), Name="TestDocument", DocumentVersion="$LATEST"
)
response = client.update_document_default_version(
Name="TestDocument", DocumentVersion="2"
)
response["Description"]["Name"].should.equal("TestDocument")
response["Description"]["DefaultVersion"].should.equal("2")
json_doc["description"] = "a new description3"
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
VersionName="NewBase",
)
response = client.update_document_default_version(
Name="TestDocument", DocumentVersion="4"
)
response["Description"]["Name"].should.equal("TestDocument")
response["Description"]["DefaultVersion"].should.equal("4")
response["Description"]["DefaultVersionName"].should.equal("NewBase")
@mock_ssm
def test_update_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.update_document(
Name="DNE",
Content=json.dumps(json_doc),
DocumentVersion="1",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="JSON",
VersionName="Base",
)
try:
client.update_document(
Name="TestDocument",
Content=json.dumps(json_doc),
DocumentVersion="2",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The document version is not valid or does not exist."
)
# Duplicate content throws an error
try:
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="1",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The content of the association document matches another "
"document. Change the content of the document and try again."
)
json_doc["description"] = "a new description"
    # Duplicate version name throws an error
try:
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="1",
DocumentFormat="JSON",
VersionName="Base",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The specified version name is a duplicate."
)
response = client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
VersionName="Base2",
DocumentVersion="1",
DocumentFormat="JSON",
)
response["DocumentDescription"]["Description"].should.equal("a new description")
response["DocumentDescription"]["DocumentVersion"].should.equal("2")
response["DocumentDescription"]["LatestVersion"].should.equal("2")
response["DocumentDescription"]["DefaultVersion"].should.equal("1")
json_doc["description"] = "a new description2"
response = client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
VersionName="NewBase",
)
response["DocumentDescription"]["Description"].should.equal("a new description2")
response["DocumentDescription"]["DocumentVersion"].should.equal("3")
response["DocumentDescription"]["LatestVersion"].should.equal("3")
response["DocumentDescription"]["DefaultVersion"].should.equal("1")
response["DocumentDescription"]["VersionName"].should.equal("NewBase")


@mock_ssm
def test_describe_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.describe_document(Name="DNE")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DescribeDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
Tags=[{"Key": "testing", "Value": "testingValue"}],
)
response = client.describe_document(Name="TestDocument")
doc_description = response["Document"]
_validate_document_description(
"TestDocument", doc_description, json_doc, "1", "1", "1", "YAML"
)
    # Update the document and verify describe_document reflects the new latest version
new_json_doc = copy.copy(json_doc)
new_json_doc["description"] = "a new description2"
client.update_document(
Content=json.dumps(new_json_doc), Name="TestDocument", DocumentVersion="$LATEST"
)
response = client.describe_document(Name="TestDocument")
doc_description = response["Document"]
_validate_document_description(
"TestDocument", doc_description, json_doc, "1", "2", "1", "YAML"
)


@mock_ssm
def test_list_documents():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="JSON",
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument2",
DocumentType="Command",
DocumentFormat="JSON",
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="JSON",
TargetType="/AWS::EC2::Instance",
)
response = client.list_documents()
len(response["DocumentIdentifiers"]).should.equal(3)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument")
response["DocumentIdentifiers"][1]["Name"].should.equal("TestDocument2")
response["DocumentIdentifiers"][2]["Name"].should.equal("TestDocument3")
response["NextToken"].should.equal("")
response = client.list_documents(MaxResults=1)
len(response["DocumentIdentifiers"]).should.equal(1)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("1")
response = client.list_documents(MaxResults=1, NextToken=response["NextToken"])
len(response["DocumentIdentifiers"]).should.equal(1)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument2")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("2")
response = client.list_documents(MaxResults=1, NextToken=response["NextToken"])
len(response["DocumentIdentifiers"]).should.equal(1)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument3")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("")
    # Make sure updates and default-version changes do not interfere with listing
json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
)
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument2",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
)
client.update_document_default_version(Name="TestDocument", DocumentVersion="2")
response = client.list_documents()
len(response["DocumentIdentifiers"]).should.equal(3)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("2")
response["DocumentIdentifiers"][1]["Name"].should.equal("TestDocument2")
response["DocumentIdentifiers"][1]["DocumentVersion"].should.equal("1")
response["DocumentIdentifiers"][2]["Name"].should.equal("TestDocument3")
response["DocumentIdentifiers"][2]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("")
response = client.list_documents(Filters=[{"Key": "Owner", "Values": ["Self"]}])
len(response["DocumentIdentifiers"]).should.equal(3)
response = client.list_documents(
Filters=[{"Key": "TargetType", "Values": ["/AWS::EC2::Instance"]}]
)
len(response["DocumentIdentifiers"]).should.equal(1)
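The pagination assertions above show moto's convention: `list_documents` returns the next start index as a string in `NextToken` and an empty string when no pages remain. A hedged sketch of the client-side loop that convention supports (`fetch_page` is a hypothetical stand-in for `client.list_documents`, and `fake_list_documents` below is invented test data, not moto code):

```python
def iter_document_identifiers(fetch_page, page_size=1):
    """Yield document identifiers page by page until NextToken is empty."""
    token = ""
    while True:
        kwargs = {"MaxResults": page_size}
        if token:
            kwargs["NextToken"] = token
        page = fetch_page(**kwargs)
        for ident in page["DocumentIdentifiers"]:
            yield ident
        token = page.get("NextToken", "")
        if not token:  # moto returns "" on the last page
            break


# Usage against a fake paginated endpoint mimicking moto's index-as-token scheme:
docs = [{"Name": "TestDocument"}, {"Name": "TestDocument2"}, {"Name": "TestDocument3"}]


def fake_list_documents(MaxResults, NextToken="0"):
    start = int(NextToken)
    end = start + MaxResults
    next_token = str(end) if end < len(docs) else ""
    return {"DocumentIdentifiers": docs[start:end], "NextToken": next_token}


names = [d["Name"] for d in iter_document_identifiers(fake_list_documents)]
```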


@ -0,0 +1,47 @@
schemaVersion: "2.2"
description: "Sample Yaml"
parameters:
Parameter1:
type: "Integer"
default: 3
description: "Command Duration."
allowedValues: [1,2,3,4]
Parameter2:
type: "String"
default: "def"
description:
allowedValues: ["abc", "def", "ghi"]
    allowedPattern: '^[a-zA-Z0-9_\-.]{3,128}$'
Parameter3:
type: "Boolean"
default: false
description: "A boolean"
allowedValues: [True, False]
Parameter4:
type: "StringList"
default: ["abc", "def"]
description: "A string list"
Parameter5:
type: "StringMap"
default:
NotificationType: Command
NotificationEvents:
- Failed
NotificationArn: "$dependency.topicArn"
description:
Parameter6:
type: "MapList"
default:
- DeviceName: "/dev/sda1"
Ebs:
VolumeSize: '50'
- DeviceName: "/dev/sdm"
Ebs:
VolumeSize: '100'
description:
mainSteps:
- action: "aws:runShellScript"
name: "sampleCommand"
inputs:
runCommand:
- "echo hi"