AWS X-Ray client mock. (#1255)
* X-Ray Client SDK patched Fixes #1250 * Fixed flake8 * Fixed some issues * Fixed flake8 * Fixed more typos * Fixed python2 string * Fixed aws-sdk patch order * Added more test cases to test the patching
This commit is contained in:
parent
2bb3e841d1
commit
49ddb500a8
@ -38,7 +38,7 @@ from .sts import mock_sts, mock_sts_deprecated # flake8: noqa
|
||||
from .ssm import mock_ssm # flake8: noqa
|
||||
from .route53 import mock_route53, mock_route53_deprecated # flake8: noqa
|
||||
from .swf import mock_swf, mock_swf_deprecated # flake8: noqa
|
||||
from .xray import mock_xray # flake8: noqa
|
||||
from .xray import mock_xray, mock_xray_client, XRaySegment # flake8: noqa
|
||||
from .logs import mock_logs, mock_logs_deprecated # flake8: noqa
|
||||
|
||||
|
||||
|
@ -9,6 +9,7 @@ try:
|
||||
except:
|
||||
from urllib.parse import unquote, urlparse, parse_qs
|
||||
|
||||
from moto.core.utils import amz_crc32, amzn_request_id
|
||||
from moto.core.responses import BaseResponse
|
||||
|
||||
|
||||
@ -32,6 +33,8 @@ class LambdaResponse(BaseResponse):
|
||||
else:
|
||||
raise ValueError("Cannot handle request")
|
||||
|
||||
@amz_crc32
|
||||
@amzn_request_id
|
||||
def invoke(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == 'POST':
|
||||
@ -39,6 +42,8 @@ class LambdaResponse(BaseResponse):
|
||||
else:
|
||||
raise ValueError("Cannot handle request")
|
||||
|
||||
@amz_crc32
|
||||
@amzn_request_id
|
||||
def invoke_async(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == 'POST':
|
||||
|
@ -199,10 +199,14 @@ class BaseResponse(_TemplateEnvironmentMixin):
|
||||
response = method()
|
||||
except HTTPException as http_error:
|
||||
response = http_error.description, dict(status=http_error.code)
|
||||
|
||||
if isinstance(response, six.string_types):
|
||||
return 200, headers, response
|
||||
else:
|
||||
body, new_headers = response
|
||||
if len(response) == 2:
|
||||
body, new_headers = response
|
||||
else:
|
||||
status, new_headers, body = response
|
||||
status = new_headers.get('status', 200)
|
||||
headers.update(new_headers)
|
||||
# Cast status to string
|
||||
|
@ -1,10 +1,16 @@
|
||||
from __future__ import unicode_literals
|
||||
from functools import wraps
|
||||
|
||||
import binascii
|
||||
import datetime
|
||||
import inspect
|
||||
import random
|
||||
import re
|
||||
import six
|
||||
import string
|
||||
|
||||
|
||||
REQUEST_ID_LONG = string.digits + string.ascii_uppercase
|
||||
|
||||
|
||||
def camelcase_to_underscores(argument):
|
||||
@ -194,3 +200,90 @@ def unix_time(dt=None):
|
||||
|
||||
def unix_time_millis(dt=None):
|
||||
return unix_time(dt) * 1000.0
|
||||
|
||||
|
||||
def gen_amz_crc32(response, headerdict=None):
|
||||
if not isinstance(response, bytes):
|
||||
response = response.encode()
|
||||
|
||||
crc = str(binascii.crc32(response))
|
||||
|
||||
if headerdict is not None and isinstance(headerdict, dict):
|
||||
headerdict.update({'x-amz-crc32': crc})
|
||||
|
||||
return crc
|
||||
|
||||
|
||||
def gen_amzn_requestid_long(headerdict=None):
|
||||
req_id = ''.join([random.choice(REQUEST_ID_LONG) for _ in range(0, 52)])
|
||||
|
||||
if headerdict is not None and isinstance(headerdict, dict):
|
||||
headerdict.update({'x-amzn-requestid': req_id})
|
||||
|
||||
return req_id
|
||||
|
||||
|
||||
def amz_crc32(f):
|
||||
@wraps(f)
|
||||
def _wrapper(*args, **kwargs):
|
||||
response = f(*args, **kwargs)
|
||||
|
||||
headers = {}
|
||||
status = 200
|
||||
|
||||
if isinstance(response, six.string_types):
|
||||
body = response
|
||||
else:
|
||||
if len(response) == 2:
|
||||
body, new_headers = response
|
||||
status = new_headers.get('status', 200)
|
||||
else:
|
||||
status, new_headers, body = response
|
||||
headers.update(new_headers)
|
||||
# Cast status to string
|
||||
if "status" in headers:
|
||||
headers['status'] = str(headers['status'])
|
||||
|
||||
try:
|
||||
# Doesnt work on python2 for some odd unicode strings
|
||||
gen_amz_crc32(body, headers)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return status, headers, body
|
||||
|
||||
return _wrapper
|
||||
|
||||
|
||||
def amzn_request_id(f):
|
||||
@wraps(f)
|
||||
def _wrapper(*args, **kwargs):
|
||||
response = f(*args, **kwargs)
|
||||
|
||||
headers = {}
|
||||
status = 200
|
||||
|
||||
if isinstance(response, six.string_types):
|
||||
body = response
|
||||
else:
|
||||
if len(response) == 2:
|
||||
body, new_headers = response
|
||||
status = new_headers.get('status', 200)
|
||||
else:
|
||||
status, new_headers, body = response
|
||||
headers.update(new_headers)
|
||||
# Cast status to string
|
||||
if "status" in headers:
|
||||
headers['status'] = str(headers['status'])
|
||||
|
||||
request_id = gen_amzn_requestid_long(headers)
|
||||
|
||||
# Update request ID in XML
|
||||
try:
|
||||
body = body.replace('{{ requestid }}', request_id)
|
||||
except Exception: # Will just ignore if it cant work on bytes (which are str's on python2)
|
||||
pass
|
||||
|
||||
return status, headers, body
|
||||
|
||||
return _wrapper
|
||||
|
@ -4,7 +4,7 @@ import six
|
||||
import re
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from moto.core.utils import camelcase_to_underscores
|
||||
from moto.core.utils import camelcase_to_underscores, amzn_request_id
|
||||
from .models import dynamodb_backend2, dynamo_json_dump
|
||||
|
||||
|
||||
@ -24,6 +24,7 @@ class DynamoHandler(BaseResponse):
|
||||
def error(self, type_, message, status=400):
|
||||
return status, self.response_headers, dynamo_json_dump({'__type': type_, 'message': message})
|
||||
|
||||
@amzn_request_id
|
||||
def call_action(self):
|
||||
self.body = json.loads(self.body or '{}')
|
||||
endpoint = self.get_endpoint_name(self.headers)
|
||||
@ -56,6 +57,7 @@ class DynamoHandler(BaseResponse):
|
||||
response = {"TableNames": tables}
|
||||
if limit and len(all_tables) > start + limit:
|
||||
response["LastEvaluatedTableName"] = tables[-1]
|
||||
|
||||
return dynamo_json_dump(response)
|
||||
|
||||
def create_table(self):
|
||||
|
@ -2,7 +2,7 @@ from __future__ import unicode_literals
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from moto.core.utils import camelcase_to_underscores
|
||||
from moto.core.utils import camelcase_to_underscores, amz_crc32, amzn_request_id
|
||||
from .utils import parse_message_attributes
|
||||
from .models import sqs_backends
|
||||
from .exceptions import (
|
||||
@ -52,6 +52,8 @@ class SQSResponse(BaseResponse):
|
||||
|
||||
return visibility_timeout
|
||||
|
||||
@amz_crc32 # crc last as request_id can edit XML
|
||||
@amzn_request_id
|
||||
def call_action(self):
|
||||
status_code, headers, body = super(SQSResponse, self).call_action()
|
||||
if status_code == 404:
|
||||
@ -296,7 +298,7 @@ CREATE_QUEUE_RESPONSE = """<CreateQueueResponse>
|
||||
<VisibilityTimeout>{{ queue.visibility_timeout }}</VisibilityTimeout>
|
||||
</CreateQueueResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId>
|
||||
<RequestId>{{ requestid }}</RequestId>
|
||||
</ResponseMetadata>
|
||||
</CreateQueueResponse>"""
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
from __future__ import unicode_literals
|
||||
from .models import xray_backends
|
||||
from ..core.models import base_decorator
|
||||
from .mock_client import mock_xray_client, XRaySegment # noqa
|
||||
|
||||
xray_backend = xray_backends['us-east-1']
|
||||
mock_xray = base_decorator(xray_backends)
|
||||
|
83
moto/xray/mock_client.py
Normal file
83
moto/xray/mock_client.py
Normal file
@ -0,0 +1,83 @@
|
||||
from functools import wraps
|
||||
import os
|
||||
from moto.xray import xray_backends
|
||||
import aws_xray_sdk.core
|
||||
from aws_xray_sdk.core.context import Context as AWSContext
|
||||
from aws_xray_sdk.core.emitters.udp_emitter import UDPEmitter
|
||||
|
||||
|
||||
class MockEmitter(UDPEmitter):
|
||||
"""
|
||||
Replaces the code that sends UDP to local X-Ray daemon
|
||||
"""
|
||||
def __init__(self, daemon_address='127.0.0.1:2000'):
|
||||
address = os.getenv('AWS_XRAY_DAEMON_ADDRESS_YEAH_NOT_TODAY_MATE', daemon_address)
|
||||
self._ip, self._port = self._parse_address(address)
|
||||
|
||||
def _xray_backend(self, region):
|
||||
return xray_backends[region]
|
||||
|
||||
def send_entity(self, entity):
|
||||
# Hack to get region
|
||||
# region = entity.subsegments[0].aws['region']
|
||||
# xray = self._xray_backend(region)
|
||||
|
||||
# TODO store X-Ray data, pretty sure X-Ray needs refactor for this
|
||||
pass
|
||||
|
||||
def _send_data(self, data):
|
||||
raise RuntimeError('Should not be running this')
|
||||
|
||||
|
||||
def mock_xray_client(f):
|
||||
"""
|
||||
Mocks the X-Ray sdk by pwning its evil singleton with our methods
|
||||
|
||||
The X-Ray SDK has normally been imported and `patched()` called long before we start mocking.
|
||||
This means the Context() will be very unhappy if an env var isnt present, so we set that, save
|
||||
the old context, then supply our new context.
|
||||
We also patch the Emitter by subclassing the UDPEmitter class replacing its methods and pushing
|
||||
that itno the recorder instance.
|
||||
"""
|
||||
@wraps(f)
|
||||
def _wrapped(*args, **kwargs):
|
||||
print("Starting X-Ray Patch")
|
||||
|
||||
old_xray_context_var = os.environ.get('AWS_XRAY_CONTEXT_MISSING')
|
||||
os.environ['AWS_XRAY_CONTEXT_MISSING'] = 'LOG_ERROR'
|
||||
old_xray_context = aws_xray_sdk.core.xray_recorder._context
|
||||
old_xray_emitter = aws_xray_sdk.core.xray_recorder._emitter
|
||||
aws_xray_sdk.core.xray_recorder._context = AWSContext()
|
||||
aws_xray_sdk.core.xray_recorder._emitter = MockEmitter()
|
||||
|
||||
try:
|
||||
f(*args, **kwargs)
|
||||
finally:
|
||||
|
||||
if old_xray_context_var is None:
|
||||
del os.environ['AWS_XRAY_CONTEXT_MISSING']
|
||||
else:
|
||||
os.environ['AWS_XRAY_CONTEXT_MISSING'] = old_xray_context_var
|
||||
|
||||
aws_xray_sdk.core.xray_recorder._emitter = old_xray_emitter
|
||||
aws_xray_sdk.core.xray_recorder._context = old_xray_context
|
||||
|
||||
return _wrapped
|
||||
|
||||
|
||||
class XRaySegment(object):
|
||||
"""
|
||||
XRay is request oriented, when a request comes in, normally middleware like django (or automatically in lambda) will mark
|
||||
the start of a segment, this stay open during the lifetime of the request. During that time subsegments may be generated
|
||||
by calling other SDK aware services or using some boto functions. Once the request is finished, middleware will also stop
|
||||
the segment, thus causing it to be emitted via UDP.
|
||||
|
||||
During testing we're going to have to control the start and end of a segment via context managers.
|
||||
"""
|
||||
def __enter__(self):
|
||||
aws_xray_sdk.core.xray_recorder.begin_segment(name='moto_mock', traceid=None, parent_id=None, sampling=1)
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
aws_xray_sdk.core.xray_recorder.end_segment()
|
3
setup.py
3
setup.py
@ -19,7 +19,8 @@ install_requires = [
|
||||
"pytz",
|
||||
"python-dateutil<3.0.0,>=2.1",
|
||||
"mock",
|
||||
"docker>=2.5.1"
|
||||
"docker>=2.5.1",
|
||||
"aws-xray-sdk==0.92.2"
|
||||
]
|
||||
|
||||
extras_require = {
|
||||
|
@ -414,7 +414,8 @@ def test_get_authorization_token_assume_region():
|
||||
client = boto3.client('ecr', region_name='us-east-1')
|
||||
auth_token_response = client.get_authorization_token()
|
||||
|
||||
list(auth_token_response.keys()).should.equal(['authorizationData', 'ResponseMetadata'])
|
||||
auth_token_response.should.contain('authorizationData')
|
||||
auth_token_response.should.contain('ResponseMetadata')
|
||||
auth_token_response['authorizationData'].should.equal([
|
||||
{
|
||||
'authorizationToken': 'QVdTOnVzLWVhc3QtMS1hdXRoLXRva2Vu',
|
||||
@ -429,7 +430,8 @@ def test_get_authorization_token_explicit_regions():
|
||||
client = boto3.client('ecr', region_name='us-east-1')
|
||||
auth_token_response = client.get_authorization_token(registryIds=['us-east-1', 'us-west-1'])
|
||||
|
||||
list(auth_token_response.keys()).should.equal(['authorizationData', 'ResponseMetadata'])
|
||||
auth_token_response.should.contain('authorizationData')
|
||||
auth_token_response.should.contain('ResponseMetadata')
|
||||
auth_token_response['authorizationData'].should.equal([
|
||||
{
|
||||
'authorizationToken': 'QVdTOnVzLWVhc3QtMS1hdXRoLXRva2Vu',
|
||||
|
72
tests/test_xray/test_xray_client.py
Normal file
72
tests/test_xray/test_xray_client.py
Normal file
@ -0,0 +1,72 @@
|
||||
from __future__ import unicode_literals
|
||||
from moto import mock_xray_client, XRaySegment, mock_dynamodb2
|
||||
import sure # noqa
|
||||
import boto3
|
||||
|
||||
from moto.xray.mock_client import MockEmitter
|
||||
import aws_xray_sdk.core as xray_core
|
||||
import aws_xray_sdk.core.patcher as xray_core_patcher
|
||||
|
||||
import botocore.client
|
||||
import botocore.endpoint
|
||||
original_make_api_call = botocore.client.BaseClient._make_api_call
|
||||
original_encode_headers = botocore.endpoint.Endpoint._encode_headers
|
||||
|
||||
import requests
|
||||
original_session_request = requests.Session.request
|
||||
original_session_prep_request = requests.Session.prepare_request
|
||||
|
||||
|
||||
@mock_xray_client
|
||||
@mock_dynamodb2
|
||||
def test_xray_dynamo_request_id():
|
||||
# Could be ran in any order, so we need to tell sdk that its been unpatched
|
||||
xray_core_patcher._PATCHED_MODULES = set()
|
||||
xray_core.patch_all()
|
||||
|
||||
client = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
with XRaySegment():
|
||||
resp = client.list_tables()
|
||||
resp['ResponseMetadata'].should.contain('RequestId')
|
||||
id1 = resp['ResponseMetadata']['RequestId']
|
||||
|
||||
with XRaySegment():
|
||||
client.list_tables()
|
||||
resp = client.list_tables()
|
||||
id2 = resp['ResponseMetadata']['RequestId']
|
||||
|
||||
id1.should_not.equal(id2)
|
||||
|
||||
setattr(botocore.client.BaseClient, '_make_api_call', original_make_api_call)
|
||||
setattr(botocore.endpoint.Endpoint, '_encode_headers', original_encode_headers)
|
||||
setattr(requests.Session, 'request', original_session_request)
|
||||
setattr(requests.Session, 'prepare_request', original_session_prep_request)
|
||||
|
||||
|
||||
@mock_xray_client
|
||||
def test_xray_udp_emitter_patched():
|
||||
# Could be ran in any order, so we need to tell sdk that its been unpatched
|
||||
xray_core_patcher._PATCHED_MODULES = set()
|
||||
xray_core.patch_all()
|
||||
|
||||
assert isinstance(xray_core.xray_recorder._emitter, MockEmitter)
|
||||
|
||||
setattr(botocore.client.BaseClient, '_make_api_call', original_make_api_call)
|
||||
setattr(botocore.endpoint.Endpoint, '_encode_headers', original_encode_headers)
|
||||
setattr(requests.Session, 'request', original_session_request)
|
||||
setattr(requests.Session, 'prepare_request', original_session_prep_request)
|
||||
|
||||
|
||||
@mock_xray_client
|
||||
def test_xray_context_patched():
|
||||
# Could be ran in any order, so we need to tell sdk that its been unpatched
|
||||
xray_core_patcher._PATCHED_MODULES = set()
|
||||
xray_core.patch_all()
|
||||
|
||||
xray_core.xray_recorder._context.context_missing.should.equal('LOG_ERROR')
|
||||
|
||||
setattr(botocore.client.BaseClient, '_make_api_call', original_make_api_call)
|
||||
setattr(botocore.endpoint.Endpoint, '_encode_headers', original_encode_headers)
|
||||
setattr(requests.Session, 'request', original_session_request)
|
||||
setattr(requests.Session, 'prepare_request', original_session_prep_request)
|
Loading…
Reference in New Issue
Block a user