moto/tests/test_events/test_events_http_integration.py

85 lines
2.4 KiB
Python

import boto3
import json
import os
import responses
from moto import mock_events, settings
from unittest import mock, SkipTest
@mock_events
@mock.patch.dict(os.environ, {"MOTO_EVENTS_INVOKE_HTTP": "true"})
def test_invoke_http_request_on_event():
    """Check that putting a matching event fires the API destination request.

    A rule is wired to an API destination pointing at https://www.google.com
    with one extra header and one query-string parameter. The outgoing call
    is intercepted with ``responses``; the mock is created with
    ``assert_all_requests_are_fired=True``, so the test fails unless the
    registered GET request is actually made.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Can't intercept HTTP requests in ServerMode")

    client = boto3.client("events", region_name="eu-west-1")

    #
    # Register the connection and the API destination that should be invoked
    api_key_auth = {
        "ApiKeyAuthParameters": {"ApiKeyName": "test", "ApiKeyValue": "test"}
    }
    connection = client.create_connection(
        Name="test",
        Description="test description",
        AuthorizationType="API_KEY",
        AuthParameters=api_key_auth,
    )
    destination = client.create_api_destination(
        Name="test",
        Description="test-description",
        ConnectionArn=connection.get("ConnectionArn"),
        InvocationEndpoint="https://www.google.com",
        HttpMethod="GET",
    )

    #
    # Create the rule and attach the API destination as its target
    rule = "test-event-rule"
    event_pattern = json.dumps(
        {"source": ["test-source"], "detail-type": ["test-detail-type"]}
    )
    client.put_rule(Name=rule, State="ENABLED", EventPattern=event_pattern)

    http_target = {
        "Id": "123",
        "Arn": destination["ApiDestinationArn"],
        "HttpParameters": {
            "HeaderParameters": {"header1": "value1"},
            "QueryStringParameters": {"qs1": "qv2"},
        },
    }
    client.put_targets(Rule=rule, Targets=[http_target])

    #
    # Intercept outgoing HTTP requests for the duration of the publish
    with responses.RequestsMock(assert_all_requests_are_fired=True) as http_mock:
        # The intercepted request must carry the header and the query-string
        # parameter configured on the target's HttpParameters.
        request_matchers = [
            responses.matchers.header_matcher({"header1": "value1"}),
            responses.matchers.query_param_matcher({"qs1": "qv2"}),
        ]
        http_mock.add(
            method=responses.GET,
            url="https://www.google.com/",
            match=request_matchers,
        )

        #
        # Publish an event that matches the rule's pattern
        matching_event = {
            "Source": "test-source",
            "DetailType": "test-detail-type",
            "Detail": "{}",
        }
        client.put_events(Entries=[matching_event])