From f7b247872604a6ff26bab7af1018dac343e90d27 Mon Sep 17 00:00:00 2001 From: Dean Matter <65437301+deanmatter@users.noreply.github.com> Date: Fri, 12 May 2023 18:41:48 +0300 Subject: [PATCH] Add sessions to Glue (#6302) --- IMPLEMENTATION_COVERAGE.md | 12 +-- docs/docs/services/glue.rst | 10 +-- moto/backend_index.py | 1 - moto/glue/exceptions.py | 15 ++++ moto/glue/models.py | 139 +++++++++++++++++++++++++++++++++++ moto/glue/responses.py | 68 ++++++++++++++++- tests/test_glue/test_glue.py | 80 ++++++++++++++++++++ 7 files changed, 312 insertions(+), 13 deletions(-) diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index e090a1d48..26f6b9d2a 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3142,7 +3142,7 @@ ## glue
-30% implemented +33% implemented - [X] batch_create_partition - [ ] batch_delete_connection @@ -3181,7 +3181,7 @@ - [X] create_schema - [ ] create_script - [ ] create_security_configuration -- [ ] create_session +- [X] create_session - [X] create_table - [X] create_trigger - [ ] create_user_defined_function @@ -3205,7 +3205,7 @@ - [X] delete_schema - [ ] delete_schema_versions - [ ] delete_security_configuration -- [ ] delete_session +- [X] delete_session - [X] delete_table - [X] delete_table_version - [X] delete_trigger @@ -3258,7 +3258,7 @@ - [ ] get_schema_versions_diff - [ ] get_security_configuration - [ ] get_security_configurations -- [ ] get_session +- [X] get_session - [ ] get_statement - [X] get_table - [X] get_table_version @@ -3291,7 +3291,7 @@ - [X] list_registries - [ ] list_schema_versions - [ ] list_schemas -- [ ] list_sessions +- [X] list_sessions - [ ] list_statements - [X] list_triggers - [ ] list_workflows @@ -3320,7 +3320,7 @@ - [ ] start_workflow_run - [X] stop_crawler - [ ] stop_crawler_schedule -- [ ] stop_session +- [X] stop_session - [X] stop_trigger - [ ] stop_workflow_run - [X] tag_resource diff --git a/docs/docs/services/glue.rst b/docs/docs/services/glue.rst index 0e0c7407a..2aeac6467 100644 --- a/docs/docs/services/glue.rst +++ b/docs/docs/services/glue.rst @@ -66,7 +66,7 @@ glue - [ ] create_script - [ ] create_security_configuration -- [ ] create_session +- [X] create_session - [X] create_table - [X] create_trigger - [ ] create_user_defined_function @@ -90,7 +90,7 @@ glue - [X] delete_schema - [ ] delete_schema_versions - [ ] delete_security_configuration -- [ ] delete_session +- [X] delete_session - [X] delete_table - [X] delete_table_version - [X] delete_trigger @@ -155,7 +155,7 @@ glue - [ ] get_schema_versions_diff - [ ] get_security_configuration - [ ] get_security_configurations -- [ ] get_session +- [X] get_session - [ ] get_statement - [X] get_table - [X] get_table_version @@ -188,7 +188,7 @@ glue - [X] list_registries - [ ] list_schema_versions - [ ] list_schemas -- [ ] list_sessions +- [X] list_sessions - [ ] list_statements - [X] list_triggers - [ ] list_workflows @@ -217,7 +217,7 @@ glue - [ ] start_workflow_run - [X] stop_crawler - [ ] stop_crawler_schedule -- [ ] stop_session +- [X] stop_session - [X] stop_trigger - [ ] stop_workflow_run - [X] tag_resource diff --git a/moto/backend_index.py b/moto/backend_index.py index 8708ee366..7b10be586 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -158,7 +158,6 @@ backend_url_patterns = [ ("service-quotas", re.compile("https?://servicequotas\\.(.+)\\.amazonaws\\.com")), ("ses", re.compile("https?://email\\.(.+)\\.amazonaws\\.com")), ("ses", re.compile("https?://ses\\.(.+)\\.amazonaws\\.com")), - ("sesv2", re.compile("https?://ses\\.(.+)\\.amazonaws\\.com")), ("sesv2", re.compile("https?://email\\.(.+)\\.amazonaws\\.com")), ("signer", re.compile("https?://signer\\.(.+)\\.amazonaws\\.com")), ("sns", re.compile("https?://sns\\.(.+)\\.amazonaws\\.com")), diff --git a/moto/glue/exceptions.py b/moto/glue/exceptions.py index d31932042..dfa71bcfe 100644 --- a/moto/glue/exceptions.py +++ b/moto/glue/exceptions.py @@ -31,6 +31,11 @@ class CrawlerAlreadyExistsException(AlreadyExistsException): super().__init__("Crawler") +class SessionAlreadyExistsException(AlreadyExistsException): + def __init__(self) -> None: + super().__init__("Session") + + class EntityNotFoundException(GlueClientError): def __init__(self, msg: str): super().__init__("EntityNotFoundException", msg) @@ -107,6 +112,16 @@ class SchemaVersionNotFoundFromSchemaVersionIdException(EntityNotFoundException) ) +class SessionNotFoundException(EntityNotFoundException): + def __init__(self, session: str): + super().__init__(f"Session {session} not found.") + + +class IllegalSessionStateException(GlueClientError): + def __init__(self, msg: str): + super().__init__("IllegalSessionStateException", msg) + + class RegistryNotFoundException(EntityNotFoundException): def __init__(self, resource: str, param_name: str, param_value: Optional[str]): super().__init__( diff --git a/moto/glue/models.py b/moto/glue/models.py index 6b288d650..a15c89152 100644 --- a/moto/glue/models.py +++ b/moto/glue/models.py @@ -30,6 +30,9 @@ from .exceptions import ( SchemaVersionNotFoundFromSchemaIdException, SchemaNotFoundException, SchemaVersionMetadataAlreadyExistsException, + SessionAlreadyExistsException, + SessionNotFoundException, + IllegalSessionStateException, TriggerNotFoundException, ) from .utils import PartitionFilter @@ -82,6 +85,12 @@ class GlueBackend(BaseBackend): "limit_default": 100, "unique_attribute": "name", }, + "list_sessions": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "unique_attribute": "session_id", + }, "list_triggers": { "input_token": "next_token", "limit_key": "max_results", @@ -96,6 +105,7 @@ class GlueBackend(BaseBackend): self.crawlers: Dict[str, FakeCrawler] = OrderedDict() self.jobs: Dict[str, FakeJob] = OrderedDict() self.job_runs: Dict[str, FakeJobRun] = OrderedDict() + self.sessions: Dict[str, FakeSession] = OrderedDict() self.tagger = TaggingService() self.triggers: Dict[str, FakeTrigger] = OrderedDict() self.registries: Dict[str, FakeRegistry] = OrderedDict() @@ -771,6 +781,67 @@ class GlueBackend(BaseBackend): return schema.as_dict() + def create_session( + self, + session_id: str, + description: str, + role: str, + command: Dict[str, str], + timeout: int, + idle_timeout: int, + default_arguments: Dict[str, str], + connections: Dict[str, List[str]], + max_capacity: float, + number_of_workers: int, + worker_type: str, + security_configuration: str, + glue_version: str, + tags: Dict[str, str], + request_origin: str, + ) -> None: + if session_id in self.sessions: + raise SessionAlreadyExistsException() + + session = FakeSession( + session_id, + description, + role, + command, + timeout, + idle_timeout, + default_arguments, + connections, + max_capacity, + number_of_workers, + worker_type, + security_configuration, + glue_version, + tags, + request_origin, + backend=self, + ) + self.sessions[session_id] = session + + def get_session(self, session_id: str) -> "FakeSession": + try: + return self.sessions[session_id] + except KeyError: + raise SessionNotFoundException(session_id) + + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc] + def list_sessions(self) -> List["FakeSession"]: # type: ignore[misc] + return [session for _, session in self.sessions.items()] + + def stop_session(self, session_id: str) -> None: + session = self.get_session(session_id) + session.stop_session() + + def delete_session(self, session_id: str) -> None: + try: + del self.sessions[session_id] + except KeyError: + raise SessionNotFoundException(session_id) + def create_trigger( self, name: str, @@ -1517,6 +1588,74 @@ class FakeSchemaVersion(BaseModel): } +class FakeSession(BaseModel): + def __init__( + self, + session_id: str, + description: str, + role: str, + command: Dict[str, str], + timeout: int, + idle_timeout: int, + default_arguments: Dict[str, str], + connections: Dict[str, List[str]], + max_capacity: float, + number_of_workers: int, + worker_type: str, + security_configuration: str, + glue_version: str, + tags: Dict[str, str], + request_origin: str, + backend: GlueBackend, + ): + self.session_id = session_id + self.description = description + self.role = role + self.command = command + self.timeout = timeout + self.idle_timeout = idle_timeout + self.default_arguments = default_arguments + self.connections = connections + self.max_capacity = max_capacity + self.number_of_workers = number_of_workers + self.worker_type = worker_type + self.security_configuration = security_configuration + self.glue_version = glue_version + self.tags = tags + self.request_origin = request_origin + self.creation_time = datetime.utcnow() + self.last_updated = self.creation_time + self.arn = f"arn:aws:glue:{backend.region_name}:{backend.account_id}:session/{self.session_id}" + self.backend = backend + self.backend.tag_resource(self.arn, tags) + self.state = "READY" + + def get_id(self) -> str: + return self.session_id + + def as_dict(self) -> Dict[str, Any]: + return { + "Id": self.session_id, + "CreatedOn": self.creation_time.isoformat(), + "Status": self.state, + "ErrorMessage": "string", + "Description": self.description, + "Role": self.role, + "Command": self.command, + "DefaultArguments": self.default_arguments, + "Connections": self.connections, + "Progress": 12.3, + "MaxCapacity": 123.0, + "SecurityConfiguration": self.security_configuration, + "GlueVersion": self.glue_version, + } + + def stop_session(self) -> None: + if self.state != "READY": + raise IllegalSessionStateException(f"Session is in {self.state} status") + self.state = "STOPPING" + + class FakeTrigger(BaseModel): def __init__( self, diff --git a/moto/glue/responses.py b/moto/glue/responses.py index b35214b2f..591aa6def 100644 --- a/moto/glue/responses.py +++ b/moto/glue/responses.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse -from .models import glue_backends, GlueBackend, FakeJob, FakeCrawler +from .models import glue_backends, GlueBackend, FakeJob, FakeCrawler, FakeSession class GlueResponse(BaseResponse): @@ -527,6 +527,72 @@ class GlueResponse(BaseResponse): schema = self.glue_backend.update_schema(schema_id, compatibility, description) return json.dumps(schema) + def create_session(self) -> str: + self.glue_backend.create_session( + session_id=self.parameters.get("Id"), # type: ignore[arg-type] + description=self.parameters.get("Description"), # type: ignore[arg-type] + role=self.parameters.get("Role"), # type: ignore[arg-type] + command=self.parameters.get("Command"), # type: ignore[arg-type] + timeout=self.parameters.get("Timeout"), # type: ignore[arg-type] + idle_timeout=self.parameters.get("IdleTimeout"), # type: ignore[arg-type] + default_arguments=self.parameters.get("DefaultArguments"), # type: ignore[arg-type] + connections=self.parameters.get("Connections"), # type: ignore[arg-type] + max_capacity=self.parameters.get("MaxCapacity"), # type: ignore[arg-type] + number_of_workers=self.parameters.get("NumberOfWorkers"), # type: ignore[arg-type] + worker_type=self.parameters.get("WorkerType"), # type: ignore[arg-type] + security_configuration=self.parameters.get("SecurityConfiguration"), # type: ignore[arg-type] + glue_version=self.parameters.get("GlueVersion"), # type: ignore[arg-type] + tags=self.parameters.get("Tags"), # type: ignore[arg-type] + request_origin=self.parameters.get("RequestOrigin"), # type: ignore[arg-type] + ) + return "" + + def get_session(self) -> str: + session_id = self.parameters.get("Id") + session = self.glue_backend.get_session(session_id) # type: ignore[arg-type] + return json.dumps({"Session": session.as_dict()}) + + def list_sessions(self) -> str: + next_token = self._get_param("NextToken") + max_results = self._get_int_param("MaxResults") + tags = self._get_param("Tags") + sessions, next_token = self.glue_backend.list_sessions( + next_token=next_token, max_results=max_results + ) + filtered_session_ids = self._filter_sessions_by_tags(sessions, tags) + + return json.dumps( + dict( + Ids=[session_id for session_id in filtered_session_ids], + Sessions=[ + self.glue_backend.get_session(session_id).as_dict() + for session_id in filtered_session_ids + ], + NextToken=next_token, + ) + ) + + def _filter_sessions_by_tags( + self, sessions: List[FakeSession], tags: Dict[str, str] + ) -> List[str]: + if not tags: + return [session.get_id() for session in sessions] + return [ + session.get_id() + for session in sessions + if self.is_tags_match(session.arn, tags) + ] + + def stop_session(self) -> str: + session_id = self.parameters.get("Id") + self.glue_backend.stop_session(session_id) # type: ignore[arg-type] + return json.dumps({"Id": session_id}) + + def delete_session(self) -> str: + session_id = self.parameters.get("Id") + self.glue_backend.delete_session(session_id) # type: ignore[arg-type] + return json.dumps({"Id": session_id}) + def batch_get_crawlers(self) -> str: crawler_names = self._get_param("CrawlerNames") crawlers = self.glue_backend.batch_get_crawlers(crawler_names) diff --git a/tests/test_glue/test_glue.py b/tests/test_glue/test_glue.py index b86e5ccce..a0014cdc1 100644 --- a/tests/test_glue/test_glue.py +++ b/tests/test_glue/test_glue.py @@ -789,3 +789,83 @@ def test_delete_trigger(): client.get_trigger(Name=trigger_name) assert exc.value.response["Error"]["Code"] == "EntityNotFoundException" + + +def create_test_session(client): + session_id = str(uuid4()) + client.create_session( + Id=session_id, + Description="string", + Role="arn_of_a_testing_role", + Command={"Name": "string", "PythonVersion": "string"}, + Timeout=123, + IdleTimeout=123, + DefaultArguments={"string": "string"}, + Connections={ + "Connections": [ + "string", + ] + }, + MaxCapacity=123.0, + NumberOfWorkers=123, + WorkerType="Standard", + SecurityConfiguration="string", + GlueVersion="string", + Tags={"string": "string"}, + RequestOrigin="string", + ) + return session_id + + +@mock_glue +def test_create_session(): + client = create_glue_client() + session_id = create_test_session(client) + + resp = client.get_session(Id=session_id) + assert resp["Session"]["Id"] == session_id + + +@mock_glue +def test_get_session(): + client = create_glue_client() + session_id = create_test_session(client) + + resp = client.get_session(Id=session_id) + assert resp["Session"]["Id"] == session_id + + +@mock_glue +def test_list_sessions(): + client = create_glue_client() + session_id = create_test_session(client) + + resp = client.list_sessions() + assert session_id in resp["Ids"] + + +@mock_glue +def test_delete_session(): + client = create_glue_client() + session_id = create_test_session(client) + + resp = client.delete_session(Id=session_id) + assert resp["Id"] == session_id + + resp = client.list_sessions() + assert session_id not in resp["Ids"] + + +@mock_glue +def test_stop_session(): + client = create_glue_client() + session_id = create_test_session(client) + + resp = client.get_session(Id=session_id) + assert resp["Session"]["Status"] == "READY" + + resp = client.stop_session(Id=session_id) + assert resp["Id"] == session_id + + resp = client.get_session(Id=session_id) + assert resp["Session"]["Status"] == "STOPPING"