[Glue] Triggers (#6253)

This commit is contained in:
Przemysław Dąbek 2023-04-25 18:02:57 +02:00 committed by GitHub
parent 92da03b1dd
commit 69cf309183
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 565 additions and 1 deletions

View File

@ -114,6 +114,11 @@ class RegistryNotFoundException(EntityNotFoundException):
)
class TriggerNotFoundException(EntityNotFoundException):
    """Entity-not-found error for a trigger name that could not be resolved."""

    def __init__(self, trigger: str):
        # The message embeds the requested trigger name verbatim.
        message = f"Trigger {trigger} not found."
        super().__init__(message)
class CrawlerRunningException(GlueClientError):
    """Glue client error constructed with the identifier ``CrawlerRunningException``."""

    def __init__(self, msg: str):
        # presumably the first argument is the error code — confirm against
        # GlueClientError, which is not visible here.
        identifier = "CrawlerRunningException"
        super().__init__(identifier, msg)

View File

@ -30,6 +30,7 @@ from .exceptions import (
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
TriggerNotFoundException,
)
from .utils import PartitionFilter
from .glue_schema_registry_utils import (
@ -57,6 +58,18 @@ from ..utilities.tagging_service import TaggingService
class GlueBackend(BaseBackend):
PAGINATION_MODEL = {
"get_jobs": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
"get_triggers": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "name",
},
"list_crawlers": {
"input_token": "next_token",
"limit_key": "max_results",
@ -69,7 +82,7 @@ class GlueBackend(BaseBackend):
"limit_default": 100,
"unique_attribute": "name",
},
"get_jobs": {
"list_triggers": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
@ -84,6 +97,7 @@ class GlueBackend(BaseBackend):
self.jobs: Dict[str, FakeJob] = OrderedDict()
self.job_runs: Dict[str, FakeJobRun] = OrderedDict()
self.tagger = TaggingService()
self.triggers: Dict[str, FakeTrigger] = OrderedDict()
self.registries: Dict[str, FakeRegistry] = OrderedDict()
self.num_schemas = 0
self.num_schema_versions = 0
@ -757,6 +771,79 @@ class GlueBackend(BaseBackend):
return schema.as_dict()
def create_trigger(
    self,
    name: str,
    workflow_name: str,
    trigger_type: str,
    schedule: str,
    predicate: Dict[str, Any],
    actions: List[Dict[str, Any]],
    description: str,
    start_on_creation: bool,
    tags: Dict[str, str],
    event_batching_condition: Dict[str, Any],
) -> None:
    """Build a ``FakeTrigger`` from the given attributes and store it.

    The trigger is stored in ``self.triggers`` under ``name``; an entry that
    already exists under the same name is replaced.
    """
    new_trigger = FakeTrigger(
        backend=self,
        name=name,
        workflow_name=workflow_name,
        trigger_type=trigger_type,
        schedule=schedule,
        predicate=predicate,
        actions=actions,
        description=description,
        start_on_creation=start_on_creation,
        tags=tags,
        event_batching_condition=event_batching_condition,
    )
    self.triggers[name] = new_trigger
def get_trigger(self, name: str) -> "FakeTrigger":
    """Return the trigger stored under ``name``.

    Raises:
        TriggerNotFoundException: if ``name`` is not a key of ``self.triggers``.
    """
    try:
        found = self.triggers[name]
    except KeyError:
        raise TriggerNotFoundException(name)
    else:
        return found
def start_trigger(self, name: str) -> None:
    """Look up the trigger called ``name`` and call its ``start_trigger``.

    The lookup goes through ``self.get_trigger``, so any error it raises for
    an unknown name propagates to the caller.
    """
    self.get_trigger(name).start_trigger()
def stop_trigger(self, name: str) -> None:
    """Look up the trigger called ``name`` and call its ``stop_trigger``.

    The lookup goes through ``self.get_trigger``, so any error it raises for
    an unknown name propagates to the caller.
    """
    self.get_trigger(name).stop_trigger()
@paginate(pagination_model=PAGINATION_MODEL)
def get_triggers(self, dependent_job_name: str) -> List["FakeTrigger"]:  # type: ignore
    """Return the stored triggers, optionally restricted to one job.

    Args:
        dependent_job_name: When truthy, only triggers having at least one
            action whose ``JobName`` equals this value are returned. When
            falsy, every stored trigger is returned.

    Returns:
        The matching triggers in the iteration order of ``self.triggers``,
        each at most once. The ``paginate`` decorator wraps this result.
    """
    if not dependent_job_name:
        return list(self.triggers.values())
    # ``any`` keeps a trigger from being listed once per matching action:
    # a trigger with several actions on the same job used to be appended
    # repeatedly, producing duplicate entries in the result.
    return [
        trigger
        for trigger in self.triggers.values()
        if any(
            action.get("JobName") == dependent_job_name
            for action in trigger.actions
        )
    ]
@paginate(pagination_model=PAGINATION_MODEL)
def list_triggers(self, dependent_job_name: str) -> List["FakeTrigger"]:  # type: ignore
    """Return the stored triggers, optionally restricted to one job.

    Args:
        dependent_job_name: When truthy, only triggers having at least one
            action whose ``JobName`` equals this value are returned. When
            falsy, every stored trigger is returned.

    Returns:
        The matching triggers in the iteration order of ``self.triggers``,
        each at most once. The ``paginate`` decorator wraps this result.
    """
    if not dependent_job_name:
        return list(self.triggers.values())
    # ``any`` keeps a trigger from being listed once per matching action:
    # a trigger with several actions on the same job used to be appended
    # repeatedly, producing duplicate entries in the result.
    return [
        trigger
        for trigger in self.triggers.values()
        if any(
            action.get("JobName") == dependent_job_name
            for action in trigger.actions
        )
    ]
def delete_trigger(self, name: str) -> None:
    """Remove the trigger stored under ``name``; unknown names are ignored."""
    self.triggers.pop(name, None)
def batch_delete_table(
self, database_name: str, tables: List[str]
) -> List[Dict[str, Any]]:
@ -874,6 +961,13 @@ class GlueBackend(BaseBackend):
jobs.append(self.jobs[job_name].as_dict())
return jobs
def batch_get_triggers(self, trigger_names: List[str]) -> List[Dict[str, Any]]:
    """Return ``as_dict()`` for every requested name that is a stored trigger.

    Names without a stored trigger are skipped; results follow the order of
    ``trigger_names``.
    """
    known = self.triggers
    return [known[name].as_dict() for name in trigger_names if name in known]
class FakeDatabase(BaseModel):
def __init__(self, database_name: str, database_input: Dict[str, Any]):
@ -1423,4 +1517,70 @@ class FakeSchemaVersion(BaseModel):
}
class FakeTrigger(BaseModel):
    """In-memory model of a Glue trigger and its lifecycle state."""

    def __init__(
        self,
        backend: GlueBackend,
        name: str,
        workflow_name: str,
        trigger_type: str,  # not called "type", to stay clear of the built-in type()
        schedule: str,
        predicate: Dict[str, Any],
        actions: List[Dict[str, Any]],
        description: str,
        start_on_creation: bool,
        tags: Dict[str, str],
        event_batching_condition: Dict[str, Any],
    ):
        """Store the trigger attributes, derive its ARN and register its tags."""
        self.name = name
        self.workflow_name = workflow_name
        self.trigger_type = trigger_type
        self.schedule = schedule
        self.predicate = predicate
        self.actions = actions
        self.description = description
        # A trigger started on creation is immediately active.
        self.state = "ACTIVATED" if start_on_creation else "CREATED"
        self.event_batching_condition = event_batching_condition
        self.arn = f"arn:aws:glue:{backend.region_name}:{backend.account_id}:trigger/{self.name}"
        self.backend = backend
        # Tags are recorded on the backend, keyed by this trigger's ARN.
        self.backend.tag_resource(self.arn, tags)

    def get_name(self) -> str:
        """Return the trigger name."""
        return self.name

    def start_trigger(self) -> None:
        """Mark the trigger as ``ACTIVATED``."""
        self.state = "ACTIVATED"

    def stop_trigger(self) -> None:
        """Mark the trigger as ``DEACTIVATED``."""
        self.state = "DEACTIVATED"

    def as_dict(self) -> Dict[str, Any]:
        """Return the dictionary form of the trigger.

        ``Name``, ``Type``, ``Actions`` and ``State`` are always present.
        ``Schedule`` is added only for ``SCHEDULED`` triggers; the remaining
        keys are added only when their value is truthy.
        """
        data: Dict[str, Any] = {
            "Name": self.name,
            "Type": self.trigger_type,
            "Actions": self.actions,
            "State": self.state,
        }
        # (key, value, include?) — listed in the order the keys are emitted.
        optional_fields = (
            ("WorkflowName", self.workflow_name, bool(self.workflow_name)),
            ("Schedule", self.schedule, self.trigger_type == "SCHEDULED"),
            ("Predicate", self.predicate, bool(self.predicate)),
            ("Description", self.description, bool(self.description)),
            (
                "EventBatchingCondition",
                self.event_batching_condition,
                bool(self.event_batching_condition),
            ),
        )
        for key, value, include in optional_fields:
            if include:
                data[key] = value
        return data
# Module-level BackendDict built from GlueBackend for the "glue" service.
# NOTE(review): presumably this is the lookup used to obtain a backend
# instance per account/region — confirm against BackendDict.
glue_backends = BackendDict(GlueBackend, "glue")

View File

@ -415,6 +415,17 @@ class GlueResponse(BaseResponse):
return [job.get_name() for job in jobs]
return [job.get_name() for job in jobs if self.is_tags_match(job.arn, tags)]
def filter_triggers_by_tags(
    self, triggers: List[FakeJob], tags: Dict[str, str]
) -> List[str]:
    """Return the names of the given triggers that match ``tags``.

    When ``tags`` is falsy no filtering is applied and every name is
    returned; otherwise a trigger is kept only if ``is_tags_match`` accepts
    its ARN.
    """
    # NOTE(review): the elements are used as triggers (get_name / arn); the
    # FakeJob annotation looks carried over from the jobs filter — confirm
    # and switch to the trigger model once it is imported in this module.
    names: List[str] = []
    for trigger in triggers:
        if not tags or self.is_tags_match(trigger.arn, tags):
            names.append(trigger.get_name())
    return names
def is_tags_match(self, resource_arn: str, tags: Dict[str, str]) -> bool:
glue_resource_tags = self.glue_backend.get_tags(resource_arn)
mutual_keys = set(glue_resource_tags).intersection(tags)
@ -542,3 +553,95 @@ class GlueResponse(BaseResponse):
def get_partition_indexes(self) -> str:
    """Return a JSON document with an empty ``PartitionIndexDescriptorList``."""
    payload = {"PartitionIndexDescriptorList": []}
    return json.dumps(payload)
def create_trigger(self) -> str:
    """Create a trigger from the request parameters and echo its name.

    Reads ``Name``, ``WorkflowName``, ``Type``, ``Schedule``, ``Predicate``,
    ``Actions``, ``Description``, ``StartOnCreation``, ``Tags`` and
    ``EventBatchingCondition`` from the request, forwards them to the
    backend, and returns ``{"Name": <name>}`` serialized as JSON.
    """
    name = self._get_param("Name")
    # Keyword arguments are evaluated top to bottom, so the parameters are
    # still read in the order they are listed.
    self.glue_backend.create_trigger(
        name=name,
        workflow_name=self._get_param("WorkflowName"),
        trigger_type=self._get_param("Type"),
        schedule=self._get_param("Schedule"),
        predicate=self._get_param("Predicate"),
        actions=self._get_param("Actions"),
        description=self._get_param("Description"),
        start_on_creation=self._get_param("StartOnCreation"),
        tags=self._get_param("Tags"),
        event_batching_condition=self._get_param("EventBatchingCondition"),
    )
    return json.dumps({"Name": name})
def get_trigger(self) -> str:
    """Return the requested trigger as ``{"Trigger": {...}}`` JSON.

    The name comes from the ``Name`` request parameter; lookup errors raised
    by the backend propagate unchanged.
    """
    trigger_name = self.parameters.get("Name")
    found = self.glue_backend.get_trigger(trigger_name)  # type: ignore[arg-type]
    payload = {"Trigger": found.as_dict()}
    return json.dumps(payload)
def get_triggers(self) -> str:
    """Return one page of triggers as JSON.

    ``NextToken``, ``DependentJobName`` and ``MaxResults`` are read from the
    request and passed to the backend. The response carries the page under
    ``Triggers`` and the follow-up token under ``NextToken``.
    """
    next_token = self._get_param("NextToken")
    dependent_job_name = self._get_param("DependentJobName")
    max_results = self._get_int_param("MaxResults")
    page, following_token = self.glue_backend.get_triggers(
        next_token=next_token,
        dependent_job_name=dependent_job_name,
        max_results=max_results,
    )
    payload = {
        "Triggers": [item.as_dict() for item in page],
        "NextToken": following_token,
    }
    return json.dumps(payload)
def list_triggers(self) -> str:
    """Return one page of trigger names as JSON.

    ``NextToken``, ``DependentJobName`` and ``MaxResults`` are passed to the
    backend; ``Tags`` is then applied through ``filter_triggers_by_tags``.
    The response carries the names under ``TriggerNames`` and the follow-up
    token under ``NextToken``.
    """
    next_token = self._get_param("NextToken")
    dependent_job_name = self._get_param("DependentJobName")
    max_results = self._get_int_param("MaxResults")
    tags = self._get_param("Tags")
    page, following_token = self.glue_backend.list_triggers(
        next_token=next_token,
        dependent_job_name=dependent_job_name,
        max_results=max_results,
    )
    # NOTE: the tag filter runs on the page the backend returned, so a page
    # may hold fewer names than the page size.
    names = self.filter_triggers_by_tags(page, tags)
    return json.dumps({"TriggerNames": list(names), "NextToken": following_token})
def batch_get_triggers(self) -> str:
    """Return the requested triggers plus the names that were not found.

    ``TriggerNames`` is read from the request and resolved by the backend.
    The JSON response carries the resolved triggers under ``Triggers`` and,
    under ``TriggersNotFound``, every requested name the backend did not
    return, listed once each in request order.
    """
    trigger_names = self._get_param("TriggerNames")
    triggers = self.glue_backend.batch_get_triggers(trigger_names)
    found_names = {trigger["Name"] for trigger in triggers}
    # dict.fromkeys de-duplicates while keeping request order, so the
    # response is deterministic; a plain set difference iterates in hash
    # order, which varies between runs for strings.
    triggers_not_found = [
        name for name in dict.fromkeys(trigger_names) if name not in found_names
    ]
    return json.dumps(
        {
            "Triggers": triggers,
            "TriggersNotFound": triggers_not_found,
        }
    )
def start_trigger(self) -> str:
    """Start the trigger named by the ``Name`` parameter and echo that name as JSON."""
    trigger_name = self.parameters.get("Name")
    self.glue_backend.start_trigger(trigger_name)  # type: ignore[arg-type]
    payload = {"Name": trigger_name}
    return json.dumps(payload)
def stop_trigger(self) -> str:
    """Stop the trigger named by the ``Name`` parameter and echo that name as JSON."""
    trigger_name = self.parameters.get("Name")
    self.glue_backend.stop_trigger(trigger_name)  # type: ignore[arg-type]
    payload = {"Name": trigger_name}
    return json.dumps(payload)
def delete_trigger(self) -> str:
    """Delete the trigger named by the ``Name`` parameter and echo that name as JSON."""
    trigger_name = self.parameters.get("Name")
    self.glue_backend.delete_trigger(trigger_name)  # type: ignore[arg-type]
    payload = {"Name": trigger_name}
    return json.dumps(payload)

View File

@ -493,3 +493,299 @@ def test_batch_get_crawlers():
response["Crawlers"].should.have.length_of(1)
response["CrawlersNotFound"].should.have.length_of(1)
@mock_glue
def test_create_trigger():
    """create_trigger echoes the new trigger's name in its response."""
    glue = create_glue_client()
    actions = [{"JobName": create_test_job(glue)}]
    name = str(uuid4())

    result = glue.create_trigger(Name=name, Type="ON_DEMAND", Actions=actions)

    assert result["Name"] == name
@mock_glue
def test_get_trigger_on_demand():
    """An ON_DEMAND trigger is returned in state CREATED with its actions."""
    glue = create_glue_client()
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="ON_DEMAND",
        Actions=[{"JobName": job}],
    )

    fetched = glue.get_trigger(Name=name)["Trigger"]

    assert fetched["Name"] == name
    assert fetched["Type"] == "ON_DEMAND"
    assert fetched["State"] == "CREATED"
    assert fetched["Actions"] == [{"JobName": job}]
@mock_glue
def test_get_trigger_scheduled():
    """A SCHEDULED trigger created with StartOnCreation is ACTIVATED."""
    glue = create_glue_client()
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="SCHEDULED",
        Schedule="cron(5 3 * * ? *)",
        Actions=[{"JobName": job}],
        StartOnCreation=True,
    )

    fetched = glue.get_trigger(Name=name)["Trigger"]

    assert fetched["Name"] == name
    assert fetched["Type"] == "SCHEDULED"
    assert fetched["State"] == "ACTIVATED"
    assert fetched["Actions"] == [{"JobName": job}]
@mock_glue
def test_get_trigger_conditional():
    """A CONDITIONAL trigger keeps its predicate and starts ACTIVATED."""
    glue = create_glue_client()
    crawler = create_test_crawler(glue)
    job = create_test_job(glue)
    name = str(uuid4())
    crawl_succeeded = {
        "LogicalOperator": "EQUALS",
        "CrawlerName": crawler,
        "CrawlState": "SUCCEEDED",
    }
    glue.create_trigger(
        Name=name,
        Type="CONDITIONAL",
        Actions=[{"JobName": job}],
        StartOnCreation=True,
        Predicate={"Logical": "ANY", "Conditions": [crawl_succeeded]},
    )

    fetched = glue.get_trigger(Name=name)["Trigger"]

    assert fetched["Name"] == name
    assert fetched["Type"] == "CONDITIONAL"
    assert fetched["State"] == "ACTIVATED"
    assert fetched["Actions"] == [{"JobName": job}]
    assert "Predicate" in fetched
def create_test_trigger(client, tags=None):
    """Create an ON_DEMAND trigger for a fresh test job and return its name.

    ``tags`` is forwarded as the trigger's tags; an empty dict is used when
    it is falsy.
    """
    actions = [{"JobName": create_test_job(client)}]
    name = str(uuid4())
    client.create_trigger(
        Name=name,
        Type="ON_DEMAND",
        Actions=actions,
        Tags=tags or {},
    )
    return name
@mock_glue
def test_get_triggers_trigger_name_exists():
    """get_triggers returns the single trigger that was created."""
    glue = create_glue_client()
    expected_name = create_test_trigger(glue)

    listed = glue.get_triggers()["Triggers"]

    assert len(listed) == 1
    assert listed[0]["Name"] == expected_name
@mock_glue
def test_get_triggers_dependent_job_name():
    """DependentJobName restricts get_triggers to triggers acting on that job."""
    glue = create_glue_client()
    # An unrelated trigger that must not show up in the filtered result.
    create_test_trigger(glue)
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="ON_DEMAND",
        Actions=[{"JobName": job}],
    )

    listed = glue.get_triggers(DependentJobName=job)["Triggers"]

    assert len(listed) == 1
    assert listed[0]["Name"] == name
@mock_glue
def test_start_trigger():
    """start_trigger moves a trigger from CREATED to ACTIVATED."""
    glue = create_glue_client()
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="SCHEDULED",
        Schedule="cron(5 3 * * ? *)",
        Actions=[{"JobName": job}],
    )
    before = glue.get_trigger(Name=name)["Trigger"]
    assert before["State"] == "CREATED"

    glue.start_trigger(Name=name)

    after = glue.get_trigger(Name=name)["Trigger"]
    assert after["State"] == "ACTIVATED"
@mock_glue
def test_stop_trigger():
    """stop_trigger moves a trigger from ACTIVATED to DEACTIVATED."""
    glue = create_glue_client()
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="SCHEDULED",
        Schedule="cron(5 3 * * ? *)",
        Actions=[{"JobName": job}],
        StartOnCreation=True,
    )
    before = glue.get_trigger(Name=name)["Trigger"]
    assert before["State"] == "ACTIVATED"

    glue.stop_trigger(Name=name)

    after = glue.get_trigger(Name=name)["Trigger"]
    assert after["State"] == "DEACTIVATED"
@mock_glue
def test_list_triggers():
    """list_triggers returns the created name and no NextToken."""
    glue = create_glue_client()
    expected_name = create_test_trigger(glue)

    listing = glue.list_triggers()

    assert listing["TriggerNames"] == [expected_name]
    assert "NextToken" not in listing
@mock_glue
def test_list_triggers_dependent_job_name():
    """DependentJobName restricts list_triggers to triggers acting on that job."""
    glue = create_glue_client()
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="ON_DEMAND",
        Actions=[{"JobName": job}],
    )
    # A second trigger bound to a different job.
    create_test_trigger(glue)

    everything = glue.list_triggers()
    assert len(everything["TriggerNames"]) == 2

    only_for_job = glue.list_triggers(DependentJobName=job)
    assert len(only_for_job["TriggerNames"]) == 1
    assert only_for_job["TriggerNames"] == [name]
@mock_glue
def test_list_triggers_tags():
    """Tags restricts list_triggers to triggers carrying those tags."""
    glue = create_glue_client()
    job = create_test_job(glue)
    name = str(uuid4())
    glue.create_trigger(
        Name=name,
        Type="ON_DEMAND",
        Actions=[{"JobName": job}],
        Tags={"CreatedBy": "moto"},
    )
    # A second, untagged trigger.
    create_test_trigger(glue)

    everything = glue.list_triggers()
    assert len(everything["TriggerNames"]) == 2

    tagged = glue.list_triggers(Tags={"CreatedBy": "moto"})
    assert len(tagged["TriggerNames"]) == 1
    assert tagged["TriggerNames"] == [name]
@mock_glue
def test_batch_get_triggers():
    """batch_get_triggers splits known names from unknown ones."""
    glue = create_glue_client()
    requested = [create_test_trigger(glue), "trigger-not-found"]

    result = glue.batch_get_triggers(TriggerNames=requested)

    assert len(result["Triggers"]) == 1
    assert len(result["TriggersNotFound"]) == 1
@mock_glue
def test_delete_trigger():
    """A deleted trigger can no longer be fetched."""
    glue = create_glue_client()
    name = create_test_trigger(glue)
    # The trigger is retrievable before deletion.
    glue.get_trigger(Name=name)

    glue.delete_trigger(Name=name)

    with pytest.raises(ClientError) as exc:
        glue.get_trigger(Name=name)
    error_code = exc.value.response["Error"]["Code"]
    assert error_code == "EntityNotFoundException"