from __future__ import unicode_literals

import copy

from .fixtures.datacatalog import TABLE_INPUT, PARTITION_INPUT


def _s3_location(database_name, table_name):
    """Return ``s3://my-bucket/<database_name>/<table_name>``.

    Shared by table and partition inputs so both always point at the same
    root path.
    """
    return "s3://my-bucket/{database_name}/{table_name}".format(
        database_name=database_name, table_name=table_name
    )


def create_database(client, database_name):
    """Create a database named *database_name* and return the client response.

    *client* must provide ``create_database`` (presumably a boto3 Glue
    client -- confirm against callers).
    """
    return client.create_database(DatabaseInput={"Name": database_name})


def get_database(client, database_name):
    """Return the response of ``client.get_database`` for *database_name*."""
    return client.get_database(Name=database_name)


def create_table_input(database_name, table_name, columns=None, partition_keys=None):
    """Build a table-input dict from a deep copy of ``TABLE_INPUT``.

    :param database_name: database name, used only to build the location.
    :param table_name: value stored under ``Name``.
    :param columns: list stored as ``StorageDescriptor.Columns``; defaults to
        a new empty list.
    :param partition_keys: list stored as ``PartitionKeys``; defaults to a
        new empty list.
    :returns: the populated dict, whose ``StorageDescriptor.Location`` is
        ``s3://my-bucket/<database_name>/<table_name>``.
    """
    # None sentinels instead of ``[]`` defaults: the list is stored by
    # reference in the returned dict, so a shared default list would leak
    # mutations from one table input into every later one.
    if columns is None:
        columns = []
    if partition_keys is None:
        partition_keys = []

    # Deep copy so the module-level template is never modified.
    table_input = copy.deepcopy(TABLE_INPUT)
    table_input["Name"] = table_name
    table_input["PartitionKeys"] = partition_keys
    table_input["StorageDescriptor"]["Columns"] = columns
    table_input["StorageDescriptor"]["Location"] = _s3_location(
        database_name, table_name
    )
    return table_input


def create_table(client, database_name, table_name, table_input=None, **kwargs):
    """Create a table in *database_name* and return the client response.

    When *table_input* is omitted, one is built by ``create_table_input``
    from *database_name*, *table_name* and **kwargs (``columns``,
    ``partition_keys``).
    """
    if table_input is None:
        table_input = create_table_input(database_name, table_name, **kwargs)
    return client.create_table(DatabaseName=database_name, TableInput=table_input)


def update_table(client, database_name, table_name, table_input=None, **kwargs):
    """Update a table in *database_name* and return the client response.

    When *table_input* is omitted, one is built by ``create_table_input``
    from *database_name*, *table_name* and **kwargs (``columns``,
    ``partition_keys``).
    """
    if table_input is None:
        table_input = create_table_input(database_name, table_name, **kwargs)
    return client.update_table(DatabaseName=database_name, TableInput=table_input)


def get_table(client, database_name, table_name):
    """Return the response of ``client.get_table`` for one table."""
    return client.get_table(DatabaseName=database_name, Name=table_name)


def get_tables(client, database_name):
    """Return the response of ``client.get_tables`` for *database_name*."""
    return client.get_tables(DatabaseName=database_name)


def get_table_versions(client, database_name, table_name):
    """Return the response of ``client.get_table_versions`` for one table."""
    return client.get_table_versions(DatabaseName=database_name, TableName=table_name)


def get_table_version(client, database_name, table_name, version_id):
    """Return the response of ``client.get_table_version`` for *version_id*."""
    return client.get_table_version(
        DatabaseName=database_name, TableName=table_name, VersionId=version_id
    )


def create_partition_input(database_name, table_name, values=None, columns=None):
    """Build a partition-input dict from a deep copy of ``PARTITION_INPUT``.

    :param database_name: database name, used only to build the path.
    :param table_name: table name, used only to build the path.
    :param values: list stored as ``Values``; defaults to a new empty list.
    :param columns: list stored as ``StorageDescriptor.Columns``; defaults to
        a new empty list.
    :returns: the populated dict, whose
        ``StorageDescriptor.SerdeInfo.Parameters.path`` is
        ``s3://my-bucket/<database_name>/<table_name>``.
    """
    # None sentinels: see create_table_input -- a shared default list would
    # be aliased into every partition input built without arguments.
    if values is None:
        values = []
    if columns is None:
        columns = []

    root_path = _s3_location(database_name, table_name)
    part_input = copy.deepcopy(PARTITION_INPUT)
    part_input["Values"] = values
    part_input["StorageDescriptor"]["Columns"] = columns
    part_input["StorageDescriptor"]["SerdeInfo"]["Parameters"]["path"] = root_path
    return part_input


def create_partition(client, database_name, table_name, partiton_input=None, **kwargs):
    """Create a partition of *table_name* and return the client response.

    When *partiton_input* is omitted, one is built by
    ``create_partition_input`` from **kwargs (``values``, ``columns``).
    The keyword keeps its original misspelling (``partiton_input``) so that
    existing callers passing it by name keep working.
    """
    if partiton_input is None:
        partiton_input = create_partition_input(database_name, table_name, **kwargs)
    return client.create_partition(
        DatabaseName=database_name, TableName=table_name, PartitionInput=partiton_input
    )


def update_partition(
    client, database_name, table_name, old_values=None, partiton_input=None, **kwargs
):
    """Replace the partition identified by *old_values* and return the response.

    :param old_values: values of the existing partition, sent as
        ``PartitionValueList``; defaults to an empty list.
    :param partiton_input: replacement partition input (misspelled keyword
        kept for compatibility); when omitted it is built by
        ``create_partition_input`` from **kwargs (``values``, ``columns``).
    """
    # None sentinel instead of a shared ``[]`` default.
    if old_values is None:
        old_values = []
    if partiton_input is None:
        partiton_input = create_partition_input(database_name, table_name, **kwargs)
    return client.update_partition(
        DatabaseName=database_name,
        TableName=table_name,
        PartitionInput=partiton_input,
        PartitionValueList=old_values,
    )


def get_partition(client, database_name, table_name, values):
    """Return the response of ``client.get_partition`` for partition *values*."""
    return client.get_partition(
        DatabaseName=database_name, TableName=table_name, PartitionValues=values
    )