Feature: EBS (#5106)
This commit is contained in:
parent
9a8c9f164a
commit
86d617e034
@ -1413,6 +1413,18 @@
|
||||
- [X] list_streams
|
||||
</details>
|
||||
|
||||
## ebs
|
||||
<details>
|
||||
<summary>100% implemented</summary>
|
||||
|
||||
- [X] complete_snapshot
|
||||
- [X] get_snapshot_block
|
||||
- [X] list_changed_blocks
|
||||
- [X] list_snapshot_blocks
|
||||
- [X] put_snapshot_block
|
||||
- [X] start_snapshot
|
||||
</details>
|
||||
|
||||
## ec2
|
||||
<details>
|
||||
<summary>35% implemented</summary>
|
||||
@ -5876,7 +5888,6 @@
|
||||
- dlm
|
||||
- docdb
|
||||
- drs
|
||||
- ebs
|
||||
- ecr-public
|
||||
- elastic-inference
|
||||
- evidently
|
||||
|
44
docs/docs/services/ebs.rst
Normal file
44
docs/docs/services/ebs.rst
Normal file
@ -0,0 +1,44 @@
|
||||
.. _implementedservice_ebs:
|
||||
|
||||
.. |start-h3| raw:: html
|
||||
|
||||
<h3>
|
||||
|
||||
.. |end-h3| raw:: html
|
||||
|
||||
</h3>
|
||||
|
||||
===
|
||||
ebs
|
||||
===
|
||||
|
||||
.. autoclass:: moto.ebs.models.EBSBackend
|
||||
|
||||
|start-h3| Example usage |end-h3|
|
||||
|
||||
.. sourcecode:: python
|
||||
|
||||
@mock_ebs
|
||||
def test_ebs_behaviour:
|
||||
boto3.client("ebs")
|
||||
...
|
||||
|
||||
|
||||
|
||||
|start-h3| Implemented features for this service |end-h3|
|
||||
|
||||
- [X] complete_snapshot
|
||||
- [X] get_snapshot_block
|
||||
|
||||
The BlockToken-parameter is not yet implemented
|
||||
|
||||
|
||||
- [X] list_changed_blocks
|
||||
|
||||
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
|
||||
|
||||
|
||||
- [X] list_snapshot_blocks
|
||||
- [X] put_snapshot_block
|
||||
- [X] start_snapshot
|
||||
|
@ -82,6 +82,7 @@ mock_dynamodbstreams = lazy_load(".dynamodbstreams", "mock_dynamodbstreams")
|
||||
mock_elasticbeanstalk = lazy_load(
|
||||
".elasticbeanstalk", "mock_elasticbeanstalk", backend="eb_backends"
|
||||
)
|
||||
mock_ebs = lazy_load(".ebs", "mock_ebs")
|
||||
mock_ec2 = lazy_load(".ec2", "mock_ec2")
|
||||
mock_ec2instanceconnect = lazy_load(".ec2instanceconnect", "mock_ec2instanceconnect")
|
||||
mock_ecr = lazy_load(".ecr", "mock_ecr")
|
||||
|
@ -36,6 +36,7 @@ backend_url_patterns = [
|
||||
"dynamodbstreams",
|
||||
re.compile("https?://streams\\.dynamodb\\.(.+)\\.amazonaws.com"),
|
||||
),
|
||||
("ebs", re.compile("https?://ebs\\.(.+)\\.amazonaws\\.com")),
|
||||
("ec2", re.compile("https?://ec2\\.(.+)\\.amazonaws\\.com(|\\.cn)")),
|
||||
(
|
||||
"ec2instanceconnect",
|
||||
|
5
moto/ebs/__init__.py
Normal file
5
moto/ebs/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
"""ebs module initialization; sets value for base decorator."""
|
||||
from .models import ebs_backends
|
||||
from ..core.models import base_decorator
|
||||
|
||||
mock_ebs = base_decorator(ebs_backends)
|
130
moto/ebs/models.py
Normal file
130
moto/ebs/models.py
Normal file
@ -0,0 +1,130 @@
|
||||
"""EBSBackend class with methods for supported APIs."""
|
||||
|
||||
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
|
||||
from moto.core.utils import BackendDict, unix_time
|
||||
from moto.ec2 import ec2_backends
|
||||
from moto.ec2.models.elastic_block_store import Snapshot
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
class Block(BaseModel):
|
||||
def __init__(self, block_data, checksum, checksum_algorithm, data_length):
|
||||
self.block_data = block_data
|
||||
self.checksum = checksum
|
||||
self.checksum_algorithm = checksum_algorithm
|
||||
self.data_length = data_length
|
||||
self.block_token = str(uuid4())
|
||||
|
||||
|
||||
class EBSSnapshot(BaseModel):
|
||||
def __init__(self, snapshot: Snapshot):
|
||||
self.snapshot_id = snapshot.id
|
||||
self.status = "pending"
|
||||
self.start_time = unix_time()
|
||||
self.volume_size = snapshot.volume.size
|
||||
self.block_size = 512
|
||||
self.tags = [
|
||||
{"Key": t["key"], "Value": t["value"]} for t in snapshot.get_tags()
|
||||
]
|
||||
self.description = snapshot.description
|
||||
|
||||
self.blocks = dict()
|
||||
|
||||
def put_block(
|
||||
self, block_idx, block_data, checksum, checksum_algorithm, data_length
|
||||
):
|
||||
block = Block(block_data, checksum, checksum_algorithm, data_length)
|
||||
self.blocks[block_idx] = block
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"SnapshotId": self.snapshot_id,
|
||||
"OwnerId": ACCOUNT_ID,
|
||||
"Status": self.status,
|
||||
"StartTime": self.start_time,
|
||||
"VolumeSize": self.volume_size,
|
||||
"BlockSize": self.block_size,
|
||||
"Tags": self.tags,
|
||||
"Description": self.description,
|
||||
}
|
||||
|
||||
|
||||
class EBSBackend(BaseBackend):
|
||||
"""Implementation of EBS APIs."""
|
||||
|
||||
def __init__(self, region_name=None):
|
||||
self.region_name = region_name
|
||||
self.snapshots = dict()
|
||||
|
||||
@property
|
||||
def ec2_backend(self):
|
||||
return ec2_backends[self.region_name]
|
||||
|
||||
def reset(self):
|
||||
"""Re-initialize all attributes for this instance."""
|
||||
region_name = self.region_name
|
||||
self.__dict__ = {}
|
||||
self.__init__(region_name)
|
||||
|
||||
def start_snapshot(self, volume_size, tags, description):
|
||||
zone_name = f"{self.region_name}a"
|
||||
vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)
|
||||
snapshot = self.ec2_backend.create_snapshot(
|
||||
volume_id=vol.id, description=description
|
||||
)
|
||||
if tags:
|
||||
tags = {tag["Key"]: tag["Value"] for tag in tags}
|
||||
snapshot.add_tags(tags)
|
||||
ebs_snapshot = EBSSnapshot(snapshot=snapshot)
|
||||
self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot
|
||||
return ebs_snapshot
|
||||
|
||||
def complete_snapshot(self, snapshot_id):
|
||||
self.snapshots[snapshot_id].status = "completed"
|
||||
return {"Status": "completed"}
|
||||
|
||||
def put_snapshot_block(
|
||||
self,
|
||||
snapshot_id,
|
||||
block_index,
|
||||
block_data,
|
||||
checksum,
|
||||
checksum_algorithm,
|
||||
data_length,
|
||||
):
|
||||
snapshot = self.snapshots[snapshot_id]
|
||||
snapshot.put_block(
|
||||
block_index, block_data, checksum, checksum_algorithm, data_length
|
||||
)
|
||||
return checksum, checksum_algorithm
|
||||
|
||||
def get_snapshot_block(self, snapshot_id, block_index):
|
||||
"""
|
||||
The BlockToken-parameter is not yet implemented
|
||||
"""
|
||||
snapshot = self.snapshots[snapshot_id]
|
||||
return snapshot.blocks[block_index]
|
||||
|
||||
def list_changed_blocks(self, first_snapshot_id, second_snapshot_id):
|
||||
"""
|
||||
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
|
||||
"""
|
||||
snapshot1 = self.snapshots[first_snapshot_id]
|
||||
snapshot2 = self.snapshots[second_snapshot_id]
|
||||
changed_blocks = dict() # {idx: (token1, token2), ..}
|
||||
for idx in snapshot1.blocks:
|
||||
block1 = snapshot1.blocks[idx]
|
||||
if idx in snapshot2.blocks:
|
||||
block2 = snapshot2.blocks[idx]
|
||||
if block1.block_data != block2.block_data:
|
||||
changed_blocks[idx] = (block1.block_token, block2.block_token)
|
||||
else:
|
||||
changed_blocks[idx] = (block1.block_token, None)
|
||||
|
||||
return changed_blocks, snapshot1
|
||||
|
||||
def list_snapshot_blocks(self, snapshot_id):
|
||||
return self.snapshots[snapshot_id]
|
||||
|
||||
|
||||
ebs_backends = BackendDict(EBSBackend, "ebs")
|
146
moto/ebs/responses.py
Normal file
146
moto/ebs/responses.py
Normal file
@ -0,0 +1,146 @@
|
||||
"""Handles incoming ebs requests, invokes methods, returns responses."""
|
||||
import json
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import ebs_backends
|
||||
|
||||
|
||||
class EBSResponse(BaseResponse):
|
||||
"""Handler for EBS requests and responses."""
|
||||
|
||||
@property
|
||||
def ebs_backend(self):
|
||||
"""Return backend instance specific for this region."""
|
||||
return ebs_backends[self.region]
|
||||
|
||||
def snapshots(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "POST":
|
||||
return self.start_snapshot()
|
||||
|
||||
def snapshot_block(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "PUT":
|
||||
return self.put_snapshot_block(full_url, headers)
|
||||
if request.method == "GET":
|
||||
return self.get_snapshot_block()
|
||||
|
||||
def snapshot_blocks(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "GET":
|
||||
return self.list_snapshot_blocks()
|
||||
|
||||
def start_snapshot(self):
|
||||
"""
|
||||
The following parameters are not yet implemented: ParentSnapshotId, ClientToken, Encrypted, KmsKeyArn, Timeout
|
||||
"""
|
||||
params = json.loads(self.body)
|
||||
volume_size = params.get("VolumeSize")
|
||||
tags = params.get("Tags")
|
||||
description = params.get("Description")
|
||||
snapshot = self.ebs_backend.start_snapshot(
|
||||
volume_size=volume_size,
|
||||
tags=tags,
|
||||
description=description,
|
||||
)
|
||||
return 200, {}, json.dumps(snapshot.to_json())
|
||||
|
||||
def complete_snapshot(self, request, full_url, headers):
|
||||
"""
|
||||
The following parameters are not yet supported: ChangedBlocksCount, Checksum, ChecksumAlgorithm, ChecksumAggregationMethod
|
||||
"""
|
||||
self.setup_class(request, full_url, headers)
|
||||
snapshot_id = full_url.split("/")[-1]
|
||||
status = self.ebs_backend.complete_snapshot(snapshot_id=snapshot_id)
|
||||
return 200, {}, json.dumps(status)
|
||||
|
||||
def put_snapshot_block(self, full_url, headers):
|
||||
"""
|
||||
The following parameters are currently not taken into account: DataLength, Progress.
|
||||
The Checksum and ChecksumAlgorithm are taken at face-value, but no validation takes place.
|
||||
"""
|
||||
snapshot_id = full_url.split("/")[-3]
|
||||
block_index = full_url.split("/")[-1]
|
||||
block_data = self.body
|
||||
headers = {k.lower(): v for k, v in headers.items()}
|
||||
checksum = headers.get("x-amz-checksum")
|
||||
checksum_algorithm = headers.get("x-amz-checksum-algorithm")
|
||||
data_length = headers.get("x-amz-data-length")
|
||||
checksum, checksum_algorithm = self.ebs_backend.put_snapshot_block(
|
||||
snapshot_id=snapshot_id,
|
||||
block_index=block_index,
|
||||
block_data=block_data,
|
||||
checksum=checksum,
|
||||
checksum_algorithm=checksum_algorithm,
|
||||
data_length=data_length,
|
||||
)
|
||||
return (
|
||||
200,
|
||||
{
|
||||
"x-amz-Checksum": checksum,
|
||||
"x-amz-Checksum-Algorithm": checksum_algorithm,
|
||||
},
|
||||
"{}",
|
||||
)
|
||||
|
||||
def get_snapshot_block(self):
|
||||
snapshot_id = self.path.split("/")[-3]
|
||||
block_index = self.path.split("/")[-1]
|
||||
block = self.ebs_backend.get_snapshot_block(
|
||||
snapshot_id=snapshot_id,
|
||||
block_index=block_index,
|
||||
)
|
||||
headers = {
|
||||
"x-amz-Checksum": block.checksum,
|
||||
"x-amz-Checksum-Algorithm": block.checksum_algorithm,
|
||||
"x-amz-Data-Length": block.data_length,
|
||||
}
|
||||
return 200, headers, block.block_data
|
||||
|
||||
def snapshot_changed_blocks(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
first_snapshot_id = self._get_params().get("firstSnapshotId")
|
||||
second_snapshot_id = self.path.split("/")[-2]
|
||||
changed_blocks, snapshot = self.ebs_backend.list_changed_blocks(
|
||||
first_snapshot_id=first_snapshot_id,
|
||||
second_snapshot_id=second_snapshot_id,
|
||||
)
|
||||
blocks = [
|
||||
{"BlockIndex": idx, "FirstBlockToken": x, "SecondBlockToken": y}
|
||||
for idx, (x, y) in changed_blocks.items()
|
||||
]
|
||||
return (
|
||||
200,
|
||||
{},
|
||||
json.dumps(
|
||||
dict(
|
||||
ChangedBlocks=blocks,
|
||||
VolumeSize=snapshot.volume_size,
|
||||
BlockSize=snapshot.block_size,
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
def list_snapshot_blocks(self):
|
||||
"""
|
||||
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
|
||||
"""
|
||||
snapshot_id = self.path.split("/")[-2]
|
||||
snapshot = self.ebs_backend.list_snapshot_blocks(
|
||||
snapshot_id=snapshot_id,
|
||||
)
|
||||
blocks = [
|
||||
{"BlockIndex": idx, "BlockToken": b.block_token}
|
||||
for idx, b in snapshot.blocks.items()
|
||||
]
|
||||
return (
|
||||
200,
|
||||
{},
|
||||
json.dumps(
|
||||
dict(
|
||||
Blocks=blocks,
|
||||
VolumeSize=snapshot.volume_size,
|
||||
BlockSize=snapshot.block_size,
|
||||
)
|
||||
),
|
||||
)
|
16
moto/ebs/urls.py
Normal file
16
moto/ebs/urls.py
Normal file
@ -0,0 +1,16 @@
|
||||
"""ebs base URL and path."""
|
||||
from .responses import EBSResponse
|
||||
|
||||
url_bases = [r"https?://ebs\.(.+)\.amazonaws\.com"]
|
||||
|
||||
|
||||
response = EBSResponse()
|
||||
|
||||
|
||||
url_paths = {
|
||||
"{0}/snapshots$": response.snapshots,
|
||||
"{0}/snapshots/completion/(?P<snapshot_id>[^/]+)$": response.complete_snapshot,
|
||||
"{0}/snapshots/(?P<snapshot_id>[^/]+)/changedblocks$": response.snapshot_changed_blocks,
|
||||
"{0}/snapshots/(?P<snapshot_id>[^/]+)/blocks$": response.snapshot_blocks,
|
||||
"{0}/snapshots/(?P<snapshot_id>[^/]+)/blocks/(?P<block_idx>[^/]+)$": response.snapshot_block,
|
||||
}
|
2
setup.py
2
setup.py
@ -108,6 +108,8 @@ extras_per_service.update(
|
||||
extras_per_service["dynamodb"] = extras_per_service["awslambda"]
|
||||
extras_per_service["dynamodb2"] = extras_per_service["dynamodb"]
|
||||
extras_per_service["dynamodbstreams"] = extras_per_service["awslambda"]
|
||||
# EBS depends on EC2 to create snapshots
|
||||
extras_per_service["ebs"] = extras_per_service["ec2"]
|
||||
# EFS depends on EC2 to find subnets etc
|
||||
extras_per_service["efs"] = extras_per_service["ec2"]
|
||||
# DirectoryService needs EC2 to verify VPCs and subnets.
|
||||
|
0
tests/test_ebs/__init__.py
Normal file
0
tests/test_ebs/__init__.py
Normal file
170
tests/test_ebs/test_ebs.py
Normal file
170
tests/test_ebs/test_ebs.py
Normal file
@ -0,0 +1,170 @@
|
||||
"""Unit tests for ebs-supported APIs."""
|
||||
import boto3
|
||||
import hashlib
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
from moto import mock_ebs, mock_ec2
|
||||
from moto.core import ACCOUNT_ID
|
||||
|
||||
# See our Development Tips on writing tests for hints on how to write good tests:
|
||||
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_start_snapshot__minimal():
|
||||
client = boto3.client("ebs", region_name="eu-west-1")
|
||||
resp = client.start_snapshot(VolumeSize=720)
|
||||
|
||||
resp.should.have.key("SnapshotId")
|
||||
resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
|
||||
resp.should.have.key("Status").equals("pending")
|
||||
resp.should.have.key("StartTime")
|
||||
resp.should.have.key("VolumeSize").equals(720)
|
||||
resp.should.have.key("BlockSize").equals(512)
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_start_snapshot():
|
||||
client = boto3.client("ebs", region_name="eu-west-1")
|
||||
resp = client.start_snapshot(
|
||||
VolumeSize=120,
|
||||
Tags=[{"Key": "kt", "Value": "vt"}],
|
||||
Description="my fancy snapshot",
|
||||
)
|
||||
|
||||
resp.should.have.key("SnapshotId")
|
||||
resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
|
||||
resp.should.have.key("Status").equals("pending")
|
||||
resp.should.have.key("StartTime")
|
||||
resp.should.have.key("VolumeSize").equals(120)
|
||||
resp.should.have.key("BlockSize").equals(512)
|
||||
resp.should.have.key("Tags").equals([{"Key": "kt", "Value": "vt"}])
|
||||
resp.should.have.key("Description").equals("my fancy snapshot")
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_complete_snapshot():
|
||||
client = boto3.client("ebs", region_name="ap-southeast-1")
|
||||
snapshot_id = client.start_snapshot(VolumeSize=720)["SnapshotId"]
|
||||
|
||||
resp = client.complete_snapshot(SnapshotId=snapshot_id, ChangedBlocksCount=0)
|
||||
resp.should.have.key("Status").equals("completed")
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_put_snapshot_block():
|
||||
data = b"data for this specific block"
|
||||
checksum = hashlib.sha256(data).hexdigest()
|
||||
client = boto3.client("ebs", region_name="eu-west-1")
|
||||
snapshot_id = client.start_snapshot(VolumeSize=720)["SnapshotId"]
|
||||
resp = client.put_snapshot_block(
|
||||
SnapshotId=snapshot_id,
|
||||
BlockIndex=5,
|
||||
BlockData=data,
|
||||
DataLength=524288,
|
||||
Checksum=checksum,
|
||||
ChecksumAlgorithm="SHA256",
|
||||
)
|
||||
|
||||
resp.should.have.key("Checksum").equals(checksum)
|
||||
resp.should.have.key("ChecksumAlgorithm").equals("SHA256")
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_get_snapshot_block():
|
||||
client = boto3.client("ebs", region_name="eu-west-1")
|
||||
snapshot_id = client.start_snapshot(VolumeSize=720)["SnapshotId"]
|
||||
for idx, data in [(1, b"data 1"), (2, b"data 2"), (3, b"data 3")]:
|
||||
checksum = hashlib.sha256(data).hexdigest()
|
||||
client.put_snapshot_block(
|
||||
SnapshotId=snapshot_id,
|
||||
BlockIndex=idx,
|
||||
BlockData=data,
|
||||
DataLength=524288,
|
||||
Checksum=checksum,
|
||||
ChecksumAlgorithm="SHA256",
|
||||
)
|
||||
|
||||
resp = client.get_snapshot_block(
|
||||
SnapshotId=snapshot_id, BlockIndex=2, BlockToken="n/a"
|
||||
)
|
||||
|
||||
resp.should.have.key("DataLength").equals(524288)
|
||||
resp.should.have.key("BlockData")
|
||||
resp["BlockData"].read().should.equal(b"data 2")
|
||||
resp.should.have.key("Checksum")
|
||||
resp.should.have.key("ChecksumAlgorithm").equals("SHA256")
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_list_changed_blocks():
|
||||
client = boto3.client("ebs", region_name="ap-southeast-1")
|
||||
snapshot_id1 = client.start_snapshot(VolumeSize=415)["SnapshotId"]
|
||||
snapshot_id2 = client.start_snapshot(VolumeSize=415)["SnapshotId"]
|
||||
for idx, data in [(1, b"data 1"), (2, b"data 2"), (3, b"data 3")]:
|
||||
checksum = hashlib.sha256(data).hexdigest()
|
||||
client.put_snapshot_block(
|
||||
SnapshotId=snapshot_id1,
|
||||
BlockIndex=idx,
|
||||
BlockData=data,
|
||||
DataLength=524288,
|
||||
Checksum=checksum,
|
||||
ChecksumAlgorithm="SHA256",
|
||||
)
|
||||
for idx, data in [(1, b"data 1.1"), (2, b"data 2"), (4, b"data 3.1")]:
|
||||
checksum = hashlib.sha256(data).hexdigest()
|
||||
client.put_snapshot_block(
|
||||
SnapshotId=snapshot_id2,
|
||||
BlockIndex=idx,
|
||||
BlockData=data,
|
||||
DataLength=524288,
|
||||
Checksum=checksum,
|
||||
ChecksumAlgorithm="SHA256",
|
||||
)
|
||||
resp = client.list_changed_blocks(
|
||||
FirstSnapshotId=snapshot_id1, SecondSnapshotId=snapshot_id2
|
||||
)
|
||||
changed_blocks = resp["ChangedBlocks"]
|
||||
changed_idxes = [b["BlockIndex"] for b in changed_blocks]
|
||||
changed_idxes.should.equal([1, 3])
|
||||
|
||||
changed_blocks[0].should.have.key("FirstBlockToken")
|
||||
changed_blocks[0].should.have.key("SecondBlockToken")
|
||||
|
||||
changed_blocks[1].should.have.key("FirstBlockToken")
|
||||
changed_blocks[1].shouldnt.have.key("SecondBlockToken")
|
||||
|
||||
|
||||
@mock_ebs
|
||||
def test_list_snapshot_blocks():
|
||||
client = boto3.client("ebs", region_name="ap-southeast-1")
|
||||
snapshot_id = client.start_snapshot(VolumeSize=415)["SnapshotId"]
|
||||
for idx, data in [(1, b"data 1"), (2, b"data 2"), (3, b"data 3")]:
|
||||
checksum = hashlib.sha256(data).hexdigest()
|
||||
client.put_snapshot_block(
|
||||
SnapshotId=snapshot_id,
|
||||
BlockIndex=idx,
|
||||
BlockData=data,
|
||||
DataLength=524288,
|
||||
Checksum=checksum,
|
||||
ChecksumAlgorithm="SHA256",
|
||||
)
|
||||
|
||||
resp = client.list_snapshot_blocks(SnapshotId=snapshot_id)
|
||||
|
||||
resp.should.have.key("VolumeSize").equals(415)
|
||||
resp.should.have.key("BlockSize").equals(512)
|
||||
resp.should.have.key("Blocks").length_of(3)
|
||||
|
||||
[b["BlockIndex"] for b in resp["Blocks"]].should.equal([1, 2, 3])
|
||||
|
||||
|
||||
@mock_ebs
|
||||
@mock_ec2
|
||||
def test_start_snapshot__should_be_created_in_ec2():
|
||||
ebs = boto3.client("ebs", region_name="eu-north-1")
|
||||
ec2 = boto3.client("ec2", region_name="eu-north-1")
|
||||
snapshot_id = ebs.start_snapshot(VolumeSize=720)["SnapshotId"]
|
||||
resp = ec2.describe_snapshots(SnapshotIds=[snapshot_id])["Snapshots"]
|
||||
resp.should.have.length_of(1)
|
||||
|
||||
resp[0].should.have.key("VolumeSize").equals(720)
|
Loading…
Reference in New Issue
Block a user