Completed the CloudWatch Events mocking module and tests.

This commit is contained in:
Justin Wiley 2016-12-01 17:23:51 -08:00
parent d0def03c4c
commit db0b494b4f
5 changed files with 368 additions and 48 deletions

View File

@ -1,28 +1,30 @@
from __future__ import unicode_literals
from moto.apigateway import apigateway_backend
from moto.autoscaling import autoscaling_backend
from moto.awslambda import lambda_backend
from moto.cloudwatch import cloudwatch_backend
from moto.cloudformation import cloudformation_backend
from moto.cloudwatch import cloudwatch_backend
from moto.datapipeline import datapipeline_backend
from moto.dynamodb import dynamodb_backend
from moto.dynamodb2 import dynamodb_backend2
from moto.ec2 import ec2_backend
from moto.elb import elb_backend
from moto.emr import emr_backend
from moto.events import events_backend
from moto.glacier import glacier_backend
from moto.iam import iam_backend
from moto.opsworks import opsworks_backend
from moto.kinesis import kinesis_backend
from moto.kms import kms_backend
from moto.opsworks import opsworks_backend
from moto.rds import rds_backend
from moto.redshift import redshift_backend
from moto.route53 import route53_backend
from moto.s3 import s3_backend
from moto.ses import ses_backend
from moto.sns import sns_backend
from moto.sqs import sqs_backend
from moto.sts import sts_backend
from moto.route53 import route53_backend
BACKENDS = {
'apigateway': apigateway_backend,
@ -34,6 +36,7 @@ BACKENDS = {
'dynamodb2': dynamodb_backend2,
'ec2': ec2_backend,
'elb': elb_backend,
'events': events_backend,
'emr': emr_backend,
'glacier': glacier_backend,
'iam': iam_backend,

View File

@ -1,4 +1,3 @@
import binascii
import os
import re
from collections import OrderedDict
@ -13,12 +12,12 @@ class Rule(object):
def __init__(self, name, **kwargs):
self.name = name
self.arn = kwargs['arn'] if 'arn' in kwargs else self._generate_arn(name)
self.event_pattern = kwargs['event_pattern'] if 'event_pattern' in kwargs else None
self.schedule_exp = kwargs['schedule_exp'] if 'schedule_exp' in kwargs else None
self.state = kwargs['state'] if 'state' in kwargs else 'ENABLED'
self.description = kwargs['description'] if 'description' in kwargs else None
self.role_arn = kwargs['role_arn'] if 'role_arn' in kwargs else None
self.arn = kwargs.get('Arn') or self._generate_arn(name)
self.event_pattern = kwargs.get('EventPattern')
self.schedule_exp = kwargs.get('ScheduleExpression')
self.state = kwargs.get('State') or 'ENABLED'
self.description = kwargs.get('Description')
self.role_arn = kwargs.get('RoleArn')
self.targets = {}
def enable(self):
@ -28,9 +27,9 @@ class Rule(object):
self.state = 'DISABLED'
def put_targets(self, targets):
# TODO: Will need to test for valid ARNs.
# Not testing for valid ARNs.
for target in targets:
self.targets[target['TargetId']] = target
self.targets[target['Id']] = target
def remove_targets(self, ids):
for target in ids:
@ -45,7 +44,7 @@ class EventsBackend(BaseBackend):
self.next_tokens = {}
def _gen_next_token(self, index):
token = binascii.hexlify(os.urandom(16))
token = os.urandom(128).encode('base64')
self.next_tokens[token] = index
return token
@ -54,15 +53,14 @@ class EventsBackend(BaseBackend):
end_index = array_len
new_next_token = None
if next_token is not None:
if next_token in self.next_tokens:
start_index = self.next_tokens[next_token]
if next_token:
start_index = self.next_tokens.pop(next_token, 0)
if limit is not None:
new_end_index = start_index + int(limit)
if new_end_index < end_index:
end_index = new_end_index
new_next_token = self._gen_next_token(end_index - 1)
new_next_token = self._gen_next_token(end_index)
return start_index, end_index, new_next_token
@ -70,10 +68,7 @@ class EventsBackend(BaseBackend):
return self.rules.pop(name) is not None
def describe_rule(self, name):
if name in self.rules:
return self.rules[name]
return None
return self.rules.get(name)
def disable_rule(self, name):
if name in self.rules:
@ -102,8 +97,9 @@ class EventsBackend(BaseBackend):
for i in range(start_index, end_index):
rule = rules_array[i]
if target_arn in rule.targets:
matching_rules.append(rule.name)
for target in rule.targets:
if rule.targets[target]['Arn'] == target_arn:
matching_rules.append(rule.name)
return_obj['RuleNames'] = matching_rules
if new_next_token is not None:
@ -165,10 +161,22 @@ class EventsBackend(BaseBackend):
return rule.arn
def put_targets(self, name, targets):
self.rules[name].put_targets(targets)
rule = self.rules.get(name)
if rule:
rule.put_targets(targets)
return True
return False
def remove_targets(self, name, ids):
self.rules[name].remove_targets(ids)
rule = self.rules.get(name)
if rule:
rule.remove_targets(ids)
return True
return False
def test_event_pattern(self):
raise NotImplementedError()

View File

@ -1,59 +1,195 @@
import json
import re
from moto.core.responses import BaseResponse
from moto.events import events_backend
class EventsHandler(BaseResponse):
def error(self, type_, message='', status=400):
return status, self.response_headers, json.dumps({'__type': type_, 'message': message})
def _generate_rule_dict(self, rule):
return {
'Name': rule.name,
'Arn': rule.arn,
'EventPattern': rule.event_pattern,
'State': rule.state,
'Description': rule.description,
'ScheduleExpression': rule.schedule_exp,
'RoleArn': rule.role_arn
}
def can_paginate(self):
pass
def load_body(self):
decoded_body = self.body.decode('utf-8')
return json.loads(decoded_body or '{}')
def error(self, type_, message='', status=400):
headers = self.response_headers
headers['status'] = status
return json.dumps({'__type': type_, 'message': message}), headers,
def delete_rule(self):
pass
body = self.load_body()
name = body.get('NamePrefix')
if not name:
return self.error('ValidationException', 'Parameter Name is required.')
return '', self.response_headers
def describe_rule(self):
pass
body = self.load_body()
name = body.get('Name')
if not name:
return self.error('ValidationException', 'Parameter Name is required.')
rule = events_backend.describe_rule(name)
if not rule:
return self.error('ResourceNotFoundException', 'Rule test does not exist.')
rule_dict = self._generate_rule_dict(rule)
return json.dumps(rule_dict), self.response_headers
def disable_rule(self):
pass
body = self.load_body()
name = body.get('Name')
if not name:
return self.error('ValidationException', 'Parameter Name is required.')
if not events_backend.disable_rule(name):
return self.error('ResourceNotFoundException', 'Rule ' + name + ' does not exist.')
return '', self.response_headers
def enable_rule(self):
pass
body = self.load_body()
name = body.get('Name')
if not name:
return self.error('ValidationException', 'Parameter Name is required.')
if not events_backend.enable_rule(name):
return self.error('ResourceNotFoundException', 'Rule ' + name + ' does not exist.')
return '', self.response_headers
def generate_presigned_url(self):
pass
def get_paginator(self):
pass
def get_waiter(self):
pass
def list_rule_names_by_target(self):
pass
body = self.load_body()
target_arn = body.get('TargetArn')
next_token = body.get('NextToken')
limit = body.get('Limit')
if not target_arn:
return self.error('ValidationException', 'Parameter TargetArn is required.')
rule_names = events_backend.list_rule_names_by_target(target_arn, next_token, limit)
return json.dumps(rule_names), self.response_headers
def list_rules(self):
pass
body = self.load_body()
prefix = body.get('NamePrefix')
next_token = body.get('NextToken')
limit = body.get('Limit')
rules = events_backend.list_rules(prefix, next_token, limit)
rules_obj = {'Rules': []}
for rule in rules['Rules']:
rules_obj['Rules'].append(self._generate_rule_dict(rule))
if rules.get('NextToken'):
rules_obj['NextToken'] = rules['NextToken']
return json.dumps(rules_obj), self.response_headers
def list_targets_by_rule(self):
pass
body = self.load_body()
rule_name = body.get('Rule')
next_token = body.get('NextToken')
limit = body.get('Limit')
if not rule_name:
return self.error('ValidationException', 'Parameter Rule is required.')
try:
targets = events_backend.list_targets_by_rule(rule_name, next_token, limit)
except KeyError:
return self.error('ResourceNotFoundException', 'Rule ' + rule_name + ' does not exist.')
return json.dumps(targets), self.response_headers
def put_events(self):
pass
return '', self.response_headers
def put_rule(self):
if 'Name' not in self.body:
body = self.load_body()
name = body.get('Name')
event_pattern = body.get('EventPattern')
sched_exp = body.get('ScheduleExpression')
if not name:
return self.error('ValidationException', 'Parameter Name is required.')
pass
if event_pattern:
try:
json.loads(event_pattern)
except ValueError:
# Not quite as informative as the real error, but it'll work for now.
return self.error('InvalidEventPatternException', 'Event pattern is not valid.')
if sched_exp:
if not (re.match('^cron\(.*\)', sched_exp) or
re.match('^rate\(\d*\s(minute|minutes|hour|hours|day|days)\)', sched_exp)):
return self.error('ValidationException', 'Parameter ScheduleExpression is not valid.')
rule_arn = events_backend.put_rule(
name,
ScheduleExpression=sched_exp,
EventPattern=event_pattern,
State=body.get('State'),
Description=body.get('Description'),
RoleArn=body.get('RoleArn')
)
return json.dumps({'RuleArn': rule_arn}), self.response_headers
def put_targets(self):
pass
body = self.load_body()
rule_name = body.get('Rule')
targets = body.get('Targets')
if not rule_name:
return self.error('ValidationException', 'Parameter Rule is required.')
if not targets:
return self.error('ValidationException', 'Parameter Targets is required.')
if not events_backend.put_targets(rule_name, targets):
return self.error('ResourceNotFoundException', 'Rule ' + rule_name + ' does not exist.')
return '', self.response_headers
def remove_targets(self):
pass
body = self.load_body()
rule_name = body.get('Rule')
ids = body.get('Ids')
if not rule_name:
return self.error('ValidationException', 'Parameter Rule is required.')
if not ids:
return self.error('ValidationException', 'Parameter Ids is required.')
if not events_backend.remove_targets(rule_name, ids):
return self.error('ResourceNotFoundException', 'Rule ' + rule_name + ' does not exist.')
return '', self.response_headers
def test_event_pattern(self):
pass

View File

@ -3,7 +3,7 @@ from __future__ import unicode_literals
from .responses import EventsHandler
url_bases = [
"https?://events.(.+).amazonaws.com"
"https://events.(.+).amazonaws.com"
]
url_paths = {

View File

@ -0,0 +1,173 @@
import random
import boto3
from moto.events import mock_events
RULES = [
{'Name': 'test1', 'ScheduleExpression': 'rate(5 minutes)'},
{'Name': 'test2', 'ScheduleExpression': 'rate(1 minute)'},
{'Name': 'test3', 'EventPattern': '{"source": ["test-source"]}'}
]
TARGETS = {
'test-target-1': {
'Id': 'test-target-1',
'Arn': 'arn:aws:lambda:us-west-2:111111111111:function:test-function-1',
'Rules': ['test1', 'test2']
},
'test-target-2': {
'Id': 'test-target-2',
'Arn': 'arn:aws:lambda:us-west-2:111111111111:function:test-function-2',
'Rules': ['test1', 'test3']
},
'test-target-3': {
'Id': 'test-target-3',
'Arn': 'arn:aws:lambda:us-west-2:111111111111:function:test-function-3',
'Rules': ['test1', 'test2']
},
'test-target-4': {
'Id': 'test-target-4',
'Arn': 'arn:aws:lambda:us-west-2:111111111111:function:test-function-4',
'Rules': ['test1', 'test3']
},
'test-target-5': {
'Id': 'test-target-5',
'Arn': 'arn:aws:lambda:us-west-2:111111111111:function:test-function-5',
'Rules': ['test1', 'test2']
},
'test-target-6': {
'Id': 'test-target-6',
'Arn': 'arn:aws:lambda:us-west-2:111111111111:function:test-function-6',
'Rules': ['test1', 'test3']
}
}
def get_random_rule():
return RULES[random.randint(0, len(RULES) - 1)]
@mock_events
def generate_environment():
client = boto3.client('events', 'us-west-2')
for rule in RULES:
client.put_rule(
Name=rule['Name'],
ScheduleExpression=rule.get('ScheduleExpression', ''),
EventPattern=rule.get('EventPattern', '')
)
targets = []
for target, target_attr in TARGETS.iteritems():
if rule['Name'] in target_attr.get('Rules'):
targets.append({'Id': target, 'Arn': target_attr['Arn']})
client.put_targets(Rule=rule['Name'], Targets=targets)
return client
@mock_events
def test_list_rules():
client = generate_environment()
response = client.list_rules()
assert(response is not None)
assert(len(response['Rules']) > 0)
@mock_events
def test_describe_rule():
rule_name = get_random_rule()['Name']
client = generate_environment()
response = client.describe_rule(Name=rule_name)
assert(response is not None)
assert(response.get('Name') == rule_name)
assert(response.get('Arn') is not None)
@mock_events
def test_enable_disable_rule():
rule_name = get_random_rule()['Name']
client = generate_environment()
# Rules should start out enabled in these tests.
rule = client.describe_rule(Name=rule_name)
assert(rule['State'] == 'ENABLED')
client.disable_rule(Name=rule_name)
rule = client.describe_rule(Name=rule_name)
assert(rule['State'] == 'DISABLED')
client.enable_rule(Name=rule_name)
rule = client.describe_rule(Name=rule_name)
assert(rule['State'] == 'ENABLED')
@mock_events
def test_list_rule_names_by_target():
test_1_target = TARGETS['test-target-1']
test_2_target = TARGETS['test-target-2']
client = generate_environment()
rules = client.list_rule_names_by_target(TargetArn=test_1_target['Arn'])
assert(len(rules) == len(test_1_target['Rules']))
for rule in rules['RuleNames']:
assert(rule in test_1_target['Rules'])
rules = client.list_rule_names_by_target(TargetArn=test_2_target['Arn'])
assert(len(rules) == len(test_2_target['Rules']))
for rule in rules['RuleNames']:
assert(rule in test_2_target['Rules'])
@mock_events
def test_list_rules():
client = generate_environment()
rules = client.list_rules()
assert(len(rules['Rules']) == len(RULES))
@mock_events
def test_list_targets_by_rule():
rule_name = get_random_rule()['Name']
client = generate_environment()
targets = client.list_targets_by_rule(Rule=rule_name)
expected_targets = []
for target, attrs in TARGETS.iteritems():
if rule_name in attrs.get('Rules'):
expected_targets.append(target)
assert(len(targets['Targets']) == len(expected_targets))
@mock_events
def test_remove_targets():
rule_name = get_random_rule()['Name']
client = generate_environment()
targets = client.list_targets_by_rule(Rule=rule_name)['Targets']
targets_before = len(targets)
assert(targets_before > 0)
client.remove_targets(Rule=rule_name, Ids=[targets[0]['Id']])
targets = client.list_targets_by_rule(Rule=rule_name)['Targets']
targets_after = len(targets)
assert(targets_before - 1 == targets_after)
if __name__ == '__main__':
test_list_rules()
test_describe_rule()
test_enable_disable_rule()
test_list_rule_names_by_target()
test_list_rules()
test_list_targets_by_rule()
test_remove_targets()