LakeFormation: Add Parameters for list_permissions() (#7026)

This commit is contained in:
JohannesKoenigTMH 2023-11-16 11:13:30 +01:00 committed by GitHub
parent 1f841254b8
commit 63e869d717
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 493 additions and 7 deletions

View File

@ -5,3 +5,8 @@ from moto.core.exceptions import JsonRESTError
class EntityNotFound(JsonRESTError):
def __init__(self) -> None:
super().__init__("EntityNotFoundException", "Entity not found")
class InvalidInput(JsonRESTError):
def __init__(self, message: str) -> None:
super().__init__("InvalidInputException", message)

View File

@ -1,9 +1,18 @@
from __future__ import annotations
from enum import Enum
from collections import defaultdict
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Tuple, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.utilities.tagging_service import TaggingService
from .exceptions import EntityNotFound
from .exceptions import EntityNotFound, InvalidInput
class RessourceType(Enum):
catalog = "CATALOG"
database = "DATABASE"
table = "TABLE"
data_location = "DATA_LOCATION"
class Resource(BaseModel):
@ -18,6 +27,117 @@ class Resource(BaseModel):
}
class ListPermissionsResourceDatabase:
def __init__(self, catalog_id: Optional[str], name: str):
self.name = name
self.catalog_id = catalog_id
class ListPermissionsResourceTable:
def __init__(
self,
catalog_id: Optional[str],
database_name: str,
name: Optional[str],
table_wildcard: Optional[
Dict[str, str]
], # Placeholder type, table_wildcard is an empty dict in docs
):
if name is None and table_wildcard is None:
raise InvalidInput("Table name and table wildcard cannot both be empty.")
if name is not None and table_wildcard is not None:
raise InvalidInput("Table name and table wildcard cannot both be present.")
self.database_name = database_name
self.name = name
self.catalog_id = catalog_id
self.table_wildcard = table_wildcard
class ExcludedColumnNames:
def __init__(self, excluded_column_names: List[str]):
self.excluded_column_names = excluded_column_names
class ListPermissionsResourceTableWithColumns:
def __init__(
self,
catalog_id: Optional[str],
database_name: str,
name: str,
column_names: list[str],
column_wildcard: ExcludedColumnNames,
):
self.database_name = database_name
self.name = name
self.catalog_id = catalog_id
self.column_names = column_names
self.column_wildcard = column_wildcard
class ListPermissionsResourceDataLocation:
def __init__(self, catalog_id: Optional[str], resource_arn: str):
self.catalog_id = catalog_id
self.resource_arn = resource_arn
class ListPermissionsResourceDataCellsFilter:
def __init__(
self, table_catalog_id: str, database_name: str, table_name: str, name: str
):
self.table_catalog_id = table_catalog_id
self.database_name = database_name
self.table_name = table_name
self.name = name
class ListPermissionsResourceLFTag:
def __init__(self, catalog_id: str, tag_key: str, tag_values: List[str]):
self.catalog_id = catalog_id
self.tag_key = tag_key
self.tag_values = tag_values
class LFTag:
def __init__(self, tag_key: str, tag_values: List[str]):
self.tag_key = tag_key
self.tag_values = tag_values
class ListPermissionsResourceLFTagPolicy:
def __init__(self, catalog_id: str, resource_type: str, expression: List[LFTag]):
self.catalog_id = catalog_id
self.resource_type = resource_type
self.expression = expression
class ListPermissionsResource:
def __init__(
self,
catalog: Optional[
Dict[str, str]
], # Placeholder type, catalog is an empty dict in docs
database: Optional[ListPermissionsResourceDatabase],
table: Optional[ListPermissionsResourceTable],
table_with_columns: Optional[ListPermissionsResourceTableWithColumns],
data_location: Optional[ListPermissionsResourceDataLocation],
data_cells_filter: Optional[ListPermissionsResourceDataCellsFilter],
lf_tag: Optional[ListPermissionsResourceLFTag],
lf_tag_policy: Optional[ListPermissionsResourceLFTagPolicy],
):
if catalog is None and database is None and table is None:
raise InvalidInput(
"Resource must have either the catalog, table or database field populated."
)
self.catalog = catalog
self.database = database
self.table = table
self.table_with_columns = table_with_columns
self.data_location = data_location
self.data_cells_filter = data_cells_filter
self.lf_tag = lf_tag
self.lf_tag_policy = lf_tag_policy
def default_settings() -> Dict[str, Any]:
return {
"DataLakeAdmins": [],
@ -114,11 +234,92 @@ class LakeFormationBackend(BaseBackend):
grant for grant in self.grants[catalog_id] if grant["Permissions"] != []
]
def list_permissions(self, catalog_id: str) -> List[Dict[str, Any]]:
def list_permissions(
self,
catalog_id: str,
principal: Optional[Dict[str, str]] = None,
resource: Optional[ListPermissionsResource] = None,
resource_type: Optional[RessourceType] = None,
) -> List[Dict[str, Any]]:
"""
No parameters have been implemented yet
No pagination has been implemented yet.
"""
return self.grants[catalog_id]
permissions = self.grants[catalog_id]
def filter_for_principal(permission: Dict[str, Any]) -> bool:
return permission["Principal"] == principal
if principal is not None:
permissions = list(filter(filter_for_principal, permissions))
def filter_for_resource_type(permission: Dict[str, Any]) -> bool:
if resource_type is None: # Check for mypy
return False
resource = permission["Resource"]
if resource_type == RessourceType.catalog:
return "Catalog" in resource
elif resource_type == RessourceType.database:
return "Database" in resource
elif resource_type == RessourceType.data_location:
return "DataLocation" in resource
elif resource_type == RessourceType.table:
return "Table" in resource or "TableWithColumns" in resource
return False
if resource_type is not None:
permissions = list(filter(filter_for_resource_type, permissions))
def filter_for_resource(permission: Dict[str, Any]) -> bool:
"""
If catalog is provided:
only matching permissions with resource-type "Catalog" are returned;
if catalog is not provided and database is provided:
only matching permissions with resource-type "Database" are returned;
if catalog and database are not provided and table is provided:
only matching permissions with resource-type "Table" are returned;
"""
if resource is None: # Check for linter
return False
permission_resource = permission["Resource"]
catalog = resource.catalog
if catalog is not None and "Catalog" in permission_resource:
return catalog == permission_resource["Catalog"]
database = resource.database
if database is not None and "Database" in permission_resource:
equals = database.name == permission_resource["Database"]["Name"]
if database.catalog_id is not None:
equals = equals and (
database.catalog_id
== permission_resource["Database"]["CatalogId"]
)
return equals
table = resource.table
if table is not None and "Table" in permission_resource:
equals = (
table.database_name == permission_resource["Table"]["DatabaseName"]
)
if table.catalog_id is not None:
equals = equals and (
table.catalog_id == permission_resource["Table"]["CatalogId"]
)
if table.name is not None and table.table_wildcard is None:
equals = equals and (
table.name == permission_resource["Table"]["Name"]
)
if table.name is None and table.table_wildcard is not None:
equals = equals and (
table.table_wildcard
== permission_resource["Table"]["TableWildcard"]
)
return equals
return False
if resource is not None:
permissions = list(filter(filter_for_resource, permissions))
return permissions
def create_lf_tag(self, catalog_id: str, key: str, values: List[str]) -> None:
# There is no ARN that we can use, so just create another unique identifier that's easy to recognize and reproduce

View File

@ -3,7 +3,14 @@ import json
from typing import Any, Dict
from moto.core.responses import BaseResponse
from .models import lakeformation_backends, LakeFormationBackend
from .models import (
lakeformation_backends,
LakeFormationBackend,
ListPermissionsResource,
ListPermissionsResourceDatabase,
ListPermissionsResourceTable,
RessourceType,
)
class LakeFormationResponse(BaseResponse):
@ -87,7 +94,55 @@ class LakeFormationResponse(BaseResponse):
def list_permissions(self) -> str:
catalog_id = self._get_param("CatalogId") or self.current_account
permissions = self.lakeformation_backend.list_permissions(catalog_id)
principal = self._get_param("Principal")
resource = self._get_param("Resource")
resource_type_param = self._get_param("ResourceType")
if resource_type_param is None:
resource_type = None
else:
resource_type = RessourceType(resource_type_param)
if resource is None:
list_permission_resource = None
else:
database_sub_dictionary = resource.get("Database")
table_sub_dictionary = resource.get("Table")
catalog_sub_dictionary = resource.get("Catalog")
if database_sub_dictionary is None:
database = None
else:
database = ListPermissionsResourceDatabase(
name=database_sub_dictionary.get("Name"),
catalog_id=database_sub_dictionary.get("CatalogId"),
)
if table_sub_dictionary is None:
table = None
else:
table = ListPermissionsResourceTable(
database_name=table_sub_dictionary.get("DatabaseName"),
name=table_sub_dictionary.get("Name"),
catalog_id=table_sub_dictionary.get("CatalogId"),
table_wildcard=table_sub_dictionary.get("TableWildcard"),
)
list_permission_resource = ListPermissionsResource(
catalog=catalog_sub_dictionary,
database=database,
table=table,
table_with_columns=None,
data_location=None,
data_cells_filter=None,
lf_tag=None,
lf_tag_policy=None,
)
permissions = self.lakeformation_backend.list_permissions(
catalog_id=catalog_id,
principal=principal,
resource=list_permission_resource,
resource_type=resource_type,
)
return json.dumps({"PrincipalResourcePermissions": permissions})
def create_lf_tag(self) -> str:

View File

@ -1,4 +1,5 @@
"""Unit tests for lakeformation-supported APIs."""
from typing import Dict
import boto3
import pytest
@ -117,6 +118,230 @@ def test_list_permissions():
]
def grant_table_permissions(
client: boto3.client, catalog_id: str, principal: str, db: str, table: str
):
client.grant_permissions(
CatalogId=catalog_id,
Principal={"DataLakePrincipalIdentifier": principal},
Resource={
"Table": {
"DatabaseName": db,
"Name": table,
"CatalogId": catalog_id,
"TableWildcard": {},
}
},
Permissions=["ALL"],
PermissionsWithGrantOption=["SELECT"],
)
def grant_db_permissions(
client: boto3.client, catalog_id: str, principal: str, db: str
):
client.grant_permissions(
CatalogId=catalog_id,
Principal={"DataLakePrincipalIdentifier": principal},
Resource={"Database": {"Name": db, "CatalogId": catalog_id}},
Permissions=["ALL"],
PermissionsWithGrantOption=["SELECT"],
)
def grant_catalog_permissions(client: boto3.client, catalog_id: str, principal: str):
client.grant_permissions(
CatalogId=catalog_id,
Principal={"DataLakePrincipalIdentifier": principal},
Resource={"Catalog": {}},
Permissions=["CREATE_DATABASE"],
PermissionsWithGrantOption=[],
)
def db_response(principal: str, catalog_id: str, db: str) -> Dict:
return {
"Principal": {"DataLakePrincipalIdentifier": principal},
"Resource": {
"Database": {
"CatalogId": catalog_id,
"Name": db,
}
},
"Permissions": ["ALL"],
"PermissionsWithGrantOption": ["SELECT"],
}
def table_response(principal: str, catalog_id: str, db: str, table: str) -> Dict:
return {
"Principal": {"DataLakePrincipalIdentifier": principal},
"Resource": {
"Table": {
"CatalogId": catalog_id,
"DatabaseName": db,
"Name": table,
"TableWildcard": {},
}
},
"Permissions": ["ALL"],
"PermissionsWithGrantOption": ["SELECT"],
}
def catalog_response(principal: str) -> Dict:
return {
"Principal": {"DataLakePrincipalIdentifier": principal},
"Resource": {"Catalog": {}},
"Permissions": ["CREATE_DATABASE"],
"PermissionsWithGrantOption": [],
}
@mock_lakeformation
def test_list_permissions_filtered_for_catalog_id():
client = boto3.client("lakeformation", region_name="eu-west-2")
catalog_id_1 = "000000000000"
catalog_id_2 = "000000000001"
principal_1 = "principal_1"
principal_2 = "principal_2"
grant_catalog_permissions(
client=client, catalog_id=catalog_id_1, principal=principal_1
)
grant_catalog_permissions(
client=client, catalog_id=catalog_id_2, principal=principal_2
)
permissions = client.list_permissions(CatalogId=catalog_id_1)[
"PrincipalResourcePermissions"
]
assert permissions == [catalog_response(principal=principal_1)]
permissions = client.list_permissions(CatalogId=catalog_id_2)[
"PrincipalResourcePermissions"
]
assert permissions == [catalog_response(principal=principal_2)]
@mock_lakeformation
def test_list_permissions_filtered_for_resource_type():
client = boto3.client("lakeformation", region_name="eu-west-2")
catalog_id = "000000000000"
principal_1 = "principal_1"
db_1 = "db_1"
table_1 = "table_1"
grant_db_permissions(
client=client, catalog_id=catalog_id, principal=principal_1, db=db_1
)
grant_table_permissions(
client=client,
catalog_id=catalog_id,
principal=principal_1,
db=db_1,
table=table_1,
)
grant_catalog_permissions(
client=client, catalog_id=catalog_id, principal=principal_1
)
res = client.list_permissions(CatalogId=catalog_id, ResourceType="DATABASE")
assert res["PrincipalResourcePermissions"] == [
db_response(principal=principal_1, catalog_id=catalog_id, db=db_1),
]
res = client.list_permissions(CatalogId=catalog_id, ResourceType="TABLE")
assert res["PrincipalResourcePermissions"] == [
table_response(
principal=principal_1, catalog_id=catalog_id, db=db_1, table=table_1
),
]
res = client.list_permissions(CatalogId=catalog_id, ResourceType="CATALOG")
assert res["PrincipalResourcePermissions"] == [
catalog_response(principal=principal_1),
]
@mock_lakeformation
def test_list_permissions_filtered_for_resource_db():
client = boto3.client("lakeformation", region_name="eu-west-2")
catalog_id = "000000000000"
principal_1 = "principal_1"
db_1 = "db_1"
db_2 = "db_2"
grant_db_permissions(
client=client, catalog_id=catalog_id, principal=principal_1, db=db_1
)
grant_db_permissions(
client=client, catalog_id=catalog_id, principal=principal_1, db=db_2
)
res = client.list_permissions(
CatalogId=catalog_id, Resource={"Database": {"Name": db_1}}
)
assert res["PrincipalResourcePermissions"] == [
db_response(principal=principal_1, catalog_id=catalog_id, db=db_1),
]
res = client.list_permissions(
CatalogId=catalog_id, Resource={"Database": {"Name": db_2}}
)
assert res["PrincipalResourcePermissions"] == [
db_response(principal=principal_1, catalog_id=catalog_id, db=db_2),
]
@mock_lakeformation
def test_list_permissions_filtered_for_resource_table():
client = boto3.client("lakeformation", region_name="eu-west-2")
catalog_id = "000000000000"
principal_1 = "principal_1"
db_1 = "db_1"
table_1 = "table_1"
table_2 = "table_2"
grant_table_permissions(
client=client,
catalog_id=catalog_id,
principal=principal_1,
db=db_1,
table=table_1,
)
grant_table_permissions(
client=client,
catalog_id=catalog_id,
principal=principal_1,
db=db_1,
table=table_2,
)
res = client.list_permissions(
CatalogId=catalog_id,
Resource={"Table": {"DatabaseName": db_1, "Name": table_1}},
)
assert res["PrincipalResourcePermissions"] == [
table_response(
principal=principal_1, catalog_id=catalog_id, db=db_1, table=table_1
),
]
res = client.list_permissions(
CatalogId=catalog_id,
Resource={"Table": {"DatabaseName": db_1, "Name": table_2}},
)
assert res["PrincipalResourcePermissions"] == [
table_response(
principal=principal_1, catalog_id=catalog_id, db=db_1, table=table_2
),
]
@mock_lakeformation
def test_revoke_permissions():
client = boto3.client("lakeformation", region_name="eu-west-2")