Deduplicate logic between ActivityType's and WorkflowType's

This commit is contained in:
Jean-Baptiste Barth 2015-09-30 21:01:05 +02:00
parent c4e903706c
commit 6e6b325225
2 changed files with 64 additions and 132 deletions

View File

@ -26,46 +26,31 @@ class Domain(object):
def __repr__(self): def __repr__(self):
return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__ return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__
def get_activity_type(self, name, version, ignore_empty=False): def get_type(self, kind, name, version, ignore_empty=False):
try: try:
return self.activity_types[name][version] _types = getattr(self, "{}_types".format(kind))
return _types[name][version]
except KeyError: except KeyError:
if not ignore_empty: if not ignore_empty:
raise SWFUnknownResourceFault( raise SWFUnknownResourceFault(
"type", "type",
"ActivityType=[name={}, version={}]".format(name, version) "{}Type=[name={}, version={}]".format(
kind.capitalize(), name, version
)
) )
def add_activity_type(self, _type): def add_type(self, _type):
self.activity_types[_type.name][_type.version] = _type if isinstance(_type, ActivityType):
self.activity_types[_type.name][_type.version] = _type
elif isinstance(_type, WorkflowType):
self.workflow_types[_type.name][_type.version] = _type
else:
raise ValueError("Unknown SWF type: {}".format(_type))
def find_activity_types(self, status): def find_types(self, kind, status):
_all = [] _all = []
for _, family in self.activity_types.iteritems(): _types = getattr(self, "{}_types".format(kind))
for _, _type in family.iteritems(): for _, family in _types.iteritems():
if _type.status == status:
_all.append(_type)
return _all
# TODO: refactor it with get_activity_type()
def get_workflow_type(self, name, version, ignore_empty=False):
try:
return self.workflow_types[name][version]
except KeyError:
if not ignore_empty:
raise SWFUnknownResourceFault(
"type",
"WorkflowType=[name={}, version={}]".format(name, version)
)
# TODO: refactor it with add_activity_type()
def add_workflow_type(self, _type):
self.workflow_types[_type.name][_type.version] = _type
# TODO: refactor it with find_activity_types()
def find_workflow_types(self, status):
_all = []
for _, family in self.workflow_types.iteritems():
for _, _type in family.iteritems(): for _, _type in family.iteritems():
if _type.status == status: if _type.status == status:
_all.append(_type) _all.append(_type)
@ -151,17 +136,17 @@ class SWFBackend(BaseBackend):
self._check_string(name) self._check_string(name)
return self._get_domain(name) return self._get_domain(name)
def list_activity_types(self, domain_name, status, reverse_order=None): def list_types(self, kind, domain_name, status, reverse_order=None):
self._check_string(domain_name) self._check_string(domain_name)
self._check_string(status) self._check_string(status)
domain = self._get_domain(domain_name) domain = self._get_domain(domain_name)
_types = domain.find_activity_types(status) _types = domain.find_types(kind, status)
_types = sorted(_types, key=lambda domain: domain.name) _types = sorted(_types, key=lambda domain: domain.name)
if reverse_order: if reverse_order:
_types = reversed(_types) _types = reversed(_types)
return _types return _types
def register_activity_type(self, domain_name, name, version, **kwargs): def register_type(self, kind, domain_name, name, version, **kwargs):
self._check_string(domain_name) self._check_string(domain_name)
self._check_string(name) self._check_string(name)
self._check_string(version) self._check_string(version)
@ -171,71 +156,30 @@ class SWFBackend(BaseBackend):
if value is not None: if value is not None:
self._check_string(value) self._check_string(value)
domain = self._get_domain(domain_name) domain = self._get_domain(domain_name)
_type = domain.get_activity_type(name, version, ignore_empty=True) _type = domain.get_type(kind, name, version, ignore_empty=True)
if _type: if _type:
raise SWFTypeAlreadyExistsFault(_type) raise SWFTypeAlreadyExistsFault(_type)
activity_type = ActivityType(name, version, **kwargs) _class = globals()["{}Type".format(kind.capitalize())]
domain.add_activity_type(activity_type) _type = _class(name, version, **kwargs)
domain.add_type(_type)
def deprecate_activity_type(self, domain_name, name, version): def deprecate_type(self, kind, domain_name, name, version):
self._check_string(domain_name) self._check_string(domain_name)
self._check_string(name) self._check_string(name)
self._check_string(version) self._check_string(version)
domain = self._get_domain(domain_name) domain = self._get_domain(domain_name)
_type = domain.get_activity_type(name, version) _type = domain.get_type(kind, name, version)
if _type.status == "DEPRECATED": if _type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(_type) raise SWFTypeDeprecatedFault(_type)
_type.status = "DEPRECATED" _type.status = "DEPRECATED"
def describe_activity_type(self, domain_name, name, version): def describe_type(self, kind, domain_name, name, version):
self._check_string(domain_name) self._check_string(domain_name)
self._check_string(name) self._check_string(name)
self._check_string(version) self._check_string(version)
domain = self._get_domain(domain_name) domain = self._get_domain(domain_name)
return domain.get_activity_type(name, version) return domain.get_type(kind, name, version)
def list_workflow_types(self, domain_name, status, reverse_order=None):
self._check_string(domain_name)
self._check_string(status)
domain = self._get_domain(domain_name)
_types = domain.find_workflow_types(status)
_types = sorted(_types, key=lambda domain: domain.name)
if reverse_order:
_types = reversed(_types)
return _types
def register_workflow_type(self, domain_name, name, version, **kwargs):
self._check_string(domain_name)
self._check_string(name)
self._check_string(version)
for _, value in kwargs.iteritems():
if value == (None,):
print _
if value is not None:
self._check_string(value)
domain = self._get_domain(domain_name)
_type = domain.get_workflow_type(name, version, ignore_empty=True)
if _type:
raise SWFTypeAlreadyExistsFault(_type)
workflow_type = WorkflowType(name, version, **kwargs)
domain.add_workflow_type(workflow_type)
def deprecate_workflow_type(self, domain_name, name, version):
self._check_string(domain_name)
self._check_string(name)
self._check_string(version)
domain = self._get_domain(domain_name)
_type = domain.get_workflow_type(name, version)
if _type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(_type)
_type.status = "DEPRECATED"
def describe_workflow_type(self, domain_name, name, version):
self._check_string(domain_name)
self._check_string(name)
self._check_string(version)
domain = self._get_domain(domain_name)
return domain.get_workflow_type(name, version)
swf_backends = {} swf_backends = {}
for region in boto.swf.regions(): for region in boto.swf.regions():

View File

@ -51,6 +51,33 @@ class SWFResponse(BaseResponse):
def _params(self): def _params(self):
return json.loads(self.body) return json.loads(self.body)
def _list_types(self, kind, template_str):
domain_name = self._params.get("domain")
status = self._params.get("registrationStatus")
reverse_order = self._params.get("reverseOrder", None)
types = self.swf_backend.list_types(kind, domain_name, status, reverse_order=reverse_order)
template = self.response_template(template_str)
return template.render(types=types)
def _describe_type(self, kind, template_str):
domain = self._params.get("domain")
_type = self._params.get("{}Type".format(kind))
name = _type["name"]
version = _type["version"]
_type = self.swf_backend.describe_type(kind, domain, name, version)
template = self.response_template(template_str)
return template.render(_type=_type)
def _deprecate_type(self, kind):
domain = self._params.get("domain")
_type = self._params.get("{}Type".format(kind))
name = _type["name"]
version = _type["version"]
self.swf_backend.deprecate_type(kind, domain, name, version)
template = self.response_template("")
return template.render()
# TODO: implement pagination # TODO: implement pagination
def list_domains(self): def list_domains(self):
status = self._params.get("registrationStatus") status = self._params.get("registrationStatus")
@ -82,12 +109,7 @@ class SWFResponse(BaseResponse):
# TODO: implement pagination # TODO: implement pagination
def list_activity_types(self): def list_activity_types(self):
domain_name = self._params.get("domain") return self._list_types("activity", LIST_ACTIVITY_TYPES_TEMPLATE)
status = self._params.get("registrationStatus")
reverse_order = self._params.get("reverseOrder", None)
types = self.swf_backend.list_activity_types(domain_name, status, reverse_order=reverse_order)
template = self.response_template(LIST_ACTIVITY_TYPES_TEMPLATE)
return template.render(types=types)
def register_activity_type(self): def register_activity_type(self):
domain = self._params.get("domain") domain = self._params.get("domain")
@ -104,8 +126,8 @@ class SWFResponse(BaseResponse):
default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout") default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout")
description = self._params.get("description") description = self._params.get("description")
# TODO: add defaultTaskPriority when boto gets to support it # TODO: add defaultTaskPriority when boto gets to support it
activity_type = self.swf_backend.register_activity_type( activity_type = self.swf_backend.register_type(
domain, name, version, task_list=task_list, "activity", domain, name, version, task_list=task_list,
default_task_heartbeat_timeout=default_task_heartbeat_timeout, default_task_heartbeat_timeout=default_task_heartbeat_timeout,
default_task_schedule_to_close_timeout=default_task_schedule_to_close_timeout, default_task_schedule_to_close_timeout=default_task_schedule_to_close_timeout,
default_task_schedule_to_start_timeout=default_task_schedule_to_start_timeout, default_task_schedule_to_start_timeout=default_task_schedule_to_start_timeout,
@ -116,33 +138,14 @@ class SWFResponse(BaseResponse):
return template.render() return template.render()
def deprecate_activity_type(self): def deprecate_activity_type(self):
domain = self._params.get("domain") return self._deprecate_type("activity")
_type = self._params.get("activityType")
name = _type["name"]
version = _type["version"]
domain = self.swf_backend.deprecate_activity_type(domain, name, version)
template = self.response_template("")
return template.render()
def describe_activity_type(self): def describe_activity_type(self):
domain = self._params.get("domain") return self._describe_type("activity", DESCRIBE_ACTIVITY_TYPE_TEMPLATE)
_type = self._params.get("activityType")
name = _type["name"]
version = _type["version"]
_type = self.swf_backend.describe_activity_type(domain, name, version)
template = self.response_template(DESCRIBE_ACTIVITY_TYPE_TEMPLATE)
return template.render(_type=_type)
# TODO: implement pagination
# TODO: refactor with list_activity_types() # TODO: refactor with list_activity_types()
def list_workflow_types(self): def list_workflow_types(self):
domain_name = self._params.get("domain") return self._list_types("workflow", LIST_WORKFLOW_TYPES_TEMPLATE)
status = self._params.get("registrationStatus")
reverse_order = self._params.get("reverseOrder", None)
types = self.swf_backend.list_workflow_types(domain_name, status, reverse_order=reverse_order)
template = self.response_template(LIST_WORKFLOW_TYPES_TEMPLATE)
return template.render(types=types)
def register_workflow_type(self): def register_workflow_type(self):
domain = self._params.get("domain") domain = self._params.get("domain")
@ -159,8 +162,8 @@ class SWFResponse(BaseResponse):
description = self._params.get("description") description = self._params.get("description")
# TODO: add defaultTaskPriority when boto gets to support it # TODO: add defaultTaskPriority when boto gets to support it
# TODO: add defaultLambdaRole when boto gets to support it # TODO: add defaultLambdaRole when boto gets to support it
workflow_type = self.swf_backend.register_workflow_type( workflow_type = self.swf_backend.register_type(
domain, name, version, task_list=task_list, "workflow", domain, name, version, task_list=task_list,
default_child_policy=default_child_policy, default_child_policy=default_child_policy,
default_task_start_to_close_timeout=default_task_start_to_close_timeout, default_task_start_to_close_timeout=default_task_start_to_close_timeout,
default_execution_start_to_close_timeout=default_execution_start_to_close_timeout, default_execution_start_to_close_timeout=default_execution_start_to_close_timeout,
@ -169,26 +172,11 @@ class SWFResponse(BaseResponse):
template = self.response_template("") template = self.response_template("")
return template.render() return template.render()
# TODO: refactor with deprecate_activity_type()
def deprecate_workflow_type(self): def deprecate_workflow_type(self):
domain = self._params.get("domain") return self._deprecate_type("workflow")
_type = self._params.get("workflowType")
name = _type["name"]
version = _type["version"]
domain = self.swf_backend.deprecate_workflow_type(domain, name, version)
template = self.response_template("")
return template.render()
# TODO: refactor with describe_activity_type()
def describe_workflow_type(self): def describe_workflow_type(self):
domain = self._params.get("domain") return self._describe_type("workflow", DESCRIBE_WORKFLOW_TYPE_TEMPLATE)
_type = self._params.get("workflowType")
name = _type["name"]
version = _type["version"]
_type = self.swf_backend.describe_workflow_type(domain, name, version)
template = self.response_template(DESCRIBE_WORKFLOW_TYPE_TEMPLATE)
return template.render(_type=_type)
LIST_DOMAINS_TEMPLATE = """{ LIST_DOMAINS_TEMPLATE = """{