diff --git a/moto/compat.py b/moto/compat.py index a92a5f67b..2dd2d879e 100644 --- a/moto/compat.py +++ b/moto/compat.py @@ -3,3 +3,8 @@ try: except ImportError: # python 2.6 or earlier, use backport from ordereddict import OrderedDict # flake8: noqa + +try: + from urlparse import urlparse # flake8: noqa +except ImportError: + from urllib.parse import urlparse # flake8: noqa diff --git a/moto/core/responses.py b/moto/core/responses.py index 09ced7a2c..6699f9076 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -8,7 +8,10 @@ from jinja2 import Environment, DictLoader, TemplateNotFound import six from six.moves.urllib.parse import parse_qs, urlparse +import xmltodict +from pkg_resources import resource_filename from werkzeug.exceptions import HTTPException +from moto.compat import OrderedDict from moto.core.utils import camelcase_to_underscores, method_names_from_class @@ -90,6 +93,7 @@ class BaseResponse(_TemplateEnvironmentMixin): default_region = 'us-east-1' region_regex = r'\.(.+?)\.amazonaws\.com' + aws_service_spec = None @classmethod def dispatch(cls, *args, **kwargs): @@ -115,7 +119,20 @@ class BaseResponse(_TemplateEnvironmentMixin): if not querystring: querystring.update(parse_qs(urlparse(full_url).query, keep_blank_values=True)) if not querystring: - querystring.update(parse_qs(self.body, keep_blank_values=True)) + if 'json' in request.headers.get('content-type', []) and self.aws_service_spec: + if isinstance(self.body, six.binary_type): + decoded = json.loads(self.body.decode('utf-8')) + else: + decoded = json.loads(self.body) + + target = request.headers.get('x-amz-target') or request.headers.get('X-Amz-Target') + service, method = target.split('.') + input_spec = self.aws_service_spec.input_spec(method) + flat = flatten_json_request_body('', decoded, input_spec) + for key, value in flat.items(): + querystring[key] = [value] + else: + querystring.update(parse_qs(self.body, keep_blank_values=True)) if not querystring: querystring.update(headers) @@ -125,15 +142,19 @@ class BaseResponse(_TemplateEnvironmentMixin): self.path = urlparse(full_url).path self.querystring = querystring self.method = request.method - region = re.search(self.region_regex, full_url) - if region: - self.region = region.group(1) - else: - self.region = self.default_region + self.region = self.get_region_from_url(full_url) self.headers = request.headers self.response_headers = headers + def get_region_from_url(self, full_url): + match = re.search(self.region_regex, full_url) + if match: + region = match.group(1) + else: + region = self.default_region + return region + def _dispatch(self, request, full_url, headers): self.setup_class(request, full_url, headers) return self.call_action() @@ -164,21 +185,26 @@ class BaseResponse(_TemplateEnvironmentMixin): return status, headers, body raise NotImplementedError("The {0} action has not been implemented".format(action)) - def _get_param(self, param_name): - return self.querystring.get(param_name, [None])[0] + def _get_param(self, param_name, if_none=None): + val = self.querystring.get(param_name) + if val is not None: + return val[0] + return if_none - def _get_int_param(self, param_name): + def _get_int_param(self, param_name, if_none=None): val = self._get_param(param_name) if val is not None: return int(val) + return if_none - def _get_bool_param(self, param_name): + def _get_bool_param(self, param_name, if_none=None): val = self._get_param(param_name) if val is not None: if val.lower() == 'true': return True elif val.lower() == 'false': return 
False
+        return if_none
 
     def _get_multi_param(self, param_prefix):
         """
@@ -257,6 +283,28 @@ class BaseResponse(_TemplateEnvironmentMixin):
             param_index += 1
         return results
 
+    def _get_map_prefix(self, param_prefix):
+        results = {}
+        param_index = 1
+        while True:
+            index_prefix = '{0}.{1}.'.format(param_prefix, param_index)
+
+            k, v = None, None
+            for key, value in self.querystring.items():
+                if key.startswith(index_prefix):
+                    if key.endswith('.key'):
+                        k = value[0]
+                    elif key.endswith('.value'):
+                        v = value[0]
+
+            if not (k and v):
+                break
+
+            results[k] = v
+            param_index += 1
+
+        return results
+
     @property
     def request_json(self):
         return 'JSON' in self.querystring.get('ContentType', [])
@@ -299,3 +347,227 @@ def metadata_response(request, full_url, headers):
     else:
         raise NotImplementedError("The {0} metadata path has not been implemented".format(path))
     return 200, headers, result
+
+
+class _RecursiveDictRef(object):
+    """Store a recursive reference to a dict."""
+    def __init__(self):
+        self.key = None
+        self.dic = {}
+
+    def __repr__(self):
+        return '{!r}'.format(self.dic)
+
+    def __getattr__(self, key):
+        # Delegate attribute lookups to the referenced dict; plain dicts do
+        # not define __getattr__, so getattr() is used here.
+        return getattr(self.dic, key)
+
+    def set_reference(self, key, dic):
+        """Set this RecursiveDictRef to keep a reference to the dict object
+        (dic) stored at the key.
+
+        """
+        self.key = key
+        self.dic = dic
+
+
+class AWSServiceSpec(object):
+    """Parse the data model from botocore. This is used to recover type
+    info for fields in AWS API XML responses.
+
+    """
+
+    def __init__(self, path):
+        self.path = resource_filename('botocore', path)
+        with open(self.path) as f:
+            spec = json.load(f)
+        self.metadata = spec['metadata']
+        self.operations = spec['operations']
+        self.shapes = spec['shapes']
+
+    def input_spec(self, operation):
+        try:
+            op = self.operations[operation]
+        except KeyError:
+            raise ValueError('Invalid operation: {}'.format(operation))
+        if 'input' not in op:
+            return {}
+        shape = self.shapes[op['input']['shape']]
+        return self._expand(shape)
+
+    def output_spec(self, operation):
+        """Produce a JSON with a valid API response syntax for operation, but
+        with type information.
+        Each keyed node's value describes that field's type, e.g.,
+
+          output_spec["SomeBooleanNode"] => {"type": "boolean"}
+
+        """
+        try:
+            op = self.operations[operation]
+        except KeyError:
+            raise ValueError('Invalid operation: {}'.format(operation))
+        if 'output' not in op:
+            return {}
+        shape = self.shapes[op['output']['shape']]
+        return self._expand(shape)
+
+    def _expand(self, shape):
+        def expand(dic, seen=None):
+            seen = seen or {}
+            if dic['type'] == 'structure':
+                nodes = {}
+                for k, v in dic['members'].items():
+                    seen_till_here = dict(seen)
+                    if k in seen_till_here:
+                        nodes[k] = seen_till_here[k]
+                        continue
+                    seen_till_here[k] = _RecursiveDictRef()
+                    nodes[k] = expand(self.shapes[v['shape']], seen_till_here)
+                    seen_till_here[k].set_reference(k, nodes[k])
+                nodes['type'] = 'structure'
+                return nodes
+
+            elif dic['type'] == 'list':
+                seen_till_here = dict(seen)
+                shape = dic['member']['shape']
+                if shape in seen_till_here:
+                    return seen_till_here[shape]
+                seen_till_here[shape] = _RecursiveDictRef()
+                expanded = expand(self.shapes[shape], seen_till_here)
+                seen_till_here[shape].set_reference(shape, expanded)
+                return {'type': 'list', 'member': expanded}
+
+            elif dic['type'] == 'map':
+                seen_till_here = dict(seen)
+                node = {'type': 'map'}
+
+                if 'shape' in dic['key']:
+                    shape = dic['key']['shape']
+                    seen_till_here[shape] = _RecursiveDictRef()
+                    node['key'] = expand(self.shapes[shape], seen_till_here)
+                    seen_till_here[shape].set_reference(shape, node['key'])
+                else:
+                    node['key'] = dic['key']['type']
+
+                if 'shape' in dic['value']:
+                    shape = dic['value']['shape']
+                    seen_till_here[shape] = _RecursiveDictRef()
+                    node['value'] = expand(self.shapes[shape], seen_till_here)
+                    seen_till_here[shape].set_reference(shape, node['value'])
+                else:
+                    node['value'] = dic['value']['type']
+
+                return node
+
+            else:
+                return {'type': dic['type']}
+
+        return expand(shape)
+
+
+def to_str(value, spec):
+    vtype = spec['type']
+    if vtype == 'boolean':
+        return 'true' if value else 'false'
+    elif vtype == 'integer':
+        return str(value)
+    elif vtype == 'string':
+        return str(value)
+    elif value is None:
+        return 'null'
+    else:
+        raise TypeError('Unknown type {}'.format(vtype))
+
+
+def from_str(value, spec):
+    vtype = spec['type']
+    if vtype == 'boolean':
+        return value == 'true'
+    elif vtype == 'integer':
+        return int(value)
+    elif vtype == 'float':
+        return float(value)
+    elif vtype == 'timestamp':
+        return value
+    elif vtype == 'string':
+        return value
+    raise TypeError('Unknown type {}'.format(vtype))
+
+
+def flatten_json_request_body(prefix, dict_body, spec):
+    """Convert a JSON request body into query params."""
+    if len(spec) == 1 and 'type' in spec:
+        return {prefix: to_str(dict_body, spec)}
+
+    flat = {}
+    for key, value in dict_body.items():
+        node_type = spec[key]['type']
+        if node_type == 'list':
+            for idx, v in enumerate(value, 1):
+                pref = key + '.member.' + str(idx)
+                flat.update(flatten_json_request_body(pref, v, spec[key]['member']))
+        elif node_type == 'map':
+            for idx, (k, v) in enumerate(value.items(), 1):
+                pref = key + '.entry.' + str(idx)
+                flat.update(flatten_json_request_body(pref + '.key', k, spec[key]['key']))
+                flat.update(flatten_json_request_body(pref + '.value', v, spec[key]['value']))
+        else:
+            flat.update(flatten_json_request_body(key, value, spec[key]))
+
+    if prefix:
+        prefix = prefix + '.'
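+    # Illustration (hypothetical values): with prefix 'Instances' and
+    # flat == {'Ec2KeyName': 'mykey'}, the dict built below contains
+    # {'Instances.Ec2KeyName': 'mykey'}, i.e. the flattened query-string form.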
+    return dict((prefix + k, v) for k, v in flat.items())
+
+
+def xml_to_json_response(service_spec, operation, xml, result_node=None):
+    """Convert a rendered XML response to JSON for use with boto3."""
+
+    def transform(value, spec):
+        """Apply transformations to make the output JSON comply with the
+        expected form. This function applies:
+
+        (1) Type casts on nodes with a "type" property (e.g., 'true' to
+            True). XML field values are all text, so this step is
+            necessary to produce valid JSON values.
+
+        (2) Squashing of "member" nodes into lists.
+
+        """
+        if len(spec) == 1:
+            return from_str(value, spec)
+
+        od = OrderedDict()
+        for k, v in value.items():
+            if k.startswith('@') or v is None:
+                continue
+
+            if spec[k]['type'] == 'list':
+                if len(spec[k]['member']) == 1:
+                    if isinstance(v['member'], list):
+                        od[k] = transform(v['member'], spec[k]['member'])
+                    else:
+                        od[k] = [transform(v['member'], spec[k]['member'])]
+                elif isinstance(v['member'], list):
+                    od[k] = [transform(o, spec[k]['member']) for o in v['member']]
+                elif isinstance(v['member'], OrderedDict):
+                    od[k] = [transform(v['member'], spec[k]['member'])]
+                else:
+                    raise ValueError('Malformed input')
+            elif spec[k]['type'] == 'map':
+                key = from_str(v['entry']['key'], spec[k]['key'])
+                val = from_str(v['entry']['value'], spec[k]['value'])
+                od[k] = {key: val}
+            else:
+                od[k] = transform(v, spec[k])
+        return od
+
+    dic = xmltodict.parse(xml)
+    output_spec = service_spec.output_spec(operation)
+    try:
+        for k in (result_node or (operation + 'Response', operation + 'Result')):
+            dic = dic[k]
+    except KeyError:
+        return None
+    else:
+        return transform(dic, output_spec)
diff --git a/moto/emr/models.py b/moto/emr/models.py
index 353c968ec..acc573698 100644
--- a/moto/emr/models.py
+++ b/moto/emr/models.py
@@ -1,47 +1,227 @@
 from __future__ import unicode_literals
+from datetime import datetime
 
 import boto.emr
+import pytz
 
 from moto.core import BaseBackend
-from .utils import random_instance_group_id, random_job_id
+from .utils import random_instance_group_id, random_cluster_id, random_step_id
 
-DEFAULT_JOB_FLOW_ROLE = 'EMRJobflowDefault'
+
+class FakeApplication(object):
+    def __init__(self, name, version, args=None, additional_info=None):
+        self.additional_info = additional_info or {}
+        self.args = args or []
+        self.name = name
+        self.version = version
+
+
+class FakeBootstrapAction(object):
+    def __init__(self, args, name, script_path):
+        self.args = args or []
+        self.name = name
+        self.script_path = script_path
 
 
 class FakeInstanceGroup(object):
-    def __init__(self, id, instance_count, instance_role, instance_type, market, name, bid_price=None):
-        self.id = id
+    def __init__(self, instance_count, instance_role, instance_type, market, name,
+                 id=None, bid_price=None):
+        self.id = id or random_instance_group_id()
+
+        self.bid_price = bid_price
+        self.market = market
+        self.name = name
         self.num_instances = instance_count
         self.role = instance_role
         self.type = instance_type
-        self.market = market
-        self.name = name
-        self.bid_price = bid_price
+
+        self.creation_datetime = datetime.now(pytz.utc)
+        self.start_datetime = datetime.now(pytz.utc)
+        self.ready_datetime = datetime.now(pytz.utc)
+        self.end_datetime = None
+        self.state = 'RUNNING'
 
     def set_instance_count(self, instance_count):
         self.num_instances = instance_count
 
 
-class Cluster(object):
-    def __init__(self, id, name, availability_zone, ec2_key_name, subnet_id,
-                 ec2_iam_profile, log_uri):
-        self.id = id
+class FakeStep(object):
+    def __init__(self,
+                 state,
+                 name='',
+ jar='', + args=None, + properties=None, + action_on_failure='TERMINATE_CLUSTER'): + self.id = random_step_id() + + self.action_on_failure = action_on_failure + self.args = args or [] self.name = name + self.jar = jar + self.properties = properties or {} + + self.creation_datetime = datetime.now(pytz.utc) + self.end_datetime = None + self.ready_datetime = None + self.start_datetime = None + self.state = state + + +class FakeCluster(object): + def __init__(self, + emr_backend, + name, + log_uri, + job_flow_role, + service_role, + steps, + instance_attrs, + bootstrap_actions=None, + configurations=None, + cluster_id=None, + visible_to_all_users='false', + release_label=None, + requested_ami_version=None, + running_ami_version=None): + self.id = cluster_id or random_cluster_id() + emr_backend.clusters[self.id] = self + self.emr_backend = emr_backend + self.applications = [] - self.auto_terminate = "false" - self.availability_zone = availability_zone - self.subnet_id = subnet_id - self.ec2_iam_profile = ec2_iam_profile - self.log_uri = log_uri - self.master_public_dns_name = "" - self.normalized_instance_hours = 0 - self.requested_ami_version = "2.4.2" - self.running_ami_version = "2.4.2" - self.service_role = "my-service-role" - self.state = "RUNNING" + + self.bootstrap_actions = [] + for bootstrap_action in (bootstrap_actions or []): + self.add_bootstrap_action(bootstrap_action) + + self.configurations = configurations or [] + self.tags = {} - self.termination_protected = "false" - self.visible_to_all_users = "false" + + self.log_uri = log_uri + self.name = name + self.normalized_instance_hours = 0 + + self.steps = [] + self.add_steps(steps) + + self.set_visibility(visible_to_all_users) + + self.instance_group_ids = [] + self.master_instance_group_id = None + self.core_instance_group_id = None + if 'master_instance_type' in instance_attrs and instance_attrs['master_instance_type']: + self.emr_backend.add_instance_groups( + self.id, + [{'instance_count': 1, + 'instance_role': 'MASTER', + 'instance_type': instance_attrs['master_instance_type'], + 'market': 'ON_DEMAND', + 'name': 'master'}]) + if 'slave_instance_type' in instance_attrs and instance_attrs['slave_instance_type']: + self.emr_backend.add_instance_groups( + self.id, + [{'instance_count': instance_attrs['instance_count'] - 1, + 'instance_role': 'CORE', + 'instance_type': instance_attrs['slave_instance_type'], + 'market': 'ON_DEMAND', + 'name': 'slave'}]) + self.additional_master_security_groups = instance_attrs.get('additional_master_security_groups') + self.additional_slave_security_groups = instance_attrs.get('additional_slave_security_groups') + self.availability_zone = instance_attrs.get('availability_zone') + self.ec2_key_name = instance_attrs.get('ec2_key_name') + self.ec2_subnet_id = instance_attrs.get('ec2_subnet_id') + self.hadoop_version = instance_attrs.get('hadoop_version') + self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps') + self.master_security_group = instance_attrs.get('emr_managed_master_security_group') + self.service_access_security_group = instance_attrs.get('service_access_security_group') + self.slave_security_group = instance_attrs.get('emr_managed_slave_security_group') + self.termination_protected = instance_attrs.get('termination_protected') + + self.release_label = release_label + self.requested_ami_version = requested_ami_version + self.running_ami_version = running_ami_version + + self.role = job_flow_role or 'EMRJobflowDefault' + self.service_role = 
service_role
+
+        self.creation_datetime = datetime.now(pytz.utc)
+        self.start_datetime = None
+        self.ready_datetime = None
+        self.end_datetime = None
+        self.state = None
+
+        self.start_cluster()
+        self.run_bootstrap_actions()
+
+    @property
+    def instance_groups(self):
+        return self.emr_backend.get_instance_groups(self.instance_group_ids)
+
+    @property
+    def master_instance_type(self):
+        return self.emr_backend.instance_groups[self.master_instance_group_id].type
+
+    @property
+    def slave_instance_type(self):
+        return self.emr_backend.instance_groups[self.core_instance_group_id].type
+
+    @property
+    def instance_count(self):
+        return sum(group.num_instances for group in self.instance_groups)
+
+    def start_cluster(self):
+        self.state = 'STARTING'
+        self.start_datetime = datetime.now(pytz.utc)
+
+    def run_bootstrap_actions(self):
+        self.state = 'BOOTSTRAPPING'
+        self.ready_datetime = datetime.now(pytz.utc)
+        self.state = 'WAITING'
+        if not self.steps:
+            if not self.keep_job_flow_alive_when_no_steps:
+                self.terminate()
+
+    def terminate(self):
+        self.state = 'TERMINATING'
+        self.end_datetime = datetime.now(pytz.utc)
+        self.state = 'TERMINATED'
+
+    def add_applications(self, applications):
+        self.applications.extend([
+            FakeApplication(
+                name=app.get('name', ''),
+                version=app.get('version', ''),
+                args=app.get('args', []),
+                additional_info=app.get('additional_info', {}))
+            for app in applications])
+
+    def add_bootstrap_action(self, bootstrap_action):
+        self.bootstrap_actions.append(FakeBootstrapAction(**bootstrap_action))
+
+    def add_instance_group(self, instance_group):
+        if instance_group.role == 'MASTER':
+            if self.master_instance_group_id:
+                raise Exception('Cannot add another master instance group')
+            self.master_instance_group_id = instance_group.id
+        if instance_group.role == 'CORE':
+            if self.core_instance_group_id:
+                raise Exception('Cannot add another core instance group')
+            self.core_instance_group_id = instance_group.id
+        self.instance_group_ids.append(instance_group.id)
+
+    def add_steps(self, steps):
+        added_steps = []
+        for step in steps:
+            if self.steps:
+                # If we already have other steps, this one is pending
+                fake = FakeStep(state='PENDING', **step)
+            else:
+                fake = FakeStep(state='STARTING', **step)
+            self.steps.append(fake)
+            added_steps.append(fake)
+        self.state = 'RUNNING'
+        return added_steps
 
     def add_tags(self, tags):
         self.tags.update(tags)
@@ -50,166 +230,61 @@ class Cluster(object):
         for key in tag_keys:
             self.tags.pop(key, None)
 
-
-class FakeStep(object):
-    def __init__(self, state, **kwargs):
-        # 'Steps.member.1.HadoopJarStep.Jar': ['/home/hadoop/contrib/streaming/hadoop-streaming.jar'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.1': ['-mapper'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.2': ['s3n://elasticmapreduce/samples/wordcount/wordSplitter.py'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.3': ['-reducer'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.4': ['aggregate'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.5': ['-input'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.6': ['s3n://elasticmapreduce/samples/wordcount/input'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.7': ['-output'],
-        # 'Steps.member.1.HadoopJarStep.Args.member.8': ['s3n:///output/wordcount_output'],
-        # 'Steps.member.1.ActionOnFailure': ['TERMINATE_JOB_FLOW'],
-        # 'Steps.member.1.Name': ['My wordcount example']}
-
-        self.action_on_failure = kwargs['action_on_failure']
-        self.name = kwargs['name']
-        self.jar = kwargs['hadoop_jar_step._jar']
-        self.args = []
-        self.state = 
state - - arg_index = 1 - while True: - arg = kwargs.get('hadoop_jar_step._args.member.{0}'.format(arg_index)) - if arg: - self.args.append(arg) - arg_index += 1 - else: - break - - -class FakeJobFlow(object): - def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, emr_backend): - self.id = job_id - self.name = name - self.log_uri = log_uri - self.role = job_flow_role or DEFAULT_JOB_FLOW_ROLE - self.state = "STARTING" - self.steps = [] - self.add_steps(steps) - - self.initial_instance_count = instance_attrs.get('instance_count', 0) - self.initial_master_instance_type = instance_attrs.get('master_instance_type') - self.initial_slave_instance_type = instance_attrs.get('slave_instance_type') - - self.set_visibility(visible_to_all_users) - self.normalized_instance_hours = 0 - self.ec2_key_name = instance_attrs.get('ec2_key_name') - self.availability_zone = instance_attrs.get('placement.availability_zone') - self.subnet_id = instance_attrs.get('ec2_subnet_id') - self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps') - self.termination_protected = instance_attrs.get('termination_protected') - - self.instance_group_ids = [] - - self.emr_backend = emr_backend - - def create_cluster(self): - cluster = Cluster( - id=self.id, - name=self.name, - availability_zone=self.availability_zone, - ec2_key_name=self.ec2_key_name, - subnet_id=self.subnet_id, - ec2_iam_profile=self.role, - log_uri=self.log_uri, - ) - return cluster - - def terminate(self): - self.state = 'TERMINATED' - - def set_visibility(self, visibility): - if visibility == 'true': - self.visible_to_all_users = True - else: - self.visible_to_all_users = False - def set_termination_protection(self, value): self.termination_protected = value - def add_steps(self, steps): - for index, step in enumerate(steps): - if self.steps: - # If we already have other steps, this one is pending - self.steps.append(FakeStep(state='PENDING', **step)) - else: - self.steps.append(FakeStep(state='STARTING', **step)) - - def add_instance_group(self, instance_group_id): - self.instance_group_ids.append(instance_group_id) - - @property - def instance_groups(self): - return self.emr_backend.get_instance_groups(self.instance_group_ids) - - @property - def master_instance_type(self): - groups = self.instance_groups - if groups: - return groups[0].type - else: - return self.initial_master_instance_type - - @property - def slave_instance_type(self): - groups = self.instance_groups - if groups: - return groups[0].type - else: - return self.initial_slave_instance_type - - @property - def instance_count(self): - groups = self.instance_groups - if not groups: - # No groups,return initial instance count - return self.initial_instance_count - count = 0 - for group in groups: - count += int(group.num_instances) - return count + def set_visibility(self, visibility): + self.visible_to_all_users = visibility class ElasticMapReduceBackend(BaseBackend): - def __init__(self): - self.job_flows = {} + def __init__(self, region_name): + super(ElasticMapReduceBackend, self).__init__() + self.region_name = region_name self.clusters = {} self.instance_groups = {} - def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs): - job_id = random_job_id() - job_flow = FakeJobFlow( - job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, self) - self.job_flows[job_id] = job_flow - cluster = job_flow.create_cluster() - 
self.clusters[cluster.id] = cluster - return job_flow + def reset(self): + region_name = self.region_name + self.__dict__ = {} + self.__init__(region_name) + + def add_applications(self, cluster_id, applications): + cluster = self.get_cluster(cluster_id) + cluster.add_applications(applications) + + def add_instance_groups(self, cluster_id, instance_groups): + cluster = self.clusters[cluster_id] + result_groups = [] + for instance_group in instance_groups: + group = FakeInstanceGroup(**instance_group) + self.instance_groups[group.id] = group + cluster.add_instance_group(group) + result_groups.append(group) + return result_groups def add_job_flow_steps(self, job_flow_id, steps): - job_flow = self.job_flows[job_flow_id] - job_flow.add_steps(steps) - return job_flow + cluster = self.clusters[job_flow_id] + steps = cluster.add_steps(steps) + return steps + + def add_tags(self, cluster_id, tags): + cluster = self.get_cluster(cluster_id) + cluster.add_tags(tags) def describe_job_flows(self, job_flow_ids=None): - jobs = self.job_flows.values() + clusters = self.clusters.values() if job_flow_ids: - return [job for job in jobs if job.id in job_flow_ids] + return [cluster for cluster in clusters if cluster.id in job_flow_ids] else: - return jobs + return clusters - def terminate_job_flows(self, job_ids): - flows = [flow for flow in self.describe_job_flows() if flow.id in job_ids] - for flow in flows: - flow.terminate() - return flows - - def list_clusters(self): - return self.clusters.values() + def describe_step(self, cluster_id, step_id): + cluster = self.clusters[cluster_id] + for step in cluster.steps: + if step.id == step_id: + return step def get_cluster(self, cluster_id): return self.clusters[cluster_id] @@ -221,43 +296,50 @@ class ElasticMapReduceBackend(BaseBackend): if group_id in instance_group_ids ] - def add_instance_groups(self, job_flow_id, instance_groups): - job_flow = self.job_flows[job_flow_id] - result_groups = [] - for instance_group in instance_groups: - instance_group_id = random_instance_group_id() - group = FakeInstanceGroup(instance_group_id, **instance_group) - self.instance_groups[instance_group_id] = group - job_flow.add_instance_group(instance_group_id) - result_groups.append(group) - return result_groups + def list_bootstrap_actions(self, cluster_id): + return self.clusters[cluster_id].bootstrap_actions + + def list_clusters(self): + return self.clusters.values() + + def list_instance_groups(self, cluster_id): + return self.clusters[cluster_id].instance_groups + + def list_steps(self, cluster_id, step_states=None): + return self.clusters[cluster_id].steps def modify_instance_groups(self, instance_groups): result_groups = [] for instance_group in instance_groups: group = self.instance_groups[instance_group['instance_group_id']] - group.set_instance_count(instance_group['instance_count']) + group.set_instance_count(int(instance_group['instance_count'])) return result_groups - def set_visible_to_all_users(self, job_ids, visible_to_all_users): - for job_id in job_ids: - job = self.job_flows[job_id] - job.set_visibility(visible_to_all_users) - - def set_termination_protection(self, job_ids, value): - for job_id in job_ids: - job = self.job_flows[job_id] - job.set_termination_protection(value) - - def add_tags(self, cluster_id, tags): - cluster = self.get_cluster(cluster_id) - cluster.add_tags(tags) - def remove_tags(self, cluster_id, tag_keys): cluster = self.get_cluster(cluster_id) cluster.remove_tags(tag_keys) + def run_job_flow(self, **kwargs): + return FakeCluster(self, 
**kwargs)
+
+    def set_visible_to_all_users(self, job_flow_ids, visible_to_all_users):
+        for job_flow_id in job_flow_ids:
+            cluster = self.clusters[job_flow_id]
+            cluster.set_visibility(visible_to_all_users)
+
+    def set_termination_protection(self, job_flow_ids, value):
+        for job_flow_id in job_flow_ids:
+            cluster = self.clusters[job_flow_id]
+            cluster.set_termination_protection(value)
+
+    def terminate_job_flows(self, job_flow_ids):
+        clusters = [cluster for cluster in self.describe_job_flows()
+                    if cluster.id in job_flow_ids]
+        for cluster in clusters:
+            cluster.terminate()
+        return clusters
+
 
 emr_backends = {}
 for region in boto.emr.regions():
-    emr_backends[region.name] = ElasticMapReduceBackend()
+    emr_backends[region.name] = ElasticMapReduceBackend(region.name)
diff --git a/moto/emr/responses.py b/moto/emr/responses.py
index dc6aeb3f0..a9b4d951b 100644
--- a/moto/emr/responses.py
+++ b/moto/emr/responses.py
@@ -1,121 +1,81 @@
 from __future__ import unicode_literals
 import json
+import re
+from datetime import datetime
+from functools import wraps
 
+import pytz
+from botocore.exceptions import ClientError
+
+from moto.compat import urlparse
+from moto.core.responses import AWSServiceSpec
 from moto.core.responses import BaseResponse
+from moto.core.responses import xml_to_json_response
 from .models import emr_backends
+from .utils import steps_from_query_string
 from .utils import tags_from_query_string
 
 
+def generate_boto3_response(operation):
+    """Decorator that converts an XML response to JSON if the request is
+    determined to come from boto3. Pass the API action as a parameter.
+
+    """
+    def _boto3_request(method):
+        @wraps(method)
+        def f(self, *args, **kwargs):
+            rendered = method(self, *args, **kwargs)
+            if 'json' in self.headers.get('Content-Type', []):
+                self.response_headers.update(
+                    {'x-amzn-requestid': '2690d7eb-ed86-11dd-9877-6fad448a8419',
+                     'date': datetime.now(pytz.utc).strftime('%a, %d %b %Y %H:%M:%S %Z'),
+                     'content-type': 'application/x-amz-json-1.1'})
+                resp = xml_to_json_response(self.aws_service_spec, operation, rendered)
+                return '' if resp is None else json.dumps(resp)
+            return rendered
+        return f
+    return _boto3_request
+
+
 class ElasticMapReduceResponse(BaseResponse):
+    # EMR endpoints are inconsistent in the placement of the region name
+    # in the URL, so parsing it out needs to be handled differently
+    region_regex = [re.compile(r'elasticmapreduce\.(.+?)\.amazonaws\.com'),
+                    re.compile(r'(.+?)\.elasticmapreduce\.amazonaws\.com')]
+
+    aws_service_spec = AWSServiceSpec('data/emr/2009-03-31/service-2.json')
+
+    def get_region_from_url(self, full_url):
+        parsed = urlparse(full_url)
+        for regex in self.region_regex:
+            match = regex.search(parsed.netloc)
+            if match:
+                return match.group(1)
+        return self.default_region
+
     @property
     def backend(self):
         return emr_backends[self.region]
 
-    @property
-    def boto3_request(self):
-        return 'json' in self.headers.get('Content-Type', [])
-
-    def add_job_flow_steps(self):
-        job_flow_id = self._get_param('JobFlowId')
-        steps = self._get_list_prefix('Steps.member')
-
-        job_flow = self.backend.add_job_flow_steps(job_flow_id, steps)
-        template = self.response_template(ADD_JOB_FLOW_STEPS_TEMPLATE)
-        return template.render(job_flow=job_flow)
-
-    def run_job_flow(self):
-        flow_name = self._get_param('Name')
-        log_uri = self._get_param('LogUri')
-        steps = self._get_list_prefix('Steps.member')
-        instance_attrs = self._get_dict_param('Instances.')
-        job_flow_role = self._get_param('JobFlowRole')
-        visible_to_all_users = 
self._get_param('VisibleToAllUsers') - - job_flow = self.backend.run_job_flow( - flow_name, log_uri, job_flow_role, - visible_to_all_users, steps, instance_attrs - ) - instance_groups = self._get_list_prefix('Instances.InstanceGroups.member') - if instance_groups: - self.backend.add_instance_groups(job_flow.id, instance_groups) - - if self.boto3_request: - return json.dumps({ - "JobFlowId": job_flow.id - }) - - template = self.response_template(RUN_JOB_FLOW_TEMPLATE) - return template.render(job_flow=job_flow) - - def describe_job_flows(self): - job_flow_ids = self._get_multi_param("JobFlowIds.member") - job_flows = self.backend.describe_job_flows(job_flow_ids) - template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE) - return template.render(job_flows=job_flows) - - def terminate_job_flows(self): - job_ids = self._get_multi_param('JobFlowIds.member.') - job_flows = self.backend.terminate_job_flows(job_ids) - template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE) - return template.render(job_flows=job_flows) - + @generate_boto3_response('AddInstanceGroups') def add_instance_groups(self): jobflow_id = self._get_param('JobFlowId') instance_groups = self._get_list_prefix('InstanceGroups.member') + for item in instance_groups: + item['instance_count'] = int(item['instance_count']) instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups) template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE) return template.render(instance_groups=instance_groups) - def modify_instance_groups(self): - instance_groups = self._get_list_prefix('InstanceGroups.member') - instance_groups = self.backend.modify_instance_groups(instance_groups) - template = self.response_template(MODIFY_INSTANCE_GROUPS_TEMPLATE) - return template.render(instance_groups=instance_groups) - - def set_visible_to_all_users(self): - visible_to_all_users = self._get_param('VisibleToAllUsers') - job_ids = self._get_multi_param('JobFlowIds.member') - self.backend.set_visible_to_all_users(job_ids, visible_to_all_users) - template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE) - return template.render() - - def set_termination_protection(self): - termination_protection = self._get_param('TerminationProtected') - job_ids = self._get_multi_param('JobFlowIds.member') - self.backend.set_termination_protection(job_ids, termination_protection) - template = self.response_template(SET_TERMINATION_PROTECTION_TEMPLATE) - return template.render() - - def list_clusters(self): - clusters = self.backend.list_clusters() - - if self.boto3_request: - return json.dumps({ - "Clusters": [ - { - "Id": cluster.id, - "Name": cluster.name, - "Status": { - "State": cluster.state, - "StatusChangeReason": {}, - "TimeLine": {}, - }, - "NormalizedInstanceHours": cluster.normalized_instance_hours, - } for cluster in clusters - ], - "Marker": "" - }) - - template = self.response_template(LIST_CLUSTERS_TEMPLATE) - return template.render(clusters=clusters) - - def describe_cluster(self): - cluster_id = self._get_param('ClusterId') - cluster = self.backend.get_cluster(cluster_id) - template = self.response_template(DESCRIBE_CLUSTER_TEMPLATE) - return template.render(cluster=cluster) + @generate_boto3_response('AddJobFlowSteps') + def add_job_flow_steps(self): + job_flow_id = self._get_param('JobFlowId') + steps = self.backend.add_job_flow_steps(job_flow_id, steps_from_query_string(self._get_list_prefix('Steps.member'))) + template = self.response_template(ADD_JOB_FLOW_STEPS_TEMPLATE) + return template.render(steps=steps) 
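+    # Note (illustrative, hypothetical values): a boto3 AddJobFlowSteps call
+    # arrives as a JSON body such as {"JobFlowId": "j-ABC123", "Steps": [...]};
+    # BaseResponse.setup_class flattens it into query parameters like
+    # 'Steps.member.1.Name' via flatten_json_request_body, so
+    # steps_from_query_string sees the same shape from boto and boto3 clients.
+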
+ @generate_boto3_response('AddTags') def add_tags(self): cluster_id = self._get_param('ResourceId') tags = tags_from_query_string(self.querystring) @@ -123,6 +83,80 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(ADD_TAGS_TEMPLATE) return template.render() + def cancel_steps(self): + raise NotImplementedError + + def create_security_configuration(self): + raise NotImplementedError + + def delete_security_configuration(self): + raise NotImplementedError + + @generate_boto3_response('DescribeCluster') + def describe_cluster(self): + cluster_id = self._get_param('ClusterId') + cluster = self.backend.get_cluster(cluster_id) + template = self.response_template(DESCRIBE_CLUSTER_TEMPLATE) + return template.render(cluster=cluster) + + @generate_boto3_response('DescribeJobFlows') + def describe_job_flows(self): + job_flow_ids = self._get_multi_param("JobFlowIds.member") + clusters = self.backend.describe_job_flows(job_flow_ids) + template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE) + return template.render(clusters=clusters) + + def describe_security_configuration(self): + raise NotImplementedError + + @generate_boto3_response('DescribeStep') + def describe_step(self): + cluster_id = self._get_param('ClusterId') + step_id = self._get_param('StepId') + step = self.backend.describe_step(cluster_id, step_id) + template = self.response_template(DESCRIBE_STEP_TEMPLATE) + return template.render(step=step) + + @generate_boto3_response('ListBootstrapActions') + def list_bootstrap_actions(self): + cluster_id = self._get_param('ClusterId') + bootstrap_actions = self.backend.list_bootstrap_actions(cluster_id) + template = self.response_template(LIST_BOOTSTRAP_ACTIONS_TEMPLATE) + return template.render(bootstrap_actions=bootstrap_actions) + + @generate_boto3_response('ListClusters') + def list_clusters(self): + clusters = self.backend.list_clusters() + template = self.response_template(LIST_CLUSTERS_TEMPLATE) + return template.render(clusters=clusters) + + @generate_boto3_response('ListInstanceGroups') + def list_instance_groups(self): + cluster_id = self._get_param('ClusterId') + instance_groups = self.backend.list_instance_groups(cluster_id) + template = self.response_template(LIST_INSTANCE_GROUPS_TEMPLATE) + return template.render(instance_groups=instance_groups) + + def list_instances(self): + raise NotImplementedError + + @generate_boto3_response('ListSteps') + def list_steps(self): + cluster_id = self._get_param('ClusterId') + steps = self.backend.list_steps(cluster_id) + template = self.response_template(LIST_STEPS_TEMPLATE) + return template.render(steps=steps) + + @generate_boto3_response('ModifyInstanceGroups') + def modify_instance_groups(self): + instance_groups = self._get_list_prefix('InstanceGroups.member') + for item in instance_groups: + item['instance_count'] = int(item['instance_count']) + instance_groups = self.backend.modify_instance_groups(instance_groups) + template = self.response_template(MODIFY_INSTANCE_GROUPS_TEMPLATE) + return template.render(instance_groups=instance_groups) + + @generate_boto3_response('RemoveTags') def remove_tags(self): cluster_id = self._get_param('ResourceId') tag_keys = self._get_multi_param('TagKeys.member') @@ -130,170 +164,162 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(REMOVE_TAGS_TEMPLATE) return template.render() + @generate_boto3_response('RunJobFlow') + def run_job_flow(self): + instance_attrs = dict( + 
master_instance_type=self._get_param('Instances.MasterInstanceType'), + slave_instance_type=self._get_param('Instances.SlaveInstanceType'), + instance_count=self._get_int_param('Instances.InstanceCount', 1), + ec2_key_name=self._get_param('Instances.Ec2KeyName'), + ec2_subnet_id=self._get_param('Instances.Ec2SubnetId'), + hadoop_version=self._get_param('Instances.HadoopVersion'), + availability_zone=self._get_param('Instances.Placement.AvailabilityZone', self.backend.region_name + 'a'), + keep_job_flow_alive_when_no_steps=self._get_bool_param('Instances.KeepJobFlowAliveWhenNoSteps', False), + termination_protected=self._get_bool_param('Instances.TerminationProtected', False), + emr_managed_master_security_group=self._get_param('Instances.EmrManagedMasterSecurityGroup'), + emr_managed_slave_security_group=self._get_param('Instances.EmrManagedSlaveSecurityGroup'), + service_access_security_group=self._get_param('Instances.ServiceAccessSecurityGroup'), + additional_master_security_groups=self._get_multi_param('Instances.AdditionalMasterSecurityGroups.member.'), + additional_slave_security_groups=self._get_multi_param('Instances.AdditionalSlaveSecurityGroups.member.')) -RUN_JOB_FLOW_TEMPLATE = """ - - {{ job_flow.id }} - - - - 8296d8b8-ed85-11dd-9877-6fad448a8419 - - -""" + kwargs = dict( + name=self._get_param('Name'), + log_uri=self._get_param('LogUri'), + job_flow_role=self._get_param('JobFlowRole'), + service_role=self._get_param('ServiceRole'), + steps=steps_from_query_string(self._get_list_prefix('Steps.member')), + visible_to_all_users=self._get_bool_param('VisibleToAllUsers', False), + instance_attrs=instance_attrs, + ) -DESCRIBE_JOB_FLOWS_TEMPLATE = """ - - - {% for job_flow in job_flows %} - - - 2009-01-28T21:49:16Z - 2009-01-28T21:49:16Z - {{ job_flow.state }} - - {{ job_flow.name }} - {{ job_flow.role }} - {{ job_flow.log_uri }} - - {% for step in job_flow.steps %} - - - 2009-01-28T21:49:16Z - {{ step.state }} - - - - {{ step.jar }} - MyMainClass - - {% for arg in step.args %} - {{ arg }} - {% endfor %} - - - - {{ step.name }} - CONTINUE - - - {% endfor %} - - {{ job_flow.id }} - - - us-east-1a - - {{ job_flow.slave_instance_type }} - {{ job_flow.master_instance_type }} - {{ job_flow.ec2_key_name }} - {{ job_flow.normalized_instance_hours }} - {{ job_flow.visible_to_all_users }} - {{ job_flow.instance_count }} - {{ job_flow.keep_job_flow_alive_when_no_steps }} - {{ job_flow.termination_protected }} - ec2-184-0-0-1.us-west-1.compute.amazonaws.com - - {% for instance_group in job_flow.instance_groups %} - - {{ instance_group.id }} - {{ instance_group.role }} - {{ instance_group.num_instances }} - {{ instance_group.type }} - {{ instance_group.market }} - {{ instance_group.name }} - {{ instance_group.bid_price }} - - {% endfor %} - - - - {% endfor %} - - - - - 9cea3229-ed85-11dd-9877-6fad448a8419 - - -""" + bootstrap_actions = self._get_list_prefix('BootstrapActions.member') + if bootstrap_actions: + for ba in bootstrap_actions: + args = [] + idx = 1 + keyfmt = 'script_bootstrap_action._args.member.{0}' + key = keyfmt.format(idx) + while key in ba: + args.append(ba.pop(key)) + idx += 1 + key = keyfmt.format(idx) + ba['args'] = args + ba['script_path'] = ba.pop('script_bootstrap_action._path') + kwargs['bootstrap_actions'] = bootstrap_actions -TERMINATE_JOB_FLOWS_TEMPLATE = """ - - - 2690d7eb-ed86-11dd-9877-6fad448a8419 - - -""" + configurations = self._get_list_prefix('Configurations.member') + if configurations: + for idx, config in enumerate(configurations, 1): + for key in 
list(config.keys()): + if key.startswith('properties.'): + config.pop(key) + config['properties'] = {} + map_items = self._get_map_prefix('Configurations.member.{0}.Properties.entry'.format(idx)) + config['properties'] = map_items + + kwargs['configurations'] = configurations + + release_label = self._get_param('ReleaseLabel') + ami_version = self._get_param('AmiVersion') + if release_label: + kwargs['release_label'] = release_label + if ami_version: + message = ( + 'Only one AMI version and release label may be specified. ' + 'Provided AMI: {0}, release label: {1}.').format( + ami_version, release_label) + raise ClientError( + {'Error': {'Code': 'ValidationException', + 'Message': message}}, 'RunJobFlow') + else: + if ami_version: + kwargs['requested_ami_version'] = ami_version + kwargs['running_ami_version'] = ami_version + else: + kwargs['running_ami_version'] = '1.0.0' + + cluster = self.backend.run_job_flow(**kwargs) + + applications = self._get_list_prefix('Applications.member') + if applications: + self.backend.add_applications(cluster.id, applications) + else: + self.backend.add_applications( + cluster.id, [{'Name': 'Hadoop', 'Version': '0.18'}]) + + instance_groups = self._get_list_prefix('Instances.InstanceGroups.member') + if instance_groups: + for ig in instance_groups: + ig['instance_count'] = int(ig['instance_count']) + self.backend.add_instance_groups(cluster.id, instance_groups) + + tags = self._get_list_prefix('Tags.member') + if tags: + self.backend.add_tags( + cluster.id, dict((d['key'], d['value']) for d in tags)) + + template = self.response_template(RUN_JOB_FLOW_TEMPLATE) + return template.render(cluster=cluster) + + @generate_boto3_response('SetTerminationProtection') + def set_termination_protection(self): + termination_protection = self._get_param('TerminationProtected') + job_ids = self._get_multi_param('JobFlowIds.member') + self.backend.set_termination_protection(job_ids, termination_protection) + template = self.response_template(SET_TERMINATION_PROTECTION_TEMPLATE) + return template.render() + + @generate_boto3_response('SetVisibleToAllUsers') + def set_visible_to_all_users(self): + visible_to_all_users = self._get_param('VisibleToAllUsers') + job_ids = self._get_multi_param('JobFlowIds.member') + self.backend.set_visible_to_all_users(job_ids, visible_to_all_users) + template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE) + return template.render() + + @generate_boto3_response('TerminateJobFlows') + def terminate_job_flows(self): + job_ids = self._get_multi_param('JobFlowIds.member.') + self.backend.terminate_job_flows(job_ids) + template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE) + return template.render() + + +ADD_INSTANCE_GROUPS_TEMPLATE = """ + + + {% for instance_group in instance_groups %} + {{ instance_group.id }} + {% endfor %} + + + + 2690d7eb-ed86-11dd-9877-6fad448a8419 + +""" ADD_JOB_FLOW_STEPS_TEMPLATE = """ - - - df6f4f4a-ed85-11dd-9877-6fad448a8419 - - + + + {% for step in steps %} + {{ step.id }} + {% endfor %} + + + + df6f4f4a-ed85-11dd-9877-6fad448a8419 + """ -LIST_CLUSTERS_TEMPLATE = """ - - {% for cluster in clusters %} - - {{ cluster.id }} - {{ cluster.name }} - {{ cluster.normalized_instance_hours }} - - {{ cluster.state }} - - - - - - - - {% endfor %} - - - - - 2690d7eb-ed86-11dd-9877-6fad448a8418 - - -""" +ADD_TAGS_TEMPLATE = """ + + 2690d7eb-ed86-11dd-9877-6fad448a8419 + +""" DESCRIBE_CLUSTER_TEMPLATE = """ - {{ cluster.id }} - - {% for tag_key, tag_value in cluster.tags.items() %} - - {{ tag_key }} - {{ 
tag_value }} - - {% endfor %} - - - {{ cluster.availability_zone }} - {{ cluster.subnet_id }} - {{ cluster.ec2_key_name }} - - {{ cluster.running_ami_version }} - {{ cluster.visible_to_all_users }} - - - Terminated by user request - USER_REQUEST - - {{ cluster.state }} - - 2014-01-24T01:21:21Z - 2014-01-24T01:25:26Z - 2014-01-24T02:19:46Z - - - {{ cluster.auto_terminate }} - {{ cluster.name }} - {{ cluster.requested_ami_version }} {% for application in cluster.applications %} @@ -302,10 +328,85 @@ DESCRIBE_CLUSTER_TEMPLATE = """ - {% for instance_group in instance_groups %}{{ instance_group.id }}{% if loop.index != loop.length %},{% endif %}{% endfor %} -""" +DESCRIBE_JOB_FLOWS_TEMPLATE = """ + + + {% for cluster in clusters %} + + {% if cluster.running_ami_version is not none %} + {{ cluster.running_ami_version }} + {% endif %} + {% if cluster.bootstrap_actions %} + + {% for bootstrap_action in cluster.bootstrap_actions %} + + + {{ bootstrap_action.name }} + + + {% for arg in bootstrap_action.args %} + {{ arg }} + {% endfor %} + + {{ bootstrap_action.script_path }} + + + + {% endfor %} + + {% endif %} + + {{ cluster.creation_datetime.isoformat() }} + {% if cluster.end_datetime is not none %} + {{ cluster.end_datetime.isoformat() }} + {% endif %} + {% if cluster.last_state_change_reason is not none %} + {{ cluster.last_state_change_reason }} + {% endif %} + {% if cluster.ready_datetime is not none %} + {{ cluster.ready_datetime.isoformat() }} + {% endif %} + {% if cluster.start_datetime is not none %} + {{ cluster.start_datetime.isoformat() }} + {% endif %} + {{ cluster.state }} + + + {% if cluster.ec2_key_name is not none %} + {{ cluster.ec2_key_name }} + {% endif %} + {% if cluster.ec2_subnet_id is not none %} + {{ cluster.ec2_subnet_id }} + {% endif %} + {{ cluster.hadoop_version }} + {{ cluster.instance_count }} + + {% for instance_group in cluster.instance_groups %} + + {% if instance_group.bid_price is not none %} + {{ instance_group.bid_price }} + {% endif %} + {{ instance_group.creation_datetime.isoformat() }} + {% if instance_group.end_datetime is not none %} + {{ instance_group.end_datetime.isoformat() }} + {% endif %} + + {{ instance_group.id }} + {{ instance_group.num_instances }} + {{ instance_group.role }} + {{ instance_group.num_instances }} + {{ instance_group.type }} + + {{ instance_group.market }} + {{ instance_group.name }} + {% if instance_group.ready_datetime is not none %} + {{ instance_group.ready_datetime.isoformat() }} + {% endif %} + {% if instance_group.start_datetime is not none %} + {{ instance_group.start_datetime.isoformat() }} + {% endif %} + {{ instance_group.state }} + + {% endfor %} + + {{ cluster.keep_job_flow_alive_when_no_steps|lower }} + {{ cluster.master_instance_id }} + {{ cluster.master_instance_type }} + ec2-184-0-0-1.{{ cluster.region }}.compute.amazonaws.com + {{ cluster.normalized_instance_hours }} + + {{ cluster.availability_zone }} + + {{ cluster.slave_instance_type }} + {{ cluster.termination_protected|lower }} + + {{ cluster.id }} + {{ cluster.role }} + {{ cluster.log_uri }} + {{ cluster.name }} + {{ cluster.service_role }} + + {% for step in cluster.steps %} + + + {{ step.creation_datetime.isoformat() }} + {% if step.end_datetime is not none %} + {{ step.end_datetime.isoformat() }} + {% endif %} + {% if step.last_state_change_reason is not none %} + {{ step.last_state_change_reason }} + {% endif %} + {% if step.ready_datetime is not none %} + {{ step.ready_datetime.isoformat() }} + {% endif %} + {% if step.start_datetime is not none %} + 
{{ step.start_datetime.isoformat() }} + {% endif %} + {{ step.state }} + + + {{ step.action_on_failure }} + + {{ step.jar }} + {{ step.main_class }} + + {% for arg in step.args %} + {{ arg }} + {% endfor %} + + + + {{ step.name }} + + + {% endfor %} + + + {{ cluster.visible_to_all_users|lower }} + + {% endfor %} + + + + 9cea3229-ed85-11dd-9877-6fad448a8419 + +""" + +DESCRIBE_STEP_TEMPLATE = """ + + + {{ step.action_on_failure }} + + + {% for arg in step.args %} + {{ arg }} + {% endfor %} + + {{ step.jar }} + + + {% for key, val in step.properties.items() %} + + {{ key }} + {{ val }} + + {% endfor %} + + + {{ step.id }} + {{ step.name }} + + + {{ step.state }} + {{ step.state_change_reason }} + + {{ step.creation_datetime.isoformat() }} + {% if step.end_datetime is not none %} + {{ step.end_datetime.isoformat() }} + {% endif %} + {% if step.ready_datetime is not none %} + {{ step.start_datetime.isoformat() }} + {% endif %} + + + + + + df6f4f4a-ed85-11dd-9877-6fad448a8419 + +""" + +LIST_BOOTSTRAP_ACTIONS_TEMPLATE = """ + + + {% for bootstrap_action in bootstrap_actions %} + + + {% for arg in bootstrap_action.args %} + {{ arg }} + {% endfor %} + + {{ bootstrap_action.name }} + {{ bootstrap_action.script_path }} + + {% endfor %} + + + + df6f4f4a-ed85-11dd-9877-6fad448a8419 + +""" + +LIST_CLUSTERS_TEMPLATE = """ + + + {% for cluster in clusters %} + + {{ cluster.id }} + {{ cluster.name }} + {{ cluster.normalized_instance_hours }} + + {{ cluster.state }} + + USER_REQUEST + {% if cluster.last_state_change_reason is not none %} + {{ cluster.last_state_change_reason }} + {% endif %} + + + {{ cluster.creation_datetime.isoformat() }} + {% if cluster.end_datetime is not none %} + {{ cluster.end_datetime.isoformat() }} + {% endif %} + {% if cluster.ready_datetime is not none %} + {{ cluster.ready_datetime.isoformat() }} + {% endif %} + + + + {% endfor %} + + + + + 2690d7eb-ed86-11dd-9877-6fad448a8418 + +""" + +LIST_INSTANCE_GROUPS_TEMPLATE = """ + + + {% for instance_group in instance_groups %} + + {% if instance_group.bid_price is not none %} + {{ instance_group.bid_price }} + {% endif %} + + + {% if instance_group.ebs_optimized is not none %} + {{ instance_group.ebs_optimized }} + {% endif %} + {{ instance_group.id }} + {{ instance_group.role }} + {{ instance_group.type }} + {{ instance_group.market }} + {{ instance_group.name }} + {{ instance_group.num_instances }} + {{ instance_group.num_instances }} + + {{ instance_group.state }} + + {% if instance_group.state_change_reason is not none %} + {{ instance_group.state_change_reason }} + {% endif %} + USER_REQUEST + + + {{ instance_group.creation_datetime.isoformat() }} + {% if instance_group.end_datetime is not none %} + {{ instance_group.end_datetime.isoformat() }} + {% endif %} + {% if instance_group.ready_datetime is not none %} + {{ instance_group.ready_datetime.isoformat() }} + {% endif %} + + + + {% endfor %} + + + + 8296d8b8-ed85-11dd-9877-6fad448a8419 + +""" + +LIST_STEPS_TEMPLATE = """ + + + {% for step in steps %} + + {{ step.action_on_failure }} + + + {% for arg in step.args %} + {{ arg }} + {% endfor %} + + {{ step.jar }} + + + {% for key, val in step.properties.items() %} + + {{ key }} + {{ val }} + + {% endfor %} + + + {{ step.id }} + {{ step.name }} + + + {{ step.state }} + {{ step.state_change_reason }} + + {{ step.creation_datetime.isoformat() }} + {% if step.end_datetime is not none %} + {{ step.end_datetime.isoformat() }} + {% endif %} + {% if step.ready_datetime is not none %} + {{ step.start_datetime.isoformat() }} + {% endif %} + 
+
+
+    {% endfor %}
+
+
+
+      df6f4f4a-ed85-11dd-9877-6fad448a8419
+
+"""
 
 MODIFY_INSTANCE_GROUPS_TEMPLATE = """
-
-
-      2690d7eb-ed86-11dd-9877-6fad448a8419
-
-
+
+   2690d7eb-ed86-11dd-9877-6fad448a8419
+
 """
 
-SET_VISIBLE_TO_ALL_USERS_TEMPLATE = """
-
-
-      2690d7eb-ed86-11dd-9877-6fad448a8419
-
-
-"""
 
+REMOVE_TAGS_TEMPLATE = """
+
+   2690d7eb-ed86-11dd-9877-6fad448a8419
+
+"""
 
+RUN_JOB_FLOW_TEMPLATE = """
+
+  {{ cluster.id }}
+
+
+   8296d8b8-ed85-11dd-9877-6fad448a8419
+
+"""
 
 SET_TERMINATION_PROTECTION_TEMPLATE = """
-
-
-      2690d7eb-ed86-11dd-9877-6fad448a8419
-
-
+
+   2690d7eb-ed86-11dd-9877-6fad448a8419
+
 """
 
-ADD_TAGS_TEMPLATE = """
-
-
-      2690d7eb-ed86-11dd-9877-6fad448a8419
-
-
-"""
 
+SET_VISIBLE_TO_ALL_USERS_TEMPLATE = """
+
+   2690d7eb-ed86-11dd-9877-6fad448a8419
+
+"""
 
-REMOVE_TAGS_TEMPLATE = """
-
-
-      2690d7eb-ed86-11dd-9877-6fad448a8419
-
-
-"""
 
+TERMINATE_JOB_FLOWS_TEMPLATE = """
+
+   2690d7eb-ed86-11dd-9877-6fad448a8419
+
+"""
diff --git a/moto/emr/urls.py b/moto/emr/urls.py
index 83eb62b28..870eaf9d7 100644
--- a/moto/emr/urls.py
+++ b/moto/emr/urls.py
@@ -2,6 +2,7 @@ from __future__ import unicode_literals
 from .responses import ElasticMapReduceResponse
 
 url_bases = [
+    "https?://(.+).elasticmapreduce.amazonaws.com",
     "https?://elasticmapreduce.(.+).amazonaws.com",
 ]
diff --git a/moto/emr/utils.py b/moto/emr/utils.py
index b4262c177..328fdd783 100644
--- a/moto/emr/utils.py
+++ b/moto/emr/utils.py
@@ -1,19 +1,25 @@
 from __future__ import unicode_literals
 import random
 import string
+
 import six
 
 
-def random_job_id(size=13):
+def random_id(size=13):
     chars = list(range(10)) + list(string.ascii_uppercase)
-    job_tag = ''.join(six.text_type(random.choice(chars)) for x in range(size))
-    return 'j-{0}'.format(job_tag)
+    return ''.join(six.text_type(random.choice(chars)) for x in range(size))
+
+
+def random_cluster_id(size=13):
+    return 'j-{0}'.format(random_id(size))
+
+
+def random_step_id(size=13):
+    return 's-{0}'.format(random_id(size))
 
 
 def random_instance_group_id(size=13):
-    chars = list(range(10)) + list(string.ascii_uppercase)
-    job_tag = ''.join(six.text_type(random.choice(chars)) for x in range(size))
-    return 'i-{0}'.format(job_tag)
+    return 'i-{0}'.format(random_id(size))
 
 
 def tags_from_query_string(querystring_dict):
@@ -30,3 +36,18 @@ def tags_from_query_string(querystring_dict):
         else:
             response_values[tag_key] = None
     return response_values
+
+
+def steps_from_query_string(querystring_dict):
+    steps = []
+    for step in querystring_dict:
+        step['jar'] = step.pop('hadoop_jar_step._jar')
+        step['properties'] = dict((o['Key'], o['Value']) for o in step.get('properties', []))
+        step['args'] = []
+        idx = 1
+        keyfmt = 'hadoop_jar_step._args.member.{0}'
+        while keyfmt.format(idx) in step:
+            step['args'].append(step.pop(keyfmt.format(idx)))
+            idx += 1
+        steps.append(step)
+    return steps
diff --git a/setup.py b/setup.py
index 2ee6db386..dd7a12b26 100644
--- a/setup.py
+++ b/setup.py
@@ -10,6 +10,7 @@ install_requires = [
     "xmltodict",
     "six",
     "werkzeug",
+    "pytz"
 ]
 
 extras_require = {
diff --git a/tests/test_core/test_responses.py b/tests/test_core/test_responses.py
new file mode 100644
index 000000000..aa89ac840
--- /dev/null
+++ b/tests/test_core/test_responses.py
@@ -0,0 +1,73 @@
+from __future__ import unicode_literals
+
+import sure  # noqa
+
+from moto.core.responses import AWSServiceSpec
+from moto.core.responses import flatten_json_request_body
+
+
+def test_flatten_json_request_body():
+    spec = AWSServiceSpec('data/emr/2009-03-31/service-2.json').input_spec('RunJobFlow')
+
+    body = {
+        'Name': 'cluster',
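+        # Illustrative fixture: mirrors the JSON body a boto3 RunJobFlow
+        # request would carry; all values here are arbitrary.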
+ 'Instances': { + 'Ec2KeyName': 'ec2key', + 'InstanceGroups': [ + {'InstanceRole': 'MASTER', + 'InstanceType': 'm1.small'}, + {'InstanceRole': 'CORE', + 'InstanceType': 'm1.medium'}, + ], + 'Placement': {'AvailabilityZone': 'us-east-1'}, + }, + 'Steps': [ + {'HadoopJarStep': { + 'Properties': [ + {'Key': 'k1', 'Value': 'v1'}, + {'Key': 'k2', 'Value': 'v2'} + ], + 'Args': ['arg1', 'arg2']}}, + ], + 'Configurations': [ + {'Classification': 'class', + 'Properties': {'propkey1': 'propkey1', + 'propkey2': 'propkey2'}}, + {'Classification': 'anotherclass', + 'Properties': {'propkey3': 'propkey3'}}, + ] + } + + flat = flatten_json_request_body('', body, spec) + flat['Name'].should.equal(body['Name']) + flat['Instances.Ec2KeyName'].should.equal(body['Instances']['Ec2KeyName']) + for idx in range(2): + flat['Instances.InstanceGroups.member.' + str(idx + 1) + '.InstanceRole'].should.equal(body['Instances']['InstanceGroups'][idx]['InstanceRole']) + flat['Instances.InstanceGroups.member.' + str(idx + 1) + '.InstanceType'].should.equal(body['Instances']['InstanceGroups'][idx]['InstanceType']) + flat['Instances.Placement.AvailabilityZone'].should.equal(body['Instances']['Placement']['AvailabilityZone']) + + for idx in range(1): + prefix = 'Steps.member.' + str(idx + 1) + '.HadoopJarStep' + step = body['Steps'][idx]['HadoopJarStep'] + i = 0 + while prefix + '.Properties.member.' + str(i + 1) + '.Key' in flat: + flat[prefix + '.Properties.member.' + str(i + 1) + '.Key'].should.equal(step['Properties'][i]['Key']) + flat[prefix + '.Properties.member.' + str(i + 1) + '.Value'].should.equal(step['Properties'][i]['Value']) + i += 1 + i = 0 + while prefix + '.Args.member.' + str(i + 1) in flat: + flat[prefix + '.Args.member.' + str(i + 1)].should.equal(step['Args'][i]) + i += 1 + + for idx in range(2): + flat['Configurations.member.' 
+ str(idx + 1) + '.Classification'].should.equal(body['Configurations'][idx]['Classification']) + + props = {} + i = 1 + keyfmt = 'Configurations.member.{0}.Properties.entry.{1}' + key = keyfmt.format(idx + 1, i) + while key + '.key' in flat: + props[flat[key + '.key']] = flat[key + '.value'] + i += 1 + key = keyfmt.format(idx + 1, i) + props.should.equal(body['Configurations'][idx]['Properties']) diff --git a/tests/test_emr/test_emr.py b/tests/test_emr/test_emr.py index 61090054a..9fe67513e 100644 --- a/tests/test_emr/test_emr.py +++ b/tests/test_emr/test_emr.py @@ -1,197 +1,111 @@ from __future__ import unicode_literals import boto +from boto.emr.bootstrap_action import BootstrapAction from boto.emr.instance_group import InstanceGroup - from boto.emr.step import StreamingStep + +import six import sure # noqa from moto import mock_emr from tests.helpers import requires_boto_gte -@mock_emr -def test_create_job_flow_in_multiple_regions(): - step = StreamingStep( - name='My wordcount example', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input', - output='s3n://output_bucket/output/wordcount_output' - ) +run_jobflow_args = dict( + job_flow_role='EMR_EC2_DefaultRole', + keep_alive=True, + log_uri='s3://some_bucket/jobflow_logs', + master_instance_type='c1.medium', + name='My jobflow', + num_instances=2, + service_role='EMR_DefaultRole', + slave_instance_type='c1.medium', +) - west1_conn = boto.emr.connect_to_region('us-east-1') - west1_job_id = west1_conn.run_jobflow( - name='us-east-1', - log_uri='s3://some_bucket/jobflow_logs', - master_instance_type='m1.medium', - slave_instance_type='m1.small', - steps=[step], - ) - west2_conn = boto.emr.connect_to_region('eu-west-1') - west2_job_id = west2_conn.run_jobflow( - name='eu-west-1', - log_uri='s3://some_bucket/jobflow_logs', - master_instance_type='m1.medium', - slave_instance_type='m1.small', - steps=[step], - ) - - west1_job_flow = west1_conn.describe_jobflow(west1_job_id) - west1_job_flow.name.should.equal('us-east-1') - west2_job_flow = west2_conn.describe_jobflow(west2_job_id) - west2_job_flow.name.should.equal('eu-west-1') +input_instance_groups = [ + InstanceGroup(1, 'MASTER', 'c1.medium', 'ON_DEMAND', 'master'), + InstanceGroup(3, 'CORE', 'c1.medium', 'ON_DEMAND', 'core'), + InstanceGroup(6, 'TASK', 'c1.large', 'SPOT', 'task-1', '0.07'), + InstanceGroup(10, 'TASK', 'c1.xlarge', 'SPOT', 'task-2', '0.05'), +] @mock_emr -def test_create_job_flow(): +def test_describe_cluster(): conn = boto.connect_emr() - - step1 = StreamingStep( - name='My wordcount example', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input', - output='s3n://output_bucket/output/wordcount_output' - ) - - step2 = StreamingStep( - name='My wordcount example2', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input2', - output='s3n://output_bucket/output/wordcount_output2' - ) - - job_id = conn.run_jobflow( - name='My jobflow', + args = run_jobflow_args.copy() + args.update(dict( + api_params={ + 'Applications.member.1.Name': 'Spark', + 'Applications.member.1.Version': '2.4.2', + 'Configurations.member.1.Classification': 'yarn-site', + 'Configurations.member.1.Properties.entry.1.key': 'someproperty', + 'Configurations.member.1.Properties.entry.1.value': 'somevalue', + 
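+            # api_params feeds raw wire-format keys straight into the request
+            # query string, covering fields this boto release has no kwargs for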
+            'Instances.EmrManagedMasterSecurityGroup': 'master-security-group',
+            'Instances.Ec2SubnetId': 'subnet-8be41cec',
+        },
+        availability_zone='us-east-2b',
+        ec2_keyname='mykey',
+        job_flow_role='EMR_EC2_DefaultRole',
+        keep_alive=False,
         log_uri='s3://some_bucket/jobflow_logs',
-        master_instance_type='m1.medium',
-        slave_instance_type='m1.small',
-        steps=[step1, step2],
-    )
-
-    job_flow = conn.describe_jobflow(job_id)
-    job_flow.state.should.equal('STARTING')
-    job_flow.jobflowid.should.equal(job_id)
-    job_flow.name.should.equal('My jobflow')
-    job_flow.masterinstancetype.should.equal('m1.medium')
-    job_flow.slaveinstancetype.should.equal('m1.small')
-    job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs')
-    job_flow.visibletoallusers.should.equal('False')
-    int(job_flow.normalizedinstancehours).should.equal(0)
-    job_step = job_flow.steps[0]
-    job_step.name.should.equal('My wordcount example')
-    job_step.state.should.equal('STARTING')
-    args = [arg.value for arg in job_step.args]
-    args.should.equal([
-        '-mapper',
-        's3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
-        '-reducer',
-        'aggregate',
-        '-input',
-        's3n://elasticmapreduce/samples/wordcount/input',
-        '-output',
-        's3n://output_bucket/output/wordcount_output',
-    ])
-
-    job_step2 = job_flow.steps[1]
-    job_step2.name.should.equal('My wordcount example2')
-    job_step2.state.should.equal('PENDING')
-    args = [arg.value for arg in job_step2.args]
-    args.should.equal([
-        '-mapper',
-        's3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
-        '-reducer',
-        'aggregate',
-        '-input',
-        's3n://elasticmapreduce/samples/wordcount/input2',
-        '-output',
-        's3n://output_bucket/output/wordcount_output2',
-    ])
-
-
-@requires_boto_gte("2.8")
-@mock_emr
-def test_create_job_flow_with_new_params():
-    # Test that run_jobflow works with newer params
-    conn = boto.connect_emr()
-
-    conn.run_jobflow(
         name='My jobflow',
-        log_uri='s3://some_bucket/jobflow_logs',
-        master_instance_type='m1.medium',
-        slave_instance_type='m1.small',
-        job_flow_role='some-role-arn',
-        steps=[],
-    )
-
-
-@requires_boto_gte("2.8")
-@mock_emr
-def test_create_job_flow_visible_to_all_users():
-    conn = boto.connect_emr()
-
-    job_id = conn.run_jobflow(
-        name='My jobflow',
-        log_uri='s3://some_bucket/jobflow_logs',
-        steps=[],
+        service_role='EMR_DefaultRole',
         visible_to_all_users=True,
-    )
-    job_flow = conn.describe_jobflow(job_id)
-    job_flow.visibletoallusers.should.equal('True')
+    ))
+    cluster_id = conn.run_jobflow(**args)
+    input_tags = {'tag1': 'val1', 'tag2': 'val2'}
+    conn.add_tags(cluster_id, input_tags)
+    cluster = conn.describe_cluster(cluster_id)
+    cluster.applications[0].name.should.equal('Spark')
+    cluster.applications[0].version.should.equal('2.4.2')
+    cluster.autoterminate.should.equal('true')
+
-@requires_boto_gte("2.8")
-@mock_emr
-def test_create_job_flow_with_instance_groups():
-    conn = boto.connect_emr()
+    # configurations appear not to be supplied as attributes?
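+    # (most likely this boto release's DescribeCluster response parser simply
+    # has no mapping for Configurations, so only the raw XML carries them)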
- instance_groups = [InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07'), - InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')] - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[], - instance_groups=instance_groups - ) + attrs = cluster.ec2instanceattributes + # AdditionalMasterSecurityGroups + # AdditionalSlaveSecurityGroups + attrs.ec2availabilityzone.should.equal(args['availability_zone']) + attrs.ec2keyname.should.equal(args['ec2_keyname']) + attrs.ec2subnetid.should.equal(args['api_params']['Instances.Ec2SubnetId']) + # EmrManagedMasterSecurityGroups + # EmrManagedSlaveSecurityGroups + attrs.iaminstanceprofile.should.equal(args['job_flow_role']) + # ServiceAccessSecurityGroup - job_flow = conn.describe_jobflow(job_id) - int(job_flow.instancecount).should.equal(12) - instance_group = job_flow.instancegroups[0] - int(instance_group.instancerunningcount).should.equal(6) + cluster.id.should.equal(cluster_id) + cluster.loguri.should.equal(args['log_uri']) + cluster.masterpublicdnsname.should.be.a(six.string_types) + cluster.name.should.equal(args['name']) + int(cluster.normalizedinstancehours).should.equal(0) + # cluster.release_label + cluster.shouldnt.have.property('requestedamiversion') + cluster.runningamiversion.should.equal('1.0.0') + # cluster.securityconfiguration + cluster.servicerole.should.equal(args['service_role']) + + cluster.status.state.should.equal('TERMINATED') + cluster.status.statechangereason.message.should.be.a(six.string_types) + cluster.status.statechangereason.code.should.be.a(six.string_types) + cluster.status.timeline.creationdatetime.should.be.a(six.string_types) + # cluster.status.timeline.enddatetime.should.be.a(six.string_types) + # cluster.status.timeline.readydatetime.should.be.a(six.string_types) + + dict((item.key, item.value) for item in cluster.tags).should.equal(input_tags) + + cluster.terminationprotected.should.equal('false') + cluster.visibletoallusers.should.equal('true') @mock_emr -def test_terminate_job_flow(): +def test_describe_jobflows(): conn = boto.connect_emr() - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[] - ) - - flow = conn.describe_jobflows()[0] - flow.state.should.equal('STARTING') - conn.terminate_jobflow(job_id) - flow = conn.describe_jobflows()[0] - flow.state.should.equal('TERMINATED') - - -@mock_emr -def test_describe_job_flows(): - conn = boto.connect_emr() - job1_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[] - ) - job2_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[] - ) + job1_id = conn.run_jobflow(**run_jobflow_args) + job2_id = conn.run_jobflow(**run_jobflow_args) jobs = conn.describe_jobflows() jobs.should.have.length_of(2) @@ -205,252 +119,454 @@ def test_describe_job_flows(): @mock_emr -def test_add_steps_to_flow(): +def test_describe_jobflow(): conn = boto.connect_emr() + args = run_jobflow_args.copy() + args.update(dict( + ami_version='3.8.1', + api_params={ + #'Applications.member.1.Name': 'Spark', + #'Applications.member.1.Version': '2.4.2', + #'Configurations.member.1.Classification': 'yarn-site', + #'Configurations.member.1.Properties.entry.1.key': 'someproperty', + #'Configurations.member.1.Properties.entry.1.value': 'somevalue', + #'Instances.EmrManagedMasterSecurityGroup': 'master-security-group', + 'Instances.Ec2SubnetId': 'subnet-8be41cec', + }, + ec2_keyname='mykey', + 
hadoop_version='2.4.0', - step1 = StreamingStep( - name='My wordcount example', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input', - output='s3n://output_bucket/output/wordcount_output' - ) - - job_id = conn.run_jobflow( name='My jobflow', log_uri='s3://some_bucket/jobflow_logs', - steps=[step1] - ) + keep_alive=True, + master_instance_type='c1.medium', + slave_instance_type='c1.medium', + num_instances=2, + availability_zone='us-west-2b', + + job_flow_role='EMR_EC2_DefaultRole', + service_role='EMR_DefaultRole', + visible_to_all_users=True, + )) + + cluster_id = conn.run_jobflow(**args) + jf = conn.describe_jobflow(cluster_id) + jf.amiversion.should.equal(args['ami_version']) + jf.bootstrapactions.should.equal(None) + jf.creationdatetime.should.be.a(six.string_types) + jf.should.have.property('laststatechangereason') + jf.readydatetime.should.be.a(six.string_types) + jf.startdatetime.should.be.a(six.string_types) + jf.state.should.equal('WAITING') + + jf.ec2keyname.should.equal(args['ec2_keyname']) + # Ec2SubnetId + jf.hadoopversion.should.equal(args['hadoop_version']) + int(jf.instancecount).should.equal(2) + + for ig in jf.instancegroups: + ig.creationdatetime.should.be.a(six.string_types) + # ig.enddatetime.should.be.a(six.string_types) + ig.should.have.property('instancegroupid').being.a(six.string_types) + int(ig.instancerequestcount).should.equal(1) + ig.instancerole.should.be.within(['MASTER', 'CORE']) + int(ig.instancerunningcount).should.equal(1) + ig.instancetype.should.equal('c1.medium') + ig.laststatechangereason.should.be.a(six.string_types) + ig.market.should.equal('ON_DEMAND') + ig.name.should.be.a(six.string_types) + ig.readydatetime.should.be.a(six.string_types) + ig.startdatetime.should.be.a(six.string_types) + ig.state.should.equal('RUNNING') + + jf.keepjobflowalivewhennosteps.should.equal('true') + jf.masterinstanceid.should.be.a(six.string_types) + jf.masterinstancetype.should.equal(args['master_instance_type']) + jf.masterpublicdnsname.should.be.a(six.string_types) + int(jf.normalizedinstancehours).should.equal(0) + jf.availabilityzone.should.equal(args['availability_zone']) + jf.slaveinstancetype.should.equal(args['slave_instance_type']) + jf.terminationprotected.should.equal('false') + + jf.jobflowid.should.equal(cluster_id) + # jf.jobflowrole.should.equal(args['job_flow_role']) + jf.loguri.should.equal(args['log_uri']) + jf.name.should.equal(args['name']) + # jf.servicerole.should.equal(args['service_role']) + + jf.steps.should.have.length_of(0) + + list(i.value for i in jf.supported_products).should.equal([]) + jf.visibletoallusers.should.equal('true') + + +@mock_emr +def test_list_clusters(): + conn = boto.connect_emr() + + args = run_jobflow_args.copy() + args['name'] = 'jobflow1' + cluster1_id = conn.run_jobflow(**args) + args['name'] = 'jobflow2' + cluster2_id = conn.run_jobflow(**args) + conn.terminate_jobflow(cluster2_id) + + summary = conn.list_clusters() + clusters = summary.clusters + clusters.should.have.length_of(2) + + expected = { + cluster1_id: { + 'id': cluster1_id, + 'name': 'jobflow1', + 'normalizedinstancehours': 0, + 'state': 'WAITING'}, + cluster2_id: { + 'id': cluster2_id, + 'name': 'jobflow2', + 'normalizedinstancehours': 0, + 'state': 'TERMINATED'}, + } + + for x in clusters: + y = expected[x.id] + x.id.should.equal(y['id']) + x.name.should.equal(y['name']) + int(x.normalizedinstancehours).should.equal(y['normalizedinstancehours']) + 
x.status.state.should.equal(y['state']) + x.status.timeline.creationdatetime.should.be.a(six.string_types) + if y['state'] == 'TERMINATED': + x.status.timeline.enddatetime.should.be.a(six.string_types) + else: + x.status.timeline.shouldnt.have.property('enddatetime') + x.status.timeline.readydatetime.should.be.a(six.string_types) + + +@mock_emr +def test_run_jobflow(): + conn = boto.connect_emr() + args = run_jobflow_args.copy() + job_id = conn.run_jobflow(**args) job_flow = conn.describe_jobflow(job_id) - job_flow.state.should.equal('STARTING') + job_flow.state.should.equal('WAITING') job_flow.jobflowid.should.equal(job_id) - job_flow.name.should.equal('My jobflow') - job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs') - - step2 = StreamingStep( - name='My wordcount example2', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input2', - output='s3n://output_bucket/output/wordcount_output2' - ) - - conn.add_jobflow_steps(job_id, [step2]) - - job_flow = conn.describe_jobflow(job_id) - job_step = job_flow.steps[0] - job_step.name.should.equal('My wordcount example') - job_step.state.should.equal('STARTING') - args = [arg.value for arg in job_step.args] - args.should.equal([ - '-mapper', - 's3n://elasticmapreduce/samples/wordcount/wordSplitter.py', - '-reducer', - 'aggregate', - '-input', - 's3n://elasticmapreduce/samples/wordcount/input', - '-output', - 's3n://output_bucket/output/wordcount_output', - ]) - - job_step2 = job_flow.steps[1] - job_step2.name.should.equal('My wordcount example2') - job_step2.state.should.equal('PENDING') - args = [arg.value for arg in job_step2.args] - args.should.equal([ - '-mapper', - 's3n://elasticmapreduce/samples/wordcount/wordSplitter2.py', - '-reducer', - 'aggregate', - '-input', - 's3n://elasticmapreduce/samples/wordcount/input2', - '-output', - 's3n://output_bucket/output/wordcount_output2', - ]) + job_flow.name.should.equal(args['name']) + job_flow.masterinstancetype.should.equal(args['master_instance_type']) + job_flow.slaveinstancetype.should.equal(args['slave_instance_type']) + job_flow.loguri.should.equal(args['log_uri']) + job_flow.visibletoallusers.should.equal('false') + int(job_flow.normalizedinstancehours).should.equal(0) + job_flow.steps.should.have.length_of(0) @mock_emr -def test_create_instance_groups(): - conn = boto.connect_emr() +def test_run_jobflow_in_multiple_regions(): + regions = {} + for region in ['us-east-1', 'eu-west-1']: + conn = boto.emr.connect_to_region(region) + args = run_jobflow_args.copy() + args['name'] = region + cluster_id = conn.run_jobflow(**args) + regions[region] = {'conn': conn, 'cluster_id': cluster_id} - step1 = StreamingStep( - name='My wordcount example', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input', - output='s3n://output_bucket/output/wordcount_output' - ) - - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[step1], - ) - - instance_group = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07') - instance_group = conn.add_instance_groups(job_id, [instance_group]) - instance_group_id = instance_group.instancegroupids - job_flow = conn.describe_jobflows()[0] - int(job_flow.instancecount).should.equal(6) - instance_group = job_flow.instancegroups[0] - instance_group.instancegroupid.should.equal(instance_group_id) - 
int(instance_group.instancerunningcount).should.equal(6) - instance_group.instancerole.should.equal('TASK') - instance_group.instancetype.should.equal('c1.medium') - instance_group.market.should.equal('SPOT') - instance_group.name.should.equal('spot-0.07') - instance_group.bidprice.should.equal('0.07') - - -@mock_emr -def test_modify_instance_groups(): - conn = boto.connect_emr() - - step1 = StreamingStep( - name='My wordcount example', - mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', - reducer='aggregate', - input='s3n://elasticmapreduce/samples/wordcount/input', - output='s3n://output_bucket/output/wordcount_output' - ) - - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[step1] - ) - - instance_group1 = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07') - instance_group2 = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07') - instance_group = conn.add_instance_groups(job_id, [instance_group1, instance_group2]) - instance_group_ids = instance_group.instancegroupids.split(",") - - job_flow = conn.describe_jobflows()[0] - int(job_flow.instancecount).should.equal(12) - instance_group = job_flow.instancegroups[0] - int(instance_group.instancerunningcount).should.equal(6) - - conn.modify_instance_groups(instance_group_ids, [2, 3]) - - job_flow = conn.describe_jobflows()[0] - int(job_flow.instancecount).should.equal(5) - instance_group1 = [ - group for group - in job_flow.instancegroups - if group.instancegroupid == instance_group_ids[0] - ][0] - int(instance_group1.instancerunningcount).should.equal(2) - instance_group2 = [ - group for group - in job_flow.instancegroups - if group.instancegroupid == instance_group_ids[1] - ][0] - int(instance_group2.instancerunningcount).should.equal(3) + for region in regions.keys(): + conn = regions[region]['conn'] + jf = conn.describe_jobflow(regions[region]['cluster_id']) + jf.name.should.equal(region) @requires_boto_gte("2.8") @mock_emr -def test_set_visible_to_all_users(): +def test_run_jobflow_with_new_params(): + # Test that run_jobflow works with newer params conn = boto.connect_emr() + conn.run_jobflow(**run_jobflow_args) - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[], - visible_to_all_users=False, - ) + +@requires_boto_gte("2.8") +@mock_emr +def test_run_jobflow_with_visible_to_all_users(): + conn = boto.connect_emr() + for expected in (True, False): + job_id = conn.run_jobflow( + visible_to_all_users=expected, + **run_jobflow_args + ) + job_flow = conn.describe_jobflow(job_id) + job_flow.visibletoallusers.should.equal(str(expected).lower()) + + +@requires_boto_gte("2.8") +@mock_emr +def test_run_jobflow_with_instance_groups(): + input_groups = dict((g.name, g) for g in input_instance_groups) + conn = boto.connect_emr() + job_id = conn.run_jobflow(instance_groups=input_instance_groups, + **run_jobflow_args) job_flow = conn.describe_jobflow(job_id) - job_flow.visibletoallusers.should.equal('False') - - conn.set_visible_to_all_users(job_id, True) - - job_flow = conn.describe_jobflow(job_id) - job_flow.visibletoallusers.should.equal('True') - - conn.set_visible_to_all_users(job_id, False) - - job_flow = conn.describe_jobflow(job_id) - job_flow.visibletoallusers.should.equal('False') + int(job_flow.instancecount).should.equal(sum(g.num_instances for g in input_instance_groups)) + for instance_group in job_flow.instancegroups: + expected = input_groups[instance_group.name] + 
instance_group.should.have.property('instancegroupid') + int(instance_group.instancerunningcount).should.equal(expected.num_instances) + instance_group.instancerole.should.equal(expected.role) + instance_group.instancetype.should.equal(expected.type) + instance_group.market.should.equal(expected.market) + if hasattr(expected, 'bidprice'): + instance_group.bidprice.should.equal(expected.bidprice) @requires_boto_gte("2.8") @mock_emr def test_set_termination_protection(): conn = boto.connect_emr() - - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[] - ) + job_id = conn.run_jobflow(**run_jobflow_args) job_flow = conn.describe_jobflow(job_id) - job_flow.terminationprotected.should.equal(u'None') + job_flow.terminationprotected.should.equal('false') conn.set_termination_protection(job_id, True) - job_flow = conn.describe_jobflow(job_id) job_flow.terminationprotected.should.equal('true') conn.set_termination_protection(job_id, False) - job_flow = conn.describe_jobflow(job_id) job_flow.terminationprotected.should.equal('false') +@requires_boto_gte("2.8") @mock_emr -def test_list_clusters(): +def test_set_visible_to_all_users(): conn = boto.connect_emr() - conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[], - ) + args = run_jobflow_args.copy() + args['visible_to_all_users'] = False + job_id = conn.run_jobflow(**args) + job_flow = conn.describe_jobflow(job_id) + job_flow.visibletoallusers.should.equal('false') - summary = conn.list_clusters() - clusters = summary.clusters - clusters.should.have.length_of(1) - cluster = clusters[0] - cluster.name.should.equal("My jobflow") - cluster.normalizedinstancehours.should.equal('0') - cluster.status.state.should.equal("RUNNING") + conn.set_visible_to_all_users(job_id, True) + job_flow = conn.describe_jobflow(job_id) + job_flow.visibletoallusers.should.equal('true') + + conn.set_visible_to_all_users(job_id, False) + job_flow = conn.describe_jobflow(job_id) + job_flow.visibletoallusers.should.equal('false') @mock_emr -def test_describe_cluster(): +def test_terminate_jobflow(): conn = boto.connect_emr() - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[], + job_id = conn.run_jobflow(**run_jobflow_args) + flow = conn.describe_jobflows()[0] + flow.state.should.equal('WAITING') + + conn.terminate_jobflow(job_id) + flow = conn.describe_jobflows()[0] + flow.state.should.equal('TERMINATED') + + +# testing multiple end points for each feature + +@mock_emr +def test_bootstrap_actions(): + bootstrap_actions = [ + BootstrapAction( + name='bs1', + path='path/to/script', + bootstrap_action_args=['arg1', 'arg2']), + BootstrapAction( + name='bs2', + path='path/to/anotherscript', + bootstrap_action_args=[]) + ] + + conn = boto.connect_emr() + cluster_id = conn.run_jobflow( + bootstrap_actions=bootstrap_actions, + **run_jobflow_args ) - cluster = conn.describe_cluster(job_id) - cluster.name.should.equal("My jobflow") - cluster.normalizedinstancehours.should.equal('0') - cluster.status.state.should.equal("RUNNING") + jf = conn.describe_jobflow(cluster_id) + for x, y in zip(jf.bootstrapactions, bootstrap_actions): + x.name.should.equal(y.name) + x.path.should.equal(y.path) + list(o.value for o in x.args).should.equal(y.args()) + + resp = conn.list_bootstrap_actions(cluster_id) + for i, y in enumerate(bootstrap_actions): + x = resp.actions[i] + x.name.should.equal(y.name) + x.scriptpath.should.equal(y.path) + list(arg.value for arg in 
x.args).should.equal(y.args()) @mock_emr -def test_cluster_tagging(): - conn = boto.connect_emr() - job_id = conn.run_jobflow( - name='My jobflow', - log_uri='s3://some_bucket/jobflow_logs', - steps=[], - ) - cluster_id = job_id - conn.add_tags(cluster_id, {"tag1": "val1", "tag2": "val2"}) +def test_instance_groups(): + input_groups = dict((g.name, g) for g in input_instance_groups) + conn = boto.connect_emr() + args = run_jobflow_args.copy() + for key in ['master_instance_type', 'slave_instance_type', 'num_instances']: + del args[key] + args['instance_groups'] = input_instance_groups[:2] + job_id = conn.run_jobflow(**args) + + jf = conn.describe_jobflow(job_id) + base_instance_count = int(jf.instancecount) + + conn.add_instance_groups(job_id, input_instance_groups[2:]) + + jf = conn.describe_jobflow(job_id) + int(jf.instancecount).should.equal(sum(g.num_instances for g in input_instance_groups)) + for x in jf.instancegroups: + y = input_groups[x.name] + if hasattr(y, 'bidprice'): + x.bidprice.should.equal(y.bidprice) + x.creationdatetime.should.be.a(six.string_types) + # x.enddatetime.should.be.a(six.string_types) + x.should.have.property('instancegroupid') + int(x.instancerequestcount).should.equal(y.num_instances) + x.instancerole.should.equal(y.role) + int(x.instancerunningcount).should.equal(y.num_instances) + x.instancetype.should.equal(y.type) + x.laststatechangereason.should.be.a(six.string_types) + x.market.should.equal(y.market) + x.name.should.be.a(six.string_types) + x.readydatetime.should.be.a(six.string_types) + x.startdatetime.should.be.a(six.string_types) + x.state.should.equal('RUNNING') + + for x in conn.list_instance_groups(job_id).instancegroups: + y = input_groups[x.name] + if hasattr(y, 'bidprice'): + x.bidprice.should.equal(y.bidprice) + # Configurations + # EbsBlockDevices + # EbsOptimized + x.should.have.property('id') + x.instancegrouptype.should.equal(y.role) + x.instancetype.should.equal(y.type) + x.market.should.equal(y.market) + x.name.should.equal(y.name) + int(x.requestedinstancecount).should.equal(y.num_instances) + int(x.runninginstancecount).should.equal(y.num_instances) + # ShrinkPolicy + x.status.state.should.equal('RUNNING') + x.status.statechangereason.code.should.be.a(six.string_types) + x.status.statechangereason.message.should.be.a(six.string_types) + x.status.timeline.creationdatetime.should.be.a(six.string_types) + # x.status.timeline.enddatetime.should.be.a(six.string_types) + x.status.timeline.readydatetime.should.be.a(six.string_types) + + igs = dict((g.name, g) for g in jf.instancegroups) + + conn.modify_instance_groups( + [igs['task-1'].instancegroupid, igs['task-2'].instancegroupid], + [2, 3]) + jf = conn.describe_jobflow(job_id) + int(jf.instancecount).should.equal(base_instance_count + 5) + igs = dict((g.name, g) for g in jf.instancegroups) + int(igs['task-1'].instancerunningcount).should.equal(2) + int(igs['task-2'].instancerunningcount).should.equal(3) + + +@mock_emr +def test_steps(): + input_steps = [ + StreamingStep( + name='My wordcount example', + mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', + reducer='aggregate', + input='s3n://elasticmapreduce/samples/wordcount/input', + output='s3n://output_bucket/output/wordcount_output'), + StreamingStep( + name='My wordcount example2', + mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py', + reducer='aggregate', + input='s3n://elasticmapreduce/samples/wordcount/input2', + output='s3n://output_bucket/output/wordcount_output2') + ] + + # TODO: implementation 
and test for cancel_steps + + conn = boto.connect_emr() + cluster_id = conn.run_jobflow( + steps=[input_steps[0]], + **run_jobflow_args) + + jf = conn.describe_jobflow(cluster_id) + jf.steps.should.have.length_of(1) + + conn.add_jobflow_steps(cluster_id, [input_steps[1]]) + + jf = conn.describe_jobflow(cluster_id) + jf.steps.should.have.length_of(2) + for step in jf.steps: + step.actiononfailure.should.equal('TERMINATE_JOB_FLOW') + list(arg.value for arg in step.args).should.have.length_of(8) + step.creationdatetime.should.be.a(six.string_types) + # step.enddatetime.should.be.a(six.string_types) + step.jar.should.equal('/home/hadoop/contrib/streaming/hadoop-streaming.jar') + step.laststatechangereason.should.be.a(six.string_types) + step.mainclass.should.equal('') + step.name.should.be.a(six.string_types) + # step.readydatetime.should.be.a(six.string_types) + # step.startdatetime.should.be.a(six.string_types) + step.state.should.be.within(['STARTING', 'PENDING']) + + expected = dict((s.name, s) for s in input_steps) + + for x in conn.list_steps(cluster_id).steps: + y = expected[x.name] + # actiononfailure + list(arg.value for arg in x.config.args).should.equal([ + '-mapper', y.mapper, + '-reducer', y.reducer, + '-input', y.input, + '-output', y.output, + ]) + x.config.jar.should.equal('/home/hadoop/contrib/streaming/hadoop-streaming.jar') + x.config.mainclass.should.equal('') + # properties + x.should.have.property('id').should.be.a(six.string_types) + x.name.should.equal(y.name) + x.status.state.should.be.within(['STARTING', 'PENDING']) + # x.status.statechangereason + x.status.timeline.creationdatetime.should.be.a(six.string_types) + # x.status.timeline.enddatetime.should.be.a(six.string_types) + # x.status.timeline.startdatetime.should.be.a(six.string_types) + + x = conn.describe_step(cluster_id, x.id) + list(arg.value for arg in x.config.args).should.equal([ + '-mapper', y.mapper, + '-reducer', y.reducer, + '-input', y.input, + '-output', y.output, + ]) + x.config.jar.should.equal('/home/hadoop/contrib/streaming/hadoop-streaming.jar') + x.config.mainclass.should.equal('') + # properties + x.should.have.property('id').should.be.a(six.string_types) + x.name.should.equal(y.name) + x.status.state.should.be.within(['STARTING', 'PENDING']) + # x.status.statechangereason + x.status.timeline.creationdatetime.should.be.a(six.string_types) + # x.status.timeline.enddatetime.should.be.a(six.string_types) + # x.status.timeline.startdatetime.should.be.a(six.string_types) + + +@mock_emr +def test_tags(): + input_tags = {"tag1": "val1", "tag2": "val2"} + + conn = boto.connect_emr() + cluster_id = conn.run_jobflow(**run_jobflow_args) + + conn.add_tags(cluster_id, input_tags) cluster = conn.describe_cluster(cluster_id) cluster.tags.should.have.length_of(2) - tags = dict((tag.key, tag.value) for tag in cluster.tags) - tags['tag1'].should.equal('val1') - tags['tag2'].should.equal('val2') + dict((t.key, t.value) for t in cluster.tags).should.equal(input_tags) - # Remove a tag - conn.remove_tags(cluster_id, ["tag1"]) + conn.remove_tags(cluster_id, list(input_tags.keys())) cluster = conn.describe_cluster(cluster_id) - cluster.tags.should.have.length_of(1) - tags = dict((tag.key, tag.value) for tag in cluster.tags) - tags['tag2'].should.equal('val2') + cluster.tags.should.have.length_of(0) diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index 1a336a599..f389c1900 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -1,46 +1,586 @@ # -*- 
coding: utf-8 -*- from __future__ import unicode_literals +from copy import deepcopy import boto3 +import six import sure # noqa +from botocore.exceptions import ClientError +from nose.tools import assert_raises from moto import mock_emr +run_job_flow_args = dict( + Instances={ + 'InstanceCount': 3, + 'KeepJobFlowAliveWhenNoSteps': True, + 'MasterInstanceType': 'c3.medium', + 'Placement': {'AvailabilityZone': 'us-east-1a'}, + 'SlaveInstanceType': 'c3.xlarge', + }, + JobFlowRole='EMR_EC2_DefaultRole', + LogUri='s3://mybucket/log', + Name='cluster', + ServiceRole='EMR_DefaultRole', + VisibleToAllUsers=True) + + +input_instance_groups = [ + {'InstanceCount': 1, + 'InstanceRole': 'MASTER', + 'InstanceType': 'c1.medium', + 'Market': 'ON_DEMAND', + 'Name': 'master'}, + {'InstanceCount': 3, + 'InstanceRole': 'CORE', + 'InstanceType': 'c1.medium', + 'Market': 'ON_DEMAND', + 'Name': 'core'}, + {'InstanceCount': 6, + 'InstanceRole': 'TASK', + 'InstanceType': 'c1.large', + 'Market': 'SPOT', + 'Name': 'task-1', + 'BidPrice': '0.07'}, + {'InstanceCount': 10, + 'InstanceRole': 'TASK', + 'InstanceType': 'c1.xlarge', + 'Market': 'SPOT', + 'Name': 'task-2', + 'BidPrice': '0.05'}, +] + + @mock_emr -def test_run_job_flow(): +def test_describe_cluster(): client = boto3.client('emr', region_name='us-east-1') - cluster_id = client.run_job_flow( - Name='cluster', - Instances={ - 'MasterInstanceType': 'c3.xlarge', - 'SlaveInstanceType': 'c3.xlarge', - 'InstanceCount': 3, - 'Placement': {'AvailabilityZone': 'us-east-1a'}, - 'KeepJobFlowAliveWhenNoSteps': True, - }, - VisibleToAllUsers=True, - ) - cluster_id.should.have.key('JobFlowId') + + args = deepcopy(run_job_flow_args) + args['Applications'] = [{'Name': 'Spark', 'Version': '2.4.2'}] + args['Configurations'] = [ + {'Classification': 'yarn-site', + 'Properties': {'someproperty': 'somevalue'}}] + args['Instances']['AdditionalMasterSecurityGroups'] = ['additional-master'] + args['Instances']['AdditionalSlaveSecurityGroups'] = ['additional-slave'] + args['Instances']['Ec2KeyName'] = 'mykey' + args['Instances']['Ec2SubnetId'] = 'subnet-8be41cec' + args['Instances']['EmrManagedMasterSecurityGroup'] = 'master-security-group' + args['Instances']['EmrManagedSlaveSecurityGroup'] = 'slave-security-group' + args['Instances']['KeepJobFlowAliveWhenNoSteps'] = False + args['Instances']['ServiceAccessSecurityGroup'] = 'service-access-security-group' + args['Tags'] = [{'Key': 'tag1', 'Value': 'val1'}, + {'Key': 'tag2', 'Value': 'val2'}] + + cluster_id = client.run_job_flow(**args)['JobFlowId'] + + cl = client.describe_cluster(ClusterId=cluster_id)['Cluster'] + cl['Applications'][0]['Name'].should.equal('Spark') + cl['Applications'][0]['Version'].should.equal('2.4.2') + cl['AutoTerminate'].should.equal(True) + + config = cl['Configurations'][0] + config['Classification'].should.equal('yarn-site') + config['Properties'].should.equal(args['Configurations'][0]['Properties']) + + attrs = cl['Ec2InstanceAttributes'] + attrs['AdditionalMasterSecurityGroups'].should.equal(args['Instances']['AdditionalMasterSecurityGroups']) + attrs['AdditionalSlaveSecurityGroups'].should.equal(args['Instances']['AdditionalSlaveSecurityGroups']) + attrs['Ec2AvailabilityZone'].should.equal('us-east-1a') + attrs['Ec2KeyName'].should.equal(args['Instances']['Ec2KeyName']) + attrs['Ec2SubnetId'].should.equal(args['Instances']['Ec2SubnetId']) + attrs['EmrManagedMasterSecurityGroup'].should.equal(args['Instances']['EmrManagedMasterSecurityGroup']) + 
attrs['EmrManagedSlaveSecurityGroup'].should.equal(args['Instances']['EmrManagedSlaveSecurityGroup']) + attrs['IamInstanceProfile'].should.equal(args['JobFlowRole']) + attrs['ServiceAccessSecurityGroup'].should.equal(args['Instances']['ServiceAccessSecurityGroup']) + cl['Id'].should.equal(cluster_id) + cl['LogUri'].should.equal(args['LogUri']) + cl['MasterPublicDnsName'].should.be.a(six.string_types) + cl['Name'].should.equal(args['Name']) + cl['NormalizedInstanceHours'].should.equal(0) + # cl['ReleaseLabel'].should.equal('emr-5.0.0') + cl.shouldnt.have.key('RequestedAmiVersion') + cl['RunningAmiVersion'].should.equal('1.0.0') + # cl['SecurityConfiguration'].should.be.a(six.string_types) + cl['ServiceRole'].should.equal(args['ServiceRole']) + + status = cl['Status'] + status['State'].should.equal('TERMINATED') + # cluster['Status']['StateChangeReason'] + status['Timeline']['CreationDateTime'].should.be.a('datetime.datetime') + # status['Timeline']['EndDateTime'].should.equal(datetime(2014, 1, 24, 2, 19, 46, tzinfo=pytz.utc)) + status['Timeline']['ReadyDateTime'].should.be.a('datetime.datetime') + + dict((t['Key'], t['Value']) for t in cl['Tags']).should.equal( + dict((t['Key'], t['Value']) for t in args['Tags'])) + + cl['TerminationProtected'].should.equal(False) + cl['VisibleToAllUsers'].should.equal(True) + + +@mock_emr +def test_describe_job_flows(): + client = boto3.client('emr', region_name='us-east-1') + cluster1_id = client.run_job_flow(**run_job_flow_args)['JobFlowId'] + cluster2_id = client.run_job_flow(**run_job_flow_args)['JobFlowId'] + + resp = client.describe_job_flows() + resp['JobFlows'].should.have.length_of(2) + + resp = client.describe_job_flows(JobFlowIds=[cluster2_id]) + resp['JobFlows'].should.have.length_of(1) + resp['JobFlows'][0]['JobFlowId'].should.equal(cluster2_id) + + resp = client.describe_job_flows(JobFlowIds=[cluster1_id]) + resp['JobFlows'].should.have.length_of(1) + resp['JobFlows'][0]['JobFlowId'].should.equal(cluster1_id) + + +@mock_emr +def test_describe_job_flow(): + client = boto3.client('emr', region_name='us-east-1') + + args = deepcopy(run_job_flow_args) + args['AmiVersion'] = '3.8.1' + args['Instances'].update( + {'Ec2KeyName': 'ec2keyname', + 'Ec2SubnetId': 'subnet-8be41cec', + 'HadoopVersion': '2.4.0'}) + args['VisibleToAllUsers'] = True + + cluster_id = client.run_job_flow(**args)['JobFlowId'] + + jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] + + jf['AmiVersion'].should.equal(args['AmiVersion']) + jf.shouldnt.have.key('BootstrapActions') + esd = jf['ExecutionStatusDetail'] + esd['CreationDateTime'].should.be.a('datetime.datetime') + # esd['EndDateTime'].should.be.a('datetime.datetime') + # esd['LastStateChangeReason'].should.be.a(six.string_types) + esd['ReadyDateTime'].should.be.a('datetime.datetime') + esd['StartDateTime'].should.be.a('datetime.datetime') + esd['State'].should.equal('WAITING') + attrs = jf['Instances'] + attrs['Ec2KeyName'].should.equal(args['Instances']['Ec2KeyName']) + attrs['Ec2SubnetId'].should.equal(args['Instances']['Ec2SubnetId']) + attrs['HadoopVersion'].should.equal(args['Instances']['HadoopVersion']) + attrs['InstanceCount'].should.equal(args['Instances']['InstanceCount']) + for ig in attrs['InstanceGroups']: + # ig['BidPrice'] + ig['CreationDateTime'].should.be.a('datetime.datetime') + # ig['EndDateTime'].should.be.a('datetime.datetime') + ig['InstanceGroupId'].should.be.a(six.string_types) + ig['InstanceRequestCount'].should.be.a(int) + ig['InstanceRole'].should.be.within(['MASTER', 
'CORE']) + ig['InstanceRunningCount'].should.be.a(int) + ig['InstanceType'].should.be.within(['c3.medium', 'c3.xlarge']) + # ig['LastStateChangeReason'].should.be.a(six.string_types) + ig['Market'].should.equal('ON_DEMAND') + ig['Name'].should.be.a(six.string_types) + ig['ReadyDateTime'].should.be.a('datetime.datetime') + ig['StartDateTime'].should.be.a('datetime.datetime') + ig['State'].should.equal('RUNNING') + attrs['KeepJobFlowAliveWhenNoSteps'].should.equal(True) + # attrs['MasterInstanceId'].should.be.a(six.string_types) + attrs['MasterInstanceType'].should.equal(args['Instances']['MasterInstanceType']) + attrs['MasterPublicDnsName'].should.be.a(six.string_types) + attrs['NormalizedInstanceHours'].should.equal(0) + attrs['Placement']['AvailabilityZone'].should.equal(args['Instances']['Placement']['AvailabilityZone']) + attrs['SlaveInstanceType'].should.equal(args['Instances']['SlaveInstanceType']) + attrs['TerminationProtected'].should.equal(False) + jf['JobFlowId'].should.equal(cluster_id) + jf['JobFlowRole'].should.equal(args['JobFlowRole']) + jf['LogUri'].should.equal(args['LogUri']) + jf['Name'].should.equal(args['Name']) + jf['ServiceRole'].should.equal(args['ServiceRole']) + jf.shouldnt.have.key('Steps') + jf.shouldnt.have.key('SupportedProducts') + jf['VisibleToAllUsers'].should.equal(True) @mock_emr def test_list_clusters(): client = boto3.client('emr', region_name='us-east-1') - client.run_job_flow( - Name='cluster', - Instances={ - 'MasterInstanceType': 'c3.xlarge', - 'SlaveInstanceType': 'c3.xlarge', - 'InstanceCount': 3, - 'Placement': {'AvailabilityZone': 'us-east-1a'}, - 'KeepJobFlowAliveWhenNoSteps': True, - }, - VisibleToAllUsers=True, - ) + args = deepcopy(run_job_flow_args) + args['Name'] = 'jobflow1' + cluster1_id = client.run_job_flow(**args)['JobFlowId'] + args['Name'] = 'jobflow2' + cluster2_id = client.run_job_flow(**args)['JobFlowId'] + client.terminate_job_flows(JobFlowIds=[cluster2_id]) + summary = client.list_clusters() clusters = summary['Clusters'] - clusters.should.have.length_of(1) - cluster = clusters[0] - cluster['NormalizedInstanceHours'].should.equal(0) - cluster['Status']['State'].should.equal("RUNNING") + clusters.should.have.length_of(2) + + expected = { + cluster1_id: { + 'Id': cluster1_id, + 'Name': 'jobflow1', + 'NormalizedInstanceHours': 0, + 'State': 'WAITING'}, + cluster2_id: { + 'Id': cluster2_id, + 'Name': 'jobflow2', + 'NormalizedInstanceHours': 0, + 'State': 'TERMINATED'}, + } + + for x in clusters: + y = expected[x['Id']] + x['Id'].should.equal(y['Id']) + x['Name'].should.equal(y['Name']) + x['NormalizedInstanceHours'].should.equal(y['NormalizedInstanceHours']) + x['Status']['State'].should.equal(y['State']) + x['Status']['Timeline']['CreationDateTime'].should.be.a('datetime.datetime') + if y['State'] == 'TERMINATED': + x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') + else: + x['Status']['Timeline'].shouldnt.have.key('EndDateTime') + x['Status']['Timeline']['ReadyDateTime'].should.be.a('datetime.datetime') + + +@mock_emr +def test_run_job_flow(): + client = boto3.client('emr', region_name='us-east-1') + args = deepcopy(run_job_flow_args) + cluster_id = client.run_job_flow(**args)['JobFlowId'] + resp = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] + resp['ExecutionStatusDetail']['State'].should.equal('WAITING') + resp['JobFlowId'].should.equal(cluster_id) + resp['Name'].should.equal(args['Name']) + resp['Instances']['MasterInstanceType'].should.equal(args['Instances']['MasterInstanceType']) + 
resp['Instances']['SlaveInstanceType'].should.equal(args['Instances']['SlaveInstanceType']) + resp['LogUri'].should.equal(args['LogUri']) + resp['VisibleToAllUsers'].should.equal(args['VisibleToAllUsers']) + resp['Instances']['NormalizedInstanceHours'].should.equal(0) + resp.shouldnt.have.key('Steps') + + +@mock_emr +def test_run_job_flow_with_invalid_params(): + client = boto3.client('emr', region_name='us-east-1') + with assert_raises(ClientError) as e: + # cannot set both AmiVersion and ReleaseLabel + args = deepcopy(run_job_flow_args) + args['AmiVersion'] = '2.4' + args['ReleaseLabel'] = 'emr-5.0.0' + client.run_job_flow(**args) + e.exception.response['Error']['Code'].should.equal('ValidationException') + + +@mock_emr +def test_run_job_flow_in_multiple_regions(): + regions = {} + for region in ['us-east-1', 'eu-west-1']: + client = boto3.client('emr', region_name=region) + args = deepcopy(run_job_flow_args) + args['Name'] = region + cluster_id = client.run_job_flow(**args)['JobFlowId'] + regions[region] = {'client': client, 'cluster_id': cluster_id} + + for region in regions.keys(): + client = regions[region]['client'] + resp = client.describe_cluster(ClusterId=regions[region]['cluster_id']) + resp['Cluster']['Name'].should.equal(region) + + +@mock_emr +def test_run_job_flow_with_new_params(): + client = boto3.client('emr', region_name='us-east-1') + resp = client.run_job_flow(**run_job_flow_args) + resp.should.have.key('JobFlowId') + + +@mock_emr +def test_run_job_flow_with_visible_to_all_users(): + client = boto3.client('emr', region_name='us-east-1') + for expected in (True, False): + args = deepcopy(run_job_flow_args) + args['VisibleToAllUsers'] = expected + resp = client.run_job_flow(**args) + cluster_id = resp['JobFlowId'] + resp = client.describe_cluster(ClusterId=cluster_id) + resp['Cluster']['VisibleToAllUsers'].should.equal(expected) + + +@mock_emr +def test_run_job_flow_with_instance_groups(): + input_groups = dict((g['Name'], g) for g in input_instance_groups) + client = boto3.client('emr', region_name='us-east-1') + args = deepcopy(run_job_flow_args) + args['Instances'] = {'InstanceGroups': input_instance_groups} + cluster_id = client.run_job_flow(**args)['JobFlowId'] + groups = client.list_instance_groups(ClusterId=cluster_id)['InstanceGroups'] + for x in groups: + y = input_groups[x['Name']] + x.should.have.key('Id') + x['RequestedInstanceCount'].should.equal(y['InstanceCount']) + x['InstanceGroupType'].should.equal(y['InstanceRole']) + x['InstanceType'].should.equal(y['InstanceType']) + x['Market'].should.equal(y['Market']) + if 'BidPrice' in y: + x['BidPrice'].should.equal(y['BidPrice']) + + +@mock_emr +def test_set_termination_protection(): + client = boto3.client('emr', region_name='us-east-1') + args = deepcopy(run_job_flow_args) + args['Instances']['TerminationProtected'] = False + resp = client.run_job_flow(**args) + cluster_id = resp['JobFlowId'] + resp = client.describe_cluster(ClusterId=cluster_id) + resp['Cluster']['TerminationProtected'].should.equal(False) + + for expected in (True, False): + resp = client.set_termination_protection(JobFlowIds=[cluster_id], + TerminationProtected=expected) + resp = client.describe_cluster(ClusterId=cluster_id) + resp['Cluster']['TerminationProtected'].should.equal(expected) + + +@mock_emr +def test_set_visible_to_all_users(): + client = boto3.client('emr', region_name='us-east-1') + args = deepcopy(run_job_flow_args) + args['VisibleToAllUsers'] = False + resp = client.run_job_flow(**args) + cluster_id = resp['JobFlowId'] + 
+    resp = client.describe_cluster(ClusterId=cluster_id)
+    resp['Cluster']['VisibleToAllUsers'].should.equal(False)
+
+    for expected in (True, False):
+        resp = client.set_visible_to_all_users(JobFlowIds=[cluster_id],
+                                               VisibleToAllUsers=expected)
+        resp = client.describe_cluster(ClusterId=cluster_id)
+        resp['Cluster']['VisibleToAllUsers'].should.equal(expected)
+
+
+@mock_emr
+def test_terminate_job_flows():
+    client = boto3.client('emr', region_name='us-east-1')
+
+    resp = client.run_job_flow(**run_job_flow_args)
+    cluster_id = resp['JobFlowId']
+    resp = client.describe_cluster(ClusterId=cluster_id)
+    resp['Cluster']['Status']['State'].should.equal('WAITING')
+
+    resp = client.terminate_job_flows(JobFlowIds=[cluster_id])
+    resp = client.describe_cluster(ClusterId=cluster_id)
+    resp['Cluster']['Status']['State'].should.equal('TERMINATED')
+
+
+# testing multiple end points for each feature
+
+@mock_emr
+def test_bootstrap_actions():
+    bootstrap_actions = [
+        {'Name': 'bs1',
+         'ScriptBootstrapAction': {
+             'Args': ['arg1', 'arg2'],
+             'Path': 'path/to/script'}},
+        {'Name': 'bs2',
+         'ScriptBootstrapAction': {
+             'Path': 'path/to/anotherscript'}}
+    ]
+
+    client = boto3.client('emr', region_name='us-east-1')
+    args = deepcopy(run_job_flow_args)
+    args['BootstrapActions'] = bootstrap_actions
+    cluster_id = client.run_job_flow(**args)['JobFlowId']
+
+    cl = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0]
+    for x, y in zip(cl['BootstrapActions'], bootstrap_actions):
+        x['BootstrapActionConfig'].should.equal(y)
+
+    resp = client.list_bootstrap_actions(ClusterId=cluster_id)
+    for x, y in zip(resp['BootstrapActions'], bootstrap_actions):
+        x['Name'].should.equal(y['Name'])
+        if 'Args' in y['ScriptBootstrapAction']:
+            x['Args'].should.equal(y['ScriptBootstrapAction']['Args'])
+        x['ScriptPath'].should.equal(y['ScriptBootstrapAction']['Path'])
+
+
+@mock_emr
+def test_instance_groups():
+    input_groups = dict((g['Name'], g) for g in input_instance_groups)
+
+    client = boto3.client('emr', region_name='us-east-1')
+    args = deepcopy(run_job_flow_args)
+    for key in ['MasterInstanceType', 'SlaveInstanceType', 'InstanceCount']:
+        del args['Instances'][key]
+    args['Instances']['InstanceGroups'] = input_instance_groups[:2]
+    cluster_id = client.run_job_flow(**args)['JobFlowId']
+
+    jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0]
+    base_instance_count = jf['Instances']['InstanceCount']
+
+    client.add_instance_groups(JobFlowId=cluster_id, InstanceGroups=input_instance_groups[2:])
+
+    jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0]
+    jf['Instances']['InstanceCount'].should.equal(sum(g['InstanceCount'] for g in input_instance_groups))
+    for x in jf['Instances']['InstanceGroups']:
+        y = input_groups[x['Name']]
+        # dicts have no attributes, so check membership rather than hasattr,
+        # and compare against the input value rather than a literal string
+        if 'BidPrice' in y:
+            x['BidPrice'].should.equal(y['BidPrice'])
+        x['CreationDateTime'].should.be.a('datetime.datetime')
+        # x['EndDateTime'].should.be.a('datetime.datetime')
+        x.should.have.key('InstanceGroupId')
+        x['InstanceRequestCount'].should.equal(y['InstanceCount'])
+        x['InstanceRole'].should.equal(y['InstanceRole'])
+        x['InstanceRunningCount'].should.equal(y['InstanceCount'])
+        x['InstanceType'].should.equal(y['InstanceType'])
+        # x['LastStateChangeReason'].should.equal(y['LastStateChangeReason'])
+        x['Market'].should.equal(y['Market'])
+        x['Name'].should.equal(y['Name'])
+        x['ReadyDateTime'].should.be.a('datetime.datetime')
+        x['StartDateTime'].should.be.a('datetime.datetime')
+        x['State'].should.equal('RUNNING')
+
+    groups = client.list_instance_groups(ClusterId=cluster_id)['InstanceGroups']
+    for x in groups:
+        y = input_groups[x['Name']]
+        if 'BidPrice' in y:
+            x['BidPrice'].should.equal(y['BidPrice'])
+        # Configurations
+        # EbsBlockDevices
+        # EbsOptimized
+        x.should.have.key('Id')
+        x['InstanceGroupType'].should.equal(y['InstanceRole'])
+        x['InstanceType'].should.equal(y['InstanceType'])
+        x['Market'].should.equal(y['Market'])
+        x['Name'].should.equal(y['Name'])
+        x['RequestedInstanceCount'].should.equal(y['InstanceCount'])
+        x['RunningInstanceCount'].should.equal(y['InstanceCount'])
+        # ShrinkPolicy
+        x['Status']['State'].should.equal('RUNNING')
+        x['Status']['StateChangeReason']['Code'].should.be.a(six.string_types)
+        # x['Status']['StateChangeReason']['Message'].should.be.a(six.string_types)
+        x['Status']['Timeline']['CreationDateTime'].should.be.a('datetime.datetime')
+        # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime')
+        x['Status']['Timeline']['ReadyDateTime'].should.be.a('datetime.datetime')
+
+    igs = dict((g['Name'], g) for g in groups)
+    client.modify_instance_groups(
+        InstanceGroups=[
+            {'InstanceGroupId': igs['task-1']['Id'],
+             'InstanceCount': 2},
+            {'InstanceGroupId': igs['task-2']['Id'],
+             'InstanceCount': 3}])
+    jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0]
+    jf['Instances']['InstanceCount'].should.equal(base_instance_count + 5)
+    igs = dict((g['Name'], g) for g in jf['Instances']['InstanceGroups'])
+    igs['task-1']['InstanceRunningCount'].should.equal(2)
+    igs['task-2']['InstanceRunningCount'].should.equal(3)
+
+
+@mock_emr
+def test_steps():
+    input_steps = [{
+        'HadoopJarStep': {
+            'Args': [
+                'hadoop-streaming',
+                '-files', 's3://elasticmapreduce/samples/wordcount/wordSplitter.py#wordSplitter.py',
+                '-mapper', 'python wordSplitter.py',
+                '-input', 's3://elasticmapreduce/samples/wordcount/input',
+                '-output', 's3://output_bucket/output/wordcount_output',
+                '-reducer', 'aggregate'
+            ],
+            'Jar': 'command-runner.jar',
+        },
+        'Name': 'My wordcount example',
+    }, {
+        'HadoopJarStep': {
+            'Args': [
+                'hadoop-streaming',
+                '-files', 's3://elasticmapreduce/samples/wordcount/wordSplitter2.py#wordSplitter2.py',
+                '-mapper', 'python wordSplitter2.py',
+                '-input', 's3://elasticmapreduce/samples/wordcount/input2',
+                '-output', 's3://output_bucket/output/wordcount_output2',
+                '-reducer', 'aggregate'
+            ],
+            'Jar': 'command-runner.jar',
+        },
+        'Name': 'My wordcount example2',
+    }]
+
+    # TODO: implementation and test for cancel_steps
+
+    client = boto3.client('emr', region_name='us-east-1')
+    args = deepcopy(run_job_flow_args)
+    args['Steps'] = [input_steps[0]]
+    cluster_id = client.run_job_flow(**args)['JobFlowId']
+
+    jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0]
+    jf['Steps'].should.have.length_of(1)
+
+    client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[input_steps[1]])
+
+    jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0]
+    jf['Steps'].should.have.length_of(2)
+    for idx, (x, y) in enumerate(zip(jf['Steps'], input_steps)):
+        x['ExecutionStatusDetail'].should.have.key('CreationDateTime')
+        # x['ExecutionStatusDetail'].should.have.key('EndDateTime')
+        # x['ExecutionStatusDetail'].should.have.key('LastStateChangeReason')
+        # x['ExecutionStatusDetail'].should.have.key('StartDateTime')
+        x['ExecutionStatusDetail']['State'].should.equal('STARTING' if idx == 0 else 'PENDING')
+        x['StepConfig']['ActionOnFailure'].should.equal('TERMINATE_CLUSTER')
x['StepConfig']['HadoopJarStep']['Args'].should.equal(y['HadoopJarStep']['Args']) + x['StepConfig']['HadoopJarStep']['Jar'].should.equal(y['HadoopJarStep']['Jar']) + if 'MainClass' in y['HadoopJarStep']: + x['StepConfig']['HadoopJarStep']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) + if 'Properties' in y['HadoopJarStep']: + x['StepConfig']['HadoopJarStep']['Properties'].should.equal(y['HadoopJarStep']['Properties']) + x['StepConfig']['Name'].should.equal(y['Name']) + + expected = dict((s['Name'], s) for s in input_steps) + + steps = client.list_steps(ClusterId=cluster_id)['Steps'] + steps.should.have.length_of(2) + for x in steps: + y = expected[x['Name']] + x['ActionOnFailure'].should.equal('TERMINATE_CLUSTER') + x['Config']['Args'].should.equal(y['HadoopJarStep']['Args']) + x['Config']['Jar'].should.equal(y['HadoopJarStep']['Jar']) + # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) + # Properties + x['Id'].should.be.a(six.string_types) + x['Name'].should.equal(y['Name']) + x['Status']['State'].should.be.within(['STARTING', 'PENDING']) + # StateChangeReason + x['Status']['Timeline']['CreationDateTime'].should.be.a('datetime.datetime') + # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') + # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') + + x = client.describe_step(ClusterId=cluster_id, StepId=x['Id'])['Step'] + x['ActionOnFailure'].should.equal('TERMINATE_CLUSTER') + x['Config']['Args'].should.equal(y['HadoopJarStep']['Args']) + x['Config']['Jar'].should.equal(y['HadoopJarStep']['Jar']) + # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) + # Properties + x['Id'].should.be.a(six.string_types) + x['Name'].should.equal(y['Name']) + x['Status']['State'].should.be.within(['STARTING', 'PENDING']) + # StateChangeReason + x['Status']['Timeline']['CreationDateTime'].should.be.a('datetime.datetime') + # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') + # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') + + +@mock_emr +def test_tags(): + input_tags = [{'Key': 'newkey1', 'Value': 'newval1'}, + {'Key': 'newkey2', 'Value': 'newval2'}] + + client = boto3.client('emr', region_name='us-east-1') + cluster_id = client.run_job_flow(**run_job_flow_args)['JobFlowId'] + + client.add_tags(ResourceId=cluster_id, Tags=input_tags) + resp = client.describe_cluster(ClusterId=cluster_id)['Cluster'] + resp['Tags'].should.have.length_of(2) + dict((t['Key'], t['Value']) for t in resp['Tags']).should.equal(dict((t['Key'], t['Value']) for t in input_tags)) + + client.remove_tags(ResourceId=cluster_id, TagKeys=[t['Key'] for t in input_tags]) + resp = client.describe_cluster(ClusterId=cluster_id)['Cluster'] + resp.shouldnt.have.key('Tags')
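
For reference, a minimal end-to-end sketch of the mocked flow these tests exercise; the cluster name and instance types below are illustrative values, not part of the patch:

import boto3
from moto import mock_emr


@mock_emr
def example_mocked_cluster():
    # run_job_flow against the mock backend, then read the cluster state back
    client = boto3.client('emr', region_name='us-east-1')
    cluster_id = client.run_job_flow(
        Name='example',  # illustrative name
        Instances={'InstanceCount': 2,
                   'MasterInstanceType': 'c3.medium',
                   'SlaveInstanceType': 'c3.xlarge',
                   'KeepJobFlowAliveWhenNoSteps': True},
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole')['JobFlowId']
    # with KeepJobFlowAliveWhenNoSteps=True the mock reports the cluster as WAITING
    state = client.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
    assert state == 'WAITING'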