Feature: Record/replay incoming requests (#5481)

This commit is contained in:
Bert Blommers 2022-09-19 15:04:29 +00:00 committed by GitHub
parent 9fc64ad93b
commit 03a43a9a0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 532 additions and 5 deletions

1
.gitignore vendored
View File

@ -28,3 +28,4 @@ htmlcov/
.~c9_*
.coverage*
docs/_build
moto_recording

View File

@ -11,6 +11,7 @@ Moto has a variety of ways to configure the mock behaviour.
:maxdepth: 1
environment_variables
recorder/index
state_transition/index
state_transition/models

View File

@ -0,0 +1,70 @@
.. _recorder_page:
.. role:: raw-html(raw)
:format: html
=============================
Recorder
=============================
The Moto Recorder is used to log all incoming requests, which can be replayed at a later date.
This is useful if you need to setup an initial state, and ensure that this is the same across developers/environments.
Usage
##############
Usage in decorator mode:
.. sourcecode:: python
from moto.moto_api import recorder
# Start the recorder
recorder.start_recording()
# Make some requests using boto3
# When you're ready..
recorder.stop_recording()
log = recorder.download_recording()
# Later on, upload this log to another system
recorder.upload_recording(log)
# And replay the contents
recorder.replay_recording()
# While the recorder is active, new requests will be appended to the existing log
# Reset the current log if you want to start with an empty slate
recorder.reset_recording()
Usage in ServerMode:
.. sourcecode:: python
# Start the recorder
requests.post("http://localhost:5000/moto-api/recorder/start-recording")
# Make some requests
# When you're ready..
requests.post("http://localhost:5000/moto-api/recorder/stop-recording")
log = requests.get("http://localhost:5000/moto-api/recorder/download-recording").content
# Later on, upload this log to another system
requests.post("http://localhost:5000/moto-api/recorder/upload-recording", data=log)
# and replay the contents
requests.post("http://localhost:5000/moto-api/recorder/replay-recording")
# While the recorder is active, new requests will be appended to the existing log
# Reset the current log if you want to start with an empty slate
requests.post("http://localhost:5000/moto-api/recorder/reset-recording")
Note that this feature records and replays the incoming HTTP request. Randomized data created by Moto, such as resource ID's, will not be stored as part of the log.
Configuration
##################
The requests are stored in a file called `moto_recording`, in the directory that Python is run from. You can configure this location using the following environment variable:
`MOTO_RECORDER_FILEPATH=/whatever/path/you/want`
The recorder is disabled by default. If you want to enable it, use the following environment variable:
`MOTO_ENABLE_RECORDING=True`

View File

@ -32,6 +32,9 @@ class BotocoreStubber:
def __call__(self, event_name, request, **kwargs):
if not self.enabled:
return None
from moto.moto_api import recorder
response = None
response_callback = None
found_index = None
@ -52,9 +55,12 @@ class BotocoreStubber:
if isinstance(value, bytes):
request.headers[header] = value.decode("utf-8")
try:
recorder._record_request(request)
status, headers, body = response_callback(
request, request.url, request.headers
)
except HTTPException as e:
status = e.code
headers = e.get_headers()

View File

@ -30,14 +30,14 @@ class CallbackResponse(responses.CallbackResponse):
if request.body is None:
body = None
elif isinstance(request.body, str):
body = BytesIO(request.body.encode("UTF-8"))
body = request.body.encode("UTF-8")
elif hasattr(request.body, "read"):
body = BytesIO(request.body.read())
body = request.body.read()
else:
body = BytesIO(request.body)
body = request.body
req = Request.from_values(
path="?".join([url.path, url.query]),
input_stream=body,
input_stream=BytesIO(body) if body else None,
content_length=request.headers.get("Content-Length"),
content_type=request.headers.get("Content-Type"),
method=request.method,
@ -49,6 +49,10 @@ class CallbackResponse(responses.CallbackResponse):
request = req
headers = self.get_headers()
from moto.moto_api import recorder
recorder._record_request(request, body)
result = self.callback(request)
if isinstance(result, Exception):
raise result

View File

@ -124,8 +124,10 @@ class convert_to_flask_response(object):
def __call__(self, args=None, **kwargs):
from flask import request, Response
from moto.moto_api import recorder
try:
recorder._record_request(request)
result = self.callback(request, request.url, dict(request.headers))
except ClientError as exc:
result = 400, {}, exc.response["Error"]["Message"]

View File

@ -5,3 +5,9 @@ Global StateManager that everyone uses
Use this manager to configure how AWS models transition between states. (initializing -> starting, starting -> ready, etc.)
"""
state_manager = _internal.state_manager.StateManager()
""""
Recorder, used to record calls to Moto and replay them later
"""
recorder = _internal.Recorder()

View File

@ -1,5 +1,6 @@
from .models import moto_api_backend
from .state_manager import StateManager # noqa
from .recorder.models import Recorder # noqa
moto_api_backends = {"global": moto_api_backend}

View File

@ -0,0 +1,135 @@
import base64
import io
import json
import os
import requests
from botocore.awsrequest import AWSPreparedRequest
from urllib.parse import urlparse
class Recorder:
def __init__(self):
self._location = str(os.environ.get("MOTO_RECORDER_FILEPATH", "moto_recording"))
self._os_enabled = bool(os.environ.get("MOTO_ENABLE_RECORDING", False))
self._user_enabled = self._os_enabled
def _record_request(self, request, body=None):
"""
Record the current request
"""
if not self._user_enabled:
return
if urlparse(request.url).path.startswith("/moto-api/recorder/"):
return
entry = {
"headers": dict(request.headers),
"method": request.method,
"url": request.url,
}
if body is None:
if isinstance(request, AWSPreparedRequest):
body, body_encoded = self._encode_body(body=request.body)
else:
try:
request_body = None
request_body_size = int(request.headers["Content-Length"])
request_body = request.environ["wsgi.input"].read(request_body_size)
body, body_encoded = self._encode_body(body=request_body)
except (AttributeError, KeyError):
body = ""
body_encoded = False
finally:
if request_body is not None:
if isinstance(request_body, bytes):
request_body = request_body.decode("utf-8")
request.environ["wsgi.input"] = io.StringIO(request_body)
else:
body, body_encoded = self._encode_body(body)
entry.update({"body": body, "body_encoded": body_encoded})
filepath = self._location
with open(filepath, "a+") as file:
file.write(json.dumps(entry))
file.write("\n")
def _encode_body(self, body):
body_encoded = False
try:
if isinstance(body, io.BytesIO):
body = body.getvalue()
if isinstance(body, bytes):
body = base64.b64encode(body).decode("ascii")
body_encoded = True
except AttributeError:
body = None
return body, body_encoded
def reset_recording(self):
"""
Resets the recording. This will erase any requests made previously.
"""
filepath = self._location
with open(filepath, "w"):
pass
def start_recording(self):
"""
Start the recording, and append incoming requests to the log.
"""
self._user_enabled = True
def stop_recording(self):
self._user_enabled = False
def upload_recording(self, data):
"""
Replace the current log. Remember to replay the recording afterwards.
"""
filepath = self._location
with open(filepath, "bw") as file:
file.write(data)
def download_recording(self):
"""
Download the current recording. The result can be uploaded afterwards.
"""
filepath = self._location
with open(filepath, "r") as file:
return file.read()
def replay_recording(self, target_host=None):
"""
Replays the current log, i.e. replay all requests that were made after the recorder was started.
Download the recording if you want to manually verify the correct requests will be replayed.
"""
filepath = self._location
# do not record the replay itself
old_setting = self._user_enabled
self._user_enabled = False
with open(filepath, "r") as file:
entries = file.readlines()
for row in entries:
row_loaded = json.loads(row)
body = row_loaded.get("body", "{}")
if row_loaded.get("body_encoded"):
body = base64.b64decode(body)
method = row_loaded.get("method")
url = row_loaded.get("url")
if target_host is not None:
parsed_host = urlparse(target_host)
parsed_url = urlparse(url)
url = f"{parsed_host.scheme}://{parsed_host.netloc}{parsed_url.path}"
if parsed_url.query:
url = f"{url}?{parsed_url.query}"
headers = row_loaded.get("headers")
requests.request(method=method, url=url, headers=headers, data=body)
# restore the recording setting
self._user_enabled = old_setting

View File

@ -0,0 +1,31 @@
from ... import recorder
from moto.core.responses import BaseResponse
class RecorderResponse(BaseResponse):
def reset_recording(self, req, url, headers): # pylint: disable=unused-argument
recorder.reset_recording()
return 200, {}, ""
def start_recording(self, req, url, headers): # pylint: disable=unused-argument
recorder.start_recording()
return 200, {}, "Recording is set to True"
def stop_recording(self, req, url, headers): # pylint: disable=unused-argument
recorder.stop_recording()
return 200, {}, "Recording is set to False"
def upload_recording(self, req, url, headers): # pylint: disable=unused-argument
data = req.data
recorder.upload_recording(data)
return 200, {}, ""
def download_recording(self, req, url, headers): # pylint: disable=unused-argument
data = recorder.download_recording()
return 200, {}, data
# NOTE: Replaying assumes, for simplicity, that it is the only action
# running against moto at the time. No recording happens while replaying.
def replay_recording(self, req, url, headers): # pylint: disable=unused-argument
recorder.replay_recording(target_host=url)
return 200, {}, ""

View File

@ -1,8 +1,10 @@
from moto.moto_api._internal.responses import MotoAPIResponse
from .responses import MotoAPIResponse
from .recorder.responses import RecorderResponse
url_bases = ["https?://motoapi.amazonaws.com"]
response_instance = MotoAPIResponse()
recorder_response = RecorderResponse()
url_paths = {
"{0}/moto-api/$": response_instance.dashboard,
@ -12,4 +14,10 @@ url_paths = {
"{0}/moto-api/state-manager/get-transition": response_instance.get_transition,
"{0}/moto-api/state-manager/set-transition": response_instance.set_transition,
"{0}/moto-api/state-manager/unset-transition": response_instance.unset_transition,
"{0}/moto-api/recorder/reset-recording": recorder_response.reset_recording,
"{0}/moto-api/recorder/start-recording": recorder_response.start_recording,
"{0}/moto-api/recorder/stop-recording": recorder_response.stop_recording,
"{0}/moto-api/recorder/upload-recording": recorder_response.upload_recording,
"{0}/moto-api/recorder/download-recording": recorder_response.download_recording,
"{0}/moto-api/recorder/replay-recording": recorder_response.replay_recording,
}

View File

@ -0,0 +1,262 @@
import boto3
import json
import requests
import os
import sure # noqa # pylint: disable=unused-import
from moto import (
settings,
mock_apigateway,
mock_dynamodb,
mock_ec2,
mock_s3,
mock_timestreamwrite,
)
from moto.moto_api import recorder
from moto.server import ThreadedMotoServer
from tests import EXAMPLE_AMI_ID
from unittest import SkipTest, TestCase
@mock_apigateway
@mock_dynamodb
@mock_ec2
@mock_s3
@mock_timestreamwrite
class TestRecorder(TestCase):
def _reset_recording(self):
if settings.TEST_SERVER_MODE:
requests.post("http://localhost:5000/moto-api/recorder/reset-recording")
else:
recorder.reset_recording()
def _start_recording(self):
if settings.TEST_SERVER_MODE:
requests.post("http://localhost:5000/moto-api/recorder/start-recording")
else:
recorder.start_recording()
def _stop_recording(self):
if settings.TEST_SERVER_MODE:
requests.post("http://localhost:5000/moto-api/recorder/stop-recording")
else:
recorder.stop_recording()
def _download_recording(self):
if settings.TEST_SERVER_MODE:
resp = requests.get(
"http://localhost:5000/moto-api/recorder/download-recording"
)
resp.status_code.should.equal(200)
return resp.content
else:
return recorder.download_recording()
def _replay_recording(self):
if settings.TEST_SERVER_MODE:
requests.post("http://localhost:5000/moto-api/recorder/replay-recording")
else:
recorder.replay_recording()
def setUp(self) -> None:
# Reset recorded calls to ensure it's not bleeding over from other tests
self._reset_recording()
def tearDown(self) -> None:
self._stop_recording()
def test_ec2_instance_creation__recording_off(self):
ec2 = boto3.client("ec2", region_name="us-west-1")
ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
self._download_recording().should.be.empty
def test_ec2_instance_creation_recording_on(self):
self._start_recording()
ec2 = boto3.client("ec2", region_name="us-west-1")
ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
content = json.loads(self._download_recording())
content.should.have.key("body").should.contain("Action=RunInstances")
content.should.have.key("body").should.contain(f"ImageId={EXAMPLE_AMI_ID}")
def test_multiple_services(self):
self._start_recording()
ddb = boto3.client("dynamodb", "eu-west-1")
ddb.create_table(
TableName="test",
AttributeDefinitions=[{"AttributeName": "client", "AttributeType": "S"}],
KeySchema=[{"AttributeName": "client", "KeyType": "HASH"}],
ProvisionedThroughput={"ReadCapacityUnits": 123, "WriteCapacityUnits": 123},
)
ddb.put_item(TableName="test", Item={"client": {"S": "test1"}})
ts = boto3.client("timestream-write", region_name="us-east-1")
ts.create_database(DatabaseName="mydatabase")
apigw = boto3.client("apigateway", region_name="us-west-2")
apigw.create_rest_api(name="my_api", description="desc")
content = self._download_recording()
rows = [json.loads(x) for x in content.splitlines()]
actions = [row["headers"].get("X-Amz-Target") for row in rows]
actions.should.contain("DynamoDB_20120810.CreateTable")
actions.should.contain("DynamoDB_20120810.PutItem")
actions.should.contain("Timestream_20181101.CreateDatabase")
def test_replay(self):
self._start_recording()
ddb = boto3.client("dynamodb", "eu-west-1")
self._create_ddb_table(ddb, "test")
apigw = boto3.client("apigateway", region_name="us-west-2")
api_id = apigw.create_rest_api(name="my_api", description="desc")["id"]
self._stop_recording()
ddb.delete_table(TableName="test")
apigw.delete_rest_api(restApiId=api_id)
self._replay_recording()
ddb.list_tables()["TableNames"].should.equal(["test"])
apis = apigw.get_rest_apis()["items"]
apis.should.have.length_of(1)
# The ID is uniquely generated everytime, but the name is the same
apis[0]["id"].shouldnt.equal(api_id)
apis[0]["name"].should.equal("my_api")
def test_replay__partial_delete(self):
self._start_recording()
ddb = boto3.client("dynamodb", "eu-west-1")
self._create_ddb_table(ddb, "test")
apigw = boto3.client("apigateway", region_name="us-west-2")
api_id = apigw.create_rest_api(name="my_api", description="desc")["id"]
ddb.delete_table(TableName="test")
self._stop_recording()
apigw.delete_rest_api(restApiId=api_id)
self._replay_recording()
# The replay will create, then delete this Table
ddb.list_tables()["TableNames"].should.equal([])
# The replay will create the RestAPI - the deletion was not recorded
apis = apigw.get_rest_apis()["items"]
apis.should.have.length_of(1)
def test_s3_upload_data(self):
self._start_recording()
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="mybucket")
s3.put_object(Bucket="mybucket", Body=b"ABCD", Key="data")
self._stop_recording()
s3.delete_object(Bucket="mybucket", Key="data")
s3.delete_bucket(Bucket="mybucket")
# Replaying should recreate the file as is
self._replay_recording()
resp = s3.get_object(Bucket="mybucket", Key="data")
resp["Body"].read().should.equal(b"ABCD")
def test_s3_upload_file_using_requests(self):
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="mybucket")
params = {"Bucket": "mybucket", "Key": "file_upload"}
_url = s3.generate_presigned_url("put_object", params, ExpiresIn=900)
with open("text.txt", "w") as file:
file.write("test")
# Record file uploaded to S3 outside of boto3
self._start_recording()
requests.put(_url, files={"upload_file": open("text.txt", "rb")})
self._stop_recording()
# Delete file
s3.delete_object(Bucket="mybucket", Key="file_upload")
# Replay upload, and assert it succeeded
self._replay_recording()
resp = s3.get_object(Bucket="mybucket", Key="file_upload")
resp["Body"].read().should.equal(b"test")
# cleanup
os.remove("text.txt")
def _create_ddb_table(self, ddb, table_name):
ddb.create_table(
TableName=table_name,
AttributeDefinitions=[{"AttributeName": "client", "AttributeType": "S"}],
KeySchema=[{"AttributeName": "client", "KeyType": "HASH"}],
ProvisionedThroughput={"ReadCapacityUnits": 123, "WriteCapacityUnits": 123},
)
class TestThreadedMotoServer(TestCase):
def setUp(self) -> None:
if settings.TEST_SERVER_MODE:
raise SkipTest("No point in testing ServerMode within ServerMode")
self.port_1 = 5678
self.port_2 = 5679
# start server on port x
server = ThreadedMotoServer(
ip_address="127.0.0.1", port=self.port_1, verbose=False
)
server.start()
requests.post(
f"http://localhost:{self.port_1}/moto-api/recorder/reset-recording"
)
requests.post(
f"http://localhost:{self.port_1}/moto-api/recorder/start-recording"
)
# create s3 file
s3 = boto3.client(
"s3",
region_name="us-east-1",
endpoint_url=f"http://localhost:{self.port_1}",
)
s3.create_bucket(Bucket="mybucket")
s3.put_object(Bucket="mybucket", Body=b"ABCD", Key="data")
# store content
requests.post(
f"http://localhost:{self.port_1}/moto-api/recorder/stop-recording"
)
self.content = requests.post(
f"http://localhost:{self.port_1}/moto-api/recorder/download-recording"
).content
server.stop()
def test_server(self):
# start motoserver on port y
server = ThreadedMotoServer(
ip_address="127.0.0.1", port=self.port_2, verbose=False
)
server.start()
requests.post(f"http://localhost:{self.port_2}/moto-api/reset")
# upload content
requests.post(
f"http://localhost:{self.port_2}/moto-api/recorder/upload-recording",
data=self.content,
)
# replay
requests.post(
f"http://localhost:{self.port_2}/moto-api/recorder/replay-recording"
)
# assert the file exists
s3 = boto3.client(
"s3",
region_name="us-east-1",
endpoint_url=f"http://localhost:{self.port_2}",
)
resp = s3.get_object(Bucket="mybucket", Key="data")
resp["Body"].read().should.equal(b"ABCD")
server.stop()