diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md
index 20a7e7552..699c85d59 100644
--- a/IMPLEMENTATION_COVERAGE.md
+++ b/IMPLEMENTATION_COVERAGE.md
@@ -1413,6 +1413,18 @@
- [X] list_streams
+## ebs
+
+100% implemented
+
+- [X] complete_snapshot
+- [X] get_snapshot_block
+- [X] list_changed_blocks
+- [X] list_snapshot_blocks
+- [X] put_snapshot_block
+- [X] start_snapshot
+
+
## ec2
35% implemented
@@ -5876,7 +5888,6 @@
- dlm
- docdb
- drs
-- ebs
- ecr-public
- elastic-inference
- evidently
diff --git a/docs/docs/services/ebs.rst b/docs/docs/services/ebs.rst
new file mode 100644
index 000000000..74b9a053b
--- /dev/null
+++ b/docs/docs/services/ebs.rst
@@ -0,0 +1,44 @@
+.. _implementedservice_ebs:
+
+.. |start-h3| raw:: html
+
+
+
+.. |end-h3| raw:: html
+
+
+
+===
+ebs
+===
+
+.. autoclass:: moto.ebs.models.EBSBackend
+
+|start-h3| Example usage |end-h3|
+
+.. sourcecode:: python
+
+ @mock_ebs
+ def test_ebs_behaviour:
+ boto3.client("ebs")
+ ...
+
+
+
+|start-h3| Implemented features for this service |end-h3|
+
+- [X] complete_snapshot
+- [X] get_snapshot_block
+
+ The BlockToken-parameter is not yet implemented
+
+
+- [X] list_changed_blocks
+
+ The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
+
+
+- [X] list_snapshot_blocks
+- [X] put_snapshot_block
+- [X] start_snapshot
+
diff --git a/moto/__init__.py b/moto/__init__.py
index b2fdddcd9..c162abd9a 100644
--- a/moto/__init__.py
+++ b/moto/__init__.py
@@ -82,6 +82,7 @@ mock_dynamodbstreams = lazy_load(".dynamodbstreams", "mock_dynamodbstreams")
mock_elasticbeanstalk = lazy_load(
".elasticbeanstalk", "mock_elasticbeanstalk", backend="eb_backends"
)
+mock_ebs = lazy_load(".ebs", "mock_ebs")
mock_ec2 = lazy_load(".ec2", "mock_ec2")
mock_ec2instanceconnect = lazy_load(".ec2instanceconnect", "mock_ec2instanceconnect")
mock_ecr = lazy_load(".ecr", "mock_ecr")
diff --git a/moto/backend_index.py b/moto/backend_index.py
index face99f60..d6348d149 100644
--- a/moto/backend_index.py
+++ b/moto/backend_index.py
@@ -36,6 +36,7 @@ backend_url_patterns = [
"dynamodbstreams",
re.compile("https?://streams\\.dynamodb\\.(.+)\\.amazonaws.com"),
),
+ ("ebs", re.compile("https?://ebs\\.(.+)\\.amazonaws\\.com")),
("ec2", re.compile("https?://ec2\\.(.+)\\.amazonaws\\.com(|\\.cn)")),
(
"ec2instanceconnect",
diff --git a/moto/ebs/__init__.py b/moto/ebs/__init__.py
new file mode 100644
index 000000000..8254b63e7
--- /dev/null
+++ b/moto/ebs/__init__.py
@@ -0,0 +1,5 @@
+"""ebs module initialization; sets value for base decorator."""
+from .models import ebs_backends
+from ..core.models import base_decorator
+
+mock_ebs = base_decorator(ebs_backends)
diff --git a/moto/ebs/models.py b/moto/ebs/models.py
new file mode 100644
index 000000000..a1ba32e3f
--- /dev/null
+++ b/moto/ebs/models.py
@@ -0,0 +1,130 @@
+"""EBSBackend class with methods for supported APIs."""
+
+from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
+from moto.core.utils import BackendDict, unix_time
+from moto.ec2 import ec2_backends
+from moto.ec2.models.elastic_block_store import Snapshot
+from uuid import uuid4
+
+
+class Block(BaseModel):
+ def __init__(self, block_data, checksum, checksum_algorithm, data_length):
+ self.block_data = block_data
+ self.checksum = checksum
+ self.checksum_algorithm = checksum_algorithm
+ self.data_length = data_length
+ self.block_token = str(uuid4())
+
+
+class EBSSnapshot(BaseModel):
+ def __init__(self, snapshot: Snapshot):
+ self.snapshot_id = snapshot.id
+ self.status = "pending"
+ self.start_time = unix_time()
+ self.volume_size = snapshot.volume.size
+ self.block_size = 512
+ self.tags = [
+ {"Key": t["key"], "Value": t["value"]} for t in snapshot.get_tags()
+ ]
+ self.description = snapshot.description
+
+ self.blocks = dict()
+
+ def put_block(
+ self, block_idx, block_data, checksum, checksum_algorithm, data_length
+ ):
+ block = Block(block_data, checksum, checksum_algorithm, data_length)
+ self.blocks[block_idx] = block
+
+ def to_json(self):
+ return {
+ "SnapshotId": self.snapshot_id,
+ "OwnerId": ACCOUNT_ID,
+ "Status": self.status,
+ "StartTime": self.start_time,
+ "VolumeSize": self.volume_size,
+ "BlockSize": self.block_size,
+ "Tags": self.tags,
+ "Description": self.description,
+ }
+
+
+class EBSBackend(BaseBackend):
+ """Implementation of EBS APIs."""
+
+ def __init__(self, region_name=None):
+ self.region_name = region_name
+ self.snapshots = dict()
+
+ @property
+ def ec2_backend(self):
+ return ec2_backends[self.region_name]
+
+ def reset(self):
+ """Re-initialize all attributes for this instance."""
+ region_name = self.region_name
+ self.__dict__ = {}
+ self.__init__(region_name)
+
+ def start_snapshot(self, volume_size, tags, description):
+ zone_name = f"{self.region_name}a"
+ vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)
+ snapshot = self.ec2_backend.create_snapshot(
+ volume_id=vol.id, description=description
+ )
+ if tags:
+ tags = {tag["Key"]: tag["Value"] for tag in tags}
+ snapshot.add_tags(tags)
+ ebs_snapshot = EBSSnapshot(snapshot=snapshot)
+ self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot
+ return ebs_snapshot
+
+ def complete_snapshot(self, snapshot_id):
+ self.snapshots[snapshot_id].status = "completed"
+ return {"Status": "completed"}
+
+ def put_snapshot_block(
+ self,
+ snapshot_id,
+ block_index,
+ block_data,
+ checksum,
+ checksum_algorithm,
+ data_length,
+ ):
+ snapshot = self.snapshots[snapshot_id]
+ snapshot.put_block(
+ block_index, block_data, checksum, checksum_algorithm, data_length
+ )
+ return checksum, checksum_algorithm
+
+ def get_snapshot_block(self, snapshot_id, block_index):
+ """
+ The BlockToken-parameter is not yet implemented
+ """
+ snapshot = self.snapshots[snapshot_id]
+ return snapshot.blocks[block_index]
+
+ def list_changed_blocks(self, first_snapshot_id, second_snapshot_id):
+ """
+ The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
+ """
+ snapshot1 = self.snapshots[first_snapshot_id]
+ snapshot2 = self.snapshots[second_snapshot_id]
+ changed_blocks = dict() # {idx: (token1, token2), ..}
+ for idx in snapshot1.blocks:
+ block1 = snapshot1.blocks[idx]
+ if idx in snapshot2.blocks:
+ block2 = snapshot2.blocks[idx]
+ if block1.block_data != block2.block_data:
+ changed_blocks[idx] = (block1.block_token, block2.block_token)
+ else:
+ changed_blocks[idx] = (block1.block_token, None)
+
+ return changed_blocks, snapshot1
+
+ def list_snapshot_blocks(self, snapshot_id):
+ return self.snapshots[snapshot_id]
+
+
+ebs_backends = BackendDict(EBSBackend, "ebs")
diff --git a/moto/ebs/responses.py b/moto/ebs/responses.py
new file mode 100644
index 000000000..5b92ff2f5
--- /dev/null
+++ b/moto/ebs/responses.py
@@ -0,0 +1,146 @@
+"""Handles incoming ebs requests, invokes methods, returns responses."""
+import json
+
+from moto.core.responses import BaseResponse
+from .models import ebs_backends
+
+
+class EBSResponse(BaseResponse):
+ """Handler for EBS requests and responses."""
+
+ @property
+ def ebs_backend(self):
+ """Return backend instance specific for this region."""
+ return ebs_backends[self.region]
+
+ def snapshots(self, request, full_url, headers):
+ self.setup_class(request, full_url, headers)
+ if request.method == "POST":
+ return self.start_snapshot()
+
+ def snapshot_block(self, request, full_url, headers):
+ self.setup_class(request, full_url, headers)
+ if request.method == "PUT":
+ return self.put_snapshot_block(full_url, headers)
+ if request.method == "GET":
+ return self.get_snapshot_block()
+
+ def snapshot_blocks(self, request, full_url, headers):
+ self.setup_class(request, full_url, headers)
+ if request.method == "GET":
+ return self.list_snapshot_blocks()
+
+ def start_snapshot(self):
+ """
+ The following parameters are not yet implemented: ParentSnapshotId, ClientToken, Encrypted, KmsKeyArn, Timeout
+ """
+ params = json.loads(self.body)
+ volume_size = params.get("VolumeSize")
+ tags = params.get("Tags")
+ description = params.get("Description")
+ snapshot = self.ebs_backend.start_snapshot(
+ volume_size=volume_size,
+ tags=tags,
+ description=description,
+ )
+ return 200, {}, json.dumps(snapshot.to_json())
+
+ def complete_snapshot(self, request, full_url, headers):
+ """
+ The following parameters are not yet supported: ChangedBlocksCount, Checksum, ChecksumAlgorithm, ChecksumAggregationMethod
+ """
+ self.setup_class(request, full_url, headers)
+ snapshot_id = full_url.split("/")[-1]
+ status = self.ebs_backend.complete_snapshot(snapshot_id=snapshot_id)
+ return 200, {}, json.dumps(status)
+
+ def put_snapshot_block(self, full_url, headers):
+ """
+ The following parameters are currently not taken into account: DataLength, Progress.
+ The Checksum and ChecksumAlgorithm are taken at face-value, but no validation takes place.
+ """
+ snapshot_id = full_url.split("/")[-3]
+ block_index = full_url.split("/")[-1]
+ block_data = self.body
+ headers = {k.lower(): v for k, v in headers.items()}
+ checksum = headers.get("x-amz-checksum")
+ checksum_algorithm = headers.get("x-amz-checksum-algorithm")
+ data_length = headers.get("x-amz-data-length")
+ checksum, checksum_algorithm = self.ebs_backend.put_snapshot_block(
+ snapshot_id=snapshot_id,
+ block_index=block_index,
+ block_data=block_data,
+ checksum=checksum,
+ checksum_algorithm=checksum_algorithm,
+ data_length=data_length,
+ )
+ return (
+ 200,
+ {
+ "x-amz-Checksum": checksum,
+ "x-amz-Checksum-Algorithm": checksum_algorithm,
+ },
+ "{}",
+ )
+
+ def get_snapshot_block(self):
+ snapshot_id = self.path.split("/")[-3]
+ block_index = self.path.split("/")[-1]
+ block = self.ebs_backend.get_snapshot_block(
+ snapshot_id=snapshot_id,
+ block_index=block_index,
+ )
+ headers = {
+ "x-amz-Checksum": block.checksum,
+ "x-amz-Checksum-Algorithm": block.checksum_algorithm,
+ "x-amz-Data-Length": block.data_length,
+ }
+ return 200, headers, block.block_data
+
+ def snapshot_changed_blocks(self, request, full_url, headers):
+ self.setup_class(request, full_url, headers)
+ first_snapshot_id = self._get_params().get("firstSnapshotId")
+ second_snapshot_id = self.path.split("/")[-2]
+ changed_blocks, snapshot = self.ebs_backend.list_changed_blocks(
+ first_snapshot_id=first_snapshot_id,
+ second_snapshot_id=second_snapshot_id,
+ )
+ blocks = [
+ {"BlockIndex": idx, "FirstBlockToken": x, "SecondBlockToken": y}
+ for idx, (x, y) in changed_blocks.items()
+ ]
+ return (
+ 200,
+ {},
+ json.dumps(
+ dict(
+ ChangedBlocks=blocks,
+ VolumeSize=snapshot.volume_size,
+ BlockSize=snapshot.block_size,
+ )
+ ),
+ )
+
+ def list_snapshot_blocks(self):
+ """
+ The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
+ """
+ snapshot_id = self.path.split("/")[-2]
+ snapshot = self.ebs_backend.list_snapshot_blocks(
+ snapshot_id=snapshot_id,
+ )
+ blocks = [
+ {"BlockIndex": idx, "BlockToken": b.block_token}
+ for idx, b in snapshot.blocks.items()
+ ]
+ return (
+ 200,
+ {},
+ json.dumps(
+ dict(
+ Blocks=blocks,
+ VolumeSize=snapshot.volume_size,
+ BlockSize=snapshot.block_size,
+ )
+ ),
+ )
diff --git a/moto/ebs/urls.py b/moto/ebs/urls.py
new file mode 100644
index 000000000..d2b59e511
--- /dev/null
+++ b/moto/ebs/urls.py
@@ -0,0 +1,16 @@
+"""ebs base URL and path."""
+from .responses import EBSResponse
+
+url_bases = [r"https?://ebs\.(.+)\.amazonaws\.com"]
+
+
+response = EBSResponse()
+
+
+url_paths = {
+ "{0}/snapshots$": response.snapshots,
+ "{0}/snapshots/completion/(?P[^/]+)$": response.complete_snapshot,
+ "{0}/snapshots/(?P[^/]+)/changedblocks$": response.snapshot_changed_blocks,
+ "{0}/snapshots/(?P[^/]+)/blocks$": response.snapshot_blocks,
+ "{0}/snapshots/(?P[^/]+)/blocks/(?P[^/]+)$": response.snapshot_block,
+}
diff --git a/setup.py b/setup.py
index 860747c0b..c75765e48 100755
--- a/setup.py
+++ b/setup.py
@@ -108,6 +108,8 @@ extras_per_service.update(
extras_per_service["dynamodb"] = extras_per_service["awslambda"]
extras_per_service["dynamodb2"] = extras_per_service["dynamodb"]
extras_per_service["dynamodbstreams"] = extras_per_service["awslambda"]
+# EBS depends on EC2 to create snapshots
+extras_per_service["ebs"] = extras_per_service["ec2"]
# EFS depends on EC2 to find subnets etc
extras_per_service["efs"] = extras_per_service["ec2"]
# DirectoryService needs EC2 to verify VPCs and subnets.
diff --git a/tests/test_ebs/__init__.py b/tests/test_ebs/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/test_ebs/test_ebs.py b/tests/test_ebs/test_ebs.py
new file mode 100644
index 000000000..8e288834e
--- /dev/null
+++ b/tests/test_ebs/test_ebs.py
@@ -0,0 +1,170 @@
+"""Unit tests for ebs-supported APIs."""
+import boto3
+import hashlib
+import sure # noqa # pylint: disable=unused-import
+from moto import mock_ebs, mock_ec2
+from moto.core import ACCOUNT_ID
+
+# See our Development Tips on writing tests for hints on how to write good tests:
+# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
+
+
+@mock_ebs
+def test_start_snapshot__minimal():
+ client = boto3.client("ebs", region_name="eu-west-1")
+ resp = client.start_snapshot(VolumeSize=720)
+
+ resp.should.have.key("SnapshotId")
+ resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
+ resp.should.have.key("Status").equals("pending")
+ resp.should.have.key("StartTime")
+ resp.should.have.key("VolumeSize").equals(720)
+ resp.should.have.key("BlockSize").equals(512)
+
+
+@mock_ebs
+def test_start_snapshot():
+ client = boto3.client("ebs", region_name="eu-west-1")
+ resp = client.start_snapshot(
+ VolumeSize=120,
+ Tags=[{"Key": "kt", "Value": "vt"}],
+ Description="my fancy snapshot",
+ )
+
+ resp.should.have.key("SnapshotId")
+ resp.should.have.key("OwnerId").equals(ACCOUNT_ID)
+ resp.should.have.key("Status").equals("pending")
+ resp.should.have.key("StartTime")
+ resp.should.have.key("VolumeSize").equals(120)
+ resp.should.have.key("BlockSize").equals(512)
+ resp.should.have.key("Tags").equals([{"Key": "kt", "Value": "vt"}])
+ resp.should.have.key("Description").equals("my fancy snapshot")
+
+
+@mock_ebs
+def test_complete_snapshot():
+ client = boto3.client("ebs", region_name="ap-southeast-1")
+ snapshot_id = client.start_snapshot(VolumeSize=720)["SnapshotId"]
+
+ resp = client.complete_snapshot(SnapshotId=snapshot_id, ChangedBlocksCount=0)
+ resp.should.have.key("Status").equals("completed")
+
+
+@mock_ebs
+def test_put_snapshot_block():
+ data = b"data for this specific block"
+ checksum = hashlib.sha256(data).hexdigest()
+ client = boto3.client("ebs", region_name="eu-west-1")
+ snapshot_id = client.start_snapshot(VolumeSize=720)["SnapshotId"]
+ resp = client.put_snapshot_block(
+ SnapshotId=snapshot_id,
+ BlockIndex=5,
+ BlockData=data,
+ DataLength=524288,
+ Checksum=checksum,
+ ChecksumAlgorithm="SHA256",
+ )
+
+ resp.should.have.key("Checksum").equals(checksum)
+ resp.should.have.key("ChecksumAlgorithm").equals("SHA256")
+
+
+@mock_ebs
+def test_get_snapshot_block():
+ client = boto3.client("ebs", region_name="eu-west-1")
+ snapshot_id = client.start_snapshot(VolumeSize=720)["SnapshotId"]
+ for idx, data in [(1, b"data 1"), (2, b"data 2"), (3, b"data 3")]:
+ checksum = hashlib.sha256(data).hexdigest()
+ client.put_snapshot_block(
+ SnapshotId=snapshot_id,
+ BlockIndex=idx,
+ BlockData=data,
+ DataLength=524288,
+ Checksum=checksum,
+ ChecksumAlgorithm="SHA256",
+ )
+
+ resp = client.get_snapshot_block(
+ SnapshotId=snapshot_id, BlockIndex=2, BlockToken="n/a"
+ )
+
+ resp.should.have.key("DataLength").equals(524288)
+ resp.should.have.key("BlockData")
+ resp["BlockData"].read().should.equal(b"data 2")
+ resp.should.have.key("Checksum")
+ resp.should.have.key("ChecksumAlgorithm").equals("SHA256")
+
+
+@mock_ebs
+def test_list_changed_blocks():
+ client = boto3.client("ebs", region_name="ap-southeast-1")
+ snapshot_id1 = client.start_snapshot(VolumeSize=415)["SnapshotId"]
+ snapshot_id2 = client.start_snapshot(VolumeSize=415)["SnapshotId"]
+ for idx, data in [(1, b"data 1"), (2, b"data 2"), (3, b"data 3")]:
+ checksum = hashlib.sha256(data).hexdigest()
+ client.put_snapshot_block(
+ SnapshotId=snapshot_id1,
+ BlockIndex=idx,
+ BlockData=data,
+ DataLength=524288,
+ Checksum=checksum,
+ ChecksumAlgorithm="SHA256",
+ )
+ for idx, data in [(1, b"data 1.1"), (2, b"data 2"), (4, b"data 3.1")]:
+ checksum = hashlib.sha256(data).hexdigest()
+ client.put_snapshot_block(
+ SnapshotId=snapshot_id2,
+ BlockIndex=idx,
+ BlockData=data,
+ DataLength=524288,
+ Checksum=checksum,
+ ChecksumAlgorithm="SHA256",
+ )
+ resp = client.list_changed_blocks(
+ FirstSnapshotId=snapshot_id1, SecondSnapshotId=snapshot_id2
+ )
+ changed_blocks = resp["ChangedBlocks"]
+ changed_idxes = [b["BlockIndex"] for b in changed_blocks]
+ changed_idxes.should.equal([1, 3])
+
+ changed_blocks[0].should.have.key("FirstBlockToken")
+ changed_blocks[0].should.have.key("SecondBlockToken")
+
+ changed_blocks[1].should.have.key("FirstBlockToken")
+ changed_blocks[1].shouldnt.have.key("SecondBlockToken")
+
+
+@mock_ebs
+def test_list_snapshot_blocks():
+ client = boto3.client("ebs", region_name="ap-southeast-1")
+ snapshot_id = client.start_snapshot(VolumeSize=415)["SnapshotId"]
+ for idx, data in [(1, b"data 1"), (2, b"data 2"), (3, b"data 3")]:
+ checksum = hashlib.sha256(data).hexdigest()
+ client.put_snapshot_block(
+ SnapshotId=snapshot_id,
+ BlockIndex=idx,
+ BlockData=data,
+ DataLength=524288,
+ Checksum=checksum,
+ ChecksumAlgorithm="SHA256",
+ )
+
+ resp = client.list_snapshot_blocks(SnapshotId=snapshot_id)
+
+ resp.should.have.key("VolumeSize").equals(415)
+ resp.should.have.key("BlockSize").equals(512)
+ resp.should.have.key("Blocks").length_of(3)
+
+ [b["BlockIndex"] for b in resp["Blocks"]].should.equal([1, 2, 3])
+
+
+@mock_ebs
+@mock_ec2
+def test_start_snapshot__should_be_created_in_ec2():
+ ebs = boto3.client("ebs", region_name="eu-north-1")
+ ec2 = boto3.client("ec2", region_name="eu-north-1")
+ snapshot_id = ebs.start_snapshot(VolumeSize=720)["SnapshotId"]
+ resp = ec2.describe_snapshots(SnapshotIds=[snapshot_id])["Snapshots"]
+ resp.should.have.length_of(1)
+
+ resp[0].should.have.key("VolumeSize").equals(720)