Techdebt: Run ServerMode tests in newer Python versions (#6602)
This commit is contained in:
		
							parent
							
								
									9636e02127
								
							
						
					
					
						commit
						993a904ac4
					
				
							
								
								
									
										2
									
								
								.github/workflows/tests_servermode.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/tests_servermode.yml
									
									
									
									
										vendored
									
									
								
							@ -21,7 +21,7 @@ jobs:
 | 
			
		||||
      run: |
 | 
			
		||||
        pip install build
 | 
			
		||||
        python -m build
 | 
			
		||||
        docker run --rm -t --name motoserver -e TEST_SERVER_MODE=true -e AWS_SECRET_ACCESS_KEY=server_secret -e AWS_ACCESS_KEY_ID=server_key -v `pwd`:/moto -p 5000:5000 -v /var/run/docker.sock:/var/run/docker.sock python:3.7-buster /moto/scripts/ci_moto_server.sh &
 | 
			
		||||
        docker run --rm -t --name motoserver -e TEST_SERVER_MODE=true -e AWS_SECRET_ACCESS_KEY=server_secret -e AWS_ACCESS_KEY_ID=server_key -v `pwd`:/moto -p 5000:5000 -v /var/run/docker.sock:/var/run/docker.sock python:${{ matrix.python-version }}-buster /moto/scripts/ci_moto_server.sh &
 | 
			
		||||
        python scripts/ci_wait_for_server.py
 | 
			
		||||
    - name: Get pip cache dir
 | 
			
		||||
      id: pip-cache
 | 
			
		||||
 | 
			
		||||
@ -236,6 +236,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
 | 
			
		||||
        """
 | 
			
		||||
        use_raw_body: Use incoming bytes if True, encode to string otherwise
 | 
			
		||||
        """
 | 
			
		||||
        self.is_werkzeug_request = "werkzeug" in str(type(request))
 | 
			
		||||
        querystring: Dict[str, Any] = OrderedDict()
 | 
			
		||||
        if hasattr(request, "body"):
 | 
			
		||||
            # Boto
 | 
			
		||||
@ -296,6 +297,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
 | 
			
		||||
            pass  # ignore decoding errors, as the body may not contain a legitimate querystring
 | 
			
		||||
 | 
			
		||||
        self.uri = full_url
 | 
			
		||||
 | 
			
		||||
        self.path = urlparse(full_url).path
 | 
			
		||||
        self.querystring = querystring
 | 
			
		||||
        self.data = querystring
 | 
			
		||||
 | 
			
		||||
@ -1,10 +1,12 @@
 | 
			
		||||
import datetime
 | 
			
		||||
import inspect
 | 
			
		||||
import re
 | 
			
		||||
import unicodedata
 | 
			
		||||
from botocore.exceptions import ClientError
 | 
			
		||||
from typing import Any, Optional, List, Callable, Dict, Tuple
 | 
			
		||||
from urllib.parse import urlparse
 | 
			
		||||
from urllib.parse import urlparse, unquote
 | 
			
		||||
from .common_types import TYPE_RESPONSE
 | 
			
		||||
from .versions import is_werkzeug_2_3_x
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def camelcase_to_underscores(argument: str) -> str:
 | 
			
		||||
@ -321,3 +323,96 @@ def params_sort_function(item: Tuple[str, Any]) -> Tuple[str, Any]:
 | 
			
		||||
        member_num = int(key.split(".")[2])
 | 
			
		||||
        return ("Tags.member", member_num)
 | 
			
		||||
    return item
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def normalize_werkzeug_path(path: str) -> str:
 | 
			
		||||
    if is_werkzeug_2_3_x():
 | 
			
		||||
        # New versions of werkzeug expose a quoted path
 | 
			
		||||
        #   %40connections
 | 
			
		||||
        #
 | 
			
		||||
        # Older versions (and botocore requests) expose the original:
 | 
			
		||||
        #   @connections
 | 
			
		||||
        #
 | 
			
		||||
        # We're unquoting the path here manually, so it behaves the same as botocore requests and requests coming in from old werkzeug versions.
 | 
			
		||||
        #
 | 
			
		||||
        return _unquote_hex_characters(path)
 | 
			
		||||
    else:
 | 
			
		||||
        return unquote(path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _unquote_hex_characters(path: str) -> str:
 | 
			
		||||
    allowed_characters = ["%2F"]  # /
 | 
			
		||||
    # Path can contain a single hex character
 | 
			
		||||
    #    my%3Fchar
 | 
			
		||||
    #
 | 
			
		||||
    # Path can also contain multiple hex characters in a row
 | 
			
		||||
    #    %AA%AB%AC
 | 
			
		||||
    #
 | 
			
		||||
    # This is how complex unicode characters, such as smileys, are encoded.
 | 
			
		||||
    # Note that these particular characters do not translate to anything useful
 | 
			
		||||
    # For the sake of simplicy, let's assume that it translates to a smiley: :)
 | 
			
		||||
    #
 | 
			
		||||
    # Just to make things interesting, they could be found right next to eachother:
 | 
			
		||||
    #    my%3F%AA%AB%ACchar
 | 
			
		||||
    #
 | 
			
		||||
    # Which should translate to my?:)char
 | 
			
		||||
 | 
			
		||||
    # char_ranges contains all consecutie hex characters:
 | 
			
		||||
    # [(2, 5, %3F), (0, 9, %AA%AB%AC)]
 | 
			
		||||
    char_ranges = [
 | 
			
		||||
        (m.start(0), m.end(0)) for m in re.finditer("(%[0-9A-F][0-9A-F])+", path)
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    # characters_found will contain the replacement characters
 | 
			
		||||
    # [(2, 5, '?'), (0, 9, ':)')]
 | 
			
		||||
    characters_found: List[Tuple[int, int, str]] = []
 | 
			
		||||
    for char_range in char_ranges:
 | 
			
		||||
        range_start, range_end = char_range
 | 
			
		||||
        possible_combo_start = range_start
 | 
			
		||||
        possible_combo_end = range_end
 | 
			
		||||
        while possible_combo_start < possible_combo_end:
 | 
			
		||||
            # For every range, create combinations of possibilities
 | 
			
		||||
            #    iter 1:   %AA%AB%AC
 | 
			
		||||
            #    iter 2:   %AA%AB
 | 
			
		||||
            #    iter3:    %AA
 | 
			
		||||
            possible_char = path[possible_combo_start:possible_combo_end]
 | 
			
		||||
 | 
			
		||||
            if possible_char in allowed_characters:
 | 
			
		||||
                # Werkzeug has already converted these characters for us
 | 
			
		||||
                possible_combo_end -= 3
 | 
			
		||||
                continue
 | 
			
		||||
            try:
 | 
			
		||||
                start_of_raw_repr = possible_combo_start + len(characters_found)
 | 
			
		||||
                end_of_raw_repr = start_of_raw_repr + len(possible_char)
 | 
			
		||||
                # Verify that the current possibility is a known unicode character
 | 
			
		||||
                unicodedata.category(unquote(possible_char))
 | 
			
		||||
                characters_found.append(
 | 
			
		||||
                    (start_of_raw_repr, end_of_raw_repr, unquote(possible_char))
 | 
			
		||||
                )
 | 
			
		||||
                if range_end == possible_combo_end:
 | 
			
		||||
                    # We've matched on the full phrase:
 | 
			
		||||
                    # %AA%AB%AC
 | 
			
		||||
                    break
 | 
			
		||||
                else:
 | 
			
		||||
                    # we matched on %AA%AB
 | 
			
		||||
                    # reset the indexes, and try to match %AC next
 | 
			
		||||
                    possible_combo_start = possible_combo_end
 | 
			
		||||
                    possible_combo_end = range_end
 | 
			
		||||
            except:  # noqa: E722 Do not use bare except
 | 
			
		||||
                # 'unicodedata.category' would have thrown an error, meaning:
 | 
			
		||||
                # %AA%AB%AC does not exist
 | 
			
		||||
                # Try the next possibility:
 | 
			
		||||
                # %AA%AB
 | 
			
		||||
                possible_combo_end -= 3
 | 
			
		||||
 | 
			
		||||
    # Replace the hex characters with the appropriate unicode representation
 | 
			
		||||
    char_offset = 0
 | 
			
		||||
    for char_pos in characters_found:
 | 
			
		||||
        combo_start, combo_end, character = char_pos
 | 
			
		||||
        path = (
 | 
			
		||||
            path[0 : combo_start - char_offset]
 | 
			
		||||
            + character
 | 
			
		||||
            + path[combo_end - char_offset :]
 | 
			
		||||
        )
 | 
			
		||||
        char_offset += (combo_end - combo_start) + len(character) - 1
 | 
			
		||||
    return path
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										13
									
								
								moto/core/versions.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								moto/core/versions.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,13 @@
 | 
			
		||||
from moto.utilities.distutils_version import LooseVersion
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    from importlib.metadata import version
 | 
			
		||||
except ImportError:
 | 
			
		||||
    from importlib_metadata import version
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
WERKZEUG_VERSION = version("werkzeug")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_werkzeug_2_3_x() -> bool:
 | 
			
		||||
    return LooseVersion(WERKZEUG_VERSION) >= LooseVersion("2.3.0")
 | 
			
		||||
@ -53,12 +53,7 @@ from moto.s3.exceptions import (
 | 
			
		||||
from .cloud_formation import cfn_to_api_encryption, is_replacement_update
 | 
			
		||||
from . import notifications
 | 
			
		||||
from .select_object_content import parse_query
 | 
			
		||||
from .utils import (
 | 
			
		||||
    clean_key_name,
 | 
			
		||||
    _VersionedKeyStore,
 | 
			
		||||
    undo_clean_key_name,
 | 
			
		||||
    CaseInsensitiveDict,
 | 
			
		||||
)
 | 
			
		||||
from .utils import _VersionedKeyStore, CaseInsensitiveDict
 | 
			
		||||
from .utils import ARCHIVE_STORAGE_CLASSES, STORAGE_CLASS
 | 
			
		||||
from ..events.notifications import send_notification as events_send_notification
 | 
			
		||||
from ..settings import get_s3_default_key_buffer_size, S3_UPLOAD_PART_MIN_SIZE
 | 
			
		||||
@ -1904,7 +1899,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
 | 
			
		||||
        lock_until: Optional[str] = None,
 | 
			
		||||
        checksum_value: Optional[str] = None,
 | 
			
		||||
    ) -> FakeKey:
 | 
			
		||||
        key_name = clean_key_name(key_name)
 | 
			
		||||
        if storage is not None and storage not in STORAGE_CLASS:
 | 
			
		||||
            raise InvalidStorageClass(storage=storage)
 | 
			
		||||
 | 
			
		||||
@ -1963,9 +1957,8 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
 | 
			
		||||
        bucket_name: str,
 | 
			
		||||
        key_name: str,
 | 
			
		||||
        acl: Optional[FakeAcl],
 | 
			
		||||
        key_is_clean: bool = False,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        key = self.get_object(bucket_name, key_name, key_is_clean=key_is_clean)
 | 
			
		||||
        key = self.get_object(bucket_name, key_name)
 | 
			
		||||
        # TODO: Support the XML-based ACL format
 | 
			
		||||
        if key is not None:
 | 
			
		||||
            key.set_acl(acl)
 | 
			
		||||
@ -2023,10 +2016,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
 | 
			
		||||
        key_name: str,
 | 
			
		||||
        version_id: Optional[str] = None,
 | 
			
		||||
        part_number: Optional[str] = None,
 | 
			
		||||
        key_is_clean: bool = False,
 | 
			
		||||
    ) -> Optional[FakeKey]:
 | 
			
		||||
        if not key_is_clean:
 | 
			
		||||
            key_name = clean_key_name(key_name)
 | 
			
		||||
        bucket = self.get_bucket(bucket_name)
 | 
			
		||||
 | 
			
		||||
        key = None
 | 
			
		||||
@ -2360,7 +2350,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
 | 
			
		||||
        version_id: Optional[str] = None,
 | 
			
		||||
        bypass: bool = False,
 | 
			
		||||
    ) -> Tuple[bool, Optional[Dict[str, Any]]]:
 | 
			
		||||
        key_name = clean_key_name(key_name)
 | 
			
		||||
        bucket = self.get_bucket(bucket_name)
 | 
			
		||||
 | 
			
		||||
        response_meta = {}
 | 
			
		||||
@ -2422,9 +2411,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
 | 
			
		||||
            key_name = object_["Key"]
 | 
			
		||||
            version_id = object_.get("VersionId", None)
 | 
			
		||||
 | 
			
		||||
            self.delete_object(
 | 
			
		||||
                bucket_name, undo_clean_key_name(key_name), version_id=version_id
 | 
			
		||||
            )
 | 
			
		||||
            self.delete_object(bucket_name, key_name, version_id=version_id)
 | 
			
		||||
            deleted_objects.append((key_name, version_id))
 | 
			
		||||
        return deleted_objects
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -6,11 +6,14 @@ from typing import Any, Dict, List, Iterator, Union, Tuple, Optional, Type
 | 
			
		||||
import urllib.parse
 | 
			
		||||
 | 
			
		||||
from moto import settings
 | 
			
		||||
from moto.core.versions import is_werkzeug_2_3_x
 | 
			
		||||
from moto.core.utils import (
 | 
			
		||||
    extract_region_from_aws_authorization,
 | 
			
		||||
    str_to_rfc_1123_datetime,
 | 
			
		||||
    normalize_werkzeug_path,
 | 
			
		||||
)
 | 
			
		||||
from urllib.parse import parse_qs, urlparse, unquote, urlencode, urlunparse
 | 
			
		||||
from urllib.parse import ParseResult
 | 
			
		||||
 | 
			
		||||
import xmltodict
 | 
			
		||||
 | 
			
		||||
@ -156,6 +159,15 @@ class S3Response(BaseResponse):
 | 
			
		||||
    def __init__(self) -> None:
 | 
			
		||||
        super().__init__(service_name="s3")
 | 
			
		||||
 | 
			
		||||
    def get_safe_path_from_url(self, url: ParseResult) -> str:
 | 
			
		||||
        return self.get_safe_path(url.path)
 | 
			
		||||
 | 
			
		||||
    def get_safe_path(self, part: str) -> str:
 | 
			
		||||
        if self.is_werkzeug_request:
 | 
			
		||||
            return normalize_werkzeug_path(part)
 | 
			
		||||
        else:
 | 
			
		||||
            return unquote(part)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def backend(self) -> S3Backend:
 | 
			
		||||
        return s3_backends[self.current_account]["global"]
 | 
			
		||||
@ -315,8 +327,7 @@ class S3Response(BaseResponse):
 | 
			
		||||
                f"Method {method} has not been implemented in the S3 backend yet"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_querystring(request: Any, full_url: str) -> Dict[str, Any]:  # type: ignore[misc]
 | 
			
		||||
    def _get_querystring(self, request: Any, full_url: str) -> Dict[str, Any]:  # type: ignore[misc]
 | 
			
		||||
        # Flask's Request has the querystring already parsed
 | 
			
		||||
        # In ServerMode, we can use this, instead of manually parsing this
 | 
			
		||||
        if hasattr(request, "args"):
 | 
			
		||||
@ -1129,6 +1140,10 @@ class S3Response(BaseResponse):
 | 
			
		||||
            objects = [objects]
 | 
			
		||||
        if len(objects) == 0:
 | 
			
		||||
            raise MalformedXML()
 | 
			
		||||
        if self.is_werkzeug_request and is_werkzeug_2_3_x():
 | 
			
		||||
            for obj in objects:
 | 
			
		||||
                if "Key" in obj:
 | 
			
		||||
                    obj["Key"] = self.get_safe_path(obj["Key"])
 | 
			
		||||
 | 
			
		||||
        if authenticated:
 | 
			
		||||
            deleted_objects = self.backend.delete_objects(bucket_name, objects)
 | 
			
		||||
@ -1235,10 +1250,11 @@ class S3Response(BaseResponse):
 | 
			
		||||
        self, request: Any, full_url: str, headers: Dict[str, Any]
 | 
			
		||||
    ) -> TYPE_RESPONSE:
 | 
			
		||||
        parsed_url = urlparse(full_url)
 | 
			
		||||
        url_path = self.get_safe_path_from_url(parsed_url)
 | 
			
		||||
        query = parse_qs(parsed_url.query, keep_blank_values=True)
 | 
			
		||||
        method = request.method
 | 
			
		||||
 | 
			
		||||
        key_name = self.parse_key_name(request, parsed_url.path)
 | 
			
		||||
        key_name = self.parse_key_name(request, url_path)
 | 
			
		||||
        bucket_name = self.parse_bucket_name_from_url(request, full_url)
 | 
			
		||||
 | 
			
		||||
        # SDK requests tend to have Authorization set automatically
 | 
			
		||||
@ -1473,7 +1489,8 @@ class S3Response(BaseResponse):
 | 
			
		||||
                if isinstance(copy_source, bytes):
 | 
			
		||||
                    copy_source = copy_source.decode("utf-8")
 | 
			
		||||
                copy_source_parsed = urlparse(copy_source)
 | 
			
		||||
                src_bucket, src_key = copy_source_parsed.path.lstrip("/").split("/", 1)
 | 
			
		||||
                url_path = self.get_safe_path_from_url(copy_source_parsed)
 | 
			
		||||
                src_bucket, src_key = url_path.lstrip("/").split("/", 1)
 | 
			
		||||
                src_version_id = parse_qs(copy_source_parsed.query).get(
 | 
			
		||||
                    "versionId", [None]  # type: ignore
 | 
			
		||||
                )[0]
 | 
			
		||||
@ -1630,7 +1647,7 @@ class S3Response(BaseResponse):
 | 
			
		||||
            )[0]
 | 
			
		||||
 | 
			
		||||
            key_to_copy = self.backend.get_object(
 | 
			
		||||
                src_bucket, src_key, version_id=src_version_id, key_is_clean=True
 | 
			
		||||
                src_bucket, src_key, version_id=src_version_id
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if key_to_copy is not None:
 | 
			
		||||
@ -2233,7 +2250,6 @@ class S3Response(BaseResponse):
 | 
			
		||||
                bucket_name=bucket_name,
 | 
			
		||||
                key_name=key.name,
 | 
			
		||||
                acl=multipart.acl,
 | 
			
		||||
                key_is_clean=True,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            template = self.response_template(S3_MULTIPART_COMPLETE_RESPONSE)
 | 
			
		||||
 | 
			
		||||
@ -3,7 +3,7 @@ import base64
 | 
			
		||||
import binascii
 | 
			
		||||
import re
 | 
			
		||||
import hashlib
 | 
			
		||||
from urllib.parse import urlparse, unquote, quote
 | 
			
		||||
from urllib.parse import urlparse
 | 
			
		||||
from requests.structures import CaseInsensitiveDict
 | 
			
		||||
from typing import Any, Dict, List, Iterator, Union, Tuple, Optional
 | 
			
		||||
import sys
 | 
			
		||||
@ -106,14 +106,6 @@ def metadata_from_headers(headers: Dict[str, Any]) -> CaseInsensitiveDict:  # ty
 | 
			
		||||
    return metadata
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def clean_key_name(key_name: str) -> str:
 | 
			
		||||
    return unquote(key_name)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def undo_clean_key_name(key_name: str) -> str:
 | 
			
		||||
    return quote(key_name)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class _VersionedKeyStore(dict):  # type: ignore
 | 
			
		||||
 | 
			
		||||
    """A simplified/modified version of Django's `MultiValueDict` taken from:
 | 
			
		||||
 | 
			
		||||
@ -7,6 +7,7 @@ from moto.core.utils import (
 | 
			
		||||
    unix_time,
 | 
			
		||||
    camelcase_to_pascal,
 | 
			
		||||
    pascal_to_camelcase,
 | 
			
		||||
    _unquote_hex_characters,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -50,3 +51,29 @@ def test_camelcase_to_pascal(_input, expected):
 | 
			
		||||
@freeze_time("2015-01-01 12:00:00")
 | 
			
		||||
def test_unix_time():
 | 
			
		||||
    assert unix_time() == 1420113600.0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "original_url,result",
 | 
			
		||||
    [
 | 
			
		||||
        ("some%3Fkey", "some?key"),
 | 
			
		||||
        ("6T7\x159\x12\r\x08.txt", "6T7\x159\x12\r\x08.txt"),
 | 
			
		||||
        ("foobar/the-unicode-%E2%98%BA-key", "foobar/the-unicode-☺-key"),
 | 
			
		||||
        ("key-with%2Eembedded%2Eurl%2Eencoding", "key-with.embedded.url.encoding"),
 | 
			
		||||
        # Can represent a single character
 | 
			
		||||
        ("%E2%82%AC", "€"),
 | 
			
		||||
        ("%2E", "."),
 | 
			
		||||
        # Multiple chars in a row
 | 
			
		||||
        ("%E2%82%AC%E2%82%AC", "€€"),
 | 
			
		||||
        ("%2E%2E", ".."),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_quote_characters(original_url, result):
 | 
			
		||||
    assert _unquote_hex_characters(original_url) == result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize("original_path", ["%2F%2F", "s%2Fs%2Fs%2F"])
 | 
			
		||||
def test_quote_characters__with_slashes(original_path):
 | 
			
		||||
    # If the string contains slashes, we ignore them
 | 
			
		||||
    # Werkzeug already takes care of those for us
 | 
			
		||||
    assert _unquote_hex_characters(original_path) == original_path
 | 
			
		||||
 | 
			
		||||
@ -1,12 +1,9 @@
 | 
			
		||||
from unittest.mock import patch
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
from moto.s3.utils import (
 | 
			
		||||
    bucket_name_from_url,
 | 
			
		||||
    _VersionedKeyStore,
 | 
			
		||||
    parse_region_from_url,
 | 
			
		||||
    clean_key_name,
 | 
			
		||||
    undo_clean_key_name,
 | 
			
		||||
    compute_checksum,
 | 
			
		||||
    cors_matches_origin,
 | 
			
		||||
)
 | 
			
		||||
@ -91,36 +88,6 @@ def test_parse_region_from_url():
 | 
			
		||||
        assert parse_region_from_url(url) == expected
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "key,expected",
 | 
			
		||||
    [
 | 
			
		||||
        ("foo/bar/baz", "foo/bar/baz"),
 | 
			
		||||
        ("foo", "foo"),
 | 
			
		||||
        (
 | 
			
		||||
            "foo/run_dt%3D2019-01-01%252012%253A30%253A00",
 | 
			
		||||
            "foo/run_dt=2019-01-01%2012%3A30%3A00",
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_clean_key_name(key, expected):
 | 
			
		||||
    assert clean_key_name(key) == expected
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "key,expected",
 | 
			
		||||
    [
 | 
			
		||||
        ("foo/bar/baz", "foo/bar/baz"),
 | 
			
		||||
        ("foo", "foo"),
 | 
			
		||||
        (
 | 
			
		||||
            "foo/run_dt%3D2019-01-01%252012%253A30%253A00",
 | 
			
		||||
            "foo/run_dt%253D2019-01-01%25252012%25253A30%25253A00",
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_undo_clean_key_name(key, expected):
 | 
			
		||||
    assert undo_clean_key_name(key) == expected
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_checksum_sha256():
 | 
			
		||||
    checksum = b"h9FJy0JMA4dlbyEdJYn7Wx4WIpkhMJ6YWIQZzMqKc2I="
 | 
			
		||||
    assert compute_checksum(b"somedata", "SHA256") == checksum
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user