Merge pull request #3102 from Dreadwall/ssm_docs

SSM documents
This commit is contained in:
Mike Grima 2020-07-03 11:29:16 -07:00 committed by GitHub
commit 1998b8b5d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1524 additions and 1 deletions

View File

@ -53,3 +53,58 @@ class ValidationException(JsonRESTError):
def __init__(self, message):
super(ValidationException, self).__init__("ValidationException", message)
class DocumentAlreadyExists(JsonRESTError):
code = 400
def __init__(self, message):
super(DocumentAlreadyExists, self).__init__("DocumentAlreadyExists", message)
class InvalidDocument(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocument, self).__init__("InvalidDocument", message)
class InvalidDocumentOperation(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocumentOperation, self).__init__(
"InvalidDocumentOperation", message
)
class InvalidDocumentContent(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocumentContent, self).__init__("InvalidDocumentContent", message)
class InvalidDocumentVersion(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidDocumentVersion, self).__init__("InvalidDocumentVersion", message)
class DuplicateDocumentVersionName(JsonRESTError):
code = 400
def __init__(self, message):
super(DuplicateDocumentVersionName, self).__init__(
"DuplicateDocumentVersionName", message
)
class DuplicateDocumentContent(JsonRESTError):
code = 400
def __init__(self, message):
super(DuplicateDocumentContent, self).__init__(
"DuplicateDocumentContent", message
)

View File

@ -3,7 +3,7 @@ from __future__ import unicode_literals
import re
from collections import defaultdict
from moto.core import BaseBackend, BaseModel
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.exceptions import RESTError
from moto.ec2 import ec2_backends
from moto.cloudformation import cloudformation_backends
@ -12,6 +12,9 @@ import datetime
import time
import uuid
import itertools
import json
import yaml
import hashlib
from .utils import parameter_arn
from .exceptions import (
@ -22,6 +25,13 @@ from .exceptions import (
ParameterVersionLabelLimitExceeded,
ParameterVersionNotFound,
ParameterNotFound,
DocumentAlreadyExists,
InvalidDocumentOperation,
InvalidDocument,
InvalidDocumentContent,
InvalidDocumentVersion,
DuplicateDocumentVersionName,
DuplicateDocumentContent,
)
@ -102,6 +112,108 @@ class Parameter(BaseModel):
MAX_TIMEOUT_SECONDS = 3600
def generate_ssm_doc_param_list(parameters):
if not parameters:
return None
param_list = []
for param_name, param_info in parameters.items():
final_dict = {}
final_dict["Name"] = param_name
final_dict["Type"] = param_info["type"]
final_dict["Description"] = param_info["description"]
if (
param_info["type"] == "StringList"
or param_info["type"] == "StringMap"
or param_info["type"] == "MapList"
):
final_dict["DefaultValue"] = json.dumps(param_info["default"])
else:
final_dict["DefaultValue"] = str(param_info["default"])
param_list.append(final_dict)
return param_list
class Document(BaseModel):
def __init__(
self,
name,
version_name,
content,
document_type,
document_format,
requires,
attachments,
target_type,
tags,
document_version="1",
):
self.name = name
self.version_name = version_name
self.content = content
self.document_type = document_type
self.document_format = document_format
self.requires = requires
self.attachments = attachments
self.target_type = target_type
self.tags = tags
self.status = "Active"
self.document_version = document_version
self.owner = ACCOUNT_ID
self.created_date = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
if document_format == "JSON":
try:
content_json = json.loads(content)
except ValueError:
# Python2
raise InvalidDocumentContent(
"The content for the document is not valid."
)
except json.decoder.JSONDecodeError:
raise InvalidDocumentContent(
"The content for the document is not valid."
)
elif document_format == "YAML":
try:
content_json = yaml.safe_load(content)
except yaml.YAMLError:
raise InvalidDocumentContent(
"The content for the document is not valid."
)
else:
raise ValidationException("Invalid document format " + str(document_format))
self.content_json = content_json
try:
self.schema_version = str(content_json["schemaVersion"])
self.description = content_json.get("description")
self.outputs = content_json.get("outputs")
self.files = content_json.get("files")
# TODO add platformType (requires mapping the ssm actions to OS's this isn't well documented)
self.platform_types = ["Not Implemented (moto)"]
self.parameter_list = generate_ssm_doc_param_list(
content_json.get("parameters")
)
if (
self.schema_version == "0.3"
or self.schema_version == "2.0"
or self.schema_version == "2.2"
):
self.mainSteps = content_json["mainSteps"]
elif self.schema_version == "1.2":
self.runtimeConfig = content_json.get("runtimeConfig")
except KeyError:
raise InvalidDocumentContent("The content for the document is not valid.")
class Command(BaseModel):
def __init__(
self,
@ -269,6 +381,93 @@ class Command(BaseModel):
return invocation
def _validate_document_format(document_format):
aws_doc_formats = ["JSON", "YAML"]
if document_format not in aws_doc_formats:
raise ValidationException("Invalid document format " + str(document_format))
def _validate_document_info(content, name, document_type, document_format, strict=True):
aws_ssm_name_regex = r"^[a-zA-Z0-9_\-.]{3,128}$"
aws_name_reject_list = ["aws-", "amazon", "amzn"]
aws_doc_types = [
"Command",
"Policy",
"Automation",
"Session",
"Package",
"ApplicationConfiguration",
"ApplicationConfigurationSchema",
"DeploymentStrategy",
"ChangeCalendar",
]
_validate_document_format(document_format)
if not content:
raise ValidationException("Content is required")
if list(filter(name.startswith, aws_name_reject_list)):
raise ValidationException("Invalid document name " + str(name))
ssm_name_pattern = re.compile(aws_ssm_name_regex)
if not ssm_name_pattern.match(name):
raise ValidationException("Invalid document name " + str(name))
if strict and document_type not in aws_doc_types:
# Update document doesn't use document type
raise ValidationException("Invalid document type " + str(document_type))
def _document_filter_equal_comparator(keyed_value, filter):
for v in filter["Values"]:
if keyed_value == v:
return True
return False
def _document_filter_list_includes_comparator(keyed_value_list, filter):
for v in filter["Values"]:
if v in keyed_value_list:
return True
return False
def _document_filter_match(filters, ssm_doc):
for filter in filters:
if filter["Key"] == "Name" and not _document_filter_equal_comparator(
ssm_doc.name, filter
):
return False
elif filter["Key"] == "Owner":
if len(filter["Values"]) != 1:
raise ValidationException("Owner filter can only have one value.")
if filter["Values"][0] == "Self":
# Update to running account ID
filter["Values"][0] = ACCOUNT_ID
if not _document_filter_equal_comparator(ssm_doc.owner, filter):
return False
elif filter[
"Key"
] == "PlatformTypes" and not _document_filter_list_includes_comparator(
ssm_doc.platform_types, filter
):
return False
elif filter["Key"] == "DocumentType" and not _document_filter_equal_comparator(
ssm_doc.document_type, filter
):
return False
elif filter["Key"] == "TargetType" and not _document_filter_equal_comparator(
ssm_doc.target_type, filter
):
return False
return True
class SimpleSystemManagerBackend(BaseBackend):
def __init__(self):
# each value is a list of all of the versions for a parameter
@ -278,12 +477,355 @@ class SimpleSystemManagerBackend(BaseBackend):
self._resource_tags = defaultdict(lambda: defaultdict(dict))
self._commands = []
self._errors = []
self._documents = defaultdict(dict)
# figure out what region we're in
for region, backend in ssm_backends.items():
if backend == self:
self._region = region
def _generate_document_description(self, document):
latest = self._documents[document.name]["latest_version"]
default_version = self._documents[document.name]["default_version"]
base = {
"Hash": hashlib.sha256(document.content.encode("utf-8")).hexdigest(),
"HashType": "Sha256",
"Name": document.name,
"Owner": document.owner,
"CreatedDate": document.created_date,
"Status": document.status,
"DocumentVersion": document.document_version,
"Description": document.description,
"Parameters": document.parameter_list,
"PlatformTypes": document.platform_types,
"DocumentType": document.document_type,
"SchemaVersion": document.schema_version,
"LatestVersion": latest,
"DefaultVersion": default_version,
"DocumentFormat": document.document_format,
}
if document.version_name:
base["VersionName"] = document.version_name
if document.target_type:
base["TargetType"] = document.target_type
if document.tags:
base["Tags"] = document.tags
return base
def _generate_document_information(self, ssm_document, document_format):
base = {
"Name": ssm_document.name,
"DocumentVersion": ssm_document.document_version,
"Status": ssm_document.status,
"Content": ssm_document.content,
"DocumentType": ssm_document.document_type,
"DocumentFormat": document_format,
}
if document_format == "JSON":
base["Content"] = json.dumps(ssm_document.content_json)
elif document_format == "YAML":
base["Content"] = yaml.dump(ssm_document.content_json)
else:
raise ValidationException("Invalid document format " + str(document_format))
if ssm_document.version_name:
base["VersionName"] = ssm_document.version_name
if ssm_document.requires:
base["Requires"] = ssm_document.requires
if ssm_document.attachments:
base["AttachmentsContent"] = ssm_document.attachments
return base
def _generate_document_list_information(self, ssm_document):
base = {
"Name": ssm_document.name,
"Owner": ssm_document.owner,
"DocumentVersion": ssm_document.document_version,
"DocumentType": ssm_document.document_type,
"SchemaVersion": ssm_document.schema_version,
"DocumentFormat": ssm_document.document_format,
}
if ssm_document.version_name:
base["VersionName"] = ssm_document.version_name
if ssm_document.platform_types:
base["PlatformTypes"] = ssm_document.platform_types
if ssm_document.target_type:
base["TargetType"] = ssm_document.target_type
if ssm_document.tags:
base["Tags"] = ssm_document.tags
if ssm_document.requires:
base["Requires"] = ssm_document.requires
return base
def create_document(
self,
content,
requires,
attachments,
name,
version_name,
document_type,
document_format,
target_type,
tags,
):
ssm_document = Document(
name=name,
version_name=version_name,
content=content,
document_type=document_type,
document_format=document_format,
requires=requires,
attachments=attachments,
target_type=target_type,
tags=tags,
)
_validate_document_info(
content=content,
name=name,
document_type=document_type,
document_format=document_format,
)
if self._documents.get(ssm_document.name):
raise DocumentAlreadyExists("The specified document already exists.")
self._documents[ssm_document.name] = {
"documents": {ssm_document.document_version: ssm_document},
"default_version": ssm_document.document_version,
"latest_version": ssm_document.document_version,
}
return self._generate_document_description(ssm_document)
def delete_document(self, name, document_version, version_name, force):
documents = self._documents.get(name, {}).get("documents", {})
keys_to_delete = set()
if documents:
default_version = self._documents[name]["default_version"]
if (
documents[default_version].document_type
== "ApplicationConfigurationSchema"
and not force
):
raise InvalidDocumentOperation(
"You attempted to delete a document while it is still shared. "
"You must stop sharing the document before you can delete it."
)
if document_version and document_version == default_version:
raise InvalidDocumentOperation(
"Default version of the document can't be deleted."
)
if document_version or version_name:
# We delete only a specific version
delete_doc = self._find_document(name, document_version, version_name)
# we can't delete only the default version
if (
delete_doc
and delete_doc.document_version == default_version
and len(documents) != 1
):
raise InvalidDocumentOperation(
"Default version of the document can't be deleted."
)
if delete_doc:
keys_to_delete.add(delete_doc.document_version)
else:
raise InvalidDocument("The specified document does not exist.")
else:
# We are deleting all versions
keys_to_delete = set(documents.keys())
for key in keys_to_delete:
del self._documents[name]["documents"][key]
if len(self._documents[name]["documents"].keys()) == 0:
del self._documents[name]
else:
old_latest = self._documents[name]["latest_version"]
if old_latest not in self._documents[name]["documents"].keys():
leftover_keys = self._documents[name]["documents"].keys()
int_keys = []
for key in leftover_keys:
int_keys.append(int(key))
self._documents[name]["latest_version"] = str(sorted(int_keys)[-1])
else:
raise InvalidDocument("The specified document does not exist.")
def _find_document(
self, name, document_version=None, version_name=None, strict=True
):
if not self._documents.get(name):
raise InvalidDocument("The specified document does not exist.")
documents = self._documents[name]["documents"]
ssm_document = None
if not version_name and not document_version:
# Retrieve default version
default_version = self._documents[name]["default_version"]
ssm_document = documents.get(default_version)
elif version_name and document_version:
for doc_version, document in documents.items():
if (
doc_version == document_version
and document.version_name == version_name
):
ssm_document = document
break
else:
for doc_version, document in documents.items():
if document_version and doc_version == document_version:
ssm_document = document
break
if version_name and document.version_name == version_name:
ssm_document = document
break
if strict and not ssm_document:
raise InvalidDocument("The specified document does not exist.")
return ssm_document
def get_document(self, name, document_version, version_name, document_format):
ssm_document = self._find_document(name, document_version, version_name)
if not document_format:
document_format = ssm_document.document_format
else:
_validate_document_format(document_format=document_format)
return self._generate_document_information(ssm_document, document_format)
def update_document_default_version(self, name, document_version):
ssm_document = self._find_document(name, document_version=document_version)
self._documents[name]["default_version"] = document_version
base = {
"Name": ssm_document.name,
"DefaultVersion": document_version,
}
if ssm_document.version_name:
base["DefaultVersionName"] = ssm_document.version_name
return base
def update_document(
self,
content,
attachments,
name,
version_name,
document_version,
document_format,
target_type,
):
_validate_document_info(
content=content,
name=name,
document_type=None,
document_format=document_format,
strict=False,
)
if not self._documents.get(name):
raise InvalidDocument("The specified document does not exist.")
if (
self._documents[name]["latest_version"] != document_version
and document_version != "$LATEST"
):
raise InvalidDocumentVersion(
"The document version is not valid or does not exist."
)
if version_name and self._find_document(
name, version_name=version_name, strict=False
):
raise DuplicateDocumentVersionName(
"The specified version name is a duplicate."
)
old_ssm_document = self._find_document(name)
new_ssm_document = Document(
name=name,
version_name=version_name,
content=content,
document_type=old_ssm_document.document_type,
document_format=document_format,
requires=old_ssm_document.requires,
attachments=attachments,
target_type=target_type,
tags=old_ssm_document.tags,
document_version=str(int(self._documents[name]["latest_version"]) + 1),
)
for doc_version, document in self._documents[name]["documents"].items():
if document.content == new_ssm_document.content:
raise DuplicateDocumentContent(
"The content of the association document matches another document. "
"Change the content of the document and try again."
)
self._documents[name]["latest_version"] = str(
int(self._documents[name]["latest_version"]) + 1
)
self._documents[name]["documents"][
new_ssm_document.document_version
] = new_ssm_document
return self._generate_document_description(new_ssm_document)
def describe_document(self, name, document_version, version_name):
ssm_document = self._find_document(name, document_version, version_name)
return self._generate_document_description(ssm_document)
def list_documents(
self, document_filter_list, filters, max_results=10, next_token="0"
):
if document_filter_list:
raise ValidationException(
"DocumentFilterList is deprecated. Instead use Filters."
)
next_token = int(next_token)
results = []
dummy_token_tracker = 0
# Sort to maintain next token adjacency
for document_name, document_bundle in sorted(self._documents.items()):
if len(results) == max_results:
# There's still more to go so we need a next token
return results, str(next_token + len(results))
if dummy_token_tracker < next_token:
dummy_token_tracker = dummy_token_tracker + 1
continue
default_version = document_bundle["default_version"]
ssm_doc = self._documents[document_name]["documents"][default_version]
if filters and not _document_filter_match(filters, ssm_doc):
# If we have filters enabled, and we don't match them,
continue
else:
results.append(self._generate_document_list_information(ssm_doc))
# If we've fallen out of the loop, theres no more documents. No next token.
return results, ""
def delete_parameter(self, name):
return self._parameters.pop(name, None)

View File

@ -17,6 +17,116 @@ class SimpleSystemManagerResponse(BaseResponse):
except ValueError:
return {}
def create_document(self):
content = self._get_param("Content")
requires = self._get_param("Requires")
attachments = self._get_param("Attachments")
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_type = self._get_param("DocumentType")
document_format = self._get_param("DocumentFormat", "JSON")
target_type = self._get_param("TargetType")
tags = self._get_param("Tags")
result = self.ssm_backend.create_document(
content=content,
requires=requires,
attachments=attachments,
name=name,
version_name=version_name,
document_type=document_type,
document_format=document_format,
target_type=target_type,
tags=tags,
)
return json.dumps({"DocumentDescription": result})
def delete_document(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
version_name = self._get_param("VersionName")
force = self._get_param("Force", False)
self.ssm_backend.delete_document(
name=name,
document_version=document_version,
version_name=version_name,
force=force,
)
return json.dumps({})
def get_document(self):
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_version = self._get_param("DocumentVersion")
document_format = self._get_param("DocumentFormat", "JSON")
document = self.ssm_backend.get_document(
name=name,
document_version=document_version,
document_format=document_format,
version_name=version_name,
)
return json.dumps(document)
def describe_document(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
version_name = self._get_param("VersionName")
result = self.ssm_backend.describe_document(
name=name, document_version=document_version, version_name=version_name
)
return json.dumps({"Document": result})
def update_document(self):
content = self._get_param("Content")
attachments = self._get_param("Attachments")
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_version = self._get_param("DocumentVersion")
document_format = self._get_param("DocumentFormat", "JSON")
target_type = self._get_param("TargetType")
result = self.ssm_backend.update_document(
content=content,
attachments=attachments,
name=name,
version_name=version_name,
document_version=document_version,
document_format=document_format,
target_type=target_type,
)
return json.dumps({"DocumentDescription": result})
def update_document_default_version(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
result = self.ssm_backend.update_document_default_version(
name=name, document_version=document_version
)
return json.dumps({"Description": result})
def list_documents(self):
document_filter_list = self._get_param("DocumentFilterList")
filters = self._get_param("Filters")
max_results = self._get_param("MaxResults", 10)
next_token = self._get_param("NextToken", "0")
documents, token = self.ssm_backend.list_documents(
document_filter_list=document_filter_list,
filters=filters,
max_results=max_results,
next_token=next_token,
)
return json.dumps({"DocumentIdentifiers": documents, "NextToken": token})
def _get_param(self, param, default=None):
return self.request_params.get(param, default)

View File

View File

@ -0,0 +1,769 @@
from __future__ import unicode_literals
import boto3
import botocore.exceptions
import sure # noqa
import datetime
import json
import pkg_resources
import yaml
import hashlib
import copy
from moto.core import ACCOUNT_ID
from moto import mock_ssm
def _get_yaml_template():
template_path = "/".join(["test_ssm", "test_templates", "good.yaml"])
resource_path = pkg_resources.resource_string("tests", template_path)
return resource_path
def _validate_document_description(
doc_name,
doc_description,
json_doc,
expected_document_version,
expected_latest_version,
expected_default_version,
expected_format,
):
if expected_format == "JSON":
doc_description["Hash"].should.equal(
hashlib.sha256(json.dumps(json_doc).encode("utf-8")).hexdigest()
)
else:
doc_description["Hash"].should.equal(
hashlib.sha256(yaml.dump(json_doc).encode("utf-8")).hexdigest()
)
doc_description["HashType"].should.equal("Sha256")
doc_description["Name"].should.equal(doc_name)
doc_description["Owner"].should.equal(ACCOUNT_ID)
difference = datetime.datetime.utcnow() - doc_description["CreatedDate"]
if difference.min > datetime.timedelta(minutes=1):
assert False
doc_description["Status"].should.equal("Active")
doc_description["DocumentVersion"].should.equal(expected_document_version)
doc_description["Description"].should.equal(json_doc["description"])
doc_description["Parameters"] = sorted(
doc_description["Parameters"], key=lambda doc: doc["Name"]
)
doc_description["Parameters"][0]["Name"].should.equal("Parameter1")
doc_description["Parameters"][0]["Type"].should.equal("Integer")
doc_description["Parameters"][0]["Description"].should.equal("Command Duration.")
doc_description["Parameters"][0]["DefaultValue"].should.equal("3")
doc_description["Parameters"][1]["Name"].should.equal("Parameter2")
doc_description["Parameters"][1]["Type"].should.equal("String")
doc_description["Parameters"][1]["DefaultValue"].should.equal("def")
doc_description["Parameters"][2]["Name"].should.equal("Parameter3")
doc_description["Parameters"][2]["Type"].should.equal("Boolean")
doc_description["Parameters"][2]["Description"].should.equal("A boolean")
doc_description["Parameters"][2]["DefaultValue"].should.equal("False")
doc_description["Parameters"][3]["Name"].should.equal("Parameter4")
doc_description["Parameters"][3]["Type"].should.equal("StringList")
doc_description["Parameters"][3]["Description"].should.equal("A string list")
doc_description["Parameters"][3]["DefaultValue"].should.equal('["abc", "def"]')
doc_description["Parameters"][4]["Name"].should.equal("Parameter5")
doc_description["Parameters"][4]["Type"].should.equal("StringMap")
doc_description["Parameters"][5]["Name"].should.equal("Parameter6")
doc_description["Parameters"][5]["Type"].should.equal("MapList")
if expected_format == "JSON":
# We have to replace single quotes from the response to package it back up
json.loads(doc_description["Parameters"][4]["DefaultValue"]).should.equal(
{
"NotificationArn": "$dependency.topicArn",
"NotificationEvents": ["Failed"],
"NotificationType": "Command",
}
)
json.loads(doc_description["Parameters"][5]["DefaultValue"]).should.equal(
[
{"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": "50"}},
{"DeviceName": "/dev/sdm", "Ebs": {"VolumeSize": "100"}},
]
)
else:
yaml.safe_load(doc_description["Parameters"][4]["DefaultValue"]).should.equal(
{
"NotificationArn": "$dependency.topicArn",
"NotificationEvents": ["Failed"],
"NotificationType": "Command",
}
)
yaml.safe_load(doc_description["Parameters"][5]["DefaultValue"]).should.equal(
[
{"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": "50"}},
{"DeviceName": "/dev/sdm", "Ebs": {"VolumeSize": "100"}},
]
)
doc_description["DocumentType"].should.equal("Command")
doc_description["SchemaVersion"].should.equal("2.2")
doc_description["LatestVersion"].should.equal(expected_latest_version)
doc_description["DefaultVersion"].should.equal(expected_default_version)
doc_description["DocumentFormat"].should.equal(expected_format)
def _get_doc_validator(
response, version_name, doc_version, json_doc_content, document_format
):
response["Name"].should.equal("TestDocument3")
if version_name:
response["VersionName"].should.equal(version_name)
response["DocumentVersion"].should.equal(doc_version)
response["Status"].should.equal("Active")
if document_format == "JSON":
json.loads(response["Content"]).should.equal(json_doc_content)
else:
yaml.safe_load(response["Content"]).should.equal(json_doc_content)
response["DocumentType"].should.equal("Command")
response["DocumentFormat"].should.equal(document_format)
@mock_ssm
def test_create_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
response = client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="YAML",
)
doc_description = response["DocumentDescription"]
_validate_document_description(
"TestDocument", doc_description, json_doc, "1", "1", "1", "YAML"
)
response = client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument2",
DocumentType="Command",
DocumentFormat="JSON",
)
doc_description = response["DocumentDescription"]
_validate_document_description(
"TestDocument2", doc_description, json_doc, "1", "1", "1", "JSON"
)
response = client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="JSON",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
Tags=[{"Key": "testing", "Value": "testingValue"}],
)
doc_description = response["DocumentDescription"]
doc_description["VersionName"].should.equal("Base")
doc_description["TargetType"].should.equal("/AWS::EC2::Instance")
doc_description["Tags"].should.equal([{"Key": "testing", "Value": "testingValue"}])
_validate_document_description(
"TestDocument3", doc_description, json_doc, "1", "1", "1", "JSON"
)
try:
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("CreateDocument")
err.response["Error"]["Message"].should.equal(
"The specified document already exists."
)
try:
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument4",
DocumentType="Command",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("CreateDocument")
err.response["Error"]["Message"].should.equal(
"The content for the document is not valid."
)
del json_doc["parameters"]
response = client.create_document(
Content=yaml.dump(json_doc),
Name="EmptyParamDoc",
DocumentType="Command",
DocumentFormat="YAML",
)
doc_description = response["DocumentDescription"]
doc_description["Hash"].should.equal(
hashlib.sha256(yaml.dump(json_doc).encode("utf-8")).hexdigest()
)
doc_description["HashType"].should.equal("Sha256")
doc_description["Name"].should.equal("EmptyParamDoc")
doc_description["Owner"].should.equal(ACCOUNT_ID)
difference = datetime.datetime.utcnow() - doc_description["CreatedDate"]
if difference.min > datetime.timedelta(minutes=1):
assert False
doc_description["Status"].should.equal("Active")
doc_description["DocumentVersion"].should.equal("1")
doc_description["Description"].should.equal(json_doc["description"])
doc_description["DocumentType"].should.equal("Command")
doc_description["SchemaVersion"].should.equal("2.2")
doc_description["LatestVersion"].should.equal("1")
doc_description["DefaultVersion"].should.equal("1")
doc_description["DocumentFormat"].should.equal("YAML")
@mock_ssm
def test_get_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.get_document(Name="DNE")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
)
new_json_doc = copy.copy(json_doc)
new_json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
VersionName="NewBase",
)
response = client.get_document(Name="TestDocument3")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", DocumentFormat="YAML")
_get_doc_validator(response, "Base", "1", json_doc, "YAML")
response = client.get_document(Name="TestDocument3", DocumentFormat="JSON")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", VersionName="Base")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", DocumentVersion="1")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
response = client.get_document(Name="TestDocument3", DocumentVersion="2")
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
response = client.get_document(Name="TestDocument3", VersionName="NewBase")
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
response = client.get_document(
Name="TestDocument3", VersionName="NewBase", DocumentVersion="2"
)
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
try:
response = client.get_document(
Name="TestDocument3", VersionName="BadName", DocumentVersion="2"
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
try:
response = client.get_document(Name="TestDocument3", DocumentVersion="3")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
# Updating default should update normal get
client.update_document_default_version(Name="TestDocument3", DocumentVersion="2")
response = client.get_document(Name="TestDocument3", DocumentFormat="JSON")
_get_doc_validator(response, "NewBase", "2", new_json_doc, "JSON")
@mock_ssm
def test_delete_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.delete_document(Name="DNE")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DeleteDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
# Test simple
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
)
client.delete_document(Name="TestDocument3")
try:
client.get_document(Name="TestDocument3")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
# Delete default version with other version is bad
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
)
new_json_doc = copy.copy(json_doc)
new_json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
VersionName="NewBase",
)
new_json_doc["description"] = "a new description2"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
)
new_json_doc["description"] = "a new description3"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
)
new_json_doc["description"] = "a new description4"
client.update_document(
Content=json.dumps(new_json_doc),
Name="TestDocument3",
DocumentVersion="$LATEST",
)
try:
client.delete_document(Name="TestDocument3", DocumentVersion="1")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DeleteDocument")
err.response["Error"]["Message"].should.equal(
"Default version of the document can't be deleted."
)
try:
client.delete_document(Name="TestDocument3", VersionName="Base")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DeleteDocument")
err.response["Error"]["Message"].should.equal(
"Default version of the document can't be deleted."
)
# Make sure no ill side effects
response = client.get_document(Name="TestDocument3")
_get_doc_validator(response, "Base", "1", json_doc, "JSON")
client.delete_document(Name="TestDocument3", DocumentVersion="5")
# Check that latest version is changed
response = client.describe_document(Name="TestDocument3")
response["Document"]["LatestVersion"].should.equal("4")
client.delete_document(Name="TestDocument3", VersionName="NewBase")
# Make sure other versions okay
client.get_document(Name="TestDocument3", DocumentVersion="1")
client.get_document(Name="TestDocument3", DocumentVersion="3")
client.get_document(Name="TestDocument3", DocumentVersion="4")
client.delete_document(Name="TestDocument3")
try:
client.get_document(Name="TestDocument3", DocumentVersion="1")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
try:
client.get_document(Name="TestDocument3", DocumentVersion="3")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
try:
client.get_document(Name="TestDocument3", DocumentVersion="4")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
response = client.list_documents()
len(response["DocumentIdentifiers"]).should.equal(0)
@mock_ssm
def test_update_document_default_version():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.update_document_default_version(Name="DNE", DocumentVersion="1")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocumentDefaultVersion")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentType="Command",
VersionName="Base",
)
json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
)
json_doc["description"] = "a new description2"
client.update_document(
Content=json.dumps(json_doc), Name="TestDocument", DocumentVersion="$LATEST"
)
response = client.update_document_default_version(
Name="TestDocument", DocumentVersion="2"
)
response["Description"]["Name"].should.equal("TestDocument")
response["Description"]["DefaultVersion"].should.equal("2")
json_doc["description"] = "a new description3"
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
VersionName="NewBase",
)
response = client.update_document_default_version(
Name="TestDocument", DocumentVersion="4"
)
response["Description"]["Name"].should.equal("TestDocument")
response["Description"]["DefaultVersion"].should.equal("4")
response["Description"]["DefaultVersionName"].should.equal("NewBase")
@mock_ssm
def test_update_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.update_document(
Name="DNE",
Content=json.dumps(json_doc),
DocumentVersion="1",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="JSON",
VersionName="Base",
)
try:
client.update_document(
Name="TestDocument",
Content=json.dumps(json_doc),
DocumentVersion="2",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The document version is not valid or does not exist."
)
# Duplicate content throws an error
try:
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="1",
DocumentFormat="JSON",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The content of the association document matches another "
"document. Change the content of the document and try again."
)
json_doc["description"] = "a new description"
# Duplicate version name
try:
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="1",
DocumentFormat="JSON",
VersionName="Base",
)
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("UpdateDocument")
err.response["Error"]["Message"].should.equal(
"The specified version name is a duplicate."
)
response = client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
VersionName="Base2",
DocumentVersion="1",
DocumentFormat="JSON",
)
response["DocumentDescription"]["Description"].should.equal("a new description")
response["DocumentDescription"]["DocumentVersion"].should.equal("2")
response["DocumentDescription"]["LatestVersion"].should.equal("2")
response["DocumentDescription"]["DefaultVersion"].should.equal("1")
json_doc["description"] = "a new description2"
response = client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
VersionName="NewBase",
)
response["DocumentDescription"]["Description"].should.equal("a new description2")
response["DocumentDescription"]["DocumentVersion"].should.equal("3")
response["DocumentDescription"]["LatestVersion"].should.equal("3")
response["DocumentDescription"]["DefaultVersion"].should.equal("1")
response["DocumentDescription"]["VersionName"].should.equal("NewBase")
@mock_ssm
def test_describe_document():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
try:
client.describe_document(Name="DNE")
raise RuntimeError("Should fail")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("DescribeDocument")
err.response["Error"]["Message"].should.equal(
"The specified document does not exist."
)
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
TargetType="/AWS::EC2::Instance",
Tags=[{"Key": "testing", "Value": "testingValue"}],
)
response = client.describe_document(Name="TestDocument")
doc_description = response["Document"]
_validate_document_description(
"TestDocument", doc_description, json_doc, "1", "1", "1", "YAML"
)
# Adding update to check for issues
new_json_doc = copy.copy(json_doc)
new_json_doc["description"] = "a new description2"
client.update_document(
Content=json.dumps(new_json_doc), Name="TestDocument", DocumentVersion="$LATEST"
)
response = client.describe_document(Name="TestDocument")
doc_description = response["Document"]
_validate_document_description(
"TestDocument", doc_description, json_doc, "1", "2", "1", "YAML"
)
@mock_ssm
def test_list_documents():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="JSON",
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument2",
DocumentType="Command",
DocumentFormat="JSON",
)
client.create_document(
Content=json.dumps(json_doc),
Name="TestDocument3",
DocumentType="Command",
DocumentFormat="JSON",
TargetType="/AWS::EC2::Instance",
)
response = client.list_documents()
len(response["DocumentIdentifiers"]).should.equal(3)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument")
response["DocumentIdentifiers"][1]["Name"].should.equal("TestDocument2")
response["DocumentIdentifiers"][2]["Name"].should.equal("TestDocument3")
response["NextToken"].should.equal("")
response = client.list_documents(MaxResults=1)
len(response["DocumentIdentifiers"]).should.equal(1)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("1")
response = client.list_documents(MaxResults=1, NextToken=response["NextToken"])
len(response["DocumentIdentifiers"]).should.equal(1)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument2")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("2")
response = client.list_documents(MaxResults=1, NextToken=response["NextToken"])
len(response["DocumentIdentifiers"]).should.equal(1)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument3")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("")
# making sure no bad interactions with update
json_doc["description"] = "a new description"
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
)
client.update_document(
Content=json.dumps(json_doc),
Name="TestDocument2",
DocumentVersion="$LATEST",
DocumentFormat="JSON",
)
client.update_document_default_version(Name="TestDocument", DocumentVersion="2")
response = client.list_documents()
len(response["DocumentIdentifiers"]).should.equal(3)
response["DocumentIdentifiers"][0]["Name"].should.equal("TestDocument")
response["DocumentIdentifiers"][0]["DocumentVersion"].should.equal("2")
response["DocumentIdentifiers"][1]["Name"].should.equal("TestDocument2")
response["DocumentIdentifiers"][1]["DocumentVersion"].should.equal("1")
response["DocumentIdentifiers"][2]["Name"].should.equal("TestDocument3")
response["DocumentIdentifiers"][2]["DocumentVersion"].should.equal("1")
response["NextToken"].should.equal("")
response = client.list_documents(Filters=[{"Key": "Owner", "Values": ["Self"]}])
len(response["DocumentIdentifiers"]).should.equal(3)
response = client.list_documents(
Filters=[{"Key": "TargetType", "Values": ["/AWS::EC2::Instance"]}]
)
len(response["DocumentIdentifiers"]).should.equal(1)

View File

@ -0,0 +1,47 @@
schemaVersion: "2.2"
description: "Sample Yaml"
parameters:
Parameter1:
type: "Integer"
default: 3
description: "Command Duration."
allowedValues: [1,2,3,4]
Parameter2:
type: "String"
default: "def"
description:
allowedValues: ["abc", "def", "ghi"]
allowedPattern: r"^[a-zA-Z0-9_\-.]{3,128}$"
Parameter3:
type: "Boolean"
default: false
description: "A boolean"
allowedValues: [True, False]
Parameter4:
type: "StringList"
default: ["abc", "def"]
description: "A string list"
Parameter5:
type: "StringMap"
default:
NotificationType: Command
NotificationEvents:
- Failed
NotificationArn: "$dependency.topicArn"
description:
Parameter6:
type: "MapList"
default:
- DeviceName: "/dev/sda1"
Ebs:
VolumeSize: '50'
- DeviceName: "/dev/sdm"
Ebs:
VolumeSize: '100'
description:
mainSteps:
- action: "aws:runShellScript"
name: "sampleCommand"
inputs:
runCommand:
- "echo hi"