Athena - implementing first two endpoints (#2506)

This implements create_work_group() and list_work_groups()
This commit is contained in:
Jack Danger 2019-10-22 14:37:29 -07:00 committed by GitHub
parent a05c7da3bb
commit 4d0099499f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 213 additions and 1 deletions

View File

@ -7,6 +7,7 @@ __version__ = '1.3.14.dev'
from .acm import mock_acm # flake8: noqa
from .apigateway import mock_apigateway, mock_apigateway_deprecated # flake8: noqa
from .athena import mock_athena # flake8: noqa
from .autoscaling import mock_autoscaling, mock_autoscaling_deprecated # flake8: noqa
from .awslambda import mock_lambda, mock_lambda_deprecated # flake8: noqa
from .cloudformation import mock_cloudformation, mock_cloudformation_deprecated # flake8: noqa

7
moto/athena/__init__.py Normal file
View File

@ -0,0 +1,7 @@
from __future__ import unicode_literals
from .models import athena_backends
from ..core.models import base_decorator, deprecated_base_decorator
athena_backend = athena_backends['us-east-1']
mock_athena = base_decorator(athena_backends)
mock_athena_deprecated = deprecated_base_decorator(athena_backends)

18
moto/athena/exceptions.py Normal file
View File

@ -0,0 +1,18 @@
from __future__ import unicode_literals
import json
from werkzeug.exceptions import BadRequest
class AthenaClientError(BadRequest):
def __init__(self, code, message):
super(AthenaClientError, self).__init__()
self.description = json.dumps({
"Error": {
"Code": code,
"Message": message,
'Type': "InvalidRequestException",
},
'RequestId': '6876f774-7273-11e4-85dc-39e55ca848d1',
})

79
moto/athena/models.py Normal file
View File

@ -0,0 +1,79 @@
from __future__ import unicode_literals
import time
import boto3
from moto.core import BaseBackend, BaseModel
ACCOUNT_ID = 123456789012
class TaggableResourceMixin(object):
# This mixing was copied from Redshift when initially implementing
# Athena. TBD if it's worth the overhead.
def __init__(self, region_name, resource_name, tags):
self.region = region_name
self.resource_name = resource_name
self.tags = tags or []
@property
def arn(self):
return "arn:aws:athena:{region}:{account_id}:{resource_name}".format(
region=self.region,
account_id=ACCOUNT_ID,
resource_name=self.resource_name)
def create_tags(self, tags):
new_keys = [tag_set['Key'] for tag_set in tags]
self.tags = [tag_set for tag_set in self.tags
if tag_set['Key'] not in new_keys]
self.tags.extend(tags)
return self.tags
def delete_tags(self, tag_keys):
self.tags = [tag_set for tag_set in self.tags
if tag_set['Key'] not in tag_keys]
return self.tags
class WorkGroup(TaggableResourceMixin, BaseModel):
resource_type = 'workgroup'
state = 'ENABLED'
def __init__(self, athena_backend, name, configuration, description, tags):
self.region_name = athena_backend.region_name
super(WorkGroup, self).__init__(self.region_name, "workgroup/{}".format(name), tags)
self.athena_backend = athena_backend
self.name = name
self.description = description
self.configuration = configuration
class AthenaBackend(BaseBackend):
region_name = None
def __init__(self, region_name=None):
if region_name is not None:
self.region_name = region_name
self.work_groups = {}
def create_work_group(self, name, configuration, description, tags):
if name in self.work_groups:
return None
work_group = WorkGroup(self, name, configuration, description, tags)
self.work_groups[name] = work_group
return work_group
def list_work_groups(self):
return [{
'Name': wg.name,
'State': wg.state,
'Description': wg.description,
'CreationTime': time.time(),
} for wg in self.work_groups.values()]
athena_backends = {}
for region in boto3.Session().get_available_regions('athena'):
athena_backends[region] = AthenaBackend(region)

35
moto/athena/responses.py Normal file
View File

@ -0,0 +1,35 @@
import json
from moto.core.responses import BaseResponse
from .models import athena_backends
class AthenaResponse(BaseResponse):
@property
def athena_backend(self):
return athena_backends[self.region]
def create_work_group(self):
name = self._get_param('Name')
description = self._get_param('Description')
configuration = self._get_param('Configuration')
tags = self._get_param('Tags')
work_group = self.athena_backend.create_work_group(name, configuration, description, tags)
if not work_group:
return json.dumps({
'__type': 'InvalidRequestException',
'Message': 'WorkGroup already exists',
}), dict(status=400)
return json.dumps({
"CreateWorkGroupResponse": {
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})
def list_work_groups(self):
return json.dumps({
"WorkGroups": self.athena_backend.list_work_groups()
})

10
moto/athena/urls.py Normal file
View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import AthenaResponse
url_bases = [
"https?://athena.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': AthenaResponse.dispatch,
}

1
moto/athena/utils.py Normal file
View File

@ -0,0 +1 @@
from __future__ import unicode_literals

View File

@ -2,6 +2,7 @@ from __future__ import unicode_literals
from moto.acm import acm_backends
from moto.apigateway import apigateway_backends
from moto.athena import athena_backends
from moto.autoscaling import autoscaling_backends
from moto.awslambda import lambda_backends
from moto.cloudformation import cloudformation_backends
@ -35,8 +36,8 @@ from moto.redshift import redshift_backends
from moto.resourcegroups import resourcegroups_backends
from moto.route53 import route53_backends
from moto.s3 import s3_backends
from moto.ses import ses_backends
from moto.secretsmanager import secretsmanager_backends
from moto.ses import ses_backends
from moto.sns import sns_backends
from moto.sqs import sqs_backends
from moto.ssm import ssm_backends
@ -53,6 +54,7 @@ from moto.config import config_backends
BACKENDS = {
'acm': acm_backends,
'apigateway': apigateway_backends,
'athena': athena_backends,
'autoscaling': autoscaling_backends,
'batch': batch_backends,
'cloudformation': cloudformation_backends,

View File

@ -0,0 +1,59 @@
from __future__ import unicode_literals
import datetime
from botocore.exceptions import ClientError
import boto3
import sure # noqa
from moto import mock_athena
@mock_athena
def test_create_work_group():
client = boto3.client('athena', region_name='us-east-1')
response = client.create_work_group(
Name='athena_workgroup',
Description='Test work group',
Configuration={
'ResultConfiguration': {
'OutputLocation': 's3://bucket-name/prefix/',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_KMS',
'KmsKey': 'aws:arn:kms:1233456789:us-east-1:key/number-1',
},
},
},
Tags=[],
)
try:
# The second time should throw an error
response = client.create_work_group(
Name='athena_workgroup',
Description='duplicate',
Configuration={
'ResultConfiguration': {
'OutputLocation': 's3://bucket-name/prefix/',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_KMS',
'KmsKey': 'aws:arn:kms:1233456789:us-east-1:key/number-1',
},
},
},
)
except ClientError as err:
err.response['Error']['Code'].should.equal('InvalidRequestException')
err.response['Error']['Message'].should.equal('WorkGroup already exists')
else:
raise RuntimeError('Should have raised ResourceNotFoundException')
# Then test the work group appears in the work group list
response = client.list_work_groups()
response['WorkGroups'].should.have.length_of(1)
work_group = response['WorkGroups'][0]
work_group['Name'].should.equal('athena_workgroup')
work_group['Description'].should.equal('Test work group')
work_group['State'].should.equal('ENABLED')