230 lines
7.3 KiB
Python
230 lines
7.3 KiB
Python
import json
|
|
import re
|
|
|
|
from moto.core.responses import BaseResponse
|
|
from moto.events import events_backend
|
|
|
|
|
|
class EventsHandler(BaseResponse):
|
|
|
|
def _generate_rule_dict(self, rule):
|
|
return {
|
|
'Name': rule.name,
|
|
'Arn': rule.arn,
|
|
'EventPattern': rule.event_pattern,
|
|
'State': rule.state,
|
|
'Description': rule.description,
|
|
'ScheduleExpression': rule.schedule_exp,
|
|
'RoleArn': rule.role_arn
|
|
}
|
|
|
|
@property
|
|
def request_params(self):
|
|
if not hasattr(self, '_json_body'):
|
|
try:
|
|
self._json_body = json.loads(self.body)
|
|
except ValueError:
|
|
self._json_body = {}
|
|
return self._json_body
|
|
|
|
def _get_param(self, param, if_none=None):
|
|
return self.request_params.get(param, if_none)
|
|
|
|
def error(self, type_, message='', status=400):
|
|
headers = self.response_headers
|
|
headers['status'] = status
|
|
return json.dumps({'__type': type_, 'message': message}), headers,
|
|
|
|
def delete_rule(self):
|
|
name = self._get_param('Name')
|
|
|
|
if not name:
|
|
return self.error('ValidationException', 'Parameter Name is required.')
|
|
events_backend.delete_rule(name)
|
|
|
|
return '', self.response_headers
|
|
|
|
def describe_rule(self):
|
|
name = self._get_param('Name')
|
|
|
|
if not name:
|
|
return self.error('ValidationException', 'Parameter Name is required.')
|
|
|
|
rule = events_backend.describe_rule(name)
|
|
|
|
if not rule:
|
|
return self.error('ResourceNotFoundException', 'Rule test does not exist.')
|
|
|
|
rule_dict = self._generate_rule_dict(rule)
|
|
return json.dumps(rule_dict), self.response_headers
|
|
|
|
def disable_rule(self):
|
|
name = self._get_param('Name')
|
|
|
|
if not name:
|
|
return self.error('ValidationException', 'Parameter Name is required.')
|
|
|
|
if not events_backend.disable_rule(name):
|
|
return self.error('ResourceNotFoundException', 'Rule ' + name + ' does not exist.')
|
|
|
|
return '', self.response_headers
|
|
|
|
def enable_rule(self):
|
|
name = self._get_param('Name')
|
|
|
|
if not name:
|
|
return self.error('ValidationException', 'Parameter Name is required.')
|
|
|
|
if not events_backend.enable_rule(name):
|
|
return self.error('ResourceNotFoundException', 'Rule ' + name + ' does not exist.')
|
|
|
|
return '', self.response_headers
|
|
|
|
def generate_presigned_url(self):
|
|
pass
|
|
|
|
def list_rule_names_by_target(self):
|
|
target_arn = self._get_param('TargetArn')
|
|
next_token = self._get_param('NextToken')
|
|
limit = self._get_param('Limit')
|
|
|
|
if not target_arn:
|
|
return self.error('ValidationException', 'Parameter TargetArn is required.')
|
|
|
|
rule_names = events_backend.list_rule_names_by_target(
|
|
target_arn, next_token, limit)
|
|
|
|
return json.dumps(rule_names), self.response_headers
|
|
|
|
def list_rules(self):
|
|
prefix = self._get_param('NamePrefix')
|
|
next_token = self._get_param('NextToken')
|
|
limit = self._get_param('Limit')
|
|
|
|
rules = events_backend.list_rules(prefix, next_token, limit)
|
|
rules_obj = {'Rules': []}
|
|
|
|
for rule in rules['Rules']:
|
|
rules_obj['Rules'].append(self._generate_rule_dict(rule))
|
|
|
|
if rules.get('NextToken'):
|
|
rules_obj['NextToken'] = rules['NextToken']
|
|
|
|
return json.dumps(rules_obj), self.response_headers
|
|
|
|
def list_targets_by_rule(self):
|
|
rule_name = self._get_param('Rule')
|
|
next_token = self._get_param('NextToken')
|
|
limit = self._get_param('Limit')
|
|
|
|
if not rule_name:
|
|
return self.error('ValidationException', 'Parameter Rule is required.')
|
|
|
|
try:
|
|
targets = events_backend.list_targets_by_rule(
|
|
rule_name, next_token, limit)
|
|
except KeyError:
|
|
return self.error('ResourceNotFoundException', 'Rule ' + rule_name + ' does not exist.')
|
|
|
|
return json.dumps(targets), self.response_headers
|
|
|
|
def put_events(self):
|
|
events = self._get_param('Entries')
|
|
|
|
failed_entries = events_backend.put_events(events)
|
|
|
|
if failed_entries:
|
|
return json.dumps({
|
|
'FailedEntryCount': len(failed_entries),
|
|
'Entries': failed_entries
|
|
})
|
|
|
|
return '', self.response_headers
|
|
|
|
def put_rule(self):
|
|
name = self._get_param('Name')
|
|
event_pattern = self._get_param('EventPattern')
|
|
sched_exp = self._get_param('ScheduleExpression')
|
|
state = self._get_param('State')
|
|
desc = self._get_param('Description')
|
|
role_arn = self._get_param('RoleArn')
|
|
|
|
if not name:
|
|
return self.error('ValidationException', 'Parameter Name is required.')
|
|
|
|
if event_pattern:
|
|
try:
|
|
json.loads(event_pattern)
|
|
except ValueError:
|
|
# Not quite as informative as the real error, but it'll work
|
|
# for now.
|
|
return self.error('InvalidEventPatternException', 'Event pattern is not valid.')
|
|
|
|
if sched_exp:
|
|
if not (re.match('^cron\(.*\)', sched_exp) or
|
|
re.match('^rate\(\d*\s(minute|minutes|hour|hours|day|days)\)', sched_exp)):
|
|
return self.error('ValidationException', 'Parameter ScheduleExpression is not valid.')
|
|
|
|
rule_arn = events_backend.put_rule(
|
|
name,
|
|
ScheduleExpression=sched_exp,
|
|
EventPattern=event_pattern,
|
|
State=state,
|
|
Description=desc,
|
|
RoleArn=role_arn
|
|
)
|
|
|
|
return json.dumps({'RuleArn': rule_arn}), self.response_headers
|
|
|
|
def put_targets(self):
|
|
rule_name = self._get_param('Rule')
|
|
targets = self._get_param('Targets')
|
|
|
|
if not rule_name:
|
|
return self.error('ValidationException', 'Parameter Rule is required.')
|
|
|
|
if not targets:
|
|
return self.error('ValidationException', 'Parameter Targets is required.')
|
|
|
|
if not events_backend.put_targets(rule_name, targets):
|
|
return self.error('ResourceNotFoundException', 'Rule ' + rule_name + ' does not exist.')
|
|
|
|
return '', self.response_headers
|
|
|
|
def remove_targets(self):
|
|
rule_name = self._get_param('Rule')
|
|
ids = self._get_param('Ids')
|
|
|
|
if not rule_name:
|
|
return self.error('ValidationException', 'Parameter Rule is required.')
|
|
|
|
if not ids:
|
|
return self.error('ValidationException', 'Parameter Ids is required.')
|
|
|
|
if not events_backend.remove_targets(rule_name, ids):
|
|
return self.error('ResourceNotFoundException', 'Rule ' + rule_name + ' does not exist.')
|
|
|
|
return '', self.response_headers
|
|
|
|
def test_event_pattern(self):
|
|
pass
|
|
|
|
def put_permission(self):
|
|
action = self._get_param('Action')
|
|
principal = self._get_param('Principal')
|
|
statement_id = self._get_param('StatementId')
|
|
|
|
events_backend.put_permission(action, principal, statement_id)
|
|
|
|
return ''
|
|
|
|
def remove_permission(self):
|
|
statement_id = self._get_param('StatementId')
|
|
|
|
events_backend.remove_permission(statement_id)
|
|
|
|
return ''
|
|
|
|
def describe_event_bus(self):
|
|
return json.dumps(events_backend.describe_event_bus())
|