diff --git a/moto/ebs/models.py b/moto/ebs/models.py index 58f05e313..e8ac96437 100644 --- a/moto/ebs/models.py +++ b/moto/ebs/models.py @@ -2,13 +2,17 @@ from moto.core import BaseBackend, BackendDict, BaseModel from moto.core.utils import unix_time -from moto.ec2 import ec2_backends +from moto.ec2.models import ec2_backends, EC2Backend from moto.ec2.models.elastic_block_store import Snapshot from moto.moto_api._internal import mock_random +from typing import Any, Dict, List, Optional, Tuple + class Block(BaseModel): - def __init__(self, block_data, checksum, checksum_algorithm, data_length): + def __init__( + self, block_data: str, checksum: str, checksum_algorithm: str, data_length: str + ): self.block_data = block_data self.checksum = checksum self.checksum_algorithm = checksum_algorithm @@ -17,7 +21,7 @@ class Block(BaseModel): class EBSSnapshot(BaseModel): - def __init__(self, account_id, snapshot: Snapshot): + def __init__(self, account_id: str, snapshot: Snapshot): self.account_id = account_id self.snapshot_id = snapshot.id self.status = "pending" @@ -29,15 +33,20 @@ class EBSSnapshot(BaseModel): ] self.description = snapshot.description - self.blocks = dict() + self.blocks: Dict[str, Block] = dict() def put_block( - self, block_idx, block_data, checksum, checksum_algorithm, data_length - ): + self, + block_idx: str, + block_data: str, + checksum: str, + checksum_algorithm: str, + data_length: str, + ) -> None: block = Block(block_data, checksum, checksum_algorithm, data_length) self.blocks[block_idx] = block - def to_json(self): + def to_json(self) -> Dict[str, Any]: return { "SnapshotId": self.snapshot_id, "OwnerId": self.account_id, @@ -53,60 +62,65 @@ class EBSSnapshot(BaseModel): class EBSBackend(BaseBackend): """Implementation of EBS APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.snapshots = dict() + self.snapshots: Dict[str, EBSSnapshot] = dict() @property - def ec2_backend(self): + def ec2_backend(self) -> EC2Backend: return ec2_backends[self.account_id][self.region_name] - def start_snapshot(self, volume_size, tags, description): + def start_snapshot( + self, volume_size: str, tags: Optional[List[Dict[str, str]]], description: str + ) -> EBSSnapshot: zone_name = f"{self.region_name}a" vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name) snapshot = self.ec2_backend.create_snapshot( volume_id=vol.id, description=description ) if tags: - tags = {tag["Key"]: tag["Value"] for tag in tags} - snapshot.add_tags(tags) + snapshot.add_tags({tag["Key"]: tag["Value"] for tag in tags}) ebs_snapshot = EBSSnapshot(account_id=self.account_id, snapshot=snapshot) self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot return ebs_snapshot - def complete_snapshot(self, snapshot_id): + def complete_snapshot(self, snapshot_id: str) -> Dict[str, str]: self.snapshots[snapshot_id].status = "completed" return {"Status": "completed"} def put_snapshot_block( self, - snapshot_id, - block_index, - block_data, - checksum, - checksum_algorithm, - data_length, - ): + snapshot_id: str, + block_index: str, + block_data: str, + checksum: str, + checksum_algorithm: str, + data_length: str, + ) -> Tuple[str, str]: snapshot = self.snapshots[snapshot_id] snapshot.put_block( block_index, block_data, checksum, checksum_algorithm, data_length ) return checksum, checksum_algorithm - def get_snapshot_block(self, snapshot_id, block_index): + def get_snapshot_block(self, snapshot_id: str, block_index: str) -> Block: """ The BlockToken-parameter is not yet implemented """ snapshot = self.snapshots[snapshot_id] return snapshot.blocks[block_index] - def list_changed_blocks(self, first_snapshot_id, second_snapshot_id): + def list_changed_blocks( + self, first_snapshot_id: str, second_snapshot_id: str + ) -> Tuple[Dict[str, Tuple[str, Optional[str]]], EBSSnapshot]: """ The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex """ snapshot1 = self.snapshots[first_snapshot_id] snapshot2 = self.snapshots[second_snapshot_id] - changed_blocks = dict() # {idx: (token1, token2), ..} + changed_blocks: Dict[ + str, Tuple[str, Optional[str]] + ] = dict() # {idx: (token1, token2), ..} for idx in snapshot1.blocks: block1 = snapshot1.blocks[idx] if idx in snapshot2.blocks: @@ -118,7 +132,7 @@ class EBSBackend(BaseBackend): return changed_blocks, snapshot1 - def list_snapshot_blocks(self, snapshot_id): + def list_snapshot_blocks(self, snapshot_id: str) -> EBSSnapshot: return self.snapshots[snapshot_id] diff --git a/moto/ebs/responses.py b/moto/ebs/responses.py index 4f301a415..75d1549f9 100644 --- a/moto/ebs/responses.py +++ b/moto/ebs/responses.py @@ -1,39 +1,41 @@ """Handles incoming ebs requests, invokes methods, returns responses.""" import json +from typing import Any from moto.core.responses import BaseResponse -from .models import ebs_backends +from moto.core.common_types import TYPE_RESPONSE +from .models import ebs_backends, EBSBackend class EBSResponse(BaseResponse): """Handler for EBS requests and responses.""" - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="ebs") @property - def ebs_backend(self): + def ebs_backend(self) -> EBSBackend: """Return backend instance specific for this region.""" return ebs_backends[self.current_account][self.region] - def snapshots(self, request, full_url, headers): + def snapshots(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "POST": return self.start_snapshot() - def snapshot_block(self, request, full_url, headers): + def snapshot_block(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "PUT": return self.put_snapshot_block(full_url, headers) if request.method == "GET": return self.get_snapshot_block() - def snapshot_blocks(self, request, full_url, headers): + def snapshot_blocks(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "GET": return self.list_snapshot_blocks() - def start_snapshot(self): + def start_snapshot(self) -> TYPE_RESPONSE: """ The following parameters are not yet implemented: ParentSnapshotId, ClientToken, Encrypted, KmsKeyArn, Timeout """ @@ -48,7 +50,9 @@ class EBSResponse(BaseResponse): ) return 200, {}, json.dumps(snapshot.to_json()) - def complete_snapshot(self, request, full_url, headers): + def complete_snapshot( + self, request: Any, full_url: str, headers: Any + ) -> TYPE_RESPONSE: """ The following parameters are not yet supported: ChangedBlocksCount, Checksum, ChecksumAlgorithm, ChecksumAggregationMethod """ @@ -57,7 +61,7 @@ class EBSResponse(BaseResponse): status = self.ebs_backend.complete_snapshot(snapshot_id=snapshot_id) return 200, {}, json.dumps(status) - def put_snapshot_block(self, full_url, headers): + def put_snapshot_block(self, full_url: str, headers: Any) -> TYPE_RESPONSE: """ The following parameters are currently not taken into account: DataLength, Progress. The Checksum and ChecksumAlgorithm are taken at face-value, but no validation takes place. @@ -86,7 +90,7 @@ class EBSResponse(BaseResponse): "{}", ) - def get_snapshot_block(self): + def get_snapshot_block(self) -> TYPE_RESPONSE: snapshot_id = self.path.split("/")[-3] block_index = self.path.split("/")[-1] block = self.ebs_backend.get_snapshot_block( @@ -100,12 +104,14 @@ class EBSResponse(BaseResponse): } return 200, headers, block.block_data - def snapshot_changed_blocks(self, request, full_url, headers): + def snapshot_changed_blocks( + self, request: Any, full_url: str, headers: Any + ) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) first_snapshot_id = self._get_params().get("firstSnapshotId") second_snapshot_id = self.path.split("/")[-2] changed_blocks, snapshot = self.ebs_backend.list_changed_blocks( - first_snapshot_id=first_snapshot_id, + first_snapshot_id=first_snapshot_id, # type: ignore[arg-type] second_snapshot_id=second_snapshot_id, ) blocks = [ @@ -124,7 +130,7 @@ class EBSResponse(BaseResponse): ), ) - def list_snapshot_blocks(self): + def list_snapshot_blocks(self) -> TYPE_RESPONSE: """ The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex """ diff --git a/moto/ec2/models/core.py b/moto/ec2/models/core.py index 6712b84b3..5a7c6c9f2 100644 --- a/moto/ec2/models/core.py +++ b/moto/ec2/models/core.py @@ -1,10 +1,11 @@ +from typing import Dict, List from moto.core import BaseModel from ..exceptions import FilterNotImplementedError class TaggedEC2Resource(BaseModel): - def get_tags(self): + def get_tags(self) -> List[Dict[str, str]]: tags = [] if self.id: tags = self.ec2_backend.describe_tags(filters={"resource-id": [self.id]}) @@ -13,7 +14,7 @@ class TaggedEC2Resource(BaseModel): def add_tag(self, key, value): self.ec2_backend.create_tags([self.id], {key: value}) - def add_tags(self, tag_map): + def add_tags(self, tag_map: Dict[str, str]): for key, value in tag_map.items(): self.ec2_backend.create_tags([self.id], {key: value}) diff --git a/moto/ec2/models/elastic_block_store.py b/moto/ec2/models/elastic_block_store.py index 33148d4bd..c2cff2478 100644 --- a/moto/ec2/models/elastic_block_store.py +++ b/moto/ec2/models/elastic_block_store.py @@ -1,3 +1,5 @@ +from typing import Optional + from moto.core import CloudFormationModel from moto.packages.boto.ec2.blockdevicemapping import BlockDeviceType from ..exceptions import ( @@ -240,14 +242,14 @@ class EBSBackend: def create_volume( self, - size, - zone_name, - snapshot_id=None, - encrypted=False, - kms_key_id=None, - volume_type=None, - iops=None, - ): + size: str, + zone_name: str, + snapshot_id: Optional[str] = None, + encrypted: bool = False, + kms_key_id: Optional[str] = None, + volume_type: Optional[str] = None, + iops: Optional[str] = None, + ) -> Volume: if kms_key_id and not encrypted: raise InvalidParameterDependency("KmsKeyId", "Encrypted") if encrypted and not kms_key_id: @@ -360,7 +362,13 @@ class EBSBackend: volume.attachment = None return old_attachment - def create_snapshot(self, volume_id, description, owner_id=None, from_ami=None): + def create_snapshot( + self, + volume_id: str, + description: str, + owner_id: Optional[str] = None, + from_ami: Optional[str] = None, + ) -> Snapshot: snapshot_id = random_snapshot_id() volume = self.get_volume(volume_id) params = [self, snapshot_id, volume, description, volume.encrypted]