Techdebt - Refactor Autoscaling tests (#5430)

This commit is contained in:
Bert Blommers 2022-08-29 20:30:48 +00:00 committed by GitHub
parent 6035a44d79
commit bd051f4f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 326 additions and 518 deletions

View File

@ -9,157 +9,6 @@ from tests import EXAMPLE_AMI_ID
from .utils import setup_networking, setup_instance_with_networking
@mock_autoscaling
def test_create_autoscaling_groups_defaults():
"""Test with the minimum inputs and check that all of the proper defaults
are assigned for the other attributes"""
mocked_networking = setup_networking()
as_client = boto3.client("autoscaling", region_name="us-east-1")
as_client.create_launch_configuration(
LaunchConfigurationName="tester_config",
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
as_client.create_auto_scaling_group(
AutoScalingGroupName="tester_group",
LaunchConfigurationName="tester_config",
MinSize=2,
MaxSize=2,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
group = as_client.describe_auto_scaling_groups()["AutoScalingGroups"][0]
group["AutoScalingGroupName"].should.equal("tester_group")
group["MaxSize"].should.equal(2)
group["MinSize"].should.equal(2)
group["LaunchConfigurationName"].should.equal("tester_config")
# Defaults
group["AvailabilityZones"].should.equal(["us-east-1a"]) # subnet1
group["DesiredCapacity"].should.equal(2)
group["VPCZoneIdentifier"].should.equal(mocked_networking["subnet1"])
group["DefaultCooldown"].should.equal(300)
group["HealthCheckGracePeriod"].should.equal(300)
group["HealthCheckType"].should.equal("EC2")
group["LoadBalancerNames"].should.equal([])
group.shouldnt.have.key("PlacementGroup")
group["TerminationPolicies"].should.equal(["Default"])
group["Tags"].should.equal([])
@mock_autoscaling
def test_list_many_autoscaling_groups():
mocked_networking = setup_networking()
conn = boto3.client("autoscaling", region_name="us-east-1")
conn.create_launch_configuration(
LaunchConfigurationName="TestLC",
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
for i in range(51):
conn.create_auto_scaling_group(
AutoScalingGroupName="TestGroup%d" % i,
MinSize=1,
MaxSize=2,
LaunchConfigurationName="TestLC",
VPCZoneIdentifier=mocked_networking["subnet1"],
)
response = conn.describe_auto_scaling_groups()
groups = response["AutoScalingGroups"]
marker = response["NextToken"]
groups.should.have.length_of(50)
marker.should.equal(groups[-1]["AutoScalingGroupName"])
response2 = conn.describe_auto_scaling_groups(NextToken=marker)
groups.extend(response2["AutoScalingGroups"])
groups.should.have.length_of(51)
assert "NextToken" not in response2.keys()
@mock_autoscaling
def test_list_many_scheduled_scaling_actions():
conn = boto3.client("autoscaling", region_name="us-east-1")
for i in range(30):
conn.put_scheduled_update_group_action(
AutoScalingGroupName="tester_group",
ScheduledActionName=f"my-scheduled-action-{i}",
StartTime=f"2022-07-01T00:00:{i}Z",
EndTime=f"2022-09-01T00:00:{i}Z",
Recurrence="* * * * *",
MinSize=i + 1,
MaxSize=i + 5,
DesiredCapacity=i + 3,
)
response = conn.describe_scheduled_actions(AutoScalingGroupName="tester_group")
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(30)
@mock_autoscaling
def test_non_existing_group_name():
conn = boto3.client("autoscaling", region_name="us-east-1")
conn.put_scheduled_update_group_action(
AutoScalingGroupName="tester_group",
ScheduledActionName="my-scheduled-action",
StartTime="2022-07-01T00:00:1Z",
EndTime="2022-09-01T00:00:2Z",
Recurrence="* * * * *",
MinSize=1,
MaxSize=5,
DesiredCapacity=3,
)
response = conn.describe_scheduled_actions(AutoScalingGroupName="wrong_group")
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(
0
) # since there is no such group name, no actions have been returned
@mock_autoscaling
def test_describe_scheduled_actions_returns_all_actions_when_no_argument_is_passed():
conn = boto3.client("autoscaling", region_name="us-east-1")
for i in range(30):
conn.put_scheduled_update_group_action(
AutoScalingGroupName="tester_group",
ScheduledActionName=f"my-scheduled-action-{i}",
StartTime=f"2022-07-01T00:00:{i}Z",
EndTime=f"2022-09-01T00:00:{i}Z",
Recurrence="* * * * *",
MinSize=i + 1,
MaxSize=i + 5,
DesiredCapacity=i + 3,
)
for i in range(10):
conn.put_scheduled_update_group_action(
AutoScalingGroupName="tester_group-2",
ScheduledActionName=f"my-scheduled-action-4{i}",
StartTime=f"2022-07-01T00:00:{i}Z",
EndTime=f"2022-09-01T00:00:{i}Z",
Recurrence="* * * * *",
MinSize=i + 1,
MaxSize=i + 5,
DesiredCapacity=i + 3,
)
response = conn.describe_scheduled_actions()
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(
40
) # Since no argument is passed describe_scheduled_actions, all scheduled actions are returned
@mock_autoscaling
@mock_ec2
def test_propogate_tags():
@ -196,104 +45,6 @@ def test_propogate_tags():
tags.should.contain({"Value": "TestGroup1", "Key": "aws:autoscaling:groupName"})
@mock_autoscaling
def test_autoscaling_group_delete():
mocked_networking = setup_networking()
as_client = boto3.client("autoscaling", region_name="us-east-1")
as_client.create_launch_configuration(
LaunchConfigurationName="tester_config",
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
as_client.create_auto_scaling_group(
AutoScalingGroupName="tester_group",
LaunchConfigurationName="tester_config",
MinSize=2,
MaxSize=2,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
as_client.describe_auto_scaling_groups()["AutoScalingGroups"].should.have.length_of(
1
)
as_client.delete_auto_scaling_group(AutoScalingGroupName="tester_group")
as_client.describe_auto_scaling_groups()["AutoScalingGroups"].should.have.length_of(
0
)
@mock_autoscaling
def test_scheduled_action_delete():
as_client = boto3.client("autoscaling", region_name="us-east-1")
for i in range(3):
as_client.put_scheduled_update_group_action(
AutoScalingGroupName="tester_group",
ScheduledActionName=f"my-scheduled-action-{i}",
StartTime=f"2022-07-01T00:00:{i}Z",
EndTime=f"2022-09-01T00:00:{i}Z",
Recurrence="* * * * *",
MinSize=i + 1,
MaxSize=i + 5,
DesiredCapacity=i + 3,
)
response = as_client.describe_scheduled_actions(AutoScalingGroupName="tester_group")
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(3)
as_client.delete_scheduled_action(
AutoScalingGroupName="tester_group",
ScheduledActionName="my-scheduled-action-2",
)
as_client.delete_scheduled_action(
AutoScalingGroupName="tester_group",
ScheduledActionName="my-scheduled-action-1",
)
response = as_client.describe_scheduled_actions(AutoScalingGroupName="tester_group")
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(1)
@mock_autoscaling
def test_create_autoscaling_group():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
client.create_launch_configuration(
LaunchConfigurationName="test_launch_configuration",
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
response = client.create_auto_scaling_group(
AutoScalingGroupName="test_asg",
LaunchConfigurationName="test_launch_configuration",
MinSize=0,
MaxSize=20,
DesiredCapacity=5,
Tags=[
{
"ResourceId": "test_asg",
"ResourceType": "auto-scaling-group",
"Key": "propogated-tag-key",
"Value": "propagate-tag-value",
"PropagateAtLaunch": True,
},
{
"ResourceId": "test_asg",
"ResourceType": "auto-scaling-group",
"Key": "not-propogated-tag-key",
"Value": "not-propagate-tag-value",
"PropagateAtLaunch": False,
},
],
VPCZoneIdentifier=mocked_networking["subnet1"],
NewInstancesProtectedFromScaleIn=False,
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
@mock_autoscaling
def test_create_autoscaling_group_from_instance():
autoscaling_group_name = "test_asg"
@ -612,45 +363,6 @@ def test_create_autoscaling_group_multiple_launch_configurations():
)
@mock_autoscaling
def test_describe_autoscaling_groups_launch_config():
mocked_networking = setup_networking(region_name="eu-north-1")
client = boto3.client("autoscaling", region_name="eu-north-1")
client.create_launch_configuration(
LaunchConfigurationName="test_launch_configuration",
InstanceType="t2.micro",
ImageId=EXAMPLE_AMI_ID,
)
client.create_auto_scaling_group(
AutoScalingGroupName="test_asg",
LaunchConfigurationName="test_launch_configuration",
MinSize=0,
MaxSize=20,
DesiredCapacity=5,
VPCZoneIdentifier=mocked_networking["subnet1"],
NewInstancesProtectedFromScaleIn=True,
)
response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"])
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
group = response["AutoScalingGroups"][0]
group["AutoScalingGroupARN"].should.match(
f"arn:aws:autoscaling:eu-north-1:{ACCOUNT_ID}:autoScalingGroup:"
)
group["AutoScalingGroupName"].should.equal("test_asg")
group["LaunchConfigurationName"].should.equal("test_launch_configuration")
group.should_not.have.key("LaunchTemplate")
group["AvailabilityZones"].should.equal(["eu-north-1a"])
group["VPCZoneIdentifier"].should.equal(mocked_networking["subnet1"])
group["NewInstancesProtectedFromScaleIn"].should.equal(True)
for instance in group["Instances"]:
instance["LaunchConfigurationName"].should.equal("test_launch_configuration")
instance.should_not.have.key("LaunchTemplate")
instance["AvailabilityZone"].should.equal("eu-north-1a")
instance["ProtectedFromScaleIn"].should.equal(True)
instance["InstanceType"].should.equal("t2.micro")
@mock_autoscaling
@mock_ec2
def test_describe_autoscaling_groups_launch_template():
@ -1162,236 +874,6 @@ def test_create_autoscaling_policy_with_predictive_scaling_config():
)
@mock_autoscaling
@mock_ec2
def test_describe_instance_health():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
_ = client.create_launch_configuration(
LaunchConfigurationName="test_launch_configuration",
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
client.create_auto_scaling_group(
AutoScalingGroupName="test_asg",
LaunchConfigurationName="test_launch_configuration",
MinSize=2,
MaxSize=4,
DesiredCapacity=2,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"])
instance1 = response["AutoScalingGroups"][0]["Instances"][0]
instance1["HealthStatus"].should.equal("Healthy")
@mock_autoscaling
@mock_ec2
def test_set_instance_health():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
_ = client.create_launch_configuration(
LaunchConfigurationName="test_launch_configuration",
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
client.create_auto_scaling_group(
AutoScalingGroupName="test_asg",
LaunchConfigurationName="test_launch_configuration",
MinSize=2,
MaxSize=4,
DesiredCapacity=2,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"])
instance1 = response["AutoScalingGroups"][0]["Instances"][0]
instance1["HealthStatus"].should.equal("Healthy")
client.set_instance_health(
InstanceId=instance1["InstanceId"], HealthStatus="Unhealthy"
)
response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"])
instance1 = response["AutoScalingGroups"][0]["Instances"][0]
instance1["HealthStatus"].should.equal("Unhealthy")
@mock_autoscaling
def test_suspend_processes():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
client.create_launch_configuration(
LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium"
)
client.create_auto_scaling_group(
LaunchConfigurationName="lc",
AutoScalingGroupName="test-asg",
MinSize=1,
MaxSize=1,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
# When we suspend the 'Launch' process on the ASG client
client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"]
)
res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"])
# The 'Launch' process should, in fact, be suspended
launch_suspended = False
for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]:
if proc.get("ProcessName") == "Launch":
launch_suspended = True
assert launch_suspended is True
@mock_autoscaling
def test_suspend_processes_all_by_default():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
client.create_launch_configuration(
LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium"
)
client.create_auto_scaling_group(
LaunchConfigurationName="lc",
AutoScalingGroupName="test-asg",
MinSize=1,
MaxSize=1,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
# When we suspend with no processes specified
client.suspend_processes(AutoScalingGroupName="test-asg")
res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"])
# All processes should be suspended
all_proc_names = [
"Launch",
"Terminate",
"AddToLoadBalancer",
"AlarmNotification",
"AZRebalance",
"HealthCheck",
"InstanceRefresh",
"ReplaceUnhealthy",
"ScheduledActions",
]
suspended_proc_names = [
proc["ProcessName"]
for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]
]
set(suspended_proc_names).should.equal(set(all_proc_names))
@mock_autoscaling
def test_suspend_additional_processes():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
client.create_launch_configuration(
LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium"
)
client.create_auto_scaling_group(
LaunchConfigurationName="lc",
AutoScalingGroupName="test-asg",
MinSize=1,
MaxSize=1,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
# When we suspend the 'Launch' and 'Terminate' processes in separate calls
client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"]
)
client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Terminate"]
)
res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"])
# Both 'Launch' and 'Terminate' should be suspended
launch_suspended = False
terminate_suspended = False
for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]:
if proc.get("ProcessName") == "Launch":
launch_suspended = True
if proc.get("ProcessName") == "Terminate":
terminate_suspended = True
assert launch_suspended is True
assert terminate_suspended is True
@mock_autoscaling
def test_resume_processes():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
client.create_launch_configuration(
LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium"
)
client.create_auto_scaling_group(
LaunchConfigurationName="lc",
AutoScalingGroupName="test-asg",
MinSize=1,
MaxSize=1,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
# When we suspect 'Launch' and 'Termiate' process then resume 'Launch'
client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"]
)
client.resume_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"]
)
res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"])
# Only 'Terminate' should be suspended
expected_suspended_processes = [
{"ProcessName": "Terminate", "SuspensionReason": ""}
]
res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal(
expected_suspended_processes
)
@mock_autoscaling
def test_resume_processes_all_by_default():
mocked_networking = setup_networking()
client = boto3.client("autoscaling", region_name="us-east-1")
client.create_launch_configuration(
LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium"
)
client.create_auto_scaling_group(
LaunchConfigurationName="lc",
AutoScalingGroupName="test-asg",
MinSize=1,
MaxSize=1,
VPCZoneIdentifier=mocked_networking["subnet1"],
)
# When we suspend two processes then resume with no process argument
client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"]
)
client.resume_processes(AutoScalingGroupName="test-asg")
res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"])
# No processes should be suspended
res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal([])
@mock_autoscaling
def test_set_instance_protection():
mocked_networking = setup_networking()

View File

@ -0,0 +1,241 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_autoscaling
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests import EXAMPLE_AMI_ID
from unittest import TestCase
from .utils import setup_networking
@mock_autoscaling
class TestAutoScalingGroup(TestCase):
def setUp(self) -> None:
self.mocked_networking = setup_networking()
self.as_client = boto3.client("autoscaling", region_name="us-east-1")
self.lc_name = "tester_config"
self.as_client.create_launch_configuration(
LaunchConfigurationName=self.lc_name,
ImageId=EXAMPLE_AMI_ID,
InstanceType="t2.medium",
)
def test_create_autoscaling_groups_defaults(self):
"""Test with the minimum inputs and check that all of the proper defaults
are assigned for the other attributes"""
self._create_group(name="tester_group")
group = self.as_client.describe_auto_scaling_groups()["AutoScalingGroups"][0]
group["AutoScalingGroupName"].should.equal("tester_group")
group["MaxSize"].should.equal(2)
group["MinSize"].should.equal(1)
group["LaunchConfigurationName"].should.equal(self.lc_name)
# Defaults
group["AvailabilityZones"].should.equal(["us-east-1a"]) # subnet1
group["DesiredCapacity"].should.equal(1)
group["VPCZoneIdentifier"].should.equal(self.mocked_networking["subnet1"])
group["DefaultCooldown"].should.equal(300)
group["HealthCheckGracePeriod"].should.equal(300)
group["HealthCheckType"].should.equal("EC2")
group["LoadBalancerNames"].should.equal([])
group.shouldnt.have.key("PlacementGroup")
group["TerminationPolicies"].should.equal(["Default"])
group["Tags"].should.equal([])
def test_list_many_autoscaling_groups(self):
for i in range(51):
self._create_group("TestGroup%d" % i)
response = self.as_client.describe_auto_scaling_groups()
groups = response["AutoScalingGroups"]
marker = response["NextToken"]
groups.should.have.length_of(50)
marker.should.equal(groups[-1]["AutoScalingGroupName"])
response2 = self.as_client.describe_auto_scaling_groups(NextToken=marker)
groups.extend(response2["AutoScalingGroups"])
groups.should.have.length_of(51)
assert "NextToken" not in response2.keys()
def test_autoscaling_group_delete(self):
self._create_group(name="tester_group")
groups = self.as_client.describe_auto_scaling_groups()["AutoScalingGroups"]
groups.should.have.length_of(1)
self.as_client.delete_auto_scaling_group(AutoScalingGroupName="tester_group")
groups = self.as_client.describe_auto_scaling_groups()["AutoScalingGroups"]
groups.should.have.length_of(0)
def test_describe_autoscaling_groups__instances(self):
self._create_group(name="test_asg")
response = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test_asg"]
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
group = response["AutoScalingGroups"][0]
group["AutoScalingGroupARN"].should.match(
f"arn:aws:autoscaling:us-east-1:{ACCOUNT_ID}:autoScalingGroup:"
)
group["AutoScalingGroupName"].should.equal("test_asg")
group["LaunchConfigurationName"].should.equal(self.lc_name)
group.should_not.have.key("LaunchTemplate")
group["AvailabilityZones"].should.equal(["us-east-1a"])
group["VPCZoneIdentifier"].should.equal(self.mocked_networking["subnet1"])
for instance in group["Instances"]:
instance["LaunchConfigurationName"].should.equal(self.lc_name)
instance.should_not.have.key("LaunchTemplate")
instance["AvailabilityZone"].should.equal("us-east-1a")
instance["InstanceType"].should.equal("t2.medium")
def test_set_instance_health(self):
self._create_group(name="test_asg")
response = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test_asg"]
)
instance1 = response["AutoScalingGroups"][0]["Instances"][0]
instance1["HealthStatus"].should.equal("Healthy")
self.as_client.set_instance_health(
InstanceId=instance1["InstanceId"], HealthStatus="Unhealthy"
)
response = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test_asg"]
)
instance1 = response["AutoScalingGroups"][0]["Instances"][0]
instance1["HealthStatus"].should.equal("Unhealthy")
def test_suspend_processes(self):
self._create_group(name="test-asg")
# When we suspend the 'Launch' process on the ASG client
self.as_client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"]
)
res = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test-asg"]
)
# The 'Launch' process should, in fact, be suspended
launch_suspended = False
for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]:
if proc.get("ProcessName") == "Launch":
launch_suspended = True
assert launch_suspended is True
def test_suspend_processes_all_by_default(self):
self._create_group(name="test-asg")
# When we suspend with no processes specified
self.as_client.suspend_processes(AutoScalingGroupName="test-asg")
res = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test-asg"]
)
# All processes should be suspended
all_proc_names = [
"Launch",
"Terminate",
"AddToLoadBalancer",
"AlarmNotification",
"AZRebalance",
"HealthCheck",
"InstanceRefresh",
"ReplaceUnhealthy",
"ScheduledActions",
]
suspended_proc_names = [
proc["ProcessName"]
for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]
]
set(suspended_proc_names).should.equal(set(all_proc_names))
def test_suspend_additional_processes(self):
self._create_group(name="test-asg")
# When we suspend the 'Launch' and 'Terminate' processes in separate calls
self.as_client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"]
)
self.as_client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Terminate"]
)
res = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test-asg"]
)
# Both 'Launch' and 'Terminate' should be suspended
launch_suspended = False
terminate_suspended = False
for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]:
if proc.get("ProcessName") == "Launch":
launch_suspended = True
if proc.get("ProcessName") == "Terminate":
terminate_suspended = True
assert launch_suspended is True
assert terminate_suspended is True
def test_resume_processes(self):
self._create_group(name="test-asg")
# When we suspect 'Launch' and 'Termiate' process then resume 'Launch'
self.as_client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"]
)
self.as_client.resume_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"]
)
res = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test-asg"]
)
# Only 'Terminate' should be suspended
expected_suspended_processes = [
{"ProcessName": "Terminate", "SuspensionReason": ""}
]
res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal(
expected_suspended_processes
)
def test_resume_processes_all_by_default(self):
self._create_group(name="test-asg")
# When we suspend two processes then resume with no process argument
self.as_client.suspend_processes(
AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"]
)
self.as_client.resume_processes(AutoScalingGroupName="test-asg")
res = self.as_client.describe_auto_scaling_groups(
AutoScalingGroupNames=["test-asg"]
)
# No processes should be suspended
res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal([])
def _create_group(self, name):
self.as_client.create_auto_scaling_group(
AutoScalingGroupName=name,
MinSize=1,
MaxSize=2,
LaunchConfigurationName=self.lc_name,
VPCZoneIdentifier=self.mocked_networking["subnet1"],
)

View File

@ -0,0 +1,85 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_autoscaling
from unittest import TestCase
@mock_autoscaling
class TestAutoScalingScheduledActions(TestCase):
def setUp(self) -> None:
self.client = boto3.client("autoscaling", region_name="us-east-1")
self.asg_name = "tester_group"
def test_list_many_scheduled_scaling_actions(self):
for i in range(30):
self._create_scheduled_action(name=f"my-scheduled-action-{i}", idx=i)
response = self.client.describe_scheduled_actions(
AutoScalingGroupName=self.asg_name
)
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(30)
def test_non_existing_group_name(self):
self._create_scheduled_action(name="my-scheduled-action", idx=1)
response = self.client.describe_scheduled_actions(
AutoScalingGroupName="wrong_group"
)
actions = response["ScheduledUpdateGroupActions"]
# since there is no such group name, no actions have been returned
actions.should.have.length_of(0)
def test_describe_scheduled_actions_returns_all_actions_when_no_argument_is_passed(
self,
):
for i in range(30):
self._create_scheduled_action(name=f"my-scheduled-action-{i}", idx=i)
for i in range(10):
self._create_scheduled_action(
name=f"my-scheduled-action-4{i}", idx=i, asg_name="test_group-2"
)
response = self.client.describe_scheduled_actions()
actions = response["ScheduledUpdateGroupActions"]
# Since no argument is passed describe_scheduled_actions, all scheduled actions are returned
actions.should.have.length_of(40)
def test_scheduled_action_delete(self):
for i in range(3):
self._create_scheduled_action(name=f"my-scheduled-action-{i}", idx=i)
response = self.client.describe_scheduled_actions(
AutoScalingGroupName=self.asg_name
)
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(3)
self.client.delete_scheduled_action(
AutoScalingGroupName=self.asg_name,
ScheduledActionName="my-scheduled-action-2",
)
self.client.delete_scheduled_action(
AutoScalingGroupName=self.asg_name,
ScheduledActionName="my-scheduled-action-1",
)
response = self.client.describe_scheduled_actions(
AutoScalingGroupName=self.asg_name
)
actions = response["ScheduledUpdateGroupActions"]
actions.should.have.length_of(1)
def _create_scheduled_action(self, name, idx, asg_name=None):
self.client.put_scheduled_update_group_action(
AutoScalingGroupName=asg_name or self.asg_name,
ScheduledActionName=name,
StartTime=f"2022-07-01T00:00:{idx}Z",
EndTime=f"2022-09-01T00:00:{idx}Z",
Recurrence="* * * * *",
MinSize=idx + 1,
MaxSize=idx + 5,
DesiredCapacity=idx + 3,
)