"""Unit tests for glue-schema-registry-supported APIs."""
import boto3
import pytest
import sure  # noqa # pylint: disable=unused-import
from botocore.client import ClientError
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID

from moto import mock_glue

from . import helpers
from .fixtures.schema_registry import (
    TEST_REGISTRY_NAME,
    TEST_REGISTRY_ARN,
    TEST_DESCRIPTION,
    TEST_TAGS,
    TEST_SCHEMA_NAME,
    TEST_SCHEMA_ARN,
    TEST_AVRO_SCHEMA_DEFINITION,
    TEST_NEW_AVRO_SCHEMA_DEFINITION,
    TEST_JSON_SCHEMA_DEFINITION,
    TEST_NEW_JSON_SCHEMA_DEFINITION,
    TEST_PROTOBUF_SCHEMA_DEFINITION,
    TEST_NEW_PROTOBUF_SCHEMA_DEFINITION,
    TEST_AVRO_DATA_FORMAT,
    TEST_JSON_DATA_FORMAT,
    TEST_PROTOBUF_DATA_FORMAT,
    TEST_REGISTRY_ID,
    TEST_SCHEMA_ID,
    TEST_BACKWARD_COMPATIBILITY,
    TEST_DISABLED_COMPATIBILITY,
    TEST_AVAILABLE_STATUS,
    TEST_SCHEMA_VERSION_NUMBER,
    TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION,
    TEST_VERSION_ID,
    TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST,
    TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
    TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST,
    TEST_METADATA_KEY_VALUE,
    TEST_METADATA_KEY,
    TEST_METADATA_VALUE,
    TEST_DELETING_STATUS,
)


@pytest.fixture(name="client")
def fixture_client():
    """Provide a us-east-1 Glue client created inside the ``mock_glue`` context."""
    with mock_glue():
        glue = boto3.client("glue", region_name="us-east-1")
        yield glue


# Test create_registry
def test_create_registry_valid_input(client):
    """create_registry with name, description and tags echoes them back plus the ARN."""
    created = client.create_registry(
        RegistryName=TEST_REGISTRY_NAME,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )

    expected = {
        "RegistryName": TEST_REGISTRY_NAME,
        "Description": TEST_DESCRIPTION,
        "Tags": TEST_TAGS,
        "RegistryArn": TEST_REGISTRY_ARN,
    }
    for field, value in expected.items():
        created.should.have.key(field).equals(value)


def test_create_registry_valid_partial_input(client):
    """create_registry with only a name returns that name and the registry ARN."""
    created = client.create_registry(RegistryName=TEST_REGISTRY_NAME)

    for field, value in (
        ("RegistryName", TEST_REGISTRY_NAME),
        ("RegistryArn", TEST_REGISTRY_ARN),
    ):
        created.should.have.key(field).equals(value)


def test_create_registry_invalid_registry_name_too_long(client):
    """A 560-character registry name is rejected with InvalidInputException."""
    # 80 repetitions of the 7-character token give a 560-character name.
    registry_name = "toolong" * 80

    with pytest.raises(ClientError) as exc:
        client.create_registry(RegistryName=registry_name)
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    err["Message"].should.equal(
        "The resource name contains too many or too few characters. Parameter Name: registryName"
    )


def test_create_registry_more_than_allowed(client):
    """With ten registries already created, one more fails with a limit error."""
    for suffix in map(str, range(10)):
        client.create_registry(RegistryName=TEST_REGISTRY_NAME + suffix)

    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName=TEST_REGISTRY_NAME)

    error = raised.value.response["Error"]
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    error["Message"].should.equal(
        "More registries cannot be created. The maximum limit has been reached."
    )


def test_create_registry_invalid_registry_name(client):
    """A registry name containing commas is rejected with InvalidInputException."""
    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName="A,B,C")

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: registryName"
    )


def test_create_registry_already_exists(client):
    """Creating the same registry name twice raises AlreadyExistsException."""
    client.create_registry(RegistryName=TEST_REGISTRY_NAME)

    with pytest.raises(ClientError) as raised:
        client.create_registry(RegistryName=TEST_REGISTRY_NAME)

    error = raised.value.response["Error"]
    error["Code"].should.equal("AlreadyExistsException")
    error["Message"].should.equal(
        f"Registry already exists. RegistryName: {TEST_REGISTRY_NAME}"
    )


def test_create_registry_invalid_description_too_long(client):
    """A 2450-character description is rejected with InvalidInputException."""
    # 350 repetitions of the 7-character token give a 2450-character description.
    description = "toolong" * 350

    with pytest.raises(ClientError) as exc:
        client.create_registry(
            RegistryName=TEST_REGISTRY_NAME,
            Description=description,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    err["Message"].should.equal(
        "The resource name contains too many or too few characters. Parameter Name: description"
    )


def test_create_registry_invalid_number_of_tags(client):
    """Passing 51 tags to create_registry is rejected with InvalidInputException."""
    # 51 distinct key/value pairs: k0 -> v0 ... k50 -> v50.
    tags = {f"k{i}": f"v{i}" for i in range(51)}

    with pytest.raises(ClientError) as exc:
        client.create_registry(
            RegistryName=TEST_REGISTRY_NAME,
            Tags=tags,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    err["Message"].should.equal("New Tags cannot be empty or more than 50")


# Test create_schema
def test_create_schema_valid_input_registry_name_avro(client):
    """create_schema with a full AVRO request returns the expected schema fields."""
    helpers.create_registry(client)

    created = client.create_schema(
        RegistryId=TEST_REGISTRY_ID,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_AVRO_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )

    expected = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
    }
    for field, value in expected.items():
        created.should.have.key(field).equals(value)
    # The version id's value is not compared here; only its presence is checked.
    created.should.have.key("SchemaVersionId")
    created.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_valid_input_registry_name_json(client):
    """create_schema with a full JSON request returns the expected schema fields."""
    helpers.create_registry(client)

    created = client.create_schema(
        RegistryId=TEST_REGISTRY_ID,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_JSON_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_JSON_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )

    expected = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_JSON_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
    }
    for field, value in expected.items():
        created.should.have.key(field).equals(value)
    # The version id's value is not compared here; only its presence is checked.
    created.should.have.key("SchemaVersionId")
    created.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_valid_input_registry_name_protobuf(client):
    """create_schema with a full PROTOBUF request returns the expected schema fields."""
    helpers.create_registry(client)

    created = client.create_schema(
        RegistryId=TEST_REGISTRY_ID,
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_PROTOBUF_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_PROTOBUF_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )

    expected = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_PROTOBUF_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
    }
    for field, value in expected.items():
        created.should.have.key(field).equals(value)
    # The version id's value is not compared here; only its presence is checked.
    created.should.have.key("SchemaVersionId")
    created.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_valid_input_registry_arn(client):
    """create_schema accepts a RegistryId given by ARN and returns the expected fields."""
    helpers.create_registry(client)

    created = client.create_schema(
        RegistryId={"RegistryArn": TEST_REGISTRY_ARN},
        SchemaName=TEST_SCHEMA_NAME,
        DataFormat=TEST_AVRO_DATA_FORMAT,
        Compatibility=TEST_BACKWARD_COMPATIBILITY,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        Description=TEST_DESCRIPTION,
        Tags=TEST_TAGS,
    )

    expected = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
        "SchemaName": TEST_SCHEMA_NAME,
        "SchemaArn": TEST_SCHEMA_ARN,
        "Description": TEST_DESCRIPTION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaCheckpoint": 1,
        "LatestSchemaVersion": 1,
        "NextSchemaVersion": 2,
        "SchemaStatus": TEST_AVAILABLE_STATUS,
        "Tags": TEST_TAGS,
    }
    for field, value in expected.items():
        created.should.have.key(field).equals(value)
    # The version id's value is not compared here; only its presence is checked.
    created.should.have.key("SchemaVersionId")
    created.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_valid_partial_input(client):
    """A schema created via helpers.create_schema with only a registry id has the expected fields."""
    helpers.create_registry(client)

    response = helpers.create_schema(client, TEST_REGISTRY_ID)
    response.should.have.key("RegistryName").equals(TEST_REGISTRY_NAME)
    response.should.have.key("RegistryArn").equals(TEST_REGISTRY_ARN)
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("SchemaArn").equals(TEST_SCHEMA_ARN)
    response.should.have.key("DataFormat").equals(TEST_AVRO_DATA_FORMAT)
    response.should.have.key("Compatibility").equals(TEST_BACKWARD_COMPATIBILITY)
    response.should.have.key("SchemaCheckpoint").equals(1)
    response.should.have.key("LatestSchemaVersion").equals(1)
    response.should.have.key("NextSchemaVersion").equals(2)
    # The equality check already implies the key is present, so the former
    # bare "SchemaStatus" presence check that followed it was dropped.
    response.should.have.key("SchemaStatus").equals(TEST_AVAILABLE_STATUS)
    response.should.have.key("SchemaVersionId")
    response.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_valid_default_registry(client):
    """An empty RegistryId places the new schema in "default-registry"."""
    helpers.create_registry(client)

    empty_registry_id = {}

    response = helpers.create_schema(client, registry_id=empty_registry_id)
    default_registry_name = "default-registry"
    response.should.have.key("RegistryName").equals(default_registry_name)
    response.should.have.key("RegistryArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{default_registry_name}"
    )
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("SchemaArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{default_registry_name}/{TEST_SCHEMA_NAME}"
    )
    response.should.have.key("DataFormat").equals(TEST_AVRO_DATA_FORMAT)
    response.should.have.key("Compatibility").equals(TEST_BACKWARD_COMPATIBILITY)
    response.should.have.key("SchemaCheckpoint").equals(1)
    response.should.have.key("LatestSchemaVersion").equals(1)
    response.should.have.key("NextSchemaVersion").equals(2)
    # The equality check already implies the key is present, so the former
    # bare "SchemaStatus" presence check that followed it was dropped.
    response.should.have.key("SchemaStatus").equals(TEST_AVAILABLE_STATUS)
    response.should.have.key("SchemaVersionId")
    response.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_valid_default_registry_in_registry_id(client):
    """Naming "default-registry" explicitly in RegistryId creates the schema there."""
    helpers.create_registry(client)

    default_registry_name = "default-registry"
    registry_id_default_registry = {"RegistryName": default_registry_name}

    response = helpers.create_schema(client, registry_id=registry_id_default_registry)

    response.should.have.key("RegistryName").equals(default_registry_name)
    response.should.have.key("RegistryArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{default_registry_name}"
    )
    response.should.have.key("SchemaName").equals(TEST_SCHEMA_NAME)
    response.should.have.key("SchemaArn").equals(
        f"arn:aws:glue:us-east-1:{ACCOUNT_ID}:schema/{default_registry_name}/{TEST_SCHEMA_NAME}"
    )
    response.should.have.key("DataFormat").equals(TEST_AVRO_DATA_FORMAT)
    response.should.have.key("Compatibility").equals(TEST_BACKWARD_COMPATIBILITY)
    response.should.have.key("SchemaCheckpoint").equals(1)
    response.should.have.key("LatestSchemaVersion").equals(1)
    response.should.have.key("NextSchemaVersion").equals(2)
    # The equality check already implies the key is present, so the former
    # bare "SchemaStatus" presence check that followed it was dropped.
    response.should.have.key("SchemaStatus").equals(TEST_AVAILABLE_STATUS)
    response.should.have.key("SchemaVersionId")
    response.should.have.key("SchemaVersionStatus").equals(TEST_AVAILABLE_STATUS)


def test_create_schema_invalid_registry_arn(client):
    """A registry ARN prefixed with "invalid:" is rejected with InvalidInputException."""
    helpers.create_registry(client)

    bad_arn = (
        f"invalid:arn:aws:glue:us-east-1:{ACCOUNT_ID}:registry/{TEST_REGISTRY_NAME}"
    )

    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, registry_id={"RegistryArn": bad_arn})

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: registryArn"
    )


def test_create_schema_invalid_registry_id_both_params_provided(client):
    """Supplying both RegistryName and RegistryArn in RegistryId is rejected."""
    helpers.create_registry(client)

    name_and_arn = {
        "RegistryName": TEST_REGISTRY_NAME,
        "RegistryArn": TEST_REGISTRY_ARN,
    }

    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, registry_id=name_and_arn)

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "One of registryName or registryArn has to be provided, both cannot be provided."
    )


def test_create_schema_invalid_schema_name(client):
    """A schema name containing commas is rejected with InvalidInputException."""
    helpers.create_registry(client)

    request = {
        "RegistryId": TEST_REGISTRY_ID,
        "SchemaName": "Invalid,Schema,Name",
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "Compatibility": TEST_BACKWARD_COMPATIBILITY,
        "SchemaDefinition": TEST_AVRO_SCHEMA_DEFINITION,
    }

    with pytest.raises(ClientError) as raised:
        client.create_schema(**request)

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "The parameter value contains one or more characters that are not valid. Parameter Name: schemaName"
    )


def test_create_schema_invalid_schema_name_too_long(client):
    """A 560-character schema name is rejected with InvalidInputException."""
    helpers.create_registry(client)

    # 80 repetitions of the 7-character token give a 560-character name.
    invalid_schema_name = "toolong" * 80

    with pytest.raises(ClientError) as exc:
        client.create_schema(
            RegistryId=TEST_REGISTRY_ID,
            SchemaName=invalid_schema_name,
            DataFormat=TEST_AVRO_DATA_FORMAT,
            Compatibility=TEST_BACKWARD_COMPATIBILITY,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    err["Message"].should.equal(
        "The resource name contains too many or too few characters. Parameter Name: schemaName"
    )


def test_create_schema_invalid_data_format(client):
    """The data format "INVALID" is rejected with InvalidInputException."""
    helpers.create_registry(client)

    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, TEST_REGISTRY_ID, data_format="INVALID")

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal("Data format is not valid.")


def test_create_schema_invalid_compatibility(client):
    """The compatibility mode "INVALID" is rejected with InvalidInputException."""
    helpers.create_registry(client)

    with pytest.raises(ClientError) as raised:
        helpers.create_schema(client, TEST_REGISTRY_ID, compatibility="INVALID")

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal("Compatibility is not valid.")


def test_create_schema_invalid_schema_definition(client):
    """A syntactically broken schema definition is rejected by create_schema."""
    helpers.create_registry(client)

    invalid_schema_definition = """{
                            "type":: "record",
                        }"""

    with pytest.raises(ClientError) as exc:
        helpers.create_schema(
            client, TEST_REGISTRY_ID, schema_definition=invalid_schema_definition
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # ``should.have(...)`` merely rebinds sure's assertion target and verifies
    # nothing; ``contain`` performs the intended substring check.
    err["Message"].should.contain(
        f"Schema definition of {TEST_AVRO_DATA_FORMAT} data format is invalid"
    )


# Test register_schema_version
def test_register_schema_version_valid_input_avro(client):
    """Registering a new AVRO definition yields version 2 in the available status."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_NEW_AVRO_SCHEMA_DEFINITION,
    )

    registered.should.have.key("SchemaVersionId")
    for field, value in (("VersionNumber", 2), ("Status", TEST_AVAILABLE_STATUS)):
        registered.should.have.key(field).equals(value)


def test_register_schema_version_valid_input_json(client):
    """Registering a new JSON definition yields version 2 in the available status."""
    helpers.create_registry(client)

    helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        # Use the shared fixture constant, as the sibling JSON tests do,
        # instead of a hard-coded format string.
        data_format=TEST_JSON_DATA_FORMAT,
        schema_definition=TEST_JSON_SCHEMA_DEFINITION,
    )

    response = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID, SchemaDefinition=TEST_NEW_JSON_SCHEMA_DEFINITION
    )

    response.should.have.key("SchemaVersionId")
    response.should.have.key("VersionNumber").equals(2)
    response.should.have.key("Status").equals(TEST_AVAILABLE_STATUS)


def test_register_schema_version_valid_input_protobuf(client):
    """Registering a new PROTOBUF definition yields version 2 in the available status."""
    helpers.create_registry(client)

    helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        # Use the shared fixture constant, as the sibling PROTOBUF tests do,
        # instead of a hard-coded format string.
        data_format=TEST_PROTOBUF_DATA_FORMAT,
        schema_definition=TEST_PROTOBUF_SCHEMA_DEFINITION,
    )

    response = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID, SchemaDefinition=TEST_NEW_PROTOBUF_SCHEMA_DEFINITION
    )

    response.should.have.key("SchemaVersionId")
    response.should.have.key("VersionNumber").equals(2)
    response.should.have.key("Status").equals(TEST_AVAILABLE_STATUS)


def test_register_schema_version_valid_input_schema_arn(client):
    """register_schema_version accepts a SchemaId given by ARN and yields version 2."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    registered = client.register_schema_version(
        SchemaId={"SchemaArn": TEST_SCHEMA_ARN},
        SchemaDefinition=TEST_NEW_AVRO_SCHEMA_DEFINITION,
    )

    registered.should.have.key("SchemaVersionId")
    for field, value in (("VersionNumber", 2), ("Status", TEST_AVAILABLE_STATUS)):
        registered.should.have.key(field).equals(value)


def test_register_schema_version_identical_schema_version_avro(client):
    """Re-registering the original AVRO definition returns the existing version 1."""
    helpers.create_registry(client)
    original_version_id = helpers.create_schema(client, TEST_REGISTRY_ID)[
        "SchemaVersionId"
    ]

    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
    )

    expected = {
        "SchemaVersionId": original_version_id,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected.items():
        registered.should.have.key(field).equals(value)


def test_register_schema_version_identical_schema_version_json(client):
    """Re-registering the original JSON definition returns the existing version 1."""
    helpers.create_registry(client)
    created = helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        data_format=TEST_JSON_DATA_FORMAT,
        schema_definition=TEST_JSON_SCHEMA_DEFINITION,
    )
    original_version_id = created["SchemaVersionId"]

    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_JSON_SCHEMA_DEFINITION,
    )

    expected = {
        "SchemaVersionId": original_version_id,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected.items():
        registered.should.have.key(field).equals(value)


def test_register_schema_version_identical_schema_version_protobuf(client):
    """Re-registering the original PROTOBUF definition returns the existing version 1."""
    helpers.create_registry(client)
    created = helpers.create_schema(
        client,
        TEST_REGISTRY_ID,
        data_format=TEST_PROTOBUF_DATA_FORMAT,
        schema_definition=TEST_PROTOBUF_SCHEMA_DEFINITION,
    )
    original_version_id = created["SchemaVersionId"]

    registered = client.register_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_PROTOBUF_SCHEMA_DEFINITION,
    )

    expected = {
        "SchemaVersionId": original_version_id,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected.items():
        registered.should.have.key(field).equals(value)


def test_register_schema_version_invalid_registry_schema_does_not_exist(client):
    """register_schema_version with an unknown registry in SchemaId raises EntityNotFoundException."""
    helpers.create_registry(client)

    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as exc:
        client.register_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # ``should.have(...)`` merely rebinds sure's assertion target and verifies
    # nothing; ``contain`` performs the intended substring check.
    err["Message"].should.contain("Schema is not found.")


def test_register_schema_version_invalid_schema_schema_does_not_exist(client):
    """register_schema_version with an unknown schema name raises EntityNotFoundException."""
    helpers.create_registry(client)

    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as exc:
        client.register_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # ``should.have(...)`` merely rebinds sure's assertion target and verifies
    # nothing; ``contain`` performs the intended substring check.
    err["Message"].should.contain("Schema is not found.")


def test_register_schema_version_invalid_compatibility_disabled(client):
    """A schema created with DISABLED compatibility refuses new versions."""
    helpers.create_registry(client)
    helpers.create_schema(
        client, TEST_REGISTRY_ID, compatibility=TEST_DISABLED_COMPATIBILITY
    )

    with pytest.raises(ClientError) as raised:
        client.register_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        f"Compatibility DISABLED does not allow versioning. SchemaId: SchemaId(schemaArn=null, schemaName={TEST_SCHEMA_NAME}, registryName={TEST_REGISTRY_NAME})"
    )


def test_register_schema_version_invalid_schema_definition(client):
    """Register a version on a DISABLED-compatibility schema and expect InvalidInputException.

    NOTE(review): the setup and request are the same as in
    test_register_schema_version_invalid_compatibility_disabled, and no
    invalid or JSON definition is ever submitted - confirm the intended
    scenario.
    """
    helpers.create_registry(client)
    helpers.create_schema(
        client, TEST_REGISTRY_ID, compatibility=TEST_DISABLED_COMPATIBILITY
    )

    with pytest.raises(ClientError) as raised:
        client.register_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    # NOTE(review): ``should.have(...)`` only rebinds sure's assertion target,
    # so this message is never compared. TODO: once the scenario is fixed,
    # switch to ``should.contain(...)``.
    error["Message"].should.have("Schema definition of JSON data format is invalid:")


def test_register_schema_version_invalid_schema_id(client):
    """A SchemaId carrying an ARN together with registry and schema names is rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    arn_and_names = {
        "SchemaArn": TEST_SCHEMA_ARN,
        "RegistryName": TEST_REGISTRY_NAME,
        "SchemaName": TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST,
    }

    with pytest.raises(ClientError) as raised:
        client.register_schema_version(
            SchemaId=arn_and_names,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "One of (registryName and schemaName) or schemaArn has to be provided, both cannot be provided."
    )


# Test get_schema_version
def test_get_schema_version_valid_input_schema_version_id(client):
    """get_schema_version by SchemaVersionId returns the version created with the schema."""
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]

    fetched = client.get_schema_version(SchemaVersionId=version_id)

    expected = {
        "SchemaVersionId": version_id,
        "SchemaDefinition": TEST_AVRO_SCHEMA_DEFINITION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "SchemaArn": TEST_SCHEMA_ARN,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected.items():
        fetched.should.have.key(field).equals(value)
    fetched.should.have.key("CreatedTime")


def test_get_schema_version_valid_input_version_number(client):
    """get_schema_version by SchemaId and version number returns the first version."""
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]

    fetched = client.get_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
    )

    expected = {
        "SchemaVersionId": version_id,
        "SchemaDefinition": TEST_AVRO_SCHEMA_DEFINITION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "SchemaArn": TEST_SCHEMA_ARN,
        "VersionNumber": 1,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected.items():
        fetched.should.have.key(field).equals(value)
    fetched.should.have.key("CreatedTime")


def test_get_schema_version_valid_input_version_number_latest_version(client):
    """Asking for the latest version returns the version registered after creation."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)
    version_id = helpers.register_schema_version(client)["SchemaVersionId"]

    fetched = client.get_schema_version(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION,
    )

    expected = {
        "SchemaVersionId": version_id,
        "SchemaDefinition": TEST_NEW_AVRO_SCHEMA_DEFINITION,
        "DataFormat": TEST_AVRO_DATA_FORMAT,
        "SchemaArn": TEST_SCHEMA_ARN,
        "VersionNumber": 2,
        "Status": TEST_AVAILABLE_STATUS,
    }
    for field, value in expected.items():
        fetched.should.have.key(field).equals(value)
    fetched.should.have.key("CreatedTime")


def test_get_schema_version_empty_input(client):
    """get_schema_version without any argument raises InvalidInputException."""
    with pytest.raises(ClientError) as exc:
        client.get_schema_version()

    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidInputException")
    # ``should.have(...)`` merely rebinds sure's assertion target and verifies
    # nothing; ``contain`` performs the intended substring check.
    err["Message"].should.contain(
        "At least one of (registryName and schemaName) or schemaArn has to be provided."
    )


def test_get_schema_version_invalid_schema_id_schema_version_number_both_provided(
    client,
):
    """Combining SchemaVersionId with SchemaId and SchemaVersionNumber is rejected."""
    conflicting_request = {
        "SchemaId": TEST_SCHEMA_ID,
        "SchemaVersionId": TEST_VERSION_ID,
        "SchemaVersionNumber": TEST_SCHEMA_VERSION_NUMBER_LATEST_VERSION,
    }

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(**conflicting_request)

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "No other input parameters can be specified when fetching by SchemaVersionId."
    )


def test_get_schema_version_insufficient_params_provided(client):
    """A SchemaId without any SchemaVersionNumber is rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(SchemaId=TEST_SCHEMA_ID)

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "One of version number (or) latest version is required."
    )


def test_get_schema_version_invalid_schema_version_number(client):
    """Setting both VersionNumber and LatestVersion in SchemaVersionNumber is rejected."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber={"VersionNumber": 1, "LatestVersion": True},
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(
        "Only one of VersionNumber or LatestVersion is required."
    )


def test_get_schema_version_invalid_version_number(client):
    """Requesting version 2 when only the initial version exists raises EntityNotFoundException."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber={"VersionNumber": 2},
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("EntityNotFoundException")
    # NOTE(review): ``should.have(...)`` only rebinds sure's assertion target,
    # so this message is never compared. TODO: confirm the exact text returned
    # for a missing version, then switch to ``should.contain(...)``.
    error["Message"].should.have("Schema is not found.")


def test_get_schema_version_invalid_schema_id_schema_name(client):
    """get_schema_version with an unknown schema name reports the looked-up identifiers."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("EntityNotFoundException")
    expected_message = f"Schema is not found. RegistryName: {TEST_REGISTRY_NAME}, SchemaName: {TEST_INVALID_SCHEMA_NAME_DOES_NOT_EXIST}, SchemaArn: null"
    error["Message"].should.equal(expected_message)


def test_get_schema_version_invalid_schema_id_registry_name(client):
    """get_schema_version with an unknown registry in SchemaId raises EntityNotFoundException."""
    helpers.create_registry(client)

    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as exc:
        client.get_schema_version(
            SchemaId=TEST_INVALID_SCHEMA_ID_REGISTRY_DOES_NOT_EXIST,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # ``should.have(...)`` merely rebinds sure's assertion target and verifies
    # nothing; ``contain`` performs the intended substring check.
    err["Message"].should.contain("Schema is not found.")


def test_get_schema_version_invalid_schema_version(client):
    """Expect InvalidInputException for a SchemaVersionId ending in 'p'."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(
            SchemaVersionId="00000000-0000-0000-0000-00000000000p",
        )

    error = raised.value.response["Error"]
    expected_message = "The parameter value contains one or more characters that are not valid. Parameter Name: SchemaVersionId"
    error["Code"].should.equal("InvalidInputException")
    error["Message"].should.equal(expected_message)


# Test get_schema_by_definition
def test_get_schema_by_definition_valid_input(client):
    """Look up a schema by its AVRO definition and check the returned fields.

    The SchemaVersionId must match the one returned by the create_schema helper.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    found = client.get_schema_by_definition(
        SchemaId=TEST_SCHEMA_ID,
        SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
    )

    for key, expected in (
        ("SchemaVersionId", schema_version_id),
        ("SchemaArn", TEST_SCHEMA_ARN),
        ("DataFormat", TEST_AVRO_DATA_FORMAT),
        ("Status", TEST_AVAILABLE_STATUS),
    ):
        found.should.have.key(key).equals(expected)
    # Only presence is checked for the creation timestamp.
    found.should.have.key("CreatedTime")


def test_get_schema_by_definition_invalid_schema_id_schema_does_not_exist(client):
    """Expect EntityNotFoundException for TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST.

    A registry and schema are created first, then get_schema_by_definition
    is called with the invalid schema id and the AVRO test definition.
    """
    helpers.create_registry(client)

    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as exc:
        client.get_schema_by_definition(
            SchemaId=TEST_INVALID_SCHEMA_ID_SCHEMA_DOES_NOT_EXIST,
            SchemaDefinition=TEST_AVRO_SCHEMA_DEFINITION,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # Use `contain` for the substring check: calling the `have` chain word
    # with a string would only rebind the subject and assert nothing.
    err["Message"].should.contain("Schema is not found.")


def test_get_schema_by_definition_invalid_schema_definition_does_not_exist(client):
    """Expect EntityNotFoundException when looking up TEST_NEW_AVRO_SCHEMA_DEFINITION."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.get_schema_by_definition(
            SchemaId=TEST_SCHEMA_ID,
            SchemaDefinition=TEST_NEW_AVRO_SCHEMA_DEFINITION,
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("EntityNotFoundException")
    # NOTE(review): `should.have(<str>)` looks like a bare chain call that
    # rebinds the subject instead of asserting containment - confirm the
    # real message and whether `should.contain` was intended.
    error["Message"].should.have("Schema is not found.")


# Test put_schema_version_metadata
def test_put_schema_version_metadata_valid_input_schema_version_number(client):
    """Attach metadata by schema id and version number, then check the response."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    result = client.put_schema_version_metadata(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        MetadataKeyValue=TEST_METADATA_KEY_VALUE,
    )

    for key, expected in (
        ("SchemaName", TEST_SCHEMA_NAME),
        ("RegistryName", TEST_REGISTRY_NAME),
        ("LatestVersion", False),
        ("VersionNumber", 1),
        ("MetadataKey", TEST_METADATA_KEY_VALUE["MetadataKey"]),
        ("MetadataValue", TEST_METADATA_KEY_VALUE["MetadataValue"]),
    ):
        result.should.have.key(key).equals(expected)


def test_put_schema_version_metadata_valid_input_schema_version_id(client):
    """Attach metadata by schema version id, then check the response.

    The response is expected to report VersionNumber 0 and LatestVersion False.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    result = client.put_schema_version_metadata(
        SchemaVersionId=schema_version_id,
        MetadataKeyValue=TEST_METADATA_KEY_VALUE,
    )

    for key, expected in (
        ("SchemaVersionId", schema_version_id),
        ("LatestVersion", False),
        ("VersionNumber", 0),
        ("MetadataKey", TEST_METADATA_KEY),
        ("MetadataValue", TEST_METADATA_VALUE),
    ):
        result.should.have.key(key).equals(expected)


def test_put_schema_version_metadata_more_than_allowed_schema_version_id(client):
    """Expect ResourceNumberLimitExceededException on the 11th metadata pair.

    Ten pairs with distinct keys are stored by schema version id first.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    for i in range(10):
        metadata_pair = {
            "MetadataKey": f"test_metadata_key{i}",
            "MetadataValue": f"test_metadata_value{i}",
        }
        client.put_schema_version_metadata(
            SchemaVersionId=schema_version_id,
            MetadataKeyValue=metadata_pair,
        )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=schema_version_id,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    expected_message = (
        "Your resource limits for Schema Version Metadata have been exceeded."
    )
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    error["Message"].should.equal(expected_message)


def test_put_schema_version_metadata_more_than_allowed_schema_version_id_same_key(
    client,
):
    """Expect ResourceNumberLimitExceededException on the 11th metadata pair.

    Ten pairs that share one key but carry distinct values are stored by
    schema version id first.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    for i in range(10):
        metadata_pair = {
            "MetadataKey": "test_metadata_key",
            "MetadataValue": f"test_metadata_value{i}",
        }
        client.put_schema_version_metadata(
            SchemaVersionId=schema_version_id,
            MetadataKeyValue=metadata_pair,
        )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=schema_version_id,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    expected_message = (
        "Your resource limits for Schema Version Metadata have been exceeded."
    )
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    error["Message"].should.equal(expected_message)


def test_put_schema_version_metadata_already_exists_schema_version_id(client):
    """Expect AlreadyExistsException when the same metadata pair is stored twice.

    Both calls address the schema version by its id.
    """
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]

    # First call stores the pair; the repeated call below must be rejected.
    client.put_schema_version_metadata(
        SchemaVersionId=version_id,
        MetadataKeyValue=TEST_METADATA_KEY_VALUE,
    )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=version_id,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    expected_message = f"Resource already exist for schema version id: {version_id}, metadata key: {TEST_METADATA_KEY}, metadata value: {TEST_METADATA_VALUE}"
    error["Code"].should.equal("AlreadyExistsException")
    error["Message"].should.equal(expected_message)


def test_put_schema_version_metadata_invalid_characters_metadata_key_schema_version_id(
    client,
):
    """Expect InvalidInputException for a metadata key containing '~'.

    The schema version is addressed by its id.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=schema_version_id,
            MetadataKeyValue={
                "MetadataKey": "invalid~metadata~key",
                "MetadataValue": TEST_METADATA_VALUE,
            },
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    # NOTE(review): `should.have(<str>)` looks like a bare chain call that
    # rebinds the subject instead of asserting containment - confirm the
    # real message and whether `should.contain` was intended.
    error["Message"].should.have(
        "key contains one or more characters that are not valid."
    )


def test_put_schema_version_metadata_invalid_characters_metadata_value_schema_version_id(
    client,
):
    """Expect InvalidInputException for a metadata value containing '~'.

    The schema version is addressed by its id.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaVersionId=schema_version_id,
            MetadataKeyValue={
                "MetadataKey": TEST_METADATA_KEY,
                "MetadataValue": "invalid~metadata~value",
            },
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    # NOTE(review): `should.have(<str>)` looks like a bare chain call that
    # rebinds the subject instead of asserting containment - confirm the
    # real message and whether `should.contain` was intended.
    error["Message"].should.have(
        "value contains one or more characters that are not valid."
    )


def test_put_schema_version_metadata_more_than_allowed_schema_version_number(client):
    """Expect ResourceNumberLimitExceededException on the 11th metadata pair.

    Ten pairs with distinct keys are stored first, addressing the schema
    version by schema id and version number.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    for i in range(10):
        metadata_pair = {
            "MetadataKey": f"test_metadata_key{i}",
            "MetadataValue": f"test_metadata_value{i}",
        }
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue=metadata_pair,
        )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    expected_message = (
        "Your resource limits for Schema Version Metadata have been exceeded."
    )
    error["Code"].should.equal("ResourceNumberLimitExceededException")
    error["Message"].should.equal(expected_message)


def test_put_schema_version_metadata_already_exists_schema_version_number(client):
    """Expect AlreadyExistsException when the same metadata pair is stored twice.

    Both calls address the schema version by schema id and version number;
    the expected message still quotes the schema version id.
    """
    helpers.create_registry(client)
    version_id = helpers.create_schema(client, TEST_REGISTRY_ID)["SchemaVersionId"]

    # First call stores the pair; the repeated call below must be rejected.
    client.put_schema_version_metadata(
        SchemaId=TEST_SCHEMA_ID,
        SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
        MetadataKeyValue=TEST_METADATA_KEY_VALUE,
    )

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue=TEST_METADATA_KEY_VALUE,
        )

    error = raised.value.response["Error"]
    expected_message = f"Resource already exist for schema version id: {version_id}, metadata key: {TEST_METADATA_KEY}, metadata value: {TEST_METADATA_VALUE}"
    error["Code"].should.equal("AlreadyExistsException")
    error["Message"].should.equal(expected_message)


def test_put_schema_version_metadata_invalid_characters_metadata_key_schema_version_number(
    client,
):
    """Expect InvalidInputException for a metadata key containing '~'.

    The schema version is addressed by schema id and version number.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue={
                "MetadataKey": "invalid~metadata~key",
                "MetadataValue": TEST_METADATA_VALUE,
            },
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    # NOTE(review): `should.have(<str>)` looks like a bare chain call that
    # rebinds the subject instead of asserting containment - confirm the
    # real message and whether `should.contain` was intended.
    error["Message"].should.have(
        "key contains one or more characters that are not valid."
    )


def test_put_schema_version_metadata_invalid_characters_metadata_value_schema_version_number(
    client,
):
    """Expect InvalidInputException for a metadata value containing '~'.

    The schema version is addressed by schema id and version number.
    """
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    with pytest.raises(ClientError) as raised:
        client.put_schema_version_metadata(
            SchemaId=TEST_SCHEMA_ID,
            SchemaVersionNumber=TEST_SCHEMA_VERSION_NUMBER,
            MetadataKeyValue={
                "MetadataKey": TEST_METADATA_KEY,
                "MetadataValue": "invalid~metadata~value",
            },
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidInputException")
    # NOTE(review): `should.have(<str>)` looks like a bare chain call that
    # rebinds the subject instead of asserting containment - confirm the
    # real message and whether `should.contain` was intended.
    error["Message"].should.have(
        "value contains one or more characters that are not valid."
    )


# Test delete_schema
def test_delete_schema_valid_input(client):
    """Delete a schema via TEST_SCHEMA_ID and check the returned ARN, name and status."""
    helpers.create_registry(client)
    helpers.create_schema(client, TEST_REGISTRY_ID)

    deleted = client.delete_schema(SchemaId=TEST_SCHEMA_ID)

    for key, expected in (
        ("SchemaArn", TEST_SCHEMA_ARN),
        ("SchemaName", TEST_SCHEMA_NAME),
        ("Status", TEST_DELETING_STATUS),
    ):
        deleted.should.have.key(key).equals(expected)


def test_delete_schema_valid_input_schema_arn(client):
    """Delete a schema by ARN, then expect its version lookup to fail.

    After the delete call, get_schema_version with the original
    SchemaVersionId must raise EntityNotFoundException.
    """
    helpers.create_registry(client)
    created = helpers.create_schema(client, TEST_REGISTRY_ID)
    schema_version_id = created["SchemaVersionId"]

    deleted = client.delete_schema(
        SchemaId={"SchemaArn": f"{TEST_SCHEMA_ARN}"},
    )

    for key, expected in (
        ("SchemaArn", TEST_SCHEMA_ARN),
        ("SchemaName", TEST_SCHEMA_NAME),
        ("Status", TEST_DELETING_STATUS),
    ):
        deleted.should.have.key(key).equals(expected)

    with pytest.raises(ClientError) as raised:
        client.get_schema_version(SchemaVersionId=schema_version_id)

    error = raised.value.response["Error"]
    error["Code"].should.equal("EntityNotFoundException")
    # NOTE(review): `should.have(<str>)` looks like a bare chain call that
    # rebinds the subject instead of asserting containment - confirm the
    # real message and whether `should.contain` was intended.
    error["Message"].should.have("Schema is not found.")


def test_delete_schema_schema_not_found(client):
    """Expect EntityNotFoundException when deleting a schema that was never created.

    Only the registry is created; delete_schema is then called with
    TEST_SCHEMA_ID and the complete error message is compared.
    """
    helpers.create_registry(client)

    with pytest.raises(ClientError) as exc:
        client.delete_schema(
            SchemaId=TEST_SCHEMA_ID,
        )

    err = exc.value.response["Error"]
    err["Code"].should.equal("EntityNotFoundException")
    # Compare with `equal`: calling the `have` chain word with a string
    # would only rebind the subject and assert nothing.
    err["Message"].should.equal(
        f"Schema is not found. RegistryName: {TEST_REGISTRY_NAME}, SchemaName: {TEST_SCHEMA_NAME}, SchemaArn: null"
    )