Techdebt: MyPy EBS (#5854)

This commit is contained in:
Bert Blommers 2023-01-19 18:27:59 -01:00 committed by GitHub
parent 66507dd898
commit 4528cfe9cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 49 deletions

View File

@ -2,13 +2,17 @@
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import unix_time
from moto.ec2 import ec2_backends
from moto.ec2.models import ec2_backends, EC2Backend
from moto.ec2.models.elastic_block_store import Snapshot
from moto.moto_api._internal import mock_random
from typing import Any, Dict, List, Optional, Tuple
class Block(BaseModel):
def __init__(self, block_data, checksum, checksum_algorithm, data_length):
def __init__(
self, block_data: str, checksum: str, checksum_algorithm: str, data_length: str
):
self.block_data = block_data
self.checksum = checksum
self.checksum_algorithm = checksum_algorithm
@ -17,7 +21,7 @@ class Block(BaseModel):
class EBSSnapshot(BaseModel):
def __init__(self, account_id, snapshot: Snapshot):
def __init__(self, account_id: str, snapshot: Snapshot):
self.account_id = account_id
self.snapshot_id = snapshot.id
self.status = "pending"
@ -29,15 +33,20 @@ class EBSSnapshot(BaseModel):
]
self.description = snapshot.description
self.blocks = dict()
self.blocks: Dict[str, Block] = dict()
def put_block(
self, block_idx, block_data, checksum, checksum_algorithm, data_length
):
self,
block_idx: str,
block_data: str,
checksum: str,
checksum_algorithm: str,
data_length: str,
) -> None:
block = Block(block_data, checksum, checksum_algorithm, data_length)
self.blocks[block_idx] = block
def to_json(self):
def to_json(self) -> Dict[str, Any]:
return {
"SnapshotId": self.snapshot_id,
"OwnerId": self.account_id,
@ -53,60 +62,65 @@ class EBSSnapshot(BaseModel):
class EBSBackend(BaseBackend):
"""Implementation of EBS APIs."""
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.snapshots = dict()
self.snapshots: Dict[str, EBSSnapshot] = dict()
@property
def ec2_backend(self):
def ec2_backend(self) -> EC2Backend:
return ec2_backends[self.account_id][self.region_name]
def start_snapshot(self, volume_size, tags, description):
def start_snapshot(
self, volume_size: str, tags: Optional[List[Dict[str, str]]], description: str
) -> EBSSnapshot:
zone_name = f"{self.region_name}a"
vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)
snapshot = self.ec2_backend.create_snapshot(
volume_id=vol.id, description=description
)
if tags:
tags = {tag["Key"]: tag["Value"] for tag in tags}
snapshot.add_tags(tags)
snapshot.add_tags({tag["Key"]: tag["Value"] for tag in tags})
ebs_snapshot = EBSSnapshot(account_id=self.account_id, snapshot=snapshot)
self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot
return ebs_snapshot
def complete_snapshot(self, snapshot_id):
def complete_snapshot(self, snapshot_id: str) -> Dict[str, str]:
self.snapshots[snapshot_id].status = "completed"
return {"Status": "completed"}
def put_snapshot_block(
self,
snapshot_id,
block_index,
block_data,
checksum,
checksum_algorithm,
data_length,
):
snapshot_id: str,
block_index: str,
block_data: str,
checksum: str,
checksum_algorithm: str,
data_length: str,
) -> Tuple[str, str]:
snapshot = self.snapshots[snapshot_id]
snapshot.put_block(
block_index, block_data, checksum, checksum_algorithm, data_length
)
return checksum, checksum_algorithm
def get_snapshot_block(self, snapshot_id, block_index):
def get_snapshot_block(self, snapshot_id: str, block_index: str) -> Block:
"""
The BlockToken-parameter is not yet implemented
"""
snapshot = self.snapshots[snapshot_id]
return snapshot.blocks[block_index]
def list_changed_blocks(self, first_snapshot_id, second_snapshot_id):
def list_changed_blocks(
self, first_snapshot_id: str, second_snapshot_id: str
) -> Tuple[Dict[str, Tuple[str, Optional[str]]], EBSSnapshot]:
"""
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
"""
snapshot1 = self.snapshots[first_snapshot_id]
snapshot2 = self.snapshots[second_snapshot_id]
changed_blocks = dict() # {idx: (token1, token2), ..}
changed_blocks: Dict[
str, Tuple[str, Optional[str]]
] = dict() # {idx: (token1, token2), ..}
for idx in snapshot1.blocks:
block1 = snapshot1.blocks[idx]
if idx in snapshot2.blocks:
@ -118,7 +132,7 @@ class EBSBackend(BaseBackend):
return changed_blocks, snapshot1
def list_snapshot_blocks(self, snapshot_id):
def list_snapshot_blocks(self, snapshot_id: str) -> EBSSnapshot:
return self.snapshots[snapshot_id]

View File

@ -1,39 +1,41 @@
"""Handles incoming ebs requests, invokes methods, returns responses."""
import json
from typing import Any
from moto.core.responses import BaseResponse
from .models import ebs_backends
from moto.core.common_types import TYPE_RESPONSE
from .models import ebs_backends, EBSBackend
class EBSResponse(BaseResponse):
"""Handler for EBS requests and responses."""
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="ebs")
@property
def ebs_backend(self):
def ebs_backend(self) -> EBSBackend:
"""Return backend instance specific for this region."""
return ebs_backends[self.current_account][self.region]
def snapshots(self, request, full_url, headers):
def snapshots(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.start_snapshot()
def snapshot_block(self, request, full_url, headers):
def snapshot_block(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "PUT":
return self.put_snapshot_block(full_url, headers)
if request.method == "GET":
return self.get_snapshot_block()
def snapshot_blocks(self, request, full_url, headers):
def snapshot_blocks(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.list_snapshot_blocks()
def start_snapshot(self):
def start_snapshot(self) -> TYPE_RESPONSE:
"""
The following parameters are not yet implemented: ParentSnapshotId, ClientToken, Encrypted, KmsKeyArn, Timeout
"""
@ -48,7 +50,9 @@ class EBSResponse(BaseResponse):
)
return 200, {}, json.dumps(snapshot.to_json())
def complete_snapshot(self, request, full_url, headers):
def complete_snapshot(
self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE:
"""
The following parameters are not yet supported: ChangedBlocksCount, Checksum, ChecksumAlgorithm, ChecksumAggregationMethod
"""
@ -57,7 +61,7 @@ class EBSResponse(BaseResponse):
status = self.ebs_backend.complete_snapshot(snapshot_id=snapshot_id)
return 200, {}, json.dumps(status)
def put_snapshot_block(self, full_url, headers):
def put_snapshot_block(self, full_url: str, headers: Any) -> TYPE_RESPONSE:
"""
The following parameters are currently not taken into account: DataLength, Progress.
The Checksum and ChecksumAlgorithm are taken at face-value, but no validation takes place.
@ -86,7 +90,7 @@ class EBSResponse(BaseResponse):
"{}",
)
def get_snapshot_block(self):
def get_snapshot_block(self) -> TYPE_RESPONSE:
snapshot_id = self.path.split("/")[-3]
block_index = self.path.split("/")[-1]
block = self.ebs_backend.get_snapshot_block(
@ -100,12 +104,14 @@ class EBSResponse(BaseResponse):
}
return 200, headers, block.block_data
def snapshot_changed_blocks(self, request, full_url, headers):
def snapshot_changed_blocks(
self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers)
first_snapshot_id = self._get_params().get("firstSnapshotId")
second_snapshot_id = self.path.split("/")[-2]
changed_blocks, snapshot = self.ebs_backend.list_changed_blocks(
first_snapshot_id=first_snapshot_id,
first_snapshot_id=first_snapshot_id, # type: ignore[arg-type]
second_snapshot_id=second_snapshot_id,
)
blocks = [
@ -124,7 +130,7 @@ class EBSResponse(BaseResponse):
),
)
def list_snapshot_blocks(self):
def list_snapshot_blocks(self) -> TYPE_RESPONSE:
"""
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
"""

View File

@ -1,10 +1,11 @@
from typing import Dict, List
from moto.core import BaseModel
from ..exceptions import FilterNotImplementedError
class TaggedEC2Resource(BaseModel):
def get_tags(self):
def get_tags(self) -> List[Dict[str, str]]:
tags = []
if self.id:
tags = self.ec2_backend.describe_tags(filters={"resource-id": [self.id]})
@ -13,7 +14,7 @@ class TaggedEC2Resource(BaseModel):
def add_tag(self, key, value):
self.ec2_backend.create_tags([self.id], {key: value})
def add_tags(self, tag_map):
def add_tags(self, tag_map: Dict[str, str]):
for key, value in tag_map.items():
self.ec2_backend.create_tags([self.id], {key: value})

View File

@ -1,3 +1,5 @@
from typing import Optional
from moto.core import CloudFormationModel
from moto.packages.boto.ec2.blockdevicemapping import BlockDeviceType
from ..exceptions import (
@ -240,14 +242,14 @@ class EBSBackend:
def create_volume(
self,
size,
zone_name,
snapshot_id=None,
encrypted=False,
kms_key_id=None,
volume_type=None,
iops=None,
):
size: str,
zone_name: str,
snapshot_id: Optional[str] = None,
encrypted: bool = False,
kms_key_id: Optional[str] = None,
volume_type: Optional[str] = None,
iops: Optional[str] = None,
) -> Volume:
if kms_key_id and not encrypted:
raise InvalidParameterDependency("KmsKeyId", "Encrypted")
if encrypted and not kms_key_id:
@ -360,7 +362,13 @@ class EBSBackend:
volume.attachment = None
return old_attachment
def create_snapshot(self, volume_id, description, owner_id=None, from_ami=None):
def create_snapshot(
self,
volume_id: str,
description: str,
owner_id: Optional[str] = None,
from_ami: Optional[str] = None,
) -> Snapshot:
snapshot_id = random_snapshot_id()
volume = self.get_volume(volume_id)
params = [self, snapshot_id, volume, description, volume.encrypted]