diff --git a/.gitignore b/.gitignore index 7d3a7f3d2..8c9f3ec2a 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ htmlcov/ .~c9_* .coverage* docs/_build +moto_recording diff --git a/docs/docs/configuration/index.rst b/docs/docs/configuration/index.rst index 17dba95ca..b98154f69 100644 --- a/docs/docs/configuration/index.rst +++ b/docs/docs/configuration/index.rst @@ -11,6 +11,7 @@ Moto has a variety of ways to configure the mock behaviour. :maxdepth: 1 environment_variables + recorder/index state_transition/index state_transition/models diff --git a/docs/docs/configuration/recorder/index.rst b/docs/docs/configuration/recorder/index.rst new file mode 100644 index 000000000..07974d284 --- /dev/null +++ b/docs/docs/configuration/recorder/index.rst @@ -0,0 +1,70 @@ +.. _recorder_page: + +.. role:: raw-html(raw) + :format: html + +============================= +Recorder +============================= + +The Moto Recorder is used to log all incoming requests, which can be replayed at a later date. +This is useful if you need to setup an initial state, and ensure that this is the same across developers/environments. + +Usage +############## + +Usage in decorator mode: + +.. sourcecode:: python + + from moto.moto_api import recorder + + # Start the recorder + recorder.start_recording() + # Make some requests using boto3 + + # When you're ready.. + recorder.stop_recording() + log = recorder.download_recording() + + # Later on, upload this log to another system + recorder.upload_recording(log) + # And replay the contents + recorder.replay_recording() + + # While the recorder is active, new requests will be appended to the existing log + # Reset the current log if you want to start with an empty slate + recorder.reset_recording() + +Usage in ServerMode: + +.. sourcecode:: python + + # Start the recorder + requests.post("http://localhost:5000/moto-api/recorder/start-recording") + # Make some requests + + # When you're ready.. + requests.post("http://localhost:5000/moto-api/recorder/stop-recording") + log = requests.get("http://localhost:5000/moto-api/recorder/download-recording").content + + # Later on, upload this log to another system + requests.post("http://localhost:5000/moto-api/recorder/upload-recording", data=log) + # and replay the contents + requests.post("http://localhost:5000/moto-api/recorder/replay-recording") + + # While the recorder is active, new requests will be appended to the existing log + # Reset the current log if you want to start with an empty slate + requests.post("http://localhost:5000/moto-api/recorder/reset-recording") + +Note that this feature records and replays the incoming HTTP request. Randomized data created by Moto, such as resource ID's, will not be stored as part of the log. + + +Configuration +################## + +The requests are stored in a file called `moto_recording`, in the directory that Python is run from. You can configure this location using the following environment variable: +`MOTO_RECORDER_FILEPATH=/whatever/path/you/want` + +The recorder is disabled by default. If you want to enable it, use the following environment variable: +`MOTO_ENABLE_RECORDING=True` diff --git a/moto/core/botocore_stubber.py b/moto/core/botocore_stubber.py index faf25b617..387fd41f7 100644 --- a/moto/core/botocore_stubber.py +++ b/moto/core/botocore_stubber.py @@ -32,6 +32,9 @@ class BotocoreStubber: def __call__(self, event_name, request, **kwargs): if not self.enabled: return None + + from moto.moto_api import recorder + response = None response_callback = None found_index = None @@ -52,9 +55,12 @@ class BotocoreStubber: if isinstance(value, bytes): request.headers[header] = value.decode("utf-8") try: + recorder._record_request(request) + status, headers, body = response_callback( request, request.url, request.headers ) + except HTTPException as e: status = e.code headers = e.get_headers() diff --git a/moto/core/custom_responses_mock.py b/moto/core/custom_responses_mock.py index 503dd972e..3fff7ddf7 100644 --- a/moto/core/custom_responses_mock.py +++ b/moto/core/custom_responses_mock.py @@ -30,14 +30,14 @@ class CallbackResponse(responses.CallbackResponse): if request.body is None: body = None elif isinstance(request.body, str): - body = BytesIO(request.body.encode("UTF-8")) + body = request.body.encode("UTF-8") elif hasattr(request.body, "read"): - body = BytesIO(request.body.read()) + body = request.body.read() else: - body = BytesIO(request.body) + body = request.body req = Request.from_values( path="?".join([url.path, url.query]), - input_stream=body, + input_stream=BytesIO(body) if body else None, content_length=request.headers.get("Content-Length"), content_type=request.headers.get("Content-Type"), method=request.method, @@ -49,6 +49,10 @@ class CallbackResponse(responses.CallbackResponse): request = req headers = self.get_headers() + from moto.moto_api import recorder + + recorder._record_request(request, body) + result = self.callback(request) if isinstance(result, Exception): raise result diff --git a/moto/core/utils.py b/moto/core/utils.py index 8083e96ca..9dfc2be8c 100644 --- a/moto/core/utils.py +++ b/moto/core/utils.py @@ -124,8 +124,10 @@ class convert_to_flask_response(object): def __call__(self, args=None, **kwargs): from flask import request, Response + from moto.moto_api import recorder try: + recorder._record_request(request) result = self.callback(request, request.url, dict(request.headers)) except ClientError as exc: result = 400, {}, exc.response["Error"]["Message"] diff --git a/moto/moto_api/__init__.py b/moto/moto_api/__init__.py index 0517e8ff3..4870a0ce4 100644 --- a/moto/moto_api/__init__.py +++ b/moto/moto_api/__init__.py @@ -5,3 +5,9 @@ Global StateManager that everyone uses Use this manager to configure how AWS models transition between states. (initializing -> starting, starting -> ready, etc.) """ state_manager = _internal.state_manager.StateManager() + + +"""" +Recorder, used to record calls to Moto and replay them later +""" +recorder = _internal.Recorder() diff --git a/moto/moto_api/_internal/__init__.py b/moto/moto_api/_internal/__init__.py index b7efea3d8..d12ac2556 100644 --- a/moto/moto_api/_internal/__init__.py +++ b/moto/moto_api/_internal/__init__.py @@ -1,5 +1,6 @@ from .models import moto_api_backend from .state_manager import StateManager # noqa +from .recorder.models import Recorder # noqa moto_api_backends = {"global": moto_api_backend} diff --git a/moto/moto_api/_internal/recorder/__init__.py b/moto/moto_api/_internal/recorder/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/moto/moto_api/_internal/recorder/models.py b/moto/moto_api/_internal/recorder/models.py new file mode 100644 index 000000000..5d351d4ed --- /dev/null +++ b/moto/moto_api/_internal/recorder/models.py @@ -0,0 +1,135 @@ +import base64 +import io +import json +import os + +import requests +from botocore.awsrequest import AWSPreparedRequest +from urllib.parse import urlparse + + +class Recorder: + def __init__(self): + self._location = str(os.environ.get("MOTO_RECORDER_FILEPATH", "moto_recording")) + self._os_enabled = bool(os.environ.get("MOTO_ENABLE_RECORDING", False)) + self._user_enabled = self._os_enabled + + def _record_request(self, request, body=None): + """ + Record the current request + """ + if not self._user_enabled: + return + + if urlparse(request.url).path.startswith("/moto-api/recorder/"): + return + + entry = { + "headers": dict(request.headers), + "method": request.method, + "url": request.url, + } + + if body is None: + if isinstance(request, AWSPreparedRequest): + body, body_encoded = self._encode_body(body=request.body) + else: + try: + request_body = None + request_body_size = int(request.headers["Content-Length"]) + request_body = request.environ["wsgi.input"].read(request_body_size) + body, body_encoded = self._encode_body(body=request_body) + except (AttributeError, KeyError): + body = "" + body_encoded = False + finally: + if request_body is not None: + if isinstance(request_body, bytes): + request_body = request_body.decode("utf-8") + request.environ["wsgi.input"] = io.StringIO(request_body) + else: + body, body_encoded = self._encode_body(body) + entry.update({"body": body, "body_encoded": body_encoded}) + + filepath = self._location + with open(filepath, "a+") as file: + file.write(json.dumps(entry)) + file.write("\n") + + def _encode_body(self, body): + body_encoded = False + try: + if isinstance(body, io.BytesIO): + body = body.getvalue() + if isinstance(body, bytes): + body = base64.b64encode(body).decode("ascii") + body_encoded = True + except AttributeError: + body = None + return body, body_encoded + + def reset_recording(self): + """ + Resets the recording. This will erase any requests made previously. + """ + filepath = self._location + with open(filepath, "w"): + pass + + def start_recording(self): + """ + Start the recording, and append incoming requests to the log. + """ + self._user_enabled = True + + def stop_recording(self): + self._user_enabled = False + + def upload_recording(self, data): + """ + Replace the current log. Remember to replay the recording afterwards. + """ + filepath = self._location + with open(filepath, "bw") as file: + file.write(data) + + def download_recording(self): + """ + Download the current recording. The result can be uploaded afterwards. + """ + filepath = self._location + with open(filepath, "r") as file: + return file.read() + + def replay_recording(self, target_host=None): + """ + Replays the current log, i.e. replay all requests that were made after the recorder was started. + Download the recording if you want to manually verify the correct requests will be replayed. + """ + filepath = self._location + + # do not record the replay itself + old_setting = self._user_enabled + self._user_enabled = False + + with open(filepath, "r") as file: + entries = file.readlines() + + for row in entries: + row_loaded = json.loads(row) + body = row_loaded.get("body", "{}") + if row_loaded.get("body_encoded"): + body = base64.b64decode(body) + method = row_loaded.get("method") + url = row_loaded.get("url") + if target_host is not None: + parsed_host = urlparse(target_host) + parsed_url = urlparse(url) + url = f"{parsed_host.scheme}://{parsed_host.netloc}{parsed_url.path}" + if parsed_url.query: + url = f"{url}?{parsed_url.query}" + headers = row_loaded.get("headers") + requests.request(method=method, url=url, headers=headers, data=body) + + # restore the recording setting + self._user_enabled = old_setting diff --git a/moto/moto_api/_internal/recorder/responses.py b/moto/moto_api/_internal/recorder/responses.py new file mode 100644 index 000000000..870427934 --- /dev/null +++ b/moto/moto_api/_internal/recorder/responses.py @@ -0,0 +1,31 @@ +from ... import recorder +from moto.core.responses import BaseResponse + + +class RecorderResponse(BaseResponse): + def reset_recording(self, req, url, headers): # pylint: disable=unused-argument + recorder.reset_recording() + return 200, {}, "" + + def start_recording(self, req, url, headers): # pylint: disable=unused-argument + recorder.start_recording() + return 200, {}, "Recording is set to True" + + def stop_recording(self, req, url, headers): # pylint: disable=unused-argument + recorder.stop_recording() + return 200, {}, "Recording is set to False" + + def upload_recording(self, req, url, headers): # pylint: disable=unused-argument + data = req.data + recorder.upload_recording(data) + return 200, {}, "" + + def download_recording(self, req, url, headers): # pylint: disable=unused-argument + data = recorder.download_recording() + return 200, {}, data + + # NOTE: Replaying assumes, for simplicity, that it is the only action + # running against moto at the time. No recording happens while replaying. + def replay_recording(self, req, url, headers): # pylint: disable=unused-argument + recorder.replay_recording(target_host=url) + return 200, {}, "" diff --git a/moto/moto_api/_internal/urls.py b/moto/moto_api/_internal/urls.py index eaf15e963..1f433d470 100644 --- a/moto/moto_api/_internal/urls.py +++ b/moto/moto_api/_internal/urls.py @@ -1,8 +1,10 @@ -from moto.moto_api._internal.responses import MotoAPIResponse +from .responses import MotoAPIResponse +from .recorder.responses import RecorderResponse url_bases = ["https?://motoapi.amazonaws.com"] response_instance = MotoAPIResponse() +recorder_response = RecorderResponse() url_paths = { "{0}/moto-api/$": response_instance.dashboard, @@ -12,4 +14,10 @@ url_paths = { "{0}/moto-api/state-manager/get-transition": response_instance.get_transition, "{0}/moto-api/state-manager/set-transition": response_instance.set_transition, "{0}/moto-api/state-manager/unset-transition": response_instance.unset_transition, + "{0}/moto-api/recorder/reset-recording": recorder_response.reset_recording, + "{0}/moto-api/recorder/start-recording": recorder_response.start_recording, + "{0}/moto-api/recorder/stop-recording": recorder_response.stop_recording, + "{0}/moto-api/recorder/upload-recording": recorder_response.upload_recording, + "{0}/moto-api/recorder/download-recording": recorder_response.download_recording, + "{0}/moto-api/recorder/replay-recording": recorder_response.replay_recording, } diff --git a/tests/test_moto_api/recorder/test_recorder.py b/tests/test_moto_api/recorder/test_recorder.py new file mode 100644 index 000000000..bfbeebb19 --- /dev/null +++ b/tests/test_moto_api/recorder/test_recorder.py @@ -0,0 +1,262 @@ +import boto3 +import json +import requests +import os +import sure # noqa # pylint: disable=unused-import +from moto import ( + settings, + mock_apigateway, + mock_dynamodb, + mock_ec2, + mock_s3, + mock_timestreamwrite, +) +from moto.moto_api import recorder +from moto.server import ThreadedMotoServer +from tests import EXAMPLE_AMI_ID +from unittest import SkipTest, TestCase + + +@mock_apigateway +@mock_dynamodb +@mock_ec2 +@mock_s3 +@mock_timestreamwrite +class TestRecorder(TestCase): + def _reset_recording(self): + if settings.TEST_SERVER_MODE: + requests.post("http://localhost:5000/moto-api/recorder/reset-recording") + else: + recorder.reset_recording() + + def _start_recording(self): + if settings.TEST_SERVER_MODE: + requests.post("http://localhost:5000/moto-api/recorder/start-recording") + else: + recorder.start_recording() + + def _stop_recording(self): + if settings.TEST_SERVER_MODE: + requests.post("http://localhost:5000/moto-api/recorder/stop-recording") + else: + recorder.stop_recording() + + def _download_recording(self): + if settings.TEST_SERVER_MODE: + resp = requests.get( + "http://localhost:5000/moto-api/recorder/download-recording" + ) + resp.status_code.should.equal(200) + return resp.content + else: + return recorder.download_recording() + + def _replay_recording(self): + if settings.TEST_SERVER_MODE: + requests.post("http://localhost:5000/moto-api/recorder/replay-recording") + else: + recorder.replay_recording() + + def setUp(self) -> None: + # Reset recorded calls to ensure it's not bleeding over from other tests + self._reset_recording() + + def tearDown(self) -> None: + self._stop_recording() + + def test_ec2_instance_creation__recording_off(self): + ec2 = boto3.client("ec2", region_name="us-west-1") + ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1) + + self._download_recording().should.be.empty + + def test_ec2_instance_creation_recording_on(self): + self._start_recording() + ec2 = boto3.client("ec2", region_name="us-west-1") + ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1) + + content = json.loads(self._download_recording()) + + content.should.have.key("body").should.contain("Action=RunInstances") + content.should.have.key("body").should.contain(f"ImageId={EXAMPLE_AMI_ID}") + + def test_multiple_services(self): + self._start_recording() + ddb = boto3.client("dynamodb", "eu-west-1") + ddb.create_table( + TableName="test", + AttributeDefinitions=[{"AttributeName": "client", "AttributeType": "S"}], + KeySchema=[{"AttributeName": "client", "KeyType": "HASH"}], + ProvisionedThroughput={"ReadCapacityUnits": 123, "WriteCapacityUnits": 123}, + ) + ddb.put_item(TableName="test", Item={"client": {"S": "test1"}}) + + ts = boto3.client("timestream-write", region_name="us-east-1") + ts.create_database(DatabaseName="mydatabase") + + apigw = boto3.client("apigateway", region_name="us-west-2") + apigw.create_rest_api(name="my_api", description="desc") + + content = self._download_recording() + rows = [json.loads(x) for x in content.splitlines()] + + actions = [row["headers"].get("X-Amz-Target") for row in rows] + actions.should.contain("DynamoDB_20120810.CreateTable") + actions.should.contain("DynamoDB_20120810.PutItem") + actions.should.contain("Timestream_20181101.CreateDatabase") + + def test_replay(self): + self._start_recording() + ddb = boto3.client("dynamodb", "eu-west-1") + self._create_ddb_table(ddb, "test") + + apigw = boto3.client("apigateway", region_name="us-west-2") + api_id = apigw.create_rest_api(name="my_api", description="desc")["id"] + + self._stop_recording() + + ddb.delete_table(TableName="test") + apigw.delete_rest_api(restApiId=api_id) + + self._replay_recording() + + ddb.list_tables()["TableNames"].should.equal(["test"]) + + apis = apigw.get_rest_apis()["items"] + apis.should.have.length_of(1) + # The ID is uniquely generated everytime, but the name is the same + apis[0]["id"].shouldnt.equal(api_id) + apis[0]["name"].should.equal("my_api") + + def test_replay__partial_delete(self): + self._start_recording() + ddb = boto3.client("dynamodb", "eu-west-1") + self._create_ddb_table(ddb, "test") + + apigw = boto3.client("apigateway", region_name="us-west-2") + api_id = apigw.create_rest_api(name="my_api", description="desc")["id"] + + ddb.delete_table(TableName="test") + self._stop_recording() + + apigw.delete_rest_api(restApiId=api_id) + + self._replay_recording() + + # The replay will create, then delete this Table + ddb.list_tables()["TableNames"].should.equal([]) + + # The replay will create the RestAPI - the deletion was not recorded + apis = apigw.get_rest_apis()["items"] + apis.should.have.length_of(1) + + def test_s3_upload_data(self): + self._start_recording() + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="mybucket") + s3.put_object(Bucket="mybucket", Body=b"ABCD", Key="data") + + self._stop_recording() + s3.delete_object(Bucket="mybucket", Key="data") + s3.delete_bucket(Bucket="mybucket") + + # Replaying should recreate the file as is + self._replay_recording() + resp = s3.get_object(Bucket="mybucket", Key="data") + resp["Body"].read().should.equal(b"ABCD") + + def test_s3_upload_file_using_requests(self): + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="mybucket") + + params = {"Bucket": "mybucket", "Key": "file_upload"} + _url = s3.generate_presigned_url("put_object", params, ExpiresIn=900) + with open("text.txt", "w") as file: + file.write("test") + + # Record file uploaded to S3 outside of boto3 + self._start_recording() + requests.put(_url, files={"upload_file": open("text.txt", "rb")}) + self._stop_recording() + + # Delete file + s3.delete_object(Bucket="mybucket", Key="file_upload") + + # Replay upload, and assert it succeeded + self._replay_recording() + resp = s3.get_object(Bucket="mybucket", Key="file_upload") + resp["Body"].read().should.equal(b"test") + # cleanup + os.remove("text.txt") + + def _create_ddb_table(self, ddb, table_name): + ddb.create_table( + TableName=table_name, + AttributeDefinitions=[{"AttributeName": "client", "AttributeType": "S"}], + KeySchema=[{"AttributeName": "client", "KeyType": "HASH"}], + ProvisionedThroughput={"ReadCapacityUnits": 123, "WriteCapacityUnits": 123}, + ) + + +class TestThreadedMotoServer(TestCase): + def setUp(self) -> None: + if settings.TEST_SERVER_MODE: + raise SkipTest("No point in testing ServerMode within ServerMode") + + self.port_1 = 5678 + self.port_2 = 5679 + # start server on port x + server = ThreadedMotoServer( + ip_address="127.0.0.1", port=self.port_1, verbose=False + ) + server.start() + requests.post( + f"http://localhost:{self.port_1}/moto-api/recorder/reset-recording" + ) + requests.post( + f"http://localhost:{self.port_1}/moto-api/recorder/start-recording" + ) + + # create s3 file + s3 = boto3.client( + "s3", + region_name="us-east-1", + endpoint_url=f"http://localhost:{self.port_1}", + ) + s3.create_bucket(Bucket="mybucket") + s3.put_object(Bucket="mybucket", Body=b"ABCD", Key="data") + + # store content + requests.post( + f"http://localhost:{self.port_1}/moto-api/recorder/stop-recording" + ) + self.content = requests.post( + f"http://localhost:{self.port_1}/moto-api/recorder/download-recording" + ).content + server.stop() + + def test_server(self): + # start motoserver on port y + server = ThreadedMotoServer( + ip_address="127.0.0.1", port=self.port_2, verbose=False + ) + server.start() + requests.post(f"http://localhost:{self.port_2}/moto-api/reset") + # upload content + requests.post( + f"http://localhost:{self.port_2}/moto-api/recorder/upload-recording", + data=self.content, + ) + # replay + requests.post( + f"http://localhost:{self.port_2}/moto-api/recorder/replay-recording" + ) + # assert the file exists + s3 = boto3.client( + "s3", + region_name="us-east-1", + endpoint_url=f"http://localhost:{self.port_2}", + ) + resp = s3.get_object(Bucket="mybucket", Key="data") + resp["Body"].read().should.equal(b"ABCD") + server.stop()