Techdebt: MyPy O (#6177)

This commit is contained in:
Bert Blommers 2023-04-05 12:04:58 +00:00 committed by GitHub
parent be17a7d8e2
commit db6874aec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 454 additions and 506 deletions

View File

@ -2,10 +2,10 @@ from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
def __init__(self, message):
def __init__(self, message: str):
super().__init__(error_type="ResourceNotFoundException", message=message)
class ValidationException(JsonRESTError):
def __init__(self, message):
def __init__(self, message: str):
super().__init__(error_type="ResourceNotFoundException", message=message)

View File

@ -2,6 +2,7 @@ from moto.core import BaseBackend, BackendDict, BaseModel
from moto.ec2 import ec2_backends
from moto.moto_api._internal import mock_random as random
import datetime
from typing import Any, Dict, List, Optional
from .exceptions import ResourceNotFoundException, ValidationException
@ -15,27 +16,27 @@ class OpsworkInstance(BaseModel):
def __init__(
self,
stack_id,
layer_ids,
instance_type,
ec2_backend,
auto_scale_type=None,
hostname=None,
os=None,
ami_id="ami-08111162",
ssh_keyname=None,
availability_zone=None,
virtualization_type="hvm",
subnet_id=None,
architecture="x86_64",
root_device_type="ebs",
block_device_mappings=None,
install_updates_on_boot=True,
ebs_optimized=False,
agent_version="INHERIT",
instance_profile_arn=None,
associate_public_ip=None,
security_group_ids=None,
stack_id: str,
layer_ids: List[str],
instance_type: str,
ec2_backend: Any,
auto_scale_type: Optional[str] = None,
hostname: Optional[str] = None,
os: Optional[str] = None,
ami_id: str = "ami-08111162",
ssh_keyname: Optional[str] = None,
availability_zone: Optional[str] = None,
virtualization_type: str = "hvm",
subnet_id: Optional[str] = None,
architecture: str = "x86_64",
root_device_type: str = "ebs",
block_device_mappings: Optional[List[Dict[str, Any]]] = None,
install_updates_on_boot: bool = True,
ebs_optimized: bool = False,
agent_version: str = "INHERIT",
instance_profile_arn: Optional[str] = None,
associate_public_ip: Optional[str] = None,
security_group_ids: Optional[List[str]] = None,
):
self.ec2_backend = ec2_backend
@ -57,17 +58,13 @@ class OpsworkInstance(BaseModel):
# todo: refactor how we track block_device_mappings to use
# boto.ec2.blockdevicemapping.BlockDeviceType and standardize
# formatting in to_dict()
self.block_device_mappings = block_device_mappings
if self.block_device_mappings is None:
self.block_device_mappings = [
{
"DeviceName": "ROOT_DEVICE",
"Ebs": {"VolumeSize": 8, "VolumeType": "gp2"},
}
]
self.security_group_ids = security_group_ids
if self.security_group_ids is None:
self.security_group_ids = []
self.block_device_mappings = block_device_mappings or [
{
"DeviceName": "ROOT_DEVICE",
"Ebs": {"VolumeSize": 8, "VolumeType": "gp2"},
}
]
self.security_group_ids = security_group_ids or []
self.os = os
self.hostname = hostname
@ -76,15 +73,15 @@ class OpsworkInstance(BaseModel):
self.subnet_id = subnet_id
self.associate_public_ip = associate_public_ip
self.instance = None
self.reported_os = {}
self.instance: Any = None
self.reported_os: Dict[str, Any] = {}
self.infrastructure_class = "ec2 (fixed)"
self.platform = "linux (fixed)"
self.id = str(random.uuid4())
self.created_at = datetime.datetime.utcnow()
def start(self):
def start(self) -> None:
"""
create an ec2 reservation if one doesn't already exist and call
start_instance. Update instance attributes to the newly created instance
@ -120,7 +117,7 @@ class OpsworkInstance(BaseModel):
self.ec2_backend.start_instances([self.instance.id])
@property
def status(self):
def status(self) -> str:
if self.instance is None:
return "stopped"
# OpsWorks reports the "running" state as "online"
@ -128,7 +125,7 @@ class OpsworkInstance(BaseModel):
return "online"
return self.instance._state.name
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
d = {
"AgentVersion": self.agent_version,
"Architecture": self.architecture,
@ -182,86 +179,74 @@ class OpsworkInstance(BaseModel):
class Layer(BaseModel):
def __init__(
self,
stack_id,
layer_type,
name,
shortname,
attributes=None,
custom_instance_profile_arn=None,
custom_json=None,
custom_security_group_ids=None,
packages=None,
volume_configurations=None,
enable_autohealing=None,
auto_assign_elastic_ips=None,
auto_assign_public_ips=None,
custom_recipes=None,
install_updates_on_boot=None,
use_ebs_optimized_instances=None,
lifecycle_event_configuration=None,
stack_id: str,
layer_type: str,
name: str,
shortname: str,
attributes: Optional[Dict[str, Any]] = None,
custom_instance_profile_arn: Optional[str] = None,
custom_json: Optional[str] = None,
custom_security_group_ids: Optional[List[Dict[str, Any]]] = None,
packages: Optional[str] = None,
volume_configurations: Optional[List[Dict[str, Any]]] = None,
enable_autohealing: Optional[str] = None,
auto_assign_elastic_ips: Optional[str] = None,
auto_assign_public_ips: Optional[str] = None,
custom_recipes: Optional[Dict[str, Any]] = None,
install_updates_on_boot: Optional[str] = None,
use_ebs_optimized_instances: Optional[str] = None,
lifecycle_event_configuration: Optional[Dict[str, Any]] = None,
):
self.stack_id = stack_id
self.type = layer_type
self.name = name
self.shortname = shortname
self.attributes = attributes
if attributes is None:
self.attributes = {
"BundlerVersion": None,
"EcsClusterArn": None,
"EnableHaproxyStats": None,
"GangliaPassword": None,
"GangliaUrl": None,
"GangliaUser": None,
"HaproxyHealthCheckMethod": None,
"HaproxyHealthCheckUrl": None,
"HaproxyStatsPassword": None,
"HaproxyStatsUrl": None,
"HaproxyStatsUser": None,
"JavaAppServer": None,
"JavaAppServerVersion": None,
"Jvm": None,
"JvmOptions": None,
"JvmVersion": None,
"ManageBundler": None,
"MemcachedMemory": None,
"MysqlRootPassword": None,
"MysqlRootPasswordUbiquitous": None,
"NodejsVersion": None,
"PassengerVersion": None,
"RailsStack": None,
"RubyVersion": None,
"RubygemsVersion": None,
} # May not be accurate
self.attributes = attributes or {
"BundlerVersion": None,
"EcsClusterArn": None,
"EnableHaproxyStats": None,
"GangliaPassword": None,
"GangliaUrl": None,
"GangliaUser": None,
"HaproxyHealthCheckMethod": None,
"HaproxyHealthCheckUrl": None,
"HaproxyStatsPassword": None,
"HaproxyStatsUrl": None,
"HaproxyStatsUser": None,
"JavaAppServer": None,
"JavaAppServerVersion": None,
"Jvm": None,
"JvmOptions": None,
"JvmVersion": None,
"ManageBundler": None,
"MemcachedMemory": None,
"MysqlRootPassword": None,
"MysqlRootPasswordUbiquitous": None,
"NodejsVersion": None,
"PassengerVersion": None,
"RailsStack": None,
"RubyVersion": None,
"RubygemsVersion": None,
} # May not be accurate
self.packages = packages
if packages is None:
self.packages = packages
self.custom_recipes = custom_recipes
if custom_recipes is None:
self.custom_recipes = {
"Configure": [],
"Deploy": [],
"Setup": [],
"Shutdown": [],
"Undeploy": [],
}
self.custom_recipes = custom_recipes or {
"Configure": [],
"Deploy": [],
"Setup": [],
"Shutdown": [],
"Undeploy": [],
}
self.custom_security_group_ids = custom_security_group_ids
if custom_security_group_ids is None:
self.custom_security_group_ids = []
self.custom_security_group_ids = custom_security_group_ids or []
self.lifecycle_event_configuration = lifecycle_event_configuration
if lifecycle_event_configuration is None:
self.lifecycle_event_configuration = {
"Shutdown": {"DelayUntilElbConnectionsDrained": False}
}
self.lifecycle_event_configuration = lifecycle_event_configuration or {
"Shutdown": {"DelayUntilElbConnectionsDrained": False}
}
self.volume_configurations = volume_configurations
if volume_configurations is None:
self.volume_configurations = []
self.volume_configurations = volume_configurations or []
self.custom_instance_profile_arn = custom_instance_profile_arn
self.custom_json = custom_json
@ -274,11 +259,11 @@ class Layer(BaseModel):
self.id = str(random.uuid4())
self.created_at = datetime.datetime.utcnow()
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return self.id == other.id
def to_dict(self):
d = {
def to_dict(self) -> Dict[str, Any]:
d: Dict[str, Any] = {
"Attributes": self.attributes,
"AutoAssignElasticIps": self.auto_assign_elastic_ips,
"AutoAssignPublicIps": self.auto_assign_public_ips,
@ -312,26 +297,26 @@ class Layer(BaseModel):
class Stack(BaseModel):
def __init__(
self,
name,
account_id,
region,
service_role_arn,
default_instance_profile_arn,
vpcid="vpc-1f99bf7a",
attributes=None,
default_os="Ubuntu 12.04 LTS",
hostname_theme="Layer_Dependent",
default_availability_zone="us-east-1a",
default_subnet_id="subnet-73981004",
custom_json=None,
configuration_manager=None,
chef_configuration=None,
use_custom_cookbooks=False,
use_opsworks_security_groups=True,
custom_cookbooks_source=None,
default_ssh_keyname=None,
default_root_device_type="instance-store",
agent_version="LATEST",
name: str,
account_id: str,
region: str,
service_role_arn: str,
default_instance_profile_arn: str,
vpcid: str = "vpc-1f99bf7a",
attributes: Optional[Dict[str, Any]] = None,
default_os: str = "Ubuntu 12.04 LTS",
hostname_theme: str = "Layer_Dependent",
default_availability_zone: str = "us-east-1a",
default_subnet_id: str = "subnet-73981004",
custom_json: Optional[str] = None,
configuration_manager: Optional[Dict[str, Any]] = None,
chef_configuration: Optional[Dict[str, Any]] = None,
use_custom_cookbooks: bool = False,
use_opsworks_security_groups: bool = True,
custom_cookbooks_source: Optional[Dict[str, Any]] = None,
default_ssh_keyname: Optional[str] = None,
default_root_device_type: str = "instance-store",
agent_version: str = "LATEST",
):
self.name = name
@ -340,21 +325,16 @@ class Stack(BaseModel):
self.default_instance_profile_arn = default_instance_profile_arn
self.vpcid = vpcid
self.attributes = attributes
if attributes is None:
self.attributes = {"Color": None}
self.attributes = attributes or {"Color": None}
self.configuration_manager = configuration_manager
if configuration_manager is None:
self.configuration_manager = {"Name": "Chef", "Version": "11.4"}
self.configuration_manager = configuration_manager or {
"Name": "Chef",
"Version": "11.4",
}
self.chef_configuration = chef_configuration
if chef_configuration is None:
self.chef_configuration = {}
self.chef_configuration = chef_configuration or {}
self.custom_cookbooks_source = custom_cookbooks_source
if custom_cookbooks_source is None:
self.custom_cookbooks_source = {}
self.custom_cookbooks_source = custom_cookbooks_source or {}
self.custom_json = custom_json
self.default_ssh_keyname = default_ssh_keyname
@ -368,23 +348,23 @@ class Stack(BaseModel):
self.agent_version = agent_version
self.id = str(random.uuid4())
self.layers = []
self.apps = []
self.layers: List[Layer] = []
self.apps: List[App] = []
self.account_number = account_id
self.created_at = datetime.datetime.utcnow()
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return self.id == other.id
def generate_hostname(self):
def generate_hostname(self) -> str:
# this doesn't match amazon's implementation
return f"{self.hostname_theme}-{[random.choice('abcdefghijhk') for _ in range(4)]}-(moto)"
@property
def arn(self):
def arn(self) -> str:
return f"arn:aws:opsworks:{self.region}:{self.account_number}:stack/{self.id}"
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
response = {
"AgentVersion": self.agent_version,
"Arn": self.arn,
@ -418,18 +398,18 @@ class Stack(BaseModel):
class App(BaseModel):
def __init__(
self,
stack_id,
name,
app_type,
shortname=None,
description=None,
datasources=None,
app_source=None,
domains=None,
enable_ssl=False,
ssl_configuration=None,
attributes=None,
environment=None,
stack_id: str,
name: str,
app_type: str,
shortname: Optional[str] = None,
description: Optional[str] = None,
datasources: Optional[List[str]] = None,
app_source: Optional[Dict[str, Any]] = None,
domains: Optional[List[str]] = None,
enable_ssl: bool = False,
ssl_configuration: Optional[Dict[str, Any]] = None,
attributes: Optional[Dict[str, Any]] = None,
environment: Optional[Dict[str, Any]] = None,
):
self.stack_id = stack_id
self.name = name
@ -437,40 +417,28 @@ class App(BaseModel):
self.shortname = shortname
self.description = description
self.datasources = datasources
if datasources is None:
self.datasources = []
self.datasources = datasources or []
self.app_source = app_source
if app_source is None:
self.app_source = {}
self.app_source = app_source or {}
self.domains = domains
if domains is None:
self.domains = []
self.domains = domains or []
self.enable_ssl = enable_ssl
self.ssl_configuration = ssl_configuration
if ssl_configuration is None:
self.ssl_configuration = {}
self.ssl_configuration = ssl_configuration or {}
self.attributes = attributes
if attributes is None:
self.attributes = {}
self.attributes = attributes or {}
self.environment = environment
if environment is None:
self.environment = {}
self.environment = environment or {}
self.id = str(random.uuid4())
self.created_at = datetime.datetime.utcnow()
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return self.id == other.id
def to_dict(self):
d = {
def to_dict(self) -> Dict[str, Any]:
return {
"AppId": self.id,
"AppSource": self.app_source,
"Attributes": self.attributes,
@ -486,24 +454,23 @@ class App(BaseModel):
"StackId": self.stack_id,
"Type": self.type,
}
return d
class OpsWorksBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.stacks = {}
self.layers = {}
self.apps = {}
self.instances = {}
self.stacks: Dict[str, Stack] = {}
self.layers: Dict[str, Layer] = {}
self.apps: Dict[str, App] = {}
self.instances: Dict[str, OpsworkInstance] = {}
self.ec2_backend = ec2_backends[account_id][region_name]
def create_stack(self, **kwargs):
def create_stack(self, **kwargs: Any) -> Stack:
stack = Stack(account_id=self.account_id, **kwargs)
self.stacks[stack.id] = stack
return stack
def create_layer(self, **kwargs):
def create_layer(self, **kwargs: Any) -> Layer:
name = kwargs["name"]
shortname = kwargs["shortname"]
stackid = kwargs["stack_id"]
@ -522,7 +489,7 @@ class OpsWorksBackend(BaseBackend):
self.stacks[stackid].layers.append(layer)
return layer
def create_app(self, **kwargs):
def create_app(self, **kwargs: Any) -> App:
name = kwargs["name"]
stackid = kwargs["stack_id"]
if stackid not in self.stacks:
@ -536,7 +503,7 @@ class OpsWorksBackend(BaseBackend):
self.stacks[stackid].apps.append(app)
return app
def create_instance(self, **kwargs):
def create_instance(self, **kwargs: Any) -> OpsworkInstance:
stack_id = kwargs["stack_id"]
layer_ids = kwargs["layer_ids"]
@ -576,7 +543,7 @@ class OpsWorksBackend(BaseBackend):
self.instances[opsworks_instance.id] = opsworks_instance
return opsworks_instance
def describe_stacks(self, stack_ids):
def describe_stacks(self, stack_ids: List[str]) -> List[Dict[str, Any]]:
if stack_ids is None:
return [stack.to_dict() for stack in self.stacks.values()]
@ -585,7 +552,9 @@ class OpsWorksBackend(BaseBackend):
raise ResourceNotFoundException(", ".join(unknown_stacks))
return [self.stacks[id].to_dict() for id in stack_ids]
def describe_layers(self, stack_id, layer_ids):
def describe_layers(
self, stack_id: str, layer_ids: List[str]
) -> List[Dict[str, Any]]:
if stack_id is not None and layer_ids is not None:
raise ValidationException(
"Please provide one or more layer IDs or a stack ID"
@ -602,7 +571,7 @@ class OpsWorksBackend(BaseBackend):
raise ResourceNotFoundException(", ".join(unknown_layers))
return [self.layers[id].to_dict() for id in layer_ids]
def describe_apps(self, stack_id, app_ids):
def describe_apps(self, stack_id: str, app_ids: List[str]) -> List[Dict[str, Any]]:
if stack_id is not None and app_ids is not None:
raise ValidationException(
"Please provide one or more app IDs or a stack ID"
@ -619,7 +588,7 @@ class OpsWorksBackend(BaseBackend):
raise ResourceNotFoundException(", ".join(unknown_apps))
return [self.apps[id].to_dict() for id in app_ids]
def describe_instances(self, instance_ids, layer_id, stack_id):
def describe_instances(self, instance_ids: List[str], layer_id: str, stack_id: str) -> List[Dict[str, Any]]: # type: ignore[return]
if len(list(filter(None, (instance_ids, layer_id, stack_id)))) != 1:
raise ValidationException(
"Please provide either one or more "
@ -637,22 +606,20 @@ class OpsWorksBackend(BaseBackend):
raise ResourceNotFoundException(
f"Unable to find layer with ID {layer_id}"
)
instances = [
return [
i.to_dict() for i in self.instances.values() if layer_id in i.layer_ids
]
return instances
if stack_id:
if stack_id not in self.stacks:
raise ResourceNotFoundException(
f"Unable to find stack with ID {stack_id}"
)
instances = [
return [
i.to_dict() for i in self.instances.values() if stack_id == i.stack_id
]
return instances
def start_instance(self, instance_id):
def start_instance(self, instance_id: str) -> None:
if instance_id not in self.instances:
raise ResourceNotFoundException(
f"Unable to find instance with ID {instance_id}"

View File

@ -1,143 +1,135 @@
import json
from moto.core.responses import BaseResponse
from .models import opsworks_backends
from .models import opsworks_backends, OpsWorksBackend
class OpsWorksResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="opsworks")
@property
def parameters(self):
return json.loads(self.body)
@property
def opsworks_backend(self):
def opsworks_backend(self) -> OpsWorksBackend:
return opsworks_backends[self.current_account][self.region]
def create_stack(self):
def create_stack(self) -> str:
kwargs = dict(
name=self.parameters.get("Name"),
region=self.parameters.get("Region"),
vpcid=self.parameters.get("VpcId"),
attributes=self.parameters.get("Attributes"),
default_instance_profile_arn=self.parameters.get(
"DefaultInstanceProfileArn"
),
default_os=self.parameters.get("DefaultOs"),
hostname_theme=self.parameters.get("HostnameTheme"),
default_availability_zone=self.parameters.get("DefaultAvailabilityZone"),
default_subnet_id=self.parameters.get("DefaultInstanceProfileArn"),
custom_json=self.parameters.get("CustomJson"),
configuration_manager=self.parameters.get("ConfigurationManager"),
chef_configuration=self.parameters.get("ChefConfiguration"),
use_custom_cookbooks=self.parameters.get("UseCustomCookbooks"),
use_opsworks_security_groups=self.parameters.get(
"UseOpsworksSecurityGroups"
),
custom_cookbooks_source=self.parameters.get("CustomCookbooksSource"),
default_ssh_keyname=self.parameters.get("DefaultSshKeyName"),
default_root_device_type=self.parameters.get("DefaultRootDeviceType"),
service_role_arn=self.parameters.get("ServiceRoleArn"),
agent_version=self.parameters.get("AgentVersion"),
name=self._get_param("Name"),
region=self._get_param("Region"),
vpcid=self._get_param("VpcId"),
attributes=self._get_param("Attributes"),
default_instance_profile_arn=self._get_param("DefaultInstanceProfileArn"),
default_os=self._get_param("DefaultOs"),
hostname_theme=self._get_param("HostnameTheme"),
default_availability_zone=self._get_param("DefaultAvailabilityZone"),
default_subnet_id=self._get_param("DefaultInstanceProfileArn"),
custom_json=self._get_param("CustomJson"),
configuration_manager=self._get_param("ConfigurationManager"),
chef_configuration=self._get_param("ChefConfiguration"),
use_custom_cookbooks=self._get_param("UseCustomCookbooks"),
use_opsworks_security_groups=self._get_param("UseOpsworksSecurityGroups"),
custom_cookbooks_source=self._get_param("CustomCookbooksSource"),
default_ssh_keyname=self._get_param("DefaultSshKeyName"),
default_root_device_type=self._get_param("DefaultRootDeviceType"),
service_role_arn=self._get_param("ServiceRoleArn"),
agent_version=self._get_param("AgentVersion"),
)
stack = self.opsworks_backend.create_stack(**kwargs)
return json.dumps({"StackId": stack.id}, indent=1)
def create_layer(self):
def create_layer(self) -> str:
kwargs = dict(
stack_id=self.parameters.get("StackId"),
layer_type=self.parameters.get("Type"),
name=self.parameters.get("Name"),
shortname=self.parameters.get("Shortname"),
attributes=self.parameters.get("Attributes"),
custom_instance_profile_arn=self.parameters.get("CustomInstanceProfileArn"),
custom_json=self.parameters.get("CustomJson"),
custom_security_group_ids=self.parameters.get("CustomSecurityGroupIds"),
packages=self.parameters.get("Packages"),
volume_configurations=self.parameters.get("VolumeConfigurations"),
enable_autohealing=self.parameters.get("EnableAutoHealing"),
auto_assign_elastic_ips=self.parameters.get("AutoAssignElasticIps"),
auto_assign_public_ips=self.parameters.get("AutoAssignPublicIps"),
custom_recipes=self.parameters.get("CustomRecipes"),
install_updates_on_boot=self.parameters.get("InstallUpdatesOnBoot"),
use_ebs_optimized_instances=self.parameters.get("UseEbsOptimizedInstances"),
lifecycle_event_configuration=self.parameters.get(
stack_id=self._get_param("StackId"),
layer_type=self._get_param("Type"),
name=self._get_param("Name"),
shortname=self._get_param("Shortname"),
attributes=self._get_param("Attributes"),
custom_instance_profile_arn=self._get_param("CustomInstanceProfileArn"),
custom_json=self._get_param("CustomJson"),
custom_security_group_ids=self._get_param("CustomSecurityGroupIds"),
packages=self._get_param("Packages"),
volume_configurations=self._get_param("VolumeConfigurations"),
enable_autohealing=self._get_param("EnableAutoHealing"),
auto_assign_elastic_ips=self._get_param("AutoAssignElasticIps"),
auto_assign_public_ips=self._get_param("AutoAssignPublicIps"),
custom_recipes=self._get_param("CustomRecipes"),
install_updates_on_boot=self._get_param("InstallUpdatesOnBoot"),
use_ebs_optimized_instances=self._get_param("UseEbsOptimizedInstances"),
lifecycle_event_configuration=self._get_param(
"LifecycleEventConfiguration"
),
)
layer = self.opsworks_backend.create_layer(**kwargs)
return json.dumps({"LayerId": layer.id}, indent=1)
def create_app(self):
def create_app(self) -> str:
kwargs = dict(
stack_id=self.parameters.get("StackId"),
name=self.parameters.get("Name"),
app_type=self.parameters.get("Type"),
shortname=self.parameters.get("Shortname"),
description=self.parameters.get("Description"),
datasources=self.parameters.get("DataSources"),
app_source=self.parameters.get("AppSource"),
domains=self.parameters.get("Domains"),
enable_ssl=self.parameters.get("EnableSsl"),
ssl_configuration=self.parameters.get("SslConfiguration"),
attributes=self.parameters.get("Attributes"),
environment=self.parameters.get("Environment"),
stack_id=self._get_param("StackId"),
name=self._get_param("Name"),
app_type=self._get_param("Type"),
shortname=self._get_param("Shortname"),
description=self._get_param("Description"),
datasources=self._get_param("DataSources"),
app_source=self._get_param("AppSource"),
domains=self._get_param("Domains"),
enable_ssl=self._get_param("EnableSsl"),
ssl_configuration=self._get_param("SslConfiguration"),
attributes=self._get_param("Attributes"),
environment=self._get_param("Environment"),
)
app = self.opsworks_backend.create_app(**kwargs)
return json.dumps({"AppId": app.id}, indent=1)
def create_instance(self):
def create_instance(self) -> str:
kwargs = dict(
stack_id=self.parameters.get("StackId"),
layer_ids=self.parameters.get("LayerIds"),
instance_type=self.parameters.get("InstanceType"),
auto_scale_type=self.parameters.get("AutoScalingType"),
hostname=self.parameters.get("Hostname"),
os=self.parameters.get("Os"),
ami_id=self.parameters.get("AmiId"),
ssh_keyname=self.parameters.get("SshKeyName"),
availability_zone=self.parameters.get("AvailabilityZone"),
virtualization_type=self.parameters.get("VirtualizationType"),
subnet_id=self.parameters.get("SubnetId"),
architecture=self.parameters.get("Architecture"),
root_device_type=self.parameters.get("RootDeviceType"),
block_device_mappings=self.parameters.get("BlockDeviceMappings"),
install_updates_on_boot=self.parameters.get("InstallUpdatesOnBoot"),
ebs_optimized=self.parameters.get("EbsOptimized"),
agent_version=self.parameters.get("AgentVersion"),
stack_id=self._get_param("StackId"),
layer_ids=self._get_param("LayerIds"),
instance_type=self._get_param("InstanceType"),
auto_scale_type=self._get_param("AutoScalingType"),
hostname=self._get_param("Hostname"),
os=self._get_param("Os"),
ami_id=self._get_param("AmiId"),
ssh_keyname=self._get_param("SshKeyName"),
availability_zone=self._get_param("AvailabilityZone"),
virtualization_type=self._get_param("VirtualizationType"),
subnet_id=self._get_param("SubnetId"),
architecture=self._get_param("Architecture"),
root_device_type=self._get_param("RootDeviceType"),
block_device_mappings=self._get_param("BlockDeviceMappings"),
install_updates_on_boot=self._get_param("InstallUpdatesOnBoot"),
ebs_optimized=self._get_param("EbsOptimized"),
agent_version=self._get_param("AgentVersion"),
)
opsworks_instance = self.opsworks_backend.create_instance(**kwargs)
return json.dumps({"InstanceId": opsworks_instance.id}, indent=1)
def describe_stacks(self):
stack_ids = self.parameters.get("StackIds")
def describe_stacks(self) -> str:
stack_ids = self._get_param("StackIds")
stacks = self.opsworks_backend.describe_stacks(stack_ids)
return json.dumps({"Stacks": stacks}, indent=1)
def describe_layers(self):
stack_id = self.parameters.get("StackId")
layer_ids = self.parameters.get("LayerIds")
def describe_layers(self) -> str:
stack_id = self._get_param("StackId")
layer_ids = self._get_param("LayerIds")
layers = self.opsworks_backend.describe_layers(stack_id, layer_ids)
return json.dumps({"Layers": layers}, indent=1)
def describe_apps(self):
stack_id = self.parameters.get("StackId")
app_ids = self.parameters.get("AppIds")
def describe_apps(self) -> str:
stack_id = self._get_param("StackId")
app_ids = self._get_param("AppIds")
apps = self.opsworks_backend.describe_apps(stack_id, app_ids)
return json.dumps({"Apps": apps}, indent=1)
def describe_instances(self):
instance_ids = self.parameters.get("InstanceIds")
layer_id = self.parameters.get("LayerId")
stack_id = self.parameters.get("StackId")
def describe_instances(self) -> str:
instance_ids = self._get_param("InstanceIds")
layer_id = self._get_param("LayerId")
stack_id = self._get_param("StackId")
instances = self.opsworks_backend.describe_instances(
instance_ids, layer_id, stack_id
)
return json.dumps({"Instances": instances}, indent=1)
def start_instance(self):
instance_id = self.parameters.get("InstanceId")
def start_instance(self) -> str:
instance_id = self._get_param("InstanceId")
self.opsworks_backend.start_instance(instance_id)
return ""

View File

@ -4,7 +4,7 @@ from moto.core.exceptions import JsonRESTError
class AccountAlreadyRegisteredException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"AccountAlreadyRegisteredException",
"The provided account is already a delegated administrator for your organization.",
@ -14,7 +14,7 @@ class AccountAlreadyRegisteredException(JsonRESTError):
class AccountNotRegisteredException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"AccountNotRegisteredException",
"The provided account is not a registered delegated administrator for your organization.",
@ -24,7 +24,7 @@ class AccountNotRegisteredException(JsonRESTError):
class AccountNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"AccountNotFoundException", "You specified an account that doesn't exist."
)
@ -33,7 +33,7 @@ class AccountNotFoundException(JsonRESTError):
class AWSOrganizationsNotInUseException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"AWSOrganizationsNotInUseException",
"Your account is not a member of an organization.",
@ -43,21 +43,21 @@ class AWSOrganizationsNotInUseException(JsonRESTError):
class ConstraintViolationException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ConstraintViolationException", message)
class InvalidInputException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("InvalidInputException", message)
class DuplicateOrganizationalUnitException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"DuplicateOrganizationalUnitException",
"An OU with the same name already exists.",
@ -67,7 +67,7 @@ class DuplicateOrganizationalUnitException(JsonRESTError):
class DuplicatePolicyException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"DuplicatePolicyException", "A policy with the same name already exists."
)
@ -76,7 +76,7 @@ class DuplicatePolicyException(JsonRESTError):
class PolicyTypeAlreadyEnabledException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"PolicyTypeAlreadyEnabledException",
"The specified policy type is already enabled.",
@ -86,7 +86,7 @@ class PolicyTypeAlreadyEnabledException(JsonRESTError):
class PolicyTypeNotEnabledException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"PolicyTypeNotEnabledException",
"This operation can be performed only for enabled policy types.",
@ -96,7 +96,7 @@ class PolicyTypeNotEnabledException(JsonRESTError):
class RootNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"RootNotFoundException", "You specified a root that doesn't exist."
)
@ -105,7 +105,7 @@ class RootNotFoundException(JsonRESTError):
class TargetNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"TargetNotFoundException", "You specified a target that doesn't exist."
)

View File

@ -1,6 +1,7 @@
import datetime
import re
import json
from typing import Any, Dict, List, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.exceptions import RESTError
@ -25,7 +26,7 @@ from .utils import PAGINATION_MODEL
class FakeOrganization(BaseModel):
def __init__(self, account_id, feature_set):
def __init__(self, account_id: str, feature_set: str):
self.id = utils.make_random_org_id()
self.root_id = utils.make_random_root_id()
self.feature_set = feature_set
@ -39,14 +40,14 @@ class FakeOrganization(BaseModel):
]
@property
def arn(self):
def arn(self) -> str:
return utils.ORGANIZATION_ARN_FORMAT.format(self.master_account_id, self.id)
@property
def master_account_arn(self):
def master_account_arn(self) -> str:
return utils.MASTER_ACCOUNT_ARN_FORMAT.format(self.master_account_id, self.id)
def describe(self):
def describe(self) -> Dict[str, Any]:
return {
"Organization": {
"Id": self.id,
@ -61,7 +62,7 @@ class FakeOrganization(BaseModel):
class FakeAccount(BaseModel):
def __init__(self, organization, **kwargs):
def __init__(self, organization: FakeOrganization, **kwargs: Any):
self.type = "ACCOUNT"
self.organization_id = organization.id
self.master_account_id = organization.master_account_id
@ -73,17 +74,17 @@ class FakeAccount(BaseModel):
self.status = "ACTIVE"
self.joined_method = "CREATED"
self.parent_id = organization.root_id
self.attached_policies = []
self.attached_policies: List[FakePolicy] = []
self.tags = {tag["Key"]: tag["Value"] for tag in kwargs.get("Tags", [])}
@property
def arn(self):
def arn(self) -> str:
return utils.ACCOUNT_ARN_FORMAT.format(
self.master_account_id, self.organization_id, self.id
)
@property
def create_account_status(self):
def create_account_status(self) -> Dict[str, Any]: # type: ignore[misc]
return {
"CreateAccountStatus": {
"Id": self.create_account_status_id,
@ -95,7 +96,7 @@ class FakeAccount(BaseModel):
}
}
def describe(self):
def describe(self) -> Dict[str, Any]:
return {
"Id": self.id,
"Arn": self.arn,
@ -106,14 +107,14 @@ class FakeAccount(BaseModel):
"JoinedTimestamp": unix_time(self.create_time),
}
def close(self):
def close(self) -> None:
# TODO: The CloseAccount spec allows the account to pass through a
# "PENDING_CLOSURE" state before reaching the SUSPNEDED state.
# "PENDING_CLOSURE" state before reaching the SUSPENDED state.
self.status = "SUSPENDED"
class FakeOrganizationalUnit(BaseModel):
def __init__(self, organization, **kwargs):
def __init__(self, organization: FakeOrganization, **kwargs: Any):
self.type = "ORGANIZATIONAL_UNIT"
self.organization_id = organization.id
self.master_account_id = organization.master_account_id
@ -121,16 +122,16 @@ class FakeOrganizationalUnit(BaseModel):
self.name = kwargs.get("Name")
self.parent_id = kwargs.get("ParentId")
self._arn_format = utils.OU_ARN_FORMAT
self.attached_policies = []
self.attached_policies: List[FakePolicy] = []
self.tags = {tag["Key"]: tag["Value"] for tag in kwargs.get("Tags", [])}
@property
def arn(self):
def arn(self) -> str:
return self._arn_format.format(
self.master_account_id, self.organization_id, self.id
)
def describe(self):
def describe(self) -> Dict[str, Dict[str, Any]]:
return {
"OrganizationalUnit": {"Id": self.id, "Arn": self.arn, "Name": self.name}
}
@ -144,17 +145,17 @@ class FakeRoot(FakeOrganizationalUnit):
"TAG_POLICY",
]
def __init__(self, organization, **kwargs):
def __init__(self, organization: FakeOrganization, **kwargs: Any):
super().__init__(organization, **kwargs)
self.type = "ROOT"
self.id = organization.root_id
self.name = "Root"
self.policy_types = []
self.policy_types: List[Dict[str, str]] = []
self._arn_format = utils.ROOT_ARN_FORMAT
self.attached_policies = []
self.tags = {tag["Key"]: tag["Value"] for tag in kwargs.get("Tags", [])}
def describe(self):
def describe(self) -> Dict[str, Any]:
return {
"Id": self.id,
"Arn": self.arn,
@ -162,7 +163,7 @@ class FakeRoot(FakeOrganizationalUnit):
"PolicyTypes": self.policy_types,
}
def add_policy_type(self, policy_type):
def add_policy_type(self, policy_type: str) -> None:
if policy_type not in self.SUPPORTED_POLICY_TYPES:
raise InvalidInputException("You specified an invalid value.")
@ -171,7 +172,7 @@ class FakeRoot(FakeOrganizationalUnit):
self.policy_types.append({"Type": policy_type, "Status": "ENABLED"})
def remove_policy_type(self, policy_type):
def remove_policy_type(self, policy_type: str) -> None:
if not FakePolicy.supported_policy_type(policy_type):
raise InvalidInputException("You specified an invalid value.")
@ -189,16 +190,16 @@ class FakePolicy(BaseModel):
"TAG_POLICY",
]
def __init__(self, organization, **kwargs):
def __init__(self, organization: FakeOrganization, **kwargs: Any):
self.content = kwargs.get("Content")
self.description = kwargs.get("Description")
self.name = kwargs.get("Name")
self.type = kwargs.get("Type")
self.type = kwargs.get("Type", "")
self.id = utils.make_random_policy_id()
self.aws_managed = False
self.organization_id = organization.id
self.master_account_id = organization.master_account_id
self.attachments = []
self.attachments: List[Any] = []
if not FakePolicy.supported_policy_type(self.type):
raise InvalidInputException("You specified an invalid value.")
@ -212,12 +213,12 @@ class FakePolicy(BaseModel):
)
@property
def arn(self):
def arn(self) -> str:
return self._arn_format.format(
self.master_account_id, self.organization_id, self.id
)
def describe(self):
def describe(self) -> Dict[str, Any]:
return {
"Policy": {
"PolicySummary": {
@ -233,7 +234,7 @@ class FakePolicy(BaseModel):
}
@staticmethod
def supported_policy_type(policy_type):
def supported_policy_type(policy_type: str) -> bool:
return policy_type in FakePolicy.SUPPORTED_POLICY_TYPES
@ -264,7 +265,7 @@ class FakeServiceAccess(BaseModel):
"tagpolicies.tag.amazonaws.com",
]
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
if not self.trusted_service(kwargs["ServicePrincipal"]):
raise InvalidInputException(
"You specified an unrecognized service principal."
@ -273,14 +274,14 @@ class FakeServiceAccess(BaseModel):
self.service_principal = kwargs["ServicePrincipal"]
self.date_enabled = datetime.datetime.utcnow()
def describe(self):
def describe(self) -> Dict[str, Any]:
return {
"ServicePrincipal": self.service_principal,
"DateEnabled": unix_time(self.date_enabled),
}
@staticmethod
def trusted_service(service_principal):
def trusted_service(service_principal: str) -> bool:
return service_principal in FakeServiceAccess.TRUSTED_SERVICES
@ -296,12 +297,12 @@ class FakeDelegatedAdministrator(BaseModel):
"ssm.amazonaws.com",
]
def __init__(self, account):
def __init__(self, account: FakeAccount):
self.account = account
self.enabled_date = datetime.datetime.utcnow()
self.services = {}
self.services: Dict[str, Any] = {}
def add_service_principal(self, service_principal):
def add_service_principal(self, service_principal: str) -> None:
if service_principal in self.services:
raise AccountAlreadyRegisteredException
@ -315,7 +316,7 @@ class FakeDelegatedAdministrator(BaseModel):
"DelegationEnabledDate": unix_time(datetime.datetime.utcnow()),
}
def remove_service_principal(self, service_principal):
def remove_service_principal(self, service_principal: str) -> None:
if service_principal not in self.services:
raise InvalidInputException(
"You specified an unrecognized service principal."
@ -323,38 +324,38 @@ class FakeDelegatedAdministrator(BaseModel):
self.services.pop(service_principal)
def describe(self):
def describe(self) -> Dict[str, Any]:
admin = self.account.describe()
admin["DelegationEnabledDate"] = unix_time(self.enabled_date)
return admin
@staticmethod
def supported_service(service_principal):
def supported_service(service_principal: str) -> bool:
return service_principal in FakeDelegatedAdministrator.SUPPORTED_SERVICES
class OrganizationsBackend(BaseBackend):
def __init__(self, region_name, account_id):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self._reset()
def _reset(self):
self.org = None
self.accounts = []
self.ou = []
self.policies = []
self.services = []
self.admins = []
def _reset(self) -> None:
self.org: Optional[FakeOrganization] = None
self.accounts: List[FakeAccount] = []
self.ou: List[FakeOrganizationalUnit] = []
self.policies: List[FakePolicy] = []
self.services: List[Dict[str, Any]] = []
self.admins: List[FakeDelegatedAdministrator] = []
def _get_root_by_id(self, root_id):
def _get_root_by_id(self, root_id: str) -> FakeRoot:
root = next((ou for ou in self.ou if ou.id == root_id), None)
if not root:
raise RootNotFoundException
return root
return root # type: ignore[return-value]
def create_organization(self, **kwargs):
def create_organization(self, **kwargs: Any) -> Dict[str, Any]:
self.org = FakeOrganization(self.account_id, kwargs.get("FeatureSet") or "ALL")
root_ou = FakeRoot(self.org)
self.ou.append(root_ou)
@ -382,37 +383,35 @@ class OrganizationsBackend(BaseBackend):
self.attach_policy(PolicyId=default_policy.id, TargetId=master_account.id)
return self.org.describe()
def describe_organization(self):
def describe_organization(self) -> Dict[str, Any]:
if not self.org:
raise AWSOrganizationsNotInUseException
return self.org.describe()
def delete_organization(self):
def delete_organization(self) -> None:
if [account for account in self.accounts if account.name != "master"]:
raise RESTError(
"OrganizationNotEmptyException",
"To delete an organization you must first remove all member accounts (except the master).",
)
self._reset()
return {}
def list_roots(self):
def list_roots(self) -> Dict[str, Any]:
return dict(Roots=[ou.describe() for ou in self.ou if isinstance(ou, FakeRoot)])
def create_organizational_unit(self, **kwargs):
new_ou = FakeOrganizationalUnit(self.org, **kwargs)
def create_organizational_unit(self, **kwargs: Any) -> Dict[str, Any]:
new_ou = FakeOrganizationalUnit(self.org, **kwargs) # type: ignore
self.ou.append(new_ou)
self.attach_policy(PolicyId=utils.DEFAULT_POLICY_ID, TargetId=new_ou.id)
return new_ou.describe()
def delete_organizational_unit(self, **kwargs):
def delete_organizational_unit(self, **kwargs: Any) -> None:
ou_to_delete = self.get_organizational_unit_by_id(
kwargs["OrganizationalUnitId"]
)
self.ou.remove(ou_to_delete)
return {}
def update_organizational_unit(self, **kwargs):
def update_organizational_unit(self, **kwargs: Any) -> Dict[str, Any]:
for ou in self.ou:
if ou.name == kwargs["Name"]:
raise DuplicateOrganizationalUnitException
@ -420,7 +419,7 @@ class OrganizationsBackend(BaseBackend):
ou.name = kwargs["Name"]
return ou.describe()
def get_organizational_unit_by_id(self, ou_id):
def get_organizational_unit_by_id(self, ou_id: str) -> FakeOrganizationalUnit:
ou = next((ou for ou in self.ou if ou.id == ou_id), None)
if ou is None:
raise RESTError(
@ -429,7 +428,7 @@ class OrganizationsBackend(BaseBackend):
)
return ou
def validate_parent_id(self, parent_id):
def validate_parent_id(self, parent_id: str) -> str:
try:
self.get_organizational_unit_by_id(parent_id)
except RESTError:
@ -438,12 +437,12 @@ class OrganizationsBackend(BaseBackend):
)
return parent_id
def describe_organizational_unit(self, **kwargs):
def describe_organizational_unit(self, **kwargs: Any) -> Dict[str, Any]:
ou = self.get_organizational_unit_by_id(kwargs["OrganizationalUnitId"])
return ou.describe()
@paginate(pagination_model=PAGINATION_MODEL)
def list_organizational_units_for_parent(self, **kwargs):
def list_organizational_units_for_parent(self, **kwargs: Any) -> List[Dict[str, Any]]: # type: ignore
parent_id = self.validate_parent_id(kwargs["parent_id"])
return [
{"Id": ou.id, "Arn": ou.arn, "Name": ou.name}
@ -451,20 +450,20 @@ class OrganizationsBackend(BaseBackend):
if ou.parent_id == parent_id
]
def create_account(self, **kwargs):
new_account = FakeAccount(self.org, **kwargs)
def create_account(self, **kwargs: Any) -> Dict[str, Any]:
new_account = FakeAccount(self.org, **kwargs) # type: ignore
self.accounts.append(new_account)
self.attach_policy(PolicyId=utils.DEFAULT_POLICY_ID, TargetId=new_account.id)
return new_account.create_account_status
def close_account(self, **kwargs):
def close_account(self, **kwargs: Any) -> None:
for account in self.accounts:
if account.id == kwargs["AccountId"]:
account.close()
return
raise AccountNotFoundException
def get_account_by_id(self, account_id):
def get_account_by_id(self, account_id: str) -> FakeAccount:
account = next(
(account for account in self.accounts if account.id == account_id), None
)
@ -472,7 +471,7 @@ class OrganizationsBackend(BaseBackend):
raise AccountNotFoundException
return account
def get_account_by_attr(self, attr, value):
def get_account_by_attr(self, attr: str, value: Any) -> FakeAccount:
account = next(
(
account
@ -485,17 +484,17 @@ class OrganizationsBackend(BaseBackend):
raise AccountNotFoundException
return account
def describe_account(self, **kwargs):
def describe_account(self, **kwargs: Any) -> Dict[str, Any]:
account = self.get_account_by_id(kwargs["AccountId"])
return dict(Account=account.describe())
def describe_create_account_status(self, **kwargs):
def describe_create_account_status(self, **kwargs: Any) -> Dict[str, Any]:
account = self.get_account_by_attr(
"create_account_status_id", kwargs["CreateAccountRequestId"]
)
return account.create_account_status
def list_create_account_status(self, **kwargs):
def list_create_account_status(self, **kwargs: Any) -> Dict[str, Any]:
requested_states = kwargs.get("States")
if not requested_states:
requested_states = ["IN_PROGRESS", "SUCCEEDED", "FAILED"]
@ -517,32 +516,30 @@ class OrganizationsBackend(BaseBackend):
return dict(CreateAccountStatuses=accounts_resp, NextToken=next_token)
@paginate(pagination_model=PAGINATION_MODEL)
def list_accounts(self):
def list_accounts(self) -> List[FakeAccount]: # type: ignore
accounts = [account.describe() for account in self.accounts]
accounts = sorted(accounts, key=lambda x: x["JoinedTimestamp"])
return accounts
return sorted(accounts, key=lambda x: x["JoinedTimestamp"]) # type: ignore
@paginate(pagination_model=PAGINATION_MODEL)
def list_accounts_for_parent(self, **kwargs):
def list_accounts_for_parent(self, **kwargs: Any) -> Any: # type: ignore
parent_id = self.validate_parent_id(kwargs["parent_id"])
accounts = [
account.describe()
for account in self.accounts
if account.parent_id == parent_id
]
accounts = sorted(accounts, key=lambda x: x["JoinedTimestamp"])
return accounts
return sorted(accounts, key=lambda x: x["JoinedTimestamp"])
def move_account(self, **kwargs):
def move_account(self, **kwargs: Any) -> None:
new_parent_id = self.validate_parent_id(kwargs["DestinationParentId"])
self.validate_parent_id(kwargs["SourceParentId"])
account = self.get_account_by_id(kwargs["AccountId"])
index = self.accounts.index(account)
self.accounts[index].parent_id = new_parent_id
def list_parents(self, **kwargs):
def list_parents(self, **kwargs: Any) -> Dict[str, Any]:
if re.compile(r"[0-9]{12}").match(kwargs["ChildId"]):
child_object = self.get_account_by_id(kwargs["ChildId"])
child_object: Any = self.get_account_by_id(kwargs["ChildId"])
else:
child_object = self.get_organizational_unit_by_id(kwargs["ChildId"])
return dict(
@ -553,10 +550,10 @@ class OrganizationsBackend(BaseBackend):
]
)
def list_children(self, **kwargs):
def list_children(self, **kwargs: Any) -> Dict[str, Any]:
parent_id = self.validate_parent_id(kwargs["ParentId"])
if kwargs["ChildType"] == "ACCOUNT":
obj_list = self.accounts
obj_list: List[Any] = self.accounts
elif kwargs["ChildType"] == "ORGANIZATIONAL_UNIT":
obj_list = self.ou
else:
@ -569,15 +566,15 @@ class OrganizationsBackend(BaseBackend):
]
)
def create_policy(self, **kwargs):
new_policy = FakePolicy(self.org, **kwargs)
def create_policy(self, **kwargs: Any) -> Dict[str, Any]:
new_policy = FakePolicy(self.org, **kwargs) # type: ignore
for policy in self.policies:
if kwargs["Name"] == policy.name:
raise DuplicatePolicyException
self.policies.append(new_policy)
return new_policy.describe()
def describe_policy(self, **kwargs):
def describe_policy(self, **kwargs: Any) -> Dict[str, Any]:
if re.compile(utils.POLICY_ID_REGEX).match(kwargs["PolicyId"]):
policy = next(
(p for p in self.policies if p.id == kwargs["PolicyId"]), None
@ -591,7 +588,7 @@ class OrganizationsBackend(BaseBackend):
raise InvalidInputException("You specified an invalid value.")
return policy.describe()
def get_policy_by_id(self, policy_id):
def get_policy_by_id(self, policy_id: str) -> FakePolicy:
policy = next(
(policy for policy in self.policies if policy.id == policy_id), None
)
@ -602,14 +599,14 @@ class OrganizationsBackend(BaseBackend):
)
return policy
def update_policy(self, **kwargs):
def update_policy(self, **kwargs: Any) -> Dict[str, Any]:
policy = self.get_policy_by_id(kwargs["PolicyId"])
policy.name = kwargs.get("Name", policy.name)
policy.description = kwargs.get("Description", policy.description)
policy.content = kwargs.get("Content", policy.content)
return policy.describe()
def attach_policy(self, **kwargs):
def attach_policy(self, **kwargs: Any) -> None:
policy = self.get_policy_by_id(kwargs["PolicyId"])
if re.compile(utils.ROOT_ID_REGEX).match(kwargs["TargetId"]) or re.compile(
utils.OU_ID_REGEX
@ -637,12 +634,12 @@ class OrganizationsBackend(BaseBackend):
else:
raise InvalidInputException("You specified an invalid value.")
def list_policies(self):
def list_policies(self) -> Dict[str, Any]:
return dict(
Policies=[p.describe()["Policy"]["PolicySummary"] for p in self.policies]
)
def delete_policy(self, **kwargs):
def delete_policy(self, **kwargs: Any) -> None:
for idx, policy in enumerate(self.policies):
if policy.id == kwargs["PolicyId"]:
if self.list_targets_for_policy(PolicyId=policy.id)["Targets"]:
@ -657,11 +654,11 @@ class OrganizationsBackend(BaseBackend):
"We can't find a policy with the PolicyId that you specified.",
)
def list_policies_for_target(self, **kwargs):
def list_policies_for_target(self, **kwargs: Any) -> Dict[str, Any]:
_filter = kwargs["Filter"]
if re.match(utils.ROOT_ID_REGEX, kwargs["TargetId"]):
obj = next((ou for ou in self.ou if ou.id == kwargs["TargetId"]), None)
obj: Any = next((ou for ou in self.ou if ou.id == kwargs["TargetId"]), None)
if obj is None:
raise TargetNotFoundException
elif re.compile(utils.OU_ID_REGEX).match(kwargs["TargetId"]):
@ -694,11 +691,11 @@ class OrganizationsBackend(BaseBackend):
]
)
def _get_resource_for_tagging(self, resource_id):
def _get_resource_for_tagging(self, resource_id: str) -> Any:
if utils.fullmatch(
re.compile(utils.OU_ID_REGEX), resource_id
) or utils.fullmatch(utils.ROOT_ID_REGEX, resource_id):
resource = next((a for a in self.ou if a.id == resource_id), None)
resource: Any = next((a for a in self.ou if a.id == resource_id), None)
elif utils.fullmatch(re.compile(utils.ACCOUNT_ID_REGEX), resource_id):
resource = next((a for a in self.accounts if a.id == resource_id), None)
elif utils.fullmatch(re.compile(utils.POLICY_ID_REGEX), resource_id):
@ -713,7 +710,7 @@ class OrganizationsBackend(BaseBackend):
return resource
def list_targets_for_policy(self, **kwargs):
def list_targets_for_policy(self, **kwargs: Any) -> Dict[str, Any]:
if re.compile(utils.POLICY_ID_REGEX).match(kwargs["PolicyId"]):
policy = next(
(p for p in self.policies if p.id == kwargs["PolicyId"]), None
@ -731,22 +728,22 @@ class OrganizationsBackend(BaseBackend):
]
return dict(Targets=objects)
def tag_resource(self, **kwargs):
def tag_resource(self, **kwargs: Any) -> None:
resource = self._get_resource_for_tagging(kwargs["ResourceId"])
new_tags = {tag["Key"]: tag["Value"] for tag in kwargs["Tags"]}
resource.tags.update(new_tags)
def list_tags_for_resource(self, **kwargs):
def list_tags_for_resource(self, **kwargs: str) -> Dict[str, Any]:
resource = self._get_resource_for_tagging(kwargs["ResourceId"])
tags = [{"Key": key, "Value": value} for key, value in resource.tags.items()]
return dict(Tags=tags)
def untag_resource(self, **kwargs):
def untag_resource(self, **kwargs: Any) -> None:
resource = self._get_resource_for_tagging(kwargs["ResourceId"])
for key in kwargs["TagKeys"]:
resource.tags.pop(key, None)
def enable_aws_service_access(self, **kwargs):
def enable_aws_service_access(self, **kwargs: str) -> None:
service = FakeServiceAccess(**kwargs)
# enabling an existing service results in no changes
@ -758,10 +755,10 @@ class OrganizationsBackend(BaseBackend):
self.services.append(service.describe())
def list_aws_service_access_for_organization(self):
def list_aws_service_access_for_organization(self) -> Dict[str, Any]:
return dict(EnabledServicePrincipals=self.services)
def disable_aws_service_access(self, **kwargs):
def disable_aws_service_access(self, **kwargs: str) -> None:
if not FakeServiceAccess.trusted_service(kwargs["ServicePrincipal"]):
raise InvalidInputException(
"You specified an unrecognized service principal."
@ -779,7 +776,7 @@ class OrganizationsBackend(BaseBackend):
if service_principal:
self.services.remove(service_principal)
def register_delegated_administrator(self, **kwargs):
def register_delegated_administrator(self, **kwargs: str) -> None:
account_id = kwargs["AccountId"]
if account_id == self.account_id:
@ -798,7 +795,7 @@ class OrganizationsBackend(BaseBackend):
admin.add_service_principal(kwargs["ServicePrincipal"])
def list_delegated_administrators(self, **kwargs):
def list_delegated_administrators(self, **kwargs: str) -> Dict[str, Any]:
admins = self.admins
service = kwargs.get("ServicePrincipal")
@ -814,7 +811,7 @@ class OrganizationsBackend(BaseBackend):
return dict(DelegatedAdministrators=delegated_admins)
def list_delegated_services_for_account(self, **kwargs):
def list_delegated_services_for_account(self, **kwargs: str) -> Dict[str, Any]:
admin = next(
(admin for admin in self.admins if admin.account.id == kwargs["AccountId"]),
None,
@ -837,7 +834,7 @@ class OrganizationsBackend(BaseBackend):
return dict(DelegatedServices=services)
def deregister_delegated_administrator(self, **kwargs):
def deregister_delegated_administrator(self, **kwargs: str) -> None:
account_id = kwargs["AccountId"]
service = kwargs["ServicePrincipal"]
@ -869,21 +866,21 @@ class OrganizationsBackend(BaseBackend):
if not admin.services:
self.admins.remove(admin)
def enable_policy_type(self, **kwargs):
def enable_policy_type(self, **kwargs: str) -> Dict[str, Any]:
root = self._get_root_by_id(kwargs["RootId"])
root.add_policy_type(kwargs["PolicyType"])
return dict(Root=root.describe())
def disable_policy_type(self, **kwargs):
def disable_policy_type(self, **kwargs: str) -> Dict[str, Any]:
root = self._get_root_by_id(kwargs["RootId"])
root.remove_policy_type(kwargs["PolicyType"])
return dict(Root=root.describe())
def detach_policy(self, **kwargs):
def detach_policy(self, **kwargs: str) -> None:
policy = self.get_policy_by_id(kwargs["PolicyId"])
root_id_regex = utils.ROOT_ID_REGEX
ou_id_regex = utils.OU_ID_REGEX
@ -914,7 +911,7 @@ class OrganizationsBackend(BaseBackend):
else:
raise InvalidInputException("You specified an invalid value.")
def remove_account_from_organization(self, **kwargs):
def remove_account_from_organization(self, **kwargs: str) -> None:
account = self.get_account_by_id(kwargs["AccountId"])
for policy in account.attached_policies:
policy.attachments.remove(account)

View File

@ -1,64 +1,65 @@
import json
from typing import Any, Dict
from moto.core.responses import BaseResponse
from .models import organizations_backends
from .models import organizations_backends, OrganizationsBackend
class OrganizationsResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="organizations")
@property
def organizations_backend(self):
def organizations_backend(self) -> OrganizationsBackend:
return organizations_backends[self.current_account]["global"]
@property
def request_params(self):
def request_params(self) -> Dict[str, Any]: # type: ignore[misc]
try:
return json.loads(self.body)
except ValueError:
return {}
def _get_param(self, param_name, if_none=None):
def _get_param(self, param_name: str, if_none: Any = None) -> Any:
return self.request_params.get(param_name, if_none)
def create_organization(self):
def create_organization(self) -> str:
return json.dumps(
self.organizations_backend.create_organization(**self.request_params)
)
def describe_organization(self):
def describe_organization(self) -> str:
return json.dumps(self.organizations_backend.describe_organization())
def delete_organization(self):
return json.dumps(self.organizations_backend.delete_organization())
def delete_organization(self) -> str:
self.organizations_backend.delete_organization()
return "{}"
def list_roots(self):
def list_roots(self) -> str:
return json.dumps(self.organizations_backend.list_roots())
def create_organizational_unit(self):
def create_organizational_unit(self) -> str:
return json.dumps(
self.organizations_backend.create_organizational_unit(**self.request_params)
)
def delete_organizational_unit(self):
return json.dumps(
self.organizations_backend.delete_organizational_unit(**self.request_params)
)
def delete_organizational_unit(self) -> str:
self.organizations_backend.delete_organizational_unit(**self.request_params)
return "{}"
def update_organizational_unit(self):
def update_organizational_unit(self) -> str:
return json.dumps(
self.organizations_backend.update_organizational_unit(**self.request_params)
)
def describe_organizational_unit(self):
def describe_organizational_unit(self) -> str:
return json.dumps(
self.organizations_backend.describe_organizational_unit(
**self.request_params
)
)
def list_organizational_units_for_parent(self):
def list_organizational_units_for_parent(self) -> str:
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
parent_id = self._get_param("ParentId")
@ -73,39 +74,38 @@ class OrganizationsResponse(BaseResponse):
response["NextToken"] = next_token
return json.dumps(response)
def list_parents(self):
def list_parents(self) -> str:
return json.dumps(
self.organizations_backend.list_parents(**self.request_params)
)
def create_account(self):
def create_account(self) -> str:
return json.dumps(
self.organizations_backend.create_account(**self.request_params)
)
def close_account(self):
return json.dumps(
self.organizations_backend.close_account(**self.request_params)
)
def close_account(self) -> str:
self.organizations_backend.close_account(**self.request_params)
return "{}"
def describe_account(self):
def describe_account(self) -> str:
return json.dumps(
self.organizations_backend.describe_account(**self.request_params)
)
def describe_create_account_status(self):
def describe_create_account_status(self) -> str:
return json.dumps(
self.organizations_backend.describe_create_account_status(
**self.request_params
)
)
def list_create_account_status(self):
def list_create_account_status(self) -> str:
return json.dumps(
self.organizations_backend.list_create_account_status(**self.request_params)
)
def list_accounts(self):
def list_accounts(self) -> str:
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
accounts, next_token = self.organizations_backend.list_accounts(
@ -116,7 +116,7 @@ class OrganizationsResponse(BaseResponse):
response["NextToken"] = next_token
return json.dumps(response)
def list_accounts_for_parent(self):
def list_accounts_for_parent(self) -> str:
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
parent_id = self._get_param("ParentId")
@ -128,129 +128,119 @@ class OrganizationsResponse(BaseResponse):
response["NextToken"] = next_token
return json.dumps(response)
def move_account(self):
return json.dumps(
self.organizations_backend.move_account(**self.request_params)
)
def move_account(self) -> str:
self.organizations_backend.move_account(**self.request_params)
return "{}"
def list_children(self):
def list_children(self) -> str:
return json.dumps(
self.organizations_backend.list_children(**self.request_params)
)
def create_policy(self):
def create_policy(self) -> str:
return json.dumps(
self.organizations_backend.create_policy(**self.request_params)
)
def describe_policy(self):
def describe_policy(self) -> str:
return json.dumps(
self.organizations_backend.describe_policy(**self.request_params)
)
def update_policy(self):
def update_policy(self) -> str:
return json.dumps(
self.organizations_backend.update_policy(**self.request_params)
)
def attach_policy(self):
return json.dumps(
self.organizations_backend.attach_policy(**self.request_params)
)
def attach_policy(self) -> str:
self.organizations_backend.attach_policy(**self.request_params)
return "{}"
def list_policies(self):
def list_policies(self) -> str:
return json.dumps(self.organizations_backend.list_policies())
def delete_policy(self):
def delete_policy(self) -> str:
self.organizations_backend.delete_policy(**self.request_params)
return json.dumps({})
def list_policies_for_target(self):
def list_policies_for_target(self) -> str:
return json.dumps(
self.organizations_backend.list_policies_for_target(**self.request_params)
)
def list_targets_for_policy(self):
def list_targets_for_policy(self) -> str:
return json.dumps(
self.organizations_backend.list_targets_for_policy(**self.request_params)
)
def tag_resource(self):
return json.dumps(
self.organizations_backend.tag_resource(**self.request_params)
)
def tag_resource(self) -> str:
self.organizations_backend.tag_resource(**self.request_params)
return "{}"
def list_tags_for_resource(self):
def list_tags_for_resource(self) -> str:
return json.dumps(
self.organizations_backend.list_tags_for_resource(**self.request_params)
)
def untag_resource(self):
return json.dumps(
self.organizations_backend.untag_resource(**self.request_params)
)
def untag_resource(self) -> str:
self.organizations_backend.untag_resource(**self.request_params)
return "{}"
def enable_aws_service_access(self):
return json.dumps(
self.organizations_backend.enable_aws_service_access(**self.request_params)
)
def enable_aws_service_access(self) -> str:
self.organizations_backend.enable_aws_service_access(**self.request_params)
return "{}"
def list_aws_service_access_for_organization(self):
def list_aws_service_access_for_organization(self) -> str:
return json.dumps(
self.organizations_backend.list_aws_service_access_for_organization()
)
def disable_aws_service_access(self):
return json.dumps(
self.organizations_backend.disable_aws_service_access(**self.request_params)
)
def disable_aws_service_access(self) -> str:
self.organizations_backend.disable_aws_service_access(**self.request_params)
return "{}"
def register_delegated_administrator(self):
return json.dumps(
self.organizations_backend.register_delegated_administrator(
**self.request_params
)
def register_delegated_administrator(self) -> str:
self.organizations_backend.register_delegated_administrator(
**self.request_params
)
return "{}"
def list_delegated_administrators(self):
def list_delegated_administrators(self) -> str:
return json.dumps(
self.organizations_backend.list_delegated_administrators(
**self.request_params
)
)
def list_delegated_services_for_account(self):
def list_delegated_services_for_account(self) -> str:
return json.dumps(
self.organizations_backend.list_delegated_services_for_account(
**self.request_params
)
)
def deregister_delegated_administrator(self):
return json.dumps(
self.organizations_backend.deregister_delegated_administrator(
**self.request_params
)
def deregister_delegated_administrator(self) -> str:
self.organizations_backend.deregister_delegated_administrator(
**self.request_params
)
return "{}"
def enable_policy_type(self):
def enable_policy_type(self) -> str:
return json.dumps(
self.organizations_backend.enable_policy_type(**self.request_params)
)
def disable_policy_type(self):
def disable_policy_type(self) -> str:
return json.dumps(
self.organizations_backend.disable_policy_type(**self.request_params)
)
def detach_policy(self):
return json.dumps(
self.organizations_backend.detach_policy(**self.request_params)
)
def detach_policy(self) -> str:
self.organizations_backend.detach_policy(**self.request_params)
return "{}"
def remove_account_from_organization(self):
return json.dumps(
self.organizations_backend.remove_account_from_organization(
**self.request_params
)
def remove_account_from_organization(self) -> str:
self.organizations_backend.remove_account_from_organization(
**self.request_params
)
return "{}"

View File

@ -1,6 +1,7 @@
import re
import string
from moto.moto_api._internal import mock_random as random
from typing import Pattern, Union
MASTER_ACCOUNT_EMAIL = "master@example.com"
@ -56,21 +57,21 @@ PAGINATION_MODEL = {
}
def make_random_org_id():
def make_random_org_id() -> str:
# The regex pattern for an organization ID string requires "o-"
# followed by from 10 to 32 lower-case letters or digits.
# e.g. 'o-vipjnq5z86'
return "o-" + "".join(random.choice(CHARSET) for x in range(ORG_ID_SIZE))
def make_random_root_id():
def make_random_root_id() -> str:
# The regex pattern for a root ID string requires "r-" followed by
# from 4 to 32 lower-case letters or digits.
# e.g. 'r-3zwx'
return "r-" + "".join(random.choice(CHARSET) for x in range(ROOT_ID_SIZE))
def make_random_ou_id(root_id):
def make_random_ou_id(root_id: str) -> str:
# The regex pattern for an organizational unit ID string requires "ou-"
# followed by from 4 to 32 lower-case letters or digits (the ID of the root
# that contains the OU) followed by a second "-" dash and from 8 to 32
@ -85,13 +86,13 @@ def make_random_ou_id(root_id):
)
def make_random_account_id():
def make_random_account_id() -> str:
# The regex pattern for an account ID string requires exactly 12 digits.
# e.g. '488633172133'
return "".join([random.choice(string.digits) for n in range(ACCOUNT_ID_SIZE)])
def make_random_create_account_status_id():
def make_random_create_account_status_id() -> str:
# The regex pattern for an create account request ID string requires
# "car-" followed by from 8 to 32 lower-case letters or digits.
# e.g. 'car-35gxzwrp'
@ -100,15 +101,16 @@ def make_random_create_account_status_id():
)
def make_random_policy_id():
def make_random_policy_id() -> str:
# The regex pattern for a policy ID string requires "p-" followed by
# from 8 to 128 lower-case letters or digits.
# e.g. 'p-k2av4a8a'
return "p-" + "".join(random.choice(CHARSET) for x in range(POLICY_ID_SIZE))
def fullmatch(regex, s, flags=0):
def fullmatch(regex: Union[Pattern[str], str], s: str, flags: int = 0) -> bool:
"""Emulate python-3.4 re.fullmatch()."""
m = re.match(regex, s, flags=flags)
if m and m.span()[1] == len(s):
return m
return True
return False

View File

@ -235,7 +235,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/neptune,moto/opensearch,moto/rdsdata
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/rdsdata
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract