moto/moto/events/models.py
cpitchford 176f19136c
Action is now recorded as event:PutEvents (fully qualified)
We no longer need to amend how the Action is reported in the describe_event_bus policy
2018-05-15 18:58:16 +01:00

249 lines
7.8 KiB
Python

import os
import re
from moto.core.exceptions import JsonRESTError
from moto.core import BaseBackend, BaseModel
class Rule(BaseModel):
def _generate_arn(self, name):
return 'arn:aws:events:us-west-2:111111111111:rule/' + name
def __init__(self, name, **kwargs):
self.name = name
self.arn = kwargs.get('Arn') or self._generate_arn(name)
self.event_pattern = kwargs.get('EventPattern')
self.schedule_exp = kwargs.get('ScheduleExpression')
self.state = kwargs.get('State') or 'ENABLED'
self.description = kwargs.get('Description')
self.role_arn = kwargs.get('RoleArn')
self.targets = []
def enable(self):
self.state = 'ENABLED'
def disable(self):
self.state = 'DISABLED'
# This song and dance for targets is because we need order for Limits and NextTokens, but can't use OrderedDicts
# with Python 2.6, so tracking it with an array it is.
def _check_target_exists(self, target_id):
for i in range(0, len(self.targets)):
if target_id == self.targets[i]['Id']:
return i
return None
def put_targets(self, targets):
# Not testing for valid ARNs.
for target in targets:
index = self._check_target_exists(target['Id'])
if index is not None:
self.targets[index] = target
else:
self.targets.append(target)
def remove_targets(self, ids):
for target_id in ids:
index = self._check_target_exists(target_id)
if index is not None:
self.targets.pop(index)
class EventsBackend(BaseBackend):
ACCOUNT_ID = re.compile(r'^(\d{1,12}|\*)$')
STATEMENT_ID = re.compile(r'^[a-zA-Z0-9-_]{1,64}$')
def __init__(self):
self.rules = {}
# This array tracks the order in which the rules have been added, since
# 2.6 doesn't have OrderedDicts.
self.rules_order = []
self.next_tokens = {}
self.permissions = {}
def _get_rule_by_index(self, i):
return self.rules.get(self.rules_order[i])
def _gen_next_token(self, index):
token = os.urandom(128).encode('base64')
self.next_tokens[token] = index
return token
def _process_token_and_limits(self, array_len, next_token=None, limit=None):
start_index = 0
end_index = array_len
new_next_token = None
if next_token:
start_index = self.next_tokens.pop(next_token, 0)
if limit is not None:
new_end_index = start_index + int(limit)
if new_end_index < end_index:
end_index = new_end_index
new_next_token = self._gen_next_token(end_index)
return start_index, end_index, new_next_token
def delete_rule(self, name):
self.rules_order.pop(self.rules_order.index(name))
return self.rules.pop(name) is not None
def describe_rule(self, name):
return self.rules.get(name)
def disable_rule(self, name):
if name in self.rules:
self.rules[name].disable()
return True
return False
def enable_rule(self, name):
if name in self.rules:
self.rules[name].enable()
return True
return False
def list_rule_names_by_target(self, target_arn, next_token=None, limit=None):
matching_rules = []
return_obj = {}
start_index, end_index, new_next_token = self._process_token_and_limits(
len(self.rules), next_token, limit)
for i in range(start_index, end_index):
rule = self._get_rule_by_index(i)
for target in rule.targets:
if target['Arn'] == target_arn:
matching_rules.append(rule.name)
return_obj['RuleNames'] = matching_rules
if new_next_token is not None:
return_obj['NextToken'] = new_next_token
return return_obj
def list_rules(self, prefix=None, next_token=None, limit=None):
match_string = '.*'
if prefix is not None:
match_string = '^' + prefix + match_string
match_regex = re.compile(match_string)
matching_rules = []
return_obj = {}
start_index, end_index, new_next_token = self._process_token_and_limits(
len(self.rules), next_token, limit)
for i in range(start_index, end_index):
rule = self._get_rule_by_index(i)
if match_regex.match(rule.name):
matching_rules.append(rule)
return_obj['Rules'] = matching_rules
if new_next_token is not None:
return_obj['NextToken'] = new_next_token
return return_obj
def list_targets_by_rule(self, rule, next_token=None, limit=None):
# We'll let a KeyError exception be thrown for response to handle if
# rule doesn't exist.
rule = self.rules[rule]
start_index, end_index, new_next_token = self._process_token_and_limits(
len(rule.targets), next_token, limit)
returned_targets = []
return_obj = {}
for i in range(start_index, end_index):
returned_targets.append(rule.targets[i])
return_obj['Targets'] = returned_targets
if new_next_token is not None:
return_obj['NextToken'] = new_next_token
return return_obj
def put_rule(self, name, **kwargs):
rule = Rule(name, **kwargs)
self.rules[rule.name] = rule
self.rules_order.append(rule.name)
return rule.arn
def put_targets(self, name, targets):
rule = self.rules.get(name)
if rule:
rule.put_targets(targets)
return True
return False
def put_events(self, events):
num_events = len(events)
if num_events < 1:
raise JsonRESTError('ValidationError', 'Need at least 1 event')
elif num_events > 10:
raise JsonRESTError('ValidationError', 'Can only submit 10 events at once')
# We dont really need to store the events yet
return []
def remove_targets(self, name, ids):
rule = self.rules.get(name)
if rule:
rule.remove_targets(ids)
return True
return False
def test_event_pattern(self):
raise NotImplementedError()
def put_permission(self, action, principal, statement_id):
if action is None or action != 'events:PutEvents':
raise JsonRESTError('InvalidParameterValue', 'Action must be PutEvents')
if principal is None or self.ACCOUNT_ID.match(principal) is None:
raise JsonRESTError('InvalidParameterValue', 'Principal must match ^(\d{1,12}|\*)$')
if statement_id is None or self.STATEMENT_ID.match(statement_id) is None:
raise JsonRESTError('InvalidParameterValue', 'StatementId must match ^[a-zA-Z0-9-_]{1,64}$')
self.permissions[statement_id] = {'action': action, 'principal': principal}
def remove_permission(self, statement_id):
try:
del self.permissions[statement_id]
except KeyError:
raise JsonRESTError('ResourceNotFoundException', 'StatementId not found')
def describe_event_bus(self):
arn = "arn:aws:events:us-east-1:000000000000:event-bus/default"
statements = []
for statement_id, data in self.permissions.items():
statements.append({
'Sid': statement_id,
'Effect': 'Allow',
'Principal': {'AWS': 'arn:aws:iam::{0}:root'.format(data['principal'])},
'Action': data['action'],
'Resource': arn
})
return {
'Policy': {'Version': '2012-10-17', 'Statement': statements},
'Name': 'default',
'Arn': arn
}
events_backend = EventsBackend()