Added put_bucket_logging support (#1401)
- Also added put acl for XML - Put logging will also verify that the destination bucket exists in the same region with the proper ACLs attached.
This commit is contained in:
		
							parent
							
								
									b855fee2e4
								
							
						
					
					
						commit
						770281aef2
					
				| @ -111,3 +111,30 @@ class MalformedXML(S3ClientError): | ||||
|             "MalformedXML", | ||||
|             "The XML you provided was not well-formed or did not validate against our published schema", | ||||
|             *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| class MalformedACLError(S3ClientError): | ||||
|     code = 400 | ||||
| 
 | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         super(MalformedACLError, self).__init__( | ||||
|             "MalformedACLError", | ||||
|             "The XML you provided was not well-formed or did not validate against our published schema", | ||||
|             *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| class InvalidTargetBucketForLogging(S3ClientError): | ||||
|     code = 400 | ||||
| 
 | ||||
|     def __init__(self, msg): | ||||
|         super(InvalidTargetBucketForLogging, self).__init__("InvalidTargetBucketForLogging", msg) | ||||
| 
 | ||||
| 
 | ||||
| class CrossLocationLoggingProhibitted(S3ClientError): | ||||
|     code = 403 | ||||
| 
 | ||||
|     def __init__(self): | ||||
|         super(CrossLocationLoggingProhibitted, self).__init__( | ||||
|             "CrossLocationLoggingProhibitted", | ||||
|             "Cross S3 location logging not allowed." | ||||
|         ) | ||||
|  | ||||
| @ -347,6 +347,7 @@ class FakeBucket(BaseModel): | ||||
|         self.acl = get_canned_acl('private') | ||||
|         self.tags = FakeTagging() | ||||
|         self.cors = [] | ||||
|         self.logging = {} | ||||
| 
 | ||||
|     @property | ||||
|     def location(self): | ||||
| @ -422,6 +423,40 @@ class FakeBucket(BaseModel): | ||||
|     def tagging(self): | ||||
|         return self.tags | ||||
| 
 | ||||
|     def set_logging(self, logging_config, bucket_backend): | ||||
|         if not logging_config: | ||||
|             self.logging = {} | ||||
|         else: | ||||
|             from moto.s3.exceptions import InvalidTargetBucketForLogging, CrossLocationLoggingProhibitted | ||||
|             # Target bucket must exist in the same account (assuming all moto buckets are in the same account): | ||||
|             if not bucket_backend.buckets.get(logging_config["TargetBucket"]): | ||||
|                 raise InvalidTargetBucketForLogging("The target bucket for logging does not exist.") | ||||
| 
 | ||||
|             # Does the target bucket have the log-delivery WRITE and READ_ACP permissions? | ||||
|             write = read_acp = False | ||||
|             for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants: | ||||
|                 # Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery | ||||
|                 for grantee in grant.grantees: | ||||
|                     if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery": | ||||
|                         if "WRITE" in grant.permissions or "FULL_CONTROL" in grant.permissions: | ||||
|                             write = True | ||||
| 
 | ||||
|                         if "READ_ACP" in grant.permissions or "FULL_CONTROL" in grant.permissions: | ||||
|                             read_acp = True | ||||
| 
 | ||||
|                         break | ||||
| 
 | ||||
|             if not write or not read_acp: | ||||
|                 raise InvalidTargetBucketForLogging("You must give the log-delivery group WRITE and READ_ACP" | ||||
|                                                     " permissions to the target bucket") | ||||
| 
 | ||||
|             # Buckets must also exist within the same region: | ||||
|             if bucket_backend.buckets[logging_config["TargetBucket"]].region_name != self.region_name: | ||||
|                 raise CrossLocationLoggingProhibitted() | ||||
| 
 | ||||
|             # Checks pass -- set the logging config: | ||||
|             self.logging = logging_config | ||||
| 
 | ||||
|     def set_website_configuration(self, website_configuration): | ||||
|         self.website_configuration = website_configuration | ||||
| 
 | ||||
| @ -608,6 +643,10 @@ class S3Backend(BaseBackend): | ||||
|         bucket = self.get_bucket(bucket_name) | ||||
|         bucket.set_cors(cors_rules) | ||||
| 
 | ||||
|     def put_bucket_logging(self, bucket_name, logging_config): | ||||
|         bucket = self.get_bucket(bucket_name) | ||||
|         bucket.set_logging(logging_config, self) | ||||
| 
 | ||||
|     def delete_bucket_cors(self, bucket_name): | ||||
|         bucket = self.get_bucket(bucket_name) | ||||
|         bucket.delete_cors() | ||||
|  | ||||
| @ -11,11 +11,13 @@ import xmltodict | ||||
| from moto.packages.httpretty.core import HTTPrettyRequest | ||||
| from moto.core.responses import _TemplateEnvironmentMixin | ||||
| 
 | ||||
| from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys | ||||
| from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, \ | ||||
|     parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys | ||||
| 
 | ||||
| 
 | ||||
| from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder | ||||
| from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, FakeTag | ||||
| from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder, MalformedXML, \ | ||||
|     MalformedACLError | ||||
| from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \ | ||||
|     FakeTag | ||||
| from .utils import bucket_name_from_url, metadata_from_headers | ||||
| from xml.dom import minidom | ||||
| 
 | ||||
| @ -70,7 +72,8 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
| 
 | ||||
|         match = re.match(r'^\[(.+)\](:\d+)?$', host) | ||||
|         if match: | ||||
|             match = re.match(r'^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z', | ||||
|             match = re.match( | ||||
|                 r'^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z', | ||||
|                 match.groups()[0], re.IGNORECASE) | ||||
|             if match: | ||||
|                 return False | ||||
| @ -229,6 +232,13 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
|                 return 404, {}, template.render(bucket_name=bucket_name) | ||||
|             template = self.response_template(S3_BUCKET_TAGGING_RESPONSE) | ||||
|             return template.render(bucket=bucket) | ||||
|         elif 'logging' in querystring: | ||||
|             bucket = self.backend.get_bucket(bucket_name) | ||||
|             if not bucket.logging: | ||||
|                 template = self.response_template(S3_NO_LOGGING_CONFIG) | ||||
|                 return 200, {}, template.render() | ||||
|             template = self.response_template(S3_LOGGING_CONFIG) | ||||
|             return 200, {}, template.render(logging=bucket.logging) | ||||
|         elif "cors" in querystring: | ||||
|             bucket = self.backend.get_bucket(bucket_name) | ||||
|             if len(bucket.cors) == 0: | ||||
| @ -324,8 +334,7 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
|             limit = continuation_token or start_after | ||||
|             result_keys = self._get_results_from_token(result_keys, limit) | ||||
| 
 | ||||
|         result_keys, is_truncated, \ | ||||
|             next_continuation_token = self._truncate_result(result_keys, max_keys) | ||||
|         result_keys, is_truncated, next_continuation_token = self._truncate_result(result_keys, max_keys) | ||||
| 
 | ||||
|         return template.render( | ||||
|             bucket=bucket, | ||||
| @ -380,8 +389,11 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
|             self.backend.set_bucket_policy(bucket_name, body) | ||||
|             return 'True' | ||||
|         elif 'acl' in querystring: | ||||
|             # TODO: Support the XML-based ACL format | ||||
|             self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers)) | ||||
|             # Headers are first. If not set, then look at the body (consistent with the documentation): | ||||
|             acls = self._acl_from_headers(request.headers) | ||||
|             if not acls: | ||||
|                 acls = self._acl_from_xml(body) | ||||
|             self.backend.set_bucket_acl(bucket_name, acls) | ||||
|             return "" | ||||
|         elif "tagging" in querystring: | ||||
|             tagging = self._bucket_tagging_from_xml(body) | ||||
| @ -391,12 +403,18 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
|             self.backend.set_bucket_website_configuration(bucket_name, body) | ||||
|             return "" | ||||
|         elif "cors" in querystring: | ||||
|             from moto.s3.exceptions import MalformedXML | ||||
|             try: | ||||
|                 self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body)) | ||||
|                 return "" | ||||
|             except KeyError: | ||||
|                 raise MalformedXML() | ||||
|         elif "logging" in querystring: | ||||
|             try: | ||||
|                 self.backend.put_bucket_logging(bucket_name, self._logging_from_xml(body)) | ||||
|                 return "" | ||||
|             except KeyError: | ||||
|                 raise MalformedXML() | ||||
| 
 | ||||
|         else: | ||||
|             if body: | ||||
|                 try: | ||||
| @ -515,6 +533,7 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
| 
 | ||||
|         def toint(i): | ||||
|             return int(i) if i else None | ||||
| 
 | ||||
|         begin, end = map(toint, rspec.split('-')) | ||||
|         if begin is not None:  # byte range | ||||
|             end = last if end is None else min(end, last) | ||||
| @ -731,6 +750,58 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
|         else: | ||||
|             return 404, response_headers, "" | ||||
| 
 | ||||
|     def _acl_from_xml(self, xml): | ||||
|         parsed_xml = xmltodict.parse(xml) | ||||
|         if not parsed_xml.get("AccessControlPolicy"): | ||||
|             raise MalformedACLError() | ||||
| 
 | ||||
|         # The owner is needed for some reason... | ||||
|         if not parsed_xml["AccessControlPolicy"].get("Owner"): | ||||
|             # TODO: Validate that the Owner is actually correct. | ||||
|             raise MalformedACLError() | ||||
| 
 | ||||
|         # If empty, then no ACLs: | ||||
|         if parsed_xml["AccessControlPolicy"].get("AccessControlList") is None: | ||||
|             return [] | ||||
| 
 | ||||
|         if not parsed_xml["AccessControlPolicy"]["AccessControlList"].get("Grant"): | ||||
|             raise MalformedACLError() | ||||
| 
 | ||||
|         permissions = [ | ||||
|             "READ", | ||||
|             "WRITE", | ||||
|             "READ_ACP", | ||||
|             "WRITE_ACP", | ||||
|             "FULL_CONTROL" | ||||
|         ] | ||||
| 
 | ||||
|         if not isinstance(parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"], list): | ||||
|             parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"] = \ | ||||
|                 [parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"]] | ||||
| 
 | ||||
|         grants = self._get_grants_from_xml(parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"], | ||||
|                                            MalformedACLError, permissions) | ||||
|         return FakeAcl(grants) | ||||
| 
 | ||||
|     def _get_grants_from_xml(self, grant_list, exception_type, permissions): | ||||
|         grants = [] | ||||
|         for grant in grant_list: | ||||
|             if grant.get("Permission", "") not in permissions: | ||||
|                 raise exception_type() | ||||
| 
 | ||||
|             if grant["Grantee"].get("@xsi:type", "") not in ["CanonicalUser", "AmazonCustomerByEmail", "Group"]: | ||||
|                 raise exception_type() | ||||
| 
 | ||||
|             # TODO: Verify that the proper grantee data is supplied based on the type. | ||||
| 
 | ||||
|             grants.append(FakeGrant( | ||||
|                 [FakeGrantee(id=grant["Grantee"].get("ID", ""), display_name=grant["Grantee"].get("DisplayName", ""), | ||||
|                              uri=grant["Grantee"].get("URI", ""))], | ||||
|                 [grant["Permission"]]) | ||||
|             ) | ||||
| 
 | ||||
|         return grants | ||||
| 
 | ||||
|     def _acl_from_headers(self, headers): | ||||
|         canned_acl = headers.get('x-amz-acl', '') | ||||
|         if canned_acl: | ||||
| @ -814,6 +885,42 @@ class ResponseObject(_TemplateEnvironmentMixin): | ||||
| 
 | ||||
|         return [parsed_xml["CORSConfiguration"]["CORSRule"]] | ||||
| 
 | ||||
|     def _logging_from_xml(self, xml): | ||||
|         parsed_xml = xmltodict.parse(xml) | ||||
| 
 | ||||
|         if not parsed_xml["BucketLoggingStatus"].get("LoggingEnabled"): | ||||
|             return {} | ||||
| 
 | ||||
|         if not parsed_xml["BucketLoggingStatus"]["LoggingEnabled"].get("TargetBucket"): | ||||
|             raise MalformedXML() | ||||
| 
 | ||||
|         if not parsed_xml["BucketLoggingStatus"]["LoggingEnabled"].get("TargetPrefix"): | ||||
|             parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetPrefix"] = "" | ||||
| 
 | ||||
|         # Get the ACLs: | ||||
|         if parsed_xml["BucketLoggingStatus"]["LoggingEnabled"].get("TargetGrants"): | ||||
|             permissions = [ | ||||
|                 "READ", | ||||
|                 "WRITE", | ||||
|                 "FULL_CONTROL" | ||||
|             ] | ||||
|             if not isinstance(parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"]["Grant"], list): | ||||
|                 target_grants = self._get_grants_from_xml( | ||||
|                     [parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"]["Grant"]], | ||||
|                     MalformedXML, | ||||
|                     permissions | ||||
|                 ) | ||||
|             else: | ||||
|                 target_grants = self._get_grants_from_xml( | ||||
|                     parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"]["Grant"], | ||||
|                     MalformedXML, | ||||
|                     permissions | ||||
|                 ) | ||||
| 
 | ||||
|             parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"] = target_grants | ||||
| 
 | ||||
|         return parsed_xml["BucketLoggingStatus"]["LoggingEnabled"] | ||||
| 
 | ||||
|     def _key_response_delete(self, bucket_name, query, key_name, headers): | ||||
|         if query.get('uploadId'): | ||||
|             upload_id = query['uploadId'][0] | ||||
| @ -1322,3 +1429,37 @@ S3_NO_CORS_CONFIG = """<?xml version="1.0" encoding="UTF-8"?> | ||||
|   <HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId> | ||||
| </Error> | ||||
| """ | ||||
| 
 | ||||
| S3_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?> | ||||
| <BucketLoggingStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01"> | ||||
|   <LoggingEnabled> | ||||
|     <TargetBucket>{{ logging["TargetBucket"] }}</TargetBucket> | ||||
|     <TargetPrefix>{{ logging["TargetPrefix"] }}</TargetPrefix> | ||||
|     {% if logging.get("TargetGrants") %} | ||||
|     <TargetGrants> | ||||
|       {% for grant in logging["TargetGrants"] %} | ||||
|       <Grant> | ||||
|         <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||||
|                  xsi:type="{{ grant.grantees[0].type }}"> | ||||
|           {% if grant.grantees[0].uri %} | ||||
|           <URI>{{ grant.grantees[0].uri }}</URI> | ||||
|           {% endif %} | ||||
|           {% if grant.grantees[0].id %} | ||||
|           <ID>{{ grant.grantees[0].id }}</ID> | ||||
|           {% endif %} | ||||
|           {% if grant.grantees[0].display_name %} | ||||
|           <DisplayName>{{ grant.grantees[0].display_name }}</DisplayName> | ||||
|           {% endif %} | ||||
|         </Grantee> | ||||
|         <Permission>{{ grant.permissions[0] }}</Permission> | ||||
|       </Grant> | ||||
|       {% endfor %} | ||||
|     </TargetGrants> | ||||
|     {% endif %} | ||||
|   </LoggingEnabled> | ||||
| </BucketLoggingStatus> | ||||
| """ | ||||
| 
 | ||||
| S3_NO_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?> | ||||
| <BucketLoggingStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01" /> | ||||
| """ | ||||
|  | ||||
| @ -50,6 +50,7 @@ def reduced_min_part_size(f): | ||||
|             return f(*args, **kwargs) | ||||
|         finally: | ||||
|             s3model.UPLOAD_PART_MIN_SIZE = orig_size | ||||
| 
 | ||||
|     return wrapped | ||||
| 
 | ||||
| 
 | ||||
| @ -888,6 +889,7 @@ def test_s3_object_in_public_bucket(): | ||||
|     response = requests.get(presigned_url) | ||||
|     assert response.status_code == 200 | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3 | ||||
| def test_s3_object_in_private_bucket(): | ||||
|     s3 = boto3.resource('s3') | ||||
| @ -1102,6 +1104,7 @@ def test_boto3_key_etag(): | ||||
|     resp = s3.get_object(Bucket='mybucket', Key='steve') | ||||
|     resp['ETag'].should.equal('"d32bda93738f7e03adb22e66c90fbc04"') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3 | ||||
| def test_website_redirect_location(): | ||||
|     s3 = boto3.client('s3', region_name='us-east-1') | ||||
| @ -1116,6 +1119,7 @@ def test_website_redirect_location(): | ||||
|     resp = s3.get_object(Bucket='mybucket', Key='steve') | ||||
|     resp['WebsiteRedirectLocation'].should.equal(url) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3 | ||||
| def test_boto3_list_keys_xml_escaped(): | ||||
|     s3 = boto3.client('s3', region_name='us-east-1') | ||||
| @ -1732,6 +1736,249 @@ def test_boto3_delete_bucket_cors(): | ||||
|     e.response["Error"]["Message"].should.equal("The CORS configuration does not exist") | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3 | ||||
| def test_put_bucket_acl_body(): | ||||
|     s3 = boto3.client("s3", region_name="us-east-1") | ||||
|     s3.create_bucket(Bucket="bucket") | ||||
|     bucket_owner = s3.get_bucket_acl(Bucket="bucket")["Owner"] | ||||
|     s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={ | ||||
|         "Grants": [ | ||||
|             { | ||||
|                 "Grantee": { | ||||
|                     "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                     "Type": "Group" | ||||
|                 }, | ||||
|                 "Permission": "WRITE" | ||||
|             }, | ||||
|             { | ||||
|                 "Grantee": { | ||||
|                     "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                     "Type": "Group" | ||||
|                 }, | ||||
|                 "Permission": "READ_ACP" | ||||
|             } | ||||
|         ], | ||||
|         "Owner": bucket_owner | ||||
|     }) | ||||
| 
 | ||||
|     result = s3.get_bucket_acl(Bucket="bucket") | ||||
|     assert len(result["Grants"]) == 2 | ||||
|     for g in result["Grants"]: | ||||
|         assert g["Grantee"]["URI"] == "http://acs.amazonaws.com/groups/s3/LogDelivery" | ||||
|         assert g["Grantee"]["Type"] == "Group" | ||||
|         assert g["Permission"] in ["WRITE", "READ_ACP"] | ||||
| 
 | ||||
|     # With one: | ||||
|     s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={ | ||||
|         "Grants": [ | ||||
|             { | ||||
|                 "Grantee": { | ||||
|                     "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                     "Type": "Group" | ||||
|                 }, | ||||
|                 "Permission": "WRITE" | ||||
|             } | ||||
|         ], | ||||
|         "Owner": bucket_owner | ||||
|     }) | ||||
|     result = s3.get_bucket_acl(Bucket="bucket") | ||||
|     assert len(result["Grants"]) == 1 | ||||
| 
 | ||||
|     # With no owner: | ||||
|     with assert_raises(ClientError) as err: | ||||
|         s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={ | ||||
|             "Grants": [ | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                         "Type": "Group" | ||||
|                     }, | ||||
|                     "Permission": "WRITE" | ||||
|                 } | ||||
|             ] | ||||
|         }) | ||||
|     assert err.exception.response["Error"]["Code"] == "MalformedACLError" | ||||
| 
 | ||||
|     # With incorrect permission: | ||||
|     with assert_raises(ClientError) as err: | ||||
|         s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={ | ||||
|             "Grants": [ | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                         "Type": "Group" | ||||
|                     }, | ||||
|                     "Permission": "lskjflkasdjflkdsjfalisdjflkdsjf" | ||||
|                 } | ||||
|             ], | ||||
|             "Owner": bucket_owner | ||||
|         }) | ||||
|     assert err.exception.response["Error"]["Code"] == "MalformedACLError" | ||||
| 
 | ||||
|     # Clear the ACLs: | ||||
|     result = s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={"Grants": [], "Owner": bucket_owner}) | ||||
|     assert not result.get("Grants") | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3 | ||||
| def test_boto3_put_bucket_logging(): | ||||
|     s3 = boto3.client("s3", region_name="us-east-1") | ||||
|     bucket_name = "mybucket" | ||||
|     log_bucket = "logbucket" | ||||
|     wrong_region_bucket = "wrongregionlogbucket" | ||||
|     s3.create_bucket(Bucket=bucket_name) | ||||
|     s3.create_bucket(Bucket=log_bucket)  # Adding the ACL for log-delivery later... | ||||
|     s3.create_bucket(Bucket=wrong_region_bucket, CreateBucketConfiguration={"LocationConstraint": "us-west-2"}) | ||||
| 
 | ||||
|     # No logging config: | ||||
|     result = s3.get_bucket_logging(Bucket=bucket_name) | ||||
|     assert not result.get("LoggingEnabled") | ||||
| 
 | ||||
|     # A log-bucket that doesn't exist: | ||||
|     with assert_raises(ClientError) as err: | ||||
|         s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|             "LoggingEnabled": { | ||||
|                 "TargetBucket": "IAMNOTREAL", | ||||
|                 "TargetPrefix": "" | ||||
|             } | ||||
|         }) | ||||
|     assert err.exception.response["Error"]["Code"] == "InvalidTargetBucketForLogging" | ||||
| 
 | ||||
|     # A log-bucket that's missing the proper ACLs for LogDelivery: | ||||
|     with assert_raises(ClientError) as err: | ||||
|         s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|             "LoggingEnabled": { | ||||
|                 "TargetBucket": log_bucket, | ||||
|                 "TargetPrefix": "" | ||||
|             } | ||||
|         }) | ||||
|     assert err.exception.response["Error"]["Code"] == "InvalidTargetBucketForLogging" | ||||
|     assert "log-delivery" in err.exception.response["Error"]["Message"] | ||||
| 
 | ||||
|     # Add the proper "log-delivery" ACL to the log buckets: | ||||
|     bucket_owner = s3.get_bucket_acl(Bucket=log_bucket)["Owner"] | ||||
|     for bucket in [log_bucket, wrong_region_bucket]: | ||||
|         s3.put_bucket_acl(Bucket=bucket, AccessControlPolicy={ | ||||
|             "Grants": [ | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                         "Type": "Group" | ||||
|                     }, | ||||
|                     "Permission": "WRITE" | ||||
|                 }, | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", | ||||
|                         "Type": "Group" | ||||
|                     }, | ||||
|                     "Permission": "READ_ACP" | ||||
|                 }, | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "Type": "CanonicalUser", | ||||
|                         "ID": bucket_owner["ID"] | ||||
|                     }, | ||||
|                     "Permission": "FULL_CONTROL" | ||||
|                 } | ||||
|             ], | ||||
|             "Owner": bucket_owner | ||||
|         }) | ||||
| 
 | ||||
|     # A log-bucket that's in the wrong region: | ||||
|     with assert_raises(ClientError) as err: | ||||
|         s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|             "LoggingEnabled": { | ||||
|                 "TargetBucket": wrong_region_bucket, | ||||
|                 "TargetPrefix": "" | ||||
|             } | ||||
|         }) | ||||
|     assert err.exception.response["Error"]["Code"] == "CrossLocationLoggingProhibitted" | ||||
| 
 | ||||
|     # Correct logging: | ||||
|     s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|         "LoggingEnabled": { | ||||
|             "TargetBucket": log_bucket, | ||||
|             "TargetPrefix": "{}/".format(bucket_name) | ||||
|         } | ||||
|     }) | ||||
|     result = s3.get_bucket_logging(Bucket=bucket_name) | ||||
|     assert result["LoggingEnabled"]["TargetBucket"] == log_bucket | ||||
|     assert result["LoggingEnabled"]["TargetPrefix"] == "{}/".format(bucket_name) | ||||
|     assert not result["LoggingEnabled"].get("TargetGrants") | ||||
| 
 | ||||
|     # And disabling: | ||||
|     s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={}) | ||||
|     assert not s3.get_bucket_logging(Bucket=bucket_name).get("LoggingEnabled") | ||||
| 
 | ||||
|     # And enabling with multiple target grants: | ||||
|     s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|         "LoggingEnabled": { | ||||
|             "TargetBucket": log_bucket, | ||||
|             "TargetPrefix": "{}/".format(bucket_name), | ||||
|             "TargetGrants": [ | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274", | ||||
|                         "Type": "CanonicalUser" | ||||
|                     }, | ||||
|                     "Permission": "READ" | ||||
|                 }, | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274", | ||||
|                         "Type": "CanonicalUser" | ||||
|                     }, | ||||
|                     "Permission": "WRITE" | ||||
|                 } | ||||
|             ] | ||||
|         } | ||||
|     }) | ||||
| 
 | ||||
|     result = s3.get_bucket_logging(Bucket=bucket_name) | ||||
|     assert len(result["LoggingEnabled"]["TargetGrants"]) == 2 | ||||
|     assert result["LoggingEnabled"]["TargetGrants"][0]["Grantee"]["ID"] == \ | ||||
|         "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274" | ||||
| 
 | ||||
|     # Test with just 1 grant: | ||||
|     s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|         "LoggingEnabled": { | ||||
|             "TargetBucket": log_bucket, | ||||
|             "TargetPrefix": "{}/".format(bucket_name), | ||||
|             "TargetGrants": [ | ||||
|                 { | ||||
|                     "Grantee": { | ||||
|                         "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274", | ||||
|                         "Type": "CanonicalUser" | ||||
|                     }, | ||||
|                     "Permission": "READ" | ||||
|                 } | ||||
|             ] | ||||
|         } | ||||
|     }) | ||||
|     result = s3.get_bucket_logging(Bucket=bucket_name) | ||||
|     assert len(result["LoggingEnabled"]["TargetGrants"]) == 1 | ||||
| 
 | ||||
|     # With an invalid grant: | ||||
|     with assert_raises(ClientError) as err: | ||||
|         s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={ | ||||
|             "LoggingEnabled": { | ||||
|                 "TargetBucket": log_bucket, | ||||
|                 "TargetPrefix": "{}/".format(bucket_name), | ||||
|                 "TargetGrants": [ | ||||
|                     { | ||||
|                         "Grantee": { | ||||
|                             "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274", | ||||
|                             "Type": "CanonicalUser" | ||||
|                         }, | ||||
|                         "Permission": "NOTAREALPERM" | ||||
|                     } | ||||
|                 ] | ||||
|             } | ||||
|         }) | ||||
|     assert err.exception.response["Error"]["Code"] == "MalformedXML" | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3 | ||||
| def test_boto3_put_object_tagging(): | ||||
|     s3 = boto3.client('s3', region_name='us-east-1') | ||||
| @ -1943,7 +2190,6 @@ def test_get_stream_gzipped(): | ||||
|     assert res == payload | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| TEST_XML = """\ | ||||
| <?xml version="1.0" encoding="UTF-8"?> | ||||
| <ns0:WebsiteConfiguration xmlns:ns0="http://s3.amazonaws.com/doc/2006-03-01/"> | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user