import datetime
from unittest import SkipTest, mock
from collections import OrderedDict
from gzip import compress as gzip_compress
from botocore.awsrequest import AWSPreparedRequest
from moto.core.responses import AWSServiceSpec, BaseResponse
from moto.core.responses import flatten_json_request_body
from moto.s3.responses import S3Response
from moto import settings
from freezegun import freeze_time
def test_flatten_json_request_body():
spec = AWSServiceSpec("data/emr/2009-03-31/service-2.json").input_spec("RunJobFlow")
body = {
"Name": "cluster",
"Instances": {
"Ec2KeyName": "ec2key",
"InstanceGroups": [
{"InstanceRole": "MASTER", "InstanceType": "m1.small"},
{"InstanceRole": "CORE", "InstanceType": "m1.medium"},
],
"Placement": {"AvailabilityZone": "us-east-1"},
},
"Steps": [
{
"HadoopJarStep": {
"Properties": [
{"Key": "k1", "Value": "v1"},
{"Key": "k2", "Value": "v2"},
],
"Args": ["arg1", "arg2"],
}
}
],
"Configurations": [
{
"Classification": "class",
"Properties": {"propkey1": "propkey1", "propkey2": "propkey2"},
},
{"Classification": "anotherclass", "Properties": {"propkey3": "propkey3"}},
],
}
flat = flatten_json_request_body("", body, spec)
assert flat["Name"] == body["Name"]
assert flat["Instances.Ec2KeyName"] == body["Instances"]["Ec2KeyName"]
for idx in range(2):
inst = body["Instances"]["InstanceGroups"][idx]
assert (
flat[f"Instances.InstanceGroups.member.{(idx + 1)}.InstanceRole"]
== inst["InstanceRole"]
)
assert (
flat[f"Instances.InstanceGroups.member.{(idx + 1)}.InstanceType"]
== inst["InstanceType"]
)
assert (
flat["Instances.Placement.AvailabilityZone"]
== body["Instances"]["Placement"]["AvailabilityZone"]
)
for idx in range(1):
prefix = "Steps.member." + str(idx + 1) + ".HadoopJarStep"
step = body["Steps"][idx]["HadoopJarStep"]
i = 0
while prefix + ".Properties.member." + str(i + 1) + ".Key" in flat:
prop = step["Properties"][i]
assert flat[f"{prefix}.Properties.member.{(i + 1)}.Key"] == prop["Key"]
assert flat[f"{prefix}.Properties.member.{(i + 1)}.Value"] == prop["Value"]
i += 1
i = 0
while prefix + ".Args.member." + str(i + 1) in flat:
assert flat[f"{prefix}.Args.member.{(i + 1)}"] == step["Args"][i]
i += 1
for idx in range(2):
assert (
flat["Configurations.member." + str(idx + 1) + ".Classification"]
== body["Configurations"][idx]["Classification"]
)
props = {}
i = 1
keyfmt = "Configurations.member.{0}.Properties.entry.{1}"
key = keyfmt.format(idx + 1, i)
while key + ".key" in flat:
props[flat[key + ".key"]] = flat[key + ".value"]
i += 1
key = keyfmt.format(idx + 1, i)
assert props == body["Configurations"][idx]["Properties"]
def test_parse_qs_unicode_decode_error():
body = b'{"key": "%D0"}, "C": "#0 = :0"}'
request = AWSPreparedRequest("GET", "http://request", {"foo": "bar"}, body, False)
BaseResponse().setup_class(request, request.url, request.headers)
def test_get_params():
subject = BaseResponse()
subject.querystring = OrderedDict(
[
("Action", ["CreateRule"]),
("Version", ["2015-12-01"]),
(
"ListenerArn",
[
"arn:aws:elasticloadbalancing:us-east-1:1:listener/my-lb/50dc6c495c0c9188/80139731473870416"
],
),
("Priority", ["100"]),
("Conditions.member.1.Field", ["http-header"]),
("Conditions.member.1.HttpHeaderConfig.HttpHeaderName", ["User-Agent"]),
("Conditions.member.1.HttpHeaderConfig.Values.member.2", ["curl"]),
("Conditions.member.1.HttpHeaderConfig.Values.member.1", ["Mozilla"]),
("Actions.member.1.FixedResponseConfig.StatusCode", ["200"]),
("Actions.member.1.FixedResponseConfig.ContentType", ["text/plain"]),
("Actions.member.1.Type", ["fixed-response"]),
]
)
result = subject._get_params()
assert result == {
"Action": "CreateRule",
"Version": "2015-12-01",
"ListenerArn": "arn:aws:elasticloadbalancing:us-east-1:1:listener/my-lb/50dc6c495c0c9188/80139731473870416",
"Priority": "100",
"Conditions": [
{
"Field": "http-header",
"HttpHeaderConfig": {
"HttpHeaderName": "User-Agent",
"Values": ["Mozilla", "curl"],
},
}
],
"Actions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"StatusCode": "200",
"ContentType": "text/plain",
},
}
],
}
def test_get_dict_list_params():
subject = BaseResponse()
subject.querystring = OrderedDict(
[
("Action", ["CreateDBCluster"]),
("Version", ["2014-10-31"]),
("VpcSecurityGroupIds.VpcSecurityGroupId.1", ["sg-123"]),
("VpcSecurityGroupIds.VpcSecurityGroupId.2", ["sg-456"]),
("VpcSecurityGroupIds.VpcSecurityGroupId.3", ["sg-789"]),
]
)
# TODO: extend test and logic such that we can call subject._get_params() directly here
result = subject._get_multi_param_dict("VpcSecurityGroupIds")
assert result == {"VpcSecurityGroupId": ["sg-123", "sg-456", "sg-789"]}
def test_response_environment_preserved_by_type():
"""Ensure Jinja environment is cached by response type."""
class ResponseA(BaseResponse):
pass
class ResponseB(BaseResponse):
pass
resp_a = ResponseA()
another_resp_a = ResponseA()
resp_b = ResponseB()
assert resp_a.environment is another_resp_a.environment
assert resp_b.environment is not resp_a.environment
source_1 = "template"
source_2 = "amother template"
assert not resp_a.contains_template(BaseResponse._make_template_id(source_1))
resp_a.response_template(source_1)
assert resp_a.contains_template(BaseResponse._make_template_id(source_1))
assert not resp_a.contains_template(BaseResponse._make_template_id(source_2))
resp_a.response_template(source_2)
assert resp_a.contains_template(BaseResponse._make_template_id(source_2))
assert not resp_b.contains_template(BaseResponse._make_template_id(source_1))
assert not resp_b.contains_template(BaseResponse._make_template_id(source_2))
assert another_resp_a.contains_template(BaseResponse._make_template_id(source_1))
assert another_resp_a.contains_template(BaseResponse._make_template_id(source_2))
resp_a_new_instance = ResponseA()
assert resp_a_new_instance.contains_template(
BaseResponse._make_template_id(source_1)
)
assert resp_a_new_instance.contains_template(
BaseResponse._make_template_id(source_2)
)
@mock.patch(
"moto.core.responses.settings.PRETTIFY_RESPONSES",
new_callable=mock.PropertyMock(return_value=True),
)
def test_jinja_render_prettify(m_env_var):
if settings.TEST_SERVER_MODE:
raise SkipTest(
"It is not possible to set the environment variable in server mode"
)
response = BaseResponse()
TEMPLATE = """Test text"""
expected_output = '\n\n\tTest text\n'
template = response.response_template(TEMPLATE)
xml_string = template.render()
assert xml_string == expected_output
assert m_env_var
def test_response_metadata():
# Setup
frozen_time = datetime.datetime(
2023, 5, 20, 10, 20, 30, tzinfo=datetime.timezone.utc
)
request = AWSPreparedRequest("GET", "http://request", {}, None, False)
# Execute
with freeze_time(frozen_time):
bc = BaseResponse()
bc.setup_class(request, request.url, request.headers)
# Verify
assert "date" in bc.response_headers
if not settings.TEST_SERVER_MODE:
assert bc.response_headers["date"] == "Sat, 20 May 2023 10:20:30 GMT"
def test_compression_gzip():
body = '{"key": "%D0"}, "C": "#0 = :0"}'
request = AWSPreparedRequest(
"GET",
url="http://request",
headers={"Content-Encoding": "gzip"},
body=_gzip_compress_body(body),
stream_output=False,
)
response = BaseResponse()
response.setup_class(request, request.url, request.headers)
assert body == response.body
def test_compression_gzip_in_s3():
body = b"some random data"
request = AWSPreparedRequest(
"GET",
url="http://request",
headers={"Content-Encoding": "gzip"},
body=body,
stream_output=False,
)
response = S3Response()
response.setup_class(request, request.url, request.headers)
assert body == response.body.encode("utf-8")
def _gzip_compress_body(body: str):
assert isinstance(body, str)
return gzip_compress(data=body.encode("utf-8"))