commit
794e35073e
2
.gitignore
vendored
2
.gitignore
vendored
@ -20,3 +20,5 @@ env/
|
||||
.vscode/
|
||||
tests/file.tmp
|
||||
.eggs/
|
||||
.mypy_cache/
|
||||
*.tmp
|
||||
|
@ -394,12 +394,17 @@ class UsagePlanKey(BaseModel, dict):
|
||||
|
||||
|
||||
class RestAPI(BaseModel):
|
||||
def __init__(self, id, region_name, name, description):
|
||||
def __init__(self, id, region_name, name, description, **kwargs):
|
||||
self.id = id
|
||||
self.region_name = region_name
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.create_date = int(time.time())
|
||||
self.api_key_source = kwargs.get("api_key_source") or "HEADER"
|
||||
self.endpoint_configuration = kwargs.get("endpoint_configuration") or {
|
||||
"types": ["EDGE"]
|
||||
}
|
||||
self.tags = kwargs.get("tags") or {}
|
||||
|
||||
self.deployments = {}
|
||||
self.stages = {}
|
||||
@ -416,6 +421,9 @@ class RestAPI(BaseModel):
|
||||
"name": self.name,
|
||||
"description": self.description,
|
||||
"createdDate": int(time.time()),
|
||||
"apiKeySource": self.api_key_source,
|
||||
"endpointConfiguration": self.endpoint_configuration,
|
||||
"tags": self.tags,
|
||||
}
|
||||
|
||||
def add_child(self, path, parent_id=None):
|
||||
@ -529,9 +537,24 @@ class APIGatewayBackend(BaseBackend):
|
||||
self.__dict__ = {}
|
||||
self.__init__(region_name)
|
||||
|
||||
def create_rest_api(self, name, description):
|
||||
def create_rest_api(
|
||||
self,
|
||||
name,
|
||||
description,
|
||||
api_key_source=None,
|
||||
endpoint_configuration=None,
|
||||
tags=None,
|
||||
):
|
||||
api_id = create_id()
|
||||
rest_api = RestAPI(api_id, self.region_name, name, description)
|
||||
rest_api = RestAPI(
|
||||
api_id,
|
||||
self.region_name,
|
||||
name,
|
||||
description,
|
||||
api_key_source=api_key_source,
|
||||
endpoint_configuration=endpoint_configuration,
|
||||
tags=tags,
|
||||
)
|
||||
self.apis[api_id] = rest_api
|
||||
return rest_api
|
||||
|
||||
|
@ -12,6 +12,9 @@ from .exceptions import (
|
||||
ApiKeyAlreadyExists,
|
||||
)
|
||||
|
||||
API_KEY_SOURCES = ["AUTHORIZER", "HEADER"]
|
||||
ENDPOINT_CONFIGURATION_TYPES = ["PRIVATE", "EDGE", "REGIONAL"]
|
||||
|
||||
|
||||
class APIGatewayResponse(BaseResponse):
|
||||
def error(self, type_, message, status=400):
|
||||
@ -45,7 +48,45 @@ class APIGatewayResponse(BaseResponse):
|
||||
elif self.method == "POST":
|
||||
name = self._get_param("name")
|
||||
description = self._get_param("description")
|
||||
rest_api = self.backend.create_rest_api(name, description)
|
||||
api_key_source = self._get_param("apiKeySource")
|
||||
endpoint_configuration = self._get_param("endpointConfiguration")
|
||||
tags = self._get_param("tags")
|
||||
|
||||
# Param validation
|
||||
if api_key_source and api_key_source not in API_KEY_SOURCES:
|
||||
return self.error(
|
||||
"ValidationException",
|
||||
(
|
||||
"1 validation error detected: "
|
||||
"Value '{api_key_source}' at 'createRestApiInput.apiKeySource' failed "
|
||||
"to satisfy constraint: Member must satisfy enum value set: "
|
||||
"[AUTHORIZER, HEADER]"
|
||||
).format(api_key_source=api_key_source),
|
||||
)
|
||||
|
||||
if endpoint_configuration and "types" in endpoint_configuration:
|
||||
invalid_types = list(
|
||||
set(endpoint_configuration["types"])
|
||||
- set(ENDPOINT_CONFIGURATION_TYPES)
|
||||
)
|
||||
if invalid_types:
|
||||
return self.error(
|
||||
"ValidationException",
|
||||
(
|
||||
"1 validation error detected: Value '{endpoint_type}' "
|
||||
"at 'createRestApiInput.endpointConfiguration.types' failed "
|
||||
"to satisfy constraint: Member must satisfy enum value set: "
|
||||
"[PRIVATE, EDGE, REGIONAL]"
|
||||
).format(endpoint_type=invalid_types[0]),
|
||||
)
|
||||
|
||||
rest_api = self.backend.create_rest_api(
|
||||
name,
|
||||
description,
|
||||
api_key_source=api_key_source,
|
||||
endpoint_configuration=endpoint_configuration,
|
||||
tags=tags,
|
||||
)
|
||||
return 200, {}, json.dumps(rest_api.to_dict())
|
||||
|
||||
def restapis_individual(self, request, full_url, headers):
|
||||
|
@ -176,7 +176,8 @@ class LambdaResponse(BaseResponse):
|
||||
def _invoke(self, request, full_url):
|
||||
response_headers = {}
|
||||
|
||||
function_name = self.path.rsplit("/", 2)[-2]
|
||||
# URL Decode in case it's a ARN:
|
||||
function_name = unquote(self.path.rsplit("/", 2)[-2])
|
||||
qualifier = self._get_param("qualifier")
|
||||
|
||||
response_header, payload = self.lambda_backend.invoke(
|
||||
|
@ -12,6 +12,7 @@ url_paths = {
|
||||
r"{0}/(?P<api_version>[^/]+)/event-source-mappings/?$": response.event_source_mappings,
|
||||
r"{0}/(?P<api_version>[^/]+)/event-source-mappings/(?P<UUID>[\w_-]+)/?$": response.event_source_mapping,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$": response.invoke,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<resource_arn>.+)/invocations/?$": response.invoke,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invoke-async/?$": response.invoke_async,
|
||||
r"{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)": response.tag,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/policy/(?P<statement_id>[\w_-]+)$": response.policy,
|
||||
|
@ -677,6 +677,8 @@ class CloudFormationBackend(BaseBackend):
|
||||
|
||||
def list_stack_resources(self, stack_name_or_id):
|
||||
stack = self.get_stack(stack_name_or_id)
|
||||
if stack is None:
|
||||
return None
|
||||
return stack.stack_resources
|
||||
|
||||
def delete_stack(self, name_or_stack_id):
|
||||
|
@ -229,6 +229,9 @@ class CloudFormationResponse(BaseResponse):
|
||||
stack_name_or_id = self._get_param("StackName")
|
||||
resources = self.cloudformation_backend.list_stack_resources(stack_name_or_id)
|
||||
|
||||
if resources is None:
|
||||
raise ValidationError(stack_name_or_id)
|
||||
|
||||
template = self.response_template(LIST_STACKS_RESOURCES_RESPONSE)
|
||||
return template.render(resources=resources)
|
||||
|
||||
|
@ -279,9 +279,18 @@ class CognitoIdpResponse(BaseResponse):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
limit = self._get_param("Limit")
|
||||
token = self._get_param("PaginationToken")
|
||||
filt = self._get_param("Filter")
|
||||
users, token = cognitoidp_backends[self.region].list_users(
|
||||
user_pool_id, limit=limit, pagination_token=token
|
||||
)
|
||||
if filt:
|
||||
name, value = filt.replace('"', "").split("=")
|
||||
users = [
|
||||
user
|
||||
for user in users
|
||||
for attribute in user.attributes
|
||||
if attribute["Name"] == name and attribute["Value"] == value
|
||||
]
|
||||
response = {"Users": [user.to_json(extended=True) for user in users]}
|
||||
if token:
|
||||
response["PaginationToken"] = str(token)
|
||||
|
@ -508,6 +508,13 @@ class DynamoHandler(BaseResponse):
|
||||
# 'KeyConditions': {u'forum_name': {u'ComparisonOperator': u'EQ', u'AttributeValueList': [{u'S': u'the-key'}]}}
|
||||
key_conditions = self.body.get("KeyConditions")
|
||||
query_filters = self.body.get("QueryFilter")
|
||||
|
||||
if not (key_conditions or query_filters):
|
||||
return self.error(
|
||||
"com.amazonaws.dynamodb.v20111205#ValidationException",
|
||||
"Either KeyConditions or QueryFilter should be present",
|
||||
)
|
||||
|
||||
if key_conditions:
|
||||
(
|
||||
hash_key_name,
|
||||
|
@ -139,17 +139,22 @@ from .utils import (
|
||||
rsa_public_key_fingerprint,
|
||||
)
|
||||
|
||||
INSTANCE_TYPES = json.load(
|
||||
open(resource_filename(__name__, "resources/instance_types.json"), "r")
|
||||
|
||||
def _load_resource(filename):
|
||||
with open(filename, "r") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
INSTANCE_TYPES = _load_resource(
|
||||
resource_filename(__name__, "resources/instance_types.json")
|
||||
)
|
||||
AMIS = json.load(
|
||||
open(
|
||||
os.environ.get("MOTO_AMIS_PATH")
|
||||
or resource_filename(__name__, "resources/amis.json"),
|
||||
"r",
|
||||
)
|
||||
|
||||
AMIS = _load_resource(
|
||||
os.environ.get("MOTO_AMIS_PATH")
|
||||
or resource_filename(__name__, "resources/amis.json"),
|
||||
)
|
||||
|
||||
|
||||
OWNER_ID = "111122223333"
|
||||
|
||||
|
||||
|
@ -7,25 +7,33 @@ from datetime import datetime, timedelta
|
||||
from boto3 import Session
|
||||
|
||||
from moto.core import BaseBackend, BaseModel
|
||||
from moto.core.utils import iso_8601_datetime_without_milliseconds
|
||||
from moto.core.utils import unix_time
|
||||
|
||||
from moto.iam.models import ACCOUNT_ID
|
||||
|
||||
from .utils import decrypt, encrypt, generate_key_id, generate_master_key
|
||||
|
||||
|
||||
class Key(BaseModel):
|
||||
def __init__(self, policy, key_usage, description, tags, region):
|
||||
def __init__(
|
||||
self, policy, key_usage, customer_master_key_spec, description, tags, region
|
||||
):
|
||||
self.id = generate_key_id()
|
||||
self.creation_date = unix_time()
|
||||
self.policy = policy
|
||||
self.key_usage = key_usage
|
||||
self.key_state = "Enabled"
|
||||
self.description = description
|
||||
self.enabled = True
|
||||
self.region = region
|
||||
self.account_id = "012345678912"
|
||||
self.account_id = ACCOUNT_ID
|
||||
self.key_rotation_status = False
|
||||
self.deletion_date = None
|
||||
self.tags = tags or {}
|
||||
self.key_material = generate_master_key()
|
||||
self.origin = "AWS_KMS"
|
||||
self.key_manager = "CUSTOMER"
|
||||
self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT"
|
||||
|
||||
@property
|
||||
def physical_resource_id(self):
|
||||
@ -37,23 +45,55 @@ class Key(BaseModel):
|
||||
self.region, self.account_id, self.id
|
||||
)
|
||||
|
||||
@property
|
||||
def encryption_algorithms(self):
|
||||
if self.key_usage == "SIGN_VERIFY":
|
||||
return None
|
||||
elif self.customer_master_key_spec == "SYMMETRIC_DEFAULT":
|
||||
return ["SYMMETRIC_DEFAULT"]
|
||||
else:
|
||||
return ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"]
|
||||
|
||||
@property
|
||||
def signing_algorithms(self):
|
||||
if self.key_usage == "ENCRYPT_DECRYPT":
|
||||
return None
|
||||
elif self.customer_master_key_spec in ["ECC_NIST_P256", "ECC_SECG_P256K1"]:
|
||||
return ["ECDSA_SHA_256"]
|
||||
elif self.customer_master_key_spec == "ECC_NIST_P384":
|
||||
return ["ECDSA_SHA_384"]
|
||||
elif self.customer_master_key_spec == "ECC_NIST_P521":
|
||||
return ["ECDSA_SHA_512"]
|
||||
else:
|
||||
return [
|
||||
"RSASSA_PKCS1_V1_5_SHA_256",
|
||||
"RSASSA_PKCS1_V1_5_SHA_384",
|
||||
"RSASSA_PKCS1_V1_5_SHA_512",
|
||||
"RSASSA_PSS_SHA_256",
|
||||
"RSASSA_PSS_SHA_384",
|
||||
"RSASSA_PSS_SHA_512",
|
||||
]
|
||||
|
||||
def to_dict(self):
|
||||
key_dict = {
|
||||
"KeyMetadata": {
|
||||
"AWSAccountId": self.account_id,
|
||||
"Arn": self.arn,
|
||||
"CreationDate": iso_8601_datetime_without_milliseconds(datetime.now()),
|
||||
"CreationDate": self.creation_date,
|
||||
"CustomerMasterKeySpec": self.customer_master_key_spec,
|
||||
"Description": self.description,
|
||||
"Enabled": self.enabled,
|
||||
"EncryptionAlgorithms": self.encryption_algorithms,
|
||||
"KeyId": self.id,
|
||||
"KeyManager": self.key_manager,
|
||||
"KeyUsage": self.key_usage,
|
||||
"KeyState": self.key_state,
|
||||
"Origin": self.origin,
|
||||
"SigningAlgorithms": self.signing_algorithms,
|
||||
}
|
||||
}
|
||||
if self.key_state == "PendingDeletion":
|
||||
key_dict["KeyMetadata"][
|
||||
"DeletionDate"
|
||||
] = iso_8601_datetime_without_milliseconds(self.deletion_date)
|
||||
key_dict["KeyMetadata"]["DeletionDate"] = unix_time(self.deletion_date)
|
||||
return key_dict
|
||||
|
||||
def delete(self, region_name):
|
||||
@ -69,6 +109,7 @@ class Key(BaseModel):
|
||||
key = kms_backend.create_key(
|
||||
policy=properties["KeyPolicy"],
|
||||
key_usage="ENCRYPT_DECRYPT",
|
||||
customer_master_key_spec="SYMMETRIC_DEFAULT",
|
||||
description=properties["Description"],
|
||||
tags=properties.get("Tags"),
|
||||
region=region_name,
|
||||
@ -90,8 +131,12 @@ class KmsBackend(BaseBackend):
|
||||
self.keys = {}
|
||||
self.key_to_aliases = defaultdict(set)
|
||||
|
||||
def create_key(self, policy, key_usage, description, tags, region):
|
||||
key = Key(policy, key_usage, description, tags, region)
|
||||
def create_key(
|
||||
self, policy, key_usage, customer_master_key_spec, description, tags, region
|
||||
):
|
||||
key = Key(
|
||||
policy, key_usage, customer_master_key_spec, description, tags, region
|
||||
)
|
||||
self.keys[key.id] = key
|
||||
return key
|
||||
|
||||
@ -215,9 +260,7 @@ class KmsBackend(BaseBackend):
|
||||
self.keys[key_id].deletion_date = datetime.now() + timedelta(
|
||||
days=pending_window_in_days
|
||||
)
|
||||
return iso_8601_datetime_without_milliseconds(
|
||||
self.keys[key_id].deletion_date
|
||||
)
|
||||
return unix_time(self.keys[key_id].deletion_date)
|
||||
|
||||
def encrypt(self, key_id, plaintext, encryption_context):
|
||||
key_id = self.any_id_to_key_id(key_id)
|
||||
|
@ -118,11 +118,12 @@ class KmsResponse(BaseResponse):
|
||||
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html"""
|
||||
policy = self.parameters.get("Policy")
|
||||
key_usage = self.parameters.get("KeyUsage")
|
||||
customer_master_key_spec = self.parameters.get("CustomerMasterKeySpec")
|
||||
description = self.parameters.get("Description")
|
||||
tags = self.parameters.get("Tags")
|
||||
|
||||
key = self.kms_backend.create_key(
|
||||
policy, key_usage, description, tags, self.region
|
||||
policy, key_usage, customer_master_key_spec, description, tags, self.region
|
||||
)
|
||||
return json.dumps(key.to_dict())
|
||||
|
||||
|
@ -10,3 +10,13 @@ class InvalidInputException(JsonRESTError):
|
||||
"InvalidInputException",
|
||||
"You provided a value that does not match the required pattern.",
|
||||
)
|
||||
|
||||
|
||||
class DuplicateOrganizationalUnitException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self):
|
||||
super(DuplicateOrganizationalUnitException, self).__init__(
|
||||
"DuplicateOrganizationalUnitException",
|
||||
"An OU with the same name already exists.",
|
||||
)
|
||||
|
@ -8,7 +8,10 @@ from moto.core import BaseBackend, BaseModel
|
||||
from moto.core.exceptions import RESTError
|
||||
from moto.core.utils import unix_time
|
||||
from moto.organizations import utils
|
||||
from moto.organizations.exceptions import InvalidInputException
|
||||
from moto.organizations.exceptions import (
|
||||
InvalidInputException,
|
||||
DuplicateOrganizationalUnitException,
|
||||
)
|
||||
|
||||
|
||||
class FakeOrganization(BaseModel):
|
||||
@ -222,6 +225,14 @@ class OrganizationsBackend(BaseBackend):
|
||||
self.attach_policy(PolicyId=utils.DEFAULT_POLICY_ID, TargetId=new_ou.id)
|
||||
return new_ou.describe()
|
||||
|
||||
def update_organizational_unit(self, **kwargs):
|
||||
for ou in self.ou:
|
||||
if ou.name == kwargs["Name"]:
|
||||
raise DuplicateOrganizationalUnitException
|
||||
ou = self.get_organizational_unit_by_id(kwargs["OrganizationalUnitId"])
|
||||
ou.name = kwargs["Name"]
|
||||
return ou.describe()
|
||||
|
||||
def get_organizational_unit_by_id(self, ou_id):
|
||||
ou = next((ou for ou in self.ou if ou.id == ou_id), None)
|
||||
if ou is None:
|
||||
|
@ -36,6 +36,11 @@ class OrganizationsResponse(BaseResponse):
|
||||
self.organizations_backend.create_organizational_unit(**self.request_params)
|
||||
)
|
||||
|
||||
def update_organizational_unit(self):
|
||||
return json.dumps(
|
||||
self.organizations_backend.update_organizational_unit(**self.request_params)
|
||||
)
|
||||
|
||||
def describe_organizational_unit(self):
|
||||
return json.dumps(
|
||||
self.organizations_backend.describe_organizational_unit(
|
||||
|
@ -130,7 +130,9 @@ class Database(BaseModel):
|
||||
if not self.option_group_name and self.engine in self.default_option_groups:
|
||||
self.option_group_name = self.default_option_groups[self.engine]
|
||||
self.character_set_name = kwargs.get("character_set_name", None)
|
||||
self.iam_database_authentication_enabled = False
|
||||
self.enable_iam_database_authentication = kwargs.get(
|
||||
"enable_iam_database_authentication", False
|
||||
)
|
||||
self.dbi_resource_id = "db-M5ENSHXFPU6XHZ4G4ZEI5QIO2U"
|
||||
self.tags = kwargs.get("tags", [])
|
||||
|
||||
@ -214,7 +216,7 @@ class Database(BaseModel):
|
||||
<ReadReplicaSourceDBInstanceIdentifier>{{ database.source_db_identifier }}</ReadReplicaSourceDBInstanceIdentifier>
|
||||
{% endif %}
|
||||
<Engine>{{ database.engine }}</Engine>
|
||||
<IAMDatabaseAuthenticationEnabled>{{database.iam_database_authentication_enabled }}</IAMDatabaseAuthenticationEnabled>
|
||||
<IAMDatabaseAuthenticationEnabled>{{database.enable_iam_database_authentication|lower }}</IAMDatabaseAuthenticationEnabled>
|
||||
<LicenseModel>{{ database.license_model }}</LicenseModel>
|
||||
<EngineVersion>{{ database.engine_version }}</EngineVersion>
|
||||
<OptionGroupMemberships>
|
||||
@ -542,7 +544,7 @@ class Snapshot(BaseModel):
|
||||
<KmsKeyId>{{ database.kms_key_id }}</KmsKeyId>
|
||||
<DBSnapshotArn>{{ snapshot.snapshot_arn }}</DBSnapshotArn>
|
||||
<Timezone></Timezone>
|
||||
<IAMDatabaseAuthenticationEnabled>false</IAMDatabaseAuthenticationEnabled>
|
||||
<IAMDatabaseAuthenticationEnabled>{{ database.enable_iam_database_authentication|lower }}</IAMDatabaseAuthenticationEnabled>
|
||||
</DBSnapshot>"""
|
||||
)
|
||||
return template.render(snapshot=self, database=self.database)
|
||||
|
@ -27,6 +27,9 @@ class RDS2Response(BaseResponse):
|
||||
"db_subnet_group_name": self._get_param("DBSubnetGroupName"),
|
||||
"engine": self._get_param("Engine"),
|
||||
"engine_version": self._get_param("EngineVersion"),
|
||||
"enable_iam_database_authentication": self._get_bool_param(
|
||||
"EnableIAMDatabaseAuthentication"
|
||||
),
|
||||
"license_model": self._get_param("LicenseModel"),
|
||||
"iops": self._get_int_param("Iops"),
|
||||
"kms_key_id": self._get_param("KmsKeyId"),
|
||||
|
@ -148,11 +148,15 @@ class SESBackend(BaseBackend):
|
||||
def __type_of_message__(self, destinations):
|
||||
"""Checks the destination for any special address that could indicate delivery,
|
||||
complaint or bounce like in SES simulator"""
|
||||
alladdress = (
|
||||
destinations.get("ToAddresses", [])
|
||||
+ destinations.get("CcAddresses", [])
|
||||
+ destinations.get("BccAddresses", [])
|
||||
)
|
||||
if isinstance(destinations, list):
|
||||
alladdress = destinations
|
||||
else:
|
||||
alladdress = (
|
||||
destinations.get("ToAddresses", [])
|
||||
+ destinations.get("CcAddresses", [])
|
||||
+ destinations.get("BccAddresses", [])
|
||||
)
|
||||
|
||||
for addr in alladdress:
|
||||
if SESFeedback.SUCCESS_ADDR in addr:
|
||||
return SESFeedback.DELIVERY
|
||||
|
@ -567,7 +567,7 @@ class SQSBackend(BaseBackend):
|
||||
for name, q in self.queues.items():
|
||||
if prefix_re.search(name):
|
||||
qs.append(q)
|
||||
return qs
|
||||
return qs[:1000]
|
||||
|
||||
def get_queue(self, queue_name):
|
||||
queue = self.queues.get(queue_name)
|
||||
|
2
setup.py
2
setup.py
@ -39,7 +39,7 @@ install_requires = [
|
||||
"werkzeug",
|
||||
"PyYAML>=5.1",
|
||||
"pytz",
|
||||
"python-dateutil<2.8.1,>=2.1",
|
||||
"python-dateutil<3.0.0,>=2.1",
|
||||
"python-jose<4.0.0",
|
||||
"mock",
|
||||
"docker>=2.5.1",
|
||||
|
@ -26,7 +26,14 @@ def test_create_and_get_rest_api():
|
||||
response.pop("ResponseMetadata")
|
||||
response.pop("createdDate")
|
||||
response.should.equal(
|
||||
{"id": api_id, "name": "my_api", "description": "this is my api"}
|
||||
{
|
||||
"id": api_id,
|
||||
"name": "my_api",
|
||||
"description": "this is my api",
|
||||
"apiKeySource": "HEADER",
|
||||
"endpointConfiguration": {"types": ["EDGE"]},
|
||||
"tags": {},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -47,6 +54,114 @@ def test_list_and_delete_apis():
|
||||
len(response["items"]).should.equal(1)
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_create_rest_api_with_tags():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
|
||||
response = client.create_rest_api(
|
||||
name="my_api", description="this is my api", tags={"MY_TAG1": "MY_VALUE1"}
|
||||
)
|
||||
api_id = response["id"]
|
||||
|
||||
response = client.get_rest_api(restApiId=api_id)
|
||||
|
||||
assert "tags" in response
|
||||
response["tags"].should.equal({"MY_TAG1": "MY_VALUE1"})
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_create_rest_api_invalid_apikeysource():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
|
||||
with assert_raises(ClientError) as ex:
|
||||
client.create_rest_api(
|
||||
name="my_api",
|
||||
description="this is my api",
|
||||
apiKeySource="not a valid api key source",
|
||||
)
|
||||
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_create_rest_api_valid_apikeysources():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
|
||||
# 1. test creating rest api with HEADER apiKeySource
|
||||
response = client.create_rest_api(
|
||||
name="my_api", description="this is my api", apiKeySource="HEADER",
|
||||
)
|
||||
api_id = response["id"]
|
||||
|
||||
response = client.get_rest_api(restApiId=api_id)
|
||||
response["apiKeySource"].should.equal("HEADER")
|
||||
|
||||
# 2. test creating rest api with AUTHORIZER apiKeySource
|
||||
response = client.create_rest_api(
|
||||
name="my_api2", description="this is my api", apiKeySource="AUTHORIZER",
|
||||
)
|
||||
api_id = response["id"]
|
||||
|
||||
response = client.get_rest_api(restApiId=api_id)
|
||||
response["apiKeySource"].should.equal("AUTHORIZER")
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_create_rest_api_invalid_endpointconfiguration():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
|
||||
with assert_raises(ClientError) as ex:
|
||||
client.create_rest_api(
|
||||
name="my_api",
|
||||
description="this is my api",
|
||||
endpointConfiguration={"types": ["INVALID"]},
|
||||
)
|
||||
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_create_rest_api_valid_endpointconfigurations():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
|
||||
# 1. test creating rest api with PRIVATE endpointConfiguration
|
||||
response = client.create_rest_api(
|
||||
name="my_api",
|
||||
description="this is my api",
|
||||
endpointConfiguration={"types": ["PRIVATE"]},
|
||||
)
|
||||
api_id = response["id"]
|
||||
|
||||
response = client.get_rest_api(restApiId=api_id)
|
||||
response["endpointConfiguration"].should.equal(
|
||||
{"types": ["PRIVATE"],}
|
||||
)
|
||||
|
||||
# 2. test creating rest api with REGIONAL endpointConfiguration
|
||||
response = client.create_rest_api(
|
||||
name="my_api2",
|
||||
description="this is my api",
|
||||
endpointConfiguration={"types": ["REGIONAL"]},
|
||||
)
|
||||
api_id = response["id"]
|
||||
|
||||
response = client.get_rest_api(restApiId=api_id)
|
||||
response["endpointConfiguration"].should.equal(
|
||||
{"types": ["REGIONAL"],}
|
||||
)
|
||||
|
||||
# 3. test creating rest api with EDGE endpointConfiguration
|
||||
response = client.create_rest_api(
|
||||
name="my_api3",
|
||||
description="this is my api",
|
||||
endpointConfiguration={"types": ["EDGE"]},
|
||||
)
|
||||
api_id = response["id"]
|
||||
|
||||
response = client.get_rest_api(restApiId=api_id)
|
||||
response["endpointConfiguration"].should.equal(
|
||||
{"types": ["EDGE"],}
|
||||
)
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_create_resource__validate_name():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
|
@ -124,6 +124,43 @@ def test_invoke_requestresponse_function():
|
||||
json.loads(payload).should.equal(in_data)
|
||||
|
||||
|
||||
@mock_lambda
|
||||
def test_invoke_requestresponse_function_with_arn():
|
||||
from moto.awslambda.models import ACCOUNT_ID
|
||||
|
||||
conn = boto3.client("lambda", "us-west-2")
|
||||
conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=get_role_name(),
|
||||
Handler="lambda_function.lambda_handler",
|
||||
Code={"ZipFile": get_test_zip_file1()},
|
||||
Description="test lambda function",
|
||||
Timeout=3,
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
|
||||
in_data = {"msg": "So long and thanks for all the fish"}
|
||||
success_result = conn.invoke(
|
||||
FunctionName="arn:aws:lambda:us-west-2:{}:function:testFunction".format(
|
||||
ACCOUNT_ID
|
||||
),
|
||||
InvocationType="RequestResponse",
|
||||
Payload=json.dumps(in_data),
|
||||
)
|
||||
|
||||
success_result["StatusCode"].should.equal(202)
|
||||
result_obj = json.loads(
|
||||
base64.b64decode(success_result["LogResult"]).decode("utf-8")
|
||||
)
|
||||
|
||||
result_obj.should.equal(in_data)
|
||||
|
||||
payload = success_result["Payload"].read().decode("utf-8")
|
||||
json.loads(payload).should.equal(in_data)
|
||||
|
||||
|
||||
@mock_lambda
|
||||
def test_invoke_event_function():
|
||||
conn = boto3.client("lambda", "us-west-2")
|
||||
|
@ -522,6 +522,13 @@ def test_boto3_list_stack_set_operations():
|
||||
list_operation["Summaries"][-1]["Action"].should.equal("UPDATE")
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
def test_boto3_bad_list_stack_resources():
|
||||
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
|
||||
with assert_raises(ClientError):
|
||||
cf_conn.list_stack_resources(StackName="test_stack_set")
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
def test_boto3_delete_stack_set():
|
||||
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
|
||||
|
@ -958,6 +958,18 @@ def test_list_users():
|
||||
result["Users"].should.have.length_of(1)
|
||||
result["Users"][0]["Username"].should.equal(username)
|
||||
|
||||
username_bis = str(uuid.uuid4())
|
||||
conn.admin_create_user(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username_bis,
|
||||
UserAttributes=[{"Name": "phone_number", "Value": "+33666666666"}],
|
||||
)
|
||||
result = conn.list_users(
|
||||
UserPoolId=user_pool_id, Filter='phone_number="+33666666666'
|
||||
)
|
||||
result["Users"].should.have.length_of(1)
|
||||
result["Users"][0]["Username"].should.equal(username_bis)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_returns_limit_items():
|
||||
|
@ -3721,3 +3721,24 @@ def test_allow_update_to_item_with_different_type():
|
||||
table.get_item(Key={"job_id": "b"})["Item"]["job_details"][
|
||||
"job_name"
|
||||
].should.be.equal({"nested": "yes"})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_query_catches_when_no_filters():
|
||||
dynamo = boto3.resource("dynamodb", region_name="eu-central-1")
|
||||
dynamo.create_table(
|
||||
AttributeDefinitions=[{"AttributeName": "job_id", "AttributeType": "S"}],
|
||||
TableName="origin-rbu-dev",
|
||||
KeySchema=[{"AttributeName": "job_id", "KeyType": "HASH"}],
|
||||
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
|
||||
)
|
||||
table = dynamo.Table("origin-rbu-dev")
|
||||
|
||||
with assert_raises(ClientError) as ex:
|
||||
table.query(TableName="original-rbu-dev")
|
||||
|
||||
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
|
||||
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"Either KeyConditions or QueryFilter should be present"
|
||||
)
|
||||
|
@ -1,25 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
from datetime import date
|
||||
from datetime import datetime
|
||||
from dateutil.tz import tzutc
|
||||
import base64
|
||||
import os
|
||||
import re
|
||||
|
||||
import boto3
|
||||
import boto.kms
|
||||
import botocore.exceptions
|
||||
import six
|
||||
import sure # noqa
|
||||
from boto.exception import JSONResponseError
|
||||
from boto.kms.exceptions import AlreadyExistsException, NotFoundException
|
||||
from freezegun import freeze_time
|
||||
from nose.tools import assert_raises
|
||||
from parameterized import parameterized
|
||||
|
||||
from moto.kms.exceptions import NotFoundException as MotoNotFoundException
|
||||
from moto import mock_kms, mock_kms_deprecated
|
||||
from moto import mock_kms_deprecated
|
||||
|
||||
PLAINTEXT_VECTORS = (
|
||||
(b"some encodeable plaintext",),
|
||||
@ -35,23 +28,6 @@ def _get_encoded_value(plaintext):
|
||||
return plaintext.encode("utf-8")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_create_key():
|
||||
conn = boto3.client("kms", region_name="us-east-1")
|
||||
with freeze_time("2015-01-01 00:00:00"):
|
||||
key = conn.create_key(
|
||||
Policy="my policy",
|
||||
Description="my key",
|
||||
KeyUsage="ENCRYPT_DECRYPT",
|
||||
Tags=[{"TagKey": "project", "TagValue": "moto"}],
|
||||
)
|
||||
|
||||
key["KeyMetadata"]["Description"].should.equal("my key")
|
||||
key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT")
|
||||
key["KeyMetadata"]["Enabled"].should.equal(True)
|
||||
key["KeyMetadata"]["CreationDate"].should.be.a(date)
|
||||
|
||||
|
||||
@mock_kms_deprecated
|
||||
def test_describe_key():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
@ -96,22 +72,6 @@ def test_describe_key_via_alias_not_found():
|
||||
)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("alias/does-not-exist",),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/does-not-exist",),
|
||||
("invalid",),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_describe_key_via_alias_invalid_alias(key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
client.create_key(Description="key")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.describe_key(KeyId=key_id)
|
||||
|
||||
|
||||
@mock_kms_deprecated
|
||||
def test_describe_key_via_arn():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
@ -239,71 +199,6 @@ def test_generate_data_key():
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_boto3_generate_data_key():
|
||||
kms = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = kms.create_key()
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
response = kms.generate_data_key(KeyId=key_id, NumberOfBytes=32)
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(response["CiphertextBlob"], validate=True)
|
||||
# Plaintext must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(response["Plaintext"], validate=True)
|
||||
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_encrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(Description="key")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||
response["CiphertextBlob"].should_not.equal(plaintext)
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(response["CiphertextBlob"], validate=True)
|
||||
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_decrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(Description="key")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
encrypt_response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||
|
||||
client.create_key(Description="key")
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(encrypt_response["CiphertextBlob"], validate=True)
|
||||
|
||||
decrypt_response = client.decrypt(CiphertextBlob=encrypt_response["CiphertextBlob"])
|
||||
|
||||
# Plaintext must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(decrypt_response["Plaintext"], validate=True)
|
||||
|
||||
decrypt_response["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@mock_kms_deprecated
|
||||
def test_disable_key_rotation_with_missing_key():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
@ -774,25 +669,6 @@ def test__list_aliases():
|
||||
len(aliases).should.equal(7)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("not-a-uuid",),
|
||||
("alias/DoesNotExist",),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",),
|
||||
("d25652e4-d2d2-49f7-929a-671ccda580c6",),
|
||||
(
|
||||
"arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6",
|
||||
),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_invalid_key_ids(key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.generate_data_key(KeyId=key_id, NumberOfBytes=5)
|
||||
|
||||
|
||||
@mock_kms_deprecated
|
||||
def test__assert_default_policy():
|
||||
from moto.kms.responses import _assert_default_policy
|
||||
@ -803,421 +679,3 @@ def test__assert_default_policy():
|
||||
_assert_default_policy.when.called_with("default").should_not.throw(
|
||||
MotoNotFoundException
|
||||
)
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_kms_encrypt_boto3(plaintext):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="key")
|
||||
response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext)
|
||||
|
||||
response = client.decrypt(CiphertextBlob=response["CiphertextBlob"])
|
||||
response["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="disable-key")
|
||||
client.disable_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "Disabled"
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="enable-key")
|
||||
client.disable_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
client.enable_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == True
|
||||
assert result["KeyMetadata"]["KeyState"] == "Enabled"
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_schedule_key_deletion():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="schedule-key-deletion")
|
||||
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false":
|
||||
with freeze_time("2015-01-01 12:00:00"):
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
assert response["DeletionDate"] == datetime(
|
||||
2015, 1, 31, 12, 0, tzinfo=tzutc()
|
||||
)
|
||||
else:
|
||||
# Can't manipulate time in server mode
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "PendingDeletion"
|
||||
assert "DeletionDate" in result["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_schedule_key_deletion_custom():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="schedule-key-deletion")
|
||||
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false":
|
||||
with freeze_time("2015-01-01 12:00:00"):
|
||||
response = client.schedule_key_deletion(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7
|
||||
)
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
assert response["DeletionDate"] == datetime(
|
||||
2015, 1, 8, 12, 0, tzinfo=tzutc()
|
||||
)
|
||||
else:
|
||||
# Can't manipulate time in server mode
|
||||
response = client.schedule_key_deletion(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7
|
||||
)
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "PendingDeletion"
|
||||
assert "DeletionDate" in result["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_cancel_key_deletion():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="cancel-key-deletion")
|
||||
client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
response = client.cancel_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "Disabled"
|
||||
assert "DeletionDate" not in result["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_update_key_description():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="old_description")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.update_key_description(KeyId=key_id, Description="new_description")
|
||||
assert "ResponseMetadata" in result
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_tag_resource():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="cancel-key-deletion")
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
keyid = response["KeyId"]
|
||||
response = client.tag_resource(
|
||||
KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}]
|
||||
)
|
||||
|
||||
# Shouldn't have any data, just header
|
||||
assert len(response.keys()) == 1
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_resource_tags():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="cancel-key-deletion")
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
keyid = response["KeyId"]
|
||||
response = client.tag_resource(
|
||||
KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}]
|
||||
)
|
||||
|
||||
response = client.list_resource_tags(KeyId=keyid)
|
||||
assert response["Tags"][0]["TagKey"] == "string"
|
||||
assert response["Tags"][0]["TagValue"] == "string"
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
(dict(KeySpec="AES_256"), 32),
|
||||
(dict(KeySpec="AES_128"), 16),
|
||||
(dict(NumberOfBytes=64), 64),
|
||||
(dict(NumberOfBytes=1), 1),
|
||||
(dict(NumberOfBytes=1024), 1024),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_sizes(kwargs, expected_key_length):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-size")
|
||||
|
||||
response = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs)
|
||||
|
||||
assert len(response["Plaintext"]) == expected_key_length
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_generate_data_key_decrypt():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-decrypt")
|
||||
|
||||
resp1 = client.generate_data_key(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256"
|
||||
)
|
||||
resp2 = client.decrypt(CiphertextBlob=resp1["CiphertextBlob"])
|
||||
|
||||
assert resp1["Plaintext"] == resp2["Plaintext"]
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
(dict(KeySpec="AES_257"),),
|
||||
(dict(KeySpec="AES_128", NumberOfBytes=16),),
|
||||
(dict(NumberOfBytes=2048),),
|
||||
(dict(NumberOfBytes=0),),
|
||||
(dict(),),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_invalid_size_params(kwargs):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-size")
|
||||
|
||||
with assert_raises(
|
||||
(botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError)
|
||||
) as err:
|
||||
client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("alias/DoesNotExist",),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",),
|
||||
("d25652e4-d2d2-49f7-929a-671ccda580c6",),
|
||||
(
|
||||
"arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6",
|
||||
),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_invalid_key(key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("alias/DoesExist", False),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/DoesExist", False),
|
||||
("", True),
|
||||
("arn:aws:kms:us-east-1:012345678912:key/", True),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_all_valid_key_ids(prefix, append_key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key()
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
client.create_alias(AliasName="alias/DoesExist", TargetKeyId=key_id)
|
||||
|
||||
target_id = prefix
|
||||
if append_key_id:
|
||||
target_id += key_id
|
||||
|
||||
client.generate_data_key(KeyId=key_id, NumberOfBytes=32)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_generate_data_key_without_plaintext_decrypt():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-decrypt")
|
||||
|
||||
resp1 = client.generate_data_key_without_plaintext(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256"
|
||||
)
|
||||
|
||||
assert "Plaintext" not in resp1
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_re_encrypt_decrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key_1 = client.create_key(Description="key 1")
|
||||
key_1_id = key_1["KeyMetadata"]["KeyId"]
|
||||
key_1_arn = key_1["KeyMetadata"]["Arn"]
|
||||
key_2 = client.create_key(Description="key 2")
|
||||
key_2_id = key_2["KeyMetadata"]["KeyId"]
|
||||
key_2_arn = key_2["KeyMetadata"]["Arn"]
|
||||
|
||||
encrypt_response = client.encrypt(
|
||||
KeyId=key_1_id, Plaintext=plaintext, EncryptionContext={"encryption": "context"}
|
||||
)
|
||||
|
||||
re_encrypt_response = client.re_encrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
SourceEncryptionContext={"encryption": "context"},
|
||||
DestinationKeyId=key_2_id,
|
||||
DestinationEncryptionContext={"another": "context"},
|
||||
)
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(re_encrypt_response["CiphertextBlob"], validate=True)
|
||||
|
||||
re_encrypt_response["SourceKeyId"].should.equal(key_1_arn)
|
||||
re_encrypt_response["KeyId"].should.equal(key_2_arn)
|
||||
|
||||
decrypt_response_1 = client.decrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
EncryptionContext={"encryption": "context"},
|
||||
)
|
||||
decrypt_response_1["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response_1["KeyId"].should.equal(key_1_arn)
|
||||
|
||||
decrypt_response_2 = client.decrypt(
|
||||
CiphertextBlob=re_encrypt_response["CiphertextBlob"],
|
||||
EncryptionContext={"another": "context"},
|
||||
)
|
||||
decrypt_response_2["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response_2["KeyId"].should.equal(key_2_arn)
|
||||
|
||||
decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_re_encrypt_to_invalid_destination():
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(Description="key 1")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
encrypt_response = client.encrypt(KeyId=key_id, Plaintext=b"some plaintext")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.re_encrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
DestinationKeyId="alias/DoesNotExist",
|
||||
)
|
||||
|
||||
|
||||
@parameterized(((12,), (44,), (91,), (1,), (1024,)))
|
||||
@mock_kms
|
||||
def test_generate_random(number_of_bytes):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
response = client.generate_random(NumberOfBytes=number_of_bytes)
|
||||
|
||||
response["Plaintext"].should.be.a(bytes)
|
||||
len(response["Plaintext"]).should.equal(number_of_bytes)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
(2048, botocore.exceptions.ClientError),
|
||||
(1025, botocore.exceptions.ClientError),
|
||||
(0, botocore.exceptions.ParamValidationError),
|
||||
(-1, botocore.exceptions.ParamValidationError),
|
||||
(-1024, botocore.exceptions.ParamValidationError),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_random_invalid_number_of_bytes(number_of_bytes, error_type):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
with assert_raises(error_type):
|
||||
client.generate_random(NumberOfBytes=number_of_bytes)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.enable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key_rotation_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.disable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.enable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.disable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_cancel_key_deletion_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.cancel_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_schedule_key_deletion_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.schedule_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_rotation_status_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.get_key_rotation_status(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_policy_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.get_key_policy(
|
||||
KeyId="12366f9b-1230-123d-123e-123e6ae60c02", PolicyName="default"
|
||||
)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_key_policies_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.list_key_policies(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.put_key_policy(
|
||||
KeyId="00000000-0000-0000-0000-000000000000",
|
||||
PolicyName="default",
|
||||
Policy="new policy",
|
||||
)
|
||||
|
638
tests/test_kms/test_kms_boto3.py
Normal file
638
tests/test_kms/test_kms_boto3.py
Normal file
@ -0,0 +1,638 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
from datetime import datetime
|
||||
from dateutil.tz import tzutc
|
||||
import base64
|
||||
import os
|
||||
|
||||
import boto3
|
||||
import botocore.exceptions
|
||||
import six
|
||||
import sure # noqa
|
||||
from freezegun import freeze_time
|
||||
from nose.tools import assert_raises
|
||||
from parameterized import parameterized
|
||||
|
||||
from moto import mock_kms
|
||||
|
||||
PLAINTEXT_VECTORS = (
|
||||
(b"some encodeable plaintext",),
|
||||
(b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",),
|
||||
("some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥",),
|
||||
)
|
||||
|
||||
|
||||
def _get_encoded_value(plaintext):
|
||||
if isinstance(plaintext, six.binary_type):
|
||||
return plaintext
|
||||
|
||||
return plaintext.encode("utf-8")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_create_key():
|
||||
conn = boto3.client("kms", region_name="us-east-1")
|
||||
key = conn.create_key(
|
||||
Policy="my policy",
|
||||
Description="my key",
|
||||
KeyUsage="ENCRYPT_DECRYPT",
|
||||
Tags=[{"TagKey": "project", "TagValue": "moto"}],
|
||||
)
|
||||
|
||||
key["KeyMetadata"]["Arn"].should.equal(
|
||||
"arn:aws:kms:us-east-1:123456789012:key/{}".format(key["KeyMetadata"]["KeyId"])
|
||||
)
|
||||
key["KeyMetadata"]["AWSAccountId"].should.equal("123456789012")
|
||||
key["KeyMetadata"]["CreationDate"].should.be.a(datetime)
|
||||
key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT")
|
||||
key["KeyMetadata"]["Description"].should.equal("my key")
|
||||
key["KeyMetadata"]["Enabled"].should.be.ok
|
||||
key["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"])
|
||||
key["KeyMetadata"]["KeyId"].should_not.be.empty
|
||||
key["KeyMetadata"]["KeyManager"].should.equal("CUSTOMER")
|
||||
key["KeyMetadata"]["KeyState"].should.equal("Enabled")
|
||||
key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT")
|
||||
key["KeyMetadata"]["Origin"].should.equal("AWS_KMS")
|
||||
key["KeyMetadata"].should_not.have.key("SigningAlgorithms")
|
||||
|
||||
key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", CustomerMasterKeySpec="RSA_2048",)
|
||||
|
||||
sorted(key["KeyMetadata"]["EncryptionAlgorithms"]).should.equal(
|
||||
["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"]
|
||||
)
|
||||
key["KeyMetadata"].should_not.have.key("SigningAlgorithms")
|
||||
|
||||
key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="RSA_2048",)
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
sorted(key["KeyMetadata"]["SigningAlgorithms"]).should.equal(
|
||||
[
|
||||
"RSASSA_PKCS1_V1_5_SHA_256",
|
||||
"RSASSA_PKCS1_V1_5_SHA_384",
|
||||
"RSASSA_PKCS1_V1_5_SHA_512",
|
||||
"RSASSA_PSS_SHA_256",
|
||||
"RSASSA_PSS_SHA_384",
|
||||
"RSASSA_PSS_SHA_512",
|
||||
]
|
||||
)
|
||||
|
||||
key = conn.create_key(
|
||||
KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_SECG_P256K1",
|
||||
)
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_256"])
|
||||
|
||||
key = conn.create_key(
|
||||
KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P384",
|
||||
)
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_384"])
|
||||
|
||||
key = conn.create_key(
|
||||
KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P521",
|
||||
)
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
response = client.create_key(Description="my key", KeyUsage="ENCRYPT_DECRYPT",)
|
||||
key_id = response["KeyMetadata"]["KeyId"]
|
||||
|
||||
response = client.describe_key(KeyId=key_id)
|
||||
|
||||
response["KeyMetadata"]["AWSAccountId"].should.equal("123456789012")
|
||||
response["KeyMetadata"]["CreationDate"].should.be.a(datetime)
|
||||
response["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT")
|
||||
response["KeyMetadata"]["Description"].should.equal("my key")
|
||||
response["KeyMetadata"]["Enabled"].should.be.ok
|
||||
response["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"])
|
||||
response["KeyMetadata"]["KeyId"].should_not.be.empty
|
||||
response["KeyMetadata"]["KeyManager"].should.equal("CUSTOMER")
|
||||
response["KeyMetadata"]["KeyState"].should.equal("Enabled")
|
||||
response["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT")
|
||||
response["KeyMetadata"]["Origin"].should.equal("AWS_KMS")
|
||||
response["KeyMetadata"].should_not.have.key("SigningAlgorithms")
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("alias/does-not-exist",),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/does-not-exist",),
|
||||
("invalid",),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_describe_key_via_alias_invalid_alias(key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
client.create_key(Description="key")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.describe_key(KeyId=key_id)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_generate_data_key():
|
||||
kms = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = kms.create_key()
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
response = kms.generate_data_key(KeyId=key_id, NumberOfBytes=32)
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(response["CiphertextBlob"], validate=True)
|
||||
# Plaintext must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(response["Plaintext"], validate=True)
|
||||
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_encrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(Description="key")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||
response["CiphertextBlob"].should_not.equal(plaintext)
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(response["CiphertextBlob"], validate=True)
|
||||
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_decrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(Description="key")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
encrypt_response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||
|
||||
client.create_key(Description="key")
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(encrypt_response["CiphertextBlob"], validate=True)
|
||||
|
||||
decrypt_response = client.decrypt(CiphertextBlob=encrypt_response["CiphertextBlob"])
|
||||
|
||||
# Plaintext must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(decrypt_response["Plaintext"], validate=True)
|
||||
|
||||
decrypt_response["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response["KeyId"].should.equal(key_arn)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("not-a-uuid",),
|
||||
("alias/DoesNotExist",),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",),
|
||||
("d25652e4-d2d2-49f7-929a-671ccda580c6",),
|
||||
(
|
||||
"arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6",
|
||||
),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_invalid_key_ids(key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.generate_data_key(KeyId=key_id, NumberOfBytes=5)
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_kms_encrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="key")
|
||||
response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext)
|
||||
|
||||
response = client.decrypt(CiphertextBlob=response["CiphertextBlob"])
|
||||
response["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="disable-key")
|
||||
client.disable_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "Disabled"
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="enable-key")
|
||||
client.disable_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
client.enable_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == True
|
||||
assert result["KeyMetadata"]["KeyState"] == "Enabled"
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_schedule_key_deletion():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="schedule-key-deletion")
|
||||
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false":
|
||||
with freeze_time("2015-01-01 12:00:00"):
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
assert response["DeletionDate"] == datetime(
|
||||
2015, 1, 31, 12, 0, tzinfo=tzutc()
|
||||
)
|
||||
else:
|
||||
# Can't manipulate time in server mode
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "PendingDeletion"
|
||||
assert "DeletionDate" in result["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_schedule_key_deletion_custom():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="schedule-key-deletion")
|
||||
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "false":
|
||||
with freeze_time("2015-01-01 12:00:00"):
|
||||
response = client.schedule_key_deletion(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7
|
||||
)
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
assert response["DeletionDate"] == datetime(
|
||||
2015, 1, 8, 12, 0, tzinfo=tzutc()
|
||||
)
|
||||
else:
|
||||
# Can't manipulate time in server mode
|
||||
response = client.schedule_key_deletion(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], PendingWindowInDays=7
|
||||
)
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "PendingDeletion"
|
||||
assert "DeletionDate" in result["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_cancel_key_deletion():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="cancel-key-deletion")
|
||||
client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
response = client.cancel_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert response["KeyId"] == key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.describe_key(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
assert result["KeyMetadata"]["Enabled"] == False
|
||||
assert result["KeyMetadata"]["KeyState"] == "Disabled"
|
||||
assert "DeletionDate" not in result["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_update_key_description():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="old_description")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
result = client.update_key_description(KeyId=key_id, Description="new_description")
|
||||
assert "ResponseMetadata" in result
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_tag_resource():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="cancel-key-deletion")
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
keyid = response["KeyId"]
|
||||
response = client.tag_resource(
|
||||
KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}]
|
||||
)
|
||||
|
||||
# Shouldn't have any data, just header
|
||||
assert len(response.keys()) == 1
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_resource_tags():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="cancel-key-deletion")
|
||||
response = client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
|
||||
|
||||
keyid = response["KeyId"]
|
||||
response = client.tag_resource(
|
||||
KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}]
|
||||
)
|
||||
|
||||
response = client.list_resource_tags(KeyId=keyid)
|
||||
assert response["Tags"][0]["TagKey"] == "string"
|
||||
assert response["Tags"][0]["TagValue"] == "string"
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
(dict(KeySpec="AES_256"), 32),
|
||||
(dict(KeySpec="AES_128"), 16),
|
||||
(dict(NumberOfBytes=64), 64),
|
||||
(dict(NumberOfBytes=1), 1),
|
||||
(dict(NumberOfBytes=1024), 1024),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_sizes(kwargs, expected_key_length):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-size")
|
||||
|
||||
response = client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs)
|
||||
|
||||
assert len(response["Plaintext"]) == expected_key_length
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_generate_data_key_decrypt():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-decrypt")
|
||||
|
||||
resp1 = client.generate_data_key(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256"
|
||||
)
|
||||
resp2 = client.decrypt(CiphertextBlob=resp1["CiphertextBlob"])
|
||||
|
||||
assert resp1["Plaintext"] == resp2["Plaintext"]
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
(dict(KeySpec="AES_257"),),
|
||||
(dict(KeySpec="AES_128", NumberOfBytes=16),),
|
||||
(dict(NumberOfBytes=2048),),
|
||||
(dict(NumberOfBytes=0),),
|
||||
(dict(),),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_invalid_size_params(kwargs):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-size")
|
||||
|
||||
with assert_raises(
|
||||
(botocore.exceptions.ClientError, botocore.exceptions.ParamValidationError)
|
||||
) as err:
|
||||
client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("alias/DoesNotExist",),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",),
|
||||
("d25652e4-d2d2-49f7-929a-671ccda580c6",),
|
||||
(
|
||||
"arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6",
|
||||
),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_invalid_key(key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
("alias/DoesExist", False),
|
||||
("arn:aws:kms:us-east-1:012345678912:alias/DoesExist", False),
|
||||
("", True),
|
||||
("arn:aws:kms:us-east-1:012345678912:key/", True),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_data_key_all_valid_key_ids(prefix, append_key_id):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key()
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
client.create_alias(AliasName="alias/DoesExist", TargetKeyId=key_id)
|
||||
|
||||
target_id = prefix
|
||||
if append_key_id:
|
||||
target_id += key_id
|
||||
|
||||
client.generate_data_key(KeyId=key_id, NumberOfBytes=32)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_generate_data_key_without_plaintext_decrypt():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key = client.create_key(Description="generate-data-key-decrypt")
|
||||
|
||||
resp1 = client.generate_data_key_without_plaintext(
|
||||
KeyId=key["KeyMetadata"]["KeyId"], KeySpec="AES_256"
|
||||
)
|
||||
|
||||
assert "Plaintext" not in resp1
|
||||
|
||||
|
||||
@parameterized(PLAINTEXT_VECTORS)
|
||||
@mock_kms
|
||||
def test_re_encrypt_decrypt(plaintext):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key_1 = client.create_key(Description="key 1")
|
||||
key_1_id = key_1["KeyMetadata"]["KeyId"]
|
||||
key_1_arn = key_1["KeyMetadata"]["Arn"]
|
||||
key_2 = client.create_key(Description="key 2")
|
||||
key_2_id = key_2["KeyMetadata"]["KeyId"]
|
||||
key_2_arn = key_2["KeyMetadata"]["Arn"]
|
||||
|
||||
encrypt_response = client.encrypt(
|
||||
KeyId=key_1_id, Plaintext=plaintext, EncryptionContext={"encryption": "context"}
|
||||
)
|
||||
|
||||
re_encrypt_response = client.re_encrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
SourceEncryptionContext={"encryption": "context"},
|
||||
DestinationKeyId=key_2_id,
|
||||
DestinationEncryptionContext={"another": "context"},
|
||||
)
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with assert_raises(Exception):
|
||||
base64.b64decode(re_encrypt_response["CiphertextBlob"], validate=True)
|
||||
|
||||
re_encrypt_response["SourceKeyId"].should.equal(key_1_arn)
|
||||
re_encrypt_response["KeyId"].should.equal(key_2_arn)
|
||||
|
||||
decrypt_response_1 = client.decrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
EncryptionContext={"encryption": "context"},
|
||||
)
|
||||
decrypt_response_1["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response_1["KeyId"].should.equal(key_1_arn)
|
||||
|
||||
decrypt_response_2 = client.decrypt(
|
||||
CiphertextBlob=re_encrypt_response["CiphertextBlob"],
|
||||
EncryptionContext={"another": "context"},
|
||||
)
|
||||
decrypt_response_2["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response_2["KeyId"].should.equal(key_2_arn)
|
||||
|
||||
decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_re_encrypt_to_invalid_destination():
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
key = client.create_key(Description="key 1")
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
|
||||
encrypt_response = client.encrypt(KeyId=key_id, Plaintext=b"some plaintext")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.re_encrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
DestinationKeyId="alias/DoesNotExist",
|
||||
)
|
||||
|
||||
|
||||
@parameterized(((12,), (44,), (91,), (1,), (1024,)))
|
||||
@mock_kms
|
||||
def test_generate_random(number_of_bytes):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
response = client.generate_random(NumberOfBytes=number_of_bytes)
|
||||
|
||||
response["Plaintext"].should.be.a(bytes)
|
||||
len(response["Plaintext"]).should.equal(number_of_bytes)
|
||||
|
||||
|
||||
@parameterized(
|
||||
(
|
||||
(2048, botocore.exceptions.ClientError),
|
||||
(1025, botocore.exceptions.ClientError),
|
||||
(0, botocore.exceptions.ParamValidationError),
|
||||
(-1, botocore.exceptions.ParamValidationError),
|
||||
(-1024, botocore.exceptions.ParamValidationError),
|
||||
)
|
||||
)
|
||||
@mock_kms
|
||||
def test_generate_random_invalid_number_of_bytes(number_of_bytes, error_type):
|
||||
client = boto3.client("kms", region_name="us-west-2")
|
||||
|
||||
with assert_raises(error_type):
|
||||
client.generate_random(NumberOfBytes=number_of_bytes)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.enable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key_rotation_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.disable_key_rotation(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.enable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.disable_key(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_cancel_key_deletion_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.cancel_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_schedule_key_deletion_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.schedule_key_deletion(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_rotation_status_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.get_key_rotation_status(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_policy_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.get_key_policy(
|
||||
KeyId="12366f9b-1230-123d-123e-123e6ae60c02", PolicyName="default"
|
||||
)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_key_policies_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.list_key_policies(KeyId="12366f9b-1230-123d-123e-123e6ae60c02")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy_key_not_found():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
|
||||
with assert_raises(client.exceptions.NotFoundException):
|
||||
client.put_key_policy(
|
||||
KeyId="00000000-0000-0000-0000-000000000000",
|
||||
PolicyName="default",
|
||||
Policy="new policy",
|
||||
)
|
@ -102,7 +102,7 @@ def test_deserialize_ciphertext_blob(raw, serialized):
|
||||
@parameterized(((ec[0],) for ec in ENCRYPTION_CONTEXT_VECTORS))
|
||||
def test_encrypt_decrypt_cycle(encryption_context):
|
||||
plaintext = b"some secret plaintext"
|
||||
master_key = Key("nop", "nop", "nop", [], "nop")
|
||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
||||
master_key_map = {master_key.id: master_key}
|
||||
|
||||
ciphertext_blob = encrypt(
|
||||
@ -133,7 +133,7 @@ def test_encrypt_unknown_key_id():
|
||||
|
||||
|
||||
def test_decrypt_invalid_ciphertext_format():
|
||||
master_key = Key("nop", "nop", "nop", [], "nop")
|
||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
||||
master_key_map = {master_key.id: master_key}
|
||||
|
||||
with assert_raises(InvalidCiphertextException):
|
||||
@ -153,7 +153,7 @@ def test_decrypt_unknwown_key_id():
|
||||
|
||||
|
||||
def test_decrypt_invalid_ciphertext():
|
||||
master_key = Key("nop", "nop", "nop", [], "nop")
|
||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
||||
master_key_map = {master_key.id: master_key}
|
||||
ciphertext_blob = (
|
||||
master_key.id.encode("utf-8") + b"123456789012"
|
||||
@ -171,7 +171,7 @@ def test_decrypt_invalid_ciphertext():
|
||||
|
||||
def test_decrypt_invalid_encryption_context():
|
||||
plaintext = b"some secret plaintext"
|
||||
master_key = Key("nop", "nop", "nop", [], "nop")
|
||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
||||
master_key_map = {master_key.id: master_key}
|
||||
|
||||
ciphertext_blob = encrypt(
|
||||
|
@ -713,3 +713,41 @@ def test_untag_resource_errors():
|
||||
ex.response["Error"]["Message"].should.equal(
|
||||
"You provided a value that does not match the required pattern."
|
||||
)
|
||||
|
||||
|
||||
@mock_organizations
|
||||
def test_update_organizational_unit():
|
||||
client = boto3.client("organizations", region_name="us-east-1")
|
||||
org = client.create_organization(FeatureSet="ALL")["Organization"]
|
||||
root_id = client.list_roots()["Roots"][0]["Id"]
|
||||
ou_name = "ou01"
|
||||
response = client.create_organizational_unit(ParentId=root_id, Name=ou_name)
|
||||
validate_organizational_unit(org, response)
|
||||
response["OrganizationalUnit"]["Name"].should.equal(ou_name)
|
||||
new_ou_name = "ou02"
|
||||
response = client.update_organizational_unit(
|
||||
OrganizationalUnitId=response["OrganizationalUnit"]["Id"], Name=new_ou_name
|
||||
)
|
||||
validate_organizational_unit(org, response)
|
||||
response["OrganizationalUnit"]["Name"].should.equal(new_ou_name)
|
||||
|
||||
|
||||
@mock_organizations
|
||||
def test_update_organizational_unit_duplicate_error():
|
||||
client = boto3.client("organizations", region_name="us-east-1")
|
||||
org = client.create_organization(FeatureSet="ALL")["Organization"]
|
||||
root_id = client.list_roots()["Roots"][0]["Id"]
|
||||
ou_name = "ou01"
|
||||
response = client.create_organizational_unit(ParentId=root_id, Name=ou_name)
|
||||
validate_organizational_unit(org, response)
|
||||
response["OrganizationalUnit"]["Name"].should.equal(ou_name)
|
||||
with assert_raises(ClientError) as e:
|
||||
client.update_organizational_unit(
|
||||
OrganizationalUnitId=response["OrganizationalUnit"]["Id"], Name=ou_name
|
||||
)
|
||||
exc = e.exception
|
||||
exc.operation_name.should.equal("UpdateOrganizationalUnit")
|
||||
exc.response["Error"]["Code"].should.contain("DuplicateOrganizationalUnitException")
|
||||
exc.response["Error"]["Message"].should.equal(
|
||||
"An OU with the same name already exists."
|
||||
)
|
||||
|
@ -1689,3 +1689,36 @@ def test_create_parameter_group_with_tags():
|
||||
ResourceName="arn:aws:rds:us-west-2:1234567890:pg:test"
|
||||
)
|
||||
result["TagList"].should.equal([{"Value": "bar", "Key": "foo"}])
|
||||
|
||||
|
||||
@mock_rds2
|
||||
def test_create_db_with_iam_authentication():
|
||||
conn = boto3.client("rds", region_name="us-west-2")
|
||||
|
||||
database = conn.create_db_instance(
|
||||
DBInstanceIdentifier="rds",
|
||||
DBInstanceClass="db.t1.micro",
|
||||
Engine="postgres",
|
||||
EnableIAMDatabaseAuthentication=True,
|
||||
)
|
||||
|
||||
db_instance = database["DBInstance"]
|
||||
db_instance["IAMDatabaseAuthenticationEnabled"].should.equal(True)
|
||||
|
||||
|
||||
@mock_rds2
|
||||
def test_create_db_snapshot_with_iam_authentication():
|
||||
conn = boto3.client("rds", region_name="us-west-2")
|
||||
|
||||
conn.create_db_instance(
|
||||
DBInstanceIdentifier="rds",
|
||||
DBInstanceClass="db.t1.micro",
|
||||
Engine="postgres",
|
||||
EnableIAMDatabaseAuthentication=True,
|
||||
)
|
||||
|
||||
snapshot = conn.create_db_snapshot(
|
||||
DBInstanceIdentifier="rds", DBSnapshotIdentifier="snapshot"
|
||||
).get("DBSnapshot")
|
||||
|
||||
snapshot.get("IAMDatabaseAuthenticationEnabled").should.equal(True)
|
||||
|
@ -40,6 +40,8 @@ def __setup_feedback_env__(
|
||||
)
|
||||
# Verify SES domain
|
||||
ses_conn.verify_domain_identity(Domain=domain)
|
||||
# Specify email address to allow for raw e-mails to be processed
|
||||
ses_conn.verify_email_identity(EmailAddress="test@example.com")
|
||||
# Setup SES notification topic
|
||||
if expected_msg is not None:
|
||||
ses_conn.set_identity_notification_topic(
|
||||
@ -47,7 +49,7 @@ def __setup_feedback_env__(
|
||||
)
|
||||
|
||||
|
||||
def __test_sns_feedback__(addr, expected_msg):
|
||||
def __test_sns_feedback__(addr, expected_msg, raw_email=False):
|
||||
region_name = "us-east-1"
|
||||
ses_conn = boto3.client("ses", region_name=region_name)
|
||||
sns_conn = boto3.client("sns", region_name=region_name)
|
||||
@ -73,7 +75,18 @@ def __test_sns_feedback__(addr, expected_msg):
|
||||
"Body": {"Text": {"Data": "test body"}},
|
||||
},
|
||||
)
|
||||
ses_conn.send_email(**kwargs)
|
||||
if raw_email:
|
||||
kwargs.pop("Message")
|
||||
kwargs.pop("Destination")
|
||||
kwargs.update(
|
||||
{
|
||||
"Destinations": [addr + "@" + domain],
|
||||
"RawMessage": {"Data": bytearray("raw_email", "utf-8")},
|
||||
}
|
||||
)
|
||||
ses_conn.send_raw_email(**kwargs)
|
||||
else:
|
||||
ses_conn.send_email(**kwargs)
|
||||
|
||||
# Wait for messages in the queues
|
||||
queue = sqs_conn.get_queue_by_name(QueueName=queue)
|
||||
@ -112,3 +125,12 @@ def test_sns_feedback_complaint():
|
||||
@mock_ses
|
||||
def test_sns_feedback_delivery():
|
||||
__test_sns_feedback__(SESFeedback.SUCCESS_ADDR, SESFeedback.DELIVERY)
|
||||
|
||||
|
||||
@mock_sqs
|
||||
@mock_sns
|
||||
@mock_ses
|
||||
def test_sns_feedback_delivery_raw_email():
|
||||
__test_sns_feedback__(
|
||||
SESFeedback.SUCCESS_ADDR, SESFeedback.DELIVERY, raw_email=True
|
||||
)
|
||||
|
@ -1759,3 +1759,23 @@ def test_receive_message_for_queue_with_receive_message_wait_time_seconds_set():
|
||||
)
|
||||
|
||||
queue.receive_messages()
|
||||
|
||||
|
||||
@mock_sqs
|
||||
def test_list_queues_limits_to_1000_queues():
|
||||
client = boto3.client("sqs", region_name="us-east-1")
|
||||
|
||||
for i in range(1001):
|
||||
client.create_queue(QueueName="test-queue-{0}".format(i))
|
||||
|
||||
client.list_queues()["QueueUrls"].should.have.length_of(1000)
|
||||
client.list_queues(QueueNamePrefix="test-queue")["QueueUrls"].should.have.length_of(
|
||||
1000
|
||||
)
|
||||
|
||||
resource = boto3.resource("sqs", region_name="us-east-1")
|
||||
|
||||
list(resource.queues.all()).should.have.length_of(1000)
|
||||
list(resource.queues.filter(QueueNamePrefix="test-queue")).should.have.length_of(
|
||||
1000
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user