Add EMR clusters and tagging.
This commit is contained in:
parent
5ededbb297
commit
95b1fa42b2
@ -20,6 +20,35 @@ class FakeInstanceGroup(object):
|
||||
self.num_instances = instance_count
|
||||
|
||||
|
||||
class Cluster(object):
    """In-memory record of an EMR cluster as exposed by the cluster APIs.

    Identity and EC2 placement values come from the constructor arguments;
    the remaining attributes are initialised to fixed placeholder values.
    """

    def __init__(self, id, name, availability_zone, ec2_key_name, subnet_id,
                 ec2_iam_profile, log_uri):
        """Store the supplied settings and initialise the placeholder fields.

        :param id: cluster identifier.
        :param name: cluster name.
        :param availability_zone: EC2 availability zone of the cluster.
        :param ec2_key_name: name of the EC2 key pair.
        :param subnet_id: EC2 subnet identifier.
        :param ec2_iam_profile: IAM profile/role used by the instances.
        :param log_uri: location where logs are written.
        """
        self.id = id
        self.name = name
        self.applications = []
        self.auto_terminate = "false"
        self.availability_zone = availability_zone
        # Kept on the instance so consumers (e.g. the DescribeCluster
        # response template) can read ``cluster.ec2_key_name``.
        self.ec2_key_name = ec2_key_name
        self.subnet_id = subnet_id
        self.ec2_iam_profile = ec2_iam_profile
        self.log_uri = log_uri
        self.master_public_dns_name = ""
        self.normalized_instance_hours = 0
        self.requested_ami_version = "2.4.2"
        self.running_ami_version = "2.4.2"
        self.service_role = "my-service-role"
        self.state = "RUNNING"
        self.tags = {}
        self.termination_protected = "false"
        self.visible_to_all_users = "false"

    def add_tags(self, tags):
        """Merge the ``tags`` mapping into this cluster's tags.

        Existing keys are overwritten with the new values.
        """
        self.tags.update(tags)

    def remove_tags(self, tag_keys):
        """Drop every key in ``tag_keys``; keys that are not present are ignored."""
        for key in tag_keys:
            self.tags.pop(key, None)
|
||||
|
||||
|
||||
class FakeStep(object):
|
||||
def __init__(self, state, **kwargs):
|
||||
# 'Steps.member.1.HadoopJarStep.Jar': ['/home/hadoop/contrib/streaming/hadoop-streaming.jar'],
|
||||
@ -68,11 +97,24 @@ class FakeJobFlow(object):
|
||||
self.normalized_instance_hours = 0
|
||||
self.ec2_key_name = instance_attrs.get('ec2_key_name')
|
||||
self.availability_zone = instance_attrs.get('placement.availability_zone')
|
||||
self.subnet_id = instance_attrs.get('ec2_subnet_id')
|
||||
self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
|
||||
self.termination_protected = instance_attrs.get('termination_protected')
|
||||
|
||||
self.instance_group_ids = []
|
||||
|
||||
def create_cluster(self):
    """Build a ``Cluster`` from this job flow's id, name, EC2 settings, role and log URI."""
    return Cluster(
        id=self.id,
        name=self.name,
        log_uri=self.log_uri,
        ec2_iam_profile=self.role,
        ec2_key_name=self.ec2_key_name,
        availability_zone=self.availability_zone,
        subnet_id=self.subnet_id,
    )
|
||||
|
||||
def terminate(self):
    """Mark this job flow as terminated by setting its state."""
    self.state = 'TERMINATED'
|
||||
|
||||
@ -129,12 +171,15 @@ class ElasticMapReduceBackend(BaseBackend):
|
||||
|
||||
def __init__(self):
    """Start with empty registries for job flows, clusters and instance groups."""
    self.job_flows, self.clusters, self.instance_groups = {}, {}, {}
|
||||
|
||||
def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
    """Create a job flow, register it and its companion cluster, and return the job flow.

    The job flow is stored under a freshly generated id; the cluster it
    creates is stored under the cluster's own id.
    """
    new_id = random_job_id()
    flow = FakeJobFlow(new_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs)
    self.job_flows[new_id] = flow

    companion = flow.create_cluster()
    self.clusters[companion.id] = companion
    return flow
|
||||
|
||||
def add_job_flow_steps(self, job_flow_id, steps):
|
||||
@ -142,8 +187,12 @@ class ElasticMapReduceBackend(BaseBackend):
|
||||
job_flow.add_steps(steps)
|
||||
return job_flow
|
||||
|
||||
def describe_job_flows(self, job_flow_ids=None):
    """Return the stored job flows, optionally restricted to specific ids.

    :param job_flow_ids: optional iterable of job flow ids. When ``None``
        or empty, every stored job flow is returned.
    :returns: a list of job flows (always a list, with or without a filter).
    """
    jobs = list(self.job_flows.values())
    if not job_flow_ids:
        return jobs
    # Build the lookup set once instead of scanning the id list per job.
    wanted = set(job_flow_ids)
    return [job for job in jobs if job.id in wanted]
|
||||
|
||||
def terminate_job_flows(self, job_ids):
|
||||
flows = [flow for flow in self.describe_job_flows() if flow.id in job_ids]
|
||||
@ -151,6 +200,12 @@ class ElasticMapReduceBackend(BaseBackend):
|
||||
flow.terminate()
|
||||
return flows
|
||||
|
||||
def list_clusters(self):
    """Return the values of the cluster registry (every stored cluster)."""
    registry = self.clusters
    return registry.values()
|
||||
|
||||
def get_cluster(self, cluster_id):
    """Look up a cluster by id; an unknown id raises ``KeyError``."""
    registry = self.clusters
    return registry[cluster_id]
|
||||
|
||||
def get_instance_groups(self, instance_group_ids):
|
||||
return [
|
||||
group for group_id, group
|
||||
@ -181,5 +236,13 @@ class ElasticMapReduceBackend(BaseBackend):
|
||||
job = self.job_flows[job_id]
|
||||
job.set_visibility(visible_to_all_users)
|
||||
|
||||
def add_tags(self, cluster_id, tags):
    """Fetch the cluster with ``cluster_id`` and hand ``tags`` to its ``add_tags``."""
    self.get_cluster(cluster_id).add_tags(tags)
|
||||
|
||||
def remove_tags(self, cluster_id, tag_keys):
    """Fetch the cluster with ``cluster_id`` and hand ``tag_keys`` to its ``remove_tags``."""
    self.get_cluster(cluster_id).remove_tags(tag_keys)
|
||||
|
||||
|
||||
# Module-level backend instance; the EMR response handlers import it by this name.
emr_backend = ElasticMapReduceBackend()
|
||||
|
@ -2,6 +2,7 @@ from __future__ import unicode_literals
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import emr_backend
|
||||
from .utils import tags_from_query_string
|
||||
|
||||
|
||||
class ElasticMapReduceResponse(BaseResponse):
|
||||
@ -30,7 +31,8 @@ class ElasticMapReduceResponse(BaseResponse):
|
||||
return template.render(job_flow=job_flow)
|
||||
|
||||
def describe_job_flows(self):
    """Render the DescribeJobFlows response.

    Reads the optional ``JobFlowIds.member`` multi-value parameter and
    passes it to the backend, which applies it as an id filter.
    """
    job_flow_ids = self._get_multi_param("JobFlowIds.member")
    # Query the backend once, with the filter; an earlier unfiltered call
    # whose result was immediately overwritten has been dropped.
    job_flows = emr_backend.describe_job_flows(job_flow_ids)
    template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE)
    return template.render(job_flows=job_flows)
|
||||
|
||||
@ -60,6 +62,31 @@ class ElasticMapReduceResponse(BaseResponse):
|
||||
template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def list_clusters(self):
    """Render the ListClusters response for the clusters the backend returns."""
    known_clusters = emr_backend.list_clusters()
    return self.response_template(LIST_CLUSTERS_TEMPLATE).render(clusters=known_clusters)
|
||||
|
||||
def describe_cluster(self):
    """Render the DescribeCluster response for the cluster named by ``ClusterId``."""
    requested = emr_backend.get_cluster(self._get_param('ClusterId'))
    renderer = self.response_template(DESCRIBE_CLUSTER_TEMPLATE)
    return renderer.render(cluster=requested)
|
||||
|
||||
def add_tags(self):
    """Apply the tags parsed from the query string to the ``ResourceId`` cluster.

    Returns the rendered AddTags response.
    """
    resource_id = self._get_param('ResourceId')
    parsed_tags = tags_from_query_string(self.querystring)
    emr_backend.add_tags(resource_id, parsed_tags)
    return self.response_template(ADD_TAGS_TEMPLATE).render()
|
||||
|
||||
def remove_tags(self):
    """Remove the ``TagKeys.member`` keys from the ``ResourceId`` cluster.

    Returns the rendered RemoveTags response.
    """
    resource_id = self._get_param('ResourceId')
    keys_to_drop = self._get_multi_param('TagKeys.member')
    emr_backend.remove_tags(resource_id, keys_to_drop)
    return self.response_template(REMOVE_TAGS_TEMPLATE).render()
|
||||
|
||||
|
||||
RUN_JOB_FLOW_TEMPLATE = """<RunJobFlowResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||
<RunJobFlowResult>
|
||||
@ -163,6 +190,85 @@ ADD_JOB_FLOW_STEPS_TEMPLATE = """<AddJobFlowStepsResponse xmlns="http://elasticm
|
||||
</ResponseMetadata>
|
||||
</AddJobFlowStepsResponse>"""
|
||||
|
||||
LIST_CLUSTERS_TEMPLATE = """<ListClustersResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||
<Clusters>
|
||||
{% for cluster in clusters %}
|
||||
<member>
|
||||
<Id>{{ cluster.id }}</Id>
|
||||
<Name>{{ cluster.name }}</Name>
|
||||
<NormalizedInstanceHours>{{ cluster.normalized_instance_hours }}</NormalizedInstanceHours>
|
||||
<Status>
|
||||
<State>{{ cluster.state }}</State>
|
||||
<StateChangeReason>
|
||||
<Code></Code>
|
||||
<Message></Message>
|
||||
</StateChangeReason>
|
||||
<Timeline></Timeline>
|
||||
</Status>
|
||||
</member>
|
||||
{% endfor %}
|
||||
</Clusters>
|
||||
<Marker></Marker>
|
||||
<ResponseMetadata>
|
||||
<RequestId>
|
||||
2690d7eb-ed86-11dd-9877-6fad448a8418
|
||||
</RequestId>
|
||||
</ResponseMetadata>
|
||||
</ListClustersResponse>"""
|
||||
|
||||
# Response template for DescribeCluster, rendered with a single `cluster`.
# Tags, EC2 attributes, AMI versions, visibility, state, name, applications,
# termination protection, instance hours and service role come from the
# cluster object. The state-change reason, the timeline timestamps and the
# master public DNS name are fixed literal values.
# <TerminationProtected> reads `cluster.termination_protected`, the attribute
# name the cluster model actually defines.
DESCRIBE_CLUSTER_TEMPLATE = """<DescribeClusterResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<DescribeClusterResult>
<Cluster>
<Id>{{ cluster.id }}</Id>
<Tags>
{% for tag_key, tag_value in cluster.tags.items() %}
<member>
<Key>{{ tag_key }}</Key>
<Value>{{ tag_value }}</Value>
</member>
{% endfor %}
</Tags>
<Ec2InstanceAttributes>
<Ec2AvailabilityZone>{{ cluster.availability_zone }}</Ec2AvailabilityZone>
<Ec2SubnetId>{{ cluster.subnet_id }}</Ec2SubnetId>
<Ec2KeyName>{{ cluster.ec2_key_name }}</Ec2KeyName>
</Ec2InstanceAttributes>
<RunningAmiVersion>{{ cluster.running_ami_version }}</RunningAmiVersion>
<VisibleToAllUsers>{{ cluster.visible_to_all_users }}</VisibleToAllUsers>
<Status>
<StateChangeReason>
<Message>Terminated by user request</Message>
<Code>USER_REQUEST</Code>
</StateChangeReason>
<State>{{ cluster.state }}</State>
<Timeline>
<CreationDateTime>2014-01-24T01:21:21Z</CreationDateTime>
<ReadyDateTime>2014-01-24T01:25:26Z</ReadyDateTime>
<EndDateTime>2014-01-24T02:19:46Z</EndDateTime>
</Timeline>
</Status>
<AutoTerminate>{{ cluster.auto_terminate }}</AutoTerminate>
<Name>{{ cluster.name }}</Name>
<RequestedAmiVersion>{{ cluster.requested_ami_version }}</RequestedAmiVersion>
<Applications>
{% for application in cluster.applications %}
<member>
<Name>{{ application.name }}</Name>
<Version>{{ application.version }}</Version>
</member>
{% endfor %}
</Applications>
<TerminationProtected>{{ cluster.termination_protected }}</TerminationProtected>
<MasterPublicDnsName>ec2-184-0-0-1.us-west-1.compute.amazonaws.com</MasterPublicDnsName>
<NormalizedInstanceHours>{{ cluster.normalized_instance_hours }}</NormalizedInstanceHours>
<ServiceRole>{{ cluster.service_role }}</ServiceRole>
</Cluster>
</DescribeClusterResult>
<ResponseMetadata>
<RequestId>aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee</RequestId>
</ResponseMetadata>
</DescribeClusterResponse>"""
|
||||
|
||||
ADD_INSTANCE_GROUPS_TEMPLATE = """<AddInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||
<InstanceGroupIds>{% for instance_group in instance_groups %}{{ instance_group.id }}{% if loop.index != loop.length %},{% endif %}{% endfor %}</InstanceGroupIds>
|
||||
</AddInstanceGroupsResponse>"""
|
||||
@ -182,3 +288,20 @@ SET_VISIBLE_TO_ALL_USERS_TEMPLATE = """<SetVisibleToAllUsersResponse xmlns="http
|
||||
</RequestId>
|
||||
</ResponseMetadata>
|
||||
</SetVisibleToAllUsersResponse>"""
|
||||
|
||||
|
||||
ADD_TAGS_TEMPLATE = """<AddTagsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||
<ResponseMetadata>
|
||||
<RequestId>
|
||||
2690d7eb-ed86-11dd-9877-6fad448a8419
|
||||
</RequestId>
|
||||
</ResponseMetadata>
|
||||
</AddTagsResponse>"""
|
||||
|
||||
REMOVE_TAGS_TEMPLATE = """<RemoveTagsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||
<ResponseMetadata>
|
||||
<RequestId>
|
||||
2690d7eb-ed86-11dd-9877-6fad448a8419
|
||||
</RequestId>
|
||||
</ResponseMetadata>
|
||||
</RemoveTagsResponse>"""
|
||||
|
@ -14,3 +14,19 @@ def random_instance_group_id(size=13):
|
||||
chars = list(range(10)) + list(string.ascii_uppercase)
|
||||
job_tag = ''.join(six.text_type(random.choice(chars)) for x in range(size))
|
||||
return 'i-{0}'.format(job_tag)
|
||||
|
||||
|
||||
def tags_from_query_string(querystring_dict):
    """Collect ``Tags.<index>.Key`` / ``Tags.<index>.Value`` pairs into a dict.

    ``querystring_dict`` maps parameter names to lists of values; the first
    list element is used. A key parameter without a matching value
    parameter yields a tag whose value is ``None``.
    """
    prefix = 'Tags'
    suffix = 'Key'
    collected = {}
    for param_name in querystring_dict:
        if not (param_name.startswith(prefix) and param_name.endswith(suffix)):
            continue
        # Whatever sits between "Tags." and ".Key" identifies the pair.
        index_part = param_name.replace(prefix + ".", "").replace("." + suffix, "")
        name_param = "Tags.{0}.Key".format(index_part)
        value_param = "Tags.{0}.Value".format(index_part)
        tag_name = querystring_dict.get(name_param)[0]
        if value_param in querystring_dict:
            tag_value = querystring_dict.get(value_param)[0]
        else:
            tag_value = None
        collected[tag_name] = tag_value
    return collected
|
||||
|
@ -123,6 +123,31 @@ def test_terminate_job_flow():
|
||||
flow.state.should.equal('TERMINATED')
|
||||
|
||||
|
||||
@mock_emr
def test_describe_job_flows():
    """Describing job flows returns all of them, or only the requested ids."""
    emr = boto.connect_emr()
    first_id, second_id = [
        emr.run_jobflow(
            name='My jobflow',
            log_uri='s3://some_bucket/jobflow_logs',
            steps=[]
        )
        for _ in range(2)
    ]

    # No filter: both job flows come back.
    emr.describe_jobflows().should.have.length_of(2)

    # Filter on one id: only that job flow comes back.
    filtered = emr.describe_jobflows(jobflow_ids=[second_id])
    filtered.should.have.length_of(1)
    filtered[0].jobflowid.should.equal(second_id)

    # Single-job lookup.
    emr.describe_jobflow(first_id).jobflowid.should.equal(first_id)
|
||||
|
||||
|
||||
@mock_emr
|
||||
def test_add_steps_to_flow():
|
||||
conn = boto.connect_emr()
|
||||
@ -291,3 +316,61 @@ def test_set_visible_to_all_users():
|
||||
|
||||
job_flow = conn.describe_jobflow(job_id)
|
||||
job_flow.visibletoallusers.should.equal('False')
|
||||
|
||||
|
||||
@mock_emr
def test_list_clusters():
    """Running a job flow makes exactly one cluster show up in ListClusters."""
    emr = boto.connect_emr()
    emr.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[],
    )

    listed = emr.list_clusters().clusters
    listed.should.have.length_of(1)

    only_cluster = listed[0]
    only_cluster.name.should.equal("My jobflow")
    only_cluster.normalizedinstancehours.should.equal('0')
    only_cluster.status.state.should.equal("RUNNING")
|
||||
|
||||
|
||||
@mock_emr
def test_describe_cluster():
    """DescribeCluster with a job flow id reports that job flow's name, hours and state."""
    emr = boto.connect_emr()
    flow_id = emr.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[],
    )

    described = emr.describe_cluster(flow_id)
    described.name.should.equal("My jobflow")
    described.normalizedinstancehours.should.equal('0')
    described.status.state.should.equal("RUNNING")
|
||||
|
||||
|
||||
@mock_emr
def test_cluster_tagging():
    """Tags added to a cluster are reported back, and removed tags disappear."""
    emr = boto.connect_emr()
    flow_id = emr.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[],
    )
    # The cluster shares its id with the job flow that created it.
    cluster_id = flow_id
    emr.add_tags(cluster_id, {"tag1": "val1", "tag2": "val2"})

    described = emr.describe_cluster(cluster_id)
    described.tags.should.have.length_of(2)
    tag_map = dict((tag.key, tag.value) for tag in described.tags)
    tag_map['tag1'].should.equal('val1')
    tag_map['tag2'].should.equal('val2')

    # Remove a tag
    emr.remove_tags(cluster_id, ["tag1"])
    described = emr.describe_cluster(cluster_id)
    described.tags.should.have.length_of(1)
    tag_map = dict((tag.key, tag.value) for tag in described.tags)
    tag_map['tag2'].should.equal('val2')
|
||||
|
Loading…
Reference in New Issue
Block a user