Events: put_events() now support HTTP targets (#6777)

This commit is contained in:
Bert Blommers 2023-09-06 22:30:10 +00:00 committed by GitHub
parent 5b2da11e19
commit 4ea51d8795
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 0 deletions

View File

@ -28,6 +28,8 @@ The following is a non-exhaustive list of the environment variables that can be
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+ +-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| | | | | | | | | |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+ +-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| MOTO_EVENTS_INVOKE_HTTP | str | | See :ref:`events`. |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| MOTO_S3_CUSTOM_ENDPOINTS | str | | See :ref:`s3`. | | MOTO_S3_CUSTOM_ENDPOINTS | str | | See :ref:`s3`. |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+ +-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+

View File

@ -1,6 +1,7 @@
import copy import copy
import os import os
import re import re
import requests
import json import json
import sys import sys
import warnings import warnings
@ -11,6 +12,8 @@ from operator import lt, le, eq, ge, gt
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from collections import OrderedDict from collections import OrderedDict
from moto import settings
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
from moto.core import BaseBackend, BackendDict, CloudFormationModel, BaseModel from moto.core import BaseBackend, BackendDict, CloudFormationModel, BaseModel
from moto.core.utils import ( from moto.core.utils import (
@ -142,6 +145,23 @@ class Rule(CloudFormationModel):
"EventBusName": arn.resource_id, "EventBusName": arn.resource_id,
} }
cross_account_backend.put_events([new_event]) cross_account_backend.put_events([new_event])
elif arn.service == "events" and arn.resource_type == "api-destination":
if settings.events_invoke_http():
api_destination = self._find_api_destination(arn.resource_id)
request_parameters = target.get("HttpParameters", {})
headers = request_parameters.get("HeaderParameters", {})
qs_params = request_parameters.get("QueryStringParameters", {})
query_string = "&".join(
[f"{key}={val}" for key, val in qs_params.items()]
)
url = api_destination.invocation_endpoint + (
f"?{query_string}" if query_string else ""
)
requests.request(
method=api_destination.http_method,
url=url,
headers=headers,
)
else: else:
raise NotImplementedError(f"Expr not defined for {type(self)}") raise NotImplementedError(f"Expr not defined for {type(self)}")
@ -170,6 +190,11 @@ class Rule(CloudFormationModel):
if archive.uuid == archive_uuid: if archive.uuid == archive_uuid:
archive.events.append(event) archive.events.append(event)
def _find_api_destination(self, resource_id: str) -> "Destination":
backend: "EventsBackend" = events_backends[self.account_id][self.region_name]
destination_name = resource_id.split("/")[0]
return backend.destinations[destination_name]
def _send_to_sqs_queue( def _send_to_sqs_queue(
self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None
) -> None: ) -> None:
@ -1245,6 +1270,7 @@ class EventsBackend(BaseBackend):
- EventBridge Archive - EventBridge Archive
- SQS Queue + FIFO Queue - SQS Queue + FIFO Queue
- Cross-region/account EventBus - Cross-region/account EventBus
- HTTP requests (only enabled when MOTO_EVENTS_INVOKE_HTTP=true)
""" """
num_events = len(events) num_events = len(events)

View File

@ -79,6 +79,10 @@ def ecs_new_arn_format() -> bool:
return os.environ.get("MOTO_ECS_NEW_ARN", "true").lower() != "false" return os.environ.get("MOTO_ECS_NEW_ARN", "true").lower() != "false"
def events_invoke_http() -> bool:
return os.environ.get("MOTO_EVENTS_INVOKE_HTTP", "false").lower() == "true"
def allow_unknown_region() -> bool: def allow_unknown_region() -> bool:
return os.environ.get("MOTO_ALLOW_NONEXISTENT_REGION", "false").lower() == "true" return os.environ.get("MOTO_ALLOW_NONEXISTENT_REGION", "false").lower() == "true"

View File

@ -0,0 +1,84 @@
import boto3
import json
import os
import responses
from moto import mock_events, settings
from unittest import mock, SkipTest
@mock_events
@mock.patch.dict(os.environ, {"MOTO_EVENTS_INVOKE_HTTP": "true"})
def test_invoke_http_request_on_event():
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't intercept HTTP requests in ServerMode")
events = boto3.client("events", region_name="eu-west-1")
#
# Create API endpoint to invoke
response = events.create_connection(
Name="test",
Description="test description",
AuthorizationType="API_KEY",
AuthParameters={
"ApiKeyAuthParameters": {"ApiKeyName": "test", "ApiKeyValue": "test"}
},
)
destination_response = events.create_api_destination(
Name="test",
Description="test-description",
ConnectionArn=response.get("ConnectionArn"),
InvocationEndpoint="https://www.google.com",
HttpMethod="GET",
)
destination_arn = destination_response["ApiDestinationArn"]
#
# Create Rules when to invoke the connection
pattern = {"source": ["test-source"], "detail-type": ["test-detail-type"]}
rule_name = "test-event-rule"
events.put_rule(
Name=rule_name,
State="ENABLED",
EventPattern=json.dumps(pattern),
)
events.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "123",
"Arn": destination_arn,
"HttpParameters": {
"HeaderParameters": {"header1": "value1"},
"QueryStringParameters": {"qs1": "qv2"},
},
}
],
)
#
# Ensure we intercept HTTP requests
with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
# test that both json and urlencoded body are empty in matcher and in request
rsps.add(
method=responses.GET,
url="https://www.google.com/",
match=[
responses.matchers.header_matcher({"header1": "value1"}),
responses.matchers.query_param_matcher({"qs1": "qv2"}),
],
)
#
# Invoke HTTP requests
events.put_events(
Entries=[
{
"Source": "test-source",
"DetailType": "test-detail-type",
"Detail": "{}",
}
]
)