Add partial support for EFS (#4080)
parent: 49c689be72
commit: 6b4032a9a8
@ -127,6 +127,7 @@ mock_mediapackage = lazy_load(".mediapackage", "mock_mediapackage")
 mock_mediastore = lazy_load(".mediastore", "mock_mediastore")
 mock_eks = lazy_load(".eks", "mock_eks")
 mock_mediastoredata = lazy_load(".mediastoredata", "mock_mediastoredata")
+mock_efs = lazy_load(".efs", "mock_efs")

 # import logging
 # logging.getLogger('boto').setLevel(logging.CRITICAL)
@ -84,6 +84,7 @@ BACKENDS = {
     "mediastore": ("mediastore", "mediastore_backends"),
     "mediastore-data": ("mediastoredata", "mediastoredata_backends"),
     "eks": ("eks", "eks_backends"),
+    "efs": ("efs", "efs_backends"),
 }
@ -911,11 +911,11 @@ class BatchBackend(BaseBackend):
                 "could not find instanceRole {0}".format(cr["instanceRole"])
             )

-        if cr["maxvCpus"] < 0:
+        if int(cr["maxvCpus"]) < 0:
             raise InvalidParameterValueException("maxVCpus must be positive")
-        if cr["minvCpus"] < 0:
+        if int(cr["minvCpus"]) < 0:
             raise InvalidParameterValueException("minVCpus must be positive")
-        if cr["maxvCpus"] < cr["minvCpus"]:
+        if int(cr["maxvCpus"]) < int(cr["minvCpus"]):
             raise InvalidParameterValueException(
                 "maxVCpus must be greater than minvCpus"
             )
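The diff does not spell out why the int() casts were added; the following standalone sketch (an assumption, with made-up values) shows the failure mode they guard against when the vCPU limits arrive as strings, for example from a CloudFormation template.

# Hedged sketch, not part of the commit: mixed str/int comparisons raise on Python 3.
cr = {"maxvCpus": "8", "minvCpus": 0}
try:
    cr["maxvCpus"] < cr["minvCpus"]  # str compared against int
except TypeError:
    pass  # this is the error the new int() casts avoid
assert int(cr["maxvCpus"]) >= int(cr["minvCpus"])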
@ -754,9 +754,11 @@ class CloudFormationBackend(BaseBackend):
         if name_or_stack_id in self.stacks:
             # Delete by stack id
             stack = self.stacks.pop(name_or_stack_id, None)
+            export_names = [export.name for export in stack.exports]
             stack.delete()
             self.deleted_stacks[stack.stack_id] = stack
-            [self.exports.pop(export.name) for export in stack.exports]
+            for export_name in export_names:
+                self.exports.pop(export_name)
             return self.stacks.pop(name_or_stack_id, None)
         else:
             # Delete by stack name
@ -25,6 +25,7 @@ from moto.dynamodb2 import models as dynamodb2_models # noqa
|
||||
from moto.ec2 import models as ec2_models
|
||||
from moto.ecr import models as ecr_models # noqa
|
||||
from moto.ecs import models as ecs_models # noqa
|
||||
from moto.efs import models as efs_models # noqa
|
||||
from moto.elb import models as elb_models # noqa
|
||||
from moto.elbv2 import models as elbv2_models # noqa
|
||||
from moto.events import models as events_models # noqa
|
||||
@ -701,23 +702,27 @@ class ResourceMap(collections_abc.Mapping):
|
||||
for resource in remaining_resources.copy():
|
||||
parsed_resource = self._parsed_resources.get(resource)
|
||||
try:
|
||||
if parsed_resource and hasattr(parsed_resource, "delete"):
|
||||
parsed_resource.delete(self._region_name)
|
||||
else:
|
||||
if hasattr(parsed_resource, "physical_resource_id"):
|
||||
resource_name = parsed_resource.physical_resource_id
|
||||
if (
|
||||
not isinstance(parsed_resource, str)
|
||||
and parsed_resource is not None
|
||||
):
|
||||
if parsed_resource and hasattr(parsed_resource, "delete"):
|
||||
parsed_resource.delete(self._region_name)
|
||||
else:
|
||||
resource_name = None
|
||||
if hasattr(parsed_resource, "physical_resource_id"):
|
||||
resource_name = parsed_resource.physical_resource_id
|
||||
else:
|
||||
resource_name = None
|
||||
|
||||
resource_json = self._resource_json_map[
|
||||
parsed_resource.logical_resource_id
|
||||
]
|
||||
resource_json = self._resource_json_map[
|
||||
parsed_resource.logical_resource_id
|
||||
]
|
||||
|
||||
parse_and_delete_resource(
|
||||
resource_name, resource_json, self, self._region_name,
|
||||
)
|
||||
parse_and_delete_resource(
|
||||
resource_name, resource_json, self, self._region_name,
|
||||
)
|
||||
|
||||
self._parsed_resources.pop(parsed_resource.logical_resource_id)
|
||||
self._parsed_resources.pop(parsed_resource.logical_resource_id)
|
||||
except Exception as e:
|
||||
# skip over dependency violations, and try again in a
|
||||
# second pass
|
||||
|
@ -1996,12 +1996,12 @@ class RegionsAndZonesBackend(object):

 class SecurityRule(object):
     def __init__(self, ip_protocol, from_port, to_port, ip_ranges, source_groups):
-        self.ip_protocol = ip_protocol
+        self.ip_protocol = str(ip_protocol)
         self.ip_ranges = ip_ranges or []
         self.source_groups = source_groups
         self.from_port = self.to_port = None

-        if ip_protocol != "-1":
+        if self.ip_protocol != "-1":
             self.from_port = int(from_port)
             self.to_port = int(to_port)
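The reason for normalising the protocol to a string is left implicit; this tiny check (an assumption, not from the commit) shows why the old comparison could miss the all-traffic sentinel when a caller supplies IpProtocol as a number.

# Hedged sketch, not part of the commit.
assert -1 != "-1"        # numeric form never matches the "-1" sentinel
assert str(-1) == "-1"   # after the cast the all-traffic branch is taken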
@ -3542,6 +3542,10 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
     def availability_zone(self):
         return self._availability_zone.name

+    @property
+    def availability_zone_id(self):
+        return self._availability_zone.zone_id
+
     @property
     def physical_resource_id(self):
         return self.id
7
moto/efs/__init__.py
Normal file
@ -0,0 +1,7 @@
from __future__ import unicode_literals

from ..core.models import base_decorator
from .models import efs_backends

efs_backend = efs_backends["us-east-1"]
mock_efs = base_decorator(efs_backends)
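For context, a minimal sketch (not part of the diff) of how the decorator exported here is meant to be used; the test suite further down exercises the equivalent context-manager form.

import boto3
from moto import mock_efs

@mock_efs
def test_starts_with_no_file_systems():
    efs = boto3.client("efs", region_name="us-east-1")
    # A fresh mocked backend has nothing in it yet.
    assert efs.describe_file_systems()["FileSystems"] == []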
106
moto/efs/exceptions.py
Normal file
@ -0,0 +1,106 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from moto.core.exceptions import RESTError
|
||||
|
||||
|
||||
class EFSError(RESTError):
|
||||
pass
|
||||
|
||||
|
||||
class FileSystemAlreadyExists(EFSError):
|
||||
code = 409
|
||||
|
||||
def __init__(self, creation_token, *args, **kwargs):
|
||||
super(FileSystemAlreadyExists, self).__init__(
|
||||
"FileSystemAlreadyExists",
|
||||
"File system with {} already exists.".format(creation_token),
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
class FileSystemNotFound(EFSError):
|
||||
code = 404
|
||||
|
||||
def __init__(self, file_system_id, *args, **kwargs):
|
||||
super(FileSystemNotFound, self).__init__(
|
||||
"FileSystemNotFound",
|
||||
"File system {} does not exist.".format(file_system_id),
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
class FileSystemInUse(EFSError):
|
||||
code = 409
|
||||
|
||||
def __init__(self, msg, *args, **kwargs):
|
||||
super(FileSystemInUse, self).__init__("FileSystemInUse", msg, *args, **kwargs)
|
||||
|
||||
|
||||
class MountTargetConflict(EFSError):
|
||||
code = 409
|
||||
|
||||
def __init__(self, msg, *args, **kwargs):
|
||||
super(MountTargetConflict, self).__init__(
|
||||
"MountTargetConflict", msg, *args, **kwargs
|
||||
)
|
||||
|
||||
|
||||
class MountTargetNotFound(EFSError):
|
||||
code = 404
|
||||
|
||||
def __init__(self, mount_target_id, *args, **kwargs):
|
||||
super(MountTargetNotFound, self).__init__(
|
||||
"MountTargetNotFound",
|
||||
"Mount target '{}' does not exist.".format(mount_target_id),
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
class BadRequest(EFSError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, msg, *args, **kwargs):
|
||||
super(BadRequest, self).__init__("BadRequest", msg, *args, **kwargs)
|
||||
|
||||
|
||||
class PolicyNotFound(EFSError):
|
||||
code = 404
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(PolicyNotFound, self).__init__("PolicyNotFound", *args, **kwargs)
|
||||
|
||||
|
||||
class SubnetNotFound(EFSError):
|
||||
code = 404
|
||||
|
||||
def __init__(self, subnet_id, *args, **kwargs):
|
||||
super(SubnetNotFound, self).__init__(
|
||||
"SubnetNotFound",
|
||||
"The subnet ID '{}' does not exist".format(subnet_id),
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
class SecurityGroupNotFound(EFSError):
|
||||
code = 404
|
||||
|
||||
def __init__(self, security_group_id, *args, **kwargs):
|
||||
super(SecurityGroupNotFound, self).__init__(
|
||||
"SecurityGroupNotFound",
|
||||
"The SecurityGroup ID '{}' does not exist".format(security_group_id),
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
class SecurityGroupLimitExceeded(EFSError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, msg, *args, **kwargs):
|
||||
super(SecurityGroupLimitExceeded, self).__init__(
|
||||
"SecurityGroupLimitExceeded", msg, *args, **kwargs
|
||||
)
|
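As an aside (not in the diff): each exception class above reaches a boto3 caller as a botocore ClientError carrying the HTTP code declared on the class, which is what the new tests assert against. A hedged sketch with a made-up file system id:

import boto3
from botocore.exceptions import ClientError
from moto import mock_efs

with mock_efs():
    efs = boto3.client("efs", region_name="us-east-1")
    try:
        efs.describe_file_systems(FileSystemId="fs-deadbeef")  # hypothetical id
    except ClientError as err:
        # FileSystemNotFound above declares code = 404.
        assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 404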
542
moto/efs/models.py
Normal file
@ -0,0 +1,542 @@
|
||||
"""Implement models for EFS resources.
|
||||
|
||||
See AWS docs for details:
|
||||
https://docs.aws.amazon.com/efs/latest/ug/whatisefs.html
|
||||
"""
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from hashlib import md5
|
||||
|
||||
from boto3 import Session
|
||||
|
||||
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
|
||||
from moto.core.utils import (
|
||||
camelcase_to_underscores,
|
||||
get_random_hex,
|
||||
underscores_to_camelcase,
|
||||
)
|
||||
from moto.ec2 import ec2_backends
|
||||
from moto.ec2.exceptions import InvalidSubnetIdError
|
||||
from moto.efs.exceptions import (
|
||||
BadRequest,
|
||||
FileSystemAlreadyExists,
|
||||
FileSystemInUse,
|
||||
FileSystemNotFound,
|
||||
MountTargetConflict,
|
||||
MountTargetNotFound,
|
||||
PolicyNotFound,
|
||||
SubnetNotFound,
|
||||
SecurityGroupNotFound,
|
||||
SecurityGroupLimitExceeded,
|
||||
)
|
||||
|
||||
|
||||
def _lookup_az_id(az_name):
|
||||
"""Find the Availability zone ID given the AZ name."""
|
||||
ec2 = ec2_backends[az_name[:-1]]
|
||||
for zone in ec2.describe_availability_zones():
|
||||
if zone.name == az_name:
|
||||
return zone.zone_id
|
||||
|
||||
|
||||
class FileSystem(CloudFormationModel):
|
||||
"""A model for an EFS File System Volume."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
region_name,
|
||||
creation_token,
|
||||
file_system_id,
|
||||
performance_mode="generalPurpose",
|
||||
encrypted=False,
|
||||
kms_key_id=None,
|
||||
throughput_mode="bursting",
|
||||
provisioned_throughput_in_mibps=None,
|
||||
availability_zone_name=None,
|
||||
backup=False,
|
||||
lifecycle_policies=None,
|
||||
file_system_policy=None,
|
||||
tags=None,
|
||||
):
|
||||
if availability_zone_name:
|
||||
backup = True
|
||||
if kms_key_id and not encrypted:
|
||||
raise BadRequest('If kms_key_id given, "encrypted" must be True.')
|
||||
|
||||
# Save given parameters
|
||||
self.creation_token = creation_token
|
||||
self.performance_mode = performance_mode
|
||||
self.encrypted = encrypted
|
||||
self.kms_key_id = kms_key_id
|
||||
self.throughput_mode = throughput_mode
|
||||
self.provisioned_throughput_in_mibps = provisioned_throughput_in_mibps
|
||||
self.availability_zone_name = availability_zone_name
|
||||
self.availability_zone_id = None
|
||||
if self.availability_zone_name:
|
||||
self.availability_zone_id = _lookup_az_id(self.availability_zone_name)
|
||||
self._backup = backup
|
||||
self.lifecycle_policies = lifecycle_policies
|
||||
self.file_system_policy = file_system_policy
|
||||
|
||||
# Validate tag structure.
|
||||
if tags is None:
|
||||
self.tags = []
|
||||
else:
|
||||
if (
|
||||
not isinstance(tags, list)
|
||||
or not all(isinstance(tag, dict) for tag in tags)
|
||||
or not all(set(tag.keys()) == {"Key", "Value"} for tag in tags)
|
||||
):
|
||||
raise ValueError("Invalid tags: {}".format(tags))
|
||||
else:
|
||||
self.tags = tags
|
||||
|
||||
# Generate AWS-assigned parameters
|
||||
self.file_system_id = file_system_id
|
||||
self.file_system_arn = "arn:aws:elasticfilesystem:{region}:{user_id}:file-system/{file_system_id}".format(
|
||||
region=region_name, user_id=ACCOUNT_ID, file_system_id=self.file_system_id
|
||||
)
|
||||
self.creation_time = time.time()
|
||||
self.owner_id = ACCOUNT_ID
|
||||
|
||||
# Initialize some state parameters
|
||||
self.life_cycle_state = "available"
|
||||
self._mount_targets = {}
|
||||
self._size_value = 0
|
||||
|
||||
@property
|
||||
def size_in_bytes(self):
|
||||
return {
|
||||
"Value": self._size_value,
|
||||
"ValueInIA": 0,
|
||||
"ValueInStandard": self._size_value,
|
||||
"Timestamp": time.time(),
|
||||
}
|
||||
|
||||
@property
|
||||
def physical_resource_id(self):
|
||||
return self.file_system_id
|
||||
|
||||
@property
|
||||
def number_of_mount_targets(self):
|
||||
return len(self._mount_targets)
|
||||
|
||||
@property
|
||||
def backup_policy(self):
|
||||
if self._backup:
|
||||
return {"Status": "ENABLED"}
|
||||
else:
|
||||
return
|
||||
|
||||
def info_json(self):
|
||||
ret = {
|
||||
underscores_to_camelcase(k.capitalize()): v
|
||||
for k, v in self.__dict__.items()
|
||||
if not k.startswith("_")
|
||||
}
|
||||
ret["SizeInBytes"] = self.size_in_bytes
|
||||
ret["NumberOfMountTargets"] = self.number_of_mount_targets
|
||||
return ret
|
||||
|
||||
def add_mount_target(self, subnet, mount_target):
|
||||
# Check that the mount target doesn't violate constraints.
|
||||
for other_mount_target in self._mount_targets.values():
|
||||
if other_mount_target.subnet_vpc_id != subnet.vpc_id:
|
||||
raise MountTargetConflict(
|
||||
"requested subnet for new mount target is not in the same VPC as existing mount targets"
|
||||
)
|
||||
|
||||
if subnet.availability_zone in self._mount_targets:
|
||||
raise MountTargetConflict("mount target already exists in this AZ")
|
||||
|
||||
self._mount_targets[subnet.availability_zone] = mount_target
|
||||
|
||||
def has_mount_target(self, subnet):
|
||||
return subnet.availability_zone in self._mount_targets
|
||||
|
||||
def iter_mount_targets(self):
|
||||
for mt in self._mount_targets.values():
|
||||
yield mt
|
||||
|
||||
def remove_mount_target(self, subnet):
|
||||
del self._mount_targets[subnet.availability_zone]
|
||||
|
||||
@staticmethod
|
||||
def cloudformation_name_type():
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def cloudformation_type():
|
||||
return "AWS::EFS::FileSystem"
|
||||
|
||||
@classmethod
|
||||
def create_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-filesystem.html
|
||||
props = deepcopy(cloudformation_json["Properties"])
|
||||
props = {camelcase_to_underscores(k): v for k, v in props.items()}
|
||||
if "file_system_tags" in props:
|
||||
props["tags"] = props.pop("file_system_tags")
|
||||
if "backup_policy" in props:
|
||||
if "status" not in props["backup_policy"]:
|
||||
raise ValueError("BackupPolicy must be of type BackupPolicy.")
|
||||
status = props.pop("backup_policy")["status"]
|
||||
if status not in ["ENABLED", "DISABLED"]:
|
||||
raise ValueError('Invalid status: "{}".'.format(status))
|
||||
props["backup"] = status == "ENABLED"
|
||||
if "bypass_policy_lockout_safety_check" in props:
|
||||
raise ValueError(
|
||||
"BypassPolicyLockoutSafetyCheck not currently "
|
||||
"supported by AWS Cloudformation."
|
||||
)
|
||||
|
||||
return efs_backends[region_name].create_file_system(resource_name, **props)
|
||||
|
||||
@classmethod
|
||||
def update_from_cloudformation_json(
|
||||
cls, original_resource, new_resource_name, cloudformation_json, region_name
|
||||
):
|
||||
raise NotImplementedError(
|
||||
"Update of EFS File System via cloudformation is not yet implemented."
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def delete_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
return efs_backends[region_name].delete_file_system(resource_name)
|
||||
|
||||
|
||||
class MountTarget(CloudFormationModel):
|
||||
"""A model for an EFS Mount Target."""
|
||||
|
||||
def __init__(self, file_system, subnet, ip_address, security_groups):
|
||||
# Set the simple given parameters.
|
||||
self.file_system_id = file_system.file_system_id
|
||||
self._file_system = file_system
|
||||
self._file_system.add_mount_target(subnet, self)
|
||||
self.subnet_id = subnet.id
|
||||
self._subnet = subnet
|
||||
self.vpc_id = subnet.vpc_id
|
||||
self.security_groups = security_groups
|
||||
|
||||
# Check the number of security groups.
|
||||
if self.security_groups is not None and len(self.security_groups) > 5:
|
||||
raise SecurityGroupLimitExceeded(
|
||||
"The maximum number of security groups per interface has been reached."
|
||||
)
|
||||
|
||||
# Get an IP address if needed, otherwise validate the one we're given.
|
||||
if ip_address is None:
|
||||
ip_address = subnet.get_available_subnet_ip(self)
|
||||
else:
|
||||
try:
|
||||
subnet.request_ip(ip_address, self)
|
||||
except Exception as e:
|
||||
if "IP" in str(e) and "CIDR" in str(e):
|
||||
raise BadRequest(
|
||||
"Address does not fall within the subnet's address range"
|
||||
)
|
||||
else:
|
||||
raise e
|
||||
self.ip_address = ip_address
|
||||
|
||||
# Init non-user-assigned values.
|
||||
self.owner_id = ACCOUNT_ID
|
||||
self.mount_target_id = "fsmt-{}".format(get_random_hex())
|
||||
self.life_cycle_state = "available"
|
||||
self.network_interface_id = None
|
||||
self.availability_zone_id = subnet.availability_zone_id
|
||||
self.availability_zone_name = subnet.availability_zone
|
||||
|
||||
def clean_up(self):
|
||||
self._file_system.remove_mount_target(self._subnet)
|
||||
self._subnet.del_subnet_ip(self.ip_address)
|
||||
|
||||
def set_network_interface(self, network_interface):
|
||||
self.network_interface_id = network_interface.id
|
||||
|
||||
def info_json(self):
|
||||
ret = {
|
||||
underscores_to_camelcase(k.capitalize()): v
|
||||
for k, v in self.__dict__.items()
|
||||
if not k.startswith("_")
|
||||
}
|
||||
return ret
|
||||
|
||||
@property
|
||||
def physical_resource_id(self):
|
||||
return self.mount_target_id
|
||||
|
||||
@property
|
||||
def subnet_vpc_id(self):
|
||||
return self._subnet.vpc_id
|
||||
|
||||
@staticmethod
|
||||
def cloudformation_name_type():
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def cloudformation_type():
|
||||
return "AWS::EFS::MountTarget"
|
||||
|
||||
@classmethod
|
||||
def create_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-mounttarget.html
|
||||
props = deepcopy(cloudformation_json["Properties"])
|
||||
props = {camelcase_to_underscores(k): v for k, v in props.items()}
|
||||
return efs_backends[region_name].create_mount_target(**props)
|
||||
|
||||
@classmethod
|
||||
def update_from_cloudformation_json(
|
||||
cls, original_resource, new_resource_name, cloudformation_json, region_name
|
||||
):
|
||||
raise NotImplementedError(
|
||||
"Updates of EFS Mount Target via cloudformation are not yet implemented."
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def delete_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
return efs_backends[region_name].delete_mount_target(resource_name)
|
||||
|
||||
|
||||
class EFSBackend(BaseBackend):
|
||||
"""The backend manager of EFS resources.
|
||||
|
||||
This is the state-machine for each region, tracking the file systems, mount targets,
|
||||
and eventually access points that are deployed. Creating, updating, and destroying
|
||||
such resources should always go through this class.
|
||||
"""
|
||||
|
||||
def __init__(self, region_name=None):
|
||||
super(EFSBackend, self).__init__()
|
||||
self.region_name = region_name
|
||||
self.creation_tokens = set()
|
||||
self.file_systems_by_id = {}
|
||||
self.mount_targets_by_id = {}
|
||||
self.next_markers = {}
|
||||
|
||||
def reset(self):
|
||||
# preserve region
|
||||
region_name = self.region_name
|
||||
self.__dict__ = {}
|
||||
self.__init__(region_name)
|
||||
|
||||
def _mark_description(self, corpus, max_items):
|
||||
if max_items < len(corpus):
|
||||
new_corpus = corpus[max_items:]
|
||||
new_hash = md5(json.dumps(new_corpus).encode("utf-8"))
|
||||
next_marker = new_hash.hexdigest()
|
||||
self.next_markers[next_marker] = new_corpus
|
||||
else:
|
||||
next_marker = None
|
||||
return next_marker
|
||||
|
||||
@property
|
||||
def ec2_backend(self):
|
||||
return ec2_backends[self.region_name]
|
||||
|
||||
def create_file_system(self, creation_token, **params):
|
||||
"""Create a new EFS File System Volume.
|
||||
|
||||
https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html
|
||||
"""
|
||||
if not creation_token:
|
||||
raise ValueError("No creation token given.")
|
||||
if creation_token in self.creation_tokens:
|
||||
raise FileSystemAlreadyExists(creation_token)
|
||||
|
||||
# Create a new file system ID:
|
||||
def make_id():
|
||||
return "fs-{}".format(get_random_hex())
|
||||
|
||||
fsid = make_id()
|
||||
while fsid in self.file_systems_by_id:
|
||||
fsid = make_id()
|
||||
self.file_systems_by_id[fsid] = FileSystem(
|
||||
self.region_name,
|
||||
creation_token,
|
||||
fsid,
|
||||
**{k: v for k, v in params.items() if v is not None}
|
||||
)
|
||||
self.creation_tokens.add(creation_token)
|
||||
return self.file_systems_by_id[fsid]
|
||||
|
||||
def describe_file_systems(self, marker, max_items, creation_token, file_system_id):
|
||||
"""Describe all the EFS File Systems, or specific File Systems.
|
||||
|
||||
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
|
||||
"""
|
||||
# Restrict the possible corpus of results based on inputs.
|
||||
if creation_token and file_system_id:
|
||||
raise BadRequest(
|
||||
"Request cannot contain both a file system ID and a creation token."
|
||||
)
|
||||
elif creation_token:
|
||||
# Handle the creation token case.
|
||||
corpus = []
|
||||
for fs in self.file_systems_by_id.values():
|
||||
if fs.creation_token == creation_token:
|
||||
corpus.append(fs.info_json())
|
||||
elif file_system_id:
|
||||
# Handle the case that a file_system_id is given.
|
||||
if file_system_id not in self.file_systems_by_id:
|
||||
raise FileSystemNotFound(file_system_id)
|
||||
corpus = [self.file_systems_by_id[file_system_id].info_json()]
|
||||
elif marker is not None:
|
||||
# Handle the case that a marker is given.
|
||||
if marker not in self.next_markers:
|
||||
raise BadRequest("Invalid Marker")
|
||||
corpus = self.next_markers[marker]
|
||||
else:
|
||||
# Handle the vanilla case.
|
||||
corpus = [fs.info_json() for fs in self.file_systems_by_id.values()]
|
||||
|
||||
# Handle the max_items parameter.
|
||||
file_systems = corpus[:max_items]
|
||||
next_marker = self._mark_description(corpus, max_items)
|
||||
return next_marker, file_systems
|
||||
|
||||
def create_mount_target(
|
||||
self, file_system_id, subnet_id, ip_address=None, security_groups=None
|
||||
):
|
||||
"""Create a new EFS Mount Target for a given File System to a given subnet.
|
||||
|
||||
Note that you can only create one mount target for each availability zone
|
||||
(which is implied by the subnet ID).
|
||||
|
||||
https://docs.aws.amazon.com/efs/latest/ug/API_CreateMountTarget.html
|
||||
"""
|
||||
# Get the relevant existing resources
|
||||
try:
|
||||
subnet = self.ec2_backend.get_subnet(subnet_id)
|
||||
except InvalidSubnetIdError:
|
||||
raise SubnetNotFound(subnet_id)
|
||||
if file_system_id not in self.file_systems_by_id:
|
||||
raise FileSystemNotFound(file_system_id)
|
||||
file_system = self.file_systems_by_id[file_system_id]
|
||||
|
||||
# Validate the security groups.
|
||||
if security_groups:
|
||||
sg_lookup = {sg.id for sg in self.ec2_backend.describe_security_groups()}
|
||||
for sg_id in security_groups:
|
||||
if sg_id not in sg_lookup:
|
||||
raise SecurityGroupNotFound(sg_id)
|
||||
|
||||
# Create the new mount target
|
||||
mount_target = MountTarget(file_system, subnet, ip_address, security_groups)
|
||||
|
||||
# Establish the network interface.
|
||||
network_interface = self.ec2_backend.create_network_interface(
|
||||
subnet, [mount_target.ip_address], group_ids=security_groups
|
||||
)
|
||||
mount_target.set_network_interface(network_interface)
|
||||
|
||||
# Record the new mount target
|
||||
self.mount_targets_by_id[mount_target.mount_target_id] = mount_target
|
||||
return mount_target
|
||||
|
||||
def describe_mount_targets(
|
||||
self, max_items, file_system_id, mount_target_id, access_point_id, marker
|
||||
):
|
||||
"""Describe the mount targets given a mount target ID or a file system ID.
|
||||
|
||||
Note that as of this writing access points, and thus access point IDs, are not
|
||||
supported.
|
||||
|
||||
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
|
||||
"""
|
||||
# Restrict the possible corpus of results based on inputs.
|
||||
if not (bool(file_system_id) ^ bool(mount_target_id) ^ bool(access_point_id)):
|
||||
raise BadRequest("Must specify exactly one mutually exclusive parameter.")
|
||||
elif file_system_id:
|
||||
# Handle the case that a file_system_id is given.
|
||||
if file_system_id not in self.file_systems_by_id:
|
||||
raise FileSystemNotFound(file_system_id)
|
||||
corpus = [
|
||||
mt.info_json()
|
||||
for mt in self.file_systems_by_id[file_system_id].iter_mount_targets()
|
||||
]
|
||||
elif mount_target_id:
|
||||
if mount_target_id not in self.mount_targets_by_id:
|
||||
raise MountTargetNotFound(mount_target_id)
|
||||
# Handle mount target specification case.
|
||||
corpus = [self.mount_targets_by_id[mount_target_id].info_json()]
|
||||
else:
|
||||
# We don't handle access_point_id's yet.
|
||||
assert False, "Moto does not yet support EFS access points."
|
||||
|
||||
# Handle the case that a marker is given. Note that the handling is quite
|
||||
# different from that in describe_file_systems.
|
||||
if marker is not None:
|
||||
if marker not in self.next_markers:
|
||||
raise BadRequest("Invalid Marker")
|
||||
corpus_mtids = {m["MountTargetId"] for m in corpus}
|
||||
marked_mtids = {m["MountTargetId"] for m in self.next_markers[marker]}
|
||||
mt_ids = corpus_mtids & marked_mtids
|
||||
corpus = [self.mount_targets_by_id[mt_id].info_json() for mt_id in mt_ids]
|
||||
|
||||
# Handle the max_items parameter.
|
||||
mount_targets = corpus[:max_items]
|
||||
next_marker = self._mark_description(corpus, max_items)
|
||||
return next_marker, mount_targets
|
||||
|
||||
def delete_file_system(self, file_system_id):
|
||||
"""Delete the file system specified by the given file_system_id.
|
||||
|
||||
Note that mount targets must be deleted first.
|
||||
|
||||
https://docs.aws.amazon.com/efs/latest/ug/API_DeleteFileSystem.html
|
||||
"""
|
||||
if file_system_id not in self.file_systems_by_id:
|
||||
raise FileSystemNotFound(file_system_id)
|
||||
|
||||
file_system = self.file_systems_by_id[file_system_id]
|
||||
if file_system.number_of_mount_targets > 0:
|
||||
raise FileSystemInUse(
|
||||
"Must delete all mount targets before deleting file system."
|
||||
)
|
||||
|
||||
del self.file_systems_by_id[file_system_id]
|
||||
self.creation_tokens.remove(file_system.creation_token)
|
||||
return
|
||||
|
||||
def delete_mount_target(self, mount_target_id):
|
||||
"""Delete a mount target specified by the given mount_target_id.
|
||||
|
||||
Note that this will also delete a network interface.
|
||||
|
||||
https://docs.aws.amazon.com/efs/latest/ug/API_DeleteMountTarget.html
|
||||
"""
|
||||
if mount_target_id not in self.mount_targets_by_id:
|
||||
raise MountTargetNotFound(mount_target_id)
|
||||
|
||||
mount_target = self.mount_targets_by_id[mount_target_id]
|
||||
self.ec2_backend.delete_network_interface(mount_target.network_interface_id)
|
||||
del self.mount_targets_by_id[mount_target_id]
|
||||
mount_target.clean_up()
|
||||
return
|
||||
|
||||
def describe_backup_policy(self, file_system_id):
|
||||
backup_policy = self.file_systems_by_id[file_system_id].backup_policy
|
||||
if not backup_policy:
|
||||
raise PolicyNotFound("None")
|
||||
return backup_policy
|
||||
|
||||
|
||||
efs_backends = {}
|
||||
for region in Session().get_available_regions("efs"):
|
||||
efs_backends[region] = EFSBackend(region)
|
||||
for region in Session().get_available_regions("efs", partition_name="aws-us-gov"):
|
||||
efs_backends[region] = EFSBackend(region)
|
||||
for region in Session().get_available_regions("efs", partition_name="aws-cn"):
|
||||
efs_backends[region] = EFSBackend(region)
|
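Before the response layer, it may help to see the pagination contract that _mark_description implements; the following is an illustrative sketch (not from the commit) using made-up data.

import json
from hashlib import md5

corpus = [{"FileSystemId": "fs-{}".format(i)} for i in range(10)]
max_items = 4
page, rest = corpus[:max_items], corpus[max_items:]
# The un-returned tail is hashed and stored; the hash goes back as NextMarker.
next_marker = md5(json.dumps(rest).encode("utf-8")).hexdigest()
next_markers = {next_marker: rest}
# A follow-up call with Marker=next_marker uses `rest` as its new corpus,
# so the slicing repeats until the tail is empty and no NextMarker is returned.
assert next_markers[next_marker][0]["FileSystemId"] == "fs-4"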
115
moto/efs/responses.py
Normal file
@ -0,0 +1,115 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
|
||||
from .models import efs_backends
|
||||
|
||||
|
||||
class EFSResponse(BaseResponse):
|
||||
SERVICE_NAME = "efs"
|
||||
|
||||
@property
|
||||
def efs_backend(self):
|
||||
return efs_backends[self.region]
|
||||
|
||||
def create_file_system(self):
|
||||
creation_token = self._get_param("CreationToken")
|
||||
performance_mode = self._get_param("PerformanceMode")
|
||||
encrypted = self._get_param("Encrypted")
|
||||
kms_key_id = self._get_param("KmsKeyId")
|
||||
throughput_mode = self._get_param("ThroughputMode")
|
||||
provisioned_throughput_in_mibps = self._get_param(
|
||||
"ProvisionedThroughputInMibps"
|
||||
)
|
||||
availability_zone_name = self._get_param("AvailabilityZoneName")
|
||||
backup = self._get_param("Backup")
|
||||
tags = self._get_param("Tags")
|
||||
resource = self.efs_backend.create_file_system(
|
||||
creation_token=creation_token,
|
||||
performance_mode=performance_mode,
|
||||
encrypted=encrypted,
|
||||
kms_key_id=kms_key_id,
|
||||
throughput_mode=throughput_mode,
|
||||
provisioned_throughput_in_mibps=provisioned_throughput_in_mibps,
|
||||
availability_zone_name=availability_zone_name,
|
||||
backup=backup,
|
||||
tags=tags,
|
||||
)
|
||||
return (
|
||||
json.dumps(resource.info_json()),
|
||||
{"status": 201, "Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
def describe_file_systems(self):
|
||||
max_items = self._get_int_param("MaxItems", 10)
|
||||
marker = self._get_param("Marker")
|
||||
creation_token = self._get_param("CreationToken")
|
||||
file_system_id = self._get_param("FileSystemId")
|
||||
next_marker, file_systems = self.efs_backend.describe_file_systems(
|
||||
marker=marker,
|
||||
max_items=max_items,
|
||||
creation_token=creation_token,
|
||||
file_system_id=file_system_id,
|
||||
)
|
||||
resp_json = {"FileSystems": file_systems}
|
||||
if marker:
|
||||
resp_json["Marker"] = marker
|
||||
if next_marker:
|
||||
resp_json["NextMarker"] = next_marker
|
||||
return json.dumps(resp_json), {"Content-Type": "application/json"}
|
||||
|
||||
def create_mount_target(self):
|
||||
file_system_id = self._get_param("FileSystemId")
|
||||
subnet_id = self._get_param("SubnetId")
|
||||
ip_address = self._get_param("IpAddress")
|
||||
security_groups = self._get_param("SecurityGroups")
|
||||
mount_target = self.efs_backend.create_mount_target(
|
||||
file_system_id=file_system_id,
|
||||
subnet_id=subnet_id,
|
||||
ip_address=ip_address,
|
||||
security_groups=security_groups,
|
||||
)
|
||||
return (
|
||||
json.dumps(mount_target.info_json()),
|
||||
{"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
def describe_mount_targets(self):
|
||||
max_items = self._get_int_param("MaxItems", 10)
|
||||
marker = self._get_param("Marker")
|
||||
file_system_id = self._get_param("FileSystemId")
|
||||
mount_target_id = self._get_param("MountTargetId")
|
||||
access_point_id = self._get_param("AccessPointId")
|
||||
next_marker, mount_targets = self.efs_backend.describe_mount_targets(
|
||||
max_items=max_items,
|
||||
file_system_id=file_system_id,
|
||||
mount_target_id=mount_target_id,
|
||||
access_point_id=access_point_id,
|
||||
marker=marker,
|
||||
)
|
||||
resp_json = {"MountTargets": mount_targets}
|
||||
if marker:
|
||||
resp_json["Marker"] = marker
|
||||
if next_marker:
|
||||
resp_json["NextMarker"] = next_marker
|
||||
return json.dumps(resp_json), {"Content-Type": "application/json"}
|
||||
|
||||
def delete_file_system(self):
|
||||
file_system_id = self._get_param("FileSystemId")
|
||||
self.efs_backend.delete_file_system(file_system_id=file_system_id,)
|
||||
return json.dumps(dict()), {"status": 204, "Content-Type": "application/json"}
|
||||
|
||||
def delete_mount_target(self):
|
||||
mount_target_id = self._get_param("MountTargetId")
|
||||
self.efs_backend.delete_mount_target(mount_target_id=mount_target_id,)
|
||||
return json.dumps(dict()), {"status": 204, "Content-Type": "application/json"}
|
||||
|
||||
def describe_backup_policy(self):
|
||||
file_system_id = self._get_param("FileSystemId")
|
||||
backup_policy = self.efs_backend.describe_backup_policy(
|
||||
file_system_id=file_system_id,
|
||||
)
|
||||
resp = {"BackupPolicy": backup_policy}
|
||||
return json.dumps(resp), {"Content-Type": "application/json"}
|
21
moto/efs/urls.py
Normal file
@ -0,0 +1,21 @@
from __future__ import unicode_literals

from .responses import EFSResponse

url_bases = [
    "https?://elasticfilesystem.(.+).amazonaws.com",
    "https?://elasticfilesystem.amazonaws.com",
]


response = EFSResponse()


url_paths = {
    "{0}/.*?$": response.dispatch,
    "/2015-02-01/file-systems": response.dispatch,
    "/2015-02-01/file-systems/<file_system_id>": response.dispatch,
    "/2015-02-01/file-systems/<file_system_id>/backup-policy": response.dispatch,
    "/2015-02-01/mount-targets": response.dispatch,
    "/2015-02-01/mount-targets/<mount_target_id>": response.dispatch,
}
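For orientation, the route table above lines up with the public EFS REST API roughly as follows (an inferred mapping, not stated in the diff):

# CreateFileSystem      POST   /2015-02-01/file-systems
# DescribeFileSystems   GET    /2015-02-01/file-systems
# DeleteFileSystem      DELETE /2015-02-01/file-systems/<file_system_id>
# DescribeBackupPolicy  GET    /2015-02-01/file-systems/<file_system_id>/backup-policy
# CreateMountTarget     POST   /2015-02-01/mount-targets
# DescribeMountTargets  GET    /2015-02-01/mount-targets
# DeleteMountTarget     DELETE /2015-02-01/mount-targets/<mount_target_id>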
@ -193,7 +193,7 @@ class Database(CloudFormationModel):
         if not self.engine_version:
             return (None, None)

-        minor_engine_version = ".".join(self.engine_version.rsplit(".")[:-1])
+        minor_engine_version = ".".join(str(self.engine_version).rsplit(".")[:-1])
         db_family = "{0}{1}".format(self.engine.lower(), minor_engine_version)

         return db_family, "default.{0}".format(db_family)
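The motivation for the str() cast is not stated; a plausible reading (an assumption) is that the engine version can arrive as a number, in which case the parameter-group family lookup would fail, for example:

engine_version = 5.7  # hypothetical value coming out of a template
# A float has no .rsplit(), so the original line would raise AttributeError;
# casting first keeps the behaviour identical for plain string versions.
minor = ".".join(str(engine_version).rsplit(".")[:-1])
assert minor == "5"
assert "{0}{1}".format("mysql", minor) == "mysql5"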
0
tests/test_efs/__init__.py
Normal file
2
tests/test_efs/junk_drawer.py
Normal file
@ -0,0 +1,2 @@
def has_status_code(response, code):
    return response["ResponseMetadata"]["HTTPStatusCode"] == code
363
tests/test_efs/test_file_system.py
Normal file
@ -0,0 +1,363 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import re
|
||||
from os import environ
|
||||
|
||||
import boto3
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from moto import mock_efs
|
||||
from tests.test_efs.junk_drawer import has_status_code
|
||||
|
||||
ARN_PATT = "^arn:(?P<Partition>[^:\n]*):(?P<Service>[^:\n]*):(?P<Region>[^:\n]*):(?P<AccountID>[^:\n]*):(?P<Ignore>(?P<ResourceType>[^:\/\n]*)[:\/])?(?P<Resource>.*)$"
|
||||
STRICT_ARN_PATT = "^arn:aws:[a-z]+:[a-z]{2}-[a-z]+-[0-9]:[0-9]+:[a-z-]+\/[a-z0-9-]+$"
|
||||
|
||||
SAMPLE_1_PARAMS = {
|
||||
"CreationToken": "myFileSystem1",
|
||||
"PerformanceMode": "generalPurpose",
|
||||
"Backup": True,
|
||||
"Encrypted": True,
|
||||
"Tags": [{"Key": "Name", "Value": "Test Group1"}],
|
||||
}
|
||||
|
||||
SAMPLE_2_PARAMS = {
|
||||
"CreationToken": "myFileSystem2",
|
||||
"PerformanceMode": "generalPurpose",
|
||||
"Backup": True,
|
||||
"AvailabilityZoneName": "us-west-2b",
|
||||
"Encrypted": True,
|
||||
"ThroughputMode": "provisioned",
|
||||
"ProvisionedThroughputInMibps": 60,
|
||||
"Tags": [{"Key": "Name", "Value": "Test Group1"}],
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def aws_credentials():
|
||||
"""Mocked AWS Credentials for moto."""
|
||||
environ["AWS_ACCESS_KEY_ID"] = "testing"
|
||||
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
|
||||
environ["AWS_SECURITY_TOKEN"] = "testing"
|
||||
environ["AWS_SESSION_TOKEN"] = "testing"
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def efs(aws_credentials):
|
||||
with mock_efs():
|
||||
yield boto3.client("efs", region_name="us-east-1")
|
||||
|
||||
|
||||
# Testing Create
|
||||
# ==============
|
||||
|
||||
|
||||
def test_create_file_system_correct_use(efs):
|
||||
from datetime import datetime
|
||||
|
||||
creation_token = "test_efs_create"
|
||||
create_fs_resp = efs.create_file_system(
|
||||
CreationToken=creation_token,
|
||||
Tags=[{"Key": "Name", "Value": "Test EFS Container"}],
|
||||
)
|
||||
|
||||
# Check the response.
|
||||
assert has_status_code(create_fs_resp, 201)
|
||||
assert create_fs_resp["CreationToken"] == creation_token
|
||||
assert "fs-" in create_fs_resp["FileSystemId"]
|
||||
assert isinstance(create_fs_resp["CreationTime"], datetime)
|
||||
assert create_fs_resp["LifeCycleState"] == "available"
|
||||
assert create_fs_resp["Tags"][0] == {"Key": "Name", "Value": "Test EFS Container"}
|
||||
assert create_fs_resp["ThroughputMode"] == "bursting"
|
||||
assert create_fs_resp["PerformanceMode"] == "generalPurpose"
|
||||
assert create_fs_resp["Encrypted"] == False
|
||||
assert create_fs_resp["NumberOfMountTargets"] == 0
|
||||
for key_name in ["Value", "ValueInIA", "ValueInStandard"]:
|
||||
assert key_name in create_fs_resp["SizeInBytes"]
|
||||
assert create_fs_resp["SizeInBytes"][key_name] == 0
|
||||
assert re.match(STRICT_ARN_PATT, create_fs_resp["FileSystemArn"])
|
||||
|
||||
# Check the (lack of the) backup policy.
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.describe_backup_policy(FileSystemId=create_fs_resp["FileSystemId"])
|
||||
resp = exc_info.value.response
|
||||
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 404
|
||||
assert "PolicyNotFound" in resp["Error"]["Message"]
|
||||
|
||||
# Check the arn in detail
|
||||
match_obj = re.match(ARN_PATT, create_fs_resp["FileSystemArn"])
|
||||
arn_parts = match_obj.groupdict()
|
||||
assert arn_parts["ResourceType"] == "file-system"
|
||||
assert arn_parts["Resource"] == create_fs_resp["FileSystemId"]
|
||||
assert arn_parts["Service"] == "elasticfilesystem"
|
||||
assert arn_parts["AccountID"] == create_fs_resp["OwnerId"]
|
||||
|
||||
|
||||
def test_create_file_system_aws_sample_1(efs):
|
||||
resp = efs.create_file_system(**SAMPLE_1_PARAMS)
|
||||
resp_metadata = resp.pop("ResponseMetadata")
|
||||
assert resp_metadata["HTTPStatusCode"] == 201
|
||||
assert set(resp.keys()) == {
|
||||
"OwnerId",
|
||||
"CreationToken",
|
||||
"Encrypted",
|
||||
"PerformanceMode",
|
||||
"FileSystemId",
|
||||
"FileSystemArn",
|
||||
"CreationTime",
|
||||
"LifeCycleState",
|
||||
"NumberOfMountTargets",
|
||||
"SizeInBytes",
|
||||
"Tags",
|
||||
"ThroughputMode",
|
||||
}
|
||||
assert resp["Tags"] == [{"Key": "Name", "Value": "Test Group1"}]
|
||||
assert resp["PerformanceMode"] == "generalPurpose"
|
||||
assert resp["Encrypted"]
|
||||
|
||||
policy_resp = efs.describe_backup_policy(FileSystemId=resp["FileSystemId"])
|
||||
assert policy_resp["BackupPolicy"]["Status"] == "ENABLED"
|
||||
|
||||
|
||||
def test_create_file_system_aws_sample_2(efs):
|
||||
resp = efs.create_file_system(**SAMPLE_2_PARAMS)
|
||||
resp_metadata = resp.pop("ResponseMetadata")
|
||||
assert resp_metadata["HTTPStatusCode"] == 201
|
||||
assert set(resp.keys()) == {
|
||||
"AvailabilityZoneId",
|
||||
"AvailabilityZoneName",
|
||||
"PerformanceMode",
|
||||
"ProvisionedThroughputInMibps",
|
||||
"SizeInBytes",
|
||||
"Tags",
|
||||
"ThroughputMode",
|
||||
"CreationTime",
|
||||
"CreationToken",
|
||||
"Encrypted",
|
||||
"LifeCycleState",
|
||||
"FileSystemId",
|
||||
"FileSystemArn",
|
||||
"NumberOfMountTargets",
|
||||
"OwnerId",
|
||||
}
|
||||
assert resp["ProvisionedThroughputInMibps"] == 60
|
||||
assert resp["AvailabilityZoneId"] == "usw2-az1"
|
||||
assert resp["AvailabilityZoneName"] == "us-west-2b"
|
||||
assert resp["ThroughputMode"] == "provisioned"
|
||||
|
||||
policy_resp = efs.describe_backup_policy(FileSystemId=resp["FileSystemId"])
|
||||
assert policy_resp["BackupPolicy"]["Status"] == "ENABLED"
|
||||
|
||||
|
||||
def test_create_file_system_az_name_given_backup_default(efs):
|
||||
resp = efs.create_file_system(AvailabilityZoneName="us-east-1e")
|
||||
policy_resp = efs.describe_backup_policy(FileSystemId=resp["FileSystemId"])
|
||||
assert policy_resp["BackupPolicy"]["Status"] == "ENABLED"
|
||||
|
||||
|
||||
def test_create_file_system_no_creation_token_given(efs):
|
||||
# Note that from the API docs, it would seem this should create an error. However it
|
||||
# turns out that botocore just automatically assigns a UUID.
|
||||
resp = efs.create_file_system()
|
||||
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 201
|
||||
assert "CreationToken" in resp
|
||||
|
||||
|
||||
def test_create_file_system_file_system_already_exists(efs):
|
||||
efs.create_file_system(CreationToken="foo")
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.create_file_system(CreationToken="foo")
|
||||
resp = exc_info.value.response
|
||||
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 409
|
||||
assert "FileSystemAlreadyExists" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
# Testing Describe
|
||||
# ================
|
||||
|
||||
|
||||
def test_describe_file_systems_minimal_case(efs):
|
||||
# Create the file system.
|
||||
create_fs_resp = efs.create_file_system(CreationToken="foobar")
|
||||
create_fs_resp.pop("ResponseMetadata")
|
||||
|
||||
# Describe the file systems.
|
||||
desc_fs_resp = efs.describe_file_systems()
|
||||
desc_fs_resp_metadata = desc_fs_resp.pop("ResponseMetadata")
|
||||
assert desc_fs_resp_metadata["HTTPStatusCode"] == 200
|
||||
|
||||
# Check the list results.
|
||||
fs_list = desc_fs_resp["FileSystems"]
|
||||
assert len(fs_list) == 1
|
||||
file_system = fs_list[0]
|
||||
assert set(file_system.keys()) == {
|
||||
"CreationTime",
|
||||
"CreationToken",
|
||||
"Encrypted",
|
||||
"LifeCycleState",
|
||||
"PerformanceMode",
|
||||
"SizeInBytes",
|
||||
"Tags",
|
||||
"ThroughputMode",
|
||||
"FileSystemId",
|
||||
"FileSystemArn",
|
||||
"NumberOfMountTargets",
|
||||
"OwnerId",
|
||||
}
|
||||
assert file_system["FileSystemId"] == create_fs_resp["FileSystemId"]
|
||||
|
||||
# Pop out the timestamps and see if the rest of the description is the same.
|
||||
create_fs_resp["SizeInBytes"].pop("Timestamp")
|
||||
file_system["SizeInBytes"].pop("Timestamp")
|
||||
assert file_system == create_fs_resp
|
||||
|
||||
|
||||
def test_describe_file_systems_aws_create_sample_2(efs):
|
||||
efs.create_file_system(**SAMPLE_2_PARAMS)
|
||||
|
||||
# Describe the file systems.
|
||||
desc_resp = efs.describe_file_systems()
|
||||
desc_fs_resp_metadata = desc_resp.pop("ResponseMetadata")
|
||||
assert desc_fs_resp_metadata["HTTPStatusCode"] == 200
|
||||
|
||||
# Check the list results.
|
||||
fs_list = desc_resp["FileSystems"]
|
||||
assert len(fs_list) == 1
|
||||
file_system = fs_list[0]
|
||||
|
||||
assert set(file_system.keys()) == {
|
||||
"AvailabilityZoneId",
|
||||
"AvailabilityZoneName",
|
||||
"CreationTime",
|
||||
"CreationToken",
|
||||
"Encrypted",
|
||||
"LifeCycleState",
|
||||
"PerformanceMode",
|
||||
"ProvisionedThroughputInMibps",
|
||||
"SizeInBytes",
|
||||
"Tags",
|
||||
"ThroughputMode",
|
||||
"FileSystemId",
|
||||
"FileSystemArn",
|
||||
"NumberOfMountTargets",
|
||||
"OwnerId",
|
||||
}
|
||||
assert file_system["ProvisionedThroughputInMibps"] == 60
|
||||
assert file_system["AvailabilityZoneId"] == "usw2-az1"
|
||||
assert file_system["AvailabilityZoneName"] == "us-west-2b"
|
||||
assert file_system["ThroughputMode"] == "provisioned"
|
||||
|
||||
|
||||
def test_describe_file_systems_paging(efs):
|
||||
# Create several file systems.
|
||||
for i in range(10):
|
||||
efs.create_file_system(CreationToken="foobar_{}".format(i))
|
||||
|
||||
# First call (Start)
|
||||
# ------------------
|
||||
|
||||
# Call the tested function
|
||||
resp1 = efs.describe_file_systems(MaxItems=4)
|
||||
|
||||
# Check the response status
|
||||
assert has_status_code(resp1, 200)
|
||||
|
||||
# Check content of the result.
|
||||
resp1.pop("ResponseMetadata")
|
||||
assert set(resp1.keys()) == {"NextMarker", "FileSystems"}
|
||||
assert len(resp1["FileSystems"]) == 4
|
||||
fs_id_set_1 = {fs["FileSystemId"] for fs in resp1["FileSystems"]}
|
||||
|
||||
# Second call (Middle)
|
||||
# --------------------
|
||||
|
||||
# Get the next marker.
|
||||
resp2 = efs.describe_file_systems(MaxItems=4, Marker=resp1["NextMarker"])
|
||||
|
||||
# Check the response status
|
||||
resp2_metadata = resp2.pop("ResponseMetadata")
|
||||
assert resp2_metadata["HTTPStatusCode"] == 200
|
||||
|
||||
# Check the response contents.
|
||||
assert set(resp2.keys()) == {"NextMarker", "FileSystems", "Marker"}
|
||||
assert len(resp2["FileSystems"]) == 4
|
||||
assert resp2["Marker"] == resp1["NextMarker"]
|
||||
fs_id_set_2 = {fs["FileSystemId"] for fs in resp2["FileSystems"]}
|
||||
assert fs_id_set_1 & fs_id_set_2 == set()
|
||||
|
||||
# Third call (End)
|
||||
# ----------------
|
||||
|
||||
# Get the last marker results
|
||||
resp3 = efs.describe_file_systems(MaxItems=4, Marker=resp2["NextMarker"])
|
||||
|
||||
# Check the response status
|
||||
resp3_metadata = resp3.pop("ResponseMetadata")
|
||||
assert resp3_metadata["HTTPStatusCode"] == 200
|
||||
|
||||
# Check the response contents.
|
||||
assert set(resp3.keys()) == {"FileSystems", "Marker"}
|
||||
assert len(resp3["FileSystems"]) == 2
|
||||
assert resp3["Marker"] == resp2["NextMarker"]
|
||||
fs_id_set_3 = {fs["FileSystemId"] for fs in resp3["FileSystems"]}
|
||||
assert fs_id_set_3 & (fs_id_set_1 | fs_id_set_2) == set()
|
||||
|
||||
|
||||
def test_describe_file_systems_invalid_marker(efs):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.describe_file_systems(Marker="fiddlesticks")
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 400)
|
||||
assert "BadRequest" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
def test_describe_file_systems_invalid_creation_token(efs):
|
||||
resp = efs.describe_file_systems(CreationToken="fizzle")
|
||||
assert has_status_code(resp, 200)
|
||||
assert len(resp["FileSystems"]) == 0
|
||||
|
||||
|
||||
def test_describe_file_systems_invalid_file_system_id(efs):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.describe_file_systems(FileSystemId="fs-29879313")
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 404)
|
||||
assert "FileSystemNotFound" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
def test_describe_file_system_creation_token_and_file_system_id(efs):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.describe_file_systems(CreationToken="fizzle", FileSystemId="fs-07987987")
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 400)
|
||||
assert "BadRequest" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
# Testing Delete
|
||||
# ==============
|
||||
|
||||
|
||||
def test_delete_file_system_minimal_case(efs):
|
||||
# Create the file system
|
||||
resp = efs.create_file_system()
|
||||
|
||||
# Describe the file system, prove it shows up.
|
||||
desc1 = efs.describe_file_systems()
|
||||
assert len(desc1["FileSystems"]) == 1
|
||||
assert resp["FileSystemId"] in {fs["FileSystemId"] for fs in desc1["FileSystems"]}
|
||||
|
||||
# Delete the file system.
|
||||
del_resp = efs.delete_file_system(FileSystemId=resp["FileSystemId"])
|
||||
assert has_status_code(del_resp, 204)
|
||||
|
||||
# Check that the file system is no longer there.
|
||||
desc2 = efs.describe_file_systems()
|
||||
assert len(desc2["FileSystems"]) == 0
|
||||
|
||||
|
||||
def test_delete_file_system_invalid_file_system_id(efs):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.delete_file_system(FileSystemId="fs-2394287")
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 404)
|
||||
assert "FileSystemNotFound" in resp["Error"]["Message"]
|
394
tests/test_efs/test_mount_target.py
Normal file
@ -0,0 +1,394 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import re
|
||||
import sys
|
||||
from os import environ
|
||||
from ipaddress import IPv4Network
|
||||
|
||||
import boto3
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from moto import mock_ec2, mock_efs
|
||||
from moto.core import ACCOUNT_ID
|
||||
from tests.test_efs.junk_drawer import has_status_code
|
||||
|
||||
|
||||
# Handle the fact that `subnet_of` is not a feature before 3.7.
|
||||
# Source for alternative version: https://github.com/python/cpython/blob/v3.7.0/Lib/ipaddress.py#L976
|
||||
# Discovered via: https://stackoverflow.com/questions/35115138/how-do-i-check-if-a-network-is-contained-in-another-network-in-python
|
||||
if sys.version_info >= (3, 7):
|
||||
|
||||
def is_subnet_of(a, b):
|
||||
return a.subnet_of(b)
|
||||
|
||||
|
||||
else:
|
||||
|
||||
def is_subnet_of(a, b):
|
||||
try:
|
||||
# Always false if one is v4 and the other is v6.
|
||||
if a._version != b._version:
|
||||
raise TypeError("{} and {} are not of the same version".format(a, b))
|
||||
return (
|
||||
b.network_address <= a.network_address
|
||||
and b.broadcast_address >= a.broadcast_address
|
||||
)
|
||||
except AttributeError:
|
||||
raise TypeError(
|
||||
f"Unable to test subnet containment " f"between {a} and {b}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def aws_credentials():
|
||||
"""Mocked AWS Credentials for moto."""
|
||||
environ["AWS_ACCESS_KEY_ID"] = "testing"
|
||||
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
|
||||
environ["AWS_SECURITY_TOKEN"] = "testing"
|
||||
environ["AWS_SESSION_TOKEN"] = "testing"
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def ec2(aws_credentials):
|
||||
with mock_ec2():
|
||||
yield boto3.client("ec2", region_name="us-east-1")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def efs(aws_credentials):
|
||||
with mock_efs():
|
||||
yield boto3.client("efs", region_name="us-east-1")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def file_system(efs):
|
||||
create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
|
||||
create_fs_resp.pop("ResponseMetadata")
|
||||
yield create_fs_resp
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def subnet(ec2):
|
||||
desc_sn_resp = ec2.describe_subnets()
|
||||
subnet = desc_sn_resp["Subnets"][0]
|
||||
yield subnet
|
||||
|
||||
|
||||
def test_create_mount_target_minimal_correct_use(efs, file_system, subnet):
|
||||
subnet_id = subnet["SubnetId"]
|
||||
file_system_id = file_system["FileSystemId"]
|
||||
|
||||
# Create the mount target.
|
||||
create_mt_resp = efs.create_mount_target(
|
||||
FileSystemId=file_system_id, SubnetId=subnet_id
|
||||
)
|
||||
|
||||
# Check the mount target response code.
|
||||
resp_metadata = create_mt_resp.pop("ResponseMetadata")
|
||||
assert resp_metadata["HTTPStatusCode"] == 200
|
||||
|
||||
# Check the mount target response body.
|
||||
assert re.match("^fsmt-[a-f0-9]+$", create_mt_resp["MountTargetId"])
|
||||
assert re.match("^eni-[a-f0-9]+$", create_mt_resp["NetworkInterfaceId"])
|
||||
assert create_mt_resp["AvailabilityZoneId"] == subnet["AvailabilityZoneId"]
|
||||
assert create_mt_resp["AvailabilityZoneName"] == subnet["AvailabilityZone"]
|
||||
assert create_mt_resp["VpcId"] == subnet["VpcId"]
|
||||
assert create_mt_resp["SubnetId"] == subnet_id
|
||||
assert is_subnet_of(
|
||||
IPv4Network(create_mt_resp["IpAddress"]), IPv4Network(subnet["CidrBlock"])
|
||||
)
|
||||
assert create_mt_resp["FileSystemId"] == file_system_id
|
||||
assert create_mt_resp["OwnerId"] == ACCOUNT_ID
|
||||
assert create_mt_resp["LifeCycleState"] == "available"
|
||||
|
||||
# Check that the number of mount targets in the fs is correct.
|
||||
desc_fs_resp = efs.describe_file_systems()
|
||||
file_system = desc_fs_resp["FileSystems"][0]
|
||||
assert file_system["NumberOfMountTargets"] == 1
|
||||
return
|
||||
|
||||
|
||||
def test_create_mount_target_aws_sample_2(efs, ec2, file_system, subnet):
|
||||
subnet_id = subnet["SubnetId"]
|
||||
file_system_id = file_system["FileSystemId"]
|
||||
subnet_network = IPv4Network(subnet["CidrBlock"])
|
||||
for ip_addr_obj in subnet_network.hosts():
|
||||
ip_addr = ip_addr_obj.exploded
|
||||
break
|
||||
else:
|
||||
assert False, "Could not generate an IP address from CIDR block: {}".format(
|
||||
subnet["CidrBlock"]
|
||||
)
|
||||
desc_sg_resp = ec2.describe_security_groups()
|
||||
security_group = desc_sg_resp["SecurityGroups"][0]
|
||||
security_group_id = security_group["GroupId"]
|
||||
|
||||
# Make sure nothing chokes.
|
||||
sample_input = {
|
||||
"FileSystemId": file_system_id,
|
||||
"SubnetId": subnet_id,
|
||||
"IpAddress": ip_addr,
|
||||
"SecurityGroups": [security_group_id],
|
||||
}
|
||||
create_mt_resp = efs.create_mount_target(**sample_input)
|
||||
|
||||
# Check the mount target response code.
|
||||
resp_metadata = create_mt_resp.pop("ResponseMetadata")
|
||||
assert resp_metadata["HTTPStatusCode"] == 200
|
||||
|
||||
# Check that setting the IP Address worked.
|
||||
assert create_mt_resp["IpAddress"] == ip_addr
|
||||
|
||||
|
||||
def test_create_mount_target_invalid_file_system_id(efs, subnet):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.create_mount_target(FileSystemId="fs-12343289", SubnetId=subnet["SubnetId"])
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 404)
|
||||
assert "FileSystemNotFound" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
def test_create_mount_target_invalid_subnet_id(efs, file_system):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.create_mount_target(
|
||||
FileSystemId=file_system["FileSystemId"], SubnetId="subnet-12345678"
|
||||
)
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 404)
|
||||
assert "SubnetNotFound" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
def test_create_mount_target_invalid_sg_id(efs, file_system, subnet):
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.create_mount_target(
|
||||
FileSystemId=file_system["FileSystemId"],
|
||||
SubnetId=subnet["SubnetId"],
|
||||
SecurityGroups=["sg-1234df235"],
|
||||
)
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 404)
|
||||
assert "SecurityGroupNotFound" in resp["Error"]["Message"]
|
||||
|
||||
|
||||
def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet):
|
||||
vpc_info = ec2.create_vpc(CidrBlock="10.1.0.0/16")
|
||||
new_subnet_info = ec2.create_subnet(
|
||||
VpcId=vpc_info["Vpc"]["VpcId"], CidrBlock="10.1.1.0/24"
|
||||
)
|
||||
efs.create_mount_target(
|
||||
FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
|
||||
)
|
||||
with pytest.raises(ClientError) as exc_info:
|
||||
efs.create_mount_target(
|
||||
FileSystemId=file_system["FileSystemId"],
|
||||
SubnetId=new_subnet_info["Subnet"]["SubnetId"],
|
||||
)
|
||||
resp = exc_info.value.response
|
||||
assert has_status_code(resp, 409)
|
||||
assert "MountTargetConflict" in resp["Error"]["Message"]
|
||||
assert "VPC" in resp["Error"]["Message"]


def test_create_mount_target_duplicate_subnet_id(efs, file_system, subnet):
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert "MountTargetConflict" in resp["Error"]["Message"]
    assert "AZ" in resp["Error"]["Message"]


def test_create_mount_target_subnets_in_same_zone(efs, ec2, file_system, subnet):
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    subnet_info = ec2.create_subnet(
        VpcId=subnet["VpcId"],
        CidrBlock="172.31.96.0/20",
        AvailabilityZone=subnet["AvailabilityZone"],
    )
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet_info["Subnet"]["SubnetId"],
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert "MountTargetConflict" in resp["Error"]["Message"]
    assert "AZ" in resp["Error"]["Message"]


def test_create_mount_target_ip_address_out_of_range(efs, file_system, subnet):
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet["SubnetId"],
            IpAddress="10.0.1.0",
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 400)
    assert "BadRequest" in resp["Error"]["Message"]
    assert "Address" in resp["Error"]["Message"]


def test_create_mount_target_too_many_security_groups(efs, ec2, file_system, subnet):
    sg_id_list = []
    for i in range(6):
        sg_info = ec2.create_security_group(
            VpcId=subnet["VpcId"],
            GroupName="sg-{}".format(i),
            Description="SG-{} protects us from the Goa'uld.".format(i),
        )
        sg_id_list.append(sg_info["GroupId"])
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet["SubnetId"],
            SecurityGroups=sg_id_list,
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 400)
    assert "SecurityGroupLimitExceeded" in resp["Error"]["Message"]


def test_delete_file_system_mount_targets_attached(efs, ec2, file_system, subnet):
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    with pytest.raises(ClientError) as exc_info:
        efs.delete_file_system(FileSystemId=file_system["FileSystemId"])
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert "FileSystemInUse" in resp["Error"]["Message"]


def test_describe_mount_targets_minimal_case(efs, ec2, file_system, subnet):
    create_resp = efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    create_resp.pop("ResponseMetadata")

    # Describe the mount targets
    desc_mt_resp = efs.describe_mount_targets(FileSystemId=file_system["FileSystemId"])
    desc_mt_resp_metadata = desc_mt_resp.pop("ResponseMetadata")
    assert desc_mt_resp_metadata["HTTPStatusCode"] == 200

    # Check the list results.
    mt_list = desc_mt_resp["MountTargets"]
    assert len(mt_list) == 1
    mount_target = mt_list[0]
    assert mount_target["MountTargetId"] == create_resp["MountTargetId"]

    # With the response metadata popped, the description should match the create response.
    assert mount_target == create_resp


def test_describe_mount_targets_paging(efs, ec2, file_system):
    fs_id = file_system["FileSystemId"]

    # Get a list of subnets.
    subnet_list = ec2.describe_subnets()["Subnets"]

    # Create several mount targets.
    for subnet in subnet_list:
        efs.create_mount_target(FileSystemId=fs_id, SubnetId=subnet["SubnetId"])

    # First call (Start)
    # ------------------

    # Call the tested function
    resp1 = efs.describe_mount_targets(FileSystemId=fs_id, MaxItems=2)

    # Check the response status
    assert has_status_code(resp1, 200)

    # Check content of the result.
    resp1.pop("ResponseMetadata")
    assert set(resp1.keys()) == {"NextMarker", "MountTargets"}
    assert len(resp1["MountTargets"]) == 2
    mt_id_set_1 = {mt["MountTargetId"] for mt in resp1["MountTargets"]}

    # Second call (Middle)
    # --------------------

    # Get the next marker.
    resp2 = efs.describe_mount_targets(
        FileSystemId=fs_id, MaxItems=2, Marker=resp1["NextMarker"]
    )

    # Check the response status
    resp2_metadata = resp2.pop("ResponseMetadata")
    assert resp2_metadata["HTTPStatusCode"] == 200

    # Check the response contents.
    assert set(resp2.keys()) == {"NextMarker", "MountTargets", "Marker"}
    assert len(resp2["MountTargets"]) == 2
    assert resp2["Marker"] == resp1["NextMarker"]
    mt_id_set_2 = {mt["MountTargetId"] for mt in resp2["MountTargets"]}
    assert mt_id_set_1 & mt_id_set_2 == set()

    # Third call (End)
    # ----------------

    # Get the last marker results
    resp3 = efs.describe_mount_targets(
        FileSystemId=fs_id, MaxItems=20, Marker=resp2["NextMarker"]
    )

    # Check the response status
    resp3_metadata = resp3.pop("ResponseMetadata")
    assert resp3_metadata["HTTPStatusCode"] == 200

    # Check the response contents.
    assert set(resp3.keys()) == {"MountTargets", "Marker"}
    assert resp3["Marker"] == resp2["NextMarker"]
    mt_id_set_3 = {mt["MountTargetId"] for mt in resp3["MountTargets"]}
    assert mt_id_set_3 & (mt_id_set_1 | mt_id_set_2) == set()
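

# A minimal sketch of how a caller might drain this paginated API (hypothetical
# helper, not part of the suite): pass each NextMarker back as Marker until the
# response no longer carries one.
def _list_all_mount_targets(efs, fs_id, page_size=2):
    mount_targets, marker = [], None
    while True:
        kwargs = {"FileSystemId": fs_id, "MaxItems": page_size}
        if marker is not None:
            kwargs["Marker"] = marker
        page = efs.describe_mount_targets(**kwargs)
        mount_targets.extend(page["MountTargets"])
        marker = page.get("NextMarker")
        if marker is None:
            return mount_targets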


def test_describe_mount_targets_invalid_file_system_id(efs):
    with pytest.raises(ClientError) as exc_info:
        efs.describe_mount_targets(FileSystemId="fs-12343289")
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert "FileSystemNotFound" in resp["Error"]["Message"]


def test_describe_mount_targets_invalid_mount_target_id(efs):
    with pytest.raises(ClientError) as exc_info:
        efs.describe_mount_targets(MountTargetId="fsmt-ad9f8987")
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert "MountTargetNotFound" in resp["Error"]["Message"]


def test_describe_mount_targets_no_id_given(efs):
    with pytest.raises(ClientError) as exc_info:
        efs.describe_mount_targets()
    resp = exc_info.value.response
    assert has_status_code(resp, 400)
    assert "BadRequest" in resp["Error"]["Message"]


def test_delete_mount_target_minimal_case(efs, file_system, subnet):
    mt_info = efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    resp = efs.delete_mount_target(MountTargetId=mt_info["MountTargetId"])
    assert has_status_code(resp, 204)
    desc_resp = efs.describe_mount_targets(FileSystemId=file_system["FileSystemId"])
    assert len(desc_resp["MountTargets"]) == 0


def test_delete_mount_target_invalid_mount_target_id(efs, file_system, subnet):
    with pytest.raises(ClientError) as exc_info:
        efs.delete_mount_target(MountTargetId="fsmt-98487aef0a7")
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert "MountTargetNotFound" in resp["Error"]["Message"]
tests/test_efs/test_server.py (new file, 101 lines)
@ -0,0 +1,101 @@
from __future__ import unicode_literals

import re
from os import environ

import pytest

from moto import mock_efs, mock_ec2
import moto.server as server


FILE_SYSTEMS = "/2015-02-01/file-systems"
MOUNT_TARGETS = "/2015-02-01/mount-targets"


@pytest.fixture(scope="function")
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    environ["AWS_ACCESS_KEY_ID"] = "testing"
    environ["AWS_SESSION_TOKEN"] = "testing"
    environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    environ["AWS_SECURITY_TOKEN"] = "testing"


@pytest.fixture(scope="function")
def efs_client(aws_credentials):
    with mock_efs():
        yield server.create_backend_app("efs").test_client()


@pytest.fixture(scope="function")
def subnet_id(aws_credentials):
    with mock_ec2():
        ec2_client = server.create_backend_app("ec2").test_client()
        resp = ec2_client.get("/?Action=DescribeSubnets")
        subnet_ids = re.findall("<subnetId>(.*?)</subnetId>", resp.data.decode("utf-8"))
        yield subnet_ids[0]


@pytest.fixture(scope="function")
def file_system_id(efs_client):
    resp = efs_client.post(
        FILE_SYSTEMS, json={"CreationToken": "foobarbaz", "Backup": True}
    )
    yield resp.json["FileSystemId"]
"""
|
||||
Test each method current supported to ensure it connects via the server.
|
||||
|
||||
NOTE: this does NOT test whether the endpoints are really functioning, just that they
|
||||
connect and respond. Tests of functionality are contained in `test_file_system` and
|
||||
`test_mount_target`.
|
||||
"""


def test_efs_file_system_create(efs_client):
    res = efs_client.post(FILE_SYSTEMS, json={"CreationToken": "2398asdfkajsdf"})
    assert res.status_code == 201


def test_efs_file_system_describe(efs_client):
    res = efs_client.get(FILE_SYSTEMS)
    assert res.status_code == 200


def test_efs_file_system_delete(file_system_id, efs_client):
    res = efs_client.delete("/2015-02-01/file-systems/{}".format(file_system_id))
    assert res.status_code == 204


def test_efs_mount_target_create(file_system_id, subnet_id, efs_client):
    res = efs_client.post(
        "/2015-02-01/mount-targets",
        json={"FileSystemId": file_system_id, "SubnetId": subnet_id},
    )
    assert res.status_code == 200


def test_efs_mount_target_describe(file_system_id, efs_client):
    res = efs_client.get(
        "/2015-02-01/mount-targets?FileSystemId={}".format(file_system_id)
    )
    assert res.status_code == 200


def test_efs_mount_target_delete(file_system_id, subnet_id, efs_client):
    create_res = efs_client.post(
        "/2015-02-01/mount-targets",
        json={"FileSystemId": file_system_id, "SubnetId": subnet_id},
    )
    mt_id = create_res.json["MountTargetId"]
    res = efs_client.delete("/2015-02-01/mount-targets/{}".format(mt_id))
    assert res.status_code == 204


def test_efs_describe_backup_policy(file_system_id, efs_client):
    res = efs_client.get(
        "/2015-02-01/file-systems/{}/backup-policy".format(file_system_id)
    )
    assert res.status_code == 200