From 484faa54c4d1b97e10f6073aa25e357a953c286d Mon Sep 17 00:00:00 2001 From: Taro Sato Date: Tue, 18 Oct 2016 16:47:02 -0700 Subject: [PATCH] Implement filters and pagers for some EMR end points --- moto/core/responses.py | 6 ++ moto/emr/models.py | 76 ++++++++++++--- moto/emr/responses.py | 43 +++++++-- setup.py | 3 +- tests/test_emr/test_emr.py | 160 +++++++++++++++++++++++-------- tests/test_emr/test_emr_boto3.py | 157 ++++++++++++++++++++++-------- 6 files changed, 336 insertions(+), 109 deletions(-) diff --git a/moto/core/responses.py b/moto/core/responses.py index 11b83326e..5a66423c8 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -3,6 +3,7 @@ import datetime import json import re +import pytz from boto.exception import JSONResponseError from jinja2 import Environment, DictLoader, TemplateNotFound @@ -477,6 +478,11 @@ def to_str(value, spec): return 'true' if value else 'false' elif vtype == 'integer': return str(value) + elif vtype == 'float': + return str(value) + elif vtype == 'timestamp': + return datetime.datetime.utcfromtimestamp( + value).replace(tzinfo=pytz.utc).isoformat() elif vtype == 'string': return str(value) elif value is None: diff --git a/moto/emr/models.py b/moto/emr/models.py index acc573698..48ccd7c21 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -1,8 +1,10 @@ from __future__ import unicode_literals from datetime import datetime +from datetime import timedelta import boto.emr import pytz +from dateutil.parser import parse as dtparse from moto.core import BaseBackend from .utils import random_instance_group_id, random_cluster_id, random_step_id @@ -273,12 +275,24 @@ class ElasticMapReduceBackend(BaseBackend): cluster = self.get_cluster(cluster_id) cluster.add_tags(tags) - def describe_job_flows(self, job_flow_ids=None): + def describe_job_flows(self, job_flow_ids=None, job_flow_states=None, created_after=None, created_before=None): clusters = self.clusters.values() + + within_two_month = datetime.now(pytz.utc) - timedelta(days=60) + clusters = [c for c in clusters if c.creation_datetime >= within_two_month] + if job_flow_ids: - return [cluster for cluster in clusters if cluster.id in job_flow_ids] - else: - return clusters + clusters = [c for c in clusters if c.id in job_flow_ids] + if job_flow_states: + clusters = [c for c in clusters if c.state in job_flow_states] + if created_after: + created_after = dtparse(created_after) + clusters = [c for c in clusters if c.creation_datetime > created_after] + if created_before: + created_before = dtparse(created_before) + clusters = [c for c in clusters if c.creation_datetime < created_before] + + return sorted(clusters, key=lambda x: x.id)[:512] def describe_step(self, cluster_id, step_id): cluster = self.clusters[cluster_id] @@ -296,17 +310,48 @@ class ElasticMapReduceBackend(BaseBackend): if group_id in instance_group_ids ] - def list_bootstrap_actions(self, cluster_id): - return self.clusters[cluster_id].bootstrap_actions + def list_bootstrap_actions(self, cluster_id, marker=None): + max_items = 50 + actions = self.clusters[cluster_id].bootstrap_actions + start_idx = 0 if marker is None else int(marker) + marker = None if len(actions) <= start_idx + max_items else str(start_idx + max_items) + return actions[start_idx:start_idx + max_items], marker - def list_clusters(self): - return self.clusters.values() + def list_clusters(self, cluster_states=None, created_after=None, + created_before=None, marker=None): + max_items = 50 + clusters = self.clusters.values() + if cluster_states: + clusters = [c for c in clusters if c.state in cluster_states] + if created_after: + created_after = dtparse(created_after) + clusters = [c for c in clusters if c.creation_datetime > created_after] + if created_before: + created_before = dtparse(created_before) + clusters = [c for c in clusters if c.creation_datetime < created_before] + clusters = sorted(clusters, key=lambda x: x.id) + start_idx = 0 if marker is None else int(marker) + marker = None if len(clusters) <= start_idx + max_items else str(start_idx + max_items) + return clusters[start_idx:start_idx + max_items], marker - def list_instance_groups(self, cluster_id): - return self.clusters[cluster_id].instance_groups + def list_instance_groups(self, cluster_id, marker=None): + max_items = 50 + groups = sorted(self.clusters[cluster_id].instance_groups, + key=lambda x: x.id) + start_idx = 0 if marker is None else int(marker) + marker = None if len(groups) <= start_idx + max_items else str(start_idx + max_items) + return groups[start_idx:start_idx + max_items], marker - def list_steps(self, cluster_id, step_states=None): - return self.clusters[cluster_id].steps + def list_steps(self, cluster_id, marker=None, step_ids=None, step_states=None): + max_items = 50 + steps = self.clusters[cluster_id].steps + if step_ids: + steps = [s for s in steps if s.id in step_ids] + if step_states: + steps = [s for s in steps if s.state in step_states] + start_idx = 0 if marker is None else int(marker) + marker = None if len(steps) <= start_idx + max_items else str(start_idx + max_items) + return steps[start_idx:start_idx + max_items], marker def modify_instance_groups(self, instance_groups): result_groups = [] @@ -333,10 +378,11 @@ class ElasticMapReduceBackend(BaseBackend): cluster.set_termination_protection(value) def terminate_job_flows(self, job_flow_ids): - clusters = [cluster for cluster in self.describe_job_flows() - if cluster.id in job_flow_ids] - for cluster in clusters: + clusters = [] + for job_flow_id in job_flow_ids: + cluster = self.clusters[job_flow_id] cluster.terminate() + clusters.append(cluster) return clusters diff --git a/moto/emr/responses.py b/moto/emr/responses.py index a9b4d951b..427ab48c1 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -101,8 +101,11 @@ class ElasticMapReduceResponse(BaseResponse): @generate_boto3_response('DescribeJobFlows') def describe_job_flows(self): + created_after = self._get_param('CreatedAfter') + created_before = self._get_param('CreatedBefore') job_flow_ids = self._get_multi_param("JobFlowIds.member") - clusters = self.backend.describe_job_flows(job_flow_ids) + job_flow_states = self._get_multi_param('JobFlowStates.member') + clusters = self.backend.describe_job_flows(job_flow_ids, job_flow_states, created_after, created_before) template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE) return template.render(clusters=clusters) @@ -120,22 +123,28 @@ class ElasticMapReduceResponse(BaseResponse): @generate_boto3_response('ListBootstrapActions') def list_bootstrap_actions(self): cluster_id = self._get_param('ClusterId') - bootstrap_actions = self.backend.list_bootstrap_actions(cluster_id) + marker = self._get_param('Marker') + bootstrap_actions, marker = self.backend.list_bootstrap_actions(cluster_id, marker) template = self.response_template(LIST_BOOTSTRAP_ACTIONS_TEMPLATE) - return template.render(bootstrap_actions=bootstrap_actions) + return template.render(bootstrap_actions=bootstrap_actions, marker=marker) @generate_boto3_response('ListClusters') def list_clusters(self): - clusters = self.backend.list_clusters() + cluster_states = self._get_multi_param('ClusterStates.member') + created_after = self._get_param('CreatedAfter') + created_before = self._get_param('CreatedBefore') + marker = self._get_param('Marker') + clusters, marker = self.backend.list_clusters(cluster_states, created_after, created_before, marker) template = self.response_template(LIST_CLUSTERS_TEMPLATE) - return template.render(clusters=clusters) + return template.render(clusters=clusters, marker=marker) @generate_boto3_response('ListInstanceGroups') def list_instance_groups(self): cluster_id = self._get_param('ClusterId') - instance_groups = self.backend.list_instance_groups(cluster_id) + marker = self._get_param('Marker') + instance_groups, marker = self.backend.list_instance_groups(cluster_id, marker=marker) template = self.response_template(LIST_INSTANCE_GROUPS_TEMPLATE) - return template.render(instance_groups=instance_groups) + return template.render(instance_groups=instance_groups, marker=marker) def list_instances(self): raise NotImplementedError @@ -143,9 +152,12 @@ class ElasticMapReduceResponse(BaseResponse): @generate_boto3_response('ListSteps') def list_steps(self): cluster_id = self._get_param('ClusterId') - steps = self.backend.list_steps(cluster_id) + marker = self._get_param('Marker') + step_ids = self._get_multi_param('StepIds.member') + step_states = self._get_multi_param('StepStates.member') + steps, marker = self.backend.list_steps(cluster_id, marker=marker, step_ids=step_ids, step_states=step_states) template = self.response_template(LIST_STEPS_TEMPLATE) - return template.render(steps=steps) + return template.render(steps=steps, marker=marker) @generate_boto3_response('ModifyInstanceGroups') def modify_instance_groups(self): @@ -623,6 +635,9 @@ LIST_BOOTSTRAP_ACTIONS_TEMPLATE = """