diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md
index d4c250277..50858d4ff 100644
--- a/IMPLEMENTATION_COVERAGE.md
+++ b/IMPLEMENTATION_COVERAGE.md
@@ -2051,37 +2051,37 @@
## efs
-23% implemented
+56% implemented
-- [ ] create_access_point
+- [X] create_access_point
- [X] create_file_system
- [X] create_mount_target
- [ ] create_replication_configuration
- [ ] create_tags
-- [ ] delete_access_point
+- [X] delete_access_point
- [X] delete_file_system
- [ ] delete_file_system_policy
- [X] delete_mount_target
- [ ] delete_replication_configuration
- [ ] delete_tags
-- [ ] describe_access_points
+- [X] describe_access_points
- [ ] describe_account_preferences
- [X] describe_backup_policy
- [ ] describe_file_system_policy
- [X] describe_file_systems
-- [ ] describe_lifecycle_configuration
-- [ ] describe_mount_target_security_groups
+- [X] describe_lifecycle_configuration
+- [X] describe_mount_target_security_groups
- [X] describe_mount_targets
- [ ] describe_replication_configurations
- [ ] describe_tags
-- [ ] list_tags_for_resource
-- [ ] modify_mount_target_security_groups
+- [X] list_tags_for_resource
+- [X] modify_mount_target_security_groups
- [ ] put_account_preferences
- [ ] put_backup_policy
- [ ] put_file_system_policy
-- [ ] put_lifecycle_configuration
-- [ ] tag_resource
-- [ ] untag_resource
+- [X] put_lifecycle_configuration
+- [X] tag_resource
+- [X] untag_resource
- [ ] update_file_system
diff --git a/docs/docs/services/efs.rst b/docs/docs/services/efs.rst
index 9cd1ed5ac..47d76e667 100644
--- a/docs/docs/services/efs.rst
+++ b/docs/docs/services/efs.rst
@@ -27,7 +27,7 @@ efs
|start-h3| Implemented features for this service |end-h3|
-- [ ] create_access_point
+- [X] create_access_point
- [X] create_file_system
Create a new EFS File System Volume.
@@ -45,7 +45,7 @@ efs
- [ ] create_replication_configuration
- [ ] create_tags
-- [ ] delete_access_point
+- [X] delete_access_point
- [X] delete_file_system
Delete the file system specified by the given file_system_id.
@@ -65,7 +65,11 @@ efs
- [ ] delete_replication_configuration
- [ ] delete_tags
-- [ ] describe_access_points
+- [X] describe_access_points
+
+ Pagination is not yet implemented
+
+
- [ ] describe_account_preferences
- [X] describe_backup_policy
- [ ] describe_file_system_policy
@@ -75,26 +79,23 @@ efs
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
-- [ ] describe_lifecycle_configuration
-- [ ] describe_mount_target_security_groups
+- [X] describe_lifecycle_configuration
+- [X] describe_mount_target_security_groups
- [X] describe_mount_targets
- Describe the mount targets given a mount target ID or a file system ID.
-
- Note that as of this writing access points, and thus access point IDs are not
- supported.
+ Describe the mount targets given an access point ID, mount target ID or a file system ID.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
- [ ] describe_replication_configurations
- [ ] describe_tags
-- [ ] list_tags_for_resource
-- [ ] modify_mount_target_security_groups
+- [X] list_tags_for_resource
+- [X] modify_mount_target_security_groups
- [ ] put_account_preferences
- [ ] put_backup_policy
- [ ] put_file_system_policy
-- [ ] put_lifecycle_configuration
-- [ ] tag_resource
-- [ ] untag_resource
+- [X] put_lifecycle_configuration
+- [X] tag_resource
+- [X] untag_resource
- [ ] update_file_system
diff --git a/moto/efs/exceptions.py b/moto/efs/exceptions.py
index 7bc8c1c19..a1f36c789 100644
--- a/moto/efs/exceptions.py
+++ b/moto/efs/exceptions.py
@@ -1,10 +1,19 @@
-from moto.core.exceptions import RESTError
+from moto.core.exceptions import JsonRESTError
-class EFSError(RESTError):
+class EFSError(JsonRESTError):
pass
+class AccessPointNotFound(EFSError):
+ code = 404
+
+ def __init__(self, access_point_id):
+ super().__init__(
+ "AccessPointNotFound", f"Access Point {access_point_id} does not exist."
+ )
+
+
class FileSystemAlreadyExists(EFSError):
code = 409
@@ -13,7 +22,7 @@ class FileSystemAlreadyExists(EFSError):
"FileSystemAlreadyExists",
"File system with {} already exists.".format(creation_token),
*args,
- **kwargs
+ **kwargs,
)
@@ -25,7 +34,7 @@ class FileSystemNotFound(EFSError):
"FileSystemNotFound",
"File system {} does not exist.".format(file_system_id),
*args,
- **kwargs
+ **kwargs,
)
@@ -51,7 +60,7 @@ class MountTargetNotFound(EFSError):
"MountTargetNotFound",
"Mount target '{}' does not exist.".format(mount_target_id),
*args,
- **kwargs
+ **kwargs,
)
@@ -77,7 +86,7 @@ class SubnetNotFound(EFSError):
"SubnetNotFound",
"The subnet ID '{}' does not exist".format(subnet_id),
*args,
- **kwargs
+ **kwargs,
)
@@ -89,7 +98,7 @@ class SecurityGroupNotFound(EFSError):
"SecurityGroupNotFound",
"The SecurityGroup ID '{}' does not exist".format(security_group_id),
*args,
- **kwargs
+ **kwargs,
)
diff --git a/moto/efs/models.py b/moto/efs/models.py
index 1a3650134..b4d1a2800 100644
--- a/moto/efs/models.py
+++ b/moto/efs/models.py
@@ -9,7 +9,7 @@ import time
from copy import deepcopy
from hashlib import md5
-from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
+from moto.core import ACCOUNT_ID, BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import (
camelcase_to_underscores,
get_random_hex,
@@ -19,6 +19,7 @@ from moto.core.utils import (
from moto.ec2 import ec2_backends
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.efs.exceptions import (
+ AccessPointNotFound,
BadRequest,
FileSystemAlreadyExists,
FileSystemInUse,
@@ -30,6 +31,7 @@ from moto.efs.exceptions import (
SecurityGroupNotFound,
SecurityGroupLimitExceeded,
)
+from moto.utilities.tagging_service import TaggingService
def _lookup_az_id(az_name):
@@ -40,6 +42,48 @@ def _lookup_az_id(az_name):
return zone.zone_id
+class AccessPoint(BaseModel):
+ def __init__(
+ self,
+ region_name,
+ client_token,
+ file_system_id,
+ name,
+ posix_user,
+ root_directory,
+ context,
+ ):
+ self.access_point_id = get_random_hex(8)
+ self.access_point_arn = "arn:aws:elasticfilesystem:{region}:{user_id}:access-point/fsap-{file_system_id}".format(
+ region=region_name, user_id=ACCOUNT_ID, file_system_id=self.access_point_id
+ )
+ self.client_token = client_token
+ self.file_system_id = file_system_id
+ self.name = name
+ self.posix_user = posix_user
+
+ if not root_directory:
+ root_directory = {"Path": "/"}
+
+ self.root_directory = root_directory
+ self.context = context
+
+ def info_json(self):
+ tags = self.context.list_tags_for_resource(self.access_point_id)
+ return {
+ "ClientToken": self.client_token,
+ "Name": self.name,
+ "Tags": tags,
+ "AccessPointId": self.access_point_id,
+ "AccessPointArn": self.access_point_arn,
+ "FileSystemId": self.file_system_id,
+ "PosixUser": self.posix_user,
+ "RootDirectory": self.root_directory,
+ "OwnerId": ACCOUNT_ID,
+ "LifeCycleState": "available",
+ }
+
+
class FileSystem(CloudFormationModel):
"""A model for an EFS File System Volume."""
@@ -48,16 +92,16 @@ class FileSystem(CloudFormationModel):
region_name,
creation_token,
file_system_id,
- performance_mode="generalPurpose",
- encrypted=False,
- kms_key_id=None,
- throughput_mode="bursting",
- provisioned_throughput_in_mibps=None,
- availability_zone_name=None,
- backup=False,
+ context,
+ performance_mode,
+ encrypted,
+ kms_key_id,
+ throughput_mode,
+ provisioned_throughput_in_mibps,
+ availability_zone_name,
+ backup,
lifecycle_policies=None,
file_system_policy=None,
- tags=None,
):
if availability_zone_name:
backup = True
@@ -66,31 +110,20 @@ class FileSystem(CloudFormationModel):
# Save given parameters
self.creation_token = creation_token
- self.performance_mode = performance_mode
- self.encrypted = encrypted
+ self.performance_mode = performance_mode or "generalPurpose"
+ self.encrypted = encrypted or False
self.kms_key_id = kms_key_id
- self.throughput_mode = throughput_mode
+ self.throughput_mode = throughput_mode or "bursting"
self.provisioned_throughput_in_mibps = provisioned_throughput_in_mibps
self.availability_zone_name = availability_zone_name
self.availability_zone_id = None
if self.availability_zone_name:
self.availability_zone_id = _lookup_az_id(self.availability_zone_name)
self._backup = backup
- self.lifecycle_policies = lifecycle_policies
+ self.lifecycle_policies = lifecycle_policies or []
self.file_system_policy = file_system_policy
- # Validate tag structure.
- if tags is None:
- self.tags = []
- else:
- if (
- not isinstance(tags, list)
- or not all(isinstance(tag, dict) for tag in tags)
- or not all(set(tag.keys()) == {"Key", "Value"} for tag in tags)
- ):
- raise ValueError("Invalid tags: {}".format(tags))
- else:
- self.tags = tags
+ self._context = context
# Generate AWS-assigned parameters
self.file_system_id = file_system_id
@@ -135,6 +168,7 @@ class FileSystem(CloudFormationModel):
for k, v in self.__dict__.items()
if not k.startswith("_")
}
+ ret["Tags"] = self._context.list_tags_for_resource(self.file_system_id)
ret["SizeInBytes"] = self.size_in_bytes
ret["NumberOfMountTargets"] = self.number_of_mount_targets
return ret
@@ -318,9 +352,11 @@ class EFSBackend(BaseBackend):
super().__init__()
self.region_name = region_name
self.creation_tokens = set()
+ self.access_points = dict()
self.file_systems_by_id = {}
self.mount_targets_by_id = {}
self.next_markers = {}
+ self.tagging_service = TaggingService()
def reset(self):
# preserve region
@@ -331,7 +367,8 @@ class EFSBackend(BaseBackend):
def _mark_description(self, corpus, max_items):
if max_items < len(corpus):
new_corpus = corpus[max_items:]
- new_hash = md5(json.dumps(new_corpus).encode("utf-8"))
+ new_corpus_dict = [c.info_json() for c in new_corpus]
+ new_hash = md5(json.dumps(new_corpus_dict).encode("utf-8"))
next_marker = new_hash.hexdigest()
self.next_markers[next_marker] = new_corpus
else:
@@ -342,7 +379,18 @@ class EFSBackend(BaseBackend):
def ec2_backend(self):
return ec2_backends[self.region_name]
- def create_file_system(self, creation_token, **params):
+ def create_file_system(
+ self,
+ creation_token,
+ performance_mode,
+ encrypted,
+ kms_key_id,
+ throughput_mode,
+ provisioned_throughput_in_mibps,
+ availability_zone_name,
+ backup,
+ tags,
+ ):
"""Create a new EFS File System Volume.
https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html
@@ -363,12 +411,22 @@ class EFSBackend(BaseBackend):
self.region_name,
creation_token,
fsid,
- **{k: v for k, v in params.items() if v is not None}
+ context=self,
+ performance_mode=performance_mode,
+ encrypted=encrypted,
+ kms_key_id=kms_key_id,
+ throughput_mode=throughput_mode,
+ provisioned_throughput_in_mibps=provisioned_throughput_in_mibps,
+ availability_zone_name=availability_zone_name,
+ backup=backup,
)
+ self.tag_resource(fsid, tags)
self.creation_tokens.add(creation_token)
return self.file_systems_by_id[fsid]
- def describe_file_systems(self, marker, max_items, creation_token, file_system_id):
+ def describe_file_systems(
+ self, marker=None, max_items=10, creation_token=None, file_system_id=None
+ ):
"""Describe all the EFS File Systems, or specific File Systems.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
@@ -383,7 +441,7 @@ class EFSBackend(BaseBackend):
corpus = []
for fs in self.file_systems_by_id.values():
if fs.creation_token == creation_token:
- corpus.append(fs.info_json())
+ corpus.append(fs)
elif file_system_id:
# Handle the case that a file_system_id is given.
if file_system_id not in self.file_systems_by_id:
@@ -396,7 +454,7 @@ class EFSBackend(BaseBackend):
corpus = self.next_markers[marker]
else:
# Handle the vanilla case.
- corpus = [fs.info_json() for fs in self.file_systems_by_id.values()]
+ corpus = [fs for fs in self.file_systems_by_id.values()]
# Handle the max_items parameter.
file_systems = corpus[:max_items]
@@ -445,42 +503,40 @@ class EFSBackend(BaseBackend):
def describe_mount_targets(
self, max_items, file_system_id, mount_target_id, access_point_id, marker
):
- """Describe the mount targets given a mount target ID or a file system ID.
-
- Note that as of this writing access points, and thus access point IDs are not
- supported.
+ """Describe the mount targets given an access point ID, mount target ID or a file system ID.
https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
"""
# Restrict the possible corpus of results based on inputs.
if not (bool(file_system_id) ^ bool(mount_target_id) ^ bool(access_point_id)):
raise BadRequest("Must specify exactly one mutually exclusive parameter.")
- elif file_system_id:
+
+ if access_point_id:
+ file_system_id = self.access_points[access_point_id].file_system_id
+
+ if file_system_id:
# Handle the case that a file_system_id is given.
if file_system_id not in self.file_systems_by_id:
raise FileSystemNotFound(file_system_id)
corpus = [
- mt.info_json()
+ mt
for mt in self.file_systems_by_id[file_system_id].iter_mount_targets()
]
elif mount_target_id:
if mount_target_id not in self.mount_targets_by_id:
raise MountTargetNotFound(mount_target_id)
# Handle mount target specification case.
- corpus = [self.mount_targets_by_id[mount_target_id].info_json()]
- else:
- # We don't handle access_point_id's yet.
- assert False, "Moto does not yet support EFS access points."
+ corpus = [self.mount_targets_by_id[mount_target_id]]
# Handle the case that a marker is given. Note that the handling is quite
# different from that in describe_file_systems.
if marker is not None:
if marker not in self.next_markers:
raise BadRequest("Invalid Marker")
- corpus_mtids = {m["MountTargetId"] for m in corpus}
- marked_mtids = {m["MountTargetId"] for m in self.next_markers[marker]}
+ corpus_mtids = {m.mount_target_id for m in corpus}
+ marked_mtids = {m.mount_target_id for m in self.next_markers[marker]}
mt_ids = corpus_mtids & marked_mtids
- corpus = [self.mount_targets_by_id[mt_id].info_json() for mt_id in mt_ids]
+ corpus = [self.mount_targets_by_id[mt_id] for mt_id in mt_ids]
# Handle the max_items parameter.
mount_targets = corpus[:max_items]
@@ -529,5 +585,72 @@ class EFSBackend(BaseBackend):
raise PolicyNotFound("None")
return backup_policy
+ def put_lifecycle_configuration(self, file_system_id, policies):
+ _, fss = self.describe_file_systems(file_system_id=file_system_id)
+ file_system = fss[0]
+ file_system.lifecycle_policies = policies
+
+ def describe_lifecycle_configuration(self, file_system_id):
+ _, fss = self.describe_file_systems(file_system_id=file_system_id)
+ file_system = fss[0]
+ return file_system.lifecycle_policies
+
+ def describe_mount_target_security_groups(self, mount_target_id):
+ if mount_target_id not in self.mount_targets_by_id:
+ raise MountTargetNotFound(mount_target_id)
+
+ mount_target = self.mount_targets_by_id[mount_target_id]
+ return mount_target.security_groups
+
+ def modify_mount_target_security_groups(self, mount_target_id, security_groups):
+ if mount_target_id not in self.mount_targets_by_id:
+ raise MountTargetNotFound(mount_target_id)
+
+ mount_target = self.mount_targets_by_id[mount_target_id]
+ mount_target.security_groups = security_groups
+
+ self.ec2_backend.modify_network_interface_attribute(
+ eni_id=mount_target.network_interface_id, group_ids=security_groups
+ )
+
+ def create_access_point(
+ self, client_token, tags, file_system_id, posix_user, root_directory
+ ):
+ name = next((tag["Value"] for tag in tags if tag["Key"] == "Name"), None)
+ access_point = AccessPoint(
+ self.region_name,
+ client_token,
+ file_system_id,
+ name,
+ posix_user,
+ root_directory,
+ context=self,
+ )
+ self.tagging_service.tag_resource(access_point.access_point_id, tags)
+ self.access_points[access_point.access_point_id] = access_point
+ return access_point
+
+ def describe_access_points(self, access_point_id):
+ """
+ Pagination is not yet implemented
+ """
+ if access_point_id:
+ if access_point_id not in self.access_points:
+ raise AccessPointNotFound(access_point_id)
+ return [self.access_points[access_point_id]]
+ return self.access_points.values()
+
+ def delete_access_point(self, access_point_id):
+ self.access_points.pop(access_point_id, None)
+
+ def list_tags_for_resource(self, resource_id):
+ return self.tagging_service.list_tags_for_resource(resource_id)["Tags"]
+
+ def tag_resource(self, resource_id, tags):
+ self.tagging_service.tag_resource(resource_id, tags)
+
+ def untag_resource(self, resource_id, tag_keys):
+ self.tagging_service.untag_resource_using_names(resource_id, tag_keys)
+
efs_backends = BackendDict(EFSBackend, "efs")
diff --git a/moto/efs/responses.py b/moto/efs/responses.py
index 6fc05624a..4fb350772 100644
--- a/moto/efs/responses.py
+++ b/moto/efs/responses.py
@@ -23,7 +23,7 @@ class EFSResponse(BaseResponse):
)
availability_zone_name = self._get_param("AvailabilityZoneName")
backup = self._get_param("Backup")
- tags = self._get_param("Tags")
+ tags = self._get_param("Tags") or []
resource = self.efs_backend.create_file_system(
creation_token=creation_token,
performance_mode=performance_mode,
@@ -51,7 +51,7 @@ class EFSResponse(BaseResponse):
creation_token=creation_token,
file_system_id=file_system_id,
)
- resp_json = {"FileSystems": file_systems}
+ resp_json = {"FileSystems": [fs.info_json() for fs in file_systems]}
if marker:
resp_json["Marker"] = marker
if next_marker:
@@ -87,7 +87,7 @@ class EFSResponse(BaseResponse):
access_point_id=access_point_id,
marker=marker,
)
- resp_json = {"MountTargets": mount_targets}
+ resp_json = {"MountTargets": [mt.info_json() for mt in mount_targets]}
if marker:
resp_json["Marker"] = marker
if next_marker:
@@ -109,3 +109,80 @@ class EFSResponse(BaseResponse):
backup_policy = self.efs_backend.describe_backup_policy(file_system_id)
resp = {"BackupPolicy": backup_policy}
return json.dumps(resp), {"Content-Type": "application/json"}
+
+ def put_lifecycle_configuration(self):
+ file_system_id = self._get_param("FileSystemId")
+ policies = self._get_param("LifecyclePolicies")
+ self.efs_backend.put_lifecycle_configuration(file_system_id, policies)
+ return json.dumps({"LifecyclePolicies": policies}), {
+ "Content-Type": "application/json"
+ }
+
+ def describe_lifecycle_configuration(self):
+ file_system_id = self._get_param("FileSystemId")
+ policies = self.efs_backend.describe_lifecycle_configuration(file_system_id)
+ return json.dumps({"LifecyclePolicies": policies}), {
+ "Content-Type": "application/json"
+ }
+
+ def describe_mount_target_security_groups(self):
+ mount_target_id = self._get_param("MountTargetId")
+ security_groups = self.efs_backend.describe_mount_target_security_groups(
+ mount_target_id
+ )
+ return json.dumps({"SecurityGroups": security_groups}), {
+ "Content-Type": "application/json"
+ }
+
+ def modify_mount_target_security_groups(self):
+ mount_target_id = self._get_param("MountTargetId")
+ security_groups = self._get_param("SecurityGroups")
+ self.efs_backend.modify_mount_target_security_groups(
+ mount_target_id, security_groups
+ )
+ return "{}", {"Content-Type": "application/json"}
+
+ def create_access_point(self):
+ client_token = self._get_param("ClientToken")
+ tags = self._get_param("Tags") or []
+ file_system_id = self._get_param("FileSystemId")
+ posix_user = self._get_param("PosixUser")
+ root_directory = self._get_param("RootDirectory")
+ access_point = self.efs_backend.create_access_point(
+ client_token,
+ tags=tags,
+ file_system_id=file_system_id,
+ posix_user=posix_user,
+ root_directory=root_directory,
+ )
+ return json.dumps(access_point.info_json()), {
+ "Content-Type": "application/json"
+ }
+
+ def describe_access_points(self):
+ access_point_id = self._get_param("AccessPointId")
+ access_points = self.efs_backend.describe_access_points(access_point_id)
+ resp = [ap.info_json() for ap in access_points]
+ return json.dumps({"AccessPoints": resp}), {"Content-Type": "application/json"}
+
+ def delete_access_point(self):
+ access_point_id = self._get_param("AccessPointId")
+ self.efs_backend.delete_access_point(access_point_id)
+ return "{}", {"Content-Type": "application/json"}
+
+ def list_tags_for_resource(self):
+ resource_id = self._get_param("ResourceId")
+ tags = self.efs_backend.list_tags_for_resource(resource_id)
+ return json.dumps({"Tags": tags}), {"Content-Type": "application/json"}
+
+ def tag_resource(self):
+ resource_id = self._get_param("ResourceId")
+ tags = self._get_param("Tags")
+ self.efs_backend.tag_resource(resource_id, tags)
+ return "{}", {"Content-Type": "application/json"}
+
+ def untag_resource(self):
+ resource_id = self._get_param("ResourceId")
+ tag_keys = self.querystring.get("tagKeys", [])
+ self.efs_backend.untag_resource(resource_id, tag_keys)
+ return "{}", {"Content-Type": "application/json"}
diff --git a/moto/efs/urls.py b/moto/efs/urls.py
index 052e1c942..7cc53f31b 100644
--- a/moto/efs/urls.py
+++ b/moto/efs/urls.py
@@ -11,9 +11,14 @@ response = EFSResponse()
url_paths = {
"{0}/.*?$": response.dispatch,
+ "/2015-02-01/access-points": response.dispatch,
+ "/2015-02-01/access-points/": response.dispatch,
"/2015-02-01/file-systems": response.dispatch,
"/2015-02-01/file-systems/": response.dispatch,
"/2015-02-01/file-systems//backup-policy": response.dispatch,
+ "/2015-02-01/file-systems//lifecycle-configuration": response.dispatch,
"/2015-02-01/mount-targets": response.dispatch,
"/2015-02-01/mount-targets/": response.dispatch,
+ "/2015-02-01/mount-targets//security-groups": response.dispatch,
+ "/2015-02-01/resource-tags/": response.dispatch,
}
diff --git a/tests/terraform-tests.success.txt b/tests/terraform-tests.success.txt
index 1cfd9317e..ecacbc3b4 100644
--- a/tests/terraform-tests.success.txt
+++ b/tests/terraform-tests.success.txt
@@ -69,6 +69,8 @@ TestAccAWSEcrReplicationConfiguration
TestAccAWSEcrRepository
TestAccAWSEcrRepositoryDataSource
TestAccAWSEcrRepositoryPolicy
+TestAccAWSEFSAccessPoint
+TestAccAWSEFSMountTarget
TestAccAWSEgressOnlyInternetGateway
TestAccAWSElasticBeanstalkSolutionStackDataSource
TestAccAWSElbHostedZoneId
@@ -127,5 +129,9 @@ TestAccAWSVpc_
TestAccAWSVpcEndpointService
TestAccAWSVpnGateway
TestAccAWSVpnGatewayAttachment
+TestAccDataSourceAWSEFSAccessPoint
+TestAccDataSourceAWSEFSAccessPoints
+TestAccDataSourceAwsEfsFileSystem
+TestAccDataSourceAwsEfsMountTarget
TestAccDataSourceAwsNetworkInterface_
TestValidateSSMDocumentPermissions
diff --git a/tests/test_efs/test_access_point_tagging.py b/tests/test_efs/test_access_point_tagging.py
new file mode 100644
index 000000000..ab70296e9
--- /dev/null
+++ b/tests/test_efs/test_access_point_tagging.py
@@ -0,0 +1,94 @@
+from os import environ
+
+import boto3
+import pytest
+
+from moto import mock_efs
+
+
+@pytest.fixture(scope="function")
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ environ["AWS_ACCESS_KEY_ID"] = "testing"
+ environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ environ["AWS_SECURITY_TOKEN"] = "testing"
+ environ["AWS_SESSION_TOKEN"] = "testing"
+
+
+@pytest.fixture(scope="function")
+def efs(aws_credentials): # pylint: disable=unused-argument
+ with mock_efs():
+ yield boto3.client("efs", region_name="us-east-1")
+
+
+@pytest.fixture(scope="function")
+def file_system(efs):
+ create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
+ create_fs_resp.pop("ResponseMetadata")
+ yield create_fs_resp
+
+
+def test_list_tags_for_resource__without_tags(efs, file_system):
+ file_system_id = file_system["FileSystemId"]
+
+ ap_id = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)[
+ "AccessPointId"
+ ]
+
+ resp = efs.list_tags_for_resource(ResourceId=ap_id)
+ resp.should.have.key("Tags").equals([])
+
+
+def test_list_tags_for_resource__with_tags(efs, file_system):
+ file_system_id = file_system["FileSystemId"]
+
+ ap_id = efs.create_access_point(
+ ClientToken="ct",
+ Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
+ FileSystemId=file_system_id,
+ )["AccessPointId"]
+
+ resp = efs.list_tags_for_resource(ResourceId=ap_id)
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
+ )
+
+
+def test_tag_resource(efs, file_system):
+ file_system_id = file_system["FileSystemId"]
+
+ ap_id = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)[
+ "AccessPointId"
+ ]
+
+ efs.tag_resource(
+ ResourceId=ap_id,
+ Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
+ )
+
+ resp = efs.list_tags_for_resource(ResourceId=ap_id)
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
+ )
+
+
+def test_untag_resource(efs, file_system):
+ file_system_id = file_system["FileSystemId"]
+
+ ap_id = efs.create_access_point(
+ ClientToken="ct",
+ Tags=[{"Key": "key1", "Value": "val1"}],
+ FileSystemId=file_system_id,
+ )["AccessPointId"]
+
+ efs.tag_resource(
+ ResourceId=ap_id,
+ Tags=[{"Key": "key2", "Value": "val2"}, {"Key": "key3", "Value": "val3"}],
+ )
+
+ efs.untag_resource(ResourceId=ap_id, TagKeys=["key2"])
+
+ resp = efs.list_tags_for_resource(ResourceId=ap_id)
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key1", "Value": "val1"}, {"Key": "key3", "Value": "val3"}]
+ )
diff --git a/tests/test_efs/test_access_points.py b/tests/test_efs/test_access_points.py
new file mode 100644
index 000000000..2523eff68
--- /dev/null
+++ b/tests/test_efs/test_access_points.py
@@ -0,0 +1,148 @@
+from os import environ
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+from moto import mock_efs
+from moto.core import ACCOUNT_ID
+
+
+@pytest.fixture(scope="function")
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ environ["AWS_ACCESS_KEY_ID"] = "testing"
+ environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ environ["AWS_SECURITY_TOKEN"] = "testing"
+ environ["AWS_SESSION_TOKEN"] = "testing"
+
+
+@pytest.fixture(scope="function")
+def efs(aws_credentials): # pylint: disable=unused-argument
+ with mock_efs():
+ yield boto3.client("efs", region_name="us-east-1")
+
+
+@pytest.fixture(scope="function")
+def file_system(efs):
+ create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
+ create_fs_resp.pop("ResponseMetadata")
+ yield create_fs_resp
+
+
+def test_describe_access_points__initial(efs):
+ resp = efs.describe_access_points()
+ resp.should.have.key("AccessPoints").equals([])
+
+
+def test_create_access_point__simple(efs, file_system):
+ fs_id = file_system["FileSystemId"]
+
+ resp = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)
+ resp.should.have.key("ClientToken").equals("ct")
+ resp.should.have.key("AccessPointId")
+ resp.should.have.key("AccessPointArn")
+ resp.should.have.key("FileSystemId").equals(fs_id)
+ resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
+ resp.should.have.key("LifeCycleState").equals("available")
+
+ resp.should.have.key("RootDirectory").equals({"Path": "/"})
+
+
+def test_create_access_point__full(efs, file_system):
+ fs_id = file_system["FileSystemId"]
+
+ resp = efs.create_access_point(
+ ClientToken="ct",
+ Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
+ FileSystemId=fs_id,
+ PosixUser={"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
+ RootDirectory={
+ "Path": "/root/path",
+ "CreationInfo": {
+ "OwnerUid": 987,
+ "OwnerGid": 986,
+ "Permissions": "root_permissions",
+ },
+ },
+ )
+
+ resp.should.have.key("ClientToken").equals("ct")
+ resp.should.have.key("Name").equals("myname")
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
+ )
+ resp.should.have.key("AccessPointId")
+ resp.should.have.key("AccessPointArn")
+ resp.should.have.key("FileSystemId").equals(fs_id)
+ resp.should.have.key("PosixUser").equals(
+ {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]}
+ )
+ resp.should.have.key("RootDirectory").equals(
+ {
+ "Path": "/root/path",
+ "CreationInfo": {
+ "OwnerUid": 987,
+ "OwnerGid": 986,
+ "Permissions": "root_permissions",
+ },
+ }
+ )
+ resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
+ resp.should.have.key("LifeCycleState").equals("available")
+
+
+def test_describe_access_point(efs, file_system):
+ fs_id = file_system["FileSystemId"]
+
+ access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
+ "AccessPointId"
+ ]
+
+ resp = efs.describe_access_points(AccessPointId=access_point_id)
+ resp.should.have.key("AccessPoints").length_of(1)
+ access_point = resp["AccessPoints"][0]
+
+ access_point.should.have.key("ClientToken").equals("ct")
+ access_point.should.have.key("AccessPointId")
+ access_point.should.have.key("AccessPointArn")
+ access_point.should.have.key("FileSystemId").equals(fs_id)
+ access_point.should.have.key("OwnerId").equals(ACCOUNT_ID)
+ access_point.should.have.key("LifeCycleState").equals("available")
+
+
+def test_describe_access_points__multiple(efs, file_system):
+ fs_id = file_system["FileSystemId"]
+
+ efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)
+ efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)
+
+ resp = efs.describe_access_points()
+ resp.should.have.key("AccessPoints").length_of(2)
+
+
+def test_delete_access_points(efs, file_system):
+ fs_id = file_system["FileSystemId"]
+
+ ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
+ "AccessPointId"
+ ]
+ ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
+ "AccessPointId"
+ ]
+
+ # Delete one access point
+ efs.delete_access_point(AccessPointId=ap_id2)
+
+ # We can only find one
+ resp = efs.describe_access_points()
+ resp.should.have.key("AccessPoints").length_of(1)
+
+ # The first one still exists
+ efs.describe_access_points(AccessPointId=ap_id1)
+
+ # The second one is gone
+ with pytest.raises(ClientError) as exc_info:
+ efs.describe_access_points(AccessPointId=ap_id2)
+ err = exc_info.value.response["Error"]
+ err["Code"].should.equal("AccessPointNotFound")
diff --git a/tests/test_efs/test_file_system.py b/tests/test_efs/test_file_system.py
index 9f78e6379..d6f04cb2f 100644
--- a/tests/test_efs/test_file_system.py
+++ b/tests/test_efs/test_file_system.py
@@ -80,7 +80,7 @@ def test_create_file_system_correct_use(efs):
efs.describe_backup_policy(FileSystemId=create_fs_resp["FileSystemId"])
resp = exc_info.value.response
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 404
- assert "PolicyNotFound" in resp["Error"]["Message"]
+ assert "PolicyNotFound" == resp["Error"]["Code"]
# Check the arn in detail
match_obj = re.match(ARN_PATT, create_fs_resp["FileSystemArn"])
@@ -167,13 +167,32 @@ def test_create_file_system_file_system_already_exists(efs):
efs.create_file_system(CreationToken="foo")
resp = exc_info.value.response
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 409
- assert "FileSystemAlreadyExists" in resp["Error"]["Message"]
+ assert "FileSystemAlreadyExists" == resp["Error"]["Code"]
# Testing Describe
# ================
+def test_describe_file_systems_using_identifier(efs):
+ # Create the file system.
+ create_fs_resp = efs.create_file_system(CreationToken="foobar")
+ create_fs_resp.pop("ResponseMetadata")
+ fs_id = create_fs_resp["FileSystemId"]
+
+ # Describe the file system.
+ desc_fs_resp = efs.describe_file_systems(FileSystemId=fs_id)
+ desc_fs_resp.should.have.key("FileSystems").length_of(1)
+ desc_fs_resp["FileSystems"][0].should.have.key("FileSystemId").equals(fs_id)
+
+
+def test_describe_file_systems_using_unknown_identifier(efs):
+ with pytest.raises(ClientError) as exc:
+ efs.describe_file_systems(FileSystemId="unknown")
+ err = exc.value.response["Error"]
+ err["Code"].should.equal("FileSystemNotFound")
+
+
def test_describe_file_systems_minimal_case(efs):
# Create the file system.
create_fs_resp = efs.create_file_system(CreationToken="foobar")
@@ -306,7 +325,7 @@ def test_describe_file_systems_invalid_marker(efs):
efs.describe_file_systems(Marker="fiddlesticks")
resp = exc_info.value.response
assert has_status_code(resp, 400)
- assert "BadRequest" in resp["Error"]["Message"]
+ assert "BadRequest" == resp["Error"]["Code"]
def test_describe_file_systems_invalid_creation_token(efs):
@@ -320,7 +339,7 @@ def test_describe_file_systems_invalid_file_system_id(efs):
efs.describe_file_systems(FileSystemId="fs-29879313")
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "FileSystemNotFound" in resp["Error"]["Message"]
+ assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_describe_file_system_creation_token_and_file_system_id(efs):
@@ -328,7 +347,7 @@ def test_describe_file_system_creation_token_and_file_system_id(efs):
efs.describe_file_systems(CreationToken="fizzle", FileSystemId="fs-07987987")
resp = exc_info.value.response
assert has_status_code(resp, 400)
- assert "BadRequest" in resp["Error"]["Message"]
+ assert "BadRequest" == resp["Error"]["Code"]
# Testing Delete
@@ -358,4 +377,4 @@ def test_delete_file_system_invalid_file_system_id(efs):
efs.delete_file_system(FileSystemId="fs-2394287")
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "FileSystemNotFound" in resp["Error"]["Message"]
+ assert "FileSystemNotFound" == resp["Error"]["Code"]
diff --git a/tests/test_efs/test_filesystem_tagging.py b/tests/test_efs/test_filesystem_tagging.py
new file mode 100644
index 000000000..a0d7dae9b
--- /dev/null
+++ b/tests/test_efs/test_filesystem_tagging.py
@@ -0,0 +1,79 @@
+from os import environ
+
+import boto3
+import pytest
+
+from moto import mock_efs
+
+
+@pytest.fixture(scope="function")
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ environ["AWS_ACCESS_KEY_ID"] = "testing"
+ environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ environ["AWS_SECURITY_TOKEN"] = "testing"
+ environ["AWS_SESSION_TOKEN"] = "testing"
+
+
+@pytest.fixture(scope="function")
+def efs(aws_credentials): # pylint: disable=unused-argument
+ with mock_efs():
+ yield boto3.client("efs", region_name="us-east-1")
+
+
+def test_list_tags_for_resource__without_tags(efs):
+ file_system = efs.create_file_system(CreationToken="foobarbaz")
+ fs_id = file_system["FileSystemId"]
+
+ resp = efs.list_tags_for_resource(ResourceId=fs_id)
+ resp.should.have.key("Tags").equals([])
+
+
+def test_list_tags_for_resource__with_tags(efs):
+ file_system = efs.create_file_system(
+ CreationToken="foobarbaz",
+ Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
+ )
+ fs_id = file_system["FileSystemId"]
+
+ resp = efs.list_tags_for_resource(ResourceId=fs_id)
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
+ )
+
+
+def test_tag_resource(efs):
+ file_system = efs.create_file_system(
+ CreationToken="foobarbaz",
+ Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
+ )
+ fs_id = file_system["FileSystemId"]
+
+ efs.tag_resource(
+ ResourceId=fs_id,
+ Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
+ )
+
+ resp = efs.list_tags_for_resource(ResourceId=fs_id)
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
+ )
+
+
+def test_untag_resource(efs):
+ file_system = efs.create_file_system(
+ CreationToken="foobarbaz", Tags=[{"Key": "key1", "Value": "val1"}]
+ )
+ fs_id = file_system["FileSystemId"]
+
+ efs.tag_resource(
+ ResourceId=fs_id,
+ Tags=[{"Key": "key2", "Value": "val2"}, {"Key": "key3", "Value": "val3"}],
+ )
+
+ efs.untag_resource(ResourceId=fs_id, TagKeys=["key2"])
+
+ resp = efs.list_tags_for_resource(ResourceId=fs_id)
+ resp.should.have.key("Tags").equals(
+ [{"Key": "key1", "Value": "val1"}, {"Key": "key3", "Value": "val3"}]
+ )
diff --git a/tests/test_efs/test_lifecycle_config.py b/tests/test_efs/test_lifecycle_config.py
new file mode 100644
index 000000000..fbabe2162
--- /dev/null
+++ b/tests/test_efs/test_lifecycle_config.py
@@ -0,0 +1,57 @@
+from os import environ
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+from moto import mock_efs
+
+
+@pytest.fixture(scope="function")
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ environ["AWS_ACCESS_KEY_ID"] = "testing"
+ environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ environ["AWS_SECURITY_TOKEN"] = "testing"
+ environ["AWS_SESSION_TOKEN"] = "testing"
+
+
+@pytest.fixture(scope="function")
+def efs(aws_credentials): # pylint: disable=unused-argument
+ with mock_efs():
+ yield boto3.client("efs", region_name="us-east-1")
+
+
+def test_describe_filesystem_config__unknown(efs):
+ with pytest.raises(ClientError) as exc_info:
+ efs.describe_lifecycle_configuration(FileSystemId="unknown")
+ err = exc_info.value.response["Error"]
+ err["Code"].should.equal("FileSystemNotFound")
+ err["Message"].should.equal("File system unknown does not exist.")
+
+
+def test_describe_filesystem_config__initial(efs):
+ create_fs_resp = efs.create_file_system(CreationToken="foobar")
+ fs_id = create_fs_resp["FileSystemId"]
+
+ resp = efs.describe_lifecycle_configuration(FileSystemId=fs_id)
+ resp.should.have.key("LifecyclePolicies").equals([])
+
+
+def test_put_lifecycle_configuration(efs):
+ # Create the file system.
+ create_fs_resp = efs.create_file_system(CreationToken="foobar")
+ create_fs_resp.pop("ResponseMetadata")
+ fs_id = create_fs_resp["FileSystemId"]
+
+ # Create the lifecycle configuration
+ resp = efs.put_lifecycle_configuration(
+ FileSystemId=fs_id, LifecyclePolicies=[{"TransitionToIA": "AFTER_30_DAYS"}]
+ )
+ resp.should.have.key("LifecyclePolicies").length_of(1)
+ resp["LifecyclePolicies"][0].should.equal({"TransitionToIA": "AFTER_30_DAYS"})
+
+ # Describe the lifecycle configuration
+ resp = efs.describe_lifecycle_configuration(FileSystemId=fs_id)
+ resp.should.have.key("LifecyclePolicies").length_of(1)
+ resp["LifecyclePolicies"][0].should.equal({"TransitionToIA": "AFTER_30_DAYS"})
diff --git a/tests/test_efs/test_mount_target.py b/tests/test_efs/test_mount_target.py
index 7d07d86bb..6e2e31632 100644
--- a/tests/test_efs/test_mount_target.py
+++ b/tests/test_efs/test_mount_target.py
@@ -143,7 +143,7 @@ def test_create_mount_target_invalid_file_system_id(efs, subnet):
efs.create_mount_target(FileSystemId="fs-12343289", SubnetId=subnet["SubnetId"])
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "FileSystemNotFound" in resp["Error"]["Message"]
+ assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_create_mount_target_invalid_subnet_id(efs, file_system):
@@ -153,7 +153,7 @@ def test_create_mount_target_invalid_subnet_id(efs, file_system):
)
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "SubnetNotFound" in resp["Error"]["Message"]
+ assert "SubnetNotFound" == resp["Error"]["Code"]
def test_create_mount_target_invalid_sg_id(efs, file_system, subnet):
@@ -165,7 +165,7 @@ def test_create_mount_target_invalid_sg_id(efs, file_system, subnet):
)
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "SecurityGroupNotFound" in resp["Error"]["Message"]
+ assert "SecurityGroupNotFound" == resp["Error"]["Code"]
def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet):
@@ -183,7 +183,7 @@ def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet):
)
resp = exc_info.value.response
assert has_status_code(resp, 409)
- assert "MountTargetConflict" in resp["Error"]["Message"]
+ assert "MountTargetConflict" == resp["Error"]["Code"]
assert "VPC" in resp["Error"]["Message"]
@@ -197,7 +197,7 @@ def test_create_mount_target_duplicate_subnet_id(efs, file_system, subnet):
)
resp = exc_info.value.response
assert has_status_code(resp, 409)
- assert "MountTargetConflict" in resp["Error"]["Message"]
+ assert "MountTargetConflict" == resp["Error"]["Code"]
assert "AZ" in resp["Error"]["Message"]
@@ -217,7 +217,7 @@ def test_create_mount_target_subnets_in_same_zone(efs, ec2, file_system, subnet)
)
resp = exc_info.value.response
assert has_status_code(resp, 409)
- assert "MountTargetConflict" in resp["Error"]["Message"]
+ assert "MountTargetConflict" == resp["Error"]["Code"]
assert "AZ" in resp["Error"]["Message"]
@@ -230,7 +230,7 @@ def test_create_mount_target_ip_address_out_of_range(efs, file_system, subnet):
)
resp = exc_info.value.response
assert has_status_code(resp, 400)
- assert "BadRequest" in resp["Error"]["Message"]
+ assert "BadRequest" == resp["Error"]["Code"]
assert "Address" in resp["Error"]["Message"]
@@ -251,7 +251,7 @@ def test_create_mount_target_too_many_security_groups(efs, ec2, file_system, sub
)
resp = exc_info.value.response
assert has_status_code(resp, 400)
- assert "SecurityGroupLimitExceeded" in resp["Error"]["Message"]
+ assert "SecurityGroupLimitExceeded" == resp["Error"]["Code"]
def test_delete_file_system_mount_targets_attached(
@@ -264,7 +264,7 @@ def test_delete_file_system_mount_targets_attached(
efs.delete_file_system(FileSystemId=file_system["FileSystemId"])
resp = exc_info.value.response
assert has_status_code(resp, 409)
- assert "FileSystemInUse" in resp["Error"]["Message"]
+ assert "FileSystemInUse" == resp["Error"]["Code"]
def test_describe_mount_targets_minimal_case(
@@ -290,6 +290,29 @@ def test_describe_mount_targets_minimal_case(
assert mount_target == create_resp
+def test_describe_mount_targets__by_access_point_id(
+ efs, ec2, file_system, subnet
+): # pylint: disable=unused-argument
+ create_resp = efs.create_mount_target(
+ FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
+ )
+ create_resp.pop("ResponseMetadata")
+
+ ap_resp = efs.create_access_point(
+ ClientToken="ct1", FileSystemId=file_system["FileSystemId"]
+ )
+ access_point_id = ap_resp["AccessPointId"]
+
+ # Describe the mount targets
+ ap_resp = efs.describe_mount_targets(AccessPointId=access_point_id)
+
+ # Check the list results.
+ ap_resp.should.have.key("MountTargets").length_of(1)
+ ap_resp["MountTargets"][0]["MountTargetId"].should.equal(
+ create_resp["MountTargetId"]
+ )
+
+
def test_describe_mount_targets_paging(efs, ec2, file_system):
fs_id = file_system["FileSystemId"]
@@ -358,7 +381,7 @@ def test_describe_mount_targets_invalid_file_system_id(efs):
efs.describe_mount_targets(FileSystemId="fs-12343289")
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "FileSystemNotFound" in resp["Error"]["Message"]
+ assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_describe_mount_targets_invalid_mount_target_id(efs):
@@ -366,7 +389,7 @@ def test_describe_mount_targets_invalid_mount_target_id(efs):
efs.describe_mount_targets(MountTargetId="fsmt-ad9f8987")
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "MountTargetNotFound" in resp["Error"]["Message"]
+ assert "MountTargetNotFound" == resp["Error"]["Code"]
def test_describe_mount_targets_no_id_given(efs):
@@ -374,7 +397,7 @@ def test_describe_mount_targets_no_id_given(efs):
efs.describe_mount_targets()
resp = exc_info.value.response
assert has_status_code(resp, 400)
- assert "BadRequest" in resp["Error"]["Message"]
+ assert "BadRequest" == resp["Error"]["Code"]
def test_delete_mount_target_minimal_case(efs, file_system, subnet):
@@ -392,4 +415,4 @@ def test_delete_mount_target_invalid_mount_target_id(efs):
efs.delete_mount_target(MountTargetId="fsmt-98487aef0a7")
resp = exc_info.value.response
assert has_status_code(resp, 404)
- assert "MountTargetNotFound" in resp["Error"]["Message"]
+ assert "MountTargetNotFound" == resp["Error"]["Code"]
diff --git a/tests/test_efs/test_mount_target_security_groups.py b/tests/test_efs/test_mount_target_security_groups.py
new file mode 100644
index 000000000..7f6fb1798
--- /dev/null
+++ b/tests/test_efs/test_mount_target_security_groups.py
@@ -0,0 +1,125 @@
+from os import environ
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+from moto import mock_ec2, mock_efs
+
+
+@pytest.fixture(scope="function")
+def aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ environ["AWS_ACCESS_KEY_ID"] = "testing"
+ environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ environ["AWS_SECURITY_TOKEN"] = "testing"
+ environ["AWS_SESSION_TOKEN"] = "testing"
+
+
+@pytest.fixture(scope="function")
+def ec2(aws_credentials): # pylint: disable=unused-argument
+ with mock_ec2():
+ yield boto3.client("ec2", region_name="us-east-1")
+
+
+@pytest.fixture(scope="function")
+def efs(aws_credentials): # pylint: disable=unused-argument
+ with mock_efs():
+ yield boto3.client("efs", region_name="us-east-1")
+
+
+@pytest.fixture(scope="function")
+def file_system(efs):
+ create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
+ create_fs_resp.pop("ResponseMetadata")
+ yield create_fs_resp
+
+
+@pytest.fixture(scope="function")
+def subnet(ec2):
+ desc_sn_resp = ec2.describe_subnets()
+ subnet = desc_sn_resp["Subnets"][0]
+ yield subnet
+
+
+def test_describe_mount_target_security_groups__unknown(efs):
+ with pytest.raises(ClientError) as exc_info:
+ efs.describe_mount_target_security_groups(MountTargetId="mt-asdf1234asdf")
+ err = exc_info.value.response["Error"]
+ err["Code"].should.equal("MountTargetNotFound")
+ err["Message"].should.equal("Mount target 'mt-asdf1234asdf' does not exist.")
+
+
+def test_describe_mount_target_security_groups(efs, ec2, file_system, subnet):
+ subnet_id = subnet["SubnetId"]
+ file_system_id = file_system["FileSystemId"]
+
+ desc_sg_resp = ec2.describe_security_groups()
+ security_group_id = desc_sg_resp["SecurityGroups"][0]["GroupId"]
+
+ # Create Mount Target
+ sample_input = {
+ "FileSystemId": file_system_id,
+ "SubnetId": subnet_id,
+ "SecurityGroups": [security_group_id],
+ }
+ create_mt_resp = efs.create_mount_target(**sample_input)
+ mount_target_id = create_mt_resp["MountTargetId"]
+
+ # Describe it's Security Groups
+ resp = efs.describe_mount_target_security_groups(MountTargetId=mount_target_id)
+ resp.should.have.key("SecurityGroups").equals([security_group_id])
+
+
+def test_modify_mount_target_security_groups__unknown(efs):
+ with pytest.raises(ClientError) as exc_info:
+ efs.modify_mount_target_security_groups(
+ MountTargetId="mt-asdf1234asdf", SecurityGroups=[]
+ )
+ err = exc_info.value.response["Error"]
+ err["Code"].should.equal("MountTargetNotFound")
+ err["Message"].should.equal("Mount target 'mt-asdf1234asdf' does not exist.")
+
+
+def test_modify_mount_target_security_groups(efs, ec2, file_system, subnet):
+ subnet_id = subnet["SubnetId"]
+ file_system_id = file_system["FileSystemId"]
+
+ desc_sg_resp = ec2.describe_security_groups()["SecurityGroups"]
+ print(desc_sg_resp)
+ security_group_id = desc_sg_resp[0]["GroupId"]
+
+ # Create Mount Target
+ sample_input = {
+ "FileSystemId": file_system_id,
+ "SubnetId": subnet_id,
+ "SecurityGroups": [security_group_id],
+ }
+ create_mt_resp = efs.create_mount_target(**sample_input)
+ mount_target_id = create_mt_resp["MountTargetId"]
+ network_interface_id = create_mt_resp["NetworkInterfaceId"]
+
+ # Create alternative security groups
+ sg_id_2 = ec2.create_security_group(
+ VpcId=subnet["VpcId"], GroupName="sg-2", Description="SG-2"
+ )["GroupId"]
+ sg_id_3 = ec2.create_security_group(
+ VpcId=subnet["VpcId"], GroupName="sg-3", Description="SG-3"
+ )["GroupId"]
+
+ # Modify it's Security Groups
+ efs.modify_mount_target_security_groups(
+ MountTargetId=mount_target_id, SecurityGroups=[sg_id_2, sg_id_3]
+ )
+
+ # Describe it's Security Groups
+ resp = efs.describe_mount_target_security_groups(MountTargetId=mount_target_id)
+ resp.should.have.key("SecurityGroups").equals([sg_id_2, sg_id_3])
+
+ # Verify EC2 reflects this change
+ resp = ec2.describe_network_interfaces(NetworkInterfaceIds=[network_interface_id])
+ network_interface = resp["NetworkInterfaces"][0]
+ network_interface["Groups"].should.have.length_of(2)
+ set([sg["GroupId"] for sg in network_interface["Groups"]]).should.equal(
+ {sg_id_2, sg_id_3}
+ )