SSM: add patch baseline operations (#6426)

This commit is contained in:
Cristopher Pinzón 2023-06-21 06:37:09 -05:00 committed by GitHub
parent b2dc643f56
commit c942883183
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 3 deletions

View File

@ -6729,7 +6729,7 @@
- [X] create_maintenance_window
- [ ] create_ops_item
- [ ] create_ops_metadata
- [ ] create_patch_baseline
- [X] create_patch_baseline
- [ ] create_resource_data_sync
- [ ] delete_activation
- [ ] delete_association
@ -6739,7 +6739,7 @@
- [ ] delete_ops_metadata
- [X] delete_parameter
- [X] delete_parameters
- [ ] delete_patch_baseline
- [X] delete_patch_baseline
- [ ] delete_resource_data_sync
- [ ] delete_resource_policy
- [ ] deregister_managed_instance
@ -6773,7 +6773,7 @@
- [ ] describe_maintenance_windows_for_target
- [ ] describe_ops_items
- [X] describe_parameters
- [ ] describe_patch_baselines
- [X] describe_patch_baselines
- [ ] describe_patch_group_state
- [ ] describe_patch_groups
- [ ] describe_patch_properties

View File

@ -986,6 +986,58 @@ class FakeMaintenanceWindow:
return "mw-" + "".join(str(random.choice(chars)) for _ in range(17))
class FakePatchBaseline:
def __init__(
self,
name: str,
operating_system: str,
global_filters: Optional[Dict[str, Any]],
approval_rules: Optional[Dict[str, Any]],
approved_patches: Optional[List[str]],
approved_patches_compliance_level: Optional[str],
approved_patches_enable_non_security: Optional[bool],
rejected_patches: Optional[List[str]],
rejected_patches_action: Optional[str],
description: Optional[str],
sources: Optional[List[Dict[str, Any]]],
):
self.id = FakePatchBaseline.generate_id()
self.operating_system = operating_system
self.name = name
self.global_filters = global_filters
self.approval_rules = approval_rules
self.approved_patches = approved_patches
self.approved_patches_compliance_level = approved_patches_compliance_level
self.approved_patches_enable_non_security = approved_patches_enable_non_security
self.rejected_patches = rejected_patches
self.rejected_patches_action = rejected_patches_action
self.description = description
self.sources = sources
self.default_baseline = False
def to_json(self) -> Dict[str, Any]:
return {
"BaselineId": self.id,
"OperatingSystem": self.operating_system,
"BaselineName": self.name,
"GlobalFilters": self.global_filters,
"ApprovalRules": self.approval_rules,
"ApprovedPatches": self.approved_patches,
"ApprovedPatchesComplianceLevel": self.approved_patches_compliance_level,
"ApprovedPatchesEnableNonSecurity": self.approved_patches_enable_non_security,
"RejectedPatches": self.rejected_patches,
"RejectedPatchesAction": self.rejected_patches_action,
"BaselineDescription": self.description,
"Sources": self.sources,
"DefaultBaseline": self.default_baseline,
}
@staticmethod
def generate_id() -> str:
chars = list(range(10)) + ["a", "b", "c", "d", "e", "f"]
return "pb-" + "".join(str(random.choice(chars)) for _ in range(17))
class SimpleSystemManagerBackend(BaseBackend):
"""
Moto supports the following default parameters out of the box:
@ -1010,6 +1062,7 @@ class SimpleSystemManagerBackend(BaseBackend):
self._documents: Dict[str, Documents] = {}
self.windows: Dict[str, FakeMaintenanceWindow] = dict()
self.baselines: Dict[str, FakePatchBaseline] = dict()
@staticmethod
def default_vpc_endpoint_service(
@ -2135,5 +2188,67 @@ class SimpleSystemManagerBackend(BaseBackend):
"""
del self.windows[window_id]
def create_patch_baseline(
self,
name: str,
operating_system: str,
global_filters: Optional[Dict[str, Any]],
approval_rules: Optional[Dict[str, Any]],
approved_patches: Optional[List[str]],
approved_patches_compliance_level: Optional[str],
approved_patches_enable_non_security: Optional[bool],
rejected_patches: Optional[List[str]],
rejected_patches_action: Optional[str],
description: Optional[str],
sources: Optional[List[Dict[str, Any]]],
tags: Optional[List[Dict[str, str]]],
) -> str:
"""
Registers a patch baseline. No error handling or input validation has been implemented yet.
"""
baseline = FakePatchBaseline(
name,
operating_system,
global_filters,
approval_rules,
approved_patches,
approved_patches_compliance_level,
approved_patches_enable_non_security,
rejected_patches,
rejected_patches_action,
description,
sources,
)
self.baselines[baseline.id] = baseline
if tags:
baseline_tags = {t["Key"]: t["Value"] for t in tags}
self.add_tags_to_resource("PatchBaseline", baseline.id, baseline_tags)
return baseline.id
def describe_patch_baselines(
self, filters: Optional[List[Dict[str, Any]]]
) -> List[FakePatchBaseline]:
"""
Returns all baselines. No pagination has been implemented yet.
"""
baselines = [baseline for baseline in self.baselines.values()]
if filters:
for f in filters:
if f["Key"] == "NAME_PREFIX":
baselines = [
baseline
for baseline in baselines
if baseline.name in f["Values"]
]
return baselines
def delete_patch_baseline(self, baseline_id: str) -> None:
"""
Assumes the provided BaselineId exists. No error handling has been implemented yet.
"""
del self.baselines[baseline_id]
ssm_backends = BackendDict(SimpleSystemManagerBackend, "ssm")

View File

@ -453,3 +453,37 @@ class SimpleSystemManagerResponse(BaseResponse):
window_id = self._get_param("WindowId")
self.ssm_backend.delete_maintenance_window(window_id)
return "{}"
def create_patch_baseline(self) -> str:
baseline_id = self.ssm_backend.create_patch_baseline(
name=self._get_param("Name"),
operating_system=self._get_param("OperatingSystem"),
global_filters=self._get_param("GlobalFilters", {}),
approval_rules=self._get_param("ApprovalRules", {}),
approved_patches=self._get_param("ApprovedPatches", []),
approved_patches_compliance_level=self._get_param(
"ApprovedPatchesComplianceLevel"
),
approved_patches_enable_non_security=self._get_param(
"ApprovedPatchesEnableNonSecurity"
),
rejected_patches=self._get_param("RejectedPatches", []),
rejected_patches_action=self._get_param("RejectedPatchesAction"),
description=self._get_param("Description"),
sources=self._get_param("Sources", []),
tags=self._get_param("Tags", []),
)
return json.dumps({"BaselineId": baseline_id})
def describe_patch_baselines(self) -> str:
filters = self._get_param("Filters", None)
baselines = [
baseline.to_json()
for baseline in self.ssm_backend.describe_patch_baselines(filters)
]
return json.dumps({"BaselineIdentities": baselines})
def delete_patch_baseline(self) -> str:
baseline_id = self._get_param("BaselineId")
self.ssm_backend.delete_patch_baseline(baseline_id)
return "{}"

View File

@ -0,0 +1,100 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_ssm
@mock_ssm
def test_create_patch_baseLine():
ssm = boto3.client("ssm", region_name="us-east-1")
baseline_name = "ExamplePatchBaseline"
baseline_description = "Example patch baseline created using Boto3"
# Define the approval rules for the patch baseline
approval_rules = {
"PatchRules": [
{
"PatchFilterGroup": {
"PatchFilters": [
{"Key": "PRODUCT", "Values": ["AmazonLinux2012.03"]},
{"Key": "CLASSIFICATION", "Values": ["Security"]},
]
},
"ApproveAfterDays": 7,
"ComplianceLevel": "CRITICAL",
}
]
}
# Create the patch baseline
response = ssm.create_patch_baseline(
Name=baseline_name,
OperatingSystem="AMAZON_LINUX",
Description=baseline_description,
ApprovalRules=approval_rules,
)
_id = response["BaselineId"] # mw-01d6bbfdf6af2c39a
response = ssm.describe_patch_baselines(
Filters=[
{
"Key": "NAME_PREFIX",
"Values": [
baseline_name,
],
},
],
MaxResults=50,
)
response.should.have.key("BaselineIdentities").have.length_of(1)
baseline = response["BaselineIdentities"][0]
baseline.should.have.key("BaselineId").equal(_id)
baseline.should.have.key("BaselineName").equal(baseline_name)
baseline.should.have.key("DefaultBaseline").equal(False)
baseline.should.have.key("OperatingSystem").equal("AMAZON_LINUX")
baseline.should.have.key("BaselineDescription").equal(baseline_description)
@mock_ssm
def test_delete_patch_baseline():
ssm = boto3.client("ssm", region_name="us-east-1")
baseline_name = "ExamplePatchBaseline"
# Create the patch baseline
response = ssm.create_patch_baseline(
Name="ExamplePatchBaseline",
OperatingSystem="AMAZON_LINUX",
Description="Example patch baseline created using Boto3",
ApprovalRules={
"PatchRules": [
{
"PatchFilterGroup": {
"PatchFilters": [
{"Key": "PRODUCT", "Values": ["AmazonLinux2012.03"]},
{"Key": "CLASSIFICATION", "Values": ["Security"]},
]
},
"ApproveAfterDays": 7,
"ComplianceLevel": "CRITICAL",
}
]
},
)
_id = response["BaselineId"] # pw-0a49ee14c7f305f55
ssm.delete_patch_baseline(BaselineId=_id)
response = ssm.describe_patch_baselines(
Filters=[
{
"Key": "NAME_PREFIX",
"Values": [
baseline_name,
],
},
],
MaxResults=50,
)
response.should.have.key("BaselineIdentities").have.length_of(0)