Glue: get_table_version() (#5968)
This commit is contained in:
parent
5d87085435
commit
95bbb5af4b
@ -3100,7 +3100,7 @@
|
||||
|
||||
## glue
|
||||
<details>
|
||||
<summary>21% implemented</summary>
|
||||
<summary>23% implemented</summary>
|
||||
|
||||
- [X] batch_create_partition
|
||||
- [ ] batch_delete_connection
|
||||
@ -3165,7 +3165,7 @@
|
||||
- [ ] delete_security_configuration
|
||||
- [ ] delete_session
|
||||
- [X] delete_table
|
||||
- [ ] delete_table_version
|
||||
- [X] delete_table_version
|
||||
- [ ] delete_trigger
|
||||
- [ ] delete_user_defined_function
|
||||
- [ ] delete_workflow
|
||||
@ -3219,8 +3219,8 @@
|
||||
- [ ] get_session
|
||||
- [ ] get_statement
|
||||
- [X] get_table
|
||||
- [ ] get_table_version
|
||||
- [ ] get_table_versions
|
||||
- [X] get_table_version
|
||||
- [X] get_table_versions
|
||||
- [X] get_tables
|
||||
- [X] get_tags
|
||||
- [ ] get_trigger
|
||||
|
@ -92,7 +92,7 @@ glue
|
||||
- [ ] delete_security_configuration
|
||||
- [ ] delete_session
|
||||
- [X] delete_table
|
||||
- [ ] delete_table_version
|
||||
- [X] delete_table_version
|
||||
- [ ] delete_trigger
|
||||
- [ ] delete_user_defined_function
|
||||
- [ ] delete_workflow
|
||||
@ -158,8 +158,8 @@ glue
|
||||
- [ ] get_session
|
||||
- [ ] get_statement
|
||||
- [X] get_table
|
||||
- [ ] get_table_version
|
||||
- [ ] get_table_versions
|
||||
- [X] get_table_version
|
||||
- [X] get_table_versions
|
||||
- [X] get_tables
|
||||
- [X] get_tags
|
||||
- [ ] get_trigger
|
||||
|
@ -1,8 +1,9 @@
|
||||
import json
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime
|
||||
import re
|
||||
from typing import Dict, List
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from moto.core import BaseBackend, BackendDict, BaseModel
|
||||
from moto.moto_api import state_manager
|
||||
@ -169,6 +170,32 @@ class GlueBackend(BaseBackend):
|
||||
table = self.get_table(database_name, table_name)
|
||||
table.update(table_input)
|
||||
|
||||
def get_table_version(
|
||||
self, database_name: str, table_name: str, ver_id: str
|
||||
) -> str:
|
||||
table = self.get_table(database_name, table_name)
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"TableVersion": {
|
||||
"Table": table.as_dict(version=ver_id),
|
||||
"VersionId": ver_id,
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
def get_table_versions(
|
||||
self, database_name: str, table_name: str
|
||||
) -> Dict[str, Dict[str, Any]]:
|
||||
table = self.get_table(database_name, table_name)
|
||||
return {version: table.as_dict(version) for version in table.versions.keys()}
|
||||
|
||||
def delete_table_version(
|
||||
self, database_name: str, table_name: str, version_id: str
|
||||
) -> None:
|
||||
table = self.get_table(database_name, table_name)
|
||||
table.delete_version(version_id)
|
||||
|
||||
def get_partitions(self, database_name, table_name, expression):
|
||||
"""
|
||||
See https://docs.aws.amazon.com/glue/latest/webapi/API_GetPartitions.html
|
||||
@ -779,33 +806,38 @@ class FakeTable(BaseModel):
|
||||
self.partitions = OrderedDict()
|
||||
self.created_time = datetime.utcnow()
|
||||
self.updated_time = None
|
||||
self.versions = [table_input]
|
||||
self._current_version = 1
|
||||
self.versions: Dict[str, Dict[str, Any]] = {
|
||||
str(self._current_version): table_input
|
||||
}
|
||||
|
||||
def update(self, table_input):
|
||||
self.versions.append(table_input)
|
||||
self.versions[str(self._current_version + 1)] = table_input
|
||||
self._current_version += 1
|
||||
self.updated_time = datetime.utcnow()
|
||||
|
||||
def get_version(self, ver):
|
||||
try:
|
||||
if not isinstance(ver, int):
|
||||
# "1" goes to [0]
|
||||
ver = int(ver) - 1
|
||||
int(ver)
|
||||
except ValueError as e:
|
||||
raise JsonRESTError("InvalidInputException", str(e))
|
||||
|
||||
try:
|
||||
return self.versions[ver]
|
||||
except IndexError:
|
||||
except KeyError:
|
||||
raise VersionNotFoundException()
|
||||
|
||||
def as_dict(self, version=-1):
|
||||
def delete_version(self, version_id):
|
||||
self.versions.pop(version_id)
|
||||
|
||||
def as_dict(self, version=1):
|
||||
obj = {
|
||||
"DatabaseName": self.database_name,
|
||||
"Name": self.name,
|
||||
"CreateTime": unix_time(self.created_time),
|
||||
**self.get_version(version),
|
||||
**self.get_version(str(version)),
|
||||
# Add VersionId after we get the version-details, just to make sure that it's a valid version (int)
|
||||
"VersionId": str(int(version) + 1),
|
||||
"VersionId": str(version),
|
||||
}
|
||||
if self.updated_time is not None:
|
||||
obj["UpdateTime"] = unix_time(self.updated_time)
|
||||
|
@ -72,13 +72,12 @@ class GlueResponse(BaseResponse):
|
||||
def get_table_versions(self):
|
||||
database_name = self.parameters.get("DatabaseName")
|
||||
table_name = self.parameters.get("TableName")
|
||||
table = self.glue_backend.get_table(database_name, table_name)
|
||||
|
||||
versions = self.glue_backend.get_table_versions(database_name, table_name)
|
||||
return json.dumps(
|
||||
{
|
||||
"TableVersions": [
|
||||
{"Table": table.as_dict(version=n), "VersionId": str(n + 1)}
|
||||
for n in range(len(table.versions))
|
||||
{"Table": data, "VersionId": version}
|
||||
for version, data in versions.items()
|
||||
]
|
||||
}
|
||||
)
|
||||
@ -86,17 +85,15 @@ class GlueResponse(BaseResponse):
|
||||
def get_table_version(self):
|
||||
database_name = self.parameters.get("DatabaseName")
|
||||
table_name = self.parameters.get("TableName")
|
||||
table = self.glue_backend.get_table(database_name, table_name)
|
||||
ver_id = self.parameters.get("VersionId")
|
||||
return self.glue_backend.get_table_version(database_name, table_name, ver_id)
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"TableVersion": {
|
||||
"Table": table.as_dict(version=ver_id),
|
||||
"VersionId": ver_id,
|
||||
}
|
||||
}
|
||||
)
|
||||
def delete_table_version(self) -> str:
|
||||
database_name = self.parameters.get("DatabaseName")
|
||||
table_name = self.parameters.get("TableName")
|
||||
version_id = self.parameters.get("VersionId")
|
||||
self.glue_backend.delete_table_version(database_name, table_name, version_id)
|
||||
return "{}"
|
||||
|
||||
def get_tables(self):
|
||||
database_name = self.parameters.get("DatabaseName")
|
||||
|
@ -353,7 +353,8 @@ class PartitionFilter:
|
||||
return True
|
||||
|
||||
warnings.warn("Expression filtering is experimental")
|
||||
versions = list(self.fake_table.versions.values())
|
||||
return expression.eval(
|
||||
part_keys=self.fake_table.versions[-1].get("PartitionKeys", []),
|
||||
part_keys=versions[-1].get("PartitionKeys", []),
|
||||
part_input=fake_partition.partition_input,
|
||||
)
|
||||
|
@ -384,6 +384,42 @@ def test_get_table_version_invalid_input():
|
||||
exc.value.response["Error"]["Code"].should.equal("InvalidInputException")
|
||||
|
||||
|
||||
@mock_glue
|
||||
def test_delete_table_version():
|
||||
client = boto3.client("glue", region_name="us-east-1")
|
||||
database_name = "myspecialdatabase"
|
||||
helpers.create_database(client, database_name)
|
||||
|
||||
table_name = "myfirsttable"
|
||||
version_inputs = {}
|
||||
|
||||
table_input = helpers.create_table_input(database_name, table_name)
|
||||
helpers.create_table(client, database_name, table_name, table_input)
|
||||
version_inputs["1"] = table_input
|
||||
|
||||
columns = [{"Name": "country", "Type": "string"}]
|
||||
table_input = helpers.create_table_input(database_name, table_name, columns=columns)
|
||||
helpers.update_table(client, database_name, table_name, table_input)
|
||||
version_inputs["2"] = table_input
|
||||
|
||||
# Updateing with an identical input should still create a new version
|
||||
helpers.update_table(client, database_name, table_name, table_input)
|
||||
version_inputs["3"] = table_input
|
||||
|
||||
response = helpers.get_table_versions(client, database_name, table_name)
|
||||
vers = response["TableVersions"]
|
||||
vers.should.have.length_of(3)
|
||||
|
||||
client.delete_table_version(
|
||||
DatabaseName=database_name, TableName=table_name, VersionId="2"
|
||||
)
|
||||
|
||||
response = helpers.get_table_versions(client, database_name, table_name)
|
||||
vers = response["TableVersions"]
|
||||
vers.should.have.length_of(2)
|
||||
[v["VersionId"] for v in vers].should.equal(["1", "3"])
|
||||
|
||||
|
||||
@mock_glue
|
||||
def test_get_table_not_exits():
|
||||
client = boto3.client("glue", region_name="us-east-1")
|
||||
|
Loading…
Reference in New Issue
Block a user