support bucket names in url paths in s3bucket_path
This commit is contained in:
		
							parent
							
								
									59858dd685
								
							
						
					
					
						commit
						5a475881d2
					
				| @ -7,6 +7,7 @@ from .ec2 import mock_ec2 | ||||
| from .elb import mock_elb | ||||
| from .emr import mock_emr | ||||
| from .s3 import mock_s3 | ||||
| from .s3bucket_path import mock_s3bucket_path | ||||
| from .ses import mock_ses | ||||
| from .sqs import mock_sqs | ||||
| from .sts import mock_sts | ||||
|  | ||||
| @ -4,6 +4,7 @@ from moto.ec2 import ec2_backend | ||||
| from moto.elb import elb_backend | ||||
| from moto.emr import emr_backend | ||||
| from moto.s3 import s3_backend | ||||
| from moto.s3bucket_path import s3bucket_path_backend | ||||
| from moto.ses import ses_backend | ||||
| from moto.sqs import sqs_backend | ||||
| from moto.sts import sts_backend | ||||
| @ -15,6 +16,7 @@ BACKENDS = { | ||||
|     'elb': elb_backend, | ||||
|     'emr': emr_backend, | ||||
|     's3': s3_backend, | ||||
|     's3bucket_path': s3bucket_path_backend, | ||||
|     'ses': ses_backend, | ||||
|     'sqs': sqs_backend, | ||||
|     'sts': sts_backend, | ||||
|  | ||||
| @ -9,6 +9,7 @@ from .utils import convert_regex_to_flask_path | ||||
| class MockAWS(object): | ||||
|     def __init__(self, backend): | ||||
|         self.backend = backend | ||||
|         HTTPretty.reset() | ||||
| 
 | ||||
|     def __call__(self, func): | ||||
|         return self.decorate_callable(func) | ||||
|  | ||||
| @ -7,15 +7,24 @@ from .models import s3_backend | ||||
| from .utils import bucket_name_from_url | ||||
| 
 | ||||
| 
 | ||||
| def all_buckets(): | ||||
| def parse_key_name(pth): | ||||
|     return pth.lstrip("/") | ||||
| 
 | ||||
| 
 | ||||
| class ResponseObject(object): | ||||
|     def __init__(self, backend, bucket_name_from_url, parse_key_name): | ||||
|         self.backend = backend | ||||
|         self.bucket_name_from_url = bucket_name_from_url | ||||
|         self.parse_key_name = parse_key_name | ||||
| 
 | ||||
|     def all_buckets(self): | ||||
|         # No bucket specified. Listing all buckets | ||||
|     all_buckets = s3_backend.get_all_buckets() | ||||
|         all_buckets = self.backend.get_all_buckets() | ||||
|         template = Template(S3_ALL_BUCKETS) | ||||
|         return template.render(buckets=all_buckets) | ||||
| 
 | ||||
| 
 | ||||
| def bucket_response(request, full_url, headers): | ||||
|     response = _bucket_response(request, full_url, headers) | ||||
|     def bucket_response(self, request, full_url, headers): | ||||
|         response = self._bucket_response(request, full_url, headers) | ||||
|         if isinstance(response, basestring): | ||||
|             return 200, headers, response | ||||
| 
 | ||||
| @ -23,23 +32,22 @@ def bucket_response(request, full_url, headers): | ||||
|             status_code, headers, response_content = response | ||||
|             return status_code, headers, response_content | ||||
| 
 | ||||
| 
 | ||||
| def _bucket_response(request, full_url, headers): | ||||
|     def _bucket_response(self, request, full_url, headers): | ||||
|         parsed_url = urlparse(full_url) | ||||
|         querystring = parse_qs(parsed_url.query) | ||||
|         method = request.method | ||||
| 
 | ||||
|     bucket_name = bucket_name_from_url(full_url) | ||||
|         bucket_name = self.bucket_name_from_url(full_url) | ||||
|         if not bucket_name: | ||||
|             # If no bucket specified, list all buckets | ||||
|         return all_buckets() | ||||
|             return self.all_buckets() | ||||
| 
 | ||||
|         if method == 'GET': | ||||
|         bucket = s3_backend.get_bucket(bucket_name) | ||||
|             bucket = self.backend.get_bucket(bucket_name) | ||||
|             if bucket: | ||||
|                 prefix = querystring.get('prefix', [None])[0] | ||||
|                 delimiter = querystring.get('delimiter', [None])[0] | ||||
|             result_keys, result_folders = s3_backend.prefix_query(bucket, prefix, delimiter) | ||||
|                 result_keys, result_folders = self.backend.prefix_query(bucket, prefix, delimiter) | ||||
|                 template = Template(S3_BUCKET_GET_RESPONSE) | ||||
|                 return template.render( | ||||
|                     bucket=bucket, | ||||
| @ -51,11 +59,11 @@ def _bucket_response(request, full_url, headers): | ||||
|             else: | ||||
|                 return 404, headers, "" | ||||
|         elif method == 'PUT': | ||||
|         new_bucket = s3_backend.create_bucket(bucket_name) | ||||
|             new_bucket = self.backend.create_bucket(bucket_name) | ||||
|             template = Template(S3_BUCKET_CREATE_RESPONSE) | ||||
|             return template.render(bucket=new_bucket) | ||||
|         elif method == 'DELETE': | ||||
|         removed_bucket = s3_backend.delete_bucket(bucket_name) | ||||
|             removed_bucket = self.backend.delete_bucket(bucket_name) | ||||
|             if removed_bucket is None: | ||||
|                 # Non-existant bucket | ||||
|                 template = Template(S3_DELETE_NON_EXISTING_BUCKET) | ||||
| @ -83,7 +91,7 @@ def _bucket_response(request, full_url, headers): | ||||
|             key = form['key'] | ||||
|             f = form['file'] | ||||
| 
 | ||||
|         new_key = s3_backend.set_key(bucket_name, key, f) | ||||
|             new_key = self.backend.set_key(bucket_name, key, f) | ||||
| 
 | ||||
|             #Metadata | ||||
|             meta_regex = re.compile('^x-amz-meta-([a-zA-Z0-9\-_]+)$', flags=re.IGNORECASE) | ||||
| @ -97,22 +105,22 @@ def _bucket_response(request, full_url, headers): | ||||
|         else: | ||||
|             raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method)) | ||||
| 
 | ||||
| 
 | ||||
| def key_response(request, full_url, headers): | ||||
|     response = _key_response(request, full_url, headers) | ||||
|     def key_response(self, request, full_url, headers): | ||||
|         response = self._key_response(request, full_url, headers) | ||||
|         if isinstance(response, basestring): | ||||
|             return 200, headers, response | ||||
|         else: | ||||
|             status_code, headers, response_content = response | ||||
|             return status_code, headers, response_content | ||||
| 
 | ||||
| 
 | ||||
| def _key_response(request, full_url, headers): | ||||
|     def _key_response(self, request, full_url, headers): | ||||
|         parsed_url = urlparse(full_url) | ||||
|         method = request.method | ||||
| 
 | ||||
|     key_name = parsed_url.path.lstrip('/') | ||||
|     bucket_name = bucket_name_from_url(full_url) | ||||
|         key_name = self.parse_key_name(parsed_url.path) | ||||
| 
 | ||||
|         bucket_name = self.bucket_name_from_url(full_url) | ||||
| 
 | ||||
|         if hasattr(request, 'body'): | ||||
|             # Boto | ||||
|             body = request.body | ||||
| @ -121,7 +129,7 @@ def _key_response(request, full_url, headers): | ||||
|             body = request.data | ||||
| 
 | ||||
|         if method == 'GET': | ||||
|         key = s3_backend.get_key(bucket_name, key_name) | ||||
|             key = self.backend.get_key(bucket_name, key_name) | ||||
|             if key: | ||||
|                 headers.update(key.metadata) | ||||
|                 return 200, headers, key.value | ||||
| @ -130,21 +138,21 @@ def _key_response(request, full_url, headers): | ||||
|         if method == 'PUT': | ||||
|             if 'x-amz-copy-source' in request.headers: | ||||
|                 # Copy key | ||||
|             src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/",1) | ||||
|             s3_backend.copy_key(src_bucket, src_key, bucket_name, key_name) | ||||
|                 src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/", 1) | ||||
|                 self.backend.copy_key(src_bucket, src_key, bucket_name, key_name) | ||||
|                 template = Template(S3_OBJECT_COPY_RESPONSE) | ||||
|                 return template.render(key=src_key) | ||||
|             streaming_request = hasattr(request, 'streaming') and request.streaming | ||||
|             closing_connection = headers.get('connection') == 'close' | ||||
|             if closing_connection and streaming_request: | ||||
|                 # Closing the connection of a streaming request. No more data | ||||
|             new_key = s3_backend.get_key(bucket_name, key_name) | ||||
|                 new_key = self.backend.get_key(bucket_name, key_name) | ||||
|             elif streaming_request: | ||||
|                 # Streaming request, more data | ||||
|             new_key = s3_backend.append_to_key(bucket_name, key_name, body) | ||||
|                 new_key = self.backend.append_to_key(bucket_name, key_name, body) | ||||
|             else: | ||||
|                 # Initial data | ||||
|             new_key = s3_backend.set_key(bucket_name, key_name, body) | ||||
|                 new_key = self.backend.set_key(bucket_name, key_name, body) | ||||
|                 request.streaming = True | ||||
| 
 | ||||
|                 #Metadata | ||||
| @ -160,20 +168,21 @@ def _key_response(request, full_url, headers): | ||||
|             headers.update(new_key.response_dict) | ||||
|             return 200, headers, template.render(key=new_key) | ||||
|         elif method == 'HEAD': | ||||
|         key = s3_backend.get_key(bucket_name, key_name) | ||||
|             key = self.backend.get_key(bucket_name, key_name) | ||||
|             if key: | ||||
|                 headers.update(key.metadata) | ||||
|                 headers.update(key.response_dict) | ||||
|             return 200, headers, key.value | ||||
|                 return 200, headers, "" | ||||
|             else: | ||||
|                 return 404, headers, "" | ||||
|         elif method == 'DELETE': | ||||
|         removed_key = s3_backend.delete_key(bucket_name, key_name) | ||||
|             removed_key = self.backend.delete_key(bucket_name, key_name) | ||||
|             template = Template(S3_DELETE_OBJECT_SUCCESS) | ||||
|             return 204, headers, template.render(bucket=removed_key) | ||||
|         else: | ||||
|             raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method)) | ||||
| 
 | ||||
| S3ResponseInstance = ResponseObject(s3_backend, bucket_name_from_url, parse_key_name) | ||||
| 
 | ||||
| S3_ALL_BUCKETS = """<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01"> | ||||
|   <Owner> | ||||
|  | ||||
| @ -1,10 +1,10 @@ | ||||
| from .responses import bucket_response, key_response | ||||
| from .responses import S3ResponseInstance | ||||
| 
 | ||||
| url_bases = [ | ||||
|     "https?://(?P<bucket_name>[a-zA-Z0-9\-_.]*)\.?s3.amazonaws.com" | ||||
| ] | ||||
| 
 | ||||
| url_paths = { | ||||
|     '{0}/$': bucket_response, | ||||
|     '{0}/(?P<key_name>[a-zA-Z0-9\-_.]+)': key_response, | ||||
|     '{0}/$': S3ResponseInstance.bucket_response, | ||||
|     '{0}/(?P<key_name>[a-zA-Z0-9\-_.]+)': S3ResponseInstance.key_response, | ||||
| } | ||||
|  | ||||
							
								
								
									
										2
									
								
								moto/s3bucket_path/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								moto/s3bucket_path/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,2 @@ | ||||
| from .models import s3bucket_path_backend | ||||
| mock_s3bucket_path = s3bucket_path_backend.decorator | ||||
							
								
								
									
										7
									
								
								moto/s3bucket_path/models.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								moto/s3bucket_path/models.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,7 @@ | ||||
| from moto.s3.models import S3Backend | ||||
| 
 | ||||
| 
 | ||||
| class S3BucketPathBackend(S3Backend): | ||||
|     True | ||||
| 
 | ||||
| s3bucket_path_backend = S3BucketPathBackend() | ||||
							
								
								
									
										15
									
								
								moto/s3bucket_path/responses.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								moto/s3bucket_path/responses.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,15 @@ | ||||
| from .models import s3bucket_path_backend | ||||
| 
 | ||||
| from .utils import bucket_name_from_url | ||||
| 
 | ||||
| from moto.s3.responses import ResponseObject | ||||
| 
 | ||||
| 
 | ||||
| def parse_key_name(pth): | ||||
|     return "/".join(pth.rstrip("/").split("/")[2:]) | ||||
| 
 | ||||
| S3BucketPathResponseInstance = ResponseObject( | ||||
|     s3bucket_path_backend, | ||||
|     bucket_name_from_url, | ||||
|     parse_key_name, | ||||
| ) | ||||
							
								
								
									
										20
									
								
								moto/s3bucket_path/urls.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								moto/s3bucket_path/urls.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,20 @@ | ||||
| from .responses import S3BucketPathResponseInstance as ro | ||||
| 
 | ||||
| url_bases = [ | ||||
|     "https?://s3.amazonaws.com" | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| def bucket_response2(*args): | ||||
|     return ro.bucket_response(*args) | ||||
| 
 | ||||
| 
 | ||||
| def bucket_response3(*args): | ||||
|     return ro.bucket_response(*args) | ||||
| 
 | ||||
| url_paths = { | ||||
|     '{0}/$': bucket_response3, | ||||
|     '{0}/(?P<bucket_name>[a-zA-Z0-9\-_.]+)$': ro.bucket_response, | ||||
|     '{0}/(?P<bucket_name>[a-zA-Z0-9\-_.]+)/$': bucket_response2, | ||||
|     '{0}/(?P<bucket_name>[a-zA-Z0-9\-_./]+)/(?P<key_name>[a-zA-Z0-9\-_.?]+)': ro.key_response | ||||
| } | ||||
							
								
								
									
										10
									
								
								moto/s3bucket_path/utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								moto/s3bucket_path/utils.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,10 @@ | ||||
| import urlparse | ||||
| 
 | ||||
| 
 | ||||
| def bucket_name_from_url(url): | ||||
|     pth = urlparse.urlparse(url).path.lstrip("/") | ||||
| 
 | ||||
|     l = pth.lstrip("/").split("/") | ||||
|     if len(l) == 0 or l[0] == "": | ||||
|         return None | ||||
|     return l[0] | ||||
							
								
								
									
										50
									
								
								tests/test_s3bucket_path/test_bucket_path_server.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								tests/test_s3bucket_path/test_bucket_path_server.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,50 @@ | ||||
| import sure  # noqa | ||||
| 
 | ||||
| import moto.server as server | ||||
| 
 | ||||
| ''' | ||||
| Test the different server responses | ||||
| ''' | ||||
| server.configure_urls("s3bucket_path") | ||||
| 
 | ||||
| 
 | ||||
| def test_s3_server_get(): | ||||
|     test_client = server.app.test_client() | ||||
|     res = test_client.get('/') | ||||
| 
 | ||||
|     res.data.should.contain('ListAllMyBucketsResult') | ||||
| 
 | ||||
| 
 | ||||
| def test_s3_server_bucket_create(): | ||||
|     test_client = server.app.test_client() | ||||
|     res = test_client.put('/foobar', 'http://localhost:5000') | ||||
|     res.status_code.should.equal(200) | ||||
| 
 | ||||
|     res = test_client.get('/') | ||||
|     res.data.should.contain('<Name>foobar</Name>') | ||||
| 
 | ||||
|     res = test_client.get('/foobar', 'http://localhost:5000') | ||||
|     res.status_code.should.equal(200) | ||||
|     res.data.should.contain("ListBucketResult") | ||||
| 
 | ||||
|     res = test_client.put('/foobar/bar', 'http://localhost:5000', data='test value') | ||||
|     res.status_code.should.equal(200) | ||||
| 
 | ||||
|     res = test_client.get('/foobar/bar', 'http://localhost:5000') | ||||
|     res.status_code.should.equal(200) | ||||
|     res.data.should.equal("test value") | ||||
| 
 | ||||
| 
 | ||||
| def test_s3_server_post_to_bucket(): | ||||
|     test_client = server.app.test_client() | ||||
|     res = test_client.put('/foobar', 'http://localhost:5000/') | ||||
|     res.status_code.should.equal(200) | ||||
| 
 | ||||
|     test_client.post('/foobar', "https://localhost:5000/", data={ | ||||
|         'key': 'the-key', | ||||
|         'file': 'nothing' | ||||
|     }) | ||||
| 
 | ||||
|     res = test_client.get('/foobar/the-key', 'http://localhost:5000/') | ||||
|     res.status_code.should.equal(200) | ||||
|     res.data.should.equal("nothing") | ||||
							
								
								
									
										281
									
								
								tests/test_s3bucket_path/test_s3bucket_path.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										281
									
								
								tests/test_s3bucket_path/test_s3bucket_path.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,281 @@ | ||||
| import urllib2 | ||||
| 
 | ||||
| import boto | ||||
| from boto.exception import S3ResponseError | ||||
| from boto.s3.key import Key | ||||
| from boto.s3.connection import OrdinaryCallingFormat | ||||
| 
 | ||||
| from freezegun import freeze_time | ||||
| import requests | ||||
| 
 | ||||
| import sure  # noqa | ||||
| 
 | ||||
| from moto import mock_s3bucket_path | ||||
| 
 | ||||
| 
 | ||||
| def create_connection(key=None, secret=None): | ||||
|     return boto.connect_s3(key, secret, calling_format=OrdinaryCallingFormat()) | ||||
| 
 | ||||
| 
 | ||||
| class MyModel(object): | ||||
|     def __init__(self, name, value): | ||||
|         self.name = name | ||||
|         self.value = value | ||||
| 
 | ||||
|     def save(self): | ||||
|         conn = create_connection('the_key', 'the_secret') | ||||
|         bucket = conn.get_bucket('mybucket') | ||||
|         k = Key(bucket) | ||||
|         k.key = self.name | ||||
|         k.set_contents_from_string(self.value) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_my_model_save(): | ||||
|     # Create Bucket so that test can run | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     conn.create_bucket('mybucket') | ||||
|     #################################### | ||||
| 
 | ||||
|     model_instance = MyModel('steve', 'is awesome') | ||||
|     model_instance.save() | ||||
| 
 | ||||
|     conn.get_bucket('mybucket').get_key('steve').get_contents_as_string().should.equal('is awesome') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_missing_key(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     bucket.get_key("the-key").should.equal(None) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_missing_key_urllib2(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     conn.create_bucket("foobar") | ||||
| 
 | ||||
|     urllib2.urlopen.when.called_with("http://s3.amazonaws.com/foobar/the-key").should.throw(urllib2.HTTPError) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_empty_key(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     key = Key(bucket) | ||||
|     key.key = "the-key" | ||||
|     key.set_contents_from_string("") | ||||
| 
 | ||||
|     bucket.get_key("the-key").get_contents_as_string().should.equal('') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_empty_key_set_on_existing_key(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     key = Key(bucket) | ||||
|     key.key = "the-key" | ||||
|     key.set_contents_from_string("foobar") | ||||
| 
 | ||||
|     bucket.get_key("the-key").get_contents_as_string().should.equal('foobar') | ||||
| 
 | ||||
|     key.set_contents_from_string("") | ||||
|     bucket.get_key("the-key").get_contents_as_string().should.equal('') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_large_key_save(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     key = Key(bucket) | ||||
|     key.key = "the-key" | ||||
|     key.set_contents_from_string("foobar" * 100000) | ||||
| 
 | ||||
|     bucket.get_key("the-key").get_contents_as_string().should.equal('foobar' * 100000) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_copy_key(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     key = Key(bucket) | ||||
|     key.key = "the-key" | ||||
|     key.set_contents_from_string("some value") | ||||
| 
 | ||||
|     bucket.copy_key('new-key', 'foobar', 'the-key') | ||||
| 
 | ||||
|     bucket.get_key("the-key").get_contents_as_string().should.equal("some value") | ||||
|     bucket.get_key("new-key").get_contents_as_string().should.equal("some value") | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_set_metadata(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     key = Key(bucket) | ||||
|     key.key = 'the-key' | ||||
|     key.set_metadata('md', 'Metadatastring') | ||||
|     key.set_contents_from_string("Testval") | ||||
| 
 | ||||
|     bucket.get_key('the-key').get_metadata('md').should.equal('Metadatastring') | ||||
| 
 | ||||
| 
 | ||||
| @freeze_time("2012-01-01 12:00:00") | ||||
| @mock_s3bucket_path | ||||
| def test_last_modified(): | ||||
|     # See https://github.com/boto/boto/issues/466 | ||||
|     conn = create_connection() | ||||
|     bucket = conn.create_bucket("foobar") | ||||
|     key = Key(bucket) | ||||
|     key.key = "the-key" | ||||
|     key.set_contents_from_string("some value") | ||||
| 
 | ||||
|     rs = bucket.get_all_keys() | ||||
|     rs[0].last_modified.should.equal('2012-01-01T12:00:00Z') | ||||
| 
 | ||||
|     bucket.get_key("the-key").last_modified.should.equal('Sun, 01 Jan 2012 12:00:00 GMT') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_missing_bucket(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     conn.get_bucket.when.called_with('mybucket').should.throw(S3ResponseError) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_bucket_with_dash(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     conn.get_bucket.when.called_with('mybucket-test').should.throw(S3ResponseError) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_bucket_deletion(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
| 
 | ||||
|     key = Key(bucket) | ||||
|     key.key = "the-key" | ||||
|     key.set_contents_from_string("some value") | ||||
| 
 | ||||
|     # Try to delete a bucket that still has keys | ||||
|     conn.delete_bucket.when.called_with("foobar").should.throw(S3ResponseError) | ||||
| 
 | ||||
|     bucket.delete_key("the-key") | ||||
|     conn.delete_bucket("foobar") | ||||
| 
 | ||||
|     # Get non-existing bucket | ||||
|     conn.get_bucket.when.called_with("foobar").should.throw(S3ResponseError) | ||||
| 
 | ||||
|     # Delete non-existant bucket | ||||
|     conn.delete_bucket.when.called_with("foobar").should.throw(S3ResponseError) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_get_all_buckets(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     conn.create_bucket("foobar") | ||||
|     conn.create_bucket("foobar2") | ||||
|     buckets = conn.get_all_buckets() | ||||
| 
 | ||||
|     buckets.should.have.length_of(2) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_post_to_bucket(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
| 
 | ||||
|     requests.post("https://s3.amazonaws.com/foobar", { | ||||
|         'key': 'the-key', | ||||
|         'file': 'nothing' | ||||
|     }) | ||||
| 
 | ||||
|     bucket.get_key('the-key').get_contents_as_string().should.equal('nothing') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_post_with_metadata_to_bucket(): | ||||
|     conn = create_connection('the_key', 'the_secret') | ||||
|     bucket = conn.create_bucket("foobar") | ||||
| 
 | ||||
|     requests.post("https://s3.amazonaws.com/foobar", { | ||||
|         'key': 'the-key', | ||||
|         'file': 'nothing', | ||||
|         'x-amz-meta-test': 'metadata' | ||||
|     }) | ||||
| 
 | ||||
|     bucket.get_key('the-key').get_metadata('test').should.equal('metadata') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_bucket_method_not_implemented(): | ||||
|     requests.patch.when.called_with("https://s3.amazonaws.com/foobar").should.throw(NotImplementedError) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_key_method_not_implemented(): | ||||
|     requests.post.when.called_with("https://s3.amazonaws.com/foobar/foo").should.throw(NotImplementedError) | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_bucket_name_with_dot(): | ||||
|     conn = create_connection() | ||||
|     bucket = conn.create_bucket('firstname.lastname') | ||||
| 
 | ||||
|     k = Key(bucket, 'somekey') | ||||
|     k.set_contents_from_string('somedata') | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_key_with_special_characters(): | ||||
|     conn = create_connection() | ||||
|     bucket = conn.create_bucket('test_bucket_name') | ||||
| 
 | ||||
|     key = Key(bucket, 'test_list_keys_2/x?y') | ||||
|     key.set_contents_from_string('value1') | ||||
| 
 | ||||
|     key_list = bucket.list('test_list_keys_2/', '/') | ||||
|     keys = [x for x in key_list] | ||||
|     keys[0].name.should.equal("test_list_keys_2/x?y") | ||||
| 
 | ||||
| 
 | ||||
| @mock_s3bucket_path | ||||
| def test_bucket_key_listing_order(): | ||||
|     conn = create_connection() | ||||
|     bucket = conn.create_bucket('test_bucket') | ||||
|     prefix = 'toplevel/' | ||||
| 
 | ||||
|     def store(name): | ||||
|         k = Key(bucket, prefix + name) | ||||
|         k.set_contents_from_string('somedata') | ||||
| 
 | ||||
|     names = ['x/key', 'y.key1', 'y.key2', 'y.key3', 'x/y/key', 'x/y/z/key'] | ||||
| 
 | ||||
|     for name in names: | ||||
|         store(name) | ||||
| 
 | ||||
|     delimiter = None | ||||
|     keys = [x.name for x in bucket.list(prefix, delimiter)] | ||||
|     keys.should.equal([ | ||||
|         'toplevel/x/key', 'toplevel/x/y/key', 'toplevel/x/y/z/key', | ||||
|         'toplevel/y.key1', 'toplevel/y.key2', 'toplevel/y.key3' | ||||
|     ]) | ||||
| 
 | ||||
|     delimiter = '/' | ||||
|     keys = [x.name for x in bucket.list(prefix, delimiter)] | ||||
|     keys.should.equal([ | ||||
|         'toplevel/y.key1', 'toplevel/y.key2', 'toplevel/y.key3', 'toplevel/x/' | ||||
|     ]) | ||||
| 
 | ||||
|     # Test delimiter with no prefix | ||||
|     delimiter = '/' | ||||
|     keys = [x.name for x in bucket.list(prefix=None, delimiter=delimiter)] | ||||
|     keys.should.equal(['toplevel']) | ||||
| 
 | ||||
|     delimiter = None | ||||
|     keys = [x.name for x in bucket.list(prefix + 'x', delimiter)] | ||||
|     keys.should.equal([u'toplevel/x/key', u'toplevel/x/y/key', u'toplevel/x/y/z/key']) | ||||
| 
 | ||||
|     delimiter = '/' | ||||
|     keys = [x.name for x in bucket.list(prefix + 'x', delimiter)] | ||||
|     keys.should.equal([u'toplevel/x/']) | ||||
							
								
								
									
										14
									
								
								tests/test_s3bucket_path/test_s3bucket_path_utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										14
									
								
								tests/test_s3bucket_path/test_s3bucket_path_utils.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,14 @@ | ||||
| from sure import expect | ||||
| from moto.s3bucket_path.utils import bucket_name_from_url | ||||
| 
 | ||||
| 
 | ||||
| def test_base_url(): | ||||
|     expect(bucket_name_from_url('https://s3.amazonaws.com/')).should.equal(None) | ||||
| 
 | ||||
| 
 | ||||
| def test_localhost_bucket(): | ||||
|     expect(bucket_name_from_url('https://localhost:5000/wfoobar/abc')).should.equal("wfoobar") | ||||
| 
 | ||||
| 
 | ||||
| def test_localhost_without_bucket(): | ||||
|     expect(bucket_name_from_url('https://www.localhost:5000')).should.equal(None) | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user