Add sessions to Glue (#6302)

This commit is contained in:
Dean Matter 2023-05-12 18:41:48 +03:00 committed by GitHub
parent c96a21ddf4
commit f7b2478726
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 312 additions and 13 deletions

View File

@ -3142,7 +3142,7 @@
## glue
<details>
<summary>30% implemented</summary>
<summary>33% implemented</summary>
- [X] batch_create_partition
- [ ] batch_delete_connection
@ -3181,7 +3181,7 @@
- [X] create_schema
- [ ] create_script
- [ ] create_security_configuration
- [ ] create_session
- [X] create_session
- [X] create_table
- [X] create_trigger
- [ ] create_user_defined_function
@ -3205,7 +3205,7 @@
- [X] delete_schema
- [ ] delete_schema_versions
- [ ] delete_security_configuration
- [ ] delete_session
- [X] delete_session
- [X] delete_table
- [X] delete_table_version
- [X] delete_trigger
@ -3258,7 +3258,7 @@
- [ ] get_schema_versions_diff
- [ ] get_security_configuration
- [ ] get_security_configurations
- [ ] get_session
- [X] get_session
- [ ] get_statement
- [X] get_table
- [X] get_table_version
@ -3291,7 +3291,7 @@
- [X] list_registries
- [ ] list_schema_versions
- [ ] list_schemas
- [ ] list_sessions
- [X] list_sessions
- [ ] list_statements
- [X] list_triggers
- [ ] list_workflows
@ -3320,7 +3320,7 @@
- [ ] start_workflow_run
- [X] stop_crawler
- [ ] stop_crawler_schedule
- [ ] stop_session
- [X] stop_session
- [X] stop_trigger
- [ ] stop_workflow_run
- [X] tag_resource

View File

@ -66,7 +66,7 @@ glue
- [ ] create_script
- [ ] create_security_configuration
- [ ] create_session
- [X] create_session
- [X] create_table
- [X] create_trigger
- [ ] create_user_defined_function
@ -90,7 +90,7 @@ glue
- [X] delete_schema
- [ ] delete_schema_versions
- [ ] delete_security_configuration
- [ ] delete_session
- [X] delete_session
- [X] delete_table
- [X] delete_table_version
- [X] delete_trigger
@ -155,7 +155,7 @@ glue
- [ ] get_schema_versions_diff
- [ ] get_security_configuration
- [ ] get_security_configurations
- [ ] get_session
- [X] get_session
- [ ] get_statement
- [X] get_table
- [X] get_table_version
@ -188,7 +188,7 @@ glue
- [X] list_registries
- [ ] list_schema_versions
- [ ] list_schemas
- [ ] list_sessions
- [X] list_sessions
- [ ] list_statements
- [X] list_triggers
- [ ] list_workflows
@ -217,7 +217,7 @@ glue
- [ ] start_workflow_run
- [X] stop_crawler
- [ ] stop_crawler_schedule
- [ ] stop_session
- [X] stop_session
- [X] stop_trigger
- [ ] stop_workflow_run
- [X] tag_resource

View File

@ -158,7 +158,6 @@ backend_url_patterns = [
("service-quotas", re.compile("https?://servicequotas\\.(.+)\\.amazonaws\\.com")),
("ses", re.compile("https?://email\\.(.+)\\.amazonaws\\.com")),
("ses", re.compile("https?://ses\\.(.+)\\.amazonaws\\.com")),
("sesv2", re.compile("https?://ses\\.(.+)\\.amazonaws\\.com")),
("sesv2", re.compile("https?://email\\.(.+)\\.amazonaws\\.com")),
("signer", re.compile("https?://signer\\.(.+)\\.amazonaws\\.com")),
("sns", re.compile("https?://sns\\.(.+)\\.amazonaws\\.com")),

View File

@ -31,6 +31,11 @@ class CrawlerAlreadyExistsException(AlreadyExistsException):
super().__init__("Crawler")
class SessionAlreadyExistsException(AlreadyExistsException):
def __init__(self) -> None:
super().__init__("Session")
class EntityNotFoundException(GlueClientError):
def __init__(self, msg: str):
super().__init__("EntityNotFoundException", msg)
@ -107,6 +112,16 @@ class SchemaVersionNotFoundFromSchemaVersionIdException(EntityNotFoundException)
)
class SessionNotFoundException(EntityNotFoundException):
def __init__(self, session: str):
super().__init__(f"Session {session} not found.")
class IllegalSessionStateException(GlueClientError):
def __init__(self, msg: str):
super().__init__("IllegalSessionStateException", msg)
class RegistryNotFoundException(EntityNotFoundException):
def __init__(self, resource: str, param_name: str, param_value: Optional[str]):
super().__init__(

View File

@ -30,6 +30,9 @@ from .exceptions import (
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
SessionAlreadyExistsException,
SessionNotFoundException,
IllegalSessionStateException,
TriggerNotFoundException,
)
from .utils import PartitionFilter
@ -82,6 +85,12 @@ class GlueBackend(BaseBackend):
"limit_default": 100,
"unique_attribute": "name",
},
"list_sessions": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "session_id",
},
"list_triggers": {
"input_token": "next_token",
"limit_key": "max_results",
@ -96,6 +105,7 @@ class GlueBackend(BaseBackend):
self.crawlers: Dict[str, FakeCrawler] = OrderedDict()
self.jobs: Dict[str, FakeJob] = OrderedDict()
self.job_runs: Dict[str, FakeJobRun] = OrderedDict()
self.sessions: Dict[str, FakeSession] = OrderedDict()
self.tagger = TaggingService()
self.triggers: Dict[str, FakeTrigger] = OrderedDict()
self.registries: Dict[str, FakeRegistry] = OrderedDict()
@ -771,6 +781,67 @@ class GlueBackend(BaseBackend):
return schema.as_dict()
def create_session(
self,
session_id: str,
description: str,
role: str,
command: Dict[str, str],
timeout: int,
idle_timeout: int,
default_arguments: Dict[str, str],
connections: Dict[str, List[str]],
max_capacity: float,
number_of_workers: int,
worker_type: str,
security_configuration: str,
glue_version: str,
tags: Dict[str, str],
request_origin: str,
) -> None:
if session_id in self.sessions:
raise SessionAlreadyExistsException()
session = FakeSession(
session_id,
description,
role,
command,
timeout,
idle_timeout,
default_arguments,
connections,
max_capacity,
number_of_workers,
worker_type,
security_configuration,
glue_version,
tags,
request_origin,
backend=self,
)
self.sessions[session_id] = session
def get_session(self, session_id: str) -> "FakeSession":
try:
return self.sessions[session_id]
except KeyError:
raise SessionNotFoundException(session_id)
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc]
def list_sessions(self) -> List["FakeSession"]: # type: ignore[misc]
return [session for _, session in self.sessions.items()]
def stop_session(self, session_id: str) -> None:
session = self.get_session(session_id)
session.stop_session()
def delete_session(self, session_id: str) -> None:
try:
del self.sessions[session_id]
except KeyError:
raise SessionNotFoundException(session_id)
def create_trigger(
self,
name: str,
@ -1517,6 +1588,74 @@ class FakeSchemaVersion(BaseModel):
}
class FakeSession(BaseModel):
def __init__(
self,
session_id: str,
description: str,
role: str,
command: Dict[str, str],
timeout: int,
idle_timeout: int,
default_arguments: Dict[str, str],
connections: Dict[str, List[str]],
max_capacity: float,
number_of_workers: int,
worker_type: str,
security_configuration: str,
glue_version: str,
tags: Dict[str, str],
request_origin: str,
backend: GlueBackend,
):
self.session_id = session_id
self.description = description
self.role = role
self.command = command
self.timeout = timeout
self.idle_timeout = idle_timeout
self.default_arguments = default_arguments
self.connections = connections
self.max_capacity = max_capacity
self.number_of_workers = number_of_workers
self.worker_type = worker_type
self.security_configuration = security_configuration
self.glue_version = glue_version
self.tags = tags
self.request_origin = request_origin
self.creation_time = datetime.utcnow()
self.last_updated = self.creation_time
self.arn = f"arn:aws:glue:{backend.region_name}:{backend.account_id}:session/{self.session_id}"
self.backend = backend
self.backend.tag_resource(self.arn, tags)
self.state = "READY"
def get_id(self) -> str:
return self.session_id
def as_dict(self) -> Dict[str, Any]:
return {
"Id": self.session_id,
"CreatedOn": self.creation_time.isoformat(),
"Status": self.state,
"ErrorMessage": "string",
"Description": self.description,
"Role": self.role,
"Command": self.command,
"DefaultArguments": self.default_arguments,
"Connections": self.connections,
"Progress": 12.3,
"MaxCapacity": 123.0,
"SecurityConfiguration": self.security_configuration,
"GlueVersion": self.glue_version,
}
def stop_session(self) -> None:
if self.state != "READY":
raise IllegalSessionStateException(f"Session is in {self.state} status")
self.state = "STOPPING"
class FakeTrigger(BaseModel):
def __init__(
self,

View File

@ -3,7 +3,7 @@ from typing import Any, Dict, List
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import glue_backends, GlueBackend, FakeJob, FakeCrawler
from .models import glue_backends, GlueBackend, FakeJob, FakeCrawler, FakeSession
class GlueResponse(BaseResponse):
@ -527,6 +527,72 @@ class GlueResponse(BaseResponse):
schema = self.glue_backend.update_schema(schema_id, compatibility, description)
return json.dumps(schema)
def create_session(self) -> str:
self.glue_backend.create_session(
session_id=self.parameters.get("Id"), # type: ignore[arg-type]
description=self.parameters.get("Description"), # type: ignore[arg-type]
role=self.parameters.get("Role"), # type: ignore[arg-type]
command=self.parameters.get("Command"), # type: ignore[arg-type]
timeout=self.parameters.get("Timeout"), # type: ignore[arg-type]
idle_timeout=self.parameters.get("IdleTimeout"), # type: ignore[arg-type]
default_arguments=self.parameters.get("DefaultArguments"), # type: ignore[arg-type]
connections=self.parameters.get("Connections"), # type: ignore[arg-type]
max_capacity=self.parameters.get("MaxCapacity"), # type: ignore[arg-type]
number_of_workers=self.parameters.get("NumberOfWorkers"), # type: ignore[arg-type]
worker_type=self.parameters.get("WorkerType"), # type: ignore[arg-type]
security_configuration=self.parameters.get("SecurityConfiguration"), # type: ignore[arg-type]
glue_version=self.parameters.get("GlueVersion"), # type: ignore[arg-type]
tags=self.parameters.get("Tags"), # type: ignore[arg-type]
request_origin=self.parameters.get("RequestOrigin"), # type: ignore[arg-type]
)
return ""
def get_session(self) -> str:
session_id = self.parameters.get("Id")
session = self.glue_backend.get_session(session_id) # type: ignore[arg-type]
return json.dumps({"Session": session.as_dict()})
def list_sessions(self) -> str:
next_token = self._get_param("NextToken")
max_results = self._get_int_param("MaxResults")
tags = self._get_param("Tags")
sessions, next_token = self.glue_backend.list_sessions(
next_token=next_token, max_results=max_results
)
filtered_session_ids = self._filter_sessions_by_tags(sessions, tags)
return json.dumps(
dict(
Ids=[session_id for session_id in filtered_session_ids],
Sessions=[
self.glue_backend.get_session(session_id).as_dict()
for session_id in filtered_session_ids
],
NextToken=next_token,
)
)
def _filter_sessions_by_tags(
self, sessions: List[FakeSession], tags: Dict[str, str]
) -> List[str]:
if not tags:
return [session.get_id() for session in sessions]
return [
session.get_id()
for session in sessions
if self.is_tags_match(session.arn, tags)
]
def stop_session(self) -> str:
session_id = self.parameters.get("Id")
self.glue_backend.stop_session(session_id) # type: ignore[arg-type]
return json.dumps({"Id": session_id})
def delete_session(self) -> str:
session_id = self.parameters.get("Id")
self.glue_backend.delete_session(session_id) # type: ignore[arg-type]
return json.dumps({"Id": session_id})
def batch_get_crawlers(self) -> str:
crawler_names = self._get_param("CrawlerNames")
crawlers = self.glue_backend.batch_get_crawlers(crawler_names)

View File

@ -789,3 +789,83 @@ def test_delete_trigger():
client.get_trigger(Name=trigger_name)
assert exc.value.response["Error"]["Code"] == "EntityNotFoundException"
def create_test_session(client):
session_id = str(uuid4())
client.create_session(
Id=session_id,
Description="string",
Role="arn_of_a_testing_role",
Command={"Name": "string", "PythonVersion": "string"},
Timeout=123,
IdleTimeout=123,
DefaultArguments={"string": "string"},
Connections={
"Connections": [
"string",
]
},
MaxCapacity=123.0,
NumberOfWorkers=123,
WorkerType="Standard",
SecurityConfiguration="string",
GlueVersion="string",
Tags={"string": "string"},
RequestOrigin="string",
)
return session_id
@mock_glue
def test_create_session():
client = create_glue_client()
session_id = create_test_session(client)
resp = client.get_session(Id=session_id)
assert resp["Session"]["Id"] == session_id
@mock_glue
def test_get_session():
client = create_glue_client()
session_id = create_test_session(client)
resp = client.get_session(Id=session_id)
assert resp["Session"]["Id"] == session_id
@mock_glue
def test_list_sessions():
client = create_glue_client()
session_id = create_test_session(client)
resp = client.list_sessions()
assert session_id in resp["Ids"]
@mock_glue
def test_delete_session():
client = create_glue_client()
session_id = create_test_session(client)
resp = client.delete_session(Id=session_id)
assert resp["Id"] == session_id
resp = client.list_sessions()
assert session_id not in resp["Ids"]
@mock_glue
def test_stop_session():
client = create_glue_client()
session_id = create_test_session(client)
resp = client.get_session(Id=session_id)
assert resp["Session"]["Status"] == "READY"
resp = client.stop_session(Id=session_id)
assert resp["Id"] == session_id
resp = client.get_session(Id=session_id)
assert resp["Session"]["Status"] == "STOPPING"