From 69cf3091831800c1f0b94716089a8ac6e44d6831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20D=C4=85bek?= <373530+szemek@users.noreply.github.com> Date: Tue, 25 Apr 2023 18:02:57 +0200 Subject: [PATCH] [Glue] Triggers (#6253) --- moto/glue/exceptions.py | 5 + moto/glue/models.py | 162 ++++++++++++++++++- moto/glue/responses.py | 103 ++++++++++++ tests/test_glue/test_glue.py | 296 +++++++++++++++++++++++++++++++++++ 4 files changed, 565 insertions(+), 1 deletion(-) diff --git a/moto/glue/exceptions.py b/moto/glue/exceptions.py index 2ed7262bf..d31932042 100644 --- a/moto/glue/exceptions.py +++ b/moto/glue/exceptions.py @@ -114,6 +114,11 @@ class RegistryNotFoundException(EntityNotFoundException): ) +class TriggerNotFoundException(EntityNotFoundException): + def __init__(self, trigger: str): + super().__init__(f"Trigger {trigger} not found.") + + class CrawlerRunningException(GlueClientError): def __init__(self, msg: str): super().__init__("CrawlerRunningException", msg) diff --git a/moto/glue/models.py b/moto/glue/models.py index 7742ef101..56e51a02a 100644 --- a/moto/glue/models.py +++ b/moto/glue/models.py @@ -30,6 +30,7 @@ from .exceptions import ( SchemaVersionNotFoundFromSchemaIdException, SchemaNotFoundException, SchemaVersionMetadataAlreadyExistsException, + TriggerNotFoundException, ) from .utils import PartitionFilter from .glue_schema_registry_utils import ( @@ -57,6 +58,18 @@ from ..utilities.tagging_service import TaggingService class GlueBackend(BaseBackend): PAGINATION_MODEL = { + "get_jobs": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "unique_attribute": "name", + }, + "get_triggers": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "unique_attribute": "name", + }, "list_crawlers": { "input_token": "next_token", "limit_key": "max_results", @@ -69,7 +82,7 @@ class GlueBackend(BaseBackend): "limit_default": 100, "unique_attribute": "name", }, - "get_jobs": { + "list_triggers": { "input_token": "next_token", "limit_key": "max_results", "limit_default": 100, @@ -84,6 +97,7 @@ class GlueBackend(BaseBackend): self.jobs: Dict[str, FakeJob] = OrderedDict() self.job_runs: Dict[str, FakeJobRun] = OrderedDict() self.tagger = TaggingService() + self.triggers: Dict[str, FakeTrigger] = OrderedDict() self.registries: Dict[str, FakeRegistry] = OrderedDict() self.num_schemas = 0 self.num_schema_versions = 0 @@ -757,6 +771,79 @@ class GlueBackend(BaseBackend): return schema.as_dict() + def create_trigger( + self, + name: str, + workflow_name: str, + trigger_type: str, + schedule: str, + predicate: Dict[str, Any], + actions: List[Dict[str, Any]], + description: str, + start_on_creation: bool, + tags: Dict[str, str], + event_batching_condition: Dict[str, Any], + ) -> None: + self.triggers[name] = FakeTrigger( + name=name, + workflow_name=workflow_name, + trigger_type=trigger_type, + schedule=schedule, + predicate=predicate, + actions=actions, + description=description, + start_on_creation=start_on_creation, + tags=tags, + event_batching_condition=event_batching_condition, + backend=self, + ) + + def get_trigger(self, name: str) -> "FakeTrigger": + try: + return self.triggers[name] + except KeyError: + raise TriggerNotFoundException(name) + + def start_trigger(self, name: str) -> None: + trigger = self.get_trigger(name) + trigger.start_trigger() + + def stop_trigger(self, name: str) -> None: + trigger = self.get_trigger(name) + trigger.stop_trigger() + + @paginate(pagination_model=PAGINATION_MODEL) + 
def get_triggers(self, dependent_job_name: str) -> List["FakeTrigger"]: # type: ignore + if dependent_job_name: + triggers = [] + for trigger in self.triggers.values(): + for action in trigger.actions: + if ("JobName" in action) and ( + action["JobName"] == dependent_job_name + ): + triggers.append(trigger) + return triggers + + return list(self.triggers.values()) + + @paginate(pagination_model=PAGINATION_MODEL) + def list_triggers(self, dependent_job_name: str) -> List["FakeTrigger"]: # type: ignore + if dependent_job_name: + triggers = [] + for trigger in self.triggers.values(): + for action in trigger.actions: + if ("JobName" in action) and ( + action["JobName"] == dependent_job_name + ): + triggers.append(trigger) + return triggers + + return list(self.triggers.values()) + + def delete_trigger(self, name: str) -> None: + if name in self.triggers: + del self.triggers[name] + def batch_delete_table( self, database_name: str, tables: List[str] ) -> List[Dict[str, Any]]: @@ -874,6 +961,13 @@ class GlueBackend(BaseBackend): jobs.append(self.jobs[job_name].as_dict()) return jobs + def batch_get_triggers(self, trigger_names: List[str]) -> List[Dict[str, Any]]: + triggers = [] + for trigger_name in trigger_names: + if trigger_name in self.triggers: + triggers.append(self.triggers[trigger_name].as_dict()) + return triggers + class FakeDatabase(BaseModel): def __init__(self, database_name: str, database_input: Dict[str, Any]): @@ -1423,4 +1517,70 @@ class FakeSchemaVersion(BaseModel): } +class FakeTrigger(BaseModel): + def __init__( + self, + backend: GlueBackend, + name: str, + workflow_name: str, + trigger_type: str, # to avoid any issues with built-in function type() + schedule: str, + predicate: Dict[str, Any], + actions: List[Dict[str, Any]], + description: str, + start_on_creation: bool, + tags: Dict[str, str], + event_batching_condition: Dict[str, Any], + ): + self.name = name + self.workflow_name = workflow_name + self.trigger_type = trigger_type + self.schedule = schedule + self.predicate = predicate + self.actions = actions + self.description = description + if start_on_creation: + self.state = "ACTIVATED" + else: + self.state = "CREATED" + self.event_batching_condition = event_batching_condition + self.arn = f"arn:aws:glue:{backend.region_name}:{backend.account_id}:trigger/{self.name}" + self.backend = backend + self.backend.tag_resource(self.arn, tags) + + def get_name(self) -> str: + return self.name + + def start_trigger(self) -> None: + self.state = "ACTIVATED" + + def stop_trigger(self) -> None: + self.state = "DEACTIVATED" + + def as_dict(self) -> Dict[str, Any]: + data: Dict[str, Any] = { + "Name": self.name, + "Type": self.trigger_type, + "Actions": self.actions, + "State": self.state, + } + + if self.workflow_name: + data["WorkflowName"] = self.workflow_name + + if self.trigger_type == "SCHEDULED": + data["Schedule"] = self.schedule + + if self.predicate: + data["Predicate"] = self.predicate + + if self.description: + data["Description"] = self.description + + if self.event_batching_condition: + data["EventBatchingCondition"] = self.event_batching_condition + + return data + + glue_backends = BackendDict(GlueBackend, "glue") diff --git a/moto/glue/responses.py b/moto/glue/responses.py index 110986905..b35214b2f 100644 --- a/moto/glue/responses.py +++ b/moto/glue/responses.py @@ -415,6 +415,17 @@ class GlueResponse(BaseResponse): return [job.get_name() for job in jobs] return [job.get_name() for job in jobs if self.is_tags_match(job.arn, tags)] + def filter_triggers_by_tags( + 
self, triggers: List[FakeJob], tags: Dict[str, str] + ) -> List[str]: + if not tags: + return [trigger.get_name() for trigger in triggers] + return [ + trigger.get_name() + for trigger in triggers + if self.is_tags_match(trigger.arn, tags) + ] + def is_tags_match(self, resource_arn: str, tags: Dict[str, str]) -> bool: glue_resource_tags = self.glue_backend.get_tags(resource_arn) mutual_keys = set(glue_resource_tags).intersection(tags) @@ -542,3 +553,95 @@ class GlueResponse(BaseResponse): def get_partition_indexes(self) -> str: return json.dumps({"PartitionIndexDescriptorList": []}) + + def create_trigger(self) -> str: + name = self._get_param("Name") + workflow_name = self._get_param("WorkflowName") + trigger_type = self._get_param("Type") + schedule = self._get_param("Schedule") + predicate = self._get_param("Predicate") + actions = self._get_param("Actions") + description = self._get_param("Description") + start_on_creation = self._get_param("StartOnCreation") + tags = self._get_param("Tags") + event_batching_condition = self._get_param("EventBatchingCondition") + self.glue_backend.create_trigger( + name=name, + workflow_name=workflow_name, + trigger_type=trigger_type, + schedule=schedule, + predicate=predicate, + actions=actions, + description=description, + start_on_creation=start_on_creation, + tags=tags, + event_batching_condition=event_batching_condition, + ) + return json.dumps({"Name": name}) + + def get_trigger(self) -> str: + name = self.parameters.get("Name") + trigger = self.glue_backend.get_trigger(name) # type: ignore[arg-type] + return json.dumps({"Trigger": trigger.as_dict()}) + + def get_triggers(self) -> str: + next_token = self._get_param("NextToken") + dependent_job_name = self._get_param("DependentJobName") + max_results = self._get_int_param("MaxResults") + triggers, next_token = self.glue_backend.get_triggers( + next_token=next_token, + dependent_job_name=dependent_job_name, + max_results=max_results, + ) + return json.dumps( + dict( + Triggers=[trigger.as_dict() for trigger in triggers], + NextToken=next_token, + ) + ) + + def list_triggers(self) -> str: + next_token = self._get_param("NextToken") + dependent_job_name = self._get_param("DependentJobName") + max_results = self._get_int_param("MaxResults") + tags = self._get_param("Tags") + triggers, next_token = self.glue_backend.list_triggers( + next_token=next_token, + dependent_job_name=dependent_job_name, + max_results=max_results, + ) + filtered_trigger_names = self.filter_triggers_by_tags(triggers, tags) + return json.dumps( + dict( + TriggerNames=[trigger_name for trigger_name in filtered_trigger_names], + NextToken=next_token, + ) + ) + + def batch_get_triggers(self) -> str: + trigger_names = self._get_param("TriggerNames") + triggers = self.glue_backend.batch_get_triggers(trigger_names) + triggers_not_found = list( + set(trigger_names) - set(map(lambda trigger: trigger["Name"], triggers)) + ) + return json.dumps( + { + "Triggers": triggers, + "TriggersNotFound": triggers_not_found, + } + ) + + def start_trigger(self) -> str: + name = self.parameters.get("Name") + self.glue_backend.start_trigger(name) # type: ignore[arg-type] + return json.dumps({"Name": name}) + + def stop_trigger(self) -> str: + name = self.parameters.get("Name") + self.glue_backend.stop_trigger(name) # type: ignore[arg-type] + return json.dumps({"Name": name}) + + def delete_trigger(self) -> str: + name = self.parameters.get("Name") + self.glue_backend.delete_trigger(name) # type: ignore[arg-type] + return json.dumps({"Name": name}) diff 
--git a/tests/test_glue/test_glue.py b/tests/test_glue/test_glue.py index 1d6102cf6..b86e5ccce 100644 --- a/tests/test_glue/test_glue.py +++ b/tests/test_glue/test_glue.py @@ -493,3 +493,299 @@ def test_batch_get_crawlers(): response["Crawlers"].should.have.length_of(1) response["CrawlersNotFound"].should.have.length_of(1) + + +@mock_glue +def test_create_trigger(): + client = create_glue_client() + job_name = create_test_job(client) + trigger_name = str(uuid4()) + + response = client.create_trigger( + Name=trigger_name, + Type="ON_DEMAND", + Actions=[ + { + "JobName": job_name, + } + ], + ) + assert response["Name"] == trigger_name + + +@mock_glue +def test_get_trigger_on_demand(): + client = create_glue_client() + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "ON_DEMAND", + "Actions": [ + { + "JobName": job_name, + } + ], + } + client.create_trigger(Name=trigger_name, **trigger_attributes) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["Name"] == trigger_name + assert trigger["Type"] == "ON_DEMAND" + assert trigger["State"] == "CREATED" + assert trigger["Actions"] == [{"JobName": job_name}] + + +@mock_glue +def test_get_trigger_scheduled(): + client = create_glue_client() + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "SCHEDULED", + "Schedule": "cron(5 3 * * ? *)", + "Actions": [ + { + "JobName": job_name, + } + ], + "StartOnCreation": True, + } + client.create_trigger(Name=trigger_name, **trigger_attributes) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["Name"] == trigger_name + assert trigger["Type"] == "SCHEDULED" + assert trigger["State"] == "ACTIVATED" + assert trigger["Actions"] == [{"JobName": job_name}] + + +@mock_glue +def test_get_trigger_conditional(): + client = create_glue_client() + crawler_name = create_test_crawler(client) + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "CONDITIONAL", + "Actions": [ + { + "JobName": job_name, + } + ], + "StartOnCreation": True, + "Predicate": { + "Logical": "ANY", + "Conditions": [ + { + "LogicalOperator": "EQUALS", + "CrawlerName": crawler_name, + "CrawlState": "SUCCEEDED", + } + ], + }, + } + client.create_trigger(Name=trigger_name, **trigger_attributes) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["Name"] == trigger_name + assert trigger["Type"] == "CONDITIONAL" + assert trigger["State"] == "ACTIVATED" + assert trigger["Actions"] == [{"JobName": job_name}] + assert "Predicate" in trigger + + +def create_test_trigger(client, tags=None): + job_name = create_test_job(client) + trigger_name = str(uuid4()) + client.create_trigger( + Name=trigger_name, + Type="ON_DEMAND", + Actions=[ + { + "JobName": job_name, + } + ], + Tags=tags or {}, + ) + return trigger_name + + +@mock_glue +def test_get_triggers_trigger_name_exists(): + client = create_glue_client() + trigger_name = create_test_trigger(client) + response = client.get_triggers() + assert len(response["Triggers"]) == 1 + assert response["Triggers"][0]["Name"] == trigger_name + + +@mock_glue +def test_get_triggers_dependent_job_name(): + client = create_glue_client() + + create_test_trigger(client) + job_name = create_test_job(client) + trigger_name = str(uuid4()) + response = client.create_trigger( + Name=trigger_name, + Type="ON_DEMAND", + Actions=[ + { + "JobName": job_name, + } + ], + ) + + response = 
client.get_triggers(DependentJobName=job_name) + assert len(response["Triggers"]) == 1 + assert response["Triggers"][0]["Name"] == trigger_name + + +@mock_glue +def test_start_trigger(): + client = create_glue_client() + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "SCHEDULED", + "Schedule": "cron(5 3 * * ? *)", + "Actions": [ + { + "JobName": job_name, + } + ], + } + client.create_trigger(Name=trigger_name, **trigger_attributes) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["State"] == "CREATED" + + client.start_trigger(Name=trigger_name) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["State"] == "ACTIVATED" + + +@mock_glue +def test_stop_trigger(): + client = create_glue_client() + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "SCHEDULED", + "Schedule": "cron(5 3 * * ? *)", + "Actions": [ + { + "JobName": job_name, + } + ], + "StartOnCreation": True, + } + client.create_trigger(Name=trigger_name, **trigger_attributes) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["State"] == "ACTIVATED" + + client.stop_trigger(Name=trigger_name) + + trigger = client.get_trigger(Name=trigger_name)["Trigger"] + + assert trigger["State"] == "DEACTIVATED" + + +@mock_glue +def test_list_triggers(): + client = create_glue_client() + trigger_name = create_test_trigger(client) + response = client.list_triggers() + assert response["TriggerNames"] == [trigger_name] + assert "NextToken" not in response + + +@mock_glue +def test_list_triggers_dependent_job_name(): + client = create_glue_client() + + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "ON_DEMAND", + "Actions": [ + { + "JobName": job_name, + } + ], + } + + client.create_trigger(Name=trigger_name, **trigger_attributes) + create_test_trigger(client) + + response = client.list_triggers() + assert len(response["TriggerNames"]) == 2 + + response = client.list_triggers(DependentJobName=job_name) + assert len(response["TriggerNames"]) == 1 + assert response["TriggerNames"] == [trigger_name] + + +@mock_glue +def test_list_triggers_tags(): + client = create_glue_client() + + job_name = create_test_job(client) + trigger_name = str(uuid4()) + trigger_attributes = { + "Type": "ON_DEMAND", + "Actions": [ + { + "JobName": job_name, + } + ], + "Tags": { + "CreatedBy": "moto", + }, + } + + client.create_trigger(Name=trigger_name, **trigger_attributes) + create_test_trigger(client) + + response = client.list_triggers() + assert len(response["TriggerNames"]) == 2 + + response = client.list_triggers(Tags={"CreatedBy": "moto"}) + assert len(response["TriggerNames"]) == 1 + assert response["TriggerNames"] == [trigger_name] + + +@mock_glue +def test_batch_get_triggers(): + client = create_glue_client() + trigger_name = create_test_trigger(client) + + response = client.batch_get_triggers( + TriggerNames=[trigger_name, "trigger-not-found"] + ) + + assert len(response["Triggers"]) == 1 + assert len(response["TriggersNotFound"]) == 1 + + +@mock_glue +def test_delete_trigger(): + client = create_glue_client() + trigger_name = create_test_trigger(client) + + client.get_trigger(Name=trigger_name) + + client.delete_trigger(Name=trigger_name) + + with pytest.raises(ClientError) as exc: + client.get_trigger(Name=trigger_name) + + assert exc.value.response["Error"]["Code"] == "EntityNotFoundException"
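
The following is a minimal usage sketch, not part of the patch itself: it walks the new mocked trigger APIs end to end with boto3, in the same style as the tests above. The job name, trigger name, role ARN, script location, and tag values are placeholders chosen for illustration.

import boto3
from moto import mock_glue


@mock_glue
def trigger_lifecycle_example():
    client = boto3.client("glue", region_name="us-east-1")

    # Trigger actions reference a job, so create one first
    # (create_job was already supported by moto's Glue backend).
    client.create_job(
        Name="example-job",
        Role="arn:aws:iam::123456789012:role/example-role",
        Command={"Name": "glueetl", "ScriptLocation": "s3://bucket/script.py"},
    )

    # Without StartOnCreation=True the trigger starts in the CREATED state.
    client.create_trigger(
        Name="example-trigger",
        Type="SCHEDULED",
        Schedule="cron(5 3 * * ? *)",
        Actions=[{"JobName": "example-job"}],
        Tags={"CreatedBy": "moto"},
    )
    assert client.get_trigger(Name="example-trigger")["Trigger"]["State"] == "CREATED"

    # start_trigger / stop_trigger flip the state to ACTIVATED / DEACTIVATED.
    client.start_trigger(Name="example-trigger")
    assert client.get_trigger(Name="example-trigger")["Trigger"]["State"] == "ACTIVATED"
    client.stop_trigger(Name="example-trigger")
    assert client.get_trigger(Name="example-trigger")["Trigger"]["State"] == "DEACTIVATED"

    # get_triggers returns full trigger objects and honours DependentJobName;
    # list_triggers returns names and applies the Tags filter implemented by
    # filter_triggers_by_tags in responses.py.
    triggers = client.get_triggers(DependentJobName="example-job")["Triggers"]
    assert [t["Name"] for t in triggers] == ["example-trigger"]
    assert client.list_triggers(Tags={"CreatedBy": "moto"})["TriggerNames"] == ["example-trigger"]

    # batch_get_triggers reports unknown names instead of raising.
    batch = client.batch_get_triggers(TriggerNames=["example-trigger", "missing-trigger"])
    assert len(batch["Triggers"]) == 1
    assert batch["TriggersNotFound"] == ["missing-trigger"]

    # delete_trigger removes the trigger; a later get_trigger raises
    # EntityNotFoundException via the new TriggerNotFoundException.
    client.delete_trigger(Name="example-trigger")


if __name__ == "__main__":
    trigger_lifecycle_example()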
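
A second sketch, also outside the patch: because "get_triggers" and "list_triggers" entries were added to GlueBackend.PAGINATION_MODEL, MaxResults and NextToken behave like the other paginated Glue calls. The job and trigger names below are again illustrative placeholders.

import boto3
from moto import mock_glue


@mock_glue
def trigger_pagination_example():
    client = boto3.client("glue", region_name="us-east-1")
    client.create_job(
        Name="example-job",
        Role="arn:aws:iam::123456789012:role/example-role",
        Command={"Name": "glueetl", "ScriptLocation": "s3://bucket/script.py"},
    )

    for index in range(3):
        client.create_trigger(
            Name=f"example-trigger-{index}",
            Type="ON_DEMAND",
            Actions=[{"JobName": "example-job"}],
        )

    # First page: two names plus a NextToken for the remainder.
    first_page = client.list_triggers(MaxResults=2)
    assert len(first_page["TriggerNames"]) == 2
    assert "NextToken" in first_page

    # Second page: the remaining trigger.
    second_page = client.list_triggers(MaxResults=2, NextToken=first_page["NextToken"])
    assert len(second_page["TriggerNames"]) == 1


if __name__ == "__main__":
    trigger_pagination_example()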