Merge pull request #739 from okomestudio/ts/emr_list_clusters

Implement filters and pagination markers for some EMR endpoints
This commit is contained in:
Steve Pulec 2016-11-06 09:37:00 -05:00 committed by GitHub
commit eaf70ac349
6 changed files with 336 additions and 109 deletions

View File

@ -3,6 +3,7 @@ import datetime
import json import json
import re import re
import pytz
from boto.exception import JSONResponseError from boto.exception import JSONResponseError
from jinja2 import Environment, DictLoader, TemplateNotFound from jinja2 import Environment, DictLoader, TemplateNotFound
@ -477,6 +478,11 @@ def to_str(value, spec):
return 'true' if value else 'false' return 'true' if value else 'false'
elif vtype == 'integer': elif vtype == 'integer':
return str(value) return str(value)
elif vtype == 'float':
return str(value)
elif vtype == 'timestamp':
return datetime.datetime.utcfromtimestamp(
value).replace(tzinfo=pytz.utc).isoformat()
elif vtype == 'string': elif vtype == 'string':
return str(value) return str(value)
elif value is None: elif value is None:

View File

@ -1,8 +1,10 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from datetime import datetime from datetime import datetime
from datetime import timedelta
import boto.emr import boto.emr
import pytz import pytz
from dateutil.parser import parse as dtparse
from moto.core import BaseBackend from moto.core import BaseBackend
from .utils import random_instance_group_id, random_cluster_id, random_step_id from .utils import random_instance_group_id, random_cluster_id, random_step_id
@ -273,12 +275,24 @@ class ElasticMapReduceBackend(BaseBackend):
cluster = self.get_cluster(cluster_id) cluster = self.get_cluster(cluster_id)
cluster.add_tags(tags) cluster.add_tags(tags)
def describe_job_flows(self, job_flow_ids=None): def describe_job_flows(self, job_flow_ids=None, job_flow_states=None, created_after=None, created_before=None):
clusters = self.clusters.values() clusters = self.clusters.values()
within_two_month = datetime.now(pytz.utc) - timedelta(days=60)
clusters = [c for c in clusters if c.creation_datetime >= within_two_month]
if job_flow_ids: if job_flow_ids:
return [cluster for cluster in clusters if cluster.id in job_flow_ids] clusters = [c for c in clusters if c.id in job_flow_ids]
else: if job_flow_states:
return clusters clusters = [c for c in clusters if c.state in job_flow_states]
if created_after:
created_after = dtparse(created_after)
clusters = [c for c in clusters if c.creation_datetime > created_after]
if created_before:
created_before = dtparse(created_before)
clusters = [c for c in clusters if c.creation_datetime < created_before]
return sorted(clusters, key=lambda x: x.id)[:512]
def describe_step(self, cluster_id, step_id): def describe_step(self, cluster_id, step_id):
cluster = self.clusters[cluster_id] cluster = self.clusters[cluster_id]
@ -296,17 +310,48 @@ class ElasticMapReduceBackend(BaseBackend):
if group_id in instance_group_ids if group_id in instance_group_ids
] ]
def list_bootstrap_actions(self, cluster_id): def list_bootstrap_actions(self, cluster_id, marker=None):
return self.clusters[cluster_id].bootstrap_actions max_items = 50
actions = self.clusters[cluster_id].bootstrap_actions
start_idx = 0 if marker is None else int(marker)
marker = None if len(actions) <= start_idx + max_items else str(start_idx + max_items)
return actions[start_idx:start_idx + max_items], marker
def list_clusters(self): def list_clusters(self, cluster_states=None, created_after=None,
return self.clusters.values() created_before=None, marker=None):
max_items = 50
clusters = self.clusters.values()
if cluster_states:
clusters = [c for c in clusters if c.state in cluster_states]
if created_after:
created_after = dtparse(created_after)
clusters = [c for c in clusters if c.creation_datetime > created_after]
if created_before:
created_before = dtparse(created_before)
clusters = [c for c in clusters if c.creation_datetime < created_before]
clusters = sorted(clusters, key=lambda x: x.id)
start_idx = 0 if marker is None else int(marker)
marker = None if len(clusters) <= start_idx + max_items else str(start_idx + max_items)
return clusters[start_idx:start_idx + max_items], marker
def list_instance_groups(self, cluster_id): def list_instance_groups(self, cluster_id, marker=None):
return self.clusters[cluster_id].instance_groups max_items = 50
groups = sorted(self.clusters[cluster_id].instance_groups,
key=lambda x: x.id)
start_idx = 0 if marker is None else int(marker)
marker = None if len(groups) <= start_idx + max_items else str(start_idx + max_items)
return groups[start_idx:start_idx + max_items], marker
def list_steps(self, cluster_id, step_states=None): def list_steps(self, cluster_id, marker=None, step_ids=None, step_states=None):
return self.clusters[cluster_id].steps max_items = 50
steps = self.clusters[cluster_id].steps
if step_ids:
steps = [s for s in steps if s.id in step_ids]
if step_states:
steps = [s for s in steps if s.state in step_states]
start_idx = 0 if marker is None else int(marker)
marker = None if len(steps) <= start_idx + max_items else str(start_idx + max_items)
return steps[start_idx:start_idx + max_items], marker
def modify_instance_groups(self, instance_groups): def modify_instance_groups(self, instance_groups):
result_groups = [] result_groups = []
@ -333,10 +378,11 @@ class ElasticMapReduceBackend(BaseBackend):
cluster.set_termination_protection(value) cluster.set_termination_protection(value)
def terminate_job_flows(self, job_flow_ids): def terminate_job_flows(self, job_flow_ids):
clusters = [cluster for cluster in self.describe_job_flows() clusters = []
if cluster.id in job_flow_ids] for job_flow_id in job_flow_ids:
for cluster in clusters: cluster = self.clusters[job_flow_id]
cluster.terminate() cluster.terminate()
clusters.append(cluster)
return clusters return clusters

View File

@ -101,8 +101,11 @@ class ElasticMapReduceResponse(BaseResponse):
@generate_boto3_response('DescribeJobFlows') @generate_boto3_response('DescribeJobFlows')
def describe_job_flows(self): def describe_job_flows(self):
created_after = self._get_param('CreatedAfter')
created_before = self._get_param('CreatedBefore')
job_flow_ids = self._get_multi_param("JobFlowIds.member") job_flow_ids = self._get_multi_param("JobFlowIds.member")
clusters = self.backend.describe_job_flows(job_flow_ids) job_flow_states = self._get_multi_param('JobFlowStates.member')
clusters = self.backend.describe_job_flows(job_flow_ids, job_flow_states, created_after, created_before)
template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE) template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE)
return template.render(clusters=clusters) return template.render(clusters=clusters)
@ -120,22 +123,28 @@ class ElasticMapReduceResponse(BaseResponse):
@generate_boto3_response('ListBootstrapActions') @generate_boto3_response('ListBootstrapActions')
def list_bootstrap_actions(self): def list_bootstrap_actions(self):
cluster_id = self._get_param('ClusterId') cluster_id = self._get_param('ClusterId')
bootstrap_actions = self.backend.list_bootstrap_actions(cluster_id) marker = self._get_param('Marker')
bootstrap_actions, marker = self.backend.list_bootstrap_actions(cluster_id, marker)
template = self.response_template(LIST_BOOTSTRAP_ACTIONS_TEMPLATE) template = self.response_template(LIST_BOOTSTRAP_ACTIONS_TEMPLATE)
return template.render(bootstrap_actions=bootstrap_actions) return template.render(bootstrap_actions=bootstrap_actions, marker=marker)
@generate_boto3_response('ListClusters') @generate_boto3_response('ListClusters')
def list_clusters(self): def list_clusters(self):
clusters = self.backend.list_clusters() cluster_states = self._get_multi_param('ClusterStates.member')
created_after = self._get_param('CreatedAfter')
created_before = self._get_param('CreatedBefore')
marker = self._get_param('Marker')
clusters, marker = self.backend.list_clusters(cluster_states, created_after, created_before, marker)
template = self.response_template(LIST_CLUSTERS_TEMPLATE) template = self.response_template(LIST_CLUSTERS_TEMPLATE)
return template.render(clusters=clusters) return template.render(clusters=clusters, marker=marker)
@generate_boto3_response('ListInstanceGroups') @generate_boto3_response('ListInstanceGroups')
def list_instance_groups(self): def list_instance_groups(self):
cluster_id = self._get_param('ClusterId') cluster_id = self._get_param('ClusterId')
instance_groups = self.backend.list_instance_groups(cluster_id) marker = self._get_param('Marker')
instance_groups, marker = self.backend.list_instance_groups(cluster_id, marker=marker)
template = self.response_template(LIST_INSTANCE_GROUPS_TEMPLATE) template = self.response_template(LIST_INSTANCE_GROUPS_TEMPLATE)
return template.render(instance_groups=instance_groups) return template.render(instance_groups=instance_groups, marker=marker)
def list_instances(self): def list_instances(self):
raise NotImplementedError raise NotImplementedError
@ -143,9 +152,12 @@ class ElasticMapReduceResponse(BaseResponse):
@generate_boto3_response('ListSteps') @generate_boto3_response('ListSteps')
def list_steps(self): def list_steps(self):
cluster_id = self._get_param('ClusterId') cluster_id = self._get_param('ClusterId')
steps = self.backend.list_steps(cluster_id) marker = self._get_param('Marker')
step_ids = self._get_multi_param('StepIds.member')
step_states = self._get_multi_param('StepStates.member')
steps, marker = self.backend.list_steps(cluster_id, marker=marker, step_ids=step_ids, step_states=step_states)
template = self.response_template(LIST_STEPS_TEMPLATE) template = self.response_template(LIST_STEPS_TEMPLATE)
return template.render(steps=steps) return template.render(steps=steps, marker=marker)
@generate_boto3_response('ModifyInstanceGroups') @generate_boto3_response('ModifyInstanceGroups')
def modify_instance_groups(self): def modify_instance_groups(self):
@ -623,6 +635,9 @@ LIST_BOOTSTRAP_ACTIONS_TEMPLATE = """<ListBootstrapActionsResponse xmlns="http:/
</member> </member>
{% endfor %} {% endfor %}
</BootstrapActions> </BootstrapActions>
{% if marker is not none %}
<Marker>{{ marker }}</Marker>
{% endif %}
</ListBootstrapActionsResult> </ListBootstrapActionsResult>
<ResponseMetadata> <ResponseMetadata>
<RequestId>df6f4f4a-ed85-11dd-9877-6fad448a8419</RequestId> <RequestId>df6f4f4a-ed85-11dd-9877-6fad448a8419</RequestId>
@ -658,7 +673,9 @@ LIST_CLUSTERS_TEMPLATE = """<ListClustersResponse xmlns="http://elasticmapreduce
</member> </member>
{% endfor %} {% endfor %}
</Clusters> </Clusters>
<Marker></Marker> {% if marker is not none %}
<Marker>{{ marker }}</Marker>
{% endif %}
</ListClustersResult> </ListClustersResult>
<ResponseMetadata> <ResponseMetadata>
<RequestId>2690d7eb-ed86-11dd-9877-6fad448a8418</RequestId> <RequestId>2690d7eb-ed86-11dd-9877-6fad448a8418</RequestId>
@ -706,6 +723,9 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """<ListInstanceGroupsResponse xmlns="http://ela
</member> </member>
{% endfor %} {% endfor %}
</InstanceGroups> </InstanceGroups>
{% if marker is not none %}
<Marker>{{ marker }}</Marker>
{% endif %}
</ListInstanceGroupsResult> </ListInstanceGroupsResult>
<ResponseMetadata> <ResponseMetadata>
<RequestId>8296d8b8-ed85-11dd-9877-6fad448a8419</RequestId> <RequestId>8296d8b8-ed85-11dd-9877-6fad448a8419</RequestId>
@ -760,6 +780,9 @@ LIST_STEPS_TEMPLATE = """<ListStepsResponse xmlns="http://elasticmapreduce.amazo
</member> </member>
{% endfor %} {% endfor %}
</Steps> </Steps>
{% if marker is not none %}
<Marker>{{ marker }}</Marker>
{% endif %}
</ListStepsResult> </ListStepsResult>
<ResponseMetadata> <ResponseMetadata>
<RequestId>df6f4f4a-ed85-11dd-9877-6fad448a8419</RequestId> <RequestId>df6f4f4a-ed85-11dd-9877-6fad448a8419</RequestId>

View File

@ -10,7 +10,8 @@ install_requires = [
"xmltodict", "xmltodict",
"six", "six",
"werkzeug", "werkzeug",
"pytz" "pytz",
"python-dateutil",
] ]
extras_require = { extras_require = {

View File

@ -1,6 +1,9 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import time
from datetime import datetime
import boto import boto
import pytz
from boto.emr.bootstrap_action import BootstrapAction from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.instance_group import InstanceGroup from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep from boto.emr.step import StreamingStep
@ -104,18 +107,53 @@ def test_describe_cluster():
@mock_emr
def test_describe_jobflows():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    expected = {}

    # 400 job flows that stay in the WAITING state...
    for i in range(400):
        name = 'cluster' + str(i)
        args['name'] = name
        flow_id = conn.run_jobflow(**args)
        expected[flow_id] = {'id': flow_id, 'name': name, 'state': 'WAITING'}

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    # ...and 200 more that are terminated immediately.
    for i in range(400, 600):
        name = 'cluster' + str(i)
        args['name'] = name
        flow_id = conn.run_jobflow(**args)
        conn.terminate_jobflow(flow_id)
        expected[flow_id] = {'id': flow_id, 'name': name, 'state': 'TERMINATED'}

    # An unfiltered call is capped at 512 results.
    conn.describe_jobflows().should.have.length_of(512)

    for flow_id, props in expected.items():
        resp = conn.describe_jobflows(jobflow_ids=[flow_id])
        resp.should.have.length_of(1)
        resp[0].jobflowid.should.equal(flow_id)

    resp = conn.describe_jobflows(states=['WAITING'])
    resp.should.have.length_of(400)
    for flow in resp:
        flow.state.should.equal('WAITING')

    conn.describe_jobflows(created_before=timestamp).should.have.length_of(400)
    conn.describe_jobflows(created_after=timestamp).should.have.length_of(200)
@mock_emr @mock_emr
@ -204,43 +242,69 @@ def test_describe_jobflow():
@mock_emr
def test_list_clusters():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    expected = {}

    # 40 running clusters...
    for i in range(40):
        name = 'jobflow' + str(i)
        args['name'] = name
        cluster_id = conn.run_jobflow(**args)
        expected[cluster_id] = {
            'id': cluster_id,
            'name': name,
            'normalizedinstancehours': '0',
            'state': 'WAITING'
        }

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    # ...and 30 terminated ones.
    for i in range(40, 70):
        name = 'jobflow' + str(i)
        args['name'] = name
        cluster_id = conn.run_jobflow(**args)
        conn.terminate_jobflow(cluster_id)
        expected[cluster_id] = {
            'id': cluster_id,
            'name': name,
            'normalizedinstancehours': '0',
            'state': 'TERMINATED'
        }

    # Walk every page (50 clusters max per page) and verify each entry.
    kwargs = {}
    while True:
        resp = conn.list_clusters(**kwargs)
        page = resp.clusters
        len(page).should.be.lower_than_or_equal_to(50)
        for cluster in page:
            props = expected[cluster.id]
            cluster.id.should.equal(props['id'])
            cluster.name.should.equal(props['name'])
            cluster.normalizedinstancehours.should.equal(
                props['normalizedinstancehours'])
            cluster.status.state.should.equal(props['state'])
            cluster.status.timeline.creationdatetime.should.be.a(six.string_types)
            if props['state'] == 'TERMINATED':
                cluster.status.timeline.enddatetime.should.be.a(six.string_types)
            else:
                cluster.status.timeline.shouldnt.have.property('enddatetime')
            cluster.status.timeline.readydatetime.should.be.a(six.string_types)
        if not hasattr(resp, 'marker'):
            break
        kwargs = {'marker': resp.marker}

    resp = conn.list_clusters(cluster_states=['TERMINATED'])
    resp.clusters.should.have.length_of(30)
    for cluster in resp.clusters:
        cluster.status.state.should.equal('TERMINATED')

    conn.list_clusters(created_before=timestamp).clusters.should.have.length_of(40)
    conn.list_clusters(created_after=timestamp).clusters.should.have.length_of(30)
@mock_emr @mock_emr
@ -516,7 +580,8 @@ def test_steps():
expected = dict((s.name, s) for s in input_steps) expected = dict((s.name, s) for s in input_steps)
for x in conn.list_steps(cluster_id).steps: steps = conn.list_steps(cluster_id).steps
for x in steps:
y = expected[x.name] y = expected[x.name]
# actiononfailure # actiononfailure
list(arg.value for arg in x.config.args).should.equal([ list(arg.value for arg in x.config.args).should.equal([
@ -554,6 +619,17 @@ def test_steps():
# x.status.timeline.enddatetime.should.be.a(six.string_types) # x.status.timeline.enddatetime.should.be.a(six.string_types)
# x.status.timeline.startdatetime.should.be.a(six.string_types) # x.status.timeline.startdatetime.should.be.a(six.string_types)
@requires_boto_gte('2.39')
def test_list_steps_with_states():
# boto's list_steps prior to 2.39 has a bug that ignores
# step_states argument.
steps = conn.list_steps(cluster_id).steps
step_id = steps[0].id
steps = conn.list_steps(cluster_id, step_states=['STARTING']).steps
steps.should.have.length_of(1)
steps[0].id.should.equal(step_id)
test_list_steps_with_states()
@mock_emr @mock_emr
def test_tags(): def test_tags():

View File

@ -1,8 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import unicode_literals from __future__ import unicode_literals
import time
from copy import deepcopy from copy import deepcopy
from datetime import datetime
import boto3 import boto3
import pytz
import six import six
import sure # noqa import sure # noqa
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
@ -121,19 +124,54 @@ def test_describe_cluster():
@mock_emr
def test_describe_job_flows():
    client = boto3.client('emr', region_name='us-east-1')
    args = deepcopy(run_job_flow_args)
    expected = {}

    # 400 job flows that stay in the WAITING state...
    for i in range(400):
        name = 'cluster' + str(i)
        args['Name'] = name
        cluster_id = client.run_job_flow(**args)['JobFlowId']
        expected[cluster_id] = {'Id': cluster_id, 'Name': name, 'State': 'WAITING'}

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    # ...and 200 more that are terminated immediately.
    for i in range(400, 600):
        name = 'cluster' + str(i)
        args['Name'] = name
        cluster_id = client.run_job_flow(**args)['JobFlowId']
        client.terminate_job_flows(JobFlowIds=[cluster_id])
        expected[cluster_id] = {'Id': cluster_id, 'Name': name, 'State': 'TERMINATED'}

    # An unfiltered call is capped at 512 results.
    resp = client.describe_job_flows()
    resp['JobFlows'].should.have.length_of(512)

    for cluster_id, props in expected.items():
        resp = client.describe_job_flows(JobFlowIds=[cluster_id])
        resp['JobFlows'].should.have.length_of(1)
        resp['JobFlows'][0]['JobFlowId'].should.equal(cluster_id)

    resp = client.describe_job_flows(JobFlowStates=['WAITING'])
    resp['JobFlows'].should.have.length_of(400)
    for flow in resp['JobFlows']:
        flow['ExecutionStatusDetail']['State'].should.equal('WAITING')

    client.describe_job_flows(CreatedBefore=timestamp)['JobFlows'].should.have.length_of(400)
    client.describe_job_flows(CreatedAfter=timestamp)['JobFlows'].should.have.length_of(200)
@mock_emr @mock_emr
@ -203,41 +241,69 @@ def test_describe_job_flow():
def test_list_clusters():
    client = boto3.client('emr', region_name='us-east-1')
    args = deepcopy(run_job_flow_args)
    expected = {}

    # 40 running clusters...
    for i in range(40):
        name = 'jobflow' + str(i)
        args['Name'] = name
        cluster_id = client.run_job_flow(**args)['JobFlowId']
        expected[cluster_id] = {
            'Id': cluster_id,
            'Name': name,
            'NormalizedInstanceHours': 0,
            'State': 'WAITING'
        }

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    # ...and 30 terminated ones.
    for i in range(40, 70):
        name = 'jobflow' + str(i)
        args['Name'] = name
        cluster_id = client.run_job_flow(**args)['JobFlowId']
        client.terminate_job_flows(JobFlowIds=[cluster_id])
        expected[cluster_id] = {
            'Id': cluster_id,
            'Name': name,
            'NormalizedInstanceHours': 0,
            'State': 'TERMINATED'
        }

    # Walk every page (50 clusters max per page) and verify each entry.
    kwargs = {}
    while True:
        resp = client.list_clusters(**kwargs)
        page = resp['Clusters']
        len(page).should.be.lower_than_or_equal_to(50)
        for cluster in page:
            props = expected[cluster['Id']]
            cluster['Id'].should.equal(props['Id'])
            cluster['Name'].should.equal(props['Name'])
            cluster['NormalizedInstanceHours'].should.equal(
                props['NormalizedInstanceHours'])
            cluster['Status']['State'].should.equal(props['State'])
            cluster['Status']['Timeline']['CreationDateTime'].should.be.a('datetime.datetime')
            if props['State'] == 'TERMINATED':
                cluster['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime')
            else:
                cluster['Status']['Timeline'].shouldnt.have.key('EndDateTime')
            cluster['Status']['Timeline']['ReadyDateTime'].should.be.a('datetime.datetime')
        marker = resp.get('Marker')
        if marker is None:
            break
        kwargs = {'Marker': marker}

    resp = client.list_clusters(ClusterStates=['TERMINATED'])
    resp['Clusters'].should.have.length_of(30)
    for cluster in resp['Clusters']:
        cluster['Status']['State'].should.equal('TERMINATED')

    client.list_clusters(CreatedBefore=timestamp)['Clusters'].should.have.length_of(40)
    client.list_clusters(CreatedAfter=timestamp)['Clusters'].should.have.length_of(30)
@mock_emr @mock_emr
@ -567,6 +633,15 @@ def test_steps():
# x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime')
# x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime')
step_id = steps[0]['Id']
steps = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])['Steps']
steps.should.have.length_of(1)
steps[0]['Id'].should.equal(step_id)
steps = client.list_steps(ClusterId=cluster_id, StepStates=['STARTING'])['Steps']
steps.should.have.length_of(1)
steps[0]['Id'].should.equal(step_id)
@mock_emr @mock_emr
def test_tags(): def test_tags():