EFS enhancements (#4940)

This commit is contained in:
Bert Blommers 2022-03-15 18:51:03 -01:00 committed by GitHub
parent 67ab7f857a
commit bbd4b2afc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 864 additions and 98 deletions

View File

@ -2051,37 +2051,37 @@
## efs ## efs
<details> <details>
<summary>23% implemented</summary> <summary>56% implemented</summary>
- [ ] create_access_point - [X] create_access_point
- [X] create_file_system - [X] create_file_system
- [X] create_mount_target - [X] create_mount_target
- [ ] create_replication_configuration - [ ] create_replication_configuration
- [ ] create_tags - [ ] create_tags
- [ ] delete_access_point - [X] delete_access_point
- [X] delete_file_system - [X] delete_file_system
- [ ] delete_file_system_policy - [ ] delete_file_system_policy
- [X] delete_mount_target - [X] delete_mount_target
- [ ] delete_replication_configuration - [ ] delete_replication_configuration
- [ ] delete_tags - [ ] delete_tags
- [ ] describe_access_points - [X] describe_access_points
- [ ] describe_account_preferences - [ ] describe_account_preferences
- [X] describe_backup_policy - [X] describe_backup_policy
- [ ] describe_file_system_policy - [ ] describe_file_system_policy
- [X] describe_file_systems - [X] describe_file_systems
- [ ] describe_lifecycle_configuration - [X] describe_lifecycle_configuration
- [ ] describe_mount_target_security_groups - [X] describe_mount_target_security_groups
- [X] describe_mount_targets - [X] describe_mount_targets
- [ ] describe_replication_configurations - [ ] describe_replication_configurations
- [ ] describe_tags - [ ] describe_tags
- [ ] list_tags_for_resource - [X] list_tags_for_resource
- [ ] modify_mount_target_security_groups - [X] modify_mount_target_security_groups
- [ ] put_account_preferences - [ ] put_account_preferences
- [ ] put_backup_policy - [ ] put_backup_policy
- [ ] put_file_system_policy - [ ] put_file_system_policy
- [ ] put_lifecycle_configuration - [X] put_lifecycle_configuration
- [ ] tag_resource - [X] tag_resource
- [ ] untag_resource - [X] untag_resource
- [ ] update_file_system - [ ] update_file_system
</details> </details>

View File

@ -27,7 +27,7 @@ efs
|start-h3| Implemented features for this service |end-h3| |start-h3| Implemented features for this service |end-h3|
- [ ] create_access_point - [X] create_access_point
- [X] create_file_system - [X] create_file_system
Create a new EFS File System Volume. Create a new EFS File System Volume.
@ -45,7 +45,7 @@ efs
- [ ] create_replication_configuration - [ ] create_replication_configuration
- [ ] create_tags - [ ] create_tags
- [ ] delete_access_point - [X] delete_access_point
- [X] delete_file_system - [X] delete_file_system
Delete the file system specified by the given file_system_id. Delete the file system specified by the given file_system_id.
@ -65,7 +65,11 @@ efs
- [ ] delete_replication_configuration - [ ] delete_replication_configuration
- [ ] delete_tags - [ ] delete_tags
- [ ] describe_access_points - [X] describe_access_points
Pagination is not yet implemented
- [ ] describe_account_preferences - [ ] describe_account_preferences
- [X] describe_backup_policy - [X] describe_backup_policy
- [ ] describe_file_system_policy - [ ] describe_file_system_policy
@ -75,26 +79,23 @@ efs
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
- [ ] describe_lifecycle_configuration - [X] describe_lifecycle_configuration
- [ ] describe_mount_target_security_groups - [X] describe_mount_target_security_groups
- [X] describe_mount_targets - [X] describe_mount_targets
Describe the mount targets given a mount target ID or a file system ID. Describe the mount targets given an access point ID, mount target ID or a file system ID.
Note that as of this writing access points, and thus access point IDs are not
supported.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
- [ ] describe_replication_configurations - [ ] describe_replication_configurations
- [ ] describe_tags - [ ] describe_tags
- [ ] list_tags_for_resource - [X] list_tags_for_resource
- [ ] modify_mount_target_security_groups - [X] modify_mount_target_security_groups
- [ ] put_account_preferences - [ ] put_account_preferences
- [ ] put_backup_policy - [ ] put_backup_policy
- [ ] put_file_system_policy - [ ] put_file_system_policy
- [ ] put_lifecycle_configuration - [X] put_lifecycle_configuration
- [ ] tag_resource - [X] tag_resource
- [ ] untag_resource - [X] untag_resource
- [ ] update_file_system - [ ] update_file_system

View File

@ -1,10 +1,19 @@
from moto.core.exceptions import RESTError from moto.core.exceptions import JsonRESTError
class EFSError(RESTError): class EFSError(JsonRESTError):
pass pass
class AccessPointNotFound(EFSError):
code = 404
def __init__(self, access_point_id):
super().__init__(
"AccessPointNotFound", f"Access Point {access_point_id} does not exist."
)
class FileSystemAlreadyExists(EFSError): class FileSystemAlreadyExists(EFSError):
code = 409 code = 409
@ -13,7 +22,7 @@ class FileSystemAlreadyExists(EFSError):
"FileSystemAlreadyExists", "FileSystemAlreadyExists",
"File system with {} already exists.".format(creation_token), "File system with {} already exists.".format(creation_token),
*args, *args,
**kwargs **kwargs,
) )
@ -25,7 +34,7 @@ class FileSystemNotFound(EFSError):
"FileSystemNotFound", "FileSystemNotFound",
"File system {} does not exist.".format(file_system_id), "File system {} does not exist.".format(file_system_id),
*args, *args,
**kwargs **kwargs,
) )
@ -51,7 +60,7 @@ class MountTargetNotFound(EFSError):
"MountTargetNotFound", "MountTargetNotFound",
"Mount target '{}' does not exist.".format(mount_target_id), "Mount target '{}' does not exist.".format(mount_target_id),
*args, *args,
**kwargs **kwargs,
) )
@ -77,7 +86,7 @@ class SubnetNotFound(EFSError):
"SubnetNotFound", "SubnetNotFound",
"The subnet ID '{}' does not exist".format(subnet_id), "The subnet ID '{}' does not exist".format(subnet_id),
*args, *args,
**kwargs **kwargs,
) )
@ -89,7 +98,7 @@ class SecurityGroupNotFound(EFSError):
"SecurityGroupNotFound", "SecurityGroupNotFound",
"The SecurityGroup ID '{}' does not exist".format(security_group_id), "The SecurityGroup ID '{}' does not exist".format(security_group_id),
*args, *args,
**kwargs **kwargs,
) )

View File

@ -9,7 +9,7 @@ import time
from copy import deepcopy from copy import deepcopy
from hashlib import md5 from hashlib import md5
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel from moto.core import ACCOUNT_ID, BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import ( from moto.core.utils import (
camelcase_to_underscores, camelcase_to_underscores,
get_random_hex, get_random_hex,
@ -19,6 +19,7 @@ from moto.core.utils import (
from moto.ec2 import ec2_backends from moto.ec2 import ec2_backends
from moto.ec2.exceptions import InvalidSubnetIdError from moto.ec2.exceptions import InvalidSubnetIdError
from moto.efs.exceptions import ( from moto.efs.exceptions import (
AccessPointNotFound,
BadRequest, BadRequest,
FileSystemAlreadyExists, FileSystemAlreadyExists,
FileSystemInUse, FileSystemInUse,
@ -30,6 +31,7 @@ from moto.efs.exceptions import (
SecurityGroupNotFound, SecurityGroupNotFound,
SecurityGroupLimitExceeded, SecurityGroupLimitExceeded,
) )
from moto.utilities.tagging_service import TaggingService
def _lookup_az_id(az_name): def _lookup_az_id(az_name):
@ -40,6 +42,48 @@ def _lookup_az_id(az_name):
return zone.zone_id return zone.zone_id
class AccessPoint(BaseModel):
def __init__(
self,
region_name,
client_token,
file_system_id,
name,
posix_user,
root_directory,
context,
):
self.access_point_id = get_random_hex(8)
self.access_point_arn = "arn:aws:elasticfilesystem:{region}:{user_id}:access-point/fsap-{file_system_id}".format(
region=region_name, user_id=ACCOUNT_ID, file_system_id=self.access_point_id
)
self.client_token = client_token
self.file_system_id = file_system_id
self.name = name
self.posix_user = posix_user
if not root_directory:
root_directory = {"Path": "/"}
self.root_directory = root_directory
self.context = context
def info_json(self):
tags = self.context.list_tags_for_resource(self.access_point_id)
return {
"ClientToken": self.client_token,
"Name": self.name,
"Tags": tags,
"AccessPointId": self.access_point_id,
"AccessPointArn": self.access_point_arn,
"FileSystemId": self.file_system_id,
"PosixUser": self.posix_user,
"RootDirectory": self.root_directory,
"OwnerId": ACCOUNT_ID,
"LifeCycleState": "available",
}
class FileSystem(CloudFormationModel): class FileSystem(CloudFormationModel):
"""A model for an EFS File System Volume.""" """A model for an EFS File System Volume."""
@ -48,16 +92,16 @@ class FileSystem(CloudFormationModel):
region_name, region_name,
creation_token, creation_token,
file_system_id, file_system_id,
performance_mode="generalPurpose", context,
encrypted=False, performance_mode,
kms_key_id=None, encrypted,
throughput_mode="bursting", kms_key_id,
provisioned_throughput_in_mibps=None, throughput_mode,
availability_zone_name=None, provisioned_throughput_in_mibps,
backup=False, availability_zone_name,
backup,
lifecycle_policies=None, lifecycle_policies=None,
file_system_policy=None, file_system_policy=None,
tags=None,
): ):
if availability_zone_name: if availability_zone_name:
backup = True backup = True
@ -66,31 +110,20 @@ class FileSystem(CloudFormationModel):
# Save given parameters # Save given parameters
self.creation_token = creation_token self.creation_token = creation_token
self.performance_mode = performance_mode self.performance_mode = performance_mode or "generalPurpose"
self.encrypted = encrypted self.encrypted = encrypted or False
self.kms_key_id = kms_key_id self.kms_key_id = kms_key_id
self.throughput_mode = throughput_mode self.throughput_mode = throughput_mode or "bursting"
self.provisioned_throughput_in_mibps = provisioned_throughput_in_mibps self.provisioned_throughput_in_mibps = provisioned_throughput_in_mibps
self.availability_zone_name = availability_zone_name self.availability_zone_name = availability_zone_name
self.availability_zone_id = None self.availability_zone_id = None
if self.availability_zone_name: if self.availability_zone_name:
self.availability_zone_id = _lookup_az_id(self.availability_zone_name) self.availability_zone_id = _lookup_az_id(self.availability_zone_name)
self._backup = backup self._backup = backup
self.lifecycle_policies = lifecycle_policies self.lifecycle_policies = lifecycle_policies or []
self.file_system_policy = file_system_policy self.file_system_policy = file_system_policy
# Validate tag structure. self._context = context
if tags is None:
self.tags = []
else:
if (
not isinstance(tags, list)
or not all(isinstance(tag, dict) for tag in tags)
or not all(set(tag.keys()) == {"Key", "Value"} for tag in tags)
):
raise ValueError("Invalid tags: {}".format(tags))
else:
self.tags = tags
# Generate AWS-assigned parameters # Generate AWS-assigned parameters
self.file_system_id = file_system_id self.file_system_id = file_system_id
@ -135,6 +168,7 @@ class FileSystem(CloudFormationModel):
for k, v in self.__dict__.items() for k, v in self.__dict__.items()
if not k.startswith("_") if not k.startswith("_")
} }
ret["Tags"] = self._context.list_tags_for_resource(self.file_system_id)
ret["SizeInBytes"] = self.size_in_bytes ret["SizeInBytes"] = self.size_in_bytes
ret["NumberOfMountTargets"] = self.number_of_mount_targets ret["NumberOfMountTargets"] = self.number_of_mount_targets
return ret return ret
@ -318,9 +352,11 @@ class EFSBackend(BaseBackend):
super().__init__() super().__init__()
self.region_name = region_name self.region_name = region_name
self.creation_tokens = set() self.creation_tokens = set()
self.access_points = dict()
self.file_systems_by_id = {} self.file_systems_by_id = {}
self.mount_targets_by_id = {} self.mount_targets_by_id = {}
self.next_markers = {} self.next_markers = {}
self.tagging_service = TaggingService()
def reset(self): def reset(self):
# preserve region # preserve region
@ -331,7 +367,8 @@ class EFSBackend(BaseBackend):
def _mark_description(self, corpus, max_items): def _mark_description(self, corpus, max_items):
if max_items < len(corpus): if max_items < len(corpus):
new_corpus = corpus[max_items:] new_corpus = corpus[max_items:]
new_hash = md5(json.dumps(new_corpus).encode("utf-8")) new_corpus_dict = [c.info_json() for c in new_corpus]
new_hash = md5(json.dumps(new_corpus_dict).encode("utf-8"))
next_marker = new_hash.hexdigest() next_marker = new_hash.hexdigest()
self.next_markers[next_marker] = new_corpus self.next_markers[next_marker] = new_corpus
else: else:
@ -342,7 +379,18 @@ class EFSBackend(BaseBackend):
def ec2_backend(self): def ec2_backend(self):
return ec2_backends[self.region_name] return ec2_backends[self.region_name]
def create_file_system(self, creation_token, **params): def create_file_system(
self,
creation_token,
performance_mode,
encrypted,
kms_key_id,
throughput_mode,
provisioned_throughput_in_mibps,
availability_zone_name,
backup,
tags,
):
"""Create a new EFS File System Volume. """Create a new EFS File System Volume.
https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html
@ -363,12 +411,22 @@ class EFSBackend(BaseBackend):
self.region_name, self.region_name,
creation_token, creation_token,
fsid, fsid,
**{k: v for k, v in params.items() if v is not None} context=self,
performance_mode=performance_mode,
encrypted=encrypted,
kms_key_id=kms_key_id,
throughput_mode=throughput_mode,
provisioned_throughput_in_mibps=provisioned_throughput_in_mibps,
availability_zone_name=availability_zone_name,
backup=backup,
) )
self.tag_resource(fsid, tags)
self.creation_tokens.add(creation_token) self.creation_tokens.add(creation_token)
return self.file_systems_by_id[fsid] return self.file_systems_by_id[fsid]
def describe_file_systems(self, marker, max_items, creation_token, file_system_id): def describe_file_systems(
self, marker=None, max_items=10, creation_token=None, file_system_id=None
):
"""Describe all the EFS File Systems, or specific File Systems. """Describe all the EFS File Systems, or specific File Systems.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
@ -383,7 +441,7 @@ class EFSBackend(BaseBackend):
corpus = [] corpus = []
for fs in self.file_systems_by_id.values(): for fs in self.file_systems_by_id.values():
if fs.creation_token == creation_token: if fs.creation_token == creation_token:
corpus.append(fs.info_json()) corpus.append(fs)
elif file_system_id: elif file_system_id:
# Handle the case that a file_system_id is given. # Handle the case that a file_system_id is given.
if file_system_id not in self.file_systems_by_id: if file_system_id not in self.file_systems_by_id:
@ -396,7 +454,7 @@ class EFSBackend(BaseBackend):
corpus = self.next_markers[marker] corpus = self.next_markers[marker]
else: else:
# Handle the vanilla case. # Handle the vanilla case.
corpus = [fs.info_json() for fs in self.file_systems_by_id.values()] corpus = [fs for fs in self.file_systems_by_id.values()]
# Handle the max_items parameter. # Handle the max_items parameter.
file_systems = corpus[:max_items] file_systems = corpus[:max_items]
@ -445,42 +503,40 @@ class EFSBackend(BaseBackend):
def describe_mount_targets( def describe_mount_targets(
self, max_items, file_system_id, mount_target_id, access_point_id, marker self, max_items, file_system_id, mount_target_id, access_point_id, marker
): ):
"""Describe the mount targets given a mount target ID or a file system ID. """Describe the mount targets given an access point ID, mount target ID or a file system ID.
Note that as of this writing access points, and thus access point IDs are not
supported.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
""" """
# Restrict the possible corpus of results based on inputs. # Restrict the possible corpus of results based on inputs.
if not (bool(file_system_id) ^ bool(mount_target_id) ^ bool(access_point_id)): if not (bool(file_system_id) ^ bool(mount_target_id) ^ bool(access_point_id)):
raise BadRequest("Must specify exactly one mutually exclusive parameter.") raise BadRequest("Must specify exactly one mutually exclusive parameter.")
elif file_system_id:
if access_point_id:
file_system_id = self.access_points[access_point_id].file_system_id
if file_system_id:
# Handle the case that a file_system_id is given. # Handle the case that a file_system_id is given.
if file_system_id not in self.file_systems_by_id: if file_system_id not in self.file_systems_by_id:
raise FileSystemNotFound(file_system_id) raise FileSystemNotFound(file_system_id)
corpus = [ corpus = [
mt.info_json() mt
for mt in self.file_systems_by_id[file_system_id].iter_mount_targets() for mt in self.file_systems_by_id[file_system_id].iter_mount_targets()
] ]
elif mount_target_id: elif mount_target_id:
if mount_target_id not in self.mount_targets_by_id: if mount_target_id not in self.mount_targets_by_id:
raise MountTargetNotFound(mount_target_id) raise MountTargetNotFound(mount_target_id)
# Handle mount target specification case. # Handle mount target specification case.
corpus = [self.mount_targets_by_id[mount_target_id].info_json()] corpus = [self.mount_targets_by_id[mount_target_id]]
else:
# We don't handle access_point_id's yet.
assert False, "Moto does not yet support EFS access points."
# Handle the case that a marker is given. Note that the handling is quite # Handle the case that a marker is given. Note that the handling is quite
# different from that in describe_file_systems. # different from that in describe_file_systems.
if marker is not None: if marker is not None:
if marker not in self.next_markers: if marker not in self.next_markers:
raise BadRequest("Invalid Marker") raise BadRequest("Invalid Marker")
corpus_mtids = {m["MountTargetId"] for m in corpus} corpus_mtids = {m.mount_target_id for m in corpus}
marked_mtids = {m["MountTargetId"] for m in self.next_markers[marker]} marked_mtids = {m.mount_target_id for m in self.next_markers[marker]}
mt_ids = corpus_mtids & marked_mtids mt_ids = corpus_mtids & marked_mtids
corpus = [self.mount_targets_by_id[mt_id].info_json() for mt_id in mt_ids] corpus = [self.mount_targets_by_id[mt_id] for mt_id in mt_ids]
# Handle the max_items parameter. # Handle the max_items parameter.
mount_targets = corpus[:max_items] mount_targets = corpus[:max_items]
@ -529,5 +585,72 @@ class EFSBackend(BaseBackend):
raise PolicyNotFound("None") raise PolicyNotFound("None")
return backup_policy return backup_policy
def put_lifecycle_configuration(self, file_system_id, policies):
_, fss = self.describe_file_systems(file_system_id=file_system_id)
file_system = fss[0]
file_system.lifecycle_policies = policies
def describe_lifecycle_configuration(self, file_system_id):
_, fss = self.describe_file_systems(file_system_id=file_system_id)
file_system = fss[0]
return file_system.lifecycle_policies
def describe_mount_target_security_groups(self, mount_target_id):
if mount_target_id not in self.mount_targets_by_id:
raise MountTargetNotFound(mount_target_id)
mount_target = self.mount_targets_by_id[mount_target_id]
return mount_target.security_groups
def modify_mount_target_security_groups(self, mount_target_id, security_groups):
if mount_target_id not in self.mount_targets_by_id:
raise MountTargetNotFound(mount_target_id)
mount_target = self.mount_targets_by_id[mount_target_id]
mount_target.security_groups = security_groups
self.ec2_backend.modify_network_interface_attribute(
eni_id=mount_target.network_interface_id, group_ids=security_groups
)
def create_access_point(
self, client_token, tags, file_system_id, posix_user, root_directory
):
name = next((tag["Value"] for tag in tags if tag["Key"] == "Name"), None)
access_point = AccessPoint(
self.region_name,
client_token,
file_system_id,
name,
posix_user,
root_directory,
context=self,
)
self.tagging_service.tag_resource(access_point.access_point_id, tags)
self.access_points[access_point.access_point_id] = access_point
return access_point
def describe_access_points(self, access_point_id):
"""
Pagination is not yet implemented
"""
if access_point_id:
if access_point_id not in self.access_points:
raise AccessPointNotFound(access_point_id)
return [self.access_points[access_point_id]]
return self.access_points.values()
def delete_access_point(self, access_point_id):
self.access_points.pop(access_point_id, None)
def list_tags_for_resource(self, resource_id):
return self.tagging_service.list_tags_for_resource(resource_id)["Tags"]
def tag_resource(self, resource_id, tags):
self.tagging_service.tag_resource(resource_id, tags)
def untag_resource(self, resource_id, tag_keys):
self.tagging_service.untag_resource_using_names(resource_id, tag_keys)
efs_backends = BackendDict(EFSBackend, "efs") efs_backends = BackendDict(EFSBackend, "efs")

View File

@ -23,7 +23,7 @@ class EFSResponse(BaseResponse):
) )
availability_zone_name = self._get_param("AvailabilityZoneName") availability_zone_name = self._get_param("AvailabilityZoneName")
backup = self._get_param("Backup") backup = self._get_param("Backup")
tags = self._get_param("Tags") tags = self._get_param("Tags") or []
resource = self.efs_backend.create_file_system( resource = self.efs_backend.create_file_system(
creation_token=creation_token, creation_token=creation_token,
performance_mode=performance_mode, performance_mode=performance_mode,
@ -51,7 +51,7 @@ class EFSResponse(BaseResponse):
creation_token=creation_token, creation_token=creation_token,
file_system_id=file_system_id, file_system_id=file_system_id,
) )
resp_json = {"FileSystems": file_systems} resp_json = {"FileSystems": [fs.info_json() for fs in file_systems]}
if marker: if marker:
resp_json["Marker"] = marker resp_json["Marker"] = marker
if next_marker: if next_marker:
@ -87,7 +87,7 @@ class EFSResponse(BaseResponse):
access_point_id=access_point_id, access_point_id=access_point_id,
marker=marker, marker=marker,
) )
resp_json = {"MountTargets": mount_targets} resp_json = {"MountTargets": [mt.info_json() for mt in mount_targets]}
if marker: if marker:
resp_json["Marker"] = marker resp_json["Marker"] = marker
if next_marker: if next_marker:
@ -109,3 +109,80 @@ class EFSResponse(BaseResponse):
backup_policy = self.efs_backend.describe_backup_policy(file_system_id) backup_policy = self.efs_backend.describe_backup_policy(file_system_id)
resp = {"BackupPolicy": backup_policy} resp = {"BackupPolicy": backup_policy}
return json.dumps(resp), {"Content-Type": "application/json"} return json.dumps(resp), {"Content-Type": "application/json"}
def put_lifecycle_configuration(self):
file_system_id = self._get_param("FileSystemId")
policies = self._get_param("LifecyclePolicies")
self.efs_backend.put_lifecycle_configuration(file_system_id, policies)
return json.dumps({"LifecyclePolicies": policies}), {
"Content-Type": "application/json"
}
def describe_lifecycle_configuration(self):
file_system_id = self._get_param("FileSystemId")
policies = self.efs_backend.describe_lifecycle_configuration(file_system_id)
return json.dumps({"LifecyclePolicies": policies}), {
"Content-Type": "application/json"
}
def describe_mount_target_security_groups(self):
mount_target_id = self._get_param("MountTargetId")
security_groups = self.efs_backend.describe_mount_target_security_groups(
mount_target_id
)
return json.dumps({"SecurityGroups": security_groups}), {
"Content-Type": "application/json"
}
def modify_mount_target_security_groups(self):
mount_target_id = self._get_param("MountTargetId")
security_groups = self._get_param("SecurityGroups")
self.efs_backend.modify_mount_target_security_groups(
mount_target_id, security_groups
)
return "{}", {"Content-Type": "application/json"}
def create_access_point(self):
client_token = self._get_param("ClientToken")
tags = self._get_param("Tags") or []
file_system_id = self._get_param("FileSystemId")
posix_user = self._get_param("PosixUser")
root_directory = self._get_param("RootDirectory")
access_point = self.efs_backend.create_access_point(
client_token,
tags=tags,
file_system_id=file_system_id,
posix_user=posix_user,
root_directory=root_directory,
)
return json.dumps(access_point.info_json()), {
"Content-Type": "application/json"
}
def describe_access_points(self):
access_point_id = self._get_param("AccessPointId")
access_points = self.efs_backend.describe_access_points(access_point_id)
resp = [ap.info_json() for ap in access_points]
return json.dumps({"AccessPoints": resp}), {"Content-Type": "application/json"}
def delete_access_point(self):
access_point_id = self._get_param("AccessPointId")
self.efs_backend.delete_access_point(access_point_id)
return "{}", {"Content-Type": "application/json"}
def list_tags_for_resource(self):
resource_id = self._get_param("ResourceId")
tags = self.efs_backend.list_tags_for_resource(resource_id)
return json.dumps({"Tags": tags}), {"Content-Type": "application/json"}
def tag_resource(self):
resource_id = self._get_param("ResourceId")
tags = self._get_param("Tags")
self.efs_backend.tag_resource(resource_id, tags)
return "{}", {"Content-Type": "application/json"}
def untag_resource(self):
resource_id = self._get_param("ResourceId")
tag_keys = self.querystring.get("tagKeys", [])
self.efs_backend.untag_resource(resource_id, tag_keys)
return "{}", {"Content-Type": "application/json"}

View File

@ -11,9 +11,14 @@ response = EFSResponse()
url_paths = { url_paths = {
"{0}/.*?$": response.dispatch, "{0}/.*?$": response.dispatch,
"/2015-02-01/access-points": response.dispatch,
"/2015-02-01/access-points/<access_point_id>": response.dispatch,
"/2015-02-01/file-systems": response.dispatch, "/2015-02-01/file-systems": response.dispatch,
"/2015-02-01/file-systems/<file_system_id>": response.dispatch, "/2015-02-01/file-systems/<file_system_id>": response.dispatch,
"/2015-02-01/file-systems/<file_system_id>/backup-policy": response.dispatch, "/2015-02-01/file-systems/<file_system_id>/backup-policy": response.dispatch,
"/2015-02-01/file-systems/<file_system_id>/lifecycle-configuration": response.dispatch,
"/2015-02-01/mount-targets": response.dispatch, "/2015-02-01/mount-targets": response.dispatch,
"/2015-02-01/mount-targets/<mount_target_id>": response.dispatch, "/2015-02-01/mount-targets/<mount_target_id>": response.dispatch,
"/2015-02-01/mount-targets/<mount_target_id>/security-groups": response.dispatch,
"/2015-02-01/resource-tags/<resource_id>": response.dispatch,
} }

View File

@ -69,6 +69,8 @@ TestAccAWSEcrReplicationConfiguration
TestAccAWSEcrRepository TestAccAWSEcrRepository
TestAccAWSEcrRepositoryDataSource TestAccAWSEcrRepositoryDataSource
TestAccAWSEcrRepositoryPolicy TestAccAWSEcrRepositoryPolicy
TestAccAWSEFSAccessPoint
TestAccAWSEFSMountTarget
TestAccAWSEgressOnlyInternetGateway TestAccAWSEgressOnlyInternetGateway
TestAccAWSElasticBeanstalkSolutionStackDataSource TestAccAWSElasticBeanstalkSolutionStackDataSource
TestAccAWSElbHostedZoneId TestAccAWSElbHostedZoneId
@ -127,5 +129,9 @@ TestAccAWSVpc_
TestAccAWSVpcEndpointService TestAccAWSVpcEndpointService
TestAccAWSVpnGateway TestAccAWSVpnGateway
TestAccAWSVpnGatewayAttachment TestAccAWSVpnGatewayAttachment
TestAccDataSourceAWSEFSAccessPoint
TestAccDataSourceAWSEFSAccessPoints
TestAccDataSourceAwsEfsFileSystem
TestAccDataSourceAwsEfsMountTarget
TestAccDataSourceAwsNetworkInterface_ TestAccDataSourceAwsNetworkInterface_
TestValidateSSMDocumentPermissions TestValidateSSMDocumentPermissions

View File

@ -0,0 +1,94 @@
from os import environ
import boto3
import pytest
from moto import mock_efs
@pytest.fixture(scope="function")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
environ["AWS_ACCESS_KEY_ID"] = "testing"
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
environ["AWS_SECURITY_TOKEN"] = "testing"
environ["AWS_SESSION_TOKEN"] = "testing"
@pytest.fixture(scope="function")
def efs(aws_credentials): # pylint: disable=unused-argument
with mock_efs():
yield boto3.client("efs", region_name="us-east-1")
@pytest.fixture(scope="function")
def file_system(efs):
create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
create_fs_resp.pop("ResponseMetadata")
yield create_fs_resp
def test_list_tags_for_resource__without_tags(efs, file_system):
file_system_id = file_system["FileSystemId"]
ap_id = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)[
"AccessPointId"
]
resp = efs.list_tags_for_resource(ResourceId=ap_id)
resp.should.have.key("Tags").equals([])
def test_list_tags_for_resource__with_tags(efs, file_system):
file_system_id = file_system["FileSystemId"]
ap_id = efs.create_access_point(
ClientToken="ct",
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
FileSystemId=file_system_id,
)["AccessPointId"]
resp = efs.list_tags_for_resource(ResourceId=ap_id)
resp.should.have.key("Tags").equals(
[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
)
def test_tag_resource(efs, file_system):
file_system_id = file_system["FileSystemId"]
ap_id = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)[
"AccessPointId"
]
efs.tag_resource(
ResourceId=ap_id,
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
)
resp = efs.list_tags_for_resource(ResourceId=ap_id)
resp.should.have.key("Tags").equals(
[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
)
def test_untag_resource(efs, file_system):
file_system_id = file_system["FileSystemId"]
ap_id = efs.create_access_point(
ClientToken="ct",
Tags=[{"Key": "key1", "Value": "val1"}],
FileSystemId=file_system_id,
)["AccessPointId"]
efs.tag_resource(
ResourceId=ap_id,
Tags=[{"Key": "key2", "Value": "val2"}, {"Key": "key3", "Value": "val3"}],
)
efs.untag_resource(ResourceId=ap_id, TagKeys=["key2"])
resp = efs.list_tags_for_resource(ResourceId=ap_id)
resp.should.have.key("Tags").equals(
[{"Key": "key1", "Value": "val1"}, {"Key": "key3", "Value": "val3"}]
)

View File

@ -0,0 +1,148 @@
from os import environ
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_efs
from moto.core import ACCOUNT_ID
@pytest.fixture(scope="function")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
environ["AWS_ACCESS_KEY_ID"] = "testing"
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
environ["AWS_SECURITY_TOKEN"] = "testing"
environ["AWS_SESSION_TOKEN"] = "testing"
@pytest.fixture(scope="function")
def efs(aws_credentials): # pylint: disable=unused-argument
with mock_efs():
yield boto3.client("efs", region_name="us-east-1")
@pytest.fixture(scope="function")
def file_system(efs):
create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
create_fs_resp.pop("ResponseMetadata")
yield create_fs_resp
def test_describe_access_points__initial(efs):
resp = efs.describe_access_points()
resp.should.have.key("AccessPoints").equals([])
def test_create_access_point__simple(efs, file_system):
fs_id = file_system["FileSystemId"]
resp = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)
resp.should.have.key("ClientToken").equals("ct")
resp.should.have.key("AccessPointId")
resp.should.have.key("AccessPointArn")
resp.should.have.key("FileSystemId").equals(fs_id)
resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
resp.should.have.key("LifeCycleState").equals("available")
resp.should.have.key("RootDirectory").equals({"Path": "/"})
def test_create_access_point__full(efs, file_system):
fs_id = file_system["FileSystemId"]
resp = efs.create_access_point(
ClientToken="ct",
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
FileSystemId=fs_id,
PosixUser={"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
RootDirectory={
"Path": "/root/path",
"CreationInfo": {
"OwnerUid": 987,
"OwnerGid": 986,
"Permissions": "root_permissions",
},
},
)
resp.should.have.key("ClientToken").equals("ct")
resp.should.have.key("Name").equals("myname")
resp.should.have.key("Tags").equals(
[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
)
resp.should.have.key("AccessPointId")
resp.should.have.key("AccessPointArn")
resp.should.have.key("FileSystemId").equals(fs_id)
resp.should.have.key("PosixUser").equals(
{"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]}
)
resp.should.have.key("RootDirectory").equals(
{
"Path": "/root/path",
"CreationInfo": {
"OwnerUid": 987,
"OwnerGid": 986,
"Permissions": "root_permissions",
},
}
)
resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
resp.should.have.key("LifeCycleState").equals("available")
def test_describe_access_point(efs, file_system):
fs_id = file_system["FileSystemId"]
access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
"AccessPointId"
]
resp = efs.describe_access_points(AccessPointId=access_point_id)
resp.should.have.key("AccessPoints").length_of(1)
access_point = resp["AccessPoints"][0]
access_point.should.have.key("ClientToken").equals("ct")
access_point.should.have.key("AccessPointId")
access_point.should.have.key("AccessPointArn")
access_point.should.have.key("FileSystemId").equals(fs_id)
access_point.should.have.key("OwnerId").equals(ACCOUNT_ID)
access_point.should.have.key("LifeCycleState").equals("available")
def test_describe_access_points__multiple(efs, file_system):
fs_id = file_system["FileSystemId"]
efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)
efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)
resp = efs.describe_access_points()
resp.should.have.key("AccessPoints").length_of(2)
def test_delete_access_points(efs, file_system):
fs_id = file_system["FileSystemId"]
ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
"AccessPointId"
]
ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
"AccessPointId"
]
# Delete one access point
efs.delete_access_point(AccessPointId=ap_id2)
# We can only find one
resp = efs.describe_access_points()
resp.should.have.key("AccessPoints").length_of(1)
# The first one still exists
efs.describe_access_points(AccessPointId=ap_id1)
# The second one is gone
with pytest.raises(ClientError) as exc_info:
efs.describe_access_points(AccessPointId=ap_id2)
err = exc_info.value.response["Error"]
err["Code"].should.equal("AccessPointNotFound")

View File

@ -80,7 +80,7 @@ def test_create_file_system_correct_use(efs):
efs.describe_backup_policy(FileSystemId=create_fs_resp["FileSystemId"]) efs.describe_backup_policy(FileSystemId=create_fs_resp["FileSystemId"])
resp = exc_info.value.response resp = exc_info.value.response
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 404 assert resp["ResponseMetadata"]["HTTPStatusCode"] == 404
assert "PolicyNotFound" in resp["Error"]["Message"] assert "PolicyNotFound" == resp["Error"]["Code"]
# Check the arn in detail # Check the arn in detail
match_obj = re.match(ARN_PATT, create_fs_resp["FileSystemArn"]) match_obj = re.match(ARN_PATT, create_fs_resp["FileSystemArn"])
@ -167,13 +167,32 @@ def test_create_file_system_file_system_already_exists(efs):
efs.create_file_system(CreationToken="foo") efs.create_file_system(CreationToken="foo")
resp = exc_info.value.response resp = exc_info.value.response
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 409 assert resp["ResponseMetadata"]["HTTPStatusCode"] == 409
assert "FileSystemAlreadyExists" in resp["Error"]["Message"] assert "FileSystemAlreadyExists" == resp["Error"]["Code"]
# Testing Describe # Testing Describe
# ================ # ================
def test_describe_file_systems_using_identifier(efs):
# Create the file system.
create_fs_resp = efs.create_file_system(CreationToken="foobar")
create_fs_resp.pop("ResponseMetadata")
fs_id = create_fs_resp["FileSystemId"]
# Describe the file system.
desc_fs_resp = efs.describe_file_systems(FileSystemId=fs_id)
desc_fs_resp.should.have.key("FileSystems").length_of(1)
desc_fs_resp["FileSystems"][0].should.have.key("FileSystemId").equals(fs_id)
def test_describe_file_systems_using_unknown_identifier(efs):
with pytest.raises(ClientError) as exc:
efs.describe_file_systems(FileSystemId="unknown")
err = exc.value.response["Error"]
err["Code"].should.equal("FileSystemNotFound")
def test_describe_file_systems_minimal_case(efs): def test_describe_file_systems_minimal_case(efs):
# Create the file system. # Create the file system.
create_fs_resp = efs.create_file_system(CreationToken="foobar") create_fs_resp = efs.create_file_system(CreationToken="foobar")
@ -306,7 +325,7 @@ def test_describe_file_systems_invalid_marker(efs):
efs.describe_file_systems(Marker="fiddlesticks") efs.describe_file_systems(Marker="fiddlesticks")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 400) assert has_status_code(resp, 400)
assert "BadRequest" in resp["Error"]["Message"] assert "BadRequest" == resp["Error"]["Code"]
def test_describe_file_systems_invalid_creation_token(efs): def test_describe_file_systems_invalid_creation_token(efs):
@ -320,7 +339,7 @@ def test_describe_file_systems_invalid_file_system_id(efs):
efs.describe_file_systems(FileSystemId="fs-29879313") efs.describe_file_systems(FileSystemId="fs-29879313")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "FileSystemNotFound" in resp["Error"]["Message"] assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_describe_file_system_creation_token_and_file_system_id(efs): def test_describe_file_system_creation_token_and_file_system_id(efs):
@ -328,7 +347,7 @@ def test_describe_file_system_creation_token_and_file_system_id(efs):
efs.describe_file_systems(CreationToken="fizzle", FileSystemId="fs-07987987") efs.describe_file_systems(CreationToken="fizzle", FileSystemId="fs-07987987")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 400) assert has_status_code(resp, 400)
assert "BadRequest" in resp["Error"]["Message"] assert "BadRequest" == resp["Error"]["Code"]
# Testing Delete # Testing Delete
@ -358,4 +377,4 @@ def test_delete_file_system_invalid_file_system_id(efs):
efs.delete_file_system(FileSystemId="fs-2394287") efs.delete_file_system(FileSystemId="fs-2394287")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "FileSystemNotFound" in resp["Error"]["Message"] assert "FileSystemNotFound" == resp["Error"]["Code"]

View File

@ -0,0 +1,79 @@
from os import environ
import boto3
import pytest
from moto import mock_efs
@pytest.fixture(scope="function")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
environ["AWS_ACCESS_KEY_ID"] = "testing"
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
environ["AWS_SECURITY_TOKEN"] = "testing"
environ["AWS_SESSION_TOKEN"] = "testing"
@pytest.fixture(scope="function")
def efs(aws_credentials): # pylint: disable=unused-argument
with mock_efs():
yield boto3.client("efs", region_name="us-east-1")
def test_list_tags_for_resource__without_tags(efs):
file_system = efs.create_file_system(CreationToken="foobarbaz")
fs_id = file_system["FileSystemId"]
resp = efs.list_tags_for_resource(ResourceId=fs_id)
resp.should.have.key("Tags").equals([])
def test_list_tags_for_resource__with_tags(efs):
file_system = efs.create_file_system(
CreationToken="foobarbaz",
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
)
fs_id = file_system["FileSystemId"]
resp = efs.list_tags_for_resource(ResourceId=fs_id)
resp.should.have.key("Tags").equals(
[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
)
def test_tag_resource(efs):
file_system = efs.create_file_system(
CreationToken="foobarbaz",
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
)
fs_id = file_system["FileSystemId"]
efs.tag_resource(
ResourceId=fs_id,
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
)
resp = efs.list_tags_for_resource(ResourceId=fs_id)
resp.should.have.key("Tags").equals(
[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
)
def test_untag_resource(efs):
file_system = efs.create_file_system(
CreationToken="foobarbaz", Tags=[{"Key": "key1", "Value": "val1"}]
)
fs_id = file_system["FileSystemId"]
efs.tag_resource(
ResourceId=fs_id,
Tags=[{"Key": "key2", "Value": "val2"}, {"Key": "key3", "Value": "val3"}],
)
efs.untag_resource(ResourceId=fs_id, TagKeys=["key2"])
resp = efs.list_tags_for_resource(ResourceId=fs_id)
resp.should.have.key("Tags").equals(
[{"Key": "key1", "Value": "val1"}, {"Key": "key3", "Value": "val3"}]
)

View File

@ -0,0 +1,57 @@
from os import environ
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_efs
@pytest.fixture(scope="function")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
environ["AWS_ACCESS_KEY_ID"] = "testing"
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
environ["AWS_SECURITY_TOKEN"] = "testing"
environ["AWS_SESSION_TOKEN"] = "testing"
@pytest.fixture(scope="function")
def efs(aws_credentials): # pylint: disable=unused-argument
with mock_efs():
yield boto3.client("efs", region_name="us-east-1")
def test_describe_filesystem_config__unknown(efs):
with pytest.raises(ClientError) as exc_info:
efs.describe_lifecycle_configuration(FileSystemId="unknown")
err = exc_info.value.response["Error"]
err["Code"].should.equal("FileSystemNotFound")
err["Message"].should.equal("File system unknown does not exist.")
def test_describe_filesystem_config__initial(efs):
create_fs_resp = efs.create_file_system(CreationToken="foobar")
fs_id = create_fs_resp["FileSystemId"]
resp = efs.describe_lifecycle_configuration(FileSystemId=fs_id)
resp.should.have.key("LifecyclePolicies").equals([])
def test_put_lifecycle_configuration(efs):
# Create the file system.
create_fs_resp = efs.create_file_system(CreationToken="foobar")
create_fs_resp.pop("ResponseMetadata")
fs_id = create_fs_resp["FileSystemId"]
# Create the lifecycle configuration
resp = efs.put_lifecycle_configuration(
FileSystemId=fs_id, LifecyclePolicies=[{"TransitionToIA": "AFTER_30_DAYS"}]
)
resp.should.have.key("LifecyclePolicies").length_of(1)
resp["LifecyclePolicies"][0].should.equal({"TransitionToIA": "AFTER_30_DAYS"})
# Describe the lifecycle configuration
resp = efs.describe_lifecycle_configuration(FileSystemId=fs_id)
resp.should.have.key("LifecyclePolicies").length_of(1)
resp["LifecyclePolicies"][0].should.equal({"TransitionToIA": "AFTER_30_DAYS"})

View File

@ -143,7 +143,7 @@ def test_create_mount_target_invalid_file_system_id(efs, subnet):
efs.create_mount_target(FileSystemId="fs-12343289", SubnetId=subnet["SubnetId"]) efs.create_mount_target(FileSystemId="fs-12343289", SubnetId=subnet["SubnetId"])
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "FileSystemNotFound" in resp["Error"]["Message"] assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_create_mount_target_invalid_subnet_id(efs, file_system): def test_create_mount_target_invalid_subnet_id(efs, file_system):
@ -153,7 +153,7 @@ def test_create_mount_target_invalid_subnet_id(efs, file_system):
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "SubnetNotFound" in resp["Error"]["Message"] assert "SubnetNotFound" == resp["Error"]["Code"]
def test_create_mount_target_invalid_sg_id(efs, file_system, subnet): def test_create_mount_target_invalid_sg_id(efs, file_system, subnet):
@ -165,7 +165,7 @@ def test_create_mount_target_invalid_sg_id(efs, file_system, subnet):
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "SecurityGroupNotFound" in resp["Error"]["Message"] assert "SecurityGroupNotFound" == resp["Error"]["Code"]
def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet): def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet):
@ -183,7 +183,7 @@ def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet):
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 409) assert has_status_code(resp, 409)
assert "MountTargetConflict" in resp["Error"]["Message"] assert "MountTargetConflict" == resp["Error"]["Code"]
assert "VPC" in resp["Error"]["Message"] assert "VPC" in resp["Error"]["Message"]
@ -197,7 +197,7 @@ def test_create_mount_target_duplicate_subnet_id(efs, file_system, subnet):
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 409) assert has_status_code(resp, 409)
assert "MountTargetConflict" in resp["Error"]["Message"] assert "MountTargetConflict" == resp["Error"]["Code"]
assert "AZ" in resp["Error"]["Message"] assert "AZ" in resp["Error"]["Message"]
@ -217,7 +217,7 @@ def test_create_mount_target_subnets_in_same_zone(efs, ec2, file_system, subnet)
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 409) assert has_status_code(resp, 409)
assert "MountTargetConflict" in resp["Error"]["Message"] assert "MountTargetConflict" == resp["Error"]["Code"]
assert "AZ" in resp["Error"]["Message"] assert "AZ" in resp["Error"]["Message"]
@ -230,7 +230,7 @@ def test_create_mount_target_ip_address_out_of_range(efs, file_system, subnet):
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 400) assert has_status_code(resp, 400)
assert "BadRequest" in resp["Error"]["Message"] assert "BadRequest" == resp["Error"]["Code"]
assert "Address" in resp["Error"]["Message"] assert "Address" in resp["Error"]["Message"]
@ -251,7 +251,7 @@ def test_create_mount_target_too_many_security_groups(efs, ec2, file_system, sub
) )
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 400) assert has_status_code(resp, 400)
assert "SecurityGroupLimitExceeded" in resp["Error"]["Message"] assert "SecurityGroupLimitExceeded" == resp["Error"]["Code"]
def test_delete_file_system_mount_targets_attached( def test_delete_file_system_mount_targets_attached(
@ -264,7 +264,7 @@ def test_delete_file_system_mount_targets_attached(
efs.delete_file_system(FileSystemId=file_system["FileSystemId"]) efs.delete_file_system(FileSystemId=file_system["FileSystemId"])
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 409) assert has_status_code(resp, 409)
assert "FileSystemInUse" in resp["Error"]["Message"] assert "FileSystemInUse" == resp["Error"]["Code"]
def test_describe_mount_targets_minimal_case( def test_describe_mount_targets_minimal_case(
@ -290,6 +290,29 @@ def test_describe_mount_targets_minimal_case(
assert mount_target == create_resp assert mount_target == create_resp
def test_describe_mount_targets__by_access_point_id(
efs, ec2, file_system, subnet
): # pylint: disable=unused-argument
create_resp = efs.create_mount_target(
FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
)
create_resp.pop("ResponseMetadata")
ap_resp = efs.create_access_point(
ClientToken="ct1", FileSystemId=file_system["FileSystemId"]
)
access_point_id = ap_resp["AccessPointId"]
# Describe the mount targets
ap_resp = efs.describe_mount_targets(AccessPointId=access_point_id)
# Check the list results.
ap_resp.should.have.key("MountTargets").length_of(1)
ap_resp["MountTargets"][0]["MountTargetId"].should.equal(
create_resp["MountTargetId"]
)
def test_describe_mount_targets_paging(efs, ec2, file_system): def test_describe_mount_targets_paging(efs, ec2, file_system):
fs_id = file_system["FileSystemId"] fs_id = file_system["FileSystemId"]
@ -358,7 +381,7 @@ def test_describe_mount_targets_invalid_file_system_id(efs):
efs.describe_mount_targets(FileSystemId="fs-12343289") efs.describe_mount_targets(FileSystemId="fs-12343289")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "FileSystemNotFound" in resp["Error"]["Message"] assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_describe_mount_targets_invalid_mount_target_id(efs): def test_describe_mount_targets_invalid_mount_target_id(efs):
@ -366,7 +389,7 @@ def test_describe_mount_targets_invalid_mount_target_id(efs):
efs.describe_mount_targets(MountTargetId="fsmt-ad9f8987") efs.describe_mount_targets(MountTargetId="fsmt-ad9f8987")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "MountTargetNotFound" in resp["Error"]["Message"] assert "MountTargetNotFound" == resp["Error"]["Code"]
def test_describe_mount_targets_no_id_given(efs): def test_describe_mount_targets_no_id_given(efs):
@ -374,7 +397,7 @@ def test_describe_mount_targets_no_id_given(efs):
efs.describe_mount_targets() efs.describe_mount_targets()
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 400) assert has_status_code(resp, 400)
assert "BadRequest" in resp["Error"]["Message"] assert "BadRequest" == resp["Error"]["Code"]
def test_delete_mount_target_minimal_case(efs, file_system, subnet): def test_delete_mount_target_minimal_case(efs, file_system, subnet):
@ -392,4 +415,4 @@ def test_delete_mount_target_invalid_mount_target_id(efs):
efs.delete_mount_target(MountTargetId="fsmt-98487aef0a7") efs.delete_mount_target(MountTargetId="fsmt-98487aef0a7")
resp = exc_info.value.response resp = exc_info.value.response
assert has_status_code(resp, 404) assert has_status_code(resp, 404)
assert "MountTargetNotFound" in resp["Error"]["Message"] assert "MountTargetNotFound" == resp["Error"]["Code"]

View File

@ -0,0 +1,125 @@
from os import environ
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_ec2, mock_efs
@pytest.fixture(scope="function")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
environ["AWS_ACCESS_KEY_ID"] = "testing"
environ["AWS_SECRET_ACCESS_KEY"] = "testing"
environ["AWS_SECURITY_TOKEN"] = "testing"
environ["AWS_SESSION_TOKEN"] = "testing"
@pytest.fixture(scope="function")
def ec2(aws_credentials): # pylint: disable=unused-argument
with mock_ec2():
yield boto3.client("ec2", region_name="us-east-1")
@pytest.fixture(scope="function")
def efs(aws_credentials): # pylint: disable=unused-argument
with mock_efs():
yield boto3.client("efs", region_name="us-east-1")
@pytest.fixture(scope="function")
def file_system(efs):
create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
create_fs_resp.pop("ResponseMetadata")
yield create_fs_resp
@pytest.fixture(scope="function")
def subnet(ec2):
desc_sn_resp = ec2.describe_subnets()
subnet = desc_sn_resp["Subnets"][0]
yield subnet
def test_describe_mount_target_security_groups__unknown(efs):
with pytest.raises(ClientError) as exc_info:
efs.describe_mount_target_security_groups(MountTargetId="mt-asdf1234asdf")
err = exc_info.value.response["Error"]
err["Code"].should.equal("MountTargetNotFound")
err["Message"].should.equal("Mount target 'mt-asdf1234asdf' does not exist.")
def test_describe_mount_target_security_groups(efs, ec2, file_system, subnet):
subnet_id = subnet["SubnetId"]
file_system_id = file_system["FileSystemId"]
desc_sg_resp = ec2.describe_security_groups()
security_group_id = desc_sg_resp["SecurityGroups"][0]["GroupId"]
# Create Mount Target
sample_input = {
"FileSystemId": file_system_id,
"SubnetId": subnet_id,
"SecurityGroups": [security_group_id],
}
create_mt_resp = efs.create_mount_target(**sample_input)
mount_target_id = create_mt_resp["MountTargetId"]
# Describe it's Security Groups
resp = efs.describe_mount_target_security_groups(MountTargetId=mount_target_id)
resp.should.have.key("SecurityGroups").equals([security_group_id])
def test_modify_mount_target_security_groups__unknown(efs):
with pytest.raises(ClientError) as exc_info:
efs.modify_mount_target_security_groups(
MountTargetId="mt-asdf1234asdf", SecurityGroups=[]
)
err = exc_info.value.response["Error"]
err["Code"].should.equal("MountTargetNotFound")
err["Message"].should.equal("Mount target 'mt-asdf1234asdf' does not exist.")
def test_modify_mount_target_security_groups(efs, ec2, file_system, subnet):
subnet_id = subnet["SubnetId"]
file_system_id = file_system["FileSystemId"]
desc_sg_resp = ec2.describe_security_groups()["SecurityGroups"]
print(desc_sg_resp)
security_group_id = desc_sg_resp[0]["GroupId"]
# Create Mount Target
sample_input = {
"FileSystemId": file_system_id,
"SubnetId": subnet_id,
"SecurityGroups": [security_group_id],
}
create_mt_resp = efs.create_mount_target(**sample_input)
mount_target_id = create_mt_resp["MountTargetId"]
network_interface_id = create_mt_resp["NetworkInterfaceId"]
# Create alternative security groups
sg_id_2 = ec2.create_security_group(
VpcId=subnet["VpcId"], GroupName="sg-2", Description="SG-2"
)["GroupId"]
sg_id_3 = ec2.create_security_group(
VpcId=subnet["VpcId"], GroupName="sg-3", Description="SG-3"
)["GroupId"]
# Modify it's Security Groups
efs.modify_mount_target_security_groups(
MountTargetId=mount_target_id, SecurityGroups=[sg_id_2, sg_id_3]
)
# Describe it's Security Groups
resp = efs.describe_mount_target_security_groups(MountTargetId=mount_target_id)
resp.should.have.key("SecurityGroups").equals([sg_id_2, sg_id_3])
# Verify EC2 reflects this change
resp = ec2.describe_network_interfaces(NetworkInterfaceIds=[network_interface_id])
network_interface = resp["NetworkInterfaces"][0]
network_interface["Groups"].should.have.length_of(2)
set([sg["GroupId"] for sg in network_interface["Groups"]]).should.equal(
{sg_id_2, sg_id_3}
)