"""Handles incoming lakeformation requests, invokes methods, returns responses.""" import json from typing import Any, Dict from moto.core.responses import BaseResponse from .exceptions import InvalidInput from .models import ( LakeFormationBackend, ListPermissionsResource, ListPermissionsResourceDatabase, ListPermissionsResourceDataLocation, ListPermissionsResourceTable, RessourceType, lakeformation_backends, ) class LakeFormationResponse(BaseResponse): """Handler for LakeFormation requests and responses.""" def __init__(self) -> None: super().__init__(service_name="lakeformation") @property def lakeformation_backend(self) -> LakeFormationBackend: """Return backend instance specific for this region.""" return lakeformation_backends[self.current_account][self.region] def describe_resource(self) -> str: resource_arn = self._get_param("ResourceArn") resource = self.lakeformation_backend.describe_resource( resource_arn=resource_arn ) return json.dumps({"ResourceInfo": resource.to_dict()}) def deregister_resource(self) -> str: resource_arn = self._get_param("ResourceArn") self.lakeformation_backend.deregister_resource(resource_arn=resource_arn) return "{}" def register_resource(self) -> str: resource_arn = self._get_param("ResourceArn") role_arn = self._get_param("RoleArn") self.lakeformation_backend.register_resource( resource_arn=resource_arn, role_arn=role_arn, ) return "{}" def list_resources(self) -> str: resources = self.lakeformation_backend.list_resources() return json.dumps({"ResourceInfoList": [res.to_dict() for res in resources]}) def get_data_lake_settings(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account settings = self.lakeformation_backend.get_data_lake_settings(catalog_id) return json.dumps({"DataLakeSettings": settings}) def put_data_lake_settings(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account settings = self._get_param("DataLakeSettings") self.lakeformation_backend.put_data_lake_settings(catalog_id, settings) return "{}" def grant_permissions(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account principal = self._get_param("Principal") resource = self._get_param("Resource") permissions = self._get_param("Permissions") permissions_with_grant_options = self._get_param("PermissionsWithGrantOption") self.lakeformation_backend.grant_permissions( catalog_id=catalog_id, principal=principal, resource=resource, permissions=permissions, permissions_with_grant_options=permissions_with_grant_options, ) return "{}" def revoke_permissions(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account principal = self._get_param("Principal") resource = self._get_param("Resource") permissions = self._get_param("Permissions") permissions_with_grant_options = ( self._get_param("PermissionsWithGrantOption") or [] ) self.lakeformation_backend.revoke_permissions( catalog_id=catalog_id, principal=principal, resource=resource, permissions_to_revoke=permissions, permissions_with_grant_options_to_revoke=permissions_with_grant_options, ) return "{}" def list_permissions(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account principal = self._get_param("Principal") resource = self._get_param("Resource") resource_type_param = self._get_param("ResourceType") if principal is not None and resource is None: # Error message is the exact string returned by the AWS-CLI raise InvalidInput( "An error occurred (InvalidInputException) when calling the ListPermissions operation: Resource is mandatory if Principal is set in the input." ) if resource_type_param is None: resource_type = None else: resource_type = RessourceType(resource_type_param) if resource is None: list_permission_resource = None else: database_sub_dictionary = resource.get("Database") table_sub_dictionary = resource.get("Table") catalog_sub_dictionary = resource.get("Catalog") data_location_sub_dictionary = resource.get("DataLocation") if database_sub_dictionary is None: database = None else: database = ListPermissionsResourceDatabase( name=database_sub_dictionary.get("Name"), catalog_id=database_sub_dictionary.get("CatalogId"), ) if table_sub_dictionary is None: table = None else: table = ListPermissionsResourceTable( database_name=table_sub_dictionary.get("DatabaseName"), name=table_sub_dictionary.get("Name"), catalog_id=table_sub_dictionary.get("CatalogId"), table_wildcard=table_sub_dictionary.get("TableWildcard"), ) if data_location_sub_dictionary is None: data_location = None else: data_location = ListPermissionsResourceDataLocation( resource_arn=data_location_sub_dictionary.get("ResourceArn"), catalog_id=data_location_sub_dictionary.get("CatalogId"), ) list_permission_resource = ListPermissionsResource( catalog=catalog_sub_dictionary, database=database, table=table, table_with_columns=None, data_location=data_location, data_cells_filter=None, lf_tag=None, lf_tag_policy=None, ) permissions = self.lakeformation_backend.list_permissions( catalog_id=catalog_id, principal=principal, resource=list_permission_resource, resource_type=resource_type, ) return json.dumps({"PrincipalResourcePermissions": permissions}) def create_lf_tag(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account key = self._get_param("TagKey") values = self._get_param("TagValues") self.lakeformation_backend.create_lf_tag(catalog_id, key, values) return "{}" def get_lf_tag(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account key = self._get_param("TagKey") tag_values = self.lakeformation_backend.get_lf_tag(catalog_id, key) return json.dumps( {"CatalogId": catalog_id, "TagKey": key, "TagValues": tag_values} ) def delete_lf_tag(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account key = self._get_param("TagKey") self.lakeformation_backend.delete_lf_tag(catalog_id, key) return "{}" def list_lf_tags(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account tags = self.lakeformation_backend.list_lf_tags(catalog_id) return json.dumps( { "LFTags": [ {"CatalogId": catalog_id, "TagKey": tag, "TagValues": value} for tag, value in tags.items() ] } ) def update_lf_tag(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account tag_key = self._get_param("TagKey") to_delete = self._get_param("TagValuesToDelete") to_add = self._get_param("TagValuesToAdd") self.lakeformation_backend.update_lf_tag(catalog_id, tag_key, to_delete, to_add) return "{}" def list_data_cells_filter(self) -> str: data_cells = self.lakeformation_backend.list_data_cells_filter() return json.dumps({"DataCellsFilters": data_cells}) def batch_grant_permissions(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account entries = self._get_param("Entries") self.lakeformation_backend.batch_grant_permissions(catalog_id, entries) return json.dumps({"Failures": []}) def batch_revoke_permissions(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account entries = self._get_param("Entries") self.lakeformation_backend.batch_revoke_permissions(catalog_id, entries) return json.dumps({"Failures": []}) def add_lf_tags_to_resource(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account resource = self._get_param("Resource") tags = self._get_param("LFTags") failures = self.lakeformation_backend.add_lf_tags_to_resource( catalog_id, resource, tags ) return json.dumps({"Failures": failures}) def get_resource_lf_tags(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account resource = self._get_param("Resource") db, table, columns = self.lakeformation_backend.get_resource_lf_tags( catalog_id, resource ) resp: Dict[str, Any] = {} if db: resp["LFTagOnDatabase"] = db if table: resp["LFTagsOnTable"] = table if columns: resp["LFTagsOnColumns"] = columns return json.dumps(resp) def remove_lf_tags_from_resource(self) -> str: catalog_id = self._get_param("CatalogId") or self.current_account resource = self._get_param("Resource") tags = self._get_param("LFTags") self.lakeformation_backend.remove_lf_tags_from_resource( catalog_id, resource, tags ) return "{}"