Merge pull request #2852 from bblommers/feature/mock_eb

Add mock for ElasticBean
This commit is contained in:
Steve Pulec 2020-04-25 18:42:25 -05:00 committed by GitHub
commit ac6c550fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1735 additions and 42 deletions

View File

@ -2878,15 +2878,15 @@
- [ ] test_failover
## elasticbeanstalk
0% implemented
13% implemented
- [ ] abort_environment_update
- [ ] apply_environment_managed_action
- [ ] check_dns_availability
- [ ] compose_environments
- [ ] create_application
- [X] create_application
- [ ] create_application_version
- [ ] create_configuration_template
- [ ] create_environment
- [X] create_environment
- [ ] create_platform_version
- [ ] create_storage_location
- [ ] delete_application
@ -2903,13 +2903,13 @@
- [ ] describe_environment_managed_action_history
- [ ] describe_environment_managed_actions
- [ ] describe_environment_resources
- [ ] describe_environments
- [X] describe_environments
- [ ] describe_events
- [ ] describe_instances_health
- [ ] describe_platform_version
- [ ] list_available_solution_stacks
- [X] list_available_solution_stacks
- [ ] list_platform_versions
- [ ] list_tags_for_resource
- [X] list_tags_for_resource
- [ ] rebuild_environment
- [ ] request_environment_info
- [ ] restart_app_server
@ -2921,7 +2921,7 @@
- [ ] update_application_version
- [ ] update_configuration_template
- [ ] update_environment
- [ ] update_tags_for_resource
- [X] update_tags_for_resource
- [ ] validate_configuration_settings
## elastictranscoder

View File

@ -21,6 +21,7 @@ from .datasync import mock_datasync # noqa
from .dynamodb import mock_dynamodb, mock_dynamodb_deprecated # noqa
from .dynamodb2 import mock_dynamodb2, mock_dynamodb2_deprecated # noqa
from .dynamodbstreams import mock_dynamodbstreams # noqa
from .elasticbeanstalk import mock_elasticbeanstalk # noqa
from .ec2 import mock_ec2, mock_ec2_deprecated # noqa
from .ec2_instance_connect import mock_ec2_instance_connect # noqa
from .ecr import mock_ecr, mock_ecr_deprecated # noqa

View File

@ -23,6 +23,7 @@ from moto.ec2 import ec2_backends
from moto.ec2_instance_connect import ec2_instance_connect_backends
from moto.ecr import ecr_backends
from moto.ecs import ecs_backends
from moto.elasticbeanstalk import eb_backends
from moto.elb import elb_backends
from moto.elbv2 import elbv2_backends
from moto.emr import emr_backends
@ -77,6 +78,7 @@ BACKENDS = {
"ec2_instance_connect": ec2_instance_connect_backends,
"ecr": ecr_backends,
"ecs": ecs_backends,
"elasticbeanstalk": eb_backends,
"elb": elb_backends,
"elbv2": elbv2_backends,
"events": events_backends,

View File

@ -328,3 +328,25 @@ def py2_strip_unicode_keys(blob):
blob = new_set
return blob
def tags_from_query_string(
querystring_dict, prefix="Tag", key_suffix="Key", value_suffix="Value"
):
response_values = {}
for key, value in querystring_dict.items():
if key.startswith(prefix) and key.endswith(key_suffix):
tag_index = key.replace(prefix + ".", "").replace("." + key_suffix, "")
tag_key = querystring_dict.get(
"{prefix}.{index}.{key_suffix}".format(
prefix=prefix, index=tag_index, key_suffix=key_suffix,
)
)[0]
tag_value_key = "{prefix}.{index}.{value_suffix}".format(
prefix=prefix, index=tag_index, value_suffix=value_suffix,
)
if tag_value_key in querystring_dict:
response_values[tag_key] = querystring_dict.get(tag_value_key)[0]
else:
response_values[tag_key] = None
return response_values

View File

@ -2,7 +2,8 @@ from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.models import validate_resource_ids
from moto.ec2.utils import tags_from_query_string, filters_from_querystring
from moto.ec2.utils import filters_from_querystring
from moto.core.utils import tags_from_query_string
class TagResponse(BaseResponse):

View File

@ -196,22 +196,6 @@ def split_route_id(route_id):
return values[0], values[1]
def tags_from_query_string(querystring_dict):
prefix = "Tag"
suffix = "Key"
response_values = {}
for key, value in querystring_dict.items():
if key.startswith(prefix) and key.endswith(suffix):
tag_index = key.replace(prefix + ".", "").replace("." + suffix, "")
tag_key = querystring_dict.get("Tag.{0}.Key".format(tag_index))[0]
tag_value_key = "Tag.{0}.Value".format(tag_index)
if tag_value_key in querystring_dict:
response_values[tag_key] = querystring_dict.get(tag_value_key)[0]
else:
response_values[tag_key] = None
return response_values
def dhcp_configuration_from_querystring(querystring, option="DhcpConfiguration"):
"""
turn:

View File

@ -0,0 +1,4 @@
from .models import eb_backends
from moto.core.models import base_decorator
mock_elasticbeanstalk = base_decorator(eb_backends)

View File

@ -0,0 +1,15 @@
from moto.core.exceptions import RESTError
class InvalidParameterValueError(RESTError):
def __init__(self, message):
super(InvalidParameterValueError, self).__init__(
"InvalidParameterValue", message
)
class ResourceNotFoundException(RESTError):
def __init__(self, message):
super(ResourceNotFoundException, self).__init__(
"ResourceNotFoundException", message
)

View File

@ -0,0 +1,152 @@
import weakref
from boto3 import Session
from moto.core import BaseBackend, BaseModel
from .exceptions import InvalidParameterValueError, ResourceNotFoundException
class FakeEnvironment(BaseModel):
def __init__(
self, application, environment_name, solution_stack_name, tags,
):
self.application = weakref.proxy(
application
) # weakref to break circular dependencies
self.environment_name = environment_name
self.solution_stack_name = solution_stack_name
self.tags = tags
@property
def application_name(self):
return self.application.application_name
@property
def environment_arn(self):
return (
"arn:aws:elasticbeanstalk:{region}:{account_id}:"
"environment/{application_name}/{environment_name}".format(
region=self.region,
account_id="123456789012",
application_name=self.application_name,
environment_name=self.environment_name,
)
)
@property
def platform_arn(self):
return "TODO" # TODO
@property
def region(self):
return self.application.region
class FakeApplication(BaseModel):
def __init__(self, backend, application_name):
self.backend = weakref.proxy(backend) # weakref to break cycles
self.application_name = application_name
self.environments = dict()
def create_environment(
self, environment_name, solution_stack_name, tags,
):
if environment_name in self.environments:
raise InvalidParameterValueError
env = FakeEnvironment(
application=self,
environment_name=environment_name,
solution_stack_name=solution_stack_name,
tags=tags,
)
self.environments[environment_name] = env
return env
@property
def region(self):
return self.backend.region
class EBBackend(BaseBackend):
def __init__(self, region):
self.region = region
self.applications = dict()
def reset(self):
# preserve region
region = self.region
self._reset_model_refs()
self.__dict__ = {}
self.__init__(region)
def create_application(self, application_name):
if application_name in self.applications:
raise InvalidParameterValueError(
"Application {} already exists.".format(application_name)
)
new_app = FakeApplication(backend=self, application_name=application_name,)
self.applications[application_name] = new_app
return new_app
def create_environment(self, app, environment_name, stack_name, tags):
return app.create_environment(
environment_name=environment_name,
solution_stack_name=stack_name,
tags=tags,
)
def describe_environments(self):
envs = []
for app in self.applications.values():
for env in app.environments.values():
envs.append(env)
return envs
def list_available_solution_stacks(self):
# Implemented in response.py
pass
def update_tags_for_resource(self, resource_arn, tags_to_add, tags_to_remove):
try:
res = self._find_environment_by_arn(resource_arn)
except KeyError:
raise ResourceNotFoundException(
"Resource not found for ARN '{}'.".format(resource_arn)
)
for key, value in tags_to_add.items():
res.tags[key] = value
for key in tags_to_remove:
del res.tags[key]
def list_tags_for_resource(self, resource_arn):
try:
res = self._find_environment_by_arn(resource_arn)
except KeyError:
raise ResourceNotFoundException(
"Resource not found for ARN '{}'.".format(resource_arn)
)
return res.tags
def _find_environment_by_arn(self, arn):
for app in self.applications.keys():
for env in self.applications[app].environments.values():
if env.environment_arn == arn:
return env
raise KeyError()
eb_backends = {}
for region in Session().get_available_regions("elasticbeanstalk"):
eb_backends[region] = EBBackend(region)
for region in Session().get_available_regions(
"elasticbeanstalk", partition_name="aws-us-gov"
):
eb_backends[region] = EBBackend(region)
for region in Session().get_available_regions(
"elasticbeanstalk", partition_name="aws-cn"
):
eb_backends[region] = EBBackend(region)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,11 @@
from __future__ import unicode_literals
from .responses import EBResponse
url_bases = [
r"https?://elasticbeanstalk.(?P<region>[a-zA-Z0-9\-_]+).amazonaws.com",
]
url_paths = {
"{0}/$": EBResponse.dispatch,
}

View File

@ -10,9 +10,10 @@ from six.moves.urllib.parse import urlparse
from moto.core.responses import AWSServiceSpec
from moto.core.responses import BaseResponse
from moto.core.responses import xml_to_json_response
from moto.core.utils import tags_from_query_string
from .exceptions import EmrError
from .models import emr_backends
from .utils import steps_from_query_string, tags_from_query_string
from .utils import steps_from_query_string
def generate_boto3_response(operation):
@ -91,7 +92,7 @@ class ElasticMapReduceResponse(BaseResponse):
@generate_boto3_response("AddTags")
def add_tags(self):
cluster_id = self._get_param("ResourceId")
tags = tags_from_query_string(self.querystring)
tags = tags_from_query_string(self.querystring, prefix="Tags")
self.backend.add_tags(cluster_id, tags)
template = self.response_template(ADD_TAGS_TEMPLATE)
return template.render()

View File

@ -22,22 +22,6 @@ def random_instance_group_id(size=13):
return "i-{0}".format(random_id())
def tags_from_query_string(querystring_dict):
prefix = "Tags"
suffix = "Key"
response_values = {}
for key, value in querystring_dict.items():
if key.startswith(prefix) and key.endswith(suffix):
tag_index = key.replace(prefix + ".", "").replace("." + suffix, "")
tag_key = querystring_dict.get("Tags.{0}.Key".format(tag_index))[0]
tag_value_key = "Tags.{0}.Value".format(tag_index)
if tag_value_key in querystring_dict:
response_values[tag_key] = querystring_dict.get(tag_value_key)[0]
else:
response_values[tag_key] = None
return response_values
def steps_from_query_string(querystring_dict):
steps = []
for step in querystring_dict:

130
tests/test_eb/test_eb.py Normal file
View File

@ -0,0 +1,130 @@
import boto3
import sure # noqa
from botocore.exceptions import ClientError
from moto import mock_elasticbeanstalk
@mock_elasticbeanstalk
def test_create_application():
# Create Elastic Beanstalk Application
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
app = conn.create_application(ApplicationName="myapp",)
app["Application"]["ApplicationName"].should.equal("myapp")
@mock_elasticbeanstalk
def test_create_application_dup():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
conn.create_application.when.called_with(ApplicationName="myapp",).should.throw(
ClientError
)
@mock_elasticbeanstalk
def test_describe_applications():
# Create Elastic Beanstalk Application
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
apps = conn.describe_applications()
len(apps["Applications"]).should.equal(1)
apps["Applications"][0]["ApplicationName"].should.equal("myapp")
@mock_elasticbeanstalk
def test_create_environment():
# Create Elastic Beanstalk Environment
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
app = conn.create_application(ApplicationName="myapp",)
env = conn.create_environment(ApplicationName="myapp", EnvironmentName="myenv",)
env["EnvironmentName"].should.equal("myenv")
@mock_elasticbeanstalk
def test_describe_environments():
# List Elastic Beanstalk Envs
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
conn.create_environment(
ApplicationName="myapp", EnvironmentName="myenv",
)
envs = conn.describe_environments()
envs = envs["Environments"]
len(envs).should.equal(1)
envs[0]["ApplicationName"].should.equal("myapp")
envs[0]["EnvironmentName"].should.equal("myenv")
def tags_dict_to_list(tag_dict):
tag_list = []
for key, value in tag_dict.items():
tag_list.append({"Key": key, "Value": value})
return tag_list
def tags_list_to_dict(tag_list):
tag_dict = {}
for tag in tag_list:
tag_dict[tag["Key"]] = tag["Value"]
return tag_dict
@mock_elasticbeanstalk
def test_create_environment_tags():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
env_tags = {"initial key": "initial value"}
env = conn.create_environment(
ApplicationName="myapp",
EnvironmentName="myenv",
Tags=tags_dict_to_list(env_tags),
)
tags = conn.list_tags_for_resource(ResourceArn=env["EnvironmentArn"],)
tags["ResourceArn"].should.equal(env["EnvironmentArn"])
tags_list_to_dict(tags["ResourceTags"]).should.equal(env_tags)
@mock_elasticbeanstalk
def test_update_tags():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
env_tags = {
"initial key": "initial value",
"to remove": "delete me",
"to update": "original",
}
env = conn.create_environment(
ApplicationName="myapp",
EnvironmentName="myenv",
Tags=tags_dict_to_list(env_tags),
)
extra_env_tags = {
"to update": "new",
"extra key": "extra value",
}
conn.update_tags_for_resource(
ResourceArn=env["EnvironmentArn"],
TagsToAdd=tags_dict_to_list(extra_env_tags),
TagsToRemove=["to remove"],
)
total_env_tags = env_tags.copy()
total_env_tags.update(extra_env_tags)
del total_env_tags["to remove"]
tags = conn.list_tags_for_resource(ResourceArn=env["EnvironmentArn"],)
tags["ResourceArn"].should.equal(env["EnvironmentArn"])
tags_list_to_dict(tags["ResourceTags"]).should.equal(total_env_tags)
@mock_elasticbeanstalk
def test_list_available_solution_stacks():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
stacks = conn.list_available_solution_stacks()
len(stacks["SolutionStacks"]).should.be.greater_than(0)
len(stacks["SolutionStacks"]).should.be.equal(len(stacks["SolutionStackDetails"]))