MockAWS implementation using botocore event hooks
This commit is contained in:
parent
71a054af92
commit
8e909f580a
@ -10,6 +10,7 @@ from boto3.session import Session
|
||||
import responses
|
||||
from moto.core import BaseBackend, BaseModel
|
||||
from .utils import create_id
|
||||
from moto.core.utils import path_url
|
||||
from .exceptions import StageNotFoundException, ApiKeyNotFoundException
|
||||
|
||||
STAGE_URL = "https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}"
|
||||
@ -372,7 +373,8 @@ class RestAPI(BaseModel):
|
||||
# TODO deal with no matching resource
|
||||
|
||||
def resource_callback(self, request):
|
||||
path_after_stage_name = '/'.join(request.path_url.split("/")[2:])
|
||||
path = path_url(request.url)
|
||||
path_after_stage_name = '/'.join(path.split("/")[2:])
|
||||
if not path_after_stage_name:
|
||||
path_after_stage_name = '/'
|
||||
|
||||
|
@ -7,7 +7,7 @@ try:
|
||||
except ImportError:
|
||||
from urllib.parse import unquote
|
||||
|
||||
from moto.core.utils import amz_crc32, amzn_request_id
|
||||
from moto.core.utils import amz_crc32, amzn_request_id, path_url
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import lambda_backends
|
||||
|
||||
@ -94,7 +94,7 @@ class LambdaResponse(BaseResponse):
|
||||
return self._add_policy(request, full_url, headers)
|
||||
|
||||
def _add_policy(self, request, full_url, headers):
|
||||
path = request.path if hasattr(request, 'path') else request.path_url
|
||||
path = request.path if hasattr(request, 'path') else path_url(request.url)
|
||||
function_name = path.split('/')[-2]
|
||||
if self.lambda_backend.get_function(function_name):
|
||||
policy = request.body.decode('utf8')
|
||||
@ -104,7 +104,7 @@ class LambdaResponse(BaseResponse):
|
||||
return 404, {}, "{}"
|
||||
|
||||
def _get_policy(self, request, full_url, headers):
|
||||
path = request.path if hasattr(request, 'path') else request.path_url
|
||||
path = request.path if hasattr(request, 'path') else path_url(request.url)
|
||||
function_name = path.split('/')[-2]
|
||||
if self.lambda_backend.get_function(function_name):
|
||||
lambda_function = self.lambda_backend.get_function(function_name)
|
||||
|
@ -2,11 +2,14 @@
|
||||
from __future__ import unicode_literals
|
||||
from __future__ import absolute_import
|
||||
|
||||
from collections import defaultdict
|
||||
import functools
|
||||
import inspect
|
||||
import re
|
||||
import six
|
||||
from io import BytesIO
|
||||
from collections import defaultdict
|
||||
from botocore.handlers import BUILTIN_HANDLERS
|
||||
from botocore.awsrequest import AWSResponse
|
||||
|
||||
from moto import settings
|
||||
import responses
|
||||
@ -233,7 +236,86 @@ class ResponsesMockAWS(BaseMockAWS):
|
||||
pass
|
||||
|
||||
|
||||
MockAWS = ResponsesMockAWS
|
||||
BOTOCORE_HTTP_METHODS = [
|
||||
'GET', 'DELETE', 'HEAD', 'OPTIONS', 'PATCH', 'POST', 'PUT'
|
||||
]
|
||||
|
||||
|
||||
class MockRawResponse(BytesIO):
|
||||
def __init__(self, input):
|
||||
if isinstance(input, six.text_type):
|
||||
input = input.encode('utf-8')
|
||||
super(MockRawResponse, self).__init__(input)
|
||||
|
||||
def stream(self, **kwargs):
|
||||
contents = self.read()
|
||||
while contents:
|
||||
yield contents
|
||||
contents = self.read()
|
||||
|
||||
|
||||
class BotocoreStubber(object):
|
||||
def __init__(self):
|
||||
self.enabled = False
|
||||
self.methods = defaultdict(list)
|
||||
|
||||
def reset(self):
|
||||
self.methods.clear()
|
||||
|
||||
def register_response(self, method, pattern, response):
|
||||
matchers = self.methods[method]
|
||||
matchers.append((pattern, response))
|
||||
|
||||
def __call__(self, event_name, request, **kwargs):
|
||||
if not self.enabled:
|
||||
return None
|
||||
|
||||
response = None
|
||||
response_callback = None
|
||||
found_index = None
|
||||
matchers = self.methods.get(request.method)
|
||||
|
||||
base_url = request.url.split('?', 1)[0]
|
||||
for i, (pattern, callback) in enumerate(matchers):
|
||||
if pattern.match(base_url):
|
||||
if found_index is None:
|
||||
found_index = i
|
||||
response_callback = callback
|
||||
else:
|
||||
matchers.pop(found_index)
|
||||
break
|
||||
|
||||
if response_callback is not None:
|
||||
for header, value in request.headers.items():
|
||||
if isinstance(value, six.binary_type):
|
||||
request.headers[header] = value.decode('utf-8')
|
||||
status, headers, body = response_callback(request, request.url, request.headers)
|
||||
body = MockRawResponse(body)
|
||||
response = AWSResponse(request.url, status, headers, body)
|
||||
|
||||
return response
|
||||
|
||||
botocore_stubber = BotocoreStubber()
|
||||
BUILTIN_HANDLERS.append(('before-send', botocore_stubber))
|
||||
|
||||
class BotocoreEventMockAWS(BaseMockAWS):
|
||||
def reset(self):
|
||||
botocore_stubber.reset()
|
||||
|
||||
def enable_patching(self):
|
||||
botocore_stubber.enabled = True
|
||||
for method in BOTOCORE_HTTP_METHODS:
|
||||
for backend in self.backends_for_urls.values():
|
||||
for key, value in backend.urls.items():
|
||||
pattern = re.compile(key)
|
||||
botocore_stubber.register_response(method, pattern, value)
|
||||
|
||||
def disable_patching(self):
|
||||
botocore_stubber.enabled = False
|
||||
self.reset()
|
||||
|
||||
|
||||
MockAWS = BotocoreEventMockAWS
|
||||
|
||||
|
||||
class ServerModeMockAWS(BaseMockAWS):
|
||||
|
@ -8,6 +8,7 @@ import random
|
||||
import re
|
||||
import six
|
||||
import string
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
||||
|
||||
REQUEST_ID_LONG = string.digits + string.ascii_uppercase
|
||||
@ -286,3 +287,13 @@ def amzn_request_id(f):
|
||||
return status, headers, body
|
||||
|
||||
return _wrapper
|
||||
|
||||
|
||||
def path_url(url):
|
||||
parsed_url = urlparse(url)
|
||||
path = parsed_url.path
|
||||
if not path:
|
||||
path = '/'
|
||||
if parsed_url.query:
|
||||
path = path + '?' + parsed_url.query
|
||||
return path
|
||||
|
@ -10,6 +10,7 @@ import xmltodict
|
||||
|
||||
from moto.packages.httpretty.core import HTTPrettyRequest
|
||||
from moto.core.responses import _TemplateEnvironmentMixin
|
||||
from moto.core.utils import path_url
|
||||
|
||||
from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, \
|
||||
parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys
|
||||
@ -487,7 +488,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
||||
if isinstance(request, HTTPrettyRequest):
|
||||
path = request.path
|
||||
else:
|
||||
path = request.full_path if hasattr(request, 'full_path') else request.path_url
|
||||
path = request.full_path if hasattr(request, 'full_path') else path_url(request.url)
|
||||
|
||||
if self.is_delete_keys(request, path, bucket_name):
|
||||
return self._bucket_response_delete_keys(request, body, bucket_name, headers)
|
||||
@ -708,7 +709,10 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
||||
# Copy key
|
||||
# you can have a quoted ?version=abc with a version Id, so work on
|
||||
# we need to parse the unquoted string first
|
||||
src_key_parsed = urlparse(request.headers.get("x-amz-copy-source"))
|
||||
src_key = request.headers.get("x-amz-copy-source")
|
||||
if isinstance(src_key, six.binary_type):
|
||||
src_key = src_key.decode('utf-8')
|
||||
src_key_parsed = urlparse(src_key)
|
||||
src_bucket, src_key = unquote(src_key_parsed.path).\
|
||||
lstrip("/").split("/", 1)
|
||||
src_version_id = parse_qs(src_key_parsed.query).get(
|
||||
|
@ -8,7 +8,7 @@ freezegun
|
||||
flask
|
||||
boto>=2.45.0
|
||||
boto3>=1.4.4
|
||||
botocore>=1.8.36
|
||||
botocore>=1.12.13
|
||||
six>=1.9
|
||||
prompt-toolkit==1.0.14
|
||||
click==6.7
|
||||
|
Loading…
Reference in New Issue
Block a user