moto/tests/test_glue/test_schema_registry.py

1260 lines
44 KiB
Python

"""Unit tests for glue-schema-registry-supported APIs."""
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto import mock_glue
from . import helpers
from .fixtures.schema_registry import (
TEST_REGISTRY_NAME,
TEST_REGISTRY_ARN,
TEST_DESCRIPTION,
TEST_TAGS,
TEST_SCHEMA_NAME,
TEST_SCHEMA_ARN,
TEST_AVRO_SCHEMA_DEFINITION,
TEST_NEW_AVRO_SCHEMA_DEFINITION,
TEST_JSON_SCHEMA_DEFINITION,
TEST_NEW_JSON_SCHEMA_DEFINITION,
TEST_PROTOBUF_SCHEMA_DEFINITION,
TEST_NEW_PROTOBUF_SCHEMA_DEFINITION,
TEST_AVRO_DATA_FORMAT,
TEST_JSON_DATA_FORMAT,
TEST_PROTOBUF_DATA_FORMAT,
TEST_REGISTRY_ID,
TEST_SCHEMA_ID,
TEST_BACKWARD_COMPATIBILITY,
TEST_DISABLED_COMPATIBILITY,
TEST_AVAILABLE_STATUS,
TEST_SCHEMA_VERSION_NUMBER,
TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION,
TEST_VERSION_ID,
TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST,
TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST,
TEST_METADATA_KEY_VALUE,
TEST_METADATA_KEY,
TEST_METADATA_VALUE,
TEST_DELETING_STATUS,
)
@pytest.fixture(name="client")
def fixture_client():
    """Yield a us-east-1 Glue client created inside an active ``mock_glue`` context."""
    with mock_glue():
        glue_client = boto3.client("glue", region_name="us-east-1")
        yield glue_client
# Test create_registry
def test_create_registry_valid_input(client):
    """Create a registry with name, description and tags and check the echoed fields."""
    created = client.create_registry(
        RegistryName=TEST_REGISTRY_NAME,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )
    expected_fields = {
        "RegistryName": TEST_REGISTRY_NAME,
        "Description": TEST_DESCRIPTION,
        "Tags": TEST_TAGS,
        "RegistryArn": TEST_REGISTRY_ARN,
    }
    for field, value in expected_fields.items():
        created.should.have.key(field).equals(value)
def test_create_registry_valid_partial_input(client):
    """Create a registry from its name alone and check the returned name and ARN."""
    created = client.create_registry(RegistryName=TEST_REGISTRY_NAME)
    for field, value in (
        ("RegistryName", TEST_REGISTRY_NAME),
        ("RegistryArn", TEST_REGISTRY_ARN),
    ):
        created.should.have.key(field).equals(value)
def test_create_registry_invalid_registry_name_too_long(client):
    """A 560-character registry name must be rejected with InvalidInputException."""
    oversized_name = "toolong" * 80
    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName=oversized_name)
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The resource name contains too many or too few characters. Parameter Name: registryName"
    )
def test_create_registry_more_than_allowed(client):
    """After ten registries have been created, one more must hit the resource limit."""
    for suffix in map(str, range(10)):
        client.create_registry(RegistryName=TEST_REGISTRY_NAME + suffix)
    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName=TEST_REGISTRY_NAME)
    error = raised.value.response["Error"]
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    error["Message"].should.equal(
        "More registries cannot be created. The maximum limit has been reached."
    )
def test_create_registry_invalid_registry_name(client):
    """A registry name containing commas must be rejected as having invalid characters."""
    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName="A,B,C")
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: registryName"
    )
def test_create_registry_already_exists(client):
    """Creating the same registry twice must raise AlreadyExistsException."""
    client.create_registry(RegistryName=TEST_REGISTRY_NAME)
    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName=TEST_REGISTRY_NAME)
    error = raised.value.response["Error"]
    error["Code"].should.equal("AlreadyExistsException")
    error["Message"].should.equal(
        f"Registry already exists. RegistryName: {TEST_REGISTRY_NAME}"
    )
def test_create_registry_invalid_description_too_long(client):
    """A 2450-character description must be rejected with InvalidInputException."""
    oversized_description = "toolong" * 350
    with pytest.raises(ClientError) as raised:
        client.create_registry(
            RegistryName=TEST_REGISTRY_NAME,
            Description=oversized_description,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The resource name contains too many or too few characters. Parameter Name: description"
    )
def test_create_registry_invalid_number_of_tags(client):
    """Passing 51 tags to create_registry must raise InvalidInputException."""
    too_many_tags = {f"k{index}": f"v{index}" for index in range(51)}
    with pytest.raises(ClientError) as raised:
        client.create_registry(
            RegistryName=TEST_REGISTRY_NAME,
            Tags=too_many_tags,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal("New Tags cannot be empty or more than 50")
# Test create_schema
def test_create_schema_valid_input_registry_name_avro(client):
    """Create an AVRO schema with every optional argument and verify the response."""
    helpers.create_registry(client)
    created = client.create_schema(
        RegistryId=TEST_REGISTRY_ID,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_AVRO_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )
    expected_fields = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
        "SchemaVersionStatus": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        created.should.have.key(field).equals(value)
    # Only the presence of the version id is checked, not its value.
    created.should.have.key("SchemaVersionId")
def test_create_schema_valid_input_registry_name_json(client):
    """Create a JSON schema with every optional argument and verify the response."""
    helpers.create_registry(client)
    created = client.create_schema(
        RegistryId=TEST_REGISTRY_ID,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_JSON_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_JSON_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )
    expected_fields = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_JSON_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
        "SchemaVersionStatus": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        created.should.have.key(field).equals(value)
    # Only the presence of the version id is checked, not its value.
    created.should.have.key("SchemaVersionId")
def test_create_schema_valid_input_registry_name_protobuf(client):
    """Create a PROTOBUF schema with every optional argument and verify the response."""
    helpers.create_registry(client)
    created = client.create_schema(
        RegistryId=TEST_REGISTRY_ID,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_PROTOBUF_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_PROTOBUF_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )
    expected_fields = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_PROTOBUF_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
        "SchemaVersionStatus": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        created.should.have.key(field).equals(value)
    # Only the presence of the version id is checked, not its value.
    created.should.have.key("SchemaVersionId")
def test_create_schema_valid_input_registry_arn(client):
    """Create a schema while addressing the registry by ARN and verify the response."""
    helpers.create_registry(client)
    created = client.create_schema(
        RegistryId={"RegistryArn": f"{TEST_REGISTRY_ARN}"},
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_AVRO_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )
    expected_fields = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
        "SchemaVersionStatus": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        created.should.have.key(field).equals(value)
    # Only the presence of the version id is checked, not its value.
    created.should.have.key("SchemaVersionId")
def test_create_schema_valid_partial_input(client):
    """Create a schema through the helper's defaults and verify the response.

    The original repeated a key-only ``SchemaStatus`` check right after the
    equality check on the same key; that redundant assertion is dropped.
    """
    helpers.create_registry(client)
    response = helpers.create_schema(client, TEST_REGISTRY_ID)
    response.should.have.key("RegistryName").equals(TEST_REGISTRY_NAME)
    response.should.have.key("RegistryArn").equals(TEST_REGISTRY_ARN)
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("SchemaArn").equals(TEST_SCHEMA_ARN)
    response.should.have.key("DataFormat").equals(TEST_AVRO_DATA_FORMAT)
    response.should.have.key("Compatibility").equals(TEST_BACKWARD_COMPATIBILITY)
    response.should.have.key("SchemaCheckpoint").equals(1)
    response.should.have.key("LatestSchemaVersion").equals(1)
    response.should.have.key("NextSchemaVersion").equals(2)
    response.should.have.key("SchemaStatus").equals(TEST_AVAILABLE_STATUS)
    # Only the presence of the version id is checked, not its value.
    response.should.have.key("SchemaVersionId")
    response.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)
def test_create_schema_valid_default_registry(client):
    """An empty RegistryId must place the schema in ``default-registry``.

    The original repeated a key-only ``SchemaStatus`` check right after the
    equality check on the same key; that redundant assertion is dropped.
    """
    helpers.create_registry(client)
    empty_registry_id = {}
    response = helpers.create_schema(client, registry_id=empty_registry_id)
    default_registry_name = "default-registry"
    response.should.have.key("RegistryName").equals(default_registry_name)
    response.should.have.key("RegistryArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{default_registry_name}"
    )
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("SchemaArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{default_registry_name}/{TEST_SCHEMA_NAME}"
    )
    response.should.have.key("DataFormat").equals(TEST_AVRO_DATA_FORMAT)
    response.should.have.key("Compatibility").equals(TEST_BACKWARD_COMPATIBILITY)
    response.should.have.key("SchemaCheckpoint").equals(1)
    response.should.have.key("LatestSchemaVersion").equals(1)
    response.should.have.key("NextSchemaVersion").equals(2)
    response.should.have.key("SchemaStatus").equals(TEST_AVAILABLE_STATUS)
    # Only the presence of the version id is checked, not its value.
    response.should.have.key("SchemaVersionId")
    response.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)
def test_create_schema_valid_default_registry_in_registry_id(client):
    """Naming ``default-registry`` explicitly in RegistryId must be accepted.

    The original repeated a key-only ``SchemaStatus`` check right after the
    equality check on the same key; that redundant assertion is dropped.
    """
    helpers.create_registry(client)
    default_registry_name = "default-registry"
    registry_id_default_registry = {"RegistryName": f"{default_registry_name}"}
    response = helpers.create_schema(client, registry_id=registry_id_default_registry)
    response.should.have.key("RegistryName").equals(default_registry_name)
    response.should.have.key("RegistryArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{default_registry_name}"
    )
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("SchemaArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{default_registry_name}/{TEST_SCHEMA_NAME}"
    )
    response.should.have.key("DataFormat").equals(TEST_AVRO_DATA_FORMAT)
    response.should.have.key("Compatibility").equals(TEST_BACKWARD_COMPATIBILITY)
    response.should.have.key("SchemaCheckpoint").equals(1)
    response.should.have.key("LatestSchemaVersion").equals(1)
    response.should.have.key("NextSchemaVersion").equals(2)
    response.should.have.key("SchemaStatus").equals(TEST_AVAILABLE_STATUS)
    # Only the presence of the version id is checked, not its value.
    response.should.have.key("SchemaVersionId")
    response.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)
def test_create_schema_invalid_registry_arn(client):
    """A registry ARN prefixed with ``invalid:`` must be rejected by create_schema."""
    helpers.create_registry(client)
    malformed_registry_id = {
        "RegistryArn": f"invalid:arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{TEST_REGISTRY_NAME}"
    }
    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, registry_id=malformed_registry_id)
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: registryArn"
    )
def test_create_schema_invalid_registry_id_both_params_provided(client):
    """Supplying both RegistryName and RegistryArn in RegistryId must be rejected."""
    helpers.create_registry(client)
    ambiguous_registry_id = dict(
        RegistryName=f"{TEST_REGISTRY_NAME}",
        RegistryArn=f"{TEST_REGISTRY_ARN}",
    )
    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, registry_id=ambiguous_registry_id)
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "One of registryName or registryArn has to be provided, both cannot be provided."
    )
def test_create_schema_invalid_schema_name(client):
    """A schema name containing commas must be rejected as having invalid characters."""
    helpers.create_registry(client)
    with pytest.raises(ClientError) as raised:
        client.create_schema(
            RegistryId=TEST_REGISTRY_ID,
            SchemaName="Invalid,Schema,Name",
            DataFormat=TEST_AVRO_DATA_FORMAT,
            Compatibility=TEST_BACKWARD_COMPATIBILITY,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: schemaName"
    )
def test_create_schema_invalid_schema_name_too_long(client):
    """A 560-character schema name must be rejected with InvalidInputException."""
    helpers.create_registry(client)
    oversized_schema_name = "toolong" * 80
    with pytest.raises(ClientError) as raised:
        client.create_schema(
            RegistryId=TEST_REGISTRY_ID,
            SchemaName=oversized_schema_name,
            DataFormat=TEST_AVRO_DATA_FORMAT,
            Compatibility=TEST_BACKWARD_COMPATIBILITY,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The resource name contains too many or too few characters. Parameter Name: schemaName"
    )
def test_create_schema_invalid_data_format(client):
    """An unknown data format string must be rejected by create_schema."""
    helpers.create_registry(client)
    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, TEST_REGISTRY_ID, data_format="INVALID")
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal("Data format is not valid.")
def test_create_schema_invalid_compatibility(client):
    """An unknown compatibility string must be rejected by create_schema."""
    helpers.create_registry(client)
    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, TEST_REGISTRY_ID, compatibility="INVALID")
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal("Compatibility is not valid.")
def test_create_schema_invalid_schema_definition(client):
    """A syntactically broken schema definition must be rejected by create_schema.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    helpers.create_registry(client)
    # Double colon makes the definition unparseable.
    invalid_schema_definition = """{
"type":: "record",
}"""
    with pytest.raises(ClientError) as exc:
        helpers.create_schema(
            client, TEST_REGISTRY_ID, schema_definition=invalid_schema_definition
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    err["Message"].should.contain(
        f"Schema definition of {TEST_AVRO_DATA_FORMAT} data format is invalid"
    )
# Test register_schema_version
def test_register_schema_version_valid_input_avro(client):
    """Registering a new AVRO definition on an existing schema must yield version 2."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_NEW_AVRO_SCHEMA_DEFINITION,
    )
    registered.should.have.key("SchemaVersionId")
    for field, value in (("VersionNumber", 2), ("Status", TEST_AVAILABLE_STATUS)):
        registered.should.have.key(field).equals(value)
def test_register_schema_version_valid_input_json(client):
    """Registering a new JSON definition on a JSON schema must yield version 2."""
    helpers.create_registry(client)
    helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        data_format="JSON",
        schema_definition=TEST_JSON_SCHEMA_DEFINITION,
    )
    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_NEW_JSON_SCHEMA_DEFINITION,
    )
    registered.should.have.key("SchemaVersionId")
    for field, value in (("VersionNumber", 2), ("Status", TEST_AVAILABLE_STATUS)):
        registered.should.have.key(field).equals(value)
def test_register_schema_version_valid_input_protobuf(client):
    """Registering a new PROTOBUF definition on a PROTOBUF schema must yield version 2."""
    helpers.create_registry(client)
    helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        data_format="PROTOBUF",
        schema_definition=TEST_PROTOBUF_SCHEMA_DEFINITION,
    )
    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_NEW_PROTOBUF_SCHEMA_DEFINITION,
    )
    registered.should.have.key("SchemaVersionId")
    for field, value in (("VersionNumber", 2), ("Status", TEST_AVAILABLE_STATUS)):
        registered.should.have.key(field).equals(value)
def test_register_schema_version_valid_input_schema_arn(client):
    """Registering a version while addressing the schema by ARN must yield version 2."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    registered = client.register_schema_version(
        SchemaId={"SchemaArn": TEST_SCHEMA_ARN},
        SchemaDefinition=TEST_NEW_AVRO_SCHEMA_DEFINITION,
    )
    registered.should.have.key("SchemaVersionId")
    for field, value in (("VersionNumber", 2), ("Status", TEST_AVAILABLE_STATUS)):
        registered.should.have.key(field).equals(value)
def test_register_schema_version_identical_schema_version_avro(client):
    """Re-registering the same AVRO definition must return the existing version 1."""
    helpers.create_registry(client)
    original_version_id = helpers.create_schema(client, TEST_REGISTRY_ID)[
        "SchemaVersionId"
    ]
    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
    )
    expected_fields = {
        "SchemaVersionId": original_version_id,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        registered.should.have.key(field).equals(value)
def test_register_schema_version_identical_schema_version_json(client):
    """Re-registering the same JSON definition must return the existing version 1."""
    helpers.create_registry(client)
    created = helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        data_format=TEST_JSON_DATA_FORMAT,
        schema_definition=TEST_JSON_SCHEMA_DEFINITION,
    )
    original_version_id = created["SchemaVersionId"]
    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_JSON_SCHEMA_DEFINITION,
    )
    expected_fields = {
        "SchemaVersionId": original_version_id,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        registered.should.have.key(field).equals(value)
def test_register_schema_version_identical_schema_version_protobuf(client):
    """Re-registering the same PROTOBUF definition must return the existing version 1."""
    helpers.create_registry(client)
    created = helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        data_format=TEST_PROTOBUF_DATA_FORMAT,
        schema_definition=TEST_PROTOBUF_SCHEMA_DEFINITION,
    )
    original_version_id = created["SchemaVersionId"]
    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_PROTOBUF_SCHEMA_DEFINITION,
    )
    expected_fields = {
        "SchemaVersionId": original_version_id,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        registered.should.have.key(field).equals(value)
def test_register_schema_version_invalid_registry_schema_does_not_exist(client):
    """Registering against TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST must fail.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as exc:
        client.register_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    err["Message"].should.contain("Schema is not found.")
def test_register_schema_version_invalid_schema_schema_does_not_exist(client):
    """Registering against TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST must fail.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as exc:
        client.register_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    err["Message"].should.contain("Schema is not found.")
def test_register_schema_version_invalid_compatibility_disabled(client):
    """A schema created with DISABLED compatibility must refuse new versions."""
    helpers.create_registry(client)
    helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        compatibility=TEST_DISABLED_COMPATIBILITY,
    )
    with pytest.raises(ClientError) as raised:
        client.register_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    expected_message = (
        "Compatibility DISABLED does not allow versioning. SchemaId: SchemaId(schemaArn=null"
        f", schemaName={TEST_SCHEMA_NAME}, registryName={TEST_REGISTRY_NAME})"
    )
    error["Message"].should.equal(expected_message)
def test_register_schema_version_invalid_schema_definition(client):
    """Expect InvalidInputException when registering a version on a DISABLED schema.

    NOTE(review): the setup and request are identical to
    ``test_register_schema_version_invalid_compatibility_disabled`` — a valid
    AVRO definition is registered on a DISABLED-compatibility schema — so this
    test does not appear to exercise an invalid schema definition at all.
    Confirm the intended scenario.
    """
    helpers.create_registry(client)
    helpers.create_schema(
        client, TEST_REGISTRY_ID, compatibility=TEST_DISABLED_COMPATIBILITY
    )
    with pytest.raises(ClientError) as exc:
        client.register_schema_version(
            SchemaId=TEST_SCHEMA_ID, SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # NOTE(review): `.should.have(<str>)` appears to be a no-op in `sure` (calling
    # the builder only rebinds its subject), so this message is presumably never
    # checked — and it mentions JSON although the schema is not created as JSON.
    # TODO confirm and replace with a real assertion on the expected message.
    err["Message"].should.have("Schema definition of JSON data format is invalid:")
def test_register_schema_version_invalid_schema_id(client):
    """A SchemaId carrying SchemaArn together with name fields must be rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    overspecified_schema_id = dict(
        SchemaArn=TEST_SCHEMA_ARN,
        RegistryName=TEST_REGISTRY_NAME,
        SchemaName=TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST,
    )
    with pytest.raises(ClientError) as raised:
        client.register_schema_version(
            SchemaId=overspecified_schema_id,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "One of (registryName and schemaName) or schemaArn has to be provided, both cannot be provided."
    )
# Test get_schema_version
def test_get_schema_version_valid_input_schema_version_id(client):
    """Fetch a schema version by its SchemaVersionId and verify the returned fields."""
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]
    fetched = client.get_schema_version(SchemaVersionId=version_id)
    expected_fields = {
        "SchemaVersionId": version_id,
        "SchemaDefinition": TEST_AVRO_SCHEMA_DEFINITION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "SchemaArn": TEST_SCHEMA_ARN,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        fetched.should.have.key(field).equals(value)
    # Only the presence of the creation timestamp is checked.
    fetched.should.have.key("CreatedTime")
def test_get_schema_version_valid_input_version_number(client):
    """Fetch a schema version by SchemaId plus version number and verify the fields."""
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]
    fetched = client.get_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
    )
    expected_fields = {
        "SchemaVersionId": version_id,
        "SchemaDefinition": TEST_AVRO_SCHEMA_DEFINITION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "SchemaArn": TEST_SCHEMA_ARN,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        fetched.should.have.key(field).equals(value)
    # Only the presence of the creation timestamp is checked.
    fetched.should.have.key("CreatedTime")
def test_get_schema_version_valid_input_version_number_latest_version(client):
    """After registering a second version, the latest-version lookup must return it."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = helpers.register_schema_version(client)["SchemaVersionId"]
    fetched = client.get_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION,
    )
    expected_fields = {
        "SchemaVersionId": version_id,
        "SchemaDefinition": TEST_NEW_AVRO_SCHEMA_DEFINITION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "SchemaArn": TEST_SCHEMA_ARN,
        "VersionNumber": 2,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        fetched.should.have.key(field).equals(value)
    # Only the presence of the creation timestamp is checked.
    fetched.should.have.key("CreatedTime")
def test_get_schema_version_empty_input(client):
    """Calling get_schema_version with no arguments must raise InvalidInputException.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    with pytest.raises(ClientError) as exc:
        client.get_schema_version()
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    err["Message"].should.contain(
        "At least one of (registryName and schemaName) or schemaArn has to be provided."
    )
def test_get_schema_version_invalid_schema_id_schema_version_number_both_provided(
    client,
):
    """SchemaVersionId may not be combined with SchemaId or SchemaVersionNumber."""
    conflicting_arguments = dict(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionId=TEST_VERSION_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION,
    )
    with pytest.raises(ClientError) as raised:
        client.get_schema_version(**conflicting_arguments)
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "No other input parameters can be specified when fetching by SchemaVersionId."
    )
def test_get_schema_version_insufficient_params_provided(client):
    """Passing SchemaId without any version selector must be rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as raised:
        client.get_schema_version(SchemaId=TEST_SCHEMA_ID)
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "One of version number (or) latest version is required."
    )
def test_get_schema_version_invalid_schema_version_number(client):
    """Supplying both VersionNumber and LatestVersion must be rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber={"VersionNumber": 1, "LatestVersion": True},
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "Only one of VersionNumber or LatestVersion is required."
    )
def test_get_schema_version_invalid_version_number(client):
    """Requesting version 2 of a schema that was only just created must raise EntityNotFoundException."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    # Only the version produced by create_schema exists at this point.
    invalid_schema_version_number = {"VersionNumber": 2}
    with pytest.raises(ClientError) as exc:
        client.get_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=invalid_schema_version_number,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # NOTE(review): `.should.have(<str>)` appears to be a no-op in `sure` (calling
    # the builder only rebinds its subject), so this message is presumably never
    # checked. The schema itself exists here, so the backend may report a missing
    # schema *version* rather than "Schema is not found." — TODO confirm the real
    # message and replace this with an actual assertion.
    err["Message"].should.have("Schema is not found.")
def test_get_schema_version_invalid_schema_id_schema_name(client):
    """Looking up a version under an unknown schema name must report schema-not-found."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("EntityNotFoundException")
    expected_message = f"Schema is not found. RegistryName: {TEST_REGISTRY_NAME}, SchemaName: {TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST}, SchemaArn: null"
    error["Message"].should.equal(expected_message)
def test_get_schema_version_invalid_schema_id_registry_name(client):
    """Looking up a version via TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST must fail.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as exc:
        client.get_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    err["Message"].should.contain("Schema is not found.")
def test_get_schema_version_invalid_schema_version(client):
    """A SchemaVersionId ending in the non-hex character ``p`` must be rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaVersionId="00000000-0000-0000-0000-00000000000p"
        )
    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: SchemaVersionId"
    )
# Test get_schema_by_definition
def test_get_schema_by_definition_valid_input(client):
    """Looking a schema up by its registered definition must return that version."""
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]
    found = client.get_schema_by_definition(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
    )
    expected_fields = {
        "SchemaVersionId": version_id,
        "SchemaArn": TEST_SCHEMA_ARN,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected_fields.items():
        found.should.have.key(field).equals(value)
    # Only the presence of the creation timestamp is checked.
    found.should.have.key("CreatedTime")
def test_get_schema_by_definition_invalid_schema_id_schema_does_not_exist(client):
    """Lookup by definition via TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST must fail.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as exc:
        client.get_schema_by_definition(
            SchemaId=TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    err["Message"].should.contain("Schema is not found.")
def test_get_schema_by_definition_invalid_schema_definition_does_not_exist(client):
    """Lookup by a definition that was never registered must raise EntityNotFoundException.

    The message check uses ``should.contain``: the original
    ``should.have(<str>)`` only rebinds the assertion subject in ``sure`` and
    never compares anything, so the message was not verified at all.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    with pytest.raises(ClientError) as exc:
        client.get_schema_by_definition(
            SchemaId=TEST_SCHEMA_ID,
            SchemaDefinition=TEST_NEW_AVRO_SCHEMA_DEFINITION,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    err["Message"].should.contain("Schema is not found.")
# Test put_schema_version_metadata
def test_put_schema_version_metadata_valid_input_schema_version_number(client):
    """Attach metadata addressed by SchemaId plus version number and verify the echo."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    stored = client.put_schema_version_metadata(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        MetadataKeyValue=TEST_METADATA_KEY_VALUE,
    )
    expected_fields = {
        "SchemaName": TEST_SCHEMA_NAME,
        "RegistryName": TEST_REGISTRY_NAME,
        "LatestVersion": False,
        "VersionNumber": 1,
        "MetadataKey": TEST_METADATA_KEY_VALUE["MetadataKey"],
        "MetadataValue": TEST_METADATA_KEY_VALUE["MetadataValue"],
    }
    for field, value in expected_fields.items():
        stored.should.have.key(field).equals(value)
def test_put_schema_version_metadata_valid_input_schema_version_id(client):
    """Attach metadata addressed by SchemaVersionId and verify the echoed fields."""
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]
    stored = client.put_schema_version_metadata(
        SchemaVersionId=version_id,
        MetadataKeyValue=TEST_METADATA_KEY_VALUE,
    )
    # The test expects VersionNumber 0 when the version is addressed by id.
    expected_fields = {
        "SchemaVersionId": version_id,
        "LatestVersion": False,
        "VersionNumber": 0,
        "MetadataKey": TEST_METADATA_KEY,
        "MetadataValue": TEST_METADATA_VALUE,
    }
    for field, value in expected_fields.items():
        stored.should.have.key(field).equals(value)
def test_put_schema_version_metadata_more_than_allowed_schema_version_id(client):
    """An eleventh metadata entry on one schema version id is rejected.

    Ten entries with distinct keys are stored first; the next put must raise
    ``ResourceNumberLimitExceededException``.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = created["SchemaVersionId"]

    # Store ten distinct key/value pairs before the put that should fail.
    for index in range(10):
        entry = {
            "MetadataKey": f"test_metadata_key{index}",
            "MetadataValue": f"test_metadata_value{index}",
        }
        client.put_schema_version_metadata(
            SchemaVersionId=version_id, MetadataKeyValue=entry
        )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=version_id,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    expected_message = (
        "Your resource limits for Schema Version Metadata have been exceeded."
    )
    error["Message"].should.equal(expected_message)
def test_put_schema_version_metadata_more_than_allowed_schema_version_id_same_key(
    client,
):
    """The metadata limit also applies when one key is reused.

    Ten entries sharing the key ``test_metadata_key`` but carrying different
    values are stored; the next put must raise
    ``ResourceNumberLimitExceededException``.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = created["SchemaVersionId"]

    # Same key every time, only the value varies.
    for index in range(10):
        entry = {
            "MetadataKey": "test_metadata_key",
            "MetadataValue": f"test_metadata_value{index}",
        }
        client.put_schema_version_metadata(
            SchemaVersionId=version_id, MetadataKeyValue=entry
        )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=version_id,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    expected_message = (
        "Your resource limits for Schema Version Metadata have been exceeded."
    )
    error["Message"].should.equal(expected_message)
def test_put_schema_version_metadata_already_exists_schema_version_id(client):
    """Putting the same key/value twice on one version id is rejected.

    The second, identical put must raise ``AlreadyExistsException`` with a
    message naming the version id and the metadata key and value.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = created["SchemaVersionId"]

    request = {
        "SchemaVersionId": version_id,
        "MetadataKeyValue": TEST_METADATA_KEY_VALUE,
    }
    # The first put succeeds; repeating it is expected to fail.
    client.put_schema_version_metadata(**request)

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(**request)

    error = raised.value.response["Error"]
    error["Code"].should.equal("AlreadyExistsException")
    expected_message = f"Resource already exist for schema version id: {version_id}, metadata key: {TEST_METADATA_KEY}, metadata value: {TEST_METADATA_VALUE}"
    error["Message"].should.equal(expected_message)
def test_put_schema_version_metadata_invalid_characters_metadata_key_schema_version_id(
    client,
):
    """A metadata key containing ``~`` is rejected (addressed by version id).

    The put must raise ``InvalidInputException`` and the message is expected
    to mention the invalid characters in the key.
    """
    helpers.create_registry(client)
    response = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = response["SchemaVersionId"]
    invalid_metadata_key = {
        "MetadataKey": "invalid~metadata~key",
        "MetadataValue": TEST_METADATA_VALUE,
    }

    with pytest.raises(ClientError) as exc:
        client.put_schema_version_metadata(
            SchemaVersionId=version_id,
            MetadataKeyValue=invalid_metadata_key,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # `.should.have(...)` only chains and asserts nothing; `contain` performs
    # the intended substring check.
    # NOTE(review): this check was previously a no-op -- confirm the expected
    # text matches the message the backend actually returns.
    err["Message"].should.contain(
        "key contains one or more characters that are not valid."
    )
def test_put_schema_version_metadata_invalid_characters_metadata_value_schema_version_id(
    client,
):
    """A metadata value containing ``~`` is rejected (addressed by version id).

    The put must raise ``InvalidInputException`` and the message is expected
    to mention the invalid characters in the value.
    """
    helpers.create_registry(client)
    response = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = response["SchemaVersionId"]
    invalid_metadata_value = {
        "MetadataKey": TEST_METADATA_KEY,
        "MetadataValue": "invalid~metadata~value",
    }

    with pytest.raises(ClientError) as exc:
        client.put_schema_version_metadata(
            SchemaVersionId=version_id,
            MetadataKeyValue=invalid_metadata_value,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # `.should.have(...)` only chains and asserts nothing; `contain` performs
    # the intended substring check.
    # NOTE(review): this check was previously a no-op -- confirm the expected
    # text matches the message the backend actually returns.
    err["Message"].should.contain(
        "value contains one or more characters that are not valid."
    )
def test_put_schema_version_metadata_more_than_allowed_schema_version_number(client):
    """An eleventh metadata entry on one schema id + version number is rejected.

    Ten entries with distinct keys are stored first; the next put must raise
    ``ResourceNumberLimitExceededException``.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    # Every request targets the same version via schema id and version number.
    target = {
        "SchemaId": TEST_SCHEMA_ID,
        "SchemaVersionNumber": TEST_SCHEMA_VERSION_NUMBER,
    }

    for index in range(10):
        entry = {
            "MetadataKey": f"test_metadata_key{index}",
            "MetadataValue": f"test_metadata_value{index}",
        }
        client.put_schema_version_metadata(MetadataKeyValue=entry, **target)

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            MetadataKeyValue=TEST_METADATA_KEY_VALUE, **target
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    expected_message = (
        "Your resource limits for Schema Version Metadata have been exceeded."
    )
    error["Message"].should.equal(expected_message)
def test_put_schema_version_metadata_already_exists_schema_version_number(client):
    """Putting the same key/value twice via schema id + number is rejected.

    The second, identical put must raise ``AlreadyExistsException``; the
    expected message uses the version id returned by the create_schema helper.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = created["SchemaVersionId"]

    request = {
        "SchemaId": TEST_SCHEMA_ID,
        "SchemaVersionNumber": TEST_SCHEMA_VERSION_NUMBER,
        "MetadataKeyValue": TEST_METADATA_KEY_VALUE,
    }
    # The first put succeeds; repeating it is expected to fail.
    client.put_schema_version_metadata(**request)

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(**request)

    error = raised.value.response["Error"]
    error["Code"].should.equal("AlreadyExistsException")
    expected_message = f"Resource already exist for schema version id: {version_id}, metadata key: {TEST_METADATA_KEY}, metadata value: {TEST_METADATA_VALUE}"
    error["Message"].should.equal(expected_message)
def test_put_schema_version_metadata_invalid_characters_metadata_key_schema_version_number(
    client,
):
    """A metadata key containing ``~`` is rejected (schema id + version number).

    The put must raise ``InvalidInputException`` and the message is expected
    to mention the invalid characters in the key.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    invalid_metadata_key = {
        "MetadataKey": "invalid~metadata~key",
        "MetadataValue": TEST_METADATA_VALUE,
    }

    with pytest.raises(ClientError) as exc:
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue=invalid_metadata_key,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # `.should.have(...)` only chains and asserts nothing; `contain` performs
    # the intended substring check.
    # NOTE(review): this check was previously a no-op -- confirm the expected
    # text matches the message the backend actually returns.
    err["Message"].should.contain(
        "key contains one or more characters that are not valid."
    )
def test_put_schema_version_metadata_invalid_characters_metadata_value_schema_version_number(
    client,
):
    """A metadata value containing ``~`` is rejected (schema id + version number).

    The put must raise ``InvalidInputException`` and the message is expected
    to mention the invalid characters in the value.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    invalid_metadata_value = {
        "MetadataKey": TEST_METADATA_KEY,
        "MetadataValue": "invalid~metadata~value",
    }

    with pytest.raises(ClientError) as exc:
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue=invalid_metadata_value,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # `.should.have(...)` only chains and asserts nothing; `contain` performs
    # the intended substring check.
    # NOTE(review): this check was previously a no-op -- confirm the expected
    # text matches the message the backend actually returns.
    err["Message"].should.contain(
        "value contains one or more characters that are not valid."
    )
# test delete_schema
def test_delete_schema_valid_input(client):
    """Deleting an existing schema by TEST_SCHEMA_ID returns its details.

    The response must carry the schema ARN, the schema name and
    ``TEST_DELETING_STATUS``.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    deletion = client.delete_schema(SchemaId=TEST_SCHEMA_ID)

    # Checked in order; the first mismatch fails the test.
    expected_fields = (
        ("SchemaArn", TEST_SCHEMA_ARN),
        ("SchemaName", TEST_SCHEMA_NAME),
        ("Status", TEST_DELETING_STATUS),
    )
    for field, value in expected_fields:
        deletion.should.have.key(field).equals(value)
def test_delete_schema_valid_input_schema_arn(client):
    """Delete a schema addressed by ARN, then verify its version is gone.

    After the delete response is checked, fetching the version id returned by
    the create_schema helper must raise ``EntityNotFoundException``.
    """
    helpers.create_registry(client)
    response = helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = response["SchemaVersionId"]

    # Address the schema by ARN only (no registry/schema name pair).
    schema_id = {"SchemaArn": TEST_SCHEMA_ARN}
    response = client.delete_schema(
        SchemaId=schema_id,
    )
    response.should.have.key("SchemaArn").equals(TEST_SCHEMA_ARN)
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("Status").equals(TEST_DELETING_STATUS)

    with pytest.raises(ClientError) as exc:
        client.get_schema_version(
            SchemaVersionId=version_id,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # `.should.have(...)` only chains and asserts nothing; `contain` performs
    # the intended substring check.
    # NOTE(review): this check was previously a no-op -- confirm the expected
    # text matches the message returned for a lookup by SchemaVersionId.
    err["Message"].should.contain("Schema is not found.")
def test_delete_schema_schema_not_found(client):
    """Deleting a schema that was never created raises EntityNotFoundException.

    Only the registry is created; the error message is expected to name the
    registry and schema that were looked up.
    """
    helpers.create_registry(client)

    with pytest.raises(ClientError) as exc:
        client.delete_schema(
            SchemaId=TEST_SCHEMA_ID,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # `.should.have(...)` only chains and asserts nothing; `contain` performs
    # the intended substring check on the error message.
    err["Message"].should.contain(
        f"Schema is not found. RegistryName: {TEST_REGISTRY_NAME}, SchemaName: {TEST_SCHEMA_NAME}, SchemaArn: null"
    )