diff --git a/tests/test_autoscaling/test_autoscaling.py b/tests/test_autoscaling/test_autoscaling.py index 9438f9575..7e45b42c7 100644 --- a/tests/test_autoscaling/test_autoscaling.py +++ b/tests/test_autoscaling/test_autoscaling.py @@ -9,157 +9,6 @@ from tests import EXAMPLE_AMI_ID from .utils import setup_networking, setup_instance_with_networking -@mock_autoscaling -def test_create_autoscaling_groups_defaults(): - """Test with the minimum inputs and check that all of the proper defaults - are assigned for the other attributes""" - - mocked_networking = setup_networking() - as_client = boto3.client("autoscaling", region_name="us-east-1") - as_client.create_launch_configuration( - LaunchConfigurationName="tester_config", - ImageId=EXAMPLE_AMI_ID, - InstanceType="t2.medium", - ) - - as_client.create_auto_scaling_group( - AutoScalingGroupName="tester_group", - LaunchConfigurationName="tester_config", - MinSize=2, - MaxSize=2, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - group = as_client.describe_auto_scaling_groups()["AutoScalingGroups"][0] - group["AutoScalingGroupName"].should.equal("tester_group") - group["MaxSize"].should.equal(2) - group["MinSize"].should.equal(2) - group["LaunchConfigurationName"].should.equal("tester_config") - - # Defaults - group["AvailabilityZones"].should.equal(["us-east-1a"]) # subnet1 - group["DesiredCapacity"].should.equal(2) - group["VPCZoneIdentifier"].should.equal(mocked_networking["subnet1"]) - group["DefaultCooldown"].should.equal(300) - group["HealthCheckGracePeriod"].should.equal(300) - group["HealthCheckType"].should.equal("EC2") - group["LoadBalancerNames"].should.equal([]) - group.shouldnt.have.key("PlacementGroup") - group["TerminationPolicies"].should.equal(["Default"]) - group["Tags"].should.equal([]) - - -@mock_autoscaling -def test_list_many_autoscaling_groups(): - mocked_networking = setup_networking() - conn = boto3.client("autoscaling", region_name="us-east-1") - conn.create_launch_configuration( - LaunchConfigurationName="TestLC", - ImageId=EXAMPLE_AMI_ID, - InstanceType="t2.medium", - ) - - for i in range(51): - conn.create_auto_scaling_group( - AutoScalingGroupName="TestGroup%d" % i, - MinSize=1, - MaxSize=2, - LaunchConfigurationName="TestLC", - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - response = conn.describe_auto_scaling_groups() - groups = response["AutoScalingGroups"] - marker = response["NextToken"] - groups.should.have.length_of(50) - marker.should.equal(groups[-1]["AutoScalingGroupName"]) - - response2 = conn.describe_auto_scaling_groups(NextToken=marker) - - groups.extend(response2["AutoScalingGroups"]) - groups.should.have.length_of(51) - assert "NextToken" not in response2.keys() - - -@mock_autoscaling -def test_list_many_scheduled_scaling_actions(): - conn = boto3.client("autoscaling", region_name="us-east-1") - - for i in range(30): - conn.put_scheduled_update_group_action( - AutoScalingGroupName="tester_group", - ScheduledActionName=f"my-scheduled-action-{i}", - StartTime=f"2022-07-01T00:00:{i}Z", - EndTime=f"2022-09-01T00:00:{i}Z", - Recurrence="* * * * *", - MinSize=i + 1, - MaxSize=i + 5, - DesiredCapacity=i + 3, - ) - - response = conn.describe_scheduled_actions(AutoScalingGroupName="tester_group") - actions = response["ScheduledUpdateGroupActions"] - actions.should.have.length_of(30) - - -@mock_autoscaling -def test_non_existing_group_name(): - conn = boto3.client("autoscaling", region_name="us-east-1") - - conn.put_scheduled_update_group_action( - AutoScalingGroupName="tester_group", - ScheduledActionName="my-scheduled-action", - StartTime="2022-07-01T00:00:1Z", - EndTime="2022-09-01T00:00:2Z", - Recurrence="* * * * *", - MinSize=1, - MaxSize=5, - DesiredCapacity=3, - ) - - response = conn.describe_scheduled_actions(AutoScalingGroupName="wrong_group") - actions = response["ScheduledUpdateGroupActions"] - actions.should.have.length_of( - 0 - ) # since there is no such group name, no actions have been returned - - -@mock_autoscaling -def test_describe_scheduled_actions_returns_all_actions_when_no_argument_is_passed(): - conn = boto3.client("autoscaling", region_name="us-east-1") - - for i in range(30): - conn.put_scheduled_update_group_action( - AutoScalingGroupName="tester_group", - ScheduledActionName=f"my-scheduled-action-{i}", - StartTime=f"2022-07-01T00:00:{i}Z", - EndTime=f"2022-09-01T00:00:{i}Z", - Recurrence="* * * * *", - MinSize=i + 1, - MaxSize=i + 5, - DesiredCapacity=i + 3, - ) - - for i in range(10): - conn.put_scheduled_update_group_action( - AutoScalingGroupName="tester_group-2", - ScheduledActionName=f"my-scheduled-action-4{i}", - StartTime=f"2022-07-01T00:00:{i}Z", - EndTime=f"2022-09-01T00:00:{i}Z", - Recurrence="* * * * *", - MinSize=i + 1, - MaxSize=i + 5, - DesiredCapacity=i + 3, - ) - - response = conn.describe_scheduled_actions() - actions = response["ScheduledUpdateGroupActions"] - - actions.should.have.length_of( - 40 - ) # Since no argument is passed describe_scheduled_actions, all scheduled actions are returned - - @mock_autoscaling @mock_ec2 def test_propogate_tags(): @@ -196,104 +45,6 @@ def test_propogate_tags(): tags.should.contain({"Value": "TestGroup1", "Key": "aws:autoscaling:groupName"}) -@mock_autoscaling -def test_autoscaling_group_delete(): - mocked_networking = setup_networking() - as_client = boto3.client("autoscaling", region_name="us-east-1") - as_client.create_launch_configuration( - LaunchConfigurationName="tester_config", - ImageId=EXAMPLE_AMI_ID, - InstanceType="t2.medium", - ) - - as_client.create_auto_scaling_group( - AutoScalingGroupName="tester_group", - LaunchConfigurationName="tester_config", - MinSize=2, - MaxSize=2, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - as_client.describe_auto_scaling_groups()["AutoScalingGroups"].should.have.length_of( - 1 - ) - - as_client.delete_auto_scaling_group(AutoScalingGroupName="tester_group") - as_client.describe_auto_scaling_groups()["AutoScalingGroups"].should.have.length_of( - 0 - ) - - -@mock_autoscaling -def test_scheduled_action_delete(): - as_client = boto3.client("autoscaling", region_name="us-east-1") - - for i in range(3): - as_client.put_scheduled_update_group_action( - AutoScalingGroupName="tester_group", - ScheduledActionName=f"my-scheduled-action-{i}", - StartTime=f"2022-07-01T00:00:{i}Z", - EndTime=f"2022-09-01T00:00:{i}Z", - Recurrence="* * * * *", - MinSize=i + 1, - MaxSize=i + 5, - DesiredCapacity=i + 3, - ) - - response = as_client.describe_scheduled_actions(AutoScalingGroupName="tester_group") - actions = response["ScheduledUpdateGroupActions"] - actions.should.have.length_of(3) - - as_client.delete_scheduled_action( - AutoScalingGroupName="tester_group", - ScheduledActionName="my-scheduled-action-2", - ) - as_client.delete_scheduled_action( - AutoScalingGroupName="tester_group", - ScheduledActionName="my-scheduled-action-1", - ) - response = as_client.describe_scheduled_actions(AutoScalingGroupName="tester_group") - actions = response["ScheduledUpdateGroupActions"] - actions.should.have.length_of(1) - - -@mock_autoscaling -def test_create_autoscaling_group(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - client.create_launch_configuration( - LaunchConfigurationName="test_launch_configuration", - ImageId=EXAMPLE_AMI_ID, - InstanceType="t2.medium", - ) - response = client.create_auto_scaling_group( - AutoScalingGroupName="test_asg", - LaunchConfigurationName="test_launch_configuration", - MinSize=0, - MaxSize=20, - DesiredCapacity=5, - Tags=[ - { - "ResourceId": "test_asg", - "ResourceType": "auto-scaling-group", - "Key": "propogated-tag-key", - "Value": "propagate-tag-value", - "PropagateAtLaunch": True, - }, - { - "ResourceId": "test_asg", - "ResourceType": "auto-scaling-group", - "Key": "not-propogated-tag-key", - "Value": "not-propagate-tag-value", - "PropagateAtLaunch": False, - }, - ], - VPCZoneIdentifier=mocked_networking["subnet1"], - NewInstancesProtectedFromScaleIn=False, - ) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - - @mock_autoscaling def test_create_autoscaling_group_from_instance(): autoscaling_group_name = "test_asg" @@ -612,45 +363,6 @@ def test_create_autoscaling_group_multiple_launch_configurations(): ) -@mock_autoscaling -def test_describe_autoscaling_groups_launch_config(): - mocked_networking = setup_networking(region_name="eu-north-1") - client = boto3.client("autoscaling", region_name="eu-north-1") - client.create_launch_configuration( - LaunchConfigurationName="test_launch_configuration", - InstanceType="t2.micro", - ImageId=EXAMPLE_AMI_ID, - ) - client.create_auto_scaling_group( - AutoScalingGroupName="test_asg", - LaunchConfigurationName="test_launch_configuration", - MinSize=0, - MaxSize=20, - DesiredCapacity=5, - VPCZoneIdentifier=mocked_networking["subnet1"], - NewInstancesProtectedFromScaleIn=True, - ) - - response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"]) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - group = response["AutoScalingGroups"][0] - group["AutoScalingGroupARN"].should.match( - f"arn:aws:autoscaling:eu-north-1:{ACCOUNT_ID}:autoScalingGroup:" - ) - group["AutoScalingGroupName"].should.equal("test_asg") - group["LaunchConfigurationName"].should.equal("test_launch_configuration") - group.should_not.have.key("LaunchTemplate") - group["AvailabilityZones"].should.equal(["eu-north-1a"]) - group["VPCZoneIdentifier"].should.equal(mocked_networking["subnet1"]) - group["NewInstancesProtectedFromScaleIn"].should.equal(True) - for instance in group["Instances"]: - instance["LaunchConfigurationName"].should.equal("test_launch_configuration") - instance.should_not.have.key("LaunchTemplate") - instance["AvailabilityZone"].should.equal("eu-north-1a") - instance["ProtectedFromScaleIn"].should.equal(True) - instance["InstanceType"].should.equal("t2.micro") - - @mock_autoscaling @mock_ec2 def test_describe_autoscaling_groups_launch_template(): @@ -1162,236 +874,6 @@ def test_create_autoscaling_policy_with_predictive_scaling_config(): ) -@mock_autoscaling -@mock_ec2 -def test_describe_instance_health(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - _ = client.create_launch_configuration( - LaunchConfigurationName="test_launch_configuration", - ImageId=EXAMPLE_AMI_ID, - InstanceType="t2.medium", - ) - client.create_auto_scaling_group( - AutoScalingGroupName="test_asg", - LaunchConfigurationName="test_launch_configuration", - MinSize=2, - MaxSize=4, - DesiredCapacity=2, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"]) - - instance1 = response["AutoScalingGroups"][0]["Instances"][0] - instance1["HealthStatus"].should.equal("Healthy") - - -@mock_autoscaling -@mock_ec2 -def test_set_instance_health(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - _ = client.create_launch_configuration( - LaunchConfigurationName="test_launch_configuration", - ImageId=EXAMPLE_AMI_ID, - InstanceType="t2.medium", - ) - client.create_auto_scaling_group( - AutoScalingGroupName="test_asg", - LaunchConfigurationName="test_launch_configuration", - MinSize=2, - MaxSize=4, - DesiredCapacity=2, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"]) - - instance1 = response["AutoScalingGroups"][0]["Instances"][0] - instance1["HealthStatus"].should.equal("Healthy") - - client.set_instance_health( - InstanceId=instance1["InstanceId"], HealthStatus="Unhealthy" - ) - - response = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test_asg"]) - - instance1 = response["AutoScalingGroups"][0]["Instances"][0] - instance1["HealthStatus"].should.equal("Unhealthy") - - -@mock_autoscaling -def test_suspend_processes(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - client.create_launch_configuration( - LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium" - ) - client.create_auto_scaling_group( - LaunchConfigurationName="lc", - AutoScalingGroupName="test-asg", - MinSize=1, - MaxSize=1, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - # When we suspend the 'Launch' process on the ASG client - client.suspend_processes( - AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"] - ) - - res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"]) - - # The 'Launch' process should, in fact, be suspended - launch_suspended = False - for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]: - if proc.get("ProcessName") == "Launch": - launch_suspended = True - - assert launch_suspended is True - - -@mock_autoscaling -def test_suspend_processes_all_by_default(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - client.create_launch_configuration( - LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium" - ) - client.create_auto_scaling_group( - LaunchConfigurationName="lc", - AutoScalingGroupName="test-asg", - MinSize=1, - MaxSize=1, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - # When we suspend with no processes specified - client.suspend_processes(AutoScalingGroupName="test-asg") - - res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"]) - - # All processes should be suspended - all_proc_names = [ - "Launch", - "Terminate", - "AddToLoadBalancer", - "AlarmNotification", - "AZRebalance", - "HealthCheck", - "InstanceRefresh", - "ReplaceUnhealthy", - "ScheduledActions", - ] - suspended_proc_names = [ - proc["ProcessName"] - for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"] - ] - set(suspended_proc_names).should.equal(set(all_proc_names)) - - -@mock_autoscaling -def test_suspend_additional_processes(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - client.create_launch_configuration( - LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium" - ) - client.create_auto_scaling_group( - LaunchConfigurationName="lc", - AutoScalingGroupName="test-asg", - MinSize=1, - MaxSize=1, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - # When we suspend the 'Launch' and 'Terminate' processes in separate calls - client.suspend_processes( - AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"] - ) - client.suspend_processes( - AutoScalingGroupName="test-asg", ScalingProcesses=["Terminate"] - ) - - res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"]) - - # Both 'Launch' and 'Terminate' should be suspended - launch_suspended = False - terminate_suspended = False - for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]: - if proc.get("ProcessName") == "Launch": - launch_suspended = True - if proc.get("ProcessName") == "Terminate": - terminate_suspended = True - - assert launch_suspended is True - assert terminate_suspended is True - - -@mock_autoscaling -def test_resume_processes(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - client.create_launch_configuration( - LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium" - ) - client.create_auto_scaling_group( - LaunchConfigurationName="lc", - AutoScalingGroupName="test-asg", - MinSize=1, - MaxSize=1, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - # When we suspect 'Launch' and 'Termiate' process then resume 'Launch' - client.suspend_processes( - AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"] - ) - - client.resume_processes( - AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"] - ) - - res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"]) - - # Only 'Terminate' should be suspended - expected_suspended_processes = [ - {"ProcessName": "Terminate", "SuspensionReason": ""} - ] - res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal( - expected_suspended_processes - ) - - -@mock_autoscaling -def test_resume_processes_all_by_default(): - mocked_networking = setup_networking() - client = boto3.client("autoscaling", region_name="us-east-1") - client.create_launch_configuration( - LaunchConfigurationName="lc", ImageId=EXAMPLE_AMI_ID, InstanceType="t2.medium" - ) - client.create_auto_scaling_group( - LaunchConfigurationName="lc", - AutoScalingGroupName="test-asg", - MinSize=1, - MaxSize=1, - VPCZoneIdentifier=mocked_networking["subnet1"], - ) - - # When we suspend two processes then resume with no process argument - client.suspend_processes( - AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"] - ) - - client.resume_processes(AutoScalingGroupName="test-asg") - - res = client.describe_auto_scaling_groups(AutoScalingGroupNames=["test-asg"]) - - # No processes should be suspended - res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal([]) - - @mock_autoscaling def test_set_instance_protection(): mocked_networking = setup_networking() diff --git a/tests/test_autoscaling/test_autoscaling_groups.py b/tests/test_autoscaling/test_autoscaling_groups.py new file mode 100644 index 000000000..c101b9493 --- /dev/null +++ b/tests/test_autoscaling/test_autoscaling_groups.py @@ -0,0 +1,241 @@ +import boto3 +import sure # noqa # pylint: disable=unused-import + +from moto import mock_autoscaling +from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID +from tests import EXAMPLE_AMI_ID +from unittest import TestCase +from .utils import setup_networking + + +@mock_autoscaling +class TestAutoScalingGroup(TestCase): + def setUp(self) -> None: + self.mocked_networking = setup_networking() + self.as_client = boto3.client("autoscaling", region_name="us-east-1") + self.lc_name = "tester_config" + self.as_client.create_launch_configuration( + LaunchConfigurationName=self.lc_name, + ImageId=EXAMPLE_AMI_ID, + InstanceType="t2.medium", + ) + + def test_create_autoscaling_groups_defaults(self): + """Test with the minimum inputs and check that all of the proper defaults + are assigned for the other attributes""" + + self._create_group(name="tester_group") + + group = self.as_client.describe_auto_scaling_groups()["AutoScalingGroups"][0] + group["AutoScalingGroupName"].should.equal("tester_group") + group["MaxSize"].should.equal(2) + group["MinSize"].should.equal(1) + group["LaunchConfigurationName"].should.equal(self.lc_name) + + # Defaults + group["AvailabilityZones"].should.equal(["us-east-1a"]) # subnet1 + group["DesiredCapacity"].should.equal(1) + group["VPCZoneIdentifier"].should.equal(self.mocked_networking["subnet1"]) + group["DefaultCooldown"].should.equal(300) + group["HealthCheckGracePeriod"].should.equal(300) + group["HealthCheckType"].should.equal("EC2") + group["LoadBalancerNames"].should.equal([]) + group.shouldnt.have.key("PlacementGroup") + group["TerminationPolicies"].should.equal(["Default"]) + group["Tags"].should.equal([]) + + def test_list_many_autoscaling_groups(self): + + for i in range(51): + self._create_group("TestGroup%d" % i) + + response = self.as_client.describe_auto_scaling_groups() + groups = response["AutoScalingGroups"] + marker = response["NextToken"] + groups.should.have.length_of(50) + marker.should.equal(groups[-1]["AutoScalingGroupName"]) + + response2 = self.as_client.describe_auto_scaling_groups(NextToken=marker) + + groups.extend(response2["AutoScalingGroups"]) + groups.should.have.length_of(51) + assert "NextToken" not in response2.keys() + + def test_autoscaling_group_delete(self): + self._create_group(name="tester_group") + + groups = self.as_client.describe_auto_scaling_groups()["AutoScalingGroups"] + groups.should.have.length_of(1) + + self.as_client.delete_auto_scaling_group(AutoScalingGroupName="tester_group") + + groups = self.as_client.describe_auto_scaling_groups()["AutoScalingGroups"] + groups.should.have.length_of(0) + + def test_describe_autoscaling_groups__instances(self): + self._create_group(name="test_asg") + + response = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test_asg"] + ) + response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + group = response["AutoScalingGroups"][0] + group["AutoScalingGroupARN"].should.match( + f"arn:aws:autoscaling:us-east-1:{ACCOUNT_ID}:autoScalingGroup:" + ) + group["AutoScalingGroupName"].should.equal("test_asg") + group["LaunchConfigurationName"].should.equal(self.lc_name) + group.should_not.have.key("LaunchTemplate") + group["AvailabilityZones"].should.equal(["us-east-1a"]) + group["VPCZoneIdentifier"].should.equal(self.mocked_networking["subnet1"]) + for instance in group["Instances"]: + instance["LaunchConfigurationName"].should.equal(self.lc_name) + instance.should_not.have.key("LaunchTemplate") + instance["AvailabilityZone"].should.equal("us-east-1a") + instance["InstanceType"].should.equal("t2.medium") + + def test_set_instance_health(self): + self._create_group(name="test_asg") + + response = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test_asg"] + ) + + instance1 = response["AutoScalingGroups"][0]["Instances"][0] + instance1["HealthStatus"].should.equal("Healthy") + + self.as_client.set_instance_health( + InstanceId=instance1["InstanceId"], HealthStatus="Unhealthy" + ) + + response = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test_asg"] + ) + + instance1 = response["AutoScalingGroups"][0]["Instances"][0] + instance1["HealthStatus"].should.equal("Unhealthy") + + def test_suspend_processes(self): + self._create_group(name="test-asg") + + # When we suspend the 'Launch' process on the ASG client + self.as_client.suspend_processes( + AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"] + ) + + res = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test-asg"] + ) + + # The 'Launch' process should, in fact, be suspended + launch_suspended = False + for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]: + if proc.get("ProcessName") == "Launch": + launch_suspended = True + + assert launch_suspended is True + + def test_suspend_processes_all_by_default(self): + self._create_group(name="test-asg") + + # When we suspend with no processes specified + self.as_client.suspend_processes(AutoScalingGroupName="test-asg") + + res = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test-asg"] + ) + + # All processes should be suspended + all_proc_names = [ + "Launch", + "Terminate", + "AddToLoadBalancer", + "AlarmNotification", + "AZRebalance", + "HealthCheck", + "InstanceRefresh", + "ReplaceUnhealthy", + "ScheduledActions", + ] + suspended_proc_names = [ + proc["ProcessName"] + for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"] + ] + set(suspended_proc_names).should.equal(set(all_proc_names)) + + def test_suspend_additional_processes(self): + self._create_group(name="test-asg") + + # When we suspend the 'Launch' and 'Terminate' processes in separate calls + self.as_client.suspend_processes( + AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"] + ) + self.as_client.suspend_processes( + AutoScalingGroupName="test-asg", ScalingProcesses=["Terminate"] + ) + + res = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test-asg"] + ) + + # Both 'Launch' and 'Terminate' should be suspended + launch_suspended = False + terminate_suspended = False + for proc in res["AutoScalingGroups"][0]["SuspendedProcesses"]: + if proc.get("ProcessName") == "Launch": + launch_suspended = True + if proc.get("ProcessName") == "Terminate": + terminate_suspended = True + + assert launch_suspended is True + assert terminate_suspended is True + + def test_resume_processes(self): + self._create_group(name="test-asg") + + # When we suspect 'Launch' and 'Termiate' process then resume 'Launch' + self.as_client.suspend_processes( + AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"] + ) + + self.as_client.resume_processes( + AutoScalingGroupName="test-asg", ScalingProcesses=["Launch"] + ) + + res = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test-asg"] + ) + + # Only 'Terminate' should be suspended + expected_suspended_processes = [ + {"ProcessName": "Terminate", "SuspensionReason": ""} + ] + res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal( + expected_suspended_processes + ) + + def test_resume_processes_all_by_default(self): + self._create_group(name="test-asg") + + # When we suspend two processes then resume with no process argument + self.as_client.suspend_processes( + AutoScalingGroupName="test-asg", ScalingProcesses=["Launch", "Terminate"] + ) + + self.as_client.resume_processes(AutoScalingGroupName="test-asg") + + res = self.as_client.describe_auto_scaling_groups( + AutoScalingGroupNames=["test-asg"] + ) + + # No processes should be suspended + res["AutoScalingGroups"][0]["SuspendedProcesses"].should.equal([]) + + def _create_group(self, name): + self.as_client.create_auto_scaling_group( + AutoScalingGroupName=name, + MinSize=1, + MaxSize=2, + LaunchConfigurationName=self.lc_name, + VPCZoneIdentifier=self.mocked_networking["subnet1"], + ) diff --git a/tests/test_autoscaling/test_autoscaling_scheduledactions.py b/tests/test_autoscaling/test_autoscaling_scheduledactions.py new file mode 100644 index 000000000..372faf47c --- /dev/null +++ b/tests/test_autoscaling/test_autoscaling_scheduledactions.py @@ -0,0 +1,85 @@ +import boto3 +import sure # noqa # pylint: disable=unused-import + +from moto import mock_autoscaling +from unittest import TestCase + + +@mock_autoscaling +class TestAutoScalingScheduledActions(TestCase): + def setUp(self) -> None: + self.client = boto3.client("autoscaling", region_name="us-east-1") + self.asg_name = "tester_group" + + def test_list_many_scheduled_scaling_actions(self): + for i in range(30): + self._create_scheduled_action(name=f"my-scheduled-action-{i}", idx=i) + + response = self.client.describe_scheduled_actions( + AutoScalingGroupName=self.asg_name + ) + actions = response["ScheduledUpdateGroupActions"] + actions.should.have.length_of(30) + + def test_non_existing_group_name(self): + self._create_scheduled_action(name="my-scheduled-action", idx=1) + + response = self.client.describe_scheduled_actions( + AutoScalingGroupName="wrong_group" + ) + actions = response["ScheduledUpdateGroupActions"] + # since there is no such group name, no actions have been returned + actions.should.have.length_of(0) + + def test_describe_scheduled_actions_returns_all_actions_when_no_argument_is_passed( + self, + ): + for i in range(30): + self._create_scheduled_action(name=f"my-scheduled-action-{i}", idx=i) + + for i in range(10): + self._create_scheduled_action( + name=f"my-scheduled-action-4{i}", idx=i, asg_name="test_group-2" + ) + + response = self.client.describe_scheduled_actions() + actions = response["ScheduledUpdateGroupActions"] + + # Since no argument is passed describe_scheduled_actions, all scheduled actions are returned + actions.should.have.length_of(40) + + def test_scheduled_action_delete(self): + for i in range(3): + self._create_scheduled_action(name=f"my-scheduled-action-{i}", idx=i) + + response = self.client.describe_scheduled_actions( + AutoScalingGroupName=self.asg_name + ) + actions = response["ScheduledUpdateGroupActions"] + actions.should.have.length_of(3) + + self.client.delete_scheduled_action( + AutoScalingGroupName=self.asg_name, + ScheduledActionName="my-scheduled-action-2", + ) + self.client.delete_scheduled_action( + AutoScalingGroupName=self.asg_name, + ScheduledActionName="my-scheduled-action-1", + ) + response = self.client.describe_scheduled_actions( + AutoScalingGroupName=self.asg_name + ) + actions = response["ScheduledUpdateGroupActions"] + actions.should.have.length_of(1) + + def _create_scheduled_action(self, name, idx, asg_name=None): + self.client.put_scheduled_update_group_action( + AutoScalingGroupName=asg_name or self.asg_name, + ScheduledActionName=name, + StartTime=f"2022-07-01T00:00:{idx}Z", + EndTime=f"2022-09-01T00:00:{idx}Z", + Recurrence="* * * * *", + MinSize=idx + 1, + MaxSize=idx + 5, + DesiredCapacity=idx + 3, + )