moto/tests/test_kinesis/test_kinesis_encryption.py

73 lines
2.1 KiB
Python
Raw Normal View History

2022-01-26 19:41:04 +00:00
import boto3
2024-01-07 12:03:33 +00:00
from moto import mock_aws
from .test_kinesis import get_stream_arn
2022-01-26 19:41:04 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
def test_enable_encryption():
    """A new stream reports no encryption; starting encryption switches it to KMS."""
    kinesis = boto3.client("kinesis", region_name="us-west-2")
    kinesis.create_stream(StreamName="my-stream", ShardCount=2)

    # Baseline: a freshly created stream is unencrypted and exposes no key.
    before = kinesis.describe_stream(StreamName="my-stream")["StreamDescription"]
    assert before["EncryptionType"] == "NONE"
    assert "KeyId" not in before

    kinesis.start_stream_encryption(
        StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"
    )

    # The description must now reflect both the encryption type and the key.
    after = kinesis.describe_stream(StreamName="my-stream")["StreamDescription"]
    assert after["EncryptionType"] == "KMS"
    assert after["KeyId"] == "n/a"
2022-01-26 19:41:04 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
def test_disable_encryption():
    """Stopping encryption returns a KMS-encrypted stream to the unencrypted state."""
    client = boto3.client("kinesis", region_name="us-west-2")
    client.create_stream(StreamName="my-stream", ShardCount=2)

    resp = client.describe_stream(StreamName="my-stream")
    desc = resp["StreamDescription"]
    assert desc["EncryptionType"] == "NONE"

    client.start_stream_encryption(
        StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"
    )

    # Verify encryption really was enabled; otherwise the final NONE check
    # below would also pass if start/stop were both no-ops.
    resp = client.describe_stream(StreamName="my-stream")
    desc = resp["StreamDescription"]
    assert desc["EncryptionType"] == "KMS"
    assert desc["KeyId"] == "n/a"

    client.stop_stream_encryption(
        StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"
    )

    resp = client.describe_stream(StreamName="my-stream")
    desc = resp["StreamDescription"]
    assert desc["EncryptionType"] == "NONE"
    assert "KeyId" not in desc
2024-01-07 12:03:33 +00:00
@mock_aws
def test_disable_encryption__using_arns():
    """Encryption can be started and stopped when the stream is addressed by ARN."""
    client = boto3.client("kinesis", region_name="us-west-2")
    client.create_stream(StreamName="my-stream", ShardCount=2)
    stream_arn = get_stream_arn(client, "my-stream")

    resp = client.describe_stream(StreamName="my-stream")
    desc = resp["StreamDescription"]
    assert desc["EncryptionType"] == "NONE"

    client.start_stream_encryption(
        StreamARN=stream_arn, EncryptionType="KMS", KeyId="n/a"
    )

    # Verify the ARN-addressed start call took effect; otherwise the final
    # NONE check below would also pass if the ARN were silently ignored.
    resp = client.describe_stream(StreamName="my-stream")
    desc = resp["StreamDescription"]
    assert desc["EncryptionType"] == "KMS"
    assert desc["KeyId"] == "n/a"

    client.stop_stream_encryption(
        StreamARN=stream_arn, EncryptionType="KMS", KeyId="n/a"
    )

    resp = client.describe_stream(StreamName="my-stream")
    desc = resp["StreamDescription"]
    assert desc["EncryptionType"] == "NONE"
    assert "KeyId" not in desc