Feature: Glue Schema Registry APIs: GetSchemaVersion, GetSchemaByDefinition, DeleteSchema, PutSchemaVersionMetadata (#5351)

This commit is contained in:
Himani Patel 2022-08-05 13:59:01 -07:00 committed by GitHub
parent 9629f09af6
commit 5dfe4e00d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1503 additions and 404 deletions

View File

@ -66,13 +66,36 @@ class VersionNotFoundException(EntityNotFoundException):
class SchemaNotFoundException(EntityNotFoundException):
def __init__(self):
def __init__(self, schema_name, registry_name, schema_arn, null="null"):
super().__init__(
"Schema is not found.",
f"Schema is not found. RegistryName: {registry_name if registry_name else null}, SchemaName: {schema_name if schema_name else null}, SchemaArn: {schema_arn if schema_arn else null}",
)
class GSREntityNotFoundException(EntityNotFoundException):
class SchemaVersionNotFoundFromSchemaIdException(EntityNotFoundException):
    """Signal that no schema version matched a schema id plus version selector.

    Falsy registry name, schema name, schema ARN and version number are shown
    as the ``null`` placeholder; a falsy ``latest_version`` is shown as the
    ``false`` placeholder.
    """

    def __init__(
        self,
        registry_name,
        schema_name,
        schema_arn,
        version_number,
        latest_version,
        null="null",
        false="false",
    ):
        shown_registry = registry_name or null
        shown_schema = schema_name or null
        shown_arn = schema_arn or null
        shown_version = version_number or null
        shown_latest = latest_version or false
        message = (
            "Schema version is not found. "
            f"RegistryName: {shown_registry}, "
            f"SchemaName: {shown_schema}, "
            f"SchemaArn: {shown_arn}, "
            f"VersionNumber: {shown_version}, "
            f"isLatestVersion: {shown_latest}"
        )
        super().__init__(message)
class SchemaVersionNotFoundFromSchemaVersionIdException(EntityNotFoundException):
    """Signal that no schema version exists for the given schema version id."""

    def __init__(self, schema_version_id):
        message = f"Schema version is not found. SchemaVersionId: {schema_version_id}"
        super().__init__(message)
class RegistryNotFoundException(EntityNotFoundException):
def __init__(self, resource, param_name, param_value):
super().__init__(
resource + " is not found. " + param_name + ": " + param_value,
@ -95,19 +118,47 @@ class ConcurrentRunsExceededException(GlueClientError):
class ResourceNumberLimitExceededException(GlueClientError):
def __init__(self, resource):
def __init__(self, msg):
super().__init__(
"ResourceNumberLimitExceededException",
msg,
)
class GeneralResourceNumberLimitExceededException(ResourceNumberLimitExceededException):
    """Limit-exceeded error whose message names the kind of resource."""

    def __init__(self, resource):
        # Plain concatenation is kept so that ``resource`` must be a string.
        message = (
            "More " + resource + " cannot be created. The maximum limit has been reached."
        )
        super().__init__(message)
class SchemaVersionMetadataLimitExceededException(ResourceNumberLimitExceededException):
    """Limit-exceeded error specific to schema version metadata."""

    def __init__(self):
        message = "Your resource limits for Schema Version Metadata have been exceeded."
        super().__init__(message)
class GSRAlreadyExistsException(GlueClientError):
def __init__(self, resource, param_name, param_value):
def __init__(self, msg):
super().__init__(
"AlreadyExistsException",
msg,
)
class SchemaVersionMetadataAlreadyExistsException(GSRAlreadyExistsException):
    """Already-exists error for a metadata key/value pair on a schema version."""

    def __init__(self, schema_version_id, metadata_key, metadata_value):
        message = (
            f"Resource already exist for schema version id: {schema_version_id}, "
            f"metadata key: {metadata_key}, "
            f"metadata value: {metadata_value}"
        )
        super().__init__(message)
class GeneralGSRAlreadyExistsException(GSRAlreadyExistsException):
    """Already-exists error whose message names the resource and the clashing parameter."""

    def __init__(self, resource, param_name, param_value):
        # Plain concatenation is kept so that all three arguments must be strings.
        message = resource + " already exists. " + param_name + ": " + param_value
        super().__init__(message)
@ -197,19 +248,32 @@ class InvalidSchemaIdBothParamsProvidedException(GSRInvalidInputException):
)
class InvalidSchemaIdInsufficientParamsProvidedException(GSRInvalidInputException):
class InvalidSchemaIdNotProvidedException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"At least one of (registryName and schemaName) or schemaArn has to be provided.",
)
class DisabledCompatibilityVersioningException(GSRInvalidInputException):
def __init__(self, schema_name, registry_name):
class InvalidSchemaVersionNumberBothParamsProvidedException(GSRInvalidInputException):
    """Invalid-input error for a request carrying both VersionNumber and LatestVersion."""

    def __init__(self):
        message = "Only one of VersionNumber or LatestVersion is required."
        super().__init__(message)
class InvalidSchemaVersionNumberNotProvidedException(GSRInvalidInputException):
    """Invalid-input error for a request that lacks a schema version number."""

    def __init__(self):
        message = "One of version number (or) latest version is required."
        super().__init__(message)
class InvalidSchemaVersionIdProvidedWithOtherParamsException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"Compatibility DISABLED does not allow versioning. SchemaId: SchemaId(schemaName="
+ schema_name
+ ", registryName="
+ registry_name
+ ")"
"No other input parameters can be specified when fetching by SchemaVersionId."
)
class DisabledCompatibilityVersioningException(GSRInvalidInputException):
    """Invalid-input error for versioning a schema whose compatibility is DISABLED.

    Falsy schema ARN, schema name and registry name are shown as the ``null``
    placeholder.
    """

    def __init__(self, schema_name, registry_name, schema_arn, null="null"):
        shown_arn = schema_arn or null
        shown_schema = schema_name or null
        shown_registry = registry_name or null
        message = (
            "Compatibility DISABLED does not allow versioning. "
            f"SchemaId: SchemaId(schemaArn={shown_arn}, "
            f"schemaName={shown_schema}, "
            f"registryName={shown_registry})"
        )
        super().__init__(message)

View File

@ -11,9 +11,8 @@ DESCRIPTION_PATTERN = re.compile(
r"[\\u0020-\\uD7FF\\uE000-\\uFFFD\\uD800\\uDC00-\\uDBFF\\uDFFF\\r\\n\\t]*"
)
ARN_PATTERN = re.compile(r"^arn:(aws|aws-us-gov|aws-cn):glue:.*$")
SCHEMA_VERSION_ID_PATTERN = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$"
)
AVAILABLE_STATUS = "AVAILABLE"
DELETING_STATUS = "DELETING"
# registry constants
MAX_REGISTRY_NAME_LENGTH = 255
@ -28,8 +27,20 @@ MAX_SCHEMAS_ALLOWED = 1000
MAX_SCHEMA_DEFINITION_LENGTH = 170000
SCHEMA_NAME = "SchemaName"
SCHEMA_ARN = "SchemaArn"
SCHEMA_DEFINITION = "schemaDefinition"
SCHEMA_DEFINITION = "SchemaDefinition"
# schema version number constants
MAX_VERSION_NUMBER = 100000
MAX_SCHEMA_VERSIONS_ALLOWED = 1000
SCHEMA_VERSION_ID = "SchemaVersionId"
LATEST_VERSION = "LatestVersion"
VERSION_NUMBER = "VersionNumber"
SCHEMA_VERSION_ID_PATTERN = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$"
)
MIN_SCHEMA_VERSION_ID_LENGTH = 36
SCHEMA_VERSION_METADATA_PATTERN = re.compile(r"^[a-zA-Z0-9+-=._./@]+$")
MAX_SCHEMA_VERSION_METADATA_ALLOWED = 10
MAX_SCHEMA_VERSION_METADATA_LENGTH = 128
METADATA_KEY = "MetadataKey"
METADATA_VALUE = "MetadataValue"

View File

@ -20,6 +20,15 @@ from .glue_schema_registry_constants import (
MAX_SCHEMAS_ALLOWED,
MAX_SCHEMA_VERSIONS_ALLOWED,
MAX_REGISTRIES_ALLOWED,
SCHEMA_VERSION_ID_PATTERN,
SCHEMA_VERSION_ID,
VERSION_NUMBER,
LATEST_VERSION,
METADATA_VALUE,
METADATA_KEY,
MAX_SCHEMA_VERSION_METADATA_LENGTH,
SCHEMA_VERSION_METADATA_PATTERN,
MAX_SCHEMA_VERSION_METADATA_ALLOWED,
)
from .exceptions import (
@ -27,45 +36,75 @@ from .exceptions import (
ParamValueContainsInvalidCharactersException,
InvalidSchemaDefinitionException,
InvalidRegistryIdBothParamsProvidedException,
GSREntityNotFoundException,
RegistryNotFoundException,
InvalidSchemaIdBothParamsProvidedException,
InvalidSchemaIdInsufficientParamsProvidedException,
InvalidSchemaIdNotProvidedException,
SchemaNotFoundException,
InvalidDataFormatException,
InvalidCompatibilityException,
InvalidNumberOfTagsException,
GSRAlreadyExistsException,
ResourceNumberLimitExceededException,
GeneralGSRAlreadyExistsException,
GeneralResourceNumberLimitExceededException,
DisabledCompatibilityVersioningException,
InvalidSchemaVersionNumberBothParamsProvidedException,
InvalidSchemaVersionIdProvidedWithOtherParamsException,
InvalidSchemaVersionNumberNotProvidedException,
SchemaVersionMetadataLimitExceededException,
)
def validate_registry_name_pattern_and_length(param_value):
param_name = "registryName"
max_name_length = MAX_REGISTRY_NAME_LENGTH
pattern = RESOURCE_NAME_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
validate_param_pattern_and_length(
param_value,
param_name="registryName",
max_name_length=MAX_REGISTRY_NAME_LENGTH,
pattern=RESOURCE_NAME_PATTERN,
)
def validate_arn_pattern_and_length(param_value):
param_name = "registryArn"
max_name_length = MAX_ARN_LENGTH
pattern = ARN_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
validate_param_pattern_and_length(
param_value,
param_name="registryArn",
max_name_length=MAX_ARN_LENGTH,
pattern=ARN_PATTERN,
)
def validate_description_pattern_and_length(param_value):
param_name = "description"
max_name_length = MAX_DESCRIPTION_LENGTH
pattern = DESCRIPTION_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
validate_param_pattern_and_length(
param_value,
param_name="description",
max_name_length=MAX_DESCRIPTION_LENGTH,
pattern=DESCRIPTION_PATTERN,
)
def validate_schema_name_pattern_and_length(param_value):
param_name = "schemaName"
max_name_length = MAX_SCHEMA_NAME_LENGTH
pattern = RESOURCE_NAME_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
validate_param_pattern_and_length(
param_value,
param_name="schemaName",
max_name_length=MAX_SCHEMA_NAME_LENGTH,
pattern=RESOURCE_NAME_PATTERN,
)
def validate_schema_version_metadata_key_pattern_and_length(param_value):
    """Check a metadata key against the shared metadata pattern and length limit.

    The value is reported under the parameter name ``key``.
    """
    validate_param_pattern_and_length(
        param_value,
        "key",
        MAX_SCHEMA_VERSION_METADATA_LENGTH,
        SCHEMA_VERSION_METADATA_PATTERN,
    )
def validate_schema_version_metadata_value_pattern_and_length(param_value):
    """Check a metadata value against the shared metadata pattern and length limit.

    The value is reported under the parameter name ``value``.
    """
    validate_param_pattern_and_length(
        param_value,
        "value",
        MAX_SCHEMA_VERSION_METADATA_LENGTH,
        SCHEMA_VERSION_METADATA_PATTERN,
    )
def validate_param_pattern_and_length(
@ -79,9 +118,7 @@ def validate_param_pattern_and_length(
def validate_schema_definition(schema_definition, data_format):
if len(schema_definition) > MAX_SCHEMA_DEFINITION_LENGTH:
param_name = SCHEMA_DEFINITION
raise ResourceNameTooLongException(param_name)
validate_schema_definition_length(schema_definition)
if data_format in ["AVRO", "JSON"]:
try:
json.loads(schema_definition)
@ -89,6 +126,17 @@ def validate_schema_definition(schema_definition, data_format):
raise InvalidSchemaDefinitionException(data_format, err)
def validate_schema_definition_length(schema_definition):
    """Reject a schema definition longer than ``MAX_SCHEMA_DEFINITION_LENGTH``.

    Raises:
        ResourceNameTooLongException: named after ``SCHEMA_DEFINITION`` when the
            definition is too long.
    """
    if len(schema_definition) <= MAX_SCHEMA_DEFINITION_LENGTH:
        return
    raise ResourceNameTooLongException(SCHEMA_DEFINITION)
def validate_schema_version_id_pattern(schema_version_id):
    """Reject a schema version id that does not match ``SCHEMA_VERSION_ID_PATTERN``.

    Raises:
        ParamValueContainsInvalidCharactersException: named after
            ``SCHEMA_VERSION_ID`` when the id does not match.
    """
    matched = SCHEMA_VERSION_ID_PATTERN.match(schema_version_id)
    if matched is None:
        raise ParamValueContainsInvalidCharactersException(SCHEMA_VERSION_ID)
def validate_number_of_tags(tags):
if len(tags) > MAX_TAGS_ALLOWED:
raise InvalidNumberOfTagsException()
@ -99,7 +147,7 @@ def validate_registry_id(registry_id, registries):
registry_name = DEFAULT_REGISTRY_NAME
return registry_name
elif registry_id.get(REGISTRY_NAME) and registry_id.get(REGISTRY_ARN):
if registry_id.get(REGISTRY_NAME) and registry_id.get(REGISTRY_ARN):
raise InvalidRegistryIdBothParamsProvidedException()
if registry_id.get(REGISTRY_NAME):
@ -111,15 +159,16 @@ def validate_registry_id(registry_id, registries):
validate_arn_pattern_and_length(registry_arn)
registry_name = registry_arn.split("/")[-1]
if registry_name not in registries:
if registry_name != DEFAULT_REGISTRY_NAME and registry_name not in registries:
if registry_id.get(REGISTRY_NAME):
raise GSREntityNotFoundException(
raise RegistryNotFoundException(
resource="Registry",
param_name=REGISTRY_NAME,
param_value=registry_name,
)
if registry_id.get(REGISTRY_ARN):
raise GSREntityNotFoundException(
raise RegistryNotFoundException(
resource="Registry",
param_name=REGISTRY_ARN,
param_value=registry_arn,
@ -138,10 +187,10 @@ def validate_registry_params(registries, registry_name, description=None, tags=N
validate_number_of_tags(tags)
if len(registries) >= MAX_REGISTRIES_ALLOWED:
raise ResourceNumberLimitExceededException(resource="registries")
raise GeneralResourceNumberLimitExceededException(resource="registries")
if registry_name in registries:
raise GSRAlreadyExistsException(
raise GeneralGSRAlreadyExistsException(
resource="Registry",
param_name=REGISTRY_NAME,
param_value=registry_name,
@ -149,31 +198,30 @@ def validate_registry_params(registries, registry_name, description=None, tags=N
def validate_schema_id(schema_id, registries):
if schema_id:
schema_arn = schema_id.get(SCHEMA_ARN)
registry_name = schema_id.get(REGISTRY_NAME)
schema_name = schema_id.get(SCHEMA_NAME)
if schema_arn:
if registry_name or schema_name:
raise InvalidSchemaIdBothParamsProvidedException()
validate_arn_pattern_and_length(schema_arn)
arn_components = schema_arn.split("/")
schema_name = arn_components[-1]
registry_name = arn_components[-2]
schema_arn = schema_id.get(SCHEMA_ARN)
registry_name = schema_id.get(REGISTRY_NAME)
schema_name = schema_id.get(SCHEMA_NAME)
if schema_arn:
if registry_name or schema_name:
raise InvalidSchemaIdBothParamsProvidedException()
validate_arn_pattern_and_length(schema_arn)
arn_components = schema_arn.split("/")
schema_name = arn_components[-1]
registry_name = arn_components[-2]
else:
if registry_name is None or schema_name is None:
raise InvalidSchemaIdInsufficientParamsProvidedException()
validate_registry_name_pattern_and_length(registry_name)
validate_schema_name_pattern_and_length(schema_name)
else:
if registry_name is None or schema_name is None:
raise InvalidSchemaIdNotProvidedException()
validate_registry_name_pattern_and_length(registry_name)
validate_schema_name_pattern_and_length(schema_name)
if (
registry_name not in registries
or schema_name not in registries[registry_name].schemas
):
raise SchemaNotFoundException()
raise SchemaNotFoundException(schema_name, registry_name, schema_arn)
return registry_name, schema_name
return registry_name, schema_name, schema_arn
def validate_schema_params(
@ -212,31 +260,105 @@ def validate_schema_params(
validate_schema_definition(schema_definition, data_format)
if num_schemas >= MAX_SCHEMAS_ALLOWED:
raise ResourceNumberLimitExceededException(resource="schemas")
raise GeneralResourceNumberLimitExceededException(resource="schemas")
if schema_name in registry.schemas:
raise GSRAlreadyExistsException(
raise GeneralGSRAlreadyExistsException(
resource="Schema",
param_name=SCHEMA_NAME,
param_value=schema_name,
)
def validate_schema_version_params(
def validate_register_schema_version_params(
registry_name,
schema_name,
schema_arn,
num_schema_versions,
schema_definition,
compatibility,
data_format,
):
if compatibility == "DISABLED":
raise DisabledCompatibilityVersioningException(schema_name, registry_name)
raise DisabledCompatibilityVersioningException(
schema_name, registry_name, schema_arn
)
validate_schema_definition(schema_definition, data_format)
if num_schema_versions >= MAX_SCHEMA_VERSIONS_ALLOWED:
raise ResourceNumberLimitExceededException(resource="schema versions")
raise GeneralResourceNumberLimitExceededException(resource="schema versions")
def validate_schema_version_params(
    registries, schema_id, schema_version_id, schema_version_number
):
    """Validate the identifiers that select a single schema version.

    A version is selected either by ``schema_version_id`` alone, or by
    ``schema_id`` together with ``schema_version_number``.

    Returns:
        A 6-tuple ``(schema_version_id, registry_name, schema_name,
        schema_arn, version_number, latest_version)``. When selecting by
        schema version id the last five entries are ``None``; otherwise the
        first entry is ``None``.

    Raises:
        InvalidSchemaIdNotProvidedException: nothing was provided, or the
            schema id is missing.
        InvalidSchemaVersionIdProvidedWithOtherParamsException: a schema
            version id was combined with any other selector.
        InvalidSchemaVersionNumberNotProvidedException: a schema id was given
            without a schema version number.
    """
    if not (schema_version_id or schema_id or schema_version_number):
        raise InvalidSchemaIdNotProvidedException()

    if schema_version_id:
        # A schema version id must be the only selector in the request.
        if schema_id or schema_version_number:
            raise InvalidSchemaVersionIdProvidedWithOtherParamsException()
        validate_schema_version_id_pattern(schema_version_id)
        return schema_version_id, None, None, None, None, None

    if not schema_id:
        raise InvalidSchemaIdNotProvidedException()
    if not schema_version_number:
        raise InvalidSchemaVersionNumberNotProvidedException()

    registry_name, schema_name, schema_arn = validate_schema_id(schema_id, registries)
    version_number, latest_version = validate_schema_version_number(
        registries, registry_name, schema_name, schema_version_number
    )
    return (
        None,
        registry_name,
        schema_name,
        schema_arn,
        version_number,
        latest_version,
    )
def validate_schema_version_number(
    registries, registry_name, schema_name, schema_version_number
):
    """Resolve a schema version number selector to a concrete version number.

    Returns:
        ``(version_number, latest_version)``. When ``LatestVersion`` is truthy
        the version number is the schema's ``latest_schema_version``;
        otherwise it is the requested ``VersionNumber`` as given.

    Raises:
        InvalidSchemaVersionNumberBothParamsProvidedException: both a truthy
            ``LatestVersion`` and a truthy ``VersionNumber`` were supplied.
    """
    wants_latest = schema_version_number.get(LATEST_VERSION)
    requested_number = schema_version_number.get(VERSION_NUMBER)
    schema = registries[registry_name].schemas[schema_name]

    if not wants_latest:
        return requested_number, wants_latest
    if requested_number:
        raise InvalidSchemaVersionNumberBothParamsProvidedException()
    return schema.latest_schema_version, wants_latest
def validate_schema_version_metadata_pattern_and_length(metadata_key_value):
    """Extract and validate the metadata key and value from a request mapping.

    Returns:
        ``(metadata_key, metadata_value)`` as read from the mapping.
    """
    key_and_value = (
        metadata_key_value.get(METADATA_KEY),
        metadata_key_value.get(METADATA_VALUE),
    )
    validate_schema_version_metadata_key_pattern_and_length(key_and_value[0])
    validate_schema_version_metadata_value_pattern_and_length(key_and_value[1])
    return key_and_value
def validate_number_of_schema_version_metadata_allowed(metadata):
    """Reject further metadata once the stored key/value pair count hits the limit.

    ``metadata`` maps each key to a collection of values; every stored value
    counts as one pair.

    Raises:
        SchemaVersionMetadataLimitExceededException: the pair count is already
            at or above ``MAX_SCHEMA_VERSION_METADATA_ALLOWED``.
    """
    pair_count = sum(len(values) for values in metadata.values())
    if pair_count >= MAX_SCHEMA_VERSION_METADATA_ALLOWED:
        raise SchemaVersionMetadataLimitExceededException()
def get_schema_version_if_definition_exists(
@ -253,3 +375,50 @@ def get_schema_version_if_definition_exists(
if schema_definition == schema_version.schema_definition:
return schema_version.as_dict()
return None
def get_put_schema_version_metadata_response(
    schema_id, schema_version_number, schema_version_id, metadata_key_value
):
    """Assemble the response dictionary for PutSchemaVersionMetadata.

    Only truthy identifiers from the request are echoed back. The metadata key
    and value are always included.
    """
    response = {}

    if schema_version_id:
        response[SCHEMA_VERSION_ID] = schema_version_id

    if schema_id:
        # Echo back whichever schema identifiers the caller supplied.
        for field in (SCHEMA_ARN, REGISTRY_NAME, SCHEMA_NAME):
            supplied = schema_id.get(field)
            if supplied:
                response[field] = supplied

    # NOTE(review): the fallback branch below is read as belonging to the
    # ``schema_version_number`` check - confirm against the original layout.
    if schema_version_number:
        latest_version = schema_version_number.get(LATEST_VERSION)
        version_number = schema_version_number.get(VERSION_NUMBER)
        response[LATEST_VERSION] = latest_version if latest_version else False
        if version_number:
            response[VERSION_NUMBER] = version_number
    else:
        response[LATEST_VERSION] = False
        response[VERSION_NUMBER] = 0

    response[METADATA_KEY] = metadata_key_value.get(METADATA_KEY)
    response[METADATA_VALUE] = metadata_key_value.get(METADATA_VALUE)
    return response
def delete_schema_response(schema_name, schema_arn, status):
    """Build the DeleteSchema response from the schema's name, ARN and status."""
    response = {}
    response["SchemaName"] = schema_name
    response["SchemaArn"] = schema_arn
    response["Status"] = status
    return response

View File

@ -6,7 +6,14 @@ from uuid import uuid4
from moto.core import BaseBackend, BaseModel
from moto.core.models import get_account_id
from moto.core.utils import BackendDict
from moto.glue.exceptions import CrawlerRunningException, CrawlerNotRunningException
from moto.glue.exceptions import (
CrawlerRunningException,
CrawlerNotRunningException,
SchemaVersionNotFoundFromSchemaVersionIdException,
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
)
from .exceptions import (
JsonRESTError,
CrawlerAlreadyExistsException,
@ -26,12 +33,20 @@ from .glue_schema_registry_utils import (
validate_registry_id,
validate_schema_id,
validate_schema_params,
validate_schema_version_params,
get_schema_version_if_definition_exists,
validate_registry_params,
validate_schema_version_params,
validate_schema_definition_length,
validate_schema_version_metadata_pattern_and_length,
validate_number_of_schema_version_metadata_allowed,
get_put_schema_version_metadata_response,
validate_register_schema_version_params,
delete_schema_response,
)
from .glue_schema_registry_constants import (
DEFAULT_REGISTRY_NAME,
AVAILABLE_STATUS,
DELETING_STATUS,
)
from ..utilities.paginator import paginate
from ..utilities.tagging_service import TaggingService
@ -264,13 +279,13 @@ class GlueBackend(BaseBackend):
def create_registry(self, registry_name, description=None, tags=None):
"""CreateRegistry API"""
""" If registry name id default-registry, create default-registry """
# If registry name is default-registry, create default-registry
if registry_name == DEFAULT_REGISTRY_NAME:
registry = FakeRegistry(registry_name, description, tags)
self.registries[registry_name] = registry
return registry
""" Validate Registry Parameters """
# Validate Registry Parameters
validate_registry_params(self.registries, registry_name, description, tags)
registry = FakeRegistry(registry_name, description, tags)
@ -288,7 +303,12 @@ class GlueBackend(BaseBackend):
tags=None,
):
"""CreateSchema API"""
"""Validate Registry Id"""
"""
The following parameters/features are not yet implemented: Glue Schema Registry: compatibility checks NONE | BACKWARD | BACKWARD_ALL | FORWARD | FORWARD_ALL | FULL | FULL_ALL and Data format parsing and syntax validation.
....
"""
# Validate Registry Id
registry_name = validate_registry_id(registry_id, self.registries)
if (
registry_name == DEFAULT_REGISTRY_NAME
@ -297,7 +317,7 @@ class GlueBackend(BaseBackend):
self.create_registry(registry_name)
registry = self.registries[registry_name]
""" Validate Schema Parameters """
# Validate Schema Parameters
validate_schema_params(
registry,
schema_name,
@ -309,7 +329,7 @@ class GlueBackend(BaseBackend):
tags,
)
""" Create Schema """
# Create Schema
schema_version = FakeSchemaVersion(
registry_name, schema_name, schema_definition, version_number=1
)
@ -333,24 +353,28 @@ class GlueBackend(BaseBackend):
def register_schema_version(self, schema_id, schema_definition):
"""RegisterSchemaVersion API"""
""" Validate Schema Id """
registry_name, schema_name = validate_schema_id(schema_id, self.registries)
# Validate Schema Id
registry_name, schema_name, schema_arn = validate_schema_id(
schema_id, self.registries
)
compatibility = (
self.registries[registry_name].schemas[schema_name].compatibility
)
data_format = self.registries[registry_name].schemas[schema_name].data_format
validate_schema_version_params(
validate_register_schema_version_params(
registry_name,
schema_name,
schema_arn,
self.num_schema_versions,
schema_definition,
compatibility,
data_format,
)
""" If the same schema definition is already stored in Schema Registry as a version,
the schema ID of the existing schema is returned to the caller. """
# If the same schema definition is already stored in Schema Registry as a version,
# the schema ID of the existing schema is returned to the caller.
schema_versions = (
self.registries[registry_name].schemas[schema_name].schema_versions.values()
)
@ -360,13 +384,17 @@ class GlueBackend(BaseBackend):
if existing_schema_version:
return existing_schema_version
""" Register Schema Version """
# Register Schema Version
version_number = (
self.registries[registry_name]
.schemas[schema_name]
.get_next_schema_version()
)
self.registries[registry_name].schemas[schema_name].update_next_schema_version()
self.registries[registry_name].schemas[
schema_name
].update_latest_schema_version()
self.num_schema_versions += 1
schema_version = FakeSchemaVersion(
@ -375,8 +403,173 @@ class GlueBackend(BaseBackend):
self.registries[registry_name].schemas[schema_name].schema_versions[
schema_version.schema_version_id
] = schema_version
return schema_version.as_dict()
def get_schema_version(
    self, schema_id=None, schema_version_id=None, schema_version_number=None
):
    """GetSchemaVersion API

    Looks a schema version up either by its schema version id or by schema id
    plus version number. Versions whose status is ``DELETING_STATUS`` are
    treated as not found. The returned dictionary is the version's
    ``get_schema_version_as_dict()`` with the owning schema's ``DataFormat``
    added.
    """
    # Validate Schema Parameters
    validated = validate_schema_version_params(
        self.registries, schema_id, schema_version_id, schema_version_number
    )
    (
        schema_version_id,
        registry_name,
        schema_name,
        schema_arn,
        version_number,
        latest_version,
    ) = validated

    # GetSchemaVersion using SchemaVersionId: scan every schema of every registry
    if schema_version_id:
        for registry in self.registries.values():
            for schema in registry.schemas.values():
                found = schema.schema_versions.get(schema_version_id, None)
                if found and found.schema_version_status != DELETING_STATUS:
                    response = found.get_schema_version_as_dict()
                    response["DataFormat"] = schema.data_format
                    return response
        raise SchemaVersionNotFoundFromSchemaVersionIdException(schema_version_id)

    # GetSchemaVersion using VersionNumber
    schema = self.registries[registry_name].schemas[schema_name]
    for candidate in schema.schema_versions.values():
        if version_number == candidate.version_number:
            if candidate.schema_version_status != DELETING_STATUS:
                response = candidate.get_schema_version_as_dict()
                response["DataFormat"] = schema.data_format
                return response
    raise SchemaVersionNotFoundFromSchemaIdException(
        registry_name, schema_name, schema_arn, version_number, latest_version
    )
def get_schema_by_definition(self, schema_id, schema_definition):
    """GetSchemaByDefinition API

    Returns the first version of the schema whose definition equals
    ``schema_definition`` and whose status is not ``DELETING_STATUS``, with
    the schema's ``DataFormat`` added. Raises ``SchemaNotFoundException`` when
    no version matches.
    """
    # Validate the definition length first, then the SchemaId
    validate_schema_definition_length(schema_definition)
    registry_name, schema_name, schema_arn = validate_schema_id(
        schema_id, self.registries
    )

    # Get Schema By Definition
    schema = self.registries[registry_name].schemas[schema_name]
    for candidate in schema.schema_versions.values():
        if schema_definition == candidate.schema_definition:
            if candidate.schema_version_status != DELETING_STATUS:
                response = candidate.get_schema_by_definition_as_dict()
                response["DataFormat"] = schema.data_format
                return response
    raise SchemaNotFoundException(schema_name, registry_name, schema_arn)
def put_schema_version_metadata(
    self, schema_id, schema_version_number, schema_version_id, metadata_key_value
):
    """PutSchemaVersionMetadata API

    Attaches one metadata key/value pair to a schema version selected either
    by schema version id or by schema id plus version number, and returns the
    response built by ``get_put_schema_version_metadata_response``.
    """
    # Validate metadata_key_value and schema version params
    metadata_key, metadata_value = validate_schema_version_metadata_pattern_and_length(
        metadata_key_value
    )
    (
        schema_version_id,
        registry_name,
        schema_name,
        schema_arn,
        version_number,
        latest_version,
    ) = validate_schema_version_params(
        self.registries, schema_id, schema_version_id, schema_version_number
    )

    def _store_and_respond(metadata, reported_version_id):
        # The limit check runs before the duplicate check.
        validate_number_of_schema_version_metadata_allowed(metadata)
        if metadata_key not in metadata:
            metadata[metadata_key] = [metadata_value]
        elif metadata_value in metadata[metadata_key]:
            raise SchemaVersionMetadataAlreadyExistsException(
                reported_version_id, metadata_key, metadata_value
            )
        else:
            metadata[metadata_key].append(metadata_value)
        return get_put_schema_version_metadata_response(
            schema_id,
            schema_version_number,
            schema_version_id,
            metadata_key_value,
        )

    # PutSchemaVersionMetadata using SchemaVersionId
    if schema_version_id:
        for registry in self.registries.values():
            for schema in registry.schemas.values():
                if schema.schema_versions.get(schema_version_id, None):
                    return _store_and_respond(
                        schema.schema_versions[schema_version_id].metadata,
                        schema_version_id,
                    )
        raise SchemaVersionNotFoundFromSchemaVersionIdException(schema_version_id)

    # PutSchemaVersionMetadata using VersionNumber
    schema = self.registries[registry_name].schemas[schema_name]
    for candidate in schema.schema_versions.values():
        if version_number == candidate.version_number:
            return _store_and_respond(
                candidate.metadata, candidate.schema_version_id
            )
    raise SchemaVersionNotFoundFromSchemaIdException(
        registry_name, schema_name, schema_arn, version_number, latest_version
    )
def delete_schema(self, schema_id):
    """DeleteSchema API

    Marks the schema as ``DELETING_STATUS``, captures the response, removes the
    schema from its registry and lowers the backend's schema and schema
    version counters accordingly.
    """
    # Validate schema_id
    registry_name, schema_name, _ = validate_schema_id(schema_id, self.registries)

    # delete schema pre-processing
    schema = self.registries[registry_name].schemas[schema_name]
    removed_version_count = len(schema.schema_versions)
    schema.schema_status = DELETING_STATUS
    # The response is built before removal, so it reports the DELETING status.
    response = delete_schema_response(
        schema.schema_name, schema.schema_arn, schema.schema_status
    )

    # delete schema
    del self.registries[registry_name].schemas[schema_name]
    self.num_schemas = self.num_schemas - 1
    self.num_schema_versions = self.num_schema_versions - removed_version_count

    return response
class FakeDatabase(BaseModel):
def __init__(self, database_name, database_input):
@ -806,10 +999,10 @@ class FakeSchema(BaseModel):
self.schema_checkpoint = 1
self.latest_schema_version = 1
self.next_schema_version = 2
self.schema_status = "AVAILABLE"
self.schema_status = AVAILABLE_STATUS
self.tags = tags
self.schema_version_id = schema_version_id
self.schema_version_status = "AVAILABLE"
self.schema_version_status = AVAILABLE_STATUS
self.created_time = datetime.utcnow()
self.updated_time = datetime.utcnow()
self.schema_versions = OrderedDict()
@ -817,6 +1010,9 @@ class FakeSchema(BaseModel):
def update_next_schema_version(self):
    """Advance ``next_schema_version`` by one."""
    self.next_schema_version = self.next_schema_version + 1
def update_latest_schema_version(self):
    """Advance ``latest_schema_version`` by one."""
    self.latest_schema_version = self.latest_schema_version + 1
def get_next_schema_version(self):
return self.next_schema_version
@ -845,11 +1041,12 @@ class FakeSchemaVersion(BaseModel):
self.schema_name = schema_name
self.schema_arn = f"arn:aws:glue:us-east-1:{get_account_id()}:schema/{self.registry_name}/{self.schema_name}"
self.schema_definition = schema_definition
self.schema_version_status = "AVAILABLE"
self.schema_version_status = AVAILABLE_STATUS
self.version_number = version_number
self.schema_version_id = str(uuid4())
self.created_time = datetime.utcnow()
self.updated_time = datetime.utcnow()
self.metadata = OrderedDict()
def get_schema_version_id(self):
return self.schema_version_id
@ -861,6 +1058,26 @@ class FakeSchemaVersion(BaseModel):
"Status": self.schema_version_status,
}
def get_schema_version_as_dict(self):
    """Return this version's fields for a GetSchemaVersion response.

    ``DataFormat`` is not included here; the caller adds it to complete the
    response. ``CreatedTime`` is the string form of ``created_time``.
    """
    fields = {}
    fields["SchemaVersionId"] = self.schema_version_id
    fields["SchemaDefinition"] = self.schema_definition
    fields["SchemaArn"] = self.schema_arn
    fields["VersionNumber"] = self.version_number
    fields["Status"] = self.schema_version_status
    fields["CreatedTime"] = str(self.created_time)
    return fields
def get_schema_by_definition_as_dict(self):
    """Return this version's fields for a GetSchemaByDefinition response.

    ``DataFormat`` is not included here; the caller adds it to complete the
    response. ``CreatedTime`` is the string form of ``created_time``.
    """
    fields = {}
    fields["SchemaVersionId"] = self.schema_version_id
    fields["SchemaArn"] = self.schema_arn
    fields["Status"] = self.schema_version_status
    fields["CreatedTime"] = str(self.created_time)
    return fields
glue_backends = BackendDict(
GlueBackend, "glue", use_boto3_regions=False, additional_regions=["global"]

View File

@ -488,3 +488,36 @@ class GlueResponse(BaseResponse):
schema_id, schema_definition
)
return json.dumps(schema_version)
def get_schema_version(self):
    """Handle GetSchemaVersion: read the request params and return the backend result as JSON."""
    param_names = ("SchemaId", "SchemaVersionId", "SchemaVersionNumber")
    request_args = [self._get_param(name) for name in param_names]
    result = self.glue_backend.get_schema_version(*request_args)
    return json.dumps(result)
def get_schema_by_definition(self):
    """Handle GetSchemaByDefinition: read the request params and return the backend result as JSON."""
    param_names = ("SchemaId", "SchemaDefinition")
    request_args = [self._get_param(name) for name in param_names]
    result = self.glue_backend.get_schema_by_definition(*request_args)
    return json.dumps(result)
def put_schema_version_metadata(self):
    """Handle PutSchemaVersionMetadata: read the request params and return the backend result as JSON."""
    # The order matches the backend's positional parameters.
    param_names = (
        "SchemaId",
        "SchemaVersionNumber",
        "SchemaVersionId",
        "MetadataKeyValue",
    )
    request_args = [self._get_param(name) for name in param_names]
    result = self.glue_backend.put_schema_version_metadata(*request_args)
    return json.dumps(result)
def delete_schema(self):
    """Handle DeleteSchema: read ``SchemaId`` and return the backend result as JSON."""
    result = self.glue_backend.delete_schema(self._get_param("SchemaId"))
    return json.dumps(result)

View File

@ -1,34 +1,53 @@
from moto.core import ACCOUNT_ID
# Shared constants for the schema registry tests.
# NOTE(review): every unprefixed name below has a TEST_-prefixed twin with the
# same value; the unprefixed ones look like the pre-rename side of the diff —
# confirm before relying on them.

# Description and tags.
DESCRIPTION = "test_description"
TEST_DESCRIPTION = "test_description"
TAGS = {"key1": "value1", "key2": "value2"}
TEST_TAGS = {"key1": "value1", "key2": "value2"}
# Status strings.
AVAILABLE_STATUS = "AVAILABLE"
TEST_AVAILABLE_STATUS = "AVAILABLE"
TEST_DELETING_STATUS = "DELETING"
# Registry name, ARN and RegistryId dictionary.
REGISTRY_NAME = "TestRegistry"
TEST_REGISTRY_NAME = "TestRegistry"
TEST_INVALID_REGISTRY_NAME_DOES_NOT_EXIST = "InvalidRegistryDoesNotExist"
TEST_REGISTRY_ARN = f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{TEST_REGISTRY_NAME}"
TEST_REGISTRY_ID = {"RegistryName": f"{TEST_REGISTRY_NAME}"}
REGISTRY_ARN = f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{REGISTRY_NAME}"
# Schema name, ARN and SchemaId dictionaries (valid and deliberately invalid).
TEST_SCHEMA_NAME = "TestSchema"
TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST = "InvalidSchemaDoesNotExist"
TEST_SCHEMA_ARN = f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{TEST_REGISTRY_NAME}/{TEST_SCHEMA_NAME}"
TEST_SCHEMA_ID = {
"RegistryName": f"{TEST_REGISTRY_NAME}",
"SchemaName": f"{TEST_SCHEMA_NAME}",
}
# Existing registry name paired with a schema name that does not exist.
TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST = {
"RegistryName": TEST_REGISTRY_NAME,
"SchemaName": TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST,
}
# Registry name that does not exist paired with the regular schema name.
TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST = {
"RegistryName": TEST_INVALID_REGISTRY_NAME_DOES_NOT_EXIST,
"SchemaName": TEST_SCHEMA_NAME,
}
SCHEMA_NAME = "TestSchema"
# Data format names.
TEST_AVRO_DATA_FORMAT = "AVRO"
TEST_JSON_DATA_FORMAT = "JSON"
TEST_PROTOBUF_DATA_FORMAT = "PROTOBUF"
REGISTRY_ID = {"RegistryName": f"{REGISTRY_NAME}"}
# Compatibility mode names.
TEST_BACKWARD_COMPATIBILITY = "BACKWARD"
TEST_DISABLED_COMPATIBILITY = "DISABLED"
SCHEMA_ID = {"RegistryName": f"{REGISTRY_NAME}", "SchemaName": f"{SCHEMA_NAME}"}
# Schema version selectors: an explicit version number, or "latest version".
TEST_SCHEMA_VERSION_NUMBER = {"VersionNumber": 1, "LatestVersion": False}
TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION = {"LatestVersion": True}
# All-zero UUID used as a schema version id.
TEST_VERSION_ID = "00000000-0000-0000-0000-000000000000"
SCHEMA_ARN = f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{REGISTRY_NAME}/{SCHEMA_NAME}"
AVRO_DATA_FORMAT = "AVRO"
# Metadata key/value pair and the dictionary combining them.
TEST_METADATA_KEY = "test_metadata_key"
TEST_METADATA_VALUE = "test_metadata_value"
TEST_METADATA_KEY_VALUE = {
"MetadataKey": TEST_METADATA_KEY,
"MetadataValue": TEST_METADATA_VALUE,
}
JSON_DATA_FORMAT = "JSON"
PROTOBUF_DATA_FORMAT = "PROTOBUF"
BACKWARD_COMPATIBILITY = "BACKWARD"
DISABLED_COMPATIBILITY = "DISABLED"
AVRO_SCHEMA_DEFINITION = """{
TEST_AVRO_SCHEMA_DEFINITION = """{
"type": "record",
"namespace": "Moto_Test",
"name": "Person",
@ -44,7 +63,7 @@ AVRO_SCHEMA_DEFINITION = """{
]
}"""
NEW_AVRO_SCHEMA_DEFINITION = """{
TEST_NEW_AVRO_SCHEMA_DEFINITION = """{
"type": "record",
"namespace": "Moto_Test",
"name": "Person",
@ -65,7 +84,7 @@ NEW_AVRO_SCHEMA_DEFINITION = """{
]
}"""
JSON_SCHEMA_DEFINITION = """{
TEST_JSON_SCHEMA_DEFINITION = """{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Person",
@ -82,7 +101,7 @@ JSON_SCHEMA_DEFINITION = """{
}
}"""
NEW_JSON_SCHEMA_DEFINITION = """{
TEST_NEW_JSON_SCHEMA_DEFINITION = """{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Person",
@ -104,7 +123,7 @@ NEW_JSON_SCHEMA_DEFINITION = """{
}
}"""
PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
TEST_PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
package tutorial;
option java_multiple_files = true;
@ -130,7 +149,7 @@ PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
repeated PhoneNumber phones = 4;
}"""
NEW_PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
TEST_NEW_PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
package tutorial;

View File

@ -2,11 +2,13 @@ import copy
from .fixtures.datacatalog import TABLE_INPUT, PARTITION_INPUT, DATABASE_INPUT
from .fixtures.schema_registry import (
REGISTRY_NAME,
SCHEMA_NAME,
BACKWARD_COMPATIBILITY,
AVRO_DATA_FORMAT,
AVRO_SCHEMA_DEFINITION,
TEST_REGISTRY_NAME,
TEST_SCHEMA_NAME,
TEST_BACKWARD_COMPATIBILITY,
TEST_AVRO_DATA_FORMAT,
TEST_AVRO_SCHEMA_DEFINITION,
TEST_SCHEMA_ID,
TEST_NEW_AVRO_SCHEMA_DEFINITION,
)
@ -166,21 +168,27 @@ def create_crawler(
)
def create_registry(client, registry_name=TEST_REGISTRY_NAME):
    """Create a registry through ``client`` and return its response.

    Args:
        client: object exposing ``create_registry(RegistryName=...)``.
        registry_name: name of the registry to create; defaults to
            ``TEST_REGISTRY_NAME``.
    """
    # The stale ``registry_name=REGISTRY_NAME`` header that preceded this
    # definition had no body; only the TEST_-prefixed signature is kept.
    return client.create_registry(RegistryName=registry_name)
def create_schema(
    client,
    registry_id,
    data_format=TEST_AVRO_DATA_FORMAT,
    compatibility=TEST_BACKWARD_COMPATIBILITY,
    schema_definition=TEST_AVRO_SCHEMA_DEFINITION,
):
    """Create the ``TEST_SCHEMA_NAME`` schema through ``client``.

    Args:
        client: object exposing ``create_schema(...)`` with the keyword
            arguments used below.
        registry_id: value passed as ``RegistryId``.
        data_format: value passed as ``DataFormat``.
        compatibility: value passed as ``Compatibility``.
        schema_definition: value passed as ``SchemaDefinition``.

    Returns:
        Whatever ``client.create_schema`` returns.
    """
    # Each defaulted parameter and the SchemaName keyword used to appear
    # twice (unprefixed and TEST_-prefixed), which is a SyntaxError; only the
    # TEST_-prefixed names are kept.
    return client.create_schema(
        RegistryId=registry_id,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=data_format,
        Compatibility=compatibility,
        SchemaDefinition=schema_definition,
    )
def register_schema_version(client):
    """Register ``TEST_NEW_AVRO_SCHEMA_DEFINITION`` under ``TEST_SCHEMA_ID``.

    Returns whatever ``client.register_schema_version`` returns.
    """
    request = {
        "SchemaId": TEST_SCHEMA_ID,
        "SchemaDefinition": TEST_NEW_AVRO_SCHEMA_DEFINITION,
    }
    return client.register_schema_version(**request)

File diff suppressed because it is too large Load Diff