moto/moto/datasync/models.py

243 lines
8.5 KiB
Python
Raw Normal View History

2019-12-26 16:12:22 +00:00
from boto3 import Session
from collections import OrderedDict
2019-11-01 17:24:21 +00:00
from moto.core import BaseBackend, BaseModel
2019-11-02 19:34:35 +00:00
from .exceptions import InvalidRequestException
2019-11-01 17:24:21 +00:00
2019-11-02 19:34:35 +00:00
class Location(BaseModel):
def __init__(
self, location_uri, region_name=None, typ=None, metadata=None, arn_counter=0
):
2019-11-01 19:16:59 +00:00
self.uri = location_uri
self.region_name = region_name
2019-11-02 19:34:35 +00:00
self.metadata = metadata
self.typ = typ
2019-11-01 19:16:59 +00:00
# Generate ARN
2019-11-02 19:34:35 +00:00
self.arn = "arn:aws:datasync:{0}:111222333444:location/loc-{1}".format(
region_name, str(arn_counter).zfill(17)
)
2019-11-01 17:24:21 +00:00
2019-11-01 19:16:59 +00:00
class Task(BaseModel):
2019-11-02 19:34:35 +00:00
def __init__(
self,
source_location_arn,
destination_location_arn,
name,
region_name,
arn_counter=0,
metadata=None,
2019-11-02 19:34:35 +00:00
):
2019-11-01 19:16:59 +00:00
self.source_location_arn = source_location_arn
self.destination_location_arn = destination_location_arn
self.name = name
self.metadata = metadata
2019-11-02 19:34:35 +00:00
# For simplicity Tasks are either available or running
self.status = "AVAILABLE"
self.current_task_execution_arn = None
2019-11-01 19:16:59 +00:00
# Generate ARN
2019-11-02 19:34:35 +00:00
self.arn = "arn:aws:datasync:{0}:111222333444:task/task-{1}".format(
region_name, str(arn_counter).zfill(17)
)
2019-11-01 17:24:21 +00:00
2019-11-01 19:16:59 +00:00
class TaskExecution(BaseModel):
2019-11-02 19:34:35 +00:00
# For simplicity, task_execution can never fail
# Some documentation refers to this list:
# 'Status': 'QUEUED'|'LAUNCHING'|'PREPARING'|'TRANSFERRING'|'VERIFYING'|'SUCCESS'|'ERROR'
# Others refers to this list:
# INITIALIZING | PREPARING | TRANSFERRING | VERIFYING | SUCCESS/FAILURE
# Checking with AWS Support...
TASK_EXECUTION_INTERMEDIATE_STATES = (
"INITIALIZING",
# 'QUEUED', 'LAUNCHING',
"PREPARING",
"TRANSFERRING",
"VERIFYING",
)
TASK_EXECUTION_FAILURE_STATES = ("ERROR",)
TASK_EXECUTION_SUCCESS_STATES = ("SUCCESS",)
# Also COMPLETED state?
def __init__(self, task_arn, arn_counter=0):
2019-11-01 19:16:59 +00:00
self.task_arn = task_arn
2019-11-02 19:34:35 +00:00
self.arn = "{0}/execution/exec-{1}".format(task_arn, str(arn_counter).zfill(17))
self.status = self.TASK_EXECUTION_INTERMEDIATE_STATES[0]
# Simulate a task execution
def iterate_status(self):
if self.status in self.TASK_EXECUTION_FAILURE_STATES:
return
if self.status in self.TASK_EXECUTION_SUCCESS_STATES:
return
if self.status in self.TASK_EXECUTION_INTERMEDIATE_STATES:
for i, status in enumerate(self.TASK_EXECUTION_INTERMEDIATE_STATES):
if status == self.status:
if i < len(self.TASK_EXECUTION_INTERMEDIATE_STATES) - 1:
self.status = self.TASK_EXECUTION_INTERMEDIATE_STATES[i + 1]
else:
self.status = self.TASK_EXECUTION_SUCCESS_STATES[0]
return
raise Exception(
"TaskExecution.iterate_status: Unknown status={0}".format(self.status)
)
def cancel(self):
if self.status not in self.TASK_EXECUTION_INTERMEDIATE_STATES:
raise InvalidRequestException(
"Sync task cannot be cancelled in its current status: {0}".format(
self.status
)
)
self.status = "ERROR"
2019-11-01 17:24:21 +00:00
class DataSyncBackend(BaseBackend):
def __init__(self, region_name):
self.region_name = region_name
2019-11-01 19:16:59 +00:00
# Always increase when new things are created
# This ensures uniqueness
2019-11-02 19:34:35 +00:00
self.arn_counter = 0
self.locations = OrderedDict()
self.tasks = OrderedDict()
self.task_executions = OrderedDict()
2019-11-01 17:24:21 +00:00
def reset(self):
region_name = self.region_name
self._reset_model_refs()
self.__dict__ = {}
self.__init__(region_name)
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""Default VPC endpoint service."""
return BaseBackend.default_vpc_endpoint_service_factory(
service_region, zones, "datasync"
)
2019-11-02 19:34:35 +00:00
def create_location(self, location_uri, typ=None, metadata=None):
"""
# AWS DataSync allows for duplicate LocationUris
2019-11-01 19:16:59 +00:00
for arn, location in self.locations.items():
if location.uri == location_uri:
raise Exception('Location already exists')
2019-11-02 19:34:35 +00:00
"""
if not typ:
raise Exception("Location type must be specified")
2019-11-01 19:16:59 +00:00
self.arn_counter = self.arn_counter + 1
2019-11-02 19:34:35 +00:00
location = Location(
location_uri,
region_name=self.region_name,
arn_counter=self.arn_counter,
metadata=metadata,
typ=typ,
)
2019-11-01 19:16:59 +00:00
self.locations[location.arn] = location
2019-11-01 17:24:21 +00:00
return location.arn
def _get_location(self, location_arn, typ):
if location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} is not found.".format(location_arn)
)
location = self.locations[location_arn]
if location.typ != typ:
raise InvalidRequestException(
"Invalid Location type: {0}".format(location.typ)
)
return location
def delete_location(self, location_arn):
if location_arn in self.locations:
del self.locations[location_arn]
else:
raise InvalidRequestException
def create_task(
self, source_location_arn, destination_location_arn, name, metadata=None
):
2019-11-02 19:34:35 +00:00
if source_location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} not found.".format(source_location_arn)
)
if destination_location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} not found.".format(destination_location_arn)
)
2019-11-01 19:16:59 +00:00
self.arn_counter = self.arn_counter + 1
2019-11-02 19:34:35 +00:00
task = Task(
source_location_arn,
destination_location_arn,
name,
region_name=self.region_name,
arn_counter=self.arn_counter,
metadata=metadata,
2019-11-02 19:34:35 +00:00
)
2019-11-01 19:16:59 +00:00
self.tasks[task.arn] = task
return task.arn
def _get_task(self, task_arn):
if task_arn in self.tasks:
return self.tasks[task_arn]
else:
raise InvalidRequestException
def update_task(self, task_arn, name, metadata):
if task_arn in self.tasks:
task = self.tasks[task_arn]
task.name = name
task.metadata = metadata
else:
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_arn)
)
def delete_task(self, task_arn):
if task_arn in self.tasks:
del self.tasks[task_arn]
else:
raise InvalidRequestException
2019-11-01 19:16:59 +00:00
def start_task_execution(self, task_arn):
self.arn_counter = self.arn_counter + 1
2019-11-02 19:34:35 +00:00
if task_arn in self.tasks:
task = self.tasks[task_arn]
if task.status == "AVAILABLE":
task_execution = TaskExecution(task_arn, arn_counter=self.arn_counter)
self.task_executions[task_execution.arn] = task_execution
self.tasks[task_arn].current_task_execution_arn = task_execution.arn
self.tasks[task_arn].status = "RUNNING"
return task_execution.arn
raise InvalidRequestException("Invalid request.")
def _get_task_execution(self, task_execution_arn):
if task_execution_arn in self.task_executions:
return self.task_executions[task_execution_arn]
else:
raise InvalidRequestException
2019-11-02 19:34:35 +00:00
def cancel_task_execution(self, task_execution_arn):
if task_execution_arn in self.task_executions:
task_execution = self.task_executions[task_execution_arn]
task_execution.cancel()
task_arn = task_execution.task_arn
self.tasks[task_arn].current_task_execution_arn = None
self.tasks[task_arn].status = "AVAILABLE"
2019-11-02 19:34:35 +00:00
return
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_execution_arn)
)
2019-11-01 17:24:21 +00:00
datasync_backends = {}
2019-12-26 16:12:22 +00:00
for region in Session().get_available_regions("datasync"):
datasync_backends[region] = DataSyncBackend(region)
for region in Session().get_available_regions("datasync", partition_name="aws-us-gov"):
datasync_backends[region] = DataSyncBackend(region)
for region in Session().get_available_regions("datasync", partition_name="aws-cn"):
datasync_backends[region] = DataSyncBackend(region)