TimestreamWrite - support MagneticStoreWriteProps-argument (#5088)

This commit is contained in:
Bert Blommers 2022-05-02 20:15:19 +00:00 committed by GitHub
parent cb4cbd1f5b
commit 578de3d47f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 128 additions and 13 deletions

View File

@ -5,7 +5,14 @@ from .exceptions import ResourceNotFound
class TimestreamTable(BaseModel): class TimestreamTable(BaseModel):
def __init__(self, region_name, table_name, db_name, retention_properties): def __init__(
self,
region_name,
table_name,
db_name,
retention_properties,
magnetic_store_write_properties,
):
self.region_name = region_name self.region_name = region_name
self.name = table_name self.name = table_name
self.db_name = db_name self.db_name = db_name
@ -13,10 +20,13 @@ class TimestreamTable(BaseModel):
"MemoryStoreRetentionPeriodInHours": 123, "MemoryStoreRetentionPeriodInHours": 123,
"MagneticStoreRetentionPeriodInDays": 123, "MagneticStoreRetentionPeriodInDays": 123,
} }
self.magnetic_store_write_properties = magnetic_store_write_properties or {}
self.records = [] self.records = []
def update(self, retention_properties): def update(self, retention_properties, magnetic_store_write_properties):
self.retention_properties = retention_properties self.retention_properties = retention_properties
if magnetic_store_write_properties is not None:
self.magnetic_store_write_properties = magnetic_store_write_properties
def write_records(self, records): def write_records(self, records):
self.records.extend(records) self.records.extend(records)
@ -32,6 +42,7 @@ class TimestreamTable(BaseModel):
"DatabaseName": self.db_name, "DatabaseName": self.db_name,
"TableStatus": "ACTIVE", "TableStatus": "ACTIVE",
"RetentionProperties": self.retention_properties, "RetentionProperties": self.retention_properties,
"MagneticStoreWriteProperties": self.magnetic_store_write_properties,
} }
@ -47,23 +58,31 @@ class TimestreamDatabase(BaseModel):
def update(self, kms_key_id): def update(self, kms_key_id):
self.kms_key_id = kms_key_id self.kms_key_id = kms_key_id
def create_table(self, table_name, retention_properties): def create_table(
self, table_name, retention_properties, magnetic_store_write_properties
):
table = TimestreamTable( table = TimestreamTable(
region_name=self.region_name, region_name=self.region_name,
table_name=table_name, table_name=table_name,
db_name=self.name, db_name=self.name,
retention_properties=retention_properties, retention_properties=retention_properties,
magnetic_store_write_properties=magnetic_store_write_properties,
) )
self.tables[table_name] = table self.tables[table_name] = table
return table return table
def update_table(self, table_name, retention_properties): def update_table(
self, table_name, retention_properties, magnetic_store_write_properties
):
table = self.tables[table_name] table = self.tables[table_name]
table.update(retention_properties=retention_properties) table.update(
retention_properties=retention_properties,
magnetic_store_write_properties=magnetic_store_write_properties,
)
return table return table
def delete_table(self, table_name): def delete_table(self, table_name):
del self.tables[table_name] self.tables.pop(table_name, None)
def describe_table(self, table_name): def describe_table(self, table_name):
if table_name not in self.tables: if table_name not in self.tables:
@ -116,9 +135,18 @@ class TimestreamWriteBackend(BaseBackend):
database.update(kms_key_id=kms_key_id) database.update(kms_key_id=kms_key_id)
return database return database
def create_table(self, database_name, table_name, retention_properties, tags): def create_table(
self,
database_name,
table_name,
retention_properties,
tags,
magnetic_store_write_properties,
):
database = self.describe_database(database_name) database = self.describe_database(database_name)
table = database.create_table(table_name, retention_properties) table = database.create_table(
table_name, retention_properties, magnetic_store_write_properties
)
self.tagging_service.tag_resource(table.arn, tags) self.tagging_service.tag_resource(table.arn, tags)
return table return table
@ -136,9 +164,17 @@ class TimestreamWriteBackend(BaseBackend):
tables = database.list_tables() tables = database.list_tables()
return tables return tables
def update_table(self, database_name, table_name, retention_properties): def update_table(
self,
database_name,
table_name,
retention_properties,
magnetic_store_write_properties,
):
database = self.describe_database(database_name) database = self.describe_database(database_name)
table = database.update_table(table_name, retention_properties) table = database.update_table(
table_name, retention_properties, magnetic_store_write_properties
)
return table return table
def write_records(self, database_name, table_name, records): def write_records(self, database_name, table_name, records):

View File

@ -51,8 +51,15 @@ class TimestreamWriteResponse(BaseResponse):
table_name = self._get_param("TableName") table_name = self._get_param("TableName")
retention_properties = self._get_param("RetentionProperties") retention_properties = self._get_param("RetentionProperties")
tags = self._get_param("Tags") tags = self._get_param("Tags")
magnetic_store_write_properties = self._get_param(
"MagneticStoreWriteProperties"
)
table = self.timestreamwrite_backend.create_table( table = self.timestreamwrite_backend.create_table(
database_name, table_name, retention_properties, tags database_name,
table_name,
retention_properties,
tags,
magnetic_store_write_properties,
) )
return json.dumps(dict(Table=table.description())) return json.dumps(dict(Table=table.description()))
@ -77,8 +84,14 @@ class TimestreamWriteResponse(BaseResponse):
database_name = self._get_param("DatabaseName") database_name = self._get_param("DatabaseName")
table_name = self._get_param("TableName") table_name = self._get_param("TableName")
retention_properties = self._get_param("RetentionProperties") retention_properties = self._get_param("RetentionProperties")
magnetic_store_write_properties = self._get_param(
"MagneticStoreWriteProperties"
)
table = self.timestreamwrite_backend.update_table( table = self.timestreamwrite_backend.update_table(
database_name, table_name, retention_properties database_name,
table_name,
retention_properties,
magnetic_store_write_properties,
) )
return json.dumps(dict(Table=table.description())) return json.dumps(dict(Table=table.description()))

View File

@ -59,7 +59,6 @@ TestAccSQSQueuePolicy
TestAccSSMDocument TestAccSSMDocument
TestAccSsmDocumentDataSource TestAccSsmDocumentDataSource
TestAccSsmParameterDataSource TestAccSsmParameterDataSource
TestAccTimestreamWriteTable
TestAccDataSourceLambdaLayerVersion TestAccDataSourceLambdaLayerVersion
TestAccDataSourceLambdaInvocation TestAccDataSourceLambdaInvocation
TestAccDataSourceNetworkInterface_ TestAccDataSourceNetworkInterface_

View File

@ -128,3 +128,4 @@ sqs:
- TestAccSQSQueue_FIFOQueue_ - TestAccSQSQueue_FIFOQueue_
timestreamwrite: timestreamwrite:
- TestAccTimestreamWriteDatabase - TestAccTimestreamWriteDatabase
- TestAccTimestreamWriteTable

View File

@ -36,6 +36,38 @@ def test_create_table():
) )
@mock_timestreamwrite
def test_create_table__with_magnetic_store_write_properties():
ts = boto3.client("timestream-write", region_name="us-east-1")
ts.create_database(DatabaseName="mydatabase")
resp = ts.create_table(
DatabaseName="mydatabase",
TableName="mytable",
MagneticStoreWriteProperties={
"EnableMagneticStoreWrites": True,
"MagneticStoreRejectedDataLocation": {
"S3Configuration": {"BucketName": "hithere"}
},
},
)
table = resp["Table"]
table.should.have.key("Arn").equal(
f"arn:aws:timestream:us-east-1:{ACCOUNT_ID}:database/mydatabase/table/mytable"
)
table.should.have.key("TableName").equal("mytable")
table.should.have.key("DatabaseName").equal("mydatabase")
table.should.have.key("TableStatus").equal("ACTIVE")
table.should.have.key("MagneticStoreWriteProperties").should.equal(
{
"EnableMagneticStoreWrites": True,
"MagneticStoreRejectedDataLocation": {
"S3Configuration": {"BucketName": "hithere"}
},
}
)
@mock_timestreamwrite @mock_timestreamwrite
def test_create_table_without_retention_properties(): def test_create_table_without_retention_properties():
ts = boto3.client("timestream-write", region_name="us-east-1") ts = boto3.client("timestream-write", region_name="us-east-1")
@ -188,6 +220,40 @@ def test_update_table():
) )
@mock_timestreamwrite
def test_update_table__with_magnetic_store_write_properties():
ts = boto3.client("timestream-write", region_name="us-east-1")
ts.create_database(DatabaseName="mydatabase")
ts.create_table(DatabaseName="mydatabase", TableName="mytable")
resp = ts.update_table(
DatabaseName="mydatabase",
TableName="mytable",
MagneticStoreWriteProperties={
"EnableMagneticStoreWrites": True,
"MagneticStoreRejectedDataLocation": {
"S3Configuration": {"BucketName": "hithere"}
},
},
)
table = resp["Table"]
table.should.have.key("Arn").equal(
f"arn:aws:timestream:us-east-1:{ACCOUNT_ID}:database/mydatabase/table/mytable"
)
table.should.have.key("TableName").equal("mytable")
table.should.have.key("DatabaseName").equal("mydatabase")
table.should.have.key("TableStatus").equal("ACTIVE")
table.should.have.key("MagneticStoreWriteProperties").should.equal(
{
"EnableMagneticStoreWrites": True,
"MagneticStoreRejectedDataLocation": {
"S3Configuration": {"BucketName": "hithere"}
},
}
)
@mock_timestreamwrite @mock_timestreamwrite
def test_write_records(): def test_write_records():
# The query-feature is not available at the moment, # The query-feature is not available at the moment,