Merge branch 'master' into ssm_docs

This commit is contained in:
Alex Bainbridge 2020-07-03 10:15:56 -04:00
commit bedcc83995
17 changed files with 727 additions and 7 deletions

View File

@ -28,7 +28,27 @@ How to teach Moto to support a new AWS endpoint:
* If one doesn't already exist, create a new issue describing what's missing. This is where we'll all talk about the new addition and help you get it done.
* Create a [pull request](https://help.github.com/articles/using-pull-requests/) and mention the issue # in the PR description.
* Try to add a failing test case. For example, if you're trying to implement `boto3.client('acm').import_certificate()` you'll want to add a new method called `def test_import_certificate` to `tests/test_acm/test_acm.py`.
* If you can also implement the code that gets that test passing that's great. If not, just ask the community for a hand and somebody will assist you.
* Implementing the feature itself can be done by creating a method called `import_certificate` in `moto/acm/responses.py`. It's considered good practice to deal with input/output formatting and validation in `responses.py`, and create a method `import_certificate` in `moto/acm/models.py` that handles the actual import logic.
* If you can also implement the code that gets that test passing then great! If not, just ask the community for a hand and somebody will assist you.
## Before pushing changes to GitHub
1. Run `black moto/ tests/` over your code to ensure that it is properly formatted
1. Run `make test` to ensure your tests are passing
## Python versions
moto currently supports both Python 2 and 3, so make sure your tests pass against both major versions of Python.
## Missing services
Implementing a new service from scratch is more work, but still quite straightforward. All the code that intercepts network requests to `*.amazonaws.com` is already handled for you in `moto/core` - all that's necessary for new services to be recognized is to create a new decorator and determine which URLs should be intercepted.
See this PR for an example of what's involved in creating a new service: https://github.com/spulec/moto/pull/2409/files
Note the `urls.py` that redirects all incoming URL requests to a generic `dispatch` method, which in turn will call the appropriate method in `responses.py`.
If you want more control over incoming requests or their bodies, it is possible to redirect specific requests to a custom method. See this PR for an example: https://github.com/spulec/moto/pull/2957/files
## Maintainers

View File

@ -459,18 +459,18 @@
## application-autoscaling
<details>
<summary>0% implemented</summary>
<summary>20% implemented</summary>
- [ ] delete_scaling_policy
- [ ] delete_scheduled_action
- [ ] deregister_scalable_target
- [ ] describe_scalable_targets
- [x] describe_scalable_targets
- [ ] describe_scaling_activities
- [ ] describe_scaling_policies
- [ ] describe_scheduled_actions
- [ ] put_scaling_policy
- [ ] put_scheduled_action
- [ ] register_scalable_target
- [x] register_scalable_target - includes enhanced validation support for ECS targets
</details>
## application-insights

View File

@ -65,6 +65,8 @@ It gets even better! Moto isn't just for Python code and it isn't just for S3. L
|-------------------------------------------------------------------------------------| |
| API Gateway | @mock_apigateway | core endpoints done | |
|-------------------------------------------------------------------------------------| |
| Application Autoscaling | @mock_applicationautoscaling | basic endpoints done | |
|-------------------------------------------------------------------------------------| |
| Autoscaling | @mock_autoscaling | core endpoints done | |
|-------------------------------------------------------------------------------------| |
| Cloudformation | @mock_cloudformation | core endpoints done | |

View File

@ -15,6 +15,9 @@ mock_acm = lazy_load(".acm", "mock_acm")
mock_apigateway = lazy_load(".apigateway", "mock_apigateway")
mock_apigateway_deprecated = lazy_load(".apigateway", "mock_apigateway_deprecated")
mock_athena = lazy_load(".athena", "mock_athena")
mock_applicationautoscaling = lazy_load(
".applicationautoscaling", "mock_applicationautoscaling"
)
mock_autoscaling = lazy_load(".autoscaling", "mock_autoscaling")
mock_autoscaling_deprecated = lazy_load(".autoscaling", "mock_autoscaling_deprecated")
mock_lambda = lazy_load(".awslambda", "mock_lambda")

View File

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import applicationautoscaling_backends
from ..core.models import base_decorator
applicationautoscaling_backend = applicationautoscaling_backends["us-east-1"]
mock_applicationautoscaling = base_decorator(applicationautoscaling_backends)

View File

@ -0,0 +1,22 @@
from __future__ import unicode_literals
import json
class AWSError(Exception):
""" Copied from acm/models.py; this class now exists in >5 locations,
maybe this should be centralised for use by any module?
"""
TYPE = None
STATUS = 400
def __init__(self, message):
self.message = message
def response(self):
resp = {"__type": self.TYPE, "message": self.message}
return json.dumps(resp), dict(status=self.STATUS)
class AWSValidationException(AWSError):
TYPE = "ValidationException"

View File

@ -0,0 +1,179 @@
from __future__ import unicode_literals
from moto.core import BaseBackend, BaseModel
from moto.ecs import ecs_backends
from .exceptions import AWSValidationException
from collections import OrderedDict
from enum import Enum, unique
import time
@unique
class ServiceNamespaceValueSet(Enum):
APPSTREAM = "appstream"
RDS = "rds"
LAMBDA = "lambda"
CASSANDRA = "cassandra"
DYNAMODB = "dynamodb"
CUSTOM_RESOURCE = "custom-resource"
ELASTICMAPREDUCE = "elasticmapreduce"
EC2 = "ec2"
COMPREHEND = "comprehend"
ECS = "ecs"
SAGEMAKER = "sagemaker"
@unique
class ScalableDimensionValueSet(Enum):
CASSANDRA_TABLE_READ_CAPACITY_UNITS = "cassandra:table:ReadCapacityUnits"
CASSANDRA_TABLE_WRITE_CAPACITY_UNITS = "cassandra:table:WriteCapacityUnits"
DYNAMODB_INDEX_READ_CAPACITY_UNITS = "dynamodb:index:ReadCapacityUnits"
DYNAMODB_INDEX_WRITE_CAPACITY_UNITS = "dynamodb:index:WriteCapacityUnits"
DYNAMODB_TABLE_READ_CAPACITY_UNITS = "dynamodb:table:ReadCapacityUnits"
DYNAMODB_TABLE_WRITE_CAPACITY_UNITS = "dynamodb:table:WriteCapacityUnits"
RDS_CLUSTER_READ_REPLICA_COUNT = "rds:cluster:ReadReplicaCount"
RDS_CLUSTER_CAPACITY = "rds:cluster:Capacity"
COMPREHEND_DOCUMENT_CLASSIFIER_ENDPOINT_DESIRED_INFERENCE_UNITS = (
"comprehend:document-classifier-endpoint:DesiredInferenceUnits"
)
ELASTICMAPREDUCE_INSTANCE_FLEET_ON_DEMAND_CAPACITY = (
"elasticmapreduce:instancefleet:OnDemandCapacity"
)
ELASTICMAPREDUCE_INSTANCE_FLEET_SPOT_CAPACITY = (
"elasticmapreduce:instancefleet:SpotCapacity"
)
ELASTICMAPREDUCE_INSTANCE_GROUP_INSTANCE_COUNT = (
"elasticmapreduce:instancegroup:InstanceCount"
)
LAMBDA_FUNCTION_PROVISIONED_CONCURRENCY = "lambda:function:ProvisionedConcurrency"
APPSTREAM_FLEET_DESIRED_CAPACITY = "appstream:fleet:DesiredCapacity"
CUSTOM_RESOURCE_RESOURCE_TYPE_PROPERTY = "custom-resource:ResourceType:Property"
SAGEMAKER_VARIANT_DESIRED_INSTANCE_COUNT = "sagemaker:variant:DesiredInstanceCount"
EC2_SPOT_FLEET_REQUEST_TARGET_CAPACITY = "ec2:spot-fleet-request:TargetCapacity"
ECS_SERVICE_DESIRED_COUNT = "ecs:service:DesiredCount"
class ApplicationAutoscalingBackend(BaseBackend):
def __init__(self, region, ecs):
super(ApplicationAutoscalingBackend, self).__init__()
self.region = region
self.ecs_backend = ecs
self.targets = OrderedDict()
def reset(self):
region = self.region
ecs = self.ecs_backend
self.__dict__ = {}
self.__init__(region, ecs)
@property
def applicationautoscaling_backend(self):
return applicationautoscaling_backends[self.region]
def describe_scalable_targets(
self, namespace, r_ids=None, dimension=None,
):
""" Describe scalable targets. """
if r_ids is None:
r_ids = []
targets = self._flatten_scalable_targets(namespace)
if dimension is not None:
targets = [t for t in targets if t.scalable_dimension == dimension]
if len(r_ids) > 0:
targets = [t for t in targets if t.resource_id in r_ids]
return targets
def _flatten_scalable_targets(self, namespace):
""" Flatten scalable targets for a given service namespace down to a list. """
targets = []
for dimension in self.targets.keys():
for resource_id in self.targets[dimension].keys():
targets.append(self.targets[dimension][resource_id])
targets = [t for t in targets if t.service_namespace == namespace]
return targets
def register_scalable_target(self, namespace, r_id, dimension, **kwargs):
""" Registers or updates a scalable target. """
_ = _target_params_are_valid(namespace, r_id, dimension)
if namespace == ServiceNamespaceValueSet.ECS.value:
_ = self._ecs_service_exists_for_target(r_id)
if self._scalable_target_exists(r_id, dimension):
target = self.targets[dimension][r_id]
target.update(kwargs)
else:
target = FakeScalableTarget(self, namespace, r_id, dimension, **kwargs)
self._add_scalable_target(target)
return target
def _scalable_target_exists(self, r_id, dimension):
return r_id in self.targets.get(dimension, [])
def _ecs_service_exists_for_target(self, r_id):
""" Raises a ValidationException if an ECS service does not exist
for the specified resource ID.
"""
resource_type, cluster, service = r_id.split("/")
result = self.ecs_backend.describe_services(cluster, [service])
if len(result) != 1:
raise AWSValidationException("ECS service doesn't exist: {}".format(r_id))
return True
def _add_scalable_target(self, target):
if target.scalable_dimension not in self.targets:
self.targets[target.scalable_dimension] = OrderedDict()
if target.resource_id not in self.targets[target.scalable_dimension]:
self.targets[target.scalable_dimension][target.resource_id] = target
return target
def _target_params_are_valid(namespace, r_id, dimension):
""" Check whether namespace, resource_id and dimension are valid and consistent with each other. """
is_valid = True
valid_namespaces = [n.value for n in ServiceNamespaceValueSet]
if namespace not in valid_namespaces:
is_valid = False
if dimension is not None:
try:
valid_dimensions = [d.value for d in ScalableDimensionValueSet]
d_namespace, d_resource_type, scaling_property = dimension.split(":")
resource_type, cluster, service = r_id.split("/")
if (
dimension not in valid_dimensions
or d_namespace != namespace
or resource_type != d_resource_type
):
is_valid = False
except ValueError:
is_valid = False
if not is_valid:
raise AWSValidationException(
"Unsupported service namespace, resource type or scalable dimension"
)
return is_valid
class FakeScalableTarget(BaseModel):
def __init__(
self, backend, service_namespace, resource_id, scalable_dimension, **kwargs
):
self.applicationautoscaling_backend = backend
self.service_namespace = service_namespace
self.resource_id = resource_id
self.scalable_dimension = scalable_dimension
self.min_capacity = kwargs["min_capacity"]
self.max_capacity = kwargs["max_capacity"]
self.role_arn = kwargs["role_arn"]
self.suspended_state = kwargs["suspended_state"]
self.creation_time = time.time()
def update(self, **kwargs):
if kwargs["min_capacity"] is not None:
self.min_capacity = kwargs["min_capacity"]
if kwargs["max_capacity"] is not None:
self.max_capacity = kwargs["max_capacity"]
applicationautoscaling_backends = {}
for region_name, ecs_backend in ecs_backends.items():
applicationautoscaling_backends[region_name] = ApplicationAutoscalingBackend(
region_name, ecs_backend
)

View File

@ -0,0 +1,97 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
import json
from .models import (
applicationautoscaling_backends,
ScalableDimensionValueSet,
ServiceNamespaceValueSet,
)
from .exceptions import AWSValidationException
class ApplicationAutoScalingResponse(BaseResponse):
@property
def applicationautoscaling_backend(self):
return applicationautoscaling_backends[self.region]
def describe_scalable_targets(self):
try:
self._validate_params()
except AWSValidationException as e:
return e.response()
service_namespace = self._get_param("ServiceNamespace")
resource_ids = self._get_param("ResourceIds")
scalable_dimension = self._get_param("ScalableDimension")
max_results = self._get_int_param("MaxResults", 50)
marker = self._get_param("NextToken")
all_scalable_targets = self.applicationautoscaling_backend.describe_scalable_targets(
service_namespace, resource_ids, scalable_dimension
)
start = int(marker) + 1 if marker else 0
next_token = None
scalable_targets_resp = all_scalable_targets[start : start + max_results]
if len(all_scalable_targets) > start + max_results:
next_token = str(len(scalable_targets_resp) - 1)
targets = [_build_target(t) for t in scalable_targets_resp]
return json.dumps({"ScalableTargets": targets, "NextToken": next_token})
def register_scalable_target(self):
""" Registers or updates a scalable target. """
try:
self._validate_params()
self.applicationautoscaling_backend.register_scalable_target(
self._get_param("ServiceNamespace"),
self._get_param("ResourceId"),
self._get_param("ScalableDimension"),
min_capacity=self._get_int_param("MinCapacity"),
max_capacity=self._get_int_param("MaxCapacity"),
role_arn=self._get_param("RoleARN"),
suspended_state=self._get_param("SuspendedState"),
)
except AWSValidationException as e:
return e.response()
return json.dumps({})
def _validate_params(self):
""" Validate parameters.
TODO Integrate this validation with the validation in models.py
"""
namespace = self._get_param("ServiceNamespace")
dimension = self._get_param("ScalableDimension")
messages = []
dimensions = [d.value for d in ScalableDimensionValueSet]
message = None
if dimension is not None and dimension not in dimensions:
messages.append(
"Value '{}' at 'scalableDimension' "
"failed to satisfy constraint: Member must satisfy enum value set: "
"{}".format(dimension, dimensions)
)
namespaces = [n.value for n in ServiceNamespaceValueSet]
if namespace is not None and namespace not in namespaces:
messages.append(
"Value '{}' at 'serviceNamespace' "
"failed to satisfy constraint: Member must satisfy enum value set: "
"{}".format(namespace, namespaces)
)
if len(messages) == 1:
message = "1 validation error detected: {}".format(messages[0])
elif len(messages) > 1:
message = "{} validation errors detected: {}".format(
len(messages), "; ".join(messages)
)
if message:
raise AWSValidationException(message)
def _build_target(t):
return {
"CreationTime": t.creation_time,
"ServiceNamespace": t.service_namespace,
"ResourceId": t.resource_id,
"RoleARN": t.role_arn,
"ScalableDimension": t.scalable_dimension,
"MaxCapacity": t.max_capacity,
"MinCapacity": t.min_capacity,
"SuspendedState": t.suspended_state,
}

View File

@ -0,0 +1,8 @@
from __future__ import unicode_literals
from .responses import ApplicationAutoScalingResponse
url_bases = ["https?://application-autoscaling.(.+).amazonaws.com"]
url_paths = {
"{0}/$": ApplicationAutoScalingResponse.dispatch,
}

View File

@ -0,0 +1,10 @@
from six.moves.urllib.parse import urlparse
def region_from_applicationautoscaling_url(url):
domain = urlparse(url).netloc
if "." in domain:
return domain.split(".")[1]
else:
return "us-east-1"

View File

@ -6,6 +6,10 @@ BACKENDS = {
"acm": ("acm", "acm_backends"),
"apigateway": ("apigateway", "apigateway_backends"),
"athena": ("athena", "athena_backends"),
"applicationautoscaling": (
"applicationautoscaling",
"applicationautoscaling_backends",
),
"autoscaling": ("autoscaling", "autoscaling_backends"),
"batch": ("batch", "batch_backends"),
"cloudformation": ("cloudformation", "cloudformation_backends"),

View File

@ -10,6 +10,7 @@ import six
import types
from io import BytesIO
from collections import defaultdict
from botocore.config import Config
from botocore.handlers import BUILTIN_HANDLERS
from botocore.awsrequest import AWSResponse
from six.moves.urllib.parse import urlparse
@ -416,6 +417,13 @@ class ServerModeMockAWS(BaseMockAWS):
import mock
def fake_boto3_client(*args, **kwargs):
region = self._get_region(*args, **kwargs)
if region:
if "config" in kwargs:
kwargs["config"].__dict__["user_agent_extra"] += " region/" + region
else:
config = Config(user_agent_extra="region/" + region)
kwargs["config"] = config
if "endpoint_url" not in kwargs:
kwargs["endpoint_url"] = "http://localhost:5000"
return real_boto3_client(*args, **kwargs)
@ -463,6 +471,14 @@ class ServerModeMockAWS(BaseMockAWS):
if six.PY2:
self._httplib_patcher.start()
def _get_region(self, *args, **kwargs):
if "region_name" in kwargs:
return kwargs["region_name"]
if type(args) == tuple and len(args) == 2:
service, region = args
return region
return None
def disable_patching(self):
if self._client_patcher:
self._client_patcher.stop()

View File

@ -188,6 +188,9 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
default_region = "us-east-1"
# to extract region, use [^.]
region_regex = re.compile(r"\.(?P<region>[a-z]{2}-[a-z]+-\d{1})\.amazonaws\.com")
region_from_useragent_regex = re.compile(
r"region/(?P<region>[a-z]{2}-[a-z]+-\d{1})"
)
param_list_regex = re.compile(r"(.*)\.(\d+)\.")
access_key_regex = re.compile(
r"AWS.*(?P<access_key>(?<![A-Z0-9])[A-Z0-9]{20}(?![A-Z0-9]))[:/]"
@ -272,9 +275,14 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.response_headers = {"server": "amazon.com"}
def get_region_from_url(self, request, full_url):
match = self.region_regex.search(full_url)
if match:
region = match.group(1)
url_match = self.region_regex.search(full_url)
user_agent_match = self.region_from_useragent_regex.search(
request.headers.get("User-Agent", "")
)
if url_match:
region = url_match.group(1)
elif user_agent_match:
region = user_agent_match.group(1)
elif (
"Authorization" in request.headers
and "AWS4" in request.headers["Authorization"]

View File

@ -0,0 +1 @@
from __future__ import unicode_literals

View File

@ -0,0 +1,189 @@
from __future__ import unicode_literals
import boto3
from moto import mock_applicationautoscaling, mock_ecs
import sure # noqa
from nose.tools import with_setup
DEFAULT_REGION = "us-east-1"
DEFAULT_ECS_CLUSTER = "default"
DEFAULT_ECS_TASK = "test_ecs_task"
DEFAULT_ECS_SERVICE = "sample-webapp"
DEFAULT_SERVICE_NAMESPACE = "ecs"
DEFAULT_RESOURCE_ID = "service/{}/{}".format(DEFAULT_ECS_CLUSTER, DEFAULT_ECS_SERVICE)
DEFAULT_SCALABLE_DIMENSION = "ecs:service:DesiredCount"
DEFAULT_MIN_CAPACITY = 1
DEFAULT_MAX_CAPACITY = 1
DEFAULT_ROLE_ARN = "test:arn"
DEFAULT_SUSPENDED_STATE = {
"DynamicScalingInSuspended": True,
"DynamicScalingOutSuspended": True,
"ScheduledScalingSuspended": True,
}
def _create_ecs_defaults(ecs, create_service=True):
_ = ecs.create_cluster(clusterName=DEFAULT_ECS_CLUSTER)
_ = ecs.register_task_definition(
family=DEFAULT_ECS_TASK,
containerDefinitions=[
{
"name": "hello_world",
"image": "docker/hello-world:latest",
"cpu": 1024,
"memory": 400,
"essential": True,
"environment": [
{"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
],
"logConfiguration": {"logDriver": "json-file"},
}
],
)
if create_service:
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName=DEFAULT_ECS_SERVICE,
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_one_basic_ecs_success():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
client.register_scalable_target(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE,
ResourceId=DEFAULT_RESOURCE_ID,
ScalableDimension=DEFAULT_SCALABLE_DIMENSION,
)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(1)
t = response["ScalableTargets"][0]
t.should.have.key("ServiceNamespace").which.should.equal(DEFAULT_SERVICE_NAMESPACE)
t.should.have.key("ResourceId").which.should.equal(DEFAULT_RESOURCE_ID)
t.should.have.key("ScalableDimension").which.should.equal(
DEFAULT_SCALABLE_DIMENSION
)
t.should.have.key("CreationTime").which.should.be.a("datetime.datetime")
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_one_full_ecs_success():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
register_scalable_target(client)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(1)
t = response["ScalableTargets"][0]
t.should.have.key("ServiceNamespace").which.should.equal(DEFAULT_SERVICE_NAMESPACE)
t.should.have.key("ResourceId").which.should.equal(DEFAULT_RESOURCE_ID)
t.should.have.key("ScalableDimension").which.should.equal(
DEFAULT_SCALABLE_DIMENSION
)
t.should.have.key("MinCapacity").which.should.equal(DEFAULT_MIN_CAPACITY)
t.should.have.key("MaxCapacity").which.should.equal(DEFAULT_MAX_CAPACITY)
t.should.have.key("RoleARN").which.should.equal(DEFAULT_ROLE_ARN)
t.should.have.key("CreationTime").which.should.be.a("datetime.datetime")
t.should.have.key("SuspendedState")
t["SuspendedState"]["DynamicScalingInSuspended"].should.equal(
DEFAULT_SUSPENDED_STATE["DynamicScalingInSuspended"]
)
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_only_return_ecs_targets():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs, create_service=False)
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName="test1",
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName="test2",
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
register_scalable_target(
client,
ServiceNamespace="ecs",
ResourceId="service/{}/test1".format(DEFAULT_ECS_CLUSTER),
)
register_scalable_target(
client,
ServiceNamespace="ecs",
ResourceId="service/{}/test2".format(DEFAULT_ECS_CLUSTER),
)
register_scalable_target(
client,
ServiceNamespace="elasticmapreduce",
ResourceId="instancegroup/j-2EEZNYKUA1NTV/ig-1791Y4E1L8YI0",
ScalableDimension="elasticmapreduce:instancegroup:InstanceCount",
)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(2)
@mock_ecs
@mock_applicationautoscaling
def test_describe_scalable_targets_next_token_success():
ecs = boto3.client("ecs", region_name=DEFAULT_REGION)
_create_ecs_defaults(ecs, create_service=False)
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
for i in range(0, 100):
_ = ecs.create_service(
cluster=DEFAULT_ECS_CLUSTER,
serviceName=str(i),
taskDefinition=DEFAULT_ECS_TASK,
desiredCount=2,
)
register_scalable_target(
client,
ServiceNamespace="ecs",
ResourceId="service/{}/{}".format(DEFAULT_ECS_CLUSTER, i),
)
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(50)
response["ScalableTargets"][0]["ResourceId"].should.equal("service/default/0")
response.should.have.key("NextToken").which.should.equal("49")
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE, NextToken=str(response["NextToken"])
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(response["ScalableTargets"]).should.equal(50)
response["ScalableTargets"][0]["ResourceId"].should.equal("service/default/50")
response.should_not.have.key("NextToken")
def register_scalable_target(client, **kwargs):
""" Build a default scalable target object for use in tests. """
return client.register_scalable_target(
ServiceNamespace=kwargs.get("ServiceNamespace", DEFAULT_SERVICE_NAMESPACE),
ResourceId=kwargs.get("ResourceId", DEFAULT_RESOURCE_ID),
ScalableDimension=kwargs.get("ScalableDimension", DEFAULT_SCALABLE_DIMENSION),
MinCapacity=kwargs.get("MinCapacity", DEFAULT_MIN_CAPACITY),
MaxCapacity=kwargs.get("MaxCapacity", DEFAULT_MAX_CAPACITY),
RoleARN=kwargs.get("RoleARN", DEFAULT_ROLE_ARN),
SuspendedState=kwargs.get("SuspendedState", DEFAULT_SUSPENDED_STATE),
)

View File

@ -0,0 +1,123 @@
from __future__ import unicode_literals
import boto3
from moto import mock_applicationautoscaling, mock_ecs
from moto.applicationautoscaling import models
from moto.applicationautoscaling.exceptions import AWSValidationException
from botocore.exceptions import ParamValidationError
from nose.tools import assert_raises
import sure # noqa
from botocore.exceptions import ClientError
from parameterized import parameterized
from .test_applicationautoscaling import register_scalable_target
DEFAULT_REGION = "us-east-1"
DEFAULT_ECS_CLUSTER = "default"
DEFAULT_ECS_TASK = "test_ecs_task"
DEFAULT_ECS_SERVICE = "sample-webapp"
DEFAULT_SERVICE_NAMESPACE = "ecs"
DEFAULT_RESOURCE_ID = "service/{}/{}".format(DEFAULT_ECS_CLUSTER, DEFAULT_ECS_SERVICE)
DEFAULT_SCALABLE_DIMENSION = "ecs:service:DesiredCount"
DEFAULT_MIN_CAPACITY = 1
DEFAULT_MAX_CAPACITY = 1
DEFAULT_ROLE_ARN = "test:arn"
@mock_applicationautoscaling
def test_describe_scalable_targets_no_params_should_raise_param_validation_errors():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ParamValidationError):
client.describe_scalable_targets()
@mock_applicationautoscaling
def test_register_scalable_target_no_params_should_raise_param_validation_errors():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ParamValidationError):
client.register_scalable_target()
@mock_applicationautoscaling
def test_register_scalable_target_with_none_service_namespace_should_raise_param_validation_errors():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ParamValidationError):
register_scalable_target(client, ServiceNamespace=None)
@mock_applicationautoscaling
def test_describe_scalable_targets_with_invalid_scalable_dimension_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ClientError) as err:
response = client.describe_scalable_targets(
ServiceNamespace=DEFAULT_SERVICE_NAMESPACE, ScalableDimension="foo",
)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].split(":")[0].should.look_like(
"1 validation error detected"
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_applicationautoscaling
def test_describe_scalable_targets_with_invalid_service_namespace_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ClientError) as err:
response = client.describe_scalable_targets(
ServiceNamespace="foo", ScalableDimension=DEFAULT_SCALABLE_DIMENSION,
)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].split(":")[0].should.look_like(
"1 validation error detected"
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_applicationautoscaling
def test_describe_scalable_targets_with_multiple_invalid_parameters_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
with assert_raises(ClientError) as err:
response = client.describe_scalable_targets(
ServiceNamespace="foo", ScalableDimension="bar",
)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].split(":")[0].should.look_like(
"2 validation errors detected"
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_ecs
@mock_applicationautoscaling
def test_register_scalable_target_ecs_with_non_existent_service_should_return_validation_exception():
client = boto3.client("application-autoscaling", region_name=DEFAULT_REGION)
resource_id = "service/{}/foo".format(DEFAULT_ECS_CLUSTER)
with assert_raises(ClientError) as err:
register_scalable_target(client, ServiceNamespace="ecs", ResourceId=resource_id)
err.response["Error"]["Code"].should.equal("ValidationException")
err.response["Error"]["Message"].should.equal(
"ECS service doesn't exist: {}".format(resource_id)
)
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@parameterized(
[
("ecs", "service/default/test-svc", "ecs:service:DesiredCount", True),
("ecs", "banana/default/test-svc", "ecs:service:DesiredCount", False),
("rds", "service/default/test-svc", "ecs:service:DesiredCount", False),
]
)
def test_target_params_are_valid_success(namespace, r_id, dimension, expected):
if expected is True:
models._target_params_are_valid(namespace, r_id, dimension).should.equal(
expected
)
else:
with assert_raises(AWSValidationException):
models._target_params_are_valid(namespace, r_id, dimension)
# TODO add a test for not-supplied MinCapacity or MaxCapacity (ValidationException)

View File

@ -1243,6 +1243,38 @@ def test_change_password():
result["AuthenticationResult"].should_not.be.none
@mock_cognitoidp
def test_change_password__using_custom_user_agent_header():
# https://github.com/spulec/moto/issues/3098
# As the admin_initiate_auth-method is unauthenticated, we use the user-agent header to pass in the region
# This test verifies this works, even if we pass in our own user-agent header
from botocore.config import Config
my_config = Config(user_agent_extra="more/info", signature_version="v4")
conn = boto3.client("cognito-idp", "us-west-2", config=my_config)
outputs = authentication_flow(conn)
# Take this opportunity to test change_password, which requires an access token.
newer_password = str(uuid.uuid4())
conn.change_password(
AccessToken=outputs["access_token"],
PreviousPassword=outputs["password"],
ProposedPassword=newer_password,
)
# Log in again, which should succeed without a challenge because the user is no
# longer in the force-new-password state.
result = conn.admin_initiate_auth(
UserPoolId=outputs["user_pool_id"],
ClientId=outputs["client_id"],
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password},
)
result["AuthenticationResult"].should_not.be.none
@mock_cognitoidp
def test_forgot_password():
conn = boto3.client("cognito-idp", "us-west-2")