Merge pull request #1183 from terrycain/xray

AWS X-Ray
This commit is contained in:
Jack Danger 2017-09-25 14:06:24 -07:00 committed by GitHub
commit d41bb2e875
11 changed files with 616 additions and 5 deletions

View File

@ -120,6 +120,8 @@ It gets even better! Moto isn't just for Python code and it isn't just for S3. L
|------------------------------------------------------------------------------|
| SWF | @mock_swf | basic endpoints done |
|------------------------------------------------------------------------------|
| X-Ray | @mock_xray | core endpoints done |
|------------------------------------------------------------------------------|
```
### Another Example

View File

@ -36,6 +36,7 @@ from .sts import mock_sts, mock_sts_deprecated # flake8: noqa
from .ssm import mock_ssm # flake8: noqa
from .route53 import mock_route53, mock_route53_deprecated # flake8: noqa
from .swf import mock_swf, mock_swf_deprecated # flake8: noqa
from .xray import mock_xray # flake8: noqa
try:

View File

@ -31,6 +31,7 @@ from moto.sns import sns_backends
from moto.sqs import sqs_backends
from moto.ssm import ssm_backends
from moto.sts import sts_backends
from moto.xray import xray_backends
BACKENDS = {
'apigateway': apigateway_backends,
@ -65,6 +66,7 @@ BACKENDS = {
'sts': sts_backends,
'route53': route53_backends,
'lambda': lambda_backends,
'xray': xray_backends
}

View File

@ -178,8 +178,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
self.setup_class(request, full_url, headers)
return self.call_action()
def call_action(self):
headers = self.response_headers
def _get_action(self):
action = self.querystring.get('Action', [""])[0]
if not action: # Some services use a header for the action
# Headers are case-insensitive. Probably a better way to do this.
@ -188,7 +187,11 @@ class BaseResponse(_TemplateEnvironmentMixin):
if match:
action = match.split(".")[-1]
action = camelcase_to_underscores(action)
return action
def call_action(self):
headers = self.response_headers
action = camelcase_to_underscores(self._get_action())
method_names = method_names_from_class(self.__class__)
if action in method_names:
method = getattr(self, action)

View File

@ -139,10 +139,13 @@ def create_backend_app(service):
else:
endpoint = None
if endpoint in backend_app.view_functions:
original_endpoint = endpoint
index = 2
while endpoint in backend_app.view_functions:
# HACK: Sometimes we map the same view to multiple url_paths. Flask
# requries us to have different names.
endpoint += "2"
endpoint = original_endpoint + str(index)
index += 1
backend_app.add_url_rule(
url_path,

6
moto/xray/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import xray_backends
from ..core.models import base_decorator
xray_backend = xray_backends['us-east-1']
mock_xray = base_decorator(xray_backends)

39
moto/xray/exceptions.py Normal file
View File

@ -0,0 +1,39 @@
import json
class AWSError(Exception):
CODE = None
STATUS = 400
def __init__(self, message, code=None, status=None):
self.message = message
self.code = code if code is not None else self.CODE
self.status = status if status is not None else self.STATUS
def response(self):
return json.dumps({'__type': self.code, 'message': self.message}), dict(status=self.status)
class InvalidRequestException(AWSError):
CODE = 'InvalidRequestException'
class BadSegmentException(Exception):
def __init__(self, seg_id=None, code=None, message=None):
self.id = seg_id
self.code = code
self.message = message
def __repr__(self):
return '<BadSegment {0}>'.format('-'.join([self.id, self.code, self.message]))
def to_dict(self):
result = {}
if self.id is not None:
result['Id'] = self.id
if self.code is not None:
result['ErrorCode'] = self.code
if self.message is not None:
result['Message'] = self.message
return result

251
moto/xray/models.py Normal file
View File

@ -0,0 +1,251 @@
from __future__ import unicode_literals
import bisect
import datetime
from collections import defaultdict
import json
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
from .exceptions import BadSegmentException, AWSError
class TelemetryRecords(BaseModel):
def __init__(self, instance_id, hostname, resource_arn, records):
self.instance_id = instance_id
self.hostname = hostname
self.resource_arn = resource_arn
self.records = records
@classmethod
def from_json(cls, json):
instance_id = json.get('EC2InstanceId', None)
hostname = json.get('Hostname')
resource_arn = json.get('ResourceARN')
telemetry_records = json['TelemetryRecords']
return cls(instance_id, hostname, resource_arn, telemetry_records)
# https://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html
class TraceSegment(BaseModel):
def __init__(self, name, segment_id, trace_id, start_time, raw, end_time=None, in_progress=False, service=None, user=None,
origin=None, parent_id=None, http=None, aws=None, metadata=None, annotations=None, subsegments=None, **kwargs):
self.name = name
self.id = segment_id
self.trace_id = trace_id
self._trace_version = None
self._original_request_start_time = None
self._trace_identifier = None
self.start_time = start_time
self._start_date = None
self.end_time = end_time
self._end_date = None
self.in_progress = in_progress
self.service = service
self.user = user
self.origin = origin
self.parent_id = parent_id
self.http = http
self.aws = aws
self.metadata = metadata
self.annotations = annotations
self.subsegments = subsegments
self.misc = kwargs
# Raw json string
self.raw = raw
def __lt__(self, other):
return self.start_date < other.start_date
@property
def trace_version(self):
if self._trace_version is None:
self._trace_version = int(self.trace_id.split('-', 1)[0])
return self._trace_version
@property
def request_start_date(self):
if self._original_request_start_time is None:
start_time = int(self.trace_id.split('-')[1], 16)
self._original_request_start_time = datetime.datetime.fromtimestamp(start_time)
return self._original_request_start_time
@property
def start_date(self):
if self._start_date is None:
self._start_date = datetime.datetime.fromtimestamp(self.start_time)
return self._start_date
@property
def end_date(self):
if self._end_date is None:
self._end_date = datetime.datetime.fromtimestamp(self.end_time)
return self._end_date
@classmethod
def from_dict(cls, data, raw):
# Check manditory args
if 'id' not in data:
raise BadSegmentException(code='MissingParam', message='Missing segment ID')
seg_id = data['id']
data['segment_id'] = seg_id # Just adding this key for future convenience
for arg in ('name', 'trace_id', 'start_time'):
if arg not in data:
raise BadSegmentException(seg_id=seg_id, code='MissingParam', message='Missing segment ID')
if 'end_time' not in data and 'in_progress' not in data:
raise BadSegmentException(seg_id=seg_id, code='MissingParam', message='Missing end_time or in_progress')
if 'end_time' not in data and data['in_progress'] == 'false':
raise BadSegmentException(seg_id=seg_id, code='MissingParam', message='Missing end_time')
return cls(raw=raw, **data)
class SegmentCollection(object):
def __init__(self):
self._traces = defaultdict(self._new_trace_item)
@staticmethod
def _new_trace_item():
return {
'start_date': datetime.datetime(1970, 1, 1),
'end_date': datetime.datetime(1970, 1, 1),
'finished': False,
'trace_id': None,
'segments': []
}
def put_segment(self, segment):
# insert into a sorted list
bisect.insort_left(self._traces[segment.trace_id]['segments'], segment)
# Get the last segment (takes into account incorrect ordering)
# and if its the last one, mark trace as complete
if self._traces[segment.trace_id]['segments'][-1].end_time is not None:
self._traces[segment.trace_id]['finished'] = True
start_time = self._traces[segment.trace_id]['segments'][0].start_date
end_time = self._traces[segment.trace_id]['segments'][-1].end_date
self._traces[segment.trace_id]['start_date'] = start_time
self._traces[segment.trace_id]['end_date'] = end_time
self._traces[segment.trace_id]['trace_id'] = segment.trace_id
# Todo consolidate trace segments into a trace.
# not enough working knowledge of xray to do this
def summary(self, start_time, end_time, filter_expression=None, sampling=False):
# This beast https://docs.aws.amazon.com/xray/latest/api/API_GetTraceSummaries.html#API_GetTraceSummaries_ResponseSyntax
if filter_expression is not None:
raise AWSError('Not implemented yet - moto', code='InternalFailure', status=500)
summaries = []
for tid, trace in self._traces.items():
if trace['finished'] and start_time < trace['start_date'] and trace['end_date'] < end_time:
duration = int((trace['end_date'] - trace['start_date']).total_seconds())
# this stuff is mostly guesses, refer to TODO above
has_error = any(['error' in seg.misc for seg in trace['segments']])
has_fault = any(['fault' in seg.misc for seg in trace['segments']])
has_throttle = any(['throttle' in seg.misc for seg in trace['segments']])
# Apparently all of these options are optional
summary_part = {
'Annotations': {}, # Not implemented yet
'Duration': duration,
'HasError': has_error,
'HasFault': has_fault,
'HasThrottle': has_throttle,
'Http': {}, # Not implemented yet
'Id': tid,
'IsParital': False, # needs lots more work to work on partials
'ResponseTime': 1, # definitely 1ms resposnetime
'ServiceIds': [], # Not implemented yet
'Users': {} # Not implemented yet
}
summaries.append(summary_part)
result = {
"ApproximateTime": int((datetime.datetime.now() - datetime.datetime(1970, 1, 1)).total_seconds()),
"TracesProcessedCount": len(summaries),
"TraceSummaries": summaries
}
return result
def get_trace_ids(self, trace_ids):
traces = []
unprocessed = []
# Its a default dict
existing_trace_ids = list(self._traces.keys())
for trace_id in trace_ids:
if trace_id in existing_trace_ids:
traces.append(self._traces[trace_id])
else:
unprocessed.append(trace_id)
return traces, unprocessed
class XRayBackend(BaseBackend):
def __init__(self):
self._telemetry_records = []
self._segment_collection = SegmentCollection()
def add_telemetry_records(self, json):
self._telemetry_records.append(
TelemetryRecords.from_json(json)
)
def process_segment(self, doc):
try:
data = json.loads(doc)
except ValueError:
raise BadSegmentException(code='JSONFormatError', message='Bad JSON data')
try:
# Get Segment Object
segment = TraceSegment.from_dict(data, raw=doc)
except ValueError:
raise BadSegmentException(code='JSONFormatError', message='Bad JSON data')
try:
# Store Segment Object
self._segment_collection.put_segment(segment)
except Exception as err:
raise BadSegmentException(seg_id=segment.id, code='InternalFailure', message=str(err))
def get_trace_summary(self, start_time, end_time, filter_expression, summaries):
return self._segment_collection.summary(start_time, end_time, filter_expression, summaries)
def get_trace_ids(self, trace_ids, next_token):
traces, unprocessed_ids = self._segment_collection.get_trace_ids(trace_ids)
result = {
'Traces': [],
'UnprocessedTraceIds': unprocessed_ids
}
for trace in traces:
segments = []
for segment in trace['segments']:
segments.append({
'Id': segment.id,
'Document': segment.raw
})
result['Traces'].append({
'Duration': int((trace['end_date'] - trace['start_date']).total_seconds()),
'Id': trace['trace_id'],
'Segments': segments
})
return result
xray_backends = {}
for region, ec2_backend in ec2_backends.items():
xray_backends[region] = XRayBackend()

150
moto/xray/responses.py Normal file
View File

@ -0,0 +1,150 @@
from __future__ import unicode_literals
import json
import datetime
from moto.core.responses import BaseResponse
from six.moves.urllib.parse import urlsplit
from .models import xray_backends
from .exceptions import AWSError, BadSegmentException
class XRayResponse(BaseResponse):
def _error(self, code, message):
return json.dumps({'__type': code, 'message': message}), dict(status=400)
@property
def xray_backend(self):
return xray_backends[self.region]
@property
def request_params(self):
try:
return json.loads(self.body)
except ValueError:
return {}
def _get_param(self, param, default=None):
return self.request_params.get(param, default)
def _get_action(self):
# Amazon is just calling urls like /TelemetryRecords etc...
# This uses the value after / as the camalcase action, which then
# gets converted in call_action to find the following methods
return urlsplit(self.uri).path.lstrip('/')
# PutTelemetryRecords
def telemetry_records(self):
try:
self.xray_backend.add_telemetry_records(self.request_params)
except AWSError as err:
return err.response()
return ''
# PutTraceSegments
def trace_segments(self):
docs = self._get_param('TraceSegmentDocuments')
if docs is None:
msg = 'Parameter TraceSegmentDocuments is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
# Raises an exception that contains info about a bad segment,
# the object also has a to_dict() method
bad_segments = []
for doc in docs:
try:
self.xray_backend.process_segment(doc)
except BadSegmentException as bad_seg:
bad_segments.append(bad_seg)
except Exception as err:
return json.dumps({'__type': 'InternalFailure', 'message': str(err)}), dict(status=500)
result = {'UnprocessedTraceSegments': [x.to_dict() for x in bad_segments]}
return json.dumps(result)
# GetTraceSummaries
def trace_summaries(self):
start_time = self._get_param('StartTime')
end_time = self._get_param('EndTime')
if start_time is None:
msg = 'Parameter StartTime is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
if end_time is None:
msg = 'Parameter EndTime is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
filter_expression = self._get_param('FilterExpression')
sampling = self._get_param('Sampling', 'false') == 'true'
try:
start_time = datetime.datetime.fromtimestamp(int(start_time))
end_time = datetime.datetime.fromtimestamp(int(end_time))
except ValueError:
msg = 'start_time and end_time are not integers'
return json.dumps({'__type': 'InvalidParameterValue', 'message': msg}), dict(status=400)
except Exception as err:
return json.dumps({'__type': 'InternalFailure', 'message': str(err)}), dict(status=500)
try:
result = self.xray_backend.get_trace_summary(start_time, end_time, filter_expression, sampling)
except AWSError as err:
return err.response()
except Exception as err:
return json.dumps({'__type': 'InternalFailure', 'message': str(err)}), dict(status=500)
return json.dumps(result)
# BatchGetTraces
def traces(self):
trace_ids = self._get_param('TraceIds')
next_token = self._get_param('NextToken') # not implemented yet
if trace_ids is None:
msg = 'Parameter TraceIds is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
try:
result = self.xray_backend.get_trace_ids(trace_ids, next_token)
except AWSError as err:
return err.response()
except Exception as err:
return json.dumps({'__type': 'InternalFailure', 'message': str(err)}), dict(status=500)
return json.dumps(result)
# GetServiceGraph - just a dummy response for now
def service_graph(self):
start_time = self._get_param('StartTime')
end_time = self._get_param('EndTime')
# next_token = self._get_param('NextToken') # not implemented yet
if start_time is None:
msg = 'Parameter StartTime is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
if end_time is None:
msg = 'Parameter EndTime is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
result = {
'StartTime': start_time,
'EndTime': end_time,
'Services': []
}
return json.dumps(result)
# GetTraceGraph - just a dummy response for now
def trace_graph(self):
trace_ids = self._get_param('TraceIds')
# next_token = self._get_param('NextToken') # not implemented yet
if trace_ids is None:
msg = 'Parameter TraceIds is missing'
return json.dumps({'__type': 'MissingParameter', 'message': msg}), dict(status=400)
result = {
'Services': []
}
return json.dumps(result)

15
moto/xray/urls.py Normal file
View File

@ -0,0 +1,15 @@
from __future__ import unicode_literals
from .responses import XRayResponse
url_bases = [
"https?://xray.(.+).amazonaws.com",
]
url_paths = {
'{0}/TelemetryRecords$': XRayResponse.dispatch,
'{0}/TraceSegments$': XRayResponse.dispatch,
'{0}/Traces$': XRayResponse.dispatch,
'{0}/ServiceGraph$': XRayResponse.dispatch,
'{0}/TraceGraph$': XRayResponse.dispatch,
'{0}/TraceSummaries$': XRayResponse.dispatch,
}

View File

@ -0,0 +1,139 @@
from __future__ import unicode_literals
import boto3
import json
import botocore.exceptions
import sure # noqa
from moto import mock_xray
import datetime
@mock_xray
def test_put_telemetry():
client = boto3.client('xray', region_name='us-east-1')
client.put_telemetry_records(
TelemetryRecords=[
{
'Timestamp': datetime.datetime(2015, 1, 1),
'SegmentsReceivedCount': 123,
'SegmentsSentCount': 123,
'SegmentsSpilloverCount': 123,
'SegmentsRejectedCount': 123,
'BackendConnectionErrors': {
'TimeoutCount': 123,
'ConnectionRefusedCount': 123,
'HTTPCode4XXCount': 123,
'HTTPCode5XXCount': 123,
'UnknownHostCount': 123,
'OtherCount': 123
}
},
],
EC2InstanceId='string',
Hostname='string',
ResourceARN='string'
)
@mock_xray
def test_put_trace_segments():
client = boto3.client('xray', region_name='us-east-1')
client.put_trace_segments(
TraceSegmentDocuments=[
json.dumps({
'name': 'example.com',
'id': '70de5b6f19ff9a0a',
'start_time': 1.478293361271E9,
'trace_id': '1-581cf771-a006649127e371903a2de979',
'end_time': 1.478293361449E9
})
]
)
@mock_xray
def test_trace_summary():
client = boto3.client('xray', region_name='us-east-1')
client.put_trace_segments(
TraceSegmentDocuments=[
json.dumps({
'name': 'example.com',
'id': '70de5b6f19ff9a0a',
'start_time': 1.478293361271E9,
'trace_id': '1-581cf771-a006649127e371903a2de979',
'in_progress': True
}),
json.dumps({
'name': 'example.com',
'id': '70de5b6f19ff9a0b',
'start_time': 1478293365,
'trace_id': '1-581cf771-a006649127e371903a2de979',
'end_time': 1478293385
})
]
)
client.get_trace_summaries(
StartTime=datetime.datetime(2014, 1, 1),
EndTime=datetime.datetime(2017, 1, 1)
)
@mock_xray
def test_batch_get_trace():
client = boto3.client('xray', region_name='us-east-1')
client.put_trace_segments(
TraceSegmentDocuments=[
json.dumps({
'name': 'example.com',
'id': '70de5b6f19ff9a0a',
'start_time': 1.478293361271E9,
'trace_id': '1-581cf771-a006649127e371903a2de979',
'in_progress': True
}),
json.dumps({
'name': 'example.com',
'id': '70de5b6f19ff9a0b',
'start_time': 1478293365,
'trace_id': '1-581cf771-a006649127e371903a2de979',
'end_time': 1478293385
})
]
)
resp = client.batch_get_traces(
TraceIds=['1-581cf771-a006649127e371903a2de979', '1-581cf772-b006649127e371903a2de979']
)
len(resp['UnprocessedTraceIds']).should.equal(1)
len(resp['Traces']).should.equal(1)
# Following are not implemented, just testing it returns what boto expects
@mock_xray
def test_batch_get_service_graph():
client = boto3.client('xray', region_name='us-east-1')
client.get_service_graph(
StartTime=datetime.datetime(2014, 1, 1),
EndTime=datetime.datetime(2017, 1, 1)
)
@mock_xray
def test_batch_get_trace_graph():
client = boto3.client('xray', region_name='us-east-1')
client.batch_get_traces(
TraceIds=['1-581cf771-a006649127e371903a2de979', '1-581cf772-b006649127e371903a2de979']
)