import json import re from six import string_types from moto.iam.exceptions import MalformedPolicyDocument VALID_TOP_ELEMENTS = [ "Version", "Id", "Statement", "Conditions" ] VALID_VERSIONS = [ "2008-10-17", "2012-10-17" ] VALID_STATEMENT_ELEMENTS = [ "Sid", "Action", "NotAction", "Resource", "NotResource", "Effect", "Condition" ] VALID_EFFECTS = [ "Allow", "Deny" ] VALID_CONDITIONS = [ "StringEquals", "StringNotEquals", "StringEqualsIgnoreCase", "StringNotEqualsIgnoreCase", "StringLike", "StringNotLike", "NumericEquals", "NumericNotEquals", "NumericLessThan", "NumericLessThanEquals", "NumericGreaterThan", "NumericGreaterThanEquals", "DateEquals", "DateNotEquals", "DateLessThan", "DateLessThanEquals", "DateGreaterThan", "DateGreaterThanEquals", "Bool", "BinaryEquals", "IpAddress", "NotIpAddress", "ArnEquals", "ArnLike", "ArnNotEquals", "ArnNotLike", "Null" ] VALID_CONDITION_PREFIXES = [ "ForAnyValue:", "ForAllValues:" ] VALID_CONDITION_POSTFIXES = [ "IfExists" ] SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS = { "iam": 'IAM resource {resource} cannot contain region information.', "s3": 'Resource {resource} can not contain region information.' } VALID_RESOURCE_PATH_STARTING_VALUES = { "iam": { "values": ["user/", "federated-user/", "role/", "group/", "instance-profile/", "mfa/", "server-certificate/", "policy/", "sms-mfa/", "saml-provider/", "oidc-provider/", "report/", "access-report/"], "error_message": 'IAM resource path must either be "*" or start with {values}.' } } class IAMPolicyDocumentValidator: def __init__(self, policy_document): self._policy_document = policy_document self._policy_json = {} self._statements = [] self._resource_error = "" # the first resource error found that does not generate a legacy parsing error def validate(self): try: self._validate_syntax() except Exception: raise MalformedPolicyDocument("Syntax errors in policy.") try: self._validate_version() except Exception: raise MalformedPolicyDocument("Policy document must be version 2012-10-17 or greater.") try: self._perform_first_legacy_parsing() self._validate_resources_for_formats() self._validate_not_resources_for_formats() except Exception: raise MalformedPolicyDocument("The policy failed legacy parsing") try: self._validate_sid_uniqueness() except Exception: raise MalformedPolicyDocument("Statement IDs (SID) in a single policy must be unique.") try: self._validate_action_like_exist() except Exception: raise MalformedPolicyDocument("Policy statement must contain actions.") try: self._validate_resource_exist() except Exception: raise MalformedPolicyDocument("Policy statement must contain resources.") if self._resource_error != "": raise MalformedPolicyDocument(self._resource_error) self._validate_actions_for_prefixes() self._validate_not_actions_for_prefixes() def _validate_syntax(self): self._policy_json = json.loads(self._policy_document) assert isinstance(self._policy_json, dict) self._validate_top_elements() self._validate_version_syntax() self._validate_id_syntax() self._validate_statements_syntax() def _validate_top_elements(self): top_elements = self._policy_json.keys() for element in top_elements: assert element in VALID_TOP_ELEMENTS def _validate_version_syntax(self): if "Version" in self._policy_json: assert self._policy_json["Version"] in VALID_VERSIONS def _validate_version(self): assert self._policy_json["Version"] == "2012-10-17" def _validate_sid_uniqueness(self): sids = [] for statement in self._statements: if "Sid" in statement: assert statement["Sid"] not in sids sids.append(statement["Sid"]) def _validate_statements_syntax(self): assert "Statement" in self._policy_json assert isinstance(self._policy_json["Statement"], (dict, list)) if isinstance(self._policy_json["Statement"], dict): self._statements.append(self._policy_json["Statement"]) else: self._statements += self._policy_json["Statement"] assert self._statements for statement in self._statements: self._validate_statement_syntax(statement) @staticmethod def _validate_statement_syntax(statement): assert isinstance(statement, dict) for statement_element in statement.keys(): assert statement_element in VALID_STATEMENT_ELEMENTS assert ("Resource" not in statement or "NotResource" not in statement) assert ("Action" not in statement or "NotAction" not in statement) IAMPolicyDocumentValidator._validate_effect_syntax(statement) IAMPolicyDocumentValidator._validate_action_syntax(statement) IAMPolicyDocumentValidator._validate_not_action_syntax(statement) IAMPolicyDocumentValidator._validate_resource_syntax(statement) IAMPolicyDocumentValidator._validate_not_resource_syntax(statement) IAMPolicyDocumentValidator._validate_condition_syntax(statement) IAMPolicyDocumentValidator._validate_sid_syntax(statement) @staticmethod def _validate_effect_syntax(statement): assert "Effect" in statement assert isinstance(statement["Effect"], string_types) assert statement["Effect"].lower() in [allowed_effect.lower() for allowed_effect in VALID_EFFECTS] @staticmethod def _validate_action_syntax(statement): IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "Action") @staticmethod def _validate_not_action_syntax(statement): IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "NotAction") @staticmethod def _validate_resource_syntax(statement): IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "Resource") @staticmethod def _validate_not_resource_syntax(statement): IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "NotResource") @staticmethod def _validate_string_or_list_of_strings_syntax(statement, key): if key in statement: assert isinstance(statement[key], (string_types, list)) if isinstance(statement[key], list): for resource in statement[key]: assert isinstance(resource, string_types) @staticmethod def _validate_condition_syntax(statement): if "Condition" in statement: assert isinstance(statement["Condition"], dict) for condition_key, condition_value in statement["Condition"].items(): assert isinstance(condition_value, dict) for condition_element_key, condition_element_value in condition_value.items(): assert isinstance(condition_element_value, (list, string_types)) if IAMPolicyDocumentValidator._strip_condition_key(condition_key) not in VALID_CONDITIONS: assert not condition_value # empty dict @staticmethod def _strip_condition_key(condition_key): for valid_prefix in VALID_CONDITION_PREFIXES: if condition_key.startswith(valid_prefix): condition_key = condition_key[len(valid_prefix):] break # strip only the first match for valid_postfix in VALID_CONDITION_POSTFIXES: if condition_key.endswith(valid_postfix): condition_key = condition_key[:-len(valid_postfix)] break # strip only the first match return condition_key @staticmethod def _validate_sid_syntax(statement): if "Sid" in statement: assert isinstance(statement["Sid"], string_types) def _validate_id_syntax(self): if "Id" in self._policy_json: assert isinstance(self._policy_json["Id"], string_types) def _validate_resource_exist(self): for statement in self._statements: assert ("Resource" in statement or "NotResource" in statement) if "Resource" in statement and isinstance(statement["Resource"], list): assert statement["Resource"] elif "NotResource" in statement and isinstance(statement["NotResource"], list): assert statement["NotResource"] def _validate_action_like_exist(self): for statement in self._statements: assert ("Action" in statement or "NotAction" in statement) if "Action" in statement and isinstance(statement["Action"], list): assert statement["Action"] elif "NotAction" in statement and isinstance(statement["NotAction"], list): assert statement["NotAction"] def _validate_actions_for_prefixes(self): self._validate_action_like_for_prefixes("Action") def _validate_not_actions_for_prefixes(self): self._validate_action_like_for_prefixes("NotAction") def _validate_action_like_for_prefixes(self, key): for statement in self._statements: if key in statement: if isinstance(statement[key], string_types): self._validate_action_prefix(statement[key]) else: for action in statement[key]: self._validate_action_prefix(action) @staticmethod def _validate_action_prefix(action): action_parts = action.split(":") if len(action_parts) == 1 and action_parts[0] != "*": raise MalformedPolicyDocument("Actions/Conditions must be prefaced by a vendor, e.g., iam, sdb, ec2, etc.") elif len(action_parts) > 2: raise MalformedPolicyDocument("Actions/Condition can contain only one colon.") vendor_pattern = re.compile(r'[^a-zA-Z0-9\-.]') if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]): raise MalformedPolicyDocument("Vendor {vendor} is not valid".format(vendor=action_parts[0])) def _validate_resources_for_formats(self): self._validate_resource_like_for_formats("Resource") def _validate_not_resources_for_formats(self): self._validate_resource_like_for_formats("NotResource") def _validate_resource_like_for_formats(self, key): for statement in self._statements: if key in statement: if isinstance(statement[key], string_types): self._validate_resource_format(statement[key]) else: for resource in sorted(statement[key], reverse=True): self._validate_resource_format(resource) if self._resource_error == "": IAMPolicyDocumentValidator._legacy_parse_resource_like(statement, key) def _validate_resource_format(self, resource): if resource != "*": resource_partitions = resource.partition(":") if resource_partitions[1] == "": self._resource_error = 'Resource {resource} must be in ARN format or "*".'.format(resource=resource) return resource_partitions = resource_partitions[2].partition(":") if resource_partitions[0] != "aws": remaining_resource_parts = resource_partitions[2].split(":") arn1 = remaining_resource_parts[0] if remaining_resource_parts[0] != "" or len(remaining_resource_parts) > 1 else "*" arn2 = remaining_resource_parts[1] if len(remaining_resource_parts) > 1 else "*" arn3 = remaining_resource_parts[2] if len(remaining_resource_parts) > 2 else "*" arn4 = ":".join(remaining_resource_parts[3:]) if len(remaining_resource_parts) > 3 else "*" self._resource_error = 'Partition "{partition}" is not valid for resource "arn:{partition}:{arn1}:{arn2}:{arn3}:{arn4}".'.format( partition=resource_partitions[0], arn1=arn1, arn2=arn2, arn3=arn3, arn4=arn4 ) return if resource_partitions[1] != ":": self._resource_error = "Resource vendor must be fully qualified and cannot contain regexes." return resource_partitions = resource_partitions[2].partition(":") service = resource_partitions[0] if service in SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS.keys() and not resource_partitions[2].startswith(":"): self._resource_error = SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS[service].format(resource=resource) return resource_partitions = resource_partitions[2].partition(":") resource_partitions = resource_partitions[2].partition(":") if service in VALID_RESOURCE_PATH_STARTING_VALUES.keys(): valid_start = False for valid_starting_value in VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"]: if resource_partitions[2].startswith(valid_starting_value): valid_start = True break if not valid_start: self._resource_error = VALID_RESOURCE_PATH_STARTING_VALUES[service]["error_message"].format( values=", ".join(VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"]) ) def _perform_first_legacy_parsing(self): """This method excludes legacy parsing resources, since that have to be done later.""" for statement in self._statements: self._legacy_parse_statement(statement) @staticmethod def _legacy_parse_statement(statement): assert statement["Effect"] in VALID_EFFECTS # case-sensitive matching if "Condition" in statement: for condition_key, condition_value in statement["Condition"].items(): IAMPolicyDocumentValidator._legacy_parse_condition(condition_key, condition_value) @staticmethod def _legacy_parse_resource_like(statement, key): if isinstance(statement[key], string_types): if statement[key] != "*": assert statement[key].count(":") >= 5 or "::" not in statement[key] assert statement[key].split(":")[2] != "" else: # list for resource in statement[key]: if resource != "*": assert resource.count(":") >= 5 or "::" not in resource assert resource[2] != "" @staticmethod def _legacy_parse_condition(condition_key, condition_value): stripped_condition_key = IAMPolicyDocumentValidator._strip_condition_key(condition_key) if stripped_condition_key.startswith("Date"): for condition_element_key, condition_element_value in condition_value.items(): if isinstance(condition_element_value, string_types): IAMPolicyDocumentValidator._legacy_parse_date_condition_value(condition_element_value) else: # it has to be a list for date_condition_value in condition_element_value: IAMPolicyDocumentValidator._legacy_parse_date_condition_value(date_condition_value) @staticmethod def _legacy_parse_date_condition_value(date_condition_value): if "t" in date_condition_value.lower() or "-" in date_condition_value: IAMPolicyDocumentValidator._validate_iso_8601_datetime(date_condition_value.lower()) else: # timestamp assert 0 <= int(date_condition_value) <= 9223372036854775807 @staticmethod def _validate_iso_8601_datetime(datetime): datetime_parts = datetime.partition("t") negative_year = datetime_parts[0].startswith("-") date_parts = datetime_parts[0][1:].split("-") if negative_year else datetime_parts[0].split("-") year = "-" + date_parts[0] if negative_year else date_parts[0] assert -292275054 <= int(year) <= 292278993 if len(date_parts) > 1: month = date_parts[1] assert 1 <= int(month) <= 12 if len(date_parts) > 2: day = date_parts[2] assert 1 <= int(day) <= 31 assert len(date_parts) < 4 time_parts = datetime_parts[2].split(":") if time_parts[0] != "": hours = time_parts[0] assert 0 <= int(hours) <= 23 if len(time_parts) > 1: minutes = time_parts[1] assert 0 <= int(minutes) <= 59 if len(time_parts) > 2: if "z" in time_parts[2]: seconds_with_decimal_fraction = time_parts[2].partition("z")[0] assert time_parts[2].partition("z")[2] == "" elif "+" in time_parts[2]: seconds_with_decimal_fraction = time_parts[2].partition("+")[0] time_zone_data = time_parts[2].partition("+")[2].partition(":") time_zone_hours = time_zone_data[0] assert len(time_zone_hours) == 2 assert 0 <= int(time_zone_hours) <= 23 if time_zone_data[1] == ":": time_zone_minutes = time_zone_data[2] assert len(time_zone_minutes) == 2 assert 0 <= int(time_zone_minutes) <= 59 else: seconds_with_decimal_fraction = time_parts[2] seconds_with_decimal_fraction_partition = seconds_with_decimal_fraction.partition(".") seconds = seconds_with_decimal_fraction_partition[0] assert 0 <= int(seconds) <= 59 if seconds_with_decimal_fraction_partition[1] == ".": decimal_seconds = seconds_with_decimal_fraction_partition[2] assert 0 <= int(decimal_seconds) <= 999999999