Adding additional tests to increase coverage
This commit is contained in:
parent
5372e6840f
commit
8ad28f8400
9
file.tmp
Normal file
9
file.tmp
Normal file
@ -0,0 +1,9 @@
|
||||
|
||||
AWSTemplateFormatVersion: '2010-09-09'
|
||||
Description: Simple CloudFormation Test Template
|
||||
Resources:
|
||||
S3Bucket:
|
||||
Type: AWS::S3::Bucket
|
||||
Properties:
|
||||
AccessControl: PublicRead
|
||||
BucketName: cf-test-bucket-1
|
@ -72,6 +72,19 @@ from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from errno import EAGAIN
|
||||
|
||||
import logging
|
||||
from inspect import currentframe
|
||||
import inspect
|
||||
|
||||
logging.basicConfig(filename='/tmp/models.log',level=logging.DEBUG)
|
||||
|
||||
DEBUG=0
|
||||
|
||||
def get_linenumber():
|
||||
cf = currentframe()
|
||||
return " - "+str(cf.f_back.f_lineno)
|
||||
|
||||
|
||||
# Some versions of python internally shadowed the
|
||||
# SocketType variable incorrectly https://bugs.python.org/issue20386
|
||||
BAD_SOCKET_SHADOW = socket.socket != socket.SocketType
|
||||
@ -155,15 +168,34 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
|
||||
"""
|
||||
|
||||
def __init__(self, headers, body=''):
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('__init__ - '
|
||||
' -caller: ' + str(
|
||||
inspect.stack()[1][3]) + "-" + get_linenumber())
|
||||
logging.debug('headers: '+str(headers))
|
||||
|
||||
# first of all, lets make sure that if headers or body are
|
||||
# unicode strings, it must be converted into a utf-8 encoded
|
||||
# byte string
|
||||
self.raw_headers = utf8(headers.strip())
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('raw_headers: '+str(self.raw_headers))
|
||||
|
||||
self.body = utf8(body)
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('body: '+str(self.body))
|
||||
|
||||
# Now let's concatenate the headers with the body, and create
|
||||
# `rfile` based on it
|
||||
self.rfile = StringIO(b'\r\n\r\n'.join([self.raw_headers, self.body]))
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('rfile: '+str(self.rfile))
|
||||
|
||||
|
||||
self.wfile = StringIO() # Creating `wfile` as an empty
|
||||
# StringIO, just to avoid any real
|
||||
# I/O calls
|
||||
@ -171,6 +203,10 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
|
||||
# parsing the request line preemptively
|
||||
self.raw_requestline = self.rfile.readline()
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('raw_requestline: '+str(self.raw_requestline))
|
||||
|
||||
|
||||
# initiating the error attributes with None
|
||||
self.error_code = None
|
||||
self.error_message = None
|
||||
@ -182,6 +218,9 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
|
||||
# making the HTTP method string available as the command
|
||||
self.method = self.command
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('method: '+str(self.method))
|
||||
|
||||
# Now 2 convenient attributes for the HTTPretty API:
|
||||
|
||||
# `querystring` holds a dictionary with the parsed query string
|
||||
@ -207,8 +246,23 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
|
||||
)
|
||||
|
||||
def parse_querystring(self, qs):
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('parse_querystring - '
|
||||
' -caller: ' + str(
|
||||
inspect.stack()[1][3]) + "-" + get_linenumber())
|
||||
logging.debug('qs: '+str(qs))
|
||||
|
||||
expanded = unquote_utf8(qs)
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('expanded: '+str(expanded))
|
||||
|
||||
parsed = parse_qs(expanded)
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('parsed: '+str(parsed))
|
||||
|
||||
result = {}
|
||||
for k in parsed:
|
||||
result[k] = list(map(decode_utf8, parsed[k]))
|
||||
@ -218,6 +272,12 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
|
||||
def parse_request_body(self, body):
|
||||
""" Attempt to parse the post based on the content-type passed. Return the regular body if not """
|
||||
|
||||
if DEBUG:
|
||||
logging.debug('parse_request_body - '
|
||||
' -caller: ' + str(
|
||||
inspect.stack()[1][3]) + "-" + get_linenumber())
|
||||
logging.debug('body: '+str(body))
|
||||
|
||||
PARSING_FUNCTIONS = {
|
||||
'application/json': json.loads,
|
||||
'text/json': json.loads,
|
||||
|
@ -29,7 +29,6 @@ import re
|
||||
from .compat import BaseClass
|
||||
from .utils import decode_utf8
|
||||
|
||||
|
||||
STATUSES = {
|
||||
100: "Continue",
|
||||
101: "Switching Protocols",
|
||||
@ -134,6 +133,7 @@ def parse_requestline(s):
|
||||
...
|
||||
ValueError: Not a Request-Line
|
||||
"""
|
||||
|
||||
methods = '|'.join(HttpBaseClass.METHODS)
|
||||
m = re.match(r'(' + methods + ')\s+(.*)\s+HTTP/(1.[0|1])', s, re.I)
|
||||
if m:
|
||||
@ -146,6 +146,7 @@ def last_requestline(sent_data):
|
||||
"""
|
||||
Find the last line in sent_data that can be parsed with parse_requestline
|
||||
"""
|
||||
|
||||
for line in reversed(sent_data):
|
||||
try:
|
||||
parse_requestline(decode_utf8(line))
|
||||
|
8
tests/test_packages/__init__.py
Normal file
8
tests/test_packages/__init__.py
Normal file
@ -0,0 +1,8 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
# Disable extra logging for tests
|
||||
logging.getLogger('boto').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('boto3').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('botocore').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('nose').setLevel(logging.CRITICAL)
|
37
tests/test_packages/test_httpretty.py
Normal file
37
tests/test_packages/test_httpretty.py
Normal file
@ -0,0 +1,37 @@
|
||||
# #!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
import mock
|
||||
|
||||
from moto.packages.httpretty.core import HTTPrettyRequest, fake_gethostname, fake_gethostbyname
|
||||
|
||||
|
||||
def test_parse_querystring():
|
||||
|
||||
core = HTTPrettyRequest(headers='test test HTTP/1.1')
|
||||
|
||||
qs = 'test test'
|
||||
response = core.parse_querystring(qs)
|
||||
|
||||
assert response == {}
|
||||
|
||||
def test_parse_request_body():
|
||||
core = HTTPrettyRequest(headers='test test HTTP/1.1')
|
||||
|
||||
qs = 'test'
|
||||
response = core.parse_request_body(qs)
|
||||
|
||||
assert response == 'test'
|
||||
|
||||
def test_fake_gethostname():
|
||||
|
||||
response = fake_gethostname()
|
||||
|
||||
assert response == 'localhost'
|
||||
|
||||
def test_fake_gethostbyname():
|
||||
|
||||
host = 'test'
|
||||
response = fake_gethostbyname(host=host)
|
||||
|
||||
assert response == '127.0.0.1'
|
Loading…
Reference in New Issue
Block a user