Merge pull request #2541 from baolsen/datasync-improvements

Datasync improvements #2526
This commit is contained in:
Mike Grima 2019-11-15 10:37:52 -08:00 committed by GitHub
commit b0b962f120
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 53 deletions

View File

@@ -35,9 +35,10 @@ install:
if [ "$TEST_SERVER_MODE" = "true" ]; then if [ "$TEST_SERVER_MODE" = "true" ]; then
python wait_for.py python wait_for.py
fi fi
before_script:
- if [[ $TRAVIS_PYTHON_VERSION == "3.7" ]]; then make lint; fi
script: script:
- make test-only - make test-only
- if [[ $TRAVIS_PYTHON_VERSION == "3.7" ]]; then make lint; fi
after_success: after_success:
- coveralls - coveralls
before_deploy: before_deploy:

View File

@@ -27,12 +27,14 @@ class Task(BaseModel):
name, name,
region_name, region_name,
arn_counter=0, arn_counter=0,
metadata=None,
): ):
self.source_location_arn = source_location_arn self.source_location_arn = source_location_arn
self.destination_location_arn = destination_location_arn self.destination_location_arn = destination_location_arn
self.name = name
self.metadata = metadata
# For simplicity Tasks are either available or running # For simplicity Tasks are either available or running
self.status = "AVAILABLE" self.status = "AVAILABLE"
self.name = name
self.current_task_execution_arn = None self.current_task_execution_arn = None
# Generate ARN # Generate ARN
self.arn = "arn:aws:datasync:{0}:111222333444:task/task-{1}".format( self.arn = "arn:aws:datasync:{0}:111222333444:task/task-{1}".format(
@@ -129,7 +131,27 @@ class DataSyncBackend(BaseBackend):
self.locations[location.arn] = location self.locations[location.arn] = location
return location.arn return location.arn
def create_task(self, source_location_arn, destination_location_arn, name): def _get_location(self, location_arn, typ):
if location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} is not found.".format(location_arn)
)
location = self.locations[location_arn]
if location.typ != typ:
raise InvalidRequestException(
"Invalid Location type: {0}".format(location.typ)
)
return location
def delete_location(self, location_arn):
if location_arn in self.locations:
del self.locations[location_arn]
else:
raise InvalidRequestException
def create_task(
self, source_location_arn, destination_location_arn, name, metadata=None
):
if source_location_arn not in self.locations: if source_location_arn not in self.locations:
raise InvalidRequestException( raise InvalidRequestException(
"Location {0} not found.".format(source_location_arn) "Location {0} not found.".format(source_location_arn)
@@ -145,10 +167,33 @@ class DataSyncBackend(BaseBackend):
name, name,
region_name=self.region_name, region_name=self.region_name,
arn_counter=self.arn_counter, arn_counter=self.arn_counter,
metadata=metadata,
) )
self.tasks[task.arn] = task self.tasks[task.arn] = task
return task.arn return task.arn
def _get_task(self, task_arn):
if task_arn in self.tasks:
return self.tasks[task_arn]
else:
raise InvalidRequestException
def update_task(self, task_arn, name, metadata):
if task_arn in self.tasks:
task = self.tasks[task_arn]
task.name = name
task.metadata = metadata
else:
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_arn)
)
def delete_task(self, task_arn):
if task_arn in self.tasks:
del self.tasks[task_arn]
else:
raise InvalidRequestException
def start_task_execution(self, task_arn): def start_task_execution(self, task_arn):
self.arn_counter = self.arn_counter + 1 self.arn_counter = self.arn_counter + 1
if task_arn in self.tasks: if task_arn in self.tasks:
@@ -161,12 +206,19 @@ class DataSyncBackend(BaseBackend):
return task_execution.arn return task_execution.arn
raise InvalidRequestException("Invalid request.") raise InvalidRequestException("Invalid request.")
def _get_task_execution(self, task_execution_arn):
if task_execution_arn in self.task_executions:
return self.task_executions[task_execution_arn]
else:
raise InvalidRequestException
def cancel_task_execution(self, task_execution_arn): def cancel_task_execution(self, task_execution_arn):
if task_execution_arn in self.task_executions: if task_execution_arn in self.task_executions:
task_execution = self.task_executions[task_execution_arn] task_execution = self.task_executions[task_execution_arn]
task_execution.cancel() task_execution.cancel()
task_arn = task_execution.task_arn task_arn = task_execution.task_arn
self.tasks[task_arn].current_task_execution_arn = None self.tasks[task_arn].current_task_execution_arn = None
self.tasks[task_arn].status = "AVAILABLE"
return return
raise InvalidRequestException( raise InvalidRequestException(
"Sync task {0} is not found.".format(task_execution_arn) "Sync task {0} is not found.".format(task_execution_arn)

View File

@@ -2,7 +2,6 @@ import json
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .exceptions import InvalidRequestException
from .models import datasync_backends from .models import datasync_backends
@@ -18,17 +17,7 @@ class DataSyncResponse(BaseResponse):
return json.dumps({"Locations": locations}) return json.dumps({"Locations": locations})
def _get_location(self, location_arn, typ): def _get_location(self, location_arn, typ):
location_arn = self._get_param("LocationArn") return self.datasync_backend._get_location(location_arn, typ)
if location_arn not in self.datasync_backend.locations:
raise InvalidRequestException(
"Location {0} is not found.".format(location_arn)
)
location = self.datasync_backend.locations[location_arn]
if location.typ != typ:
raise InvalidRequestException(
"Invalid Location type: {0}".format(location.typ)
)
return location
def create_location_s3(self): def create_location_s3(self):
# s3://bucket_name/folder/ # s3://bucket_name/folder/
@@ -86,16 +75,40 @@ class DataSyncResponse(BaseResponse):
} }
) )
def delete_location(self):
location_arn = self._get_param("LocationArn")
self.datasync_backend.delete_location(location_arn)
return json.dumps({})
def create_task(self): def create_task(self):
destination_location_arn = self._get_param("DestinationLocationArn") destination_location_arn = self._get_param("DestinationLocationArn")
source_location_arn = self._get_param("SourceLocationArn") source_location_arn = self._get_param("SourceLocationArn")
name = self._get_param("Name") name = self._get_param("Name")
metadata = {
"CloudWatchLogGroupArn": self._get_param("CloudWatchLogGroupArn"),
"Options": self._get_param("Options"),
"Excludes": self._get_param("Excludes"),
"Tags": self._get_param("Tags"),
}
arn = self.datasync_backend.create_task( arn = self.datasync_backend.create_task(
source_location_arn, destination_location_arn, name source_location_arn, destination_location_arn, name, metadata=metadata
) )
return json.dumps({"TaskArn": arn}) return json.dumps({"TaskArn": arn})
def update_task(self):
task_arn = self._get_param("TaskArn")
self.datasync_backend.update_task(
task_arn,
name=self._get_param("Name"),
metadata={
"CloudWatchLogGroupArn": self._get_param("CloudWatchLogGroupArn"),
"Options": self._get_param("Options"),
"Excludes": self._get_param("Excludes"),
"Tags": self._get_param("Tags"),
},
)
return json.dumps({})
def list_tasks(self): def list_tasks(self):
tasks = list() tasks = list()
for arn, task in self.datasync_backend.tasks.items(): for arn, task in self.datasync_backend.tasks.items():
@@ -104,29 +117,32 @@ class DataSyncResponse(BaseResponse):
) )
return json.dumps({"Tasks": tasks}) return json.dumps({"Tasks": tasks})
def delete_task(self):
task_arn = self._get_param("TaskArn")
self.datasync_backend.delete_task(task_arn)
return json.dumps({})
def describe_task(self): def describe_task(self):
task_arn = self._get_param("TaskArn") task_arn = self._get_param("TaskArn")
if task_arn in self.datasync_backend.tasks: task = self.datasync_backend._get_task(task_arn)
task = self.datasync_backend.tasks[task_arn] return json.dumps(
return json.dumps( {
{ "TaskArn": task.arn,
"TaskArn": task.arn, "Status": task.status,
"Name": task.name, "Name": task.name,
"CurrentTaskExecutionArn": task.current_task_execution_arn, "CurrentTaskExecutionArn": task.current_task_execution_arn,
"Status": task.status, "SourceLocationArn": task.source_location_arn,
"SourceLocationArn": task.source_location_arn, "DestinationLocationArn": task.destination_location_arn,
"DestinationLocationArn": task.destination_location_arn, "CloudWatchLogGroupArn": task.metadata["CloudWatchLogGroupArn"],
} "Options": task.metadata["Options"],
) "Excludes": task.metadata["Excludes"],
raise InvalidRequestException }
)
def start_task_execution(self): def start_task_execution(self):
task_arn = self._get_param("TaskArn") task_arn = self._get_param("TaskArn")
if task_arn in self.datasync_backend.tasks: arn = self.datasync_backend.start_task_execution(task_arn)
arn = self.datasync_backend.start_task_execution(task_arn) return json.dumps({"TaskExecutionArn": arn})
if arn:
return json.dumps({"TaskExecutionArn": arn})
raise InvalidRequestException("Invalid request.")
def cancel_task_execution(self): def cancel_task_execution(self):
task_execution_arn = self._get_param("TaskExecutionArn") task_execution_arn = self._get_param("TaskExecutionArn")
@@ -135,21 +151,12 @@ class DataSyncResponse(BaseResponse):
def describe_task_execution(self): def describe_task_execution(self):
task_execution_arn = self._get_param("TaskExecutionArn") task_execution_arn = self._get_param("TaskExecutionArn")
task_execution = self.datasync_backend._get_task_execution(task_execution_arn)
if task_execution_arn in self.datasync_backend.task_executions: result = json.dumps(
task_execution = self.datasync_backend.task_executions[task_execution_arn] {"TaskExecutionArn": task_execution.arn, "Status": task_execution.status,}
if task_execution: )
result = json.dumps( if task_execution.status == "SUCCESS":
{ self.datasync_backend.tasks[task_execution.task_arn].status = "AVAILABLE"
"TaskExecutionArn": task_execution.arn, # Simulate task being executed
"Status": task_execution.status, task_execution.iterate_status()
} return result
)
if task_execution.status == "SUCCESS":
self.datasync_backend.tasks[
task_execution.task_arn
].status = "AVAILABLE"
# Simulate task being executed
task_execution.iterate_status()
return result
raise InvalidRequestException

View File

@@ -127,6 +127,22 @@ def test_list_locations():
assert response["Locations"][2]["LocationUri"] == "s3://my_bucket/dir" assert response["Locations"][2]["LocationUri"] == "s3://my_bucket/dir"
@mock_datasync
def test_delete_location():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_smb=True)
response = client.list_locations()
assert len(response["Locations"]) == 1
location_arn = locations["smb_arn"]
response = client.delete_location(LocationArn=location_arn)
response = client.list_locations()
assert len(response["Locations"]) == 0
with assert_raises(ClientError) as e:
response = client.delete_location(LocationArn=location_arn)
@mock_datasync @mock_datasync
def test_create_task(): def test_create_task():
client = boto3.client("datasync", region_name="us-east-1") client = boto3.client("datasync", region_name="us-east-1")
@@ -208,6 +224,72 @@ def test_describe_task_not_exist():
client.describe_task(TaskArn="abc") client.describe_task(TaskArn="abc")
@mock_datasync
def test_update_task():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
initial_name = "Initial_Name"
updated_name = "Updated_Name"
initial_options = {
"VerifyMode": "NONE",
"Atime": "BEST_EFFORT",
"Mtime": "PRESERVE",
}
updated_options = {
"VerifyMode": "POINT_IN_TIME_CONSISTENT",
"Atime": "BEST_EFFORT",
"Mtime": "PRESERVE",
}
response = client.create_task(
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name=initial_name,
Options=initial_options,
)
task_arn = response["TaskArn"]
response = client.describe_task(TaskArn=task_arn)
assert response["TaskArn"] == task_arn
assert response["Name"] == initial_name
assert response["Options"] == initial_options
response = client.update_task(
TaskArn=task_arn, Name=updated_name, Options=updated_options
)
response = client.describe_task(TaskArn=task_arn)
assert response["TaskArn"] == task_arn
assert response["Name"] == updated_name
assert response["Options"] == updated_options
with assert_raises(ClientError) as e:
client.update_task(TaskArn="doesnt_exist")
@mock_datasync
def test_delete_task():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.create_task(
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name="task_name",
)
response = client.list_tasks()
assert len(response["Tasks"]) == 1
task_arn = response["Tasks"][0]["TaskArn"]
assert task_arn is not None
response = client.delete_task(TaskArn=task_arn)
response = client.list_tasks()
assert len(response["Tasks"]) == 0
with assert_raises(ClientError) as e:
response = client.delete_task(TaskArn=task_arn)
@mock_datasync @mock_datasync
def test_start_task_execution(): def test_start_task_execution():
client = boto3.client("datasync", region_name="us-east-1") client = boto3.client("datasync", region_name="us-east-1")
@@ -261,6 +343,8 @@ def test_describe_task_execution():
Name="task_name", Name="task_name",
) )
task_arn = response["TaskArn"] task_arn = response["TaskArn"]
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "AVAILABLE"
response = client.start_task_execution(TaskArn=task_arn) response = client.start_task_execution(TaskArn=task_arn)
task_execution_arn = response["TaskExecutionArn"] task_execution_arn = response["TaskExecutionArn"]
@@ -270,26 +354,38 @@ def test_describe_task_execution():
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "INITIALIZING" assert response["Status"] == "INITIALIZING"
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "RUNNING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "PREPARING" assert response["Status"] == "PREPARING"
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "RUNNING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "TRANSFERRING" assert response["Status"] == "TRANSFERRING"
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "RUNNING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "VERIFYING" assert response["Status"] == "VERIFYING"
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "RUNNING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "SUCCESS" assert response["Status"] == "SUCCESS"
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "AVAILABLE"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "SUCCESS" assert response["Status"] == "SUCCESS"
response = client.describe_task(TaskArn=task_arn)
assert response["Status"] == "AVAILABLE"
@mock_datasync @mock_datasync
@@ -317,11 +413,13 @@ def test_cancel_task_execution():
response = client.describe_task(TaskArn=task_arn) response = client.describe_task(TaskArn=task_arn)
assert response["CurrentTaskExecutionArn"] == task_execution_arn assert response["CurrentTaskExecutionArn"] == task_execution_arn
assert response["Status"] == "RUNNING"
response = client.cancel_task_execution(TaskExecutionArn=task_execution_arn) response = client.cancel_task_execution(TaskExecutionArn=task_execution_arn)
response = client.describe_task(TaskArn=task_arn) response = client.describe_task(TaskArn=task_arn)
assert "CurrentTaskExecutionArn" not in response assert "CurrentTaskExecutionArn" not in response
assert response["Status"] == "AVAILABLE"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn) response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["Status"] == "ERROR" assert response["Status"] == "ERROR"