Glue: CreateSchema and RegisterSchemaVersion features (#5301)

This commit is contained in:
Himani Patel 2022-08-04 08:48:08 -07:00 committed by GitHub
parent e5f8ef2f9a
commit 02270ffcef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1501 additions and 250 deletions

View File

@ -65,6 +65,20 @@ class VersionNotFoundException(EntityNotFoundException):
super().__init__("Version not found.")
class SchemaNotFoundException(EntityNotFoundException):
def __init__(self):
super().__init__(
"Schema is not found.",
)
class GSREntityNotFoundException(EntityNotFoundException):
def __init__(self, resource, param_name, param_value):
super().__init__(
resource + " is not found. " + param_name + ": " + param_value,
)
class CrawlerRunningException(GlueClientError):
def __init__(self, msg):
super().__init__("CrawlerRunningException", msg)
@ -80,6 +94,24 @@ class ConcurrentRunsExceededException(GlueClientError):
super().__init__("ConcurrentRunsExceededException", msg)
class ResourceNumberLimitExceededException(GlueClientError):
def __init__(self, resource):
super().__init__(
"ResourceNumberLimitExceededException",
"More "
+ resource
+ " cannot be created. The maximum limit has been reached.",
)
class GSRAlreadyExistsException(GlueClientError):
def __init__(self, resource, param_name, param_value):
super().__init__(
"AlreadyExistsException",
resource + " already exists. " + param_name + ": " + param_value,
)
class _InvalidOperationException(GlueClientError):
def __init__(self, error_type, op, msg):
super().__init__(
@ -89,57 +121,95 @@ class _InvalidOperationException(GlueClientError):
)
class InvalidInputException(_InvalidOperationException):
def __init__(self, op, msg):
super().__init__("InvalidInputException", op, msg)
class InvalidStateException(_InvalidOperationException):
def __init__(self, op, msg):
super().__init__("InvalidStateException", op, msg)
class ResourceNumberLimitExceededException(_InvalidOperationException):
def __init__(self, op, resource):
class InvalidInputException(_InvalidOperationException):
def __init__(self, op, msg):
super().__init__("InvalidInputException", op, msg)
class GSRInvalidInputException(GlueClientError):
def __init__(self, msg):
super().__init__("InvalidInputException", msg)
class ResourceNameTooLongException(GSRInvalidInputException):
def __init__(self, param_name):
super().__init__(
"ResourceNumberLimitExceededException",
op,
"More "
+ resource
+ " cannot be created. The maximum limit has been reached.",
)
class GSRAlreadyExistsException(_InvalidOperationException):
def __init__(self, op, resource, param_name, param_value):
super().__init__(
"AlreadyExistsException",
op,
resource + " already exists. " + param_name + ": " + param_value,
)
class ResourceNameTooLongException(InvalidInputException):
def __init__(self, op, param_name):
super().__init__(
op,
"The resource name contains too many or too few characters. Parameter Name: "
+ param_name,
)
class ParamValueContainsInvalidCharactersException(InvalidInputException):
def __init__(self, op, param_name):
class ParamValueContainsInvalidCharactersException(GSRInvalidInputException):
def __init__(self, param_name):
super().__init__(
op,
"The parameter value contains one or more characters that are not valid. Parameter Name: "
+ param_name,
)
class InvalidNumberOfTagsException(InvalidInputException):
def __init__(self, op):
class InvalidNumberOfTagsException(GSRInvalidInputException):
def __init__(self):
super().__init__(
op,
"New Tags cannot be empty or more than 50",
)
class InvalidDataFormatException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"Data format is not valid.",
)
class InvalidCompatibilityException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"Compatibility is not valid.",
)
class InvalidSchemaDefinitionException(GSRInvalidInputException):
def __init__(self, data_format_name, err):
super().__init__(
"Schema definition of "
+ data_format_name
+ " data format is invalid: "
+ str(err),
)
class InvalidRegistryIdBothParamsProvidedException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"One of registryName or registryArn has to be provided, both cannot be provided.",
)
class InvalidSchemaIdBothParamsProvidedException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"One of (registryName and schemaName) or schemaArn has to be provided, both cannot be provided.",
)
class InvalidSchemaIdInsufficientParamsProvidedException(GSRInvalidInputException):
def __init__(self):
super().__init__(
"At least one of (registryName and schemaName) or schemaArn has to be provided.",
)
class DisabledCompatibilityVersioningException(GSRInvalidInputException):
def __init__(self, schema_name, registry_name):
super().__init__(
"Compatibility DISABLED does not allow versioning. SchemaId: SchemaId(schemaName="
+ schema_name
+ ", registryName="
+ registry_name
+ ")"
)

View File

@ -0,0 +1,35 @@
import re
# This file contains the constants required for Glue Schema Registry APIs.
# common used constants
MAX_DESCRIPTION_LENGTH = 2048
MAX_ARN_LENGTH = 1000
MAX_TAGS_ALLOWED = 50
RESOURCE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9-_$#.]+$")
DESCRIPTION_PATTERN = re.compile(
r"[\\u0020-\\uD7FF\\uE000-\\uFFFD\\uD800\\uDC00-\\uDBFF\\uDFFF\\r\\n\\t]*"
)
ARN_PATTERN = re.compile(r"^arn:(aws|aws-us-gov|aws-cn):glue:.*$")
SCHEMA_VERSION_ID_PATTERN = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$"
)
# registry constants
MAX_REGISTRY_NAME_LENGTH = 255
MAX_REGISTRIES_ALLOWED = 10
DEFAULT_REGISTRY_NAME = "default-registry"
REGISTRY_NAME = "RegistryName"
REGISTRY_ARN = "RegistryArn"
# schema constants
MAX_SCHEMA_NAME_LENGTH = 255
MAX_SCHEMAS_ALLOWED = 1000
MAX_SCHEMA_DEFINITION_LENGTH = 170000
SCHEMA_NAME = "SchemaName"
SCHEMA_ARN = "SchemaArn"
SCHEMA_DEFINITION = "schemaDefinition"
# schema version number constants
MAX_VERSION_NUMBER = 100000
MAX_SCHEMA_VERSIONS_ALLOWED = 1000

View File

@ -0,0 +1,255 @@
import re
import json
from .glue_schema_registry_constants import (
MAX_REGISTRY_NAME_LENGTH,
RESOURCE_NAME_PATTERN,
MAX_ARN_LENGTH,
ARN_PATTERN,
MAX_DESCRIPTION_LENGTH,
DESCRIPTION_PATTERN,
MAX_SCHEMA_NAME_LENGTH,
DEFAULT_REGISTRY_NAME,
REGISTRY_NAME,
REGISTRY_ARN,
SCHEMA_NAME,
SCHEMA_ARN,
MAX_TAGS_ALLOWED,
MAX_SCHEMA_DEFINITION_LENGTH,
SCHEMA_DEFINITION,
MAX_SCHEMAS_ALLOWED,
MAX_SCHEMA_VERSIONS_ALLOWED,
MAX_REGISTRIES_ALLOWED,
)
from .exceptions import (
ResourceNameTooLongException,
ParamValueContainsInvalidCharactersException,
InvalidSchemaDefinitionException,
InvalidRegistryIdBothParamsProvidedException,
GSREntityNotFoundException,
InvalidSchemaIdBothParamsProvidedException,
InvalidSchemaIdInsufficientParamsProvidedException,
SchemaNotFoundException,
InvalidDataFormatException,
InvalidCompatibilityException,
InvalidNumberOfTagsException,
GSRAlreadyExistsException,
ResourceNumberLimitExceededException,
DisabledCompatibilityVersioningException,
)
def validate_registry_name_pattern_and_length(param_value):
param_name = "registryName"
max_name_length = MAX_REGISTRY_NAME_LENGTH
pattern = RESOURCE_NAME_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
def validate_arn_pattern_and_length(param_value):
param_name = "registryArn"
max_name_length = MAX_ARN_LENGTH
pattern = ARN_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
def validate_description_pattern_and_length(param_value):
param_name = "description"
max_name_length = MAX_DESCRIPTION_LENGTH
pattern = DESCRIPTION_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
def validate_schema_name_pattern_and_length(param_value):
param_name = "schemaName"
max_name_length = MAX_SCHEMA_NAME_LENGTH
pattern = RESOURCE_NAME_PATTERN
validate_param_pattern_and_length(param_value, param_name, max_name_length, pattern)
def validate_param_pattern_and_length(
param_value, param_name, max_name_length, pattern
):
if len(param_value.encode("utf-8")) > max_name_length:
raise ResourceNameTooLongException(param_name)
if re.match(pattern, param_value) is None:
raise ParamValueContainsInvalidCharactersException(param_name)
def validate_schema_definition(schema_definition, data_format):
if len(schema_definition) > MAX_SCHEMA_DEFINITION_LENGTH:
param_name = SCHEMA_DEFINITION
raise ResourceNameTooLongException(param_name)
if data_format in ["AVRO", "JSON"]:
try:
json.loads(schema_definition)
except ValueError as err:
raise InvalidSchemaDefinitionException(data_format, err)
def validate_number_of_tags(tags):
if len(tags) > MAX_TAGS_ALLOWED:
raise InvalidNumberOfTagsException()
def validate_registry_id(registry_id, registries):
if not registry_id:
registry_name = DEFAULT_REGISTRY_NAME
return registry_name
elif registry_id.get(REGISTRY_NAME) and registry_id.get(REGISTRY_ARN):
raise InvalidRegistryIdBothParamsProvidedException()
if registry_id.get(REGISTRY_NAME):
registry_name = registry_id.get(REGISTRY_NAME)
validate_registry_name_pattern_and_length(registry_name)
elif registry_id.get(REGISTRY_ARN):
registry_arn = registry_id.get(REGISTRY_ARN)
validate_arn_pattern_and_length(registry_arn)
registry_name = registry_arn.split("/")[-1]
if registry_name not in registries:
if registry_id.get(REGISTRY_NAME):
raise GSREntityNotFoundException(
resource="Registry",
param_name=REGISTRY_NAME,
param_value=registry_name,
)
if registry_id.get(REGISTRY_ARN):
raise GSREntityNotFoundException(
resource="Registry",
param_name=REGISTRY_ARN,
param_value=registry_arn,
)
return registry_name
def validate_registry_params(registries, registry_name, description=None, tags=None):
validate_registry_name_pattern_and_length(registry_name)
if description:
validate_description_pattern_and_length(description)
if tags:
validate_number_of_tags(tags)
if len(registries) >= MAX_REGISTRIES_ALLOWED:
raise ResourceNumberLimitExceededException(resource="registries")
if registry_name in registries:
raise GSRAlreadyExistsException(
resource="Registry",
param_name=REGISTRY_NAME,
param_value=registry_name,
)
def validate_schema_id(schema_id, registries):
if schema_id:
schema_arn = schema_id.get(SCHEMA_ARN)
registry_name = schema_id.get(REGISTRY_NAME)
schema_name = schema_id.get(SCHEMA_NAME)
if schema_arn:
if registry_name or schema_name:
raise InvalidSchemaIdBothParamsProvidedException()
validate_arn_pattern_and_length(schema_arn)
arn_components = schema_arn.split("/")
schema_name = arn_components[-1]
registry_name = arn_components[-2]
else:
if registry_name is None or schema_name is None:
raise InvalidSchemaIdInsufficientParamsProvidedException()
validate_registry_name_pattern_and_length(registry_name)
validate_schema_name_pattern_and_length(schema_name)
if (
registry_name not in registries
or schema_name not in registries[registry_name].schemas
):
raise SchemaNotFoundException()
return registry_name, schema_name
def validate_schema_params(
registry,
schema_name,
data_format,
compatibility,
schema_definition,
num_schemas,
description=None,
tags=None,
):
validate_schema_name_pattern_and_length(schema_name)
if data_format not in ["AVRO", "JSON", "PROTOBUF"]:
raise InvalidDataFormatException()
if compatibility not in [
"NONE",
"DISABLED",
"BACKWARD",
"BACKWARD_ALL",
"FORWARD",
"FORWARD_ALL",
"FULL",
"FULL_ALL",
]:
raise InvalidCompatibilityException()
if description:
validate_description_pattern_and_length(description)
if tags:
validate_number_of_tags(tags)
validate_schema_definition(schema_definition, data_format)
if num_schemas >= MAX_SCHEMAS_ALLOWED:
raise ResourceNumberLimitExceededException(resource="schemas")
if schema_name in registry.schemas:
raise GSRAlreadyExistsException(
resource="Schema",
param_name=SCHEMA_NAME,
param_value=schema_name,
)
def validate_schema_version_params(
registry_name,
schema_name,
num_schema_versions,
schema_definition,
compatibility,
data_format,
):
if compatibility == "DISABLED":
raise DisabledCompatibilityVersioningException(schema_name, registry_name)
validate_schema_definition(schema_definition, data_format)
if num_schema_versions >= MAX_SCHEMA_VERSIONS_ALLOWED:
raise ResourceNumberLimitExceededException(resource="schema versions")
def get_schema_version_if_definition_exists(
schema_versions, data_format, schema_definition
):
if data_format in ["AVRO", "JSON"]:
for schema_version in schema_versions:
if json.loads(schema_definition) == json.loads(
schema_version.schema_definition
):
return schema_version.as_dict()
else:
for schema_version in schema_versions:
if schema_definition == schema_version.schema_definition:
return schema_version.as_dict()
return None

View File

@ -1,7 +1,7 @@
import time
import re
from collections import OrderedDict
from datetime import datetime
from uuid import uuid4
from moto.core import BaseBackend, BaseModel
from moto.core.models import get_account_id
@ -20,13 +20,19 @@ from .exceptions import (
VersionNotFoundException,
JobNotFoundException,
ConcurrentRunsExceededException,
GSRAlreadyExistsException,
ResourceNumberLimitExceededException,
ResourceNameTooLongException,
ParamValueContainsInvalidCharactersException,
InvalidNumberOfTagsException,
)
from .utils import PartitionFilter
from .glue_schema_registry_utils import (
validate_registry_id,
validate_schema_id,
validate_schema_params,
validate_schema_version_params,
get_schema_version_if_definition_exists,
validate_registry_params,
)
from .glue_schema_registry_constants import (
DEFAULT_REGISTRY_NAME,
)
from ..utilities.paginator import paginate
from ..utilities.tagging_service import TaggingService
@ -55,6 +61,8 @@ class GlueBackend(BaseBackend):
self.job_runs = OrderedDict()
self.tagger = TaggingService()
self.registries = OrderedDict()
self.num_schemas = 0
self.num_schema_versions = 0
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
@ -254,62 +262,120 @@ class GlueBackend(BaseBackend):
def untag_resource(self, resource_arn, tag_keys):
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
# TODO: @Himani. Will Refactor validation logic as I find the common validation required for other APIs
def create_registry(self, registry_name, description, tags):
operation_name = "CreateRegistry"
def create_registry(self, registry_name, description=None, tags=None):
"""CreateRegistry API"""
""" If registry name id default-registry, create default-registry """
if registry_name == DEFAULT_REGISTRY_NAME:
registry = FakeRegistry(registry_name, description, tags)
self.registries[registry_name] = registry
return registry
registry_name_pattern = re.compile(r"^[a-zA-Z0-9-_$#.]+$")
registry_description_pattern = re.compile(
r"[\\u0020-\\uD7FF\\uE000-\\uFFFD\\uD800\\uDC00-\\uDBFF\\uDFFF\\r\\n\\t]*"
)
max_registry_name_length = 255
max_registries_allowed = 10
max_description_length = 2048
max_tags_allowed = 50
if len(self.registries) >= max_registries_allowed:
raise ResourceNumberLimitExceededException(
operation_name, resource="registries"
)
if (
registry_name == ""
or len(registry_name.encode("utf-8")) > max_registry_name_length
):
param_name = "registryName"
raise ResourceNameTooLongException(operation_name, param_name)
if re.match(registry_name_pattern, registry_name) is None:
param_name = "registryName"
raise ParamValueContainsInvalidCharactersException(
operation_name, param_name
)
if registry_name in self.registries:
raise GSRAlreadyExistsException(
operation_name,
resource="Registry",
param_name="RegistryName",
param_value=registry_name,
)
if description and len(description.encode("utf-8")) > max_description_length:
param_name = "description"
raise ResourceNameTooLongException(operation_name, param_name)
if description and re.match(registry_description_pattern, description) is None:
param_name = "description"
raise ParamValueContainsInvalidCharactersException(
operation_name, param_name
)
if tags and len(tags) > max_tags_allowed:
raise InvalidNumberOfTagsException(operation_name)
""" Validate Registry Parameters """
validate_registry_params(self.registries, registry_name, description, tags)
registry = FakeRegistry(registry_name, description, tags)
self.registries[registry_name] = registry
return registry
return registry.as_dict()
def create_schema(
self,
registry_id,
schema_name,
data_format,
compatibility,
schema_definition,
description=None,
tags=None,
):
"""CrateSchema API"""
"""Validate Registry Id"""
registry_name = validate_registry_id(registry_id, self.registries)
if (
registry_name == DEFAULT_REGISTRY_NAME
and DEFAULT_REGISTRY_NAME not in self.registries
):
self.create_registry(registry_name)
registry = self.registries[registry_name]
""" Validate Schema Parameters """
validate_schema_params(
registry,
schema_name,
data_format,
compatibility,
schema_definition,
self.num_schemas,
description,
tags,
)
""" Create Schema """
schema_version = FakeSchemaVersion(
registry_name, schema_name, schema_definition, version_number=1
)
schema_version_id = schema_version.get_schema_version_id()
schema = FakeSchema(
registry_name,
schema_name,
data_format,
compatibility,
schema_version_id,
description,
tags,
)
registry.schemas[schema_name] = schema
self.num_schemas += 1
schema.schema_versions[schema.schema_version_id] = schema_version
self.num_schema_versions += 1
return schema.as_dict()
def register_schema_version(self, schema_id, schema_definition):
"""RegisterSchemaVersion API"""
""" Validate Schema Id """
registry_name, schema_name = validate_schema_id(schema_id, self.registries)
compatibility = (
self.registries[registry_name].schemas[schema_name].compatibility
)
data_format = self.registries[registry_name].schemas[schema_name].data_format
validate_schema_version_params(
registry_name,
schema_name,
self.num_schema_versions,
schema_definition,
compatibility,
data_format,
)
""" If the same schema definition is already stored in Schema Registry as a version,
the schema ID of the existing schema is returned to the caller. """
schema_versions = (
self.registries[registry_name].schemas[schema_name].schema_versions.values()
)
existing_schema_version = get_schema_version_if_definition_exists(
schema_versions, data_format, schema_definition
)
if existing_schema_version:
return existing_schema_version
""" Register Schema Version """
version_number = (
self.registries[registry_name]
.schemas[schema_name]
.get_next_schema_version()
)
self.registries[registry_name].schemas[schema_name].update_next_schema_version()
self.num_schema_versions += 1
schema_version = FakeSchemaVersion(
registry_name, schema_name, schema_definition, version_number
)
self.registries[registry_name].schemas[schema_name].schema_versions[
schema_version.schema_version_id
] = schema_version
return schema_version.as_dict()
class FakeDatabase(BaseModel):
@ -702,9 +768,11 @@ class FakeRegistry(BaseModel):
self.tags = tags
self.created_time = datetime.utcnow()
self.updated_time = datetime.utcnow()
self.status = "AVAILABLE"
self.registry_arn = (
f"arn:aws:glue:us-east-1:{get_account_id()}:registry/{self.name}"
)
self.schemas = OrderedDict()
def as_dict(self):
return {
@ -715,6 +783,85 @@ class FakeRegistry(BaseModel):
}
class FakeSchema(BaseModel):
def __init__(
self,
registry_name,
schema_name,
data_format,
compatibility,
schema_version_id,
description=None,
tags=None,
):
self.registry_name = registry_name
self.registry_arn = (
f"arn:aws:glue:us-east-1:{get_account_id()}:registry/{self.registry_name}"
)
self.schema_name = schema_name
self.schema_arn = f"arn:aws:glue:us-east-1:{get_account_id()}:schema/{self.registry_name}/{self.schema_name}"
self.description = description
self.data_format = data_format
self.compatibility = compatibility
self.schema_checkpoint = 1
self.latest_schema_version = 1
self.next_schema_version = 2
self.schema_status = "AVAILABLE"
self.tags = tags
self.schema_version_id = schema_version_id
self.schema_version_status = "AVAILABLE"
self.created_time = datetime.utcnow()
self.updated_time = datetime.utcnow()
self.schema_versions = OrderedDict()
def update_next_schema_version(self):
self.next_schema_version += 1
def get_next_schema_version(self):
return self.next_schema_version
def as_dict(self):
return {
"RegistryArn": self.registry_arn,
"RegistryName": self.registry_name,
"SchemaName": self.schema_name,
"SchemaArn": self.schema_arn,
"DataFormat": self.data_format,
"Compatibility": self.compatibility,
"SchemaCheckpoint": self.schema_checkpoint,
"LatestSchemaVersion": self.latest_schema_version,
"NextSchemaVersion": self.next_schema_version,
"SchemaStatus": self.schema_status,
"SchemaVersionId": self.schema_version_id,
"SchemaVersionStatus": self.schema_version_status,
"Description": self.description,
"Tags": self.tags,
}
class FakeSchemaVersion(BaseModel):
def __init__(self, registry_name, schema_name, schema_definition, version_number):
self.registry_name = registry_name
self.schema_name = schema_name
self.schema_arn = f"arn:aws:glue:us-east-1:{get_account_id()}:schema/{self.registry_name}/{self.schema_name}"
self.schema_definition = schema_definition
self.schema_version_status = "AVAILABLE"
self.version_number = version_number
self.schema_version_id = str(uuid4())
self.created_time = datetime.utcnow()
self.updated_time = datetime.utcnow()
def get_schema_version_id(self):
return self.schema_version_id
def as_dict(self):
return {
"SchemaVersionId": self.schema_version_id,
"VersionNumber": self.version_number,
"Status": self.schema_version_status,
}
glue_backends = BackendDict(
GlueBackend, "glue", use_boto3_regions=False, additional_regions=["global"]
)

View File

@ -460,4 +460,31 @@ class GlueResponse(BaseResponse):
description = self._get_param("Description")
tags = self._get_param("Tags")
registry = self.glue_backend.create_registry(registry_name, description, tags)
return json.dumps(registry.as_dict())
return json.dumps(registry)
def create_schema(self):
registry_id = self._get_param("RegistryId")
schema_name = self._get_param("SchemaName")
data_format = self._get_param("DataFormat")
compatibility = self._get_param("Compatibility")
description = self._get_param("Description")
tags = self._get_param("Tags")
schema_definition = self._get_param("SchemaDefinition")
schema = self.glue_backend.create_schema(
registry_id,
schema_name,
data_format,
compatibility,
schema_definition,
description,
tags,
)
return json.dumps(schema)
def register_schema_version(self):
schema_id = self._get_param("SchemaId")
schema_definition = self._get_param("SchemaDefinition")
schema_version = self.glue_backend.register_schema_version(
schema_id, schema_definition
)
return json.dumps(schema_version)

View File

@ -23,7 +23,10 @@ from pyparsing import (
pyparsing_common,
)
from .exceptions import InvalidInputException, InvalidStateException
from .exceptions import (
InvalidInputException,
InvalidStateException,
)
def _cast(type_: str, value: Any) -> Union[date, datetime, float, int, str]:

View File

@ -0,0 +1,162 @@
from moto.core import ACCOUNT_ID
DESCRIPTION = "test_description"
TAGS = {"key1": "value1", "key2": "value2"}
AVAILABLE_STATUS = "AVAILABLE"
REGISTRY_NAME = "TestRegistry"
REGISTRY_ARN = f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{REGISTRY_NAME}"
SCHEMA_NAME = "TestSchema"
REGISTRY_ID = {"RegistryName": f"{REGISTRY_NAME}"}
SCHEMA_ID = {"RegistryName": f"{REGISTRY_NAME}", "SchemaName": f"{SCHEMA_NAME}"}
SCHEMA_ARN = f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{REGISTRY_NAME}/{SCHEMA_NAME}"
AVRO_DATA_FORMAT = "AVRO"
JSON_DATA_FORMAT = "JSON"
PROTOBUF_DATA_FORMAT = "PROTOBUF"
BACKWARD_COMPATIBILITY = "BACKWARD"
DISABLED_COMPATIBILITY = "DISABLED"
AVRO_SCHEMA_DEFINITION = """{
"type": "record",
"namespace": "Moto_Test",
"name": "Person",
"fields": [
{
"name": "Name",
"type": "string"
},
{
"name": "Age",
"type": "int"
}
]
}"""
NEW_AVRO_SCHEMA_DEFINITION = """{
"type": "record",
"namespace": "Moto_Test",
"name": "Person",
"fields": [
{
"name": "Name",
"type": "string"
},
{
"name": "Age",
"type": "int"
},
{
"name": "address",
"type": "string",
"default": ""
}
]
}"""
JSON_SCHEMA_DEFINITION = """{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Person",
"type": "object",
"properties": {
"firstName": {
"type": "string",
"description": "The person's first name."
},
"lastName": {
"type": "string",
"description": "The person's last name."
}
}
}"""
NEW_JSON_SCHEMA_DEFINITION = """{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Person",
"type": "object",
"properties": {
"firstName": {
"type": "string",
"description": "The person's first name."
},
"lastName": {
"type": "string",
"description": "The person's last name."
},
"age": {
"description": "Age in years which must be equal to or greater than zero.",
"type": "integer",
"minimum": 0
}
}
}"""
PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
package tutorial;
option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";
message Person {
optional string name = 1;
optional int32 id = 2;
optional string email = 3;
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber {
optional string number = 1;
optional PhoneType type = 2 [default = HOME];
}
repeated PhoneNumber phones = 4;
}"""
NEW_PROTOBUF_SCHEMA_DEFINITION = """syntax = "proto2";
package tutorial;
option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";
message Person {
optional string name = 1;
optional int32 id = 2;
optional string email = 3;
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber {
optional string number = 1;
optional PhoneType type = 2 [default = HOME];
}
repeated PhoneNumber phones = 4;
}
message AddressBook {
repeated Person people = 1;
}"""

View File

@ -1,6 +1,13 @@
import copy
from .fixtures.datacatalog import TABLE_INPUT, PARTITION_INPUT, DATABASE_INPUT
from .fixtures.schema_registry import (
REGISTRY_NAME,
SCHEMA_NAME,
BACKWARD_COMPATIBILITY,
AVRO_DATA_FORMAT,
AVRO_SCHEMA_DEFINITION,
)
def create_database_input(database_name):
@ -157,3 +164,23 @@ def create_crawler(
return client.create_crawler(
Name=crawler_name, Role=crawler_role, Targets=crawler_targets, **params
)
def create_registry(client, registry_name=REGISTRY_NAME):
return client.create_registry(RegistryName=registry_name)
def create_schema(
client,
registry_id,
data_format=AVRO_DATA_FORMAT,
compatibility=BACKWARD_COMPATIBILITY,
schema_definition=AVRO_SCHEMA_DEFINITION,
):
return client.create_schema(
RegistryId=registry_id,
SchemaName=SCHEMA_NAME,
DataFormat=data_format,
Compatibility=compatibility,
SchemaDefinition=schema_definition,
)

View File

@ -7,7 +7,6 @@ import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ParamValidationError
from botocore.client import ClientError
from moto.core import ACCOUNT_ID
from moto import mock_glue
@ -450,158 +449,3 @@ def test_untag_glue_crawler():
resp = client.get_tags(ResourceArn=resource_arn)
resp.should.have.key("Tags").equals({"key1": "value1", "key3": "value3"})
@mock_glue
def test_create_registry_valid_input():
client = create_glue_client()
registry_name = "TestRegistry"
response = client.create_registry(
RegistryName=registry_name,
Description="test_create_registry_description",
Tags={"key1": "value1", "key2": "value2"},
)
response.should.have.key("RegistryName").equals("TestRegistry")
response.should.have.key("Description").equals("test_create_registry_description")
response.should.have.key("Tags").equals({"key1": "value1", "key2": "value2"})
response.should.have.key("RegistryArn").equals(
f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/" + registry_name
)
@mock_glue
def test_create_registry_valid_partial_input():
client = create_glue_client()
registry_name = "TestRegistry"
response = client.create_registry(RegistryName=registry_name)
response.should.have.key("RegistryName").equals("TestRegistry")
response.should.have.key("RegistryArn").equals(
f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/" + registry_name
)
@mock_glue
def test_create_registry_invalid_input_registry_name_too_long():
client = create_glue_client()
registry_name = ""
for _ in range(90):
registry_name = registry_name + "foobar"
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName=registry_name,
Description="test_create_registry_description",
Tags={"key1": "value1", "key2": "value2"},
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"An error occurred (InvalidInputException) when calling the CreateRegistry operation: The resource name contains too many or too few characters. Parameter Name: registryName"
)
@mock_glue
def test_create_registry_more_than_allowed():
client = create_glue_client()
for i in range(10):
registry_name = "TestRegistry" + str(i)
client.create_registry(
RegistryName=registry_name,
Description="test_create_registry_description",
Tags={"key1": "value1", "key2": "value2"},
)
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName="TestRegistry10",
Description="test_create_registry_description10",
Tags={"key1": "value1", "key2": "value2"},
)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNumberLimitExceededException")
err["Message"].should.equal(
"An error occurred (ResourceNumberLimitExceededException) when calling the CreateRegistry operation: More registries cannot be created. The maximum limit has been reached."
)
@mock_glue
def test_create_registry_invalid_registry_name():
client = create_glue_client()
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName="A,B,C",
Description="test_create_registry_description",
Tags={"key1": "value1", "key2": "value2"},
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"An error occurred (InvalidInputException) when calling the CreateRegistry operation: The parameter value contains one or more characters that are not valid. Parameter Name: registryName"
)
@mock_glue
def test_create_registry_already_exists():
client = create_glue_client()
client.create_registry(
RegistryName="TestRegistry1",
Description="test_create_registry_description1",
Tags={"key1": "value1", "key2": "value2"},
)
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName="TestRegistry1",
Description="test_create_registry_description1",
Tags={"key1": "value1", "key2": "value2"},
)
err = exc.value.response["Error"]
err["Code"].should.equal("AlreadyExistsException")
err["Message"].should.equal(
"An error occurred (AlreadyExistsException) when calling the CreateRegistry operation: Registry already exists. RegistryName: TestRegistry1"
)
@mock_glue
def test_create_registry_invalid_description_too_long():
client = create_glue_client()
description = ""
for _ in range(300):
description = description + "foobar, "
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName="TestRegistry1",
Description=description,
Tags={"key1": "value1", "key2": "value2"},
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"An error occurred (InvalidInputException) when calling the CreateRegistry operation: The resource name contains too many or too few characters. Parameter Name: description"
)
@mock_glue
def test_create_registry_invalid_number_of_tags():
tags = {}
for i in range(51):
key = "k" + str(i)
val = "v" + str(i)
tags[key] = val
client = create_glue_client()
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName="TestRegistry1",
Description="test_create_registry_description",
Tags=tags,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"An error occurred (InvalidInputException) when calling the CreateRegistry operation: New Tags cannot be empty or more than 50"
)

View File

@ -0,0 +1,681 @@
"""Unit tests for glue-schema-registry-supported APIs."""
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError
from moto.core import ACCOUNT_ID
from moto import mock_glue
from . import helpers
from .fixtures.schema_registry import (
REGISTRY_NAME,
REGISTRY_ARN,
DESCRIPTION,
TAGS,
SCHEMA_NAME,
SCHEMA_ARN,
AVRO_SCHEMA_DEFINITION,
NEW_AVRO_SCHEMA_DEFINITION,
JSON_SCHEMA_DEFINITION,
NEW_JSON_SCHEMA_DEFINITION,
PROTOBUF_SCHEMA_DEFINITION,
NEW_PROTOBUF_SCHEMA_DEFINITION,
AVRO_DATA_FORMAT,
JSON_DATA_FORMAT,
PROTOBUF_DATA_FORMAT,
REGISTRY_ID,
SCHEMA_ID,
BACKWARD_COMPATIBILITY,
DISABLED_COMPATIBILITY,
AVAILABLE_STATUS,
)
def create_glue_client():
return boto3.client("glue", region_name="us-east-1")
# Test create_registry
@mock_glue
def test_create_registry_valid_input():
client = create_glue_client()
response = client.create_registry(
RegistryName=REGISTRY_NAME, Description=DESCRIPTION, Tags=TAGS
)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("Description").equals(DESCRIPTION)
response.should.have.key("Tags").equals(TAGS)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
@mock_glue
def test_create_registry_valid_partial_input():
client = create_glue_client()
response = client.create_registry(RegistryName=REGISTRY_NAME)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
@mock_glue
def test_create_registry_invalid_registry_name_too_long():
client = create_glue_client()
registry_name = ""
for _ in range(80):
registry_name = registry_name + "toolong"
with pytest.raises(ClientError) as exc:
client.create_registry(RegistryName=registry_name)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"The resource name contains too many or too few characters. Parameter Name: registryName"
)
@mock_glue
def test_create_registry_more_than_allowed():
client = create_glue_client()
for i in range(10):
client.create_registry(RegistryName=REGISTRY_NAME + str(i))
with pytest.raises(ClientError) as exc:
client.create_registry(RegistryName=REGISTRY_NAME)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNumberLimitExceededException")
err["Message"].should.equal(
"More registries cannot be created. The maximum limit has been reached."
)
@mock_glue
def test_create_registry_invalid_registry_name():
client = create_glue_client()
invalid_registry_name = "A,B,C"
with pytest.raises(ClientError) as exc:
client.create_registry(RegistryName=invalid_registry_name)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"The parameter value contains one or more characters that are not valid. Parameter Name: registryName"
)
@mock_glue
def test_create_registry_already_exists():
client = create_glue_client()
client.create_registry(RegistryName=REGISTRY_NAME)
with pytest.raises(ClientError) as exc:
client.create_registry(RegistryName=REGISTRY_NAME)
err = exc.value.response["Error"]
err["Code"].should.equal("AlreadyExistsException")
err["Message"].should.equal(
"Registry already exists. RegistryName: " + REGISTRY_NAME
)
@mock_glue
def test_create_registry_invalid_description_too_long():
client = create_glue_client()
description = ""
for _ in range(350):
description = description + "toolong"
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName=REGISTRY_NAME,
Description=description,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"The resource name contains too many or too few characters. Parameter Name: description"
)
@mock_glue
def test_create_registry_invalid_number_of_tags():
tags = {}
for i in range(51):
key = "k" + str(i)
val = "v" + str(i)
tags[key] = val
client = create_glue_client()
with pytest.raises(ClientError) as exc:
client.create_registry(
RegistryName=REGISTRY_NAME,
Tags=tags,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal("New Tags cannot be empty or more than 50")
# Test create_schema
@mock_glue
def test_create_schema_valid_input_registry_name_avro():
client = create_glue_client()
helpers.create_registry(client)
response = client.create_schema(
RegistryId=REGISTRY_ID,
SchemaName=SCHEMA_NAME,
DataFormat=AVRO_DATA_FORMAT,
Compatibility=BACKWARD_COMPATIBILITY,
SchemaDefinition=AVRO_SCHEMA_DEFINITION,
Description=DESCRIPTION,
Tags=TAGS,
)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
response.should.have.key("SchemaName").equals(SCHEMA_NAME)
response.should.have.key("SchemaArn").equals(SCHEMA_ARN)
response.should.have.key("Description").equals(DESCRIPTION)
response.should.have.key("DataFormat").equals(AVRO_DATA_FORMAT)
response.should.have.key("Compatibility").equals(BACKWARD_COMPATIBILITY)
response.should.have.key("SchemaCheckpoint").equals(1)
response.should.have.key("LatestSchemaVersion").equals(1)
response.should.have.key("NextSchemaVersion").equals(2)
response.should.have.key("SchemaStatus").equals(AVAILABLE_STATUS)
response.should.have.key("Tags").equals(TAGS)
response.should.have.key("SchemaVersionId")
response.should.have.key("SchemaVersionStatus").equals(AVAILABLE_STATUS)
@mock_glue
def test_create_schema_valid_input_registry_name_json():
client = create_glue_client()
helpers.create_registry(client)
response = client.create_schema(
RegistryId=REGISTRY_ID,
SchemaName=SCHEMA_NAME,
DataFormat=JSON_DATA_FORMAT,
Compatibility=BACKWARD_COMPATIBILITY,
SchemaDefinition=JSON_SCHEMA_DEFINITION,
Description=DESCRIPTION,
Tags=TAGS,
)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
response.should.have.key("SchemaName").equals(SCHEMA_NAME)
response.should.have.key("SchemaArn").equals(SCHEMA_ARN)
response.should.have.key("Description").equals(DESCRIPTION)
response.should.have.key("DataFormat").equals(JSON_DATA_FORMAT)
response.should.have.key("Compatibility").equals(BACKWARD_COMPATIBILITY)
response.should.have.key("SchemaCheckpoint").equals(1)
response.should.have.key("LatestSchemaVersion").equals(1)
response.should.have.key("NextSchemaVersion").equals(2)
response.should.have.key("SchemaStatus").equals(AVAILABLE_STATUS)
response.should.have.key("Tags").equals(TAGS)
response.should.have.key("SchemaVersionId")
response.should.have.key("SchemaVersionStatus").equals(AVAILABLE_STATUS)
@mock_glue
def test_create_schema_valid_input_registry_name_protobuf():
client = create_glue_client()
helpers.create_registry(client)
response = client.create_schema(
RegistryId=REGISTRY_ID,
SchemaName=SCHEMA_NAME,
DataFormat=PROTOBUF_DATA_FORMAT,
Compatibility=BACKWARD_COMPATIBILITY,
SchemaDefinition=PROTOBUF_SCHEMA_DEFINITION,
Description=DESCRIPTION,
Tags=TAGS,
)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
response.should.have.key("SchemaName").equals(SCHEMA_NAME)
response.should.have.key("SchemaArn").equals(SCHEMA_ARN)
response.should.have.key("Description").equals(DESCRIPTION)
response.should.have.key("DataFormat").equals(PROTOBUF_DATA_FORMAT)
response.should.have.key("Compatibility").equals(BACKWARD_COMPATIBILITY)
response.should.have.key("SchemaCheckpoint").equals(1)
response.should.have.key("LatestSchemaVersion").equals(1)
response.should.have.key("NextSchemaVersion").equals(2)
response.should.have.key("SchemaStatus").equals(AVAILABLE_STATUS)
response.should.have.key("Tags").equals(TAGS)
response.should.have.key("SchemaVersionId")
response.should.have.key("SchemaVersionStatus").equals(AVAILABLE_STATUS)
@mock_glue
def test_create_schema_valid_input_registry_arn():
client = create_glue_client()
helpers.create_registry(client)
registry_id = {"RegistryArn": f"{REGISTRY_ARN}"}
response = client.create_schema(
RegistryId=registry_id,
SchemaName=SCHEMA_NAME,
DataFormat=AVRO_DATA_FORMAT,
Compatibility=BACKWARD_COMPATIBILITY,
SchemaDefinition=AVRO_SCHEMA_DEFINITION,
Description=DESCRIPTION,
Tags=TAGS,
)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
response.should.have.key("SchemaName").equals(SCHEMA_NAME)
response.should.have.key("SchemaArn").equals(SCHEMA_ARN)
response.should.have.key("Description").equals(DESCRIPTION)
response.should.have.key("DataFormat").equals(AVRO_DATA_FORMAT)
response.should.have.key("Compatibility").equals(BACKWARD_COMPATIBILITY)
response.should.have.key("SchemaCheckpoint").equals(1)
response.should.have.key("LatestSchemaVersion").equals(1)
response.should.have.key("NextSchemaVersion").equals(2)
response.should.have.key("SchemaStatus").equals(AVAILABLE_STATUS)
response.should.have.key("Tags").equals(TAGS)
response.should.have.key("SchemaVersionId")
response.should.have.key("SchemaVersionStatus").equals(AVAILABLE_STATUS)
@mock_glue
def test_create_schema_valid_partial_input():
client = create_glue_client()
helpers.create_registry(client)
response = helpers.create_schema(client, REGISTRY_ID)
response.should.have.key("RegistryName").equals(REGISTRY_NAME)
response.should.have.key("RegistryArn").equals(REGISTRY_ARN)
response.should.have.key("SchemaName").equals(SCHEMA_NAME)
response.should.have.key("SchemaArn").equals(SCHEMA_ARN)
response.should.have.key("DataFormat").equals(AVRO_DATA_FORMAT)
response.should.have.key("Compatibility").equals(BACKWARD_COMPATIBILITY)
response.should.have.key("SchemaCheckpoint").equals(1)
response.should.have.key("LatestSchemaVersion").equals(1)
response.should.have.key("NextSchemaVersion").equals(2)
response.should.have.key("SchemaStatus").equals(AVAILABLE_STATUS)
response.should.have.key("SchemaStatus")
response.should.have.key("SchemaVersionId")
response.should.have.key("SchemaVersionStatus").equals(AVAILABLE_STATUS)
@mock_glue
def test_create_schema_valid_default_registry():
client = create_glue_client()
helpers.create_registry(client)
empty_registry_id = {}
response = helpers.create_schema(client, registry_id=empty_registry_id)
default_registry_name = "default-registry"
response.should.have.key("RegistryName").equals(default_registry_name)
response.should.have.key("RegistryArn").equals(
f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{default_registry_name}"
)
response.should.have.key("SchemaName").equals(SCHEMA_NAME)
response.should.have.key("SchemaArn").equals(
f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{default_registry_name}/{SCHEMA_NAME}"
)
response.should.have.key("DataFormat").equals(AVRO_DATA_FORMAT)
response.should.have.key("Compatibility").equals(BACKWARD_COMPATIBILITY)
response.should.have.key("SchemaCheckpoint").equals(1)
response.should.have.key("LatestSchemaVersion").equals(1)
response.should.have.key("NextSchemaVersion").equals(2)
response.should.have.key("SchemaStatus").equals(AVAILABLE_STATUS)
response.should.have.key("SchemaStatus")
response.should.have.key("SchemaVersionId")
response.should.have.key("SchemaVersionStatus").equals(AVAILABLE_STATUS)
@mock_glue
def test_create_schema_invalid_registry_arn():
client = create_glue_client()
helpers.create_registry(client)
invalid_registry_arn = (
f"invalid:arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{REGISTRY_NAME}"
)
invalid_registry_id = {"RegistryArn": f"{invalid_registry_arn}"}
with pytest.raises(ClientError) as exc:
helpers.create_schema(client, registry_id=invalid_registry_id)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"The parameter value contains one or more characters that are not valid. Parameter Name: registryArn"
)
@mock_glue
def test_create_schema_invalid_registry_id_both_params_provided():
client = create_glue_client()
helpers.create_registry(client)
invalid_registry_id = {
"RegistryName": f"{REGISTRY_NAME}",
"RegistryArn": f"{REGISTRY_ARN}",
}
with pytest.raises(ClientError) as exc:
helpers.create_schema(client, registry_id=invalid_registry_id)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"One of registryName or registryArn has to be provided, both cannot be provided."
)
@mock_glue
def test_create_schema_invalid_schema_name():
client = create_glue_client()
helpers.create_registry(client)
invalid_schema_name = "Invalid,Schema,Name"
with pytest.raises(ClientError) as exc:
client.create_schema(
RegistryId=REGISTRY_ID,
SchemaName=invalid_schema_name,
DataFormat=AVRO_DATA_FORMAT,
Compatibility=BACKWARD_COMPATIBILITY,
SchemaDefinition=AVRO_SCHEMA_DEFINITION,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"The parameter value contains one or more characters that are not valid. Parameter Name: schemaName"
)
@mock_glue
def test_create_schema_invalid_schema_name_too_long():
client = create_glue_client()
helpers.create_registry(client)
invalid_schema_name = ""
for _ in range(80):
invalid_schema_name = invalid_schema_name + "toolong"
with pytest.raises(ClientError) as exc:
client.create_schema(
RegistryId=REGISTRY_ID,
SchemaName=invalid_schema_name,
DataFormat=AVRO_DATA_FORMAT,
Compatibility=BACKWARD_COMPATIBILITY,
SchemaDefinition=AVRO_SCHEMA_DEFINITION,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"The resource name contains too many or too few characters. Parameter Name: schemaName"
)
@mock_glue
def test_create_schema_invalid_data_format():
client = create_glue_client()
helpers.create_registry(client)
invalid_data_format = "INVALID"
with pytest.raises(ClientError) as exc:
helpers.create_schema(client, REGISTRY_ID, data_format=invalid_data_format)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal("Data format is not valid.")
@mock_glue
def test_create_schema_invalid_compatibility():
client = create_glue_client()
helpers.create_registry(client)
invalid_compatibility = "INVALID"
with pytest.raises(ClientError) as exc:
helpers.create_schema(client, REGISTRY_ID, compatibility=invalid_compatibility)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal("Compatibility is not valid.")
@mock_glue
def test_create_schema_invalid_schema_definition():
client = create_glue_client()
helpers.create_registry(client)
invalid_schema_definition = """{
"type":: "record",
}"""
with pytest.raises(ClientError) as exc:
helpers.create_schema(
client, REGISTRY_ID, schema_definition=invalid_schema_definition
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.have(
f"Schema definition of {AVRO_DATA_FORMAT} data format is invalid"
)
# test RegisterSchemaVersion
@mock_glue
def test_register_schema_version_valid_input_avro():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(client, REGISTRY_ID)
response = client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=NEW_AVRO_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId")
response.should.have.key("VersionNumber").equals(2)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_valid_input_json():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(
client,
REGISTRY_ID,
data_format="JSON",
schema_definition=JSON_SCHEMA_DEFINITION,
)
response = client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=NEW_JSON_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId")
response.should.have.key("VersionNumber").equals(2)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_valid_input_protobuf():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(
client,
REGISTRY_ID,
data_format="PROTOBUF",
schema_definition=PROTOBUF_SCHEMA_DEFINITION,
)
response = client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=NEW_PROTOBUF_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId")
response.should.have.key("VersionNumber").equals(2)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_valid_input_schema_arn():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(client, REGISTRY_ID)
schema_id = {"SchemaArn": SCHEMA_ARN}
response = client.register_schema_version(
SchemaId=schema_id, SchemaDefinition=NEW_AVRO_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId")
response.should.have.key("VersionNumber").equals(2)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_identical_schema_version_avro():
client = create_glue_client()
helpers.create_registry(client)
response = helpers.create_schema(client, REGISTRY_ID)
version_id = response["SchemaVersionId"]
response = client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=AVRO_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId").equals(version_id)
response.should.have.key("VersionNumber").equals(1)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_identical_schema_version_json():
client = create_glue_client()
helpers.create_registry(client)
response = helpers.create_schema(
client,
REGISTRY_ID,
data_format=JSON_DATA_FORMAT,
schema_definition=JSON_SCHEMA_DEFINITION,
)
version_id = response["SchemaVersionId"]
response = client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=JSON_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId").equals(version_id)
response.should.have.key("VersionNumber").equals(1)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_identical_schema_version_protobuf():
client = create_glue_client()
helpers.create_registry(client)
response = helpers.create_schema(
client,
REGISTRY_ID,
data_format=PROTOBUF_DATA_FORMAT,
schema_definition=PROTOBUF_SCHEMA_DEFINITION,
)
version_id = response["SchemaVersionId"]
response = client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=PROTOBUF_SCHEMA_DEFINITION
)
response.should.have.key("SchemaVersionId").equals(version_id)
response.should.have.key("VersionNumber").equals(1)
response.should.have.key("Status").equals(AVAILABLE_STATUS)
@mock_glue
def test_register_schema_version_invalid_registry_schema_does_not_exist():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(client, REGISTRY_ID)
invalid_schema_id = {
"RegistryName": "InvalidRegistryDoesNotExist",
"SchemaName": f"{SCHEMA_NAME}",
}
with pytest.raises(ClientError) as exc:
client.register_schema_version(
SchemaId=invalid_schema_id, SchemaDefinition=AVRO_SCHEMA_DEFINITION
)
err = exc.value.response["Error"]
err["Code"].should.equal("EntityNotFoundException")
err["Message"].should.equal("Schema is not found.")
@mock_glue
def test_register_schema_version_invalid_schema_schema_does_not_exist():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(client, REGISTRY_ID)
invalid_schema_id = {
"RegistryName": f"{REGISTRY_NAME}",
"SchemaName": "InvalidSchemaDoesNotExist",
}
with pytest.raises(ClientError) as exc:
client.register_schema_version(
SchemaId=invalid_schema_id, SchemaDefinition=AVRO_SCHEMA_DEFINITION
)
err = exc.value.response["Error"]
err["Code"].should.equal("EntityNotFoundException")
err["Message"].should.equal("Schema is not found.")
@mock_glue
def test_register_schema_version_invalid_compatibility_disabled():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(client, REGISTRY_ID, compatibility=DISABLED_COMPATIBILITY)
with pytest.raises(ClientError) as exc:
client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=AVRO_SCHEMA_DEFINITION
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.equal(
"Compatibility DISABLED does not allow versioning. SchemaId: SchemaId(schemaName="
+ SCHEMA_NAME
+ ", registryName="
+ REGISTRY_NAME
+ ")"
)
@mock_glue
def test_register_schema_version_invalid_schema_definition():
client = create_glue_client()
helpers.create_registry(client)
helpers.create_schema(client, REGISTRY_ID, compatibility=DISABLED_COMPATIBILITY)
with pytest.raises(ClientError) as exc:
client.register_schema_version(
SchemaId=SCHEMA_ID, SchemaDefinition=AVRO_SCHEMA_DEFINITION
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidInputException")
err["Message"].should.have("Schema definition of JSON data format is invalid:")