class Cluster(object):
    """In-memory representation of an EMR cluster used by the mock backend.

    Besides the values handed to the constructor, every cluster starts with
    fixed placeholder attributes (AMI versions, service role, ``RUNNING``
    state, string booleans) and an empty tag dictionary.
    """

    def __init__(self, id, name, availability_zone, ec2_key_name, subnet_id,
                 ec2_iam_profile, log_uri):
        """Store the caller-supplied attributes and fill in the defaults.

        :param id: cluster identifier.
        :param name: cluster name.
        :param availability_zone: availability zone of the cluster.
        :param ec2_key_name: name of the EC2 key pair.
        :param subnet_id: EC2 subnet identifier.
        :param ec2_iam_profile: IAM instance profile.
        :param log_uri: location the cluster logs are written to.
        """
        self.id = id
        self.name = name
        self.applications = []
        self.auto_terminate = "false"
        self.availability_zone = availability_zone
        # This argument used to be dropped, so ``cluster.ec2_key_name`` was
        # undefined when the describe-cluster template rendered it.
        self.ec2_key_name = ec2_key_name
        self.subnet_id = subnet_id
        self.ec2_iam_profile = ec2_iam_profile
        self.log_uri = log_uri
        self.master_public_dns_name = ""
        self.normalized_instance_hours = 0
        self.requested_ami_version = "2.4.2"
        self.running_ami_version = "2.4.2"
        self.service_role = "my-service-role"
        self.state = "RUNNING"
        self.tags = {}
        self.termination_protected = "false"
        self.visible_to_all_users = "false"

    @property
    def termination_protection(self):
        """Read-only alias of ``termination_protected``.

        The describe-cluster template looks the flag up under this name;
        without the alias that lookup finds nothing.
        """
        return self.termination_protected

    def add_tags(self, tags):
        """Merge ``tags`` (a key -> value mapping) into the cluster's tags.

        Keys that already exist are overwritten.
        """
        self.tags.update(tags)

    def remove_tags(self, tag_keys):
        """Delete every key in ``tag_keys``; unknown keys are ignored."""
        for key in tag_keys:
            self.tags.pop(key, None)
def describe_job_flows(self, job_flow_ids=None):
    """Return the known job flows, optionally restricted to some ids.

    :param job_flow_ids: iterable of job flow ids to keep. ``None`` or an
        empty collection means "no filter" and every job flow is returned.
    :returns: a new list of job flow objects. Ids that match no job flow
        are silently ignored.
    """
    # Always hand back a fresh list so both branches return the same type
    # and callers never hold a live view of the backend's dictionary.
    jobs = list(self.job_flows.values())
    if not job_flow_ids:
        return jobs
    wanted = set(job_flow_ids)
    return [job for job in jobs if job.id in wanted]
def add_tags(self):
    """Handle an AddTags request.

    Reads the target cluster from the ``ResourceId`` parameter, parses the
    tags out of the query string, stores them through the backend and
    renders the AddTags response template.
    """
    resource_id = self._get_param('ResourceId')
    parsed_tags = tags_from_query_string(self.querystring)
    emr_backend.add_tags(resource_id, parsed_tags)
    return self.response_template(ADD_TAGS_TEMPLATE).render()
def tags_from_query_string(querystring_dict):
    """Build a ``{tag key: tag value}`` dict from parsed query parameters.

    Parameters named ``Tags.<index>.Key`` supply the tag names, and the
    matching ``Tags.<index>.Value`` parameters supply the values.

    :param querystring_dict: mapping of parameter name to a list of values;
        only the first value of each list is used.
    :returns: dict of tag key to tag value. A key with no matching
        ``Value`` parameter maps to ``None``.
    """
    prefix = 'Tags.'
    suffix = '.Key'
    response_values = {}
    for key, values in querystring_dict.items():
        # Anchor on the dotted prefix/suffix so look-alike names such as
        # "TagsKey" are skipped instead of producing a failed lookup.
        if not (key.startswith(prefix) and key.endswith(suffix)):
            continue
        # Slice instead of str.replace: replace() would also strip matching
        # text from the middle of the index.
        tag_index = key[len(prefix):-len(suffix)]
        if not tag_index:
            # "Tags.Key" has no index between prefix and suffix.
            continue
        tag_key = values[0]
        tag_value_key = "Tags.{0}.Value".format(tag_index)
        if tag_value_key in querystring_dict:
            response_values[tag_key] = querystring_dict[tag_value_key][0]
        else:
            response_values[tag_key] = None
    return response_values
@mock_emr
def test_cluster_tagging():
    """Tags added to a cluster are reported by describe_cluster and can be removed."""
    emr = boto.connect_emr()
    # The id returned for the job flow is also used as the cluster id.
    cluster_id = emr.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[],
    )
    emr.add_tags(cluster_id, {"tag1": "val1", "tag2": "val2"})

    described = emr.describe_cluster(cluster_id)
    described.tags.should.have.length_of(2)
    tag_map = dict((tag.key, tag.value) for tag in described.tags)
    tag_map['tag1'].should.equal('val1')
    tag_map['tag2'].should.equal('val2')

    # Remove a tag
    emr.remove_tags(cluster_id, ["tag1"])
    described = emr.describe_cluster(cluster_id)
    described.tags.should.have.length_of(1)
    tag_map = dict((tag.key, tag.value) for tag in described.tags)
    tag_map['tag2'].should.equal('val2')