#3359 - Reuse Docker-on-request for AWSLambda and Batch
This commit is contained in:
parent
390a4d5510
commit
8d3cc3ef32
@ -17,13 +17,12 @@ import json
|
||||
import re
|
||||
import zipfile
|
||||
import uuid
|
||||
import functools
|
||||
import tarfile
|
||||
import calendar
|
||||
import threading
|
||||
import traceback
|
||||
import weakref
|
||||
import requests.adapters
|
||||
import requests.exceptions
|
||||
|
||||
from boto3 import Session
|
||||
|
||||
@ -47,6 +46,7 @@ from moto.sqs import sqs_backends
|
||||
from moto.dynamodb2 import dynamodb_backends2
|
||||
from moto.dynamodbstreams import dynamodbstreams_backends
|
||||
from moto.core import ACCOUNT_ID
|
||||
from moto.utilities.docker import DockerModel
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -55,7 +55,6 @@ try:
|
||||
except ImportError:
|
||||
from backports.tempfile import TemporaryDirectory
|
||||
|
||||
_orig_adapter_send = requests.adapters.HTTPAdapter.send
|
||||
docker_3 = docker.__version__[0] >= "3"
|
||||
|
||||
|
||||
@ -151,8 +150,9 @@ class _DockerDataVolumeContext:
|
||||
raise # multiple processes trying to use same volume?
|
||||
|
||||
|
||||
class LambdaFunction(CloudFormationModel):
|
||||
class LambdaFunction(CloudFormationModel, DockerModel):
|
||||
def __init__(self, spec, region, validate_s3=True, version=1):
|
||||
DockerModel.__init__(self)
|
||||
# required
|
||||
self.region = region
|
||||
self.code = spec["Code"]
|
||||
@ -162,7 +162,6 @@ class LambdaFunction(CloudFormationModel):
|
||||
self.run_time = spec["Runtime"]
|
||||
self.logs_backend = logs_backends[self.region]
|
||||
self.environment_vars = spec.get("Environment", {}).get("Variables", {})
|
||||
self.docker_client = None
|
||||
self.policy = None
|
||||
self.state = "Active"
|
||||
self.reserved_concurrency = spec.get("ReservedConcurrentExecutions", None)
|
||||
@ -228,26 +227,6 @@ class LambdaFunction(CloudFormationModel):
|
||||
|
||||
self.tags = dict()
|
||||
|
||||
def initiate_docker_client(self):
|
||||
# We should only initiate the Docker Client at runtime.
|
||||
# The docker.from_env() call will fall if Docker is not running
|
||||
if self.docker_client is None:
|
||||
self.docker_client = docker.from_env()
|
||||
|
||||
# Unfortunately mocking replaces this method w/o fallback enabled, so we
|
||||
# need to replace it if we detect it's been mocked
|
||||
if requests.adapters.HTTPAdapter.send != _orig_adapter_send:
|
||||
_orig_get_adapter = self.docker_client.api.get_adapter
|
||||
|
||||
def replace_adapter_send(*args, **kwargs):
|
||||
adapter = _orig_get_adapter(*args, **kwargs)
|
||||
|
||||
if isinstance(adapter, requests.adapters.HTTPAdapter):
|
||||
adapter.send = functools.partial(_orig_adapter_send, adapter)
|
||||
return adapter
|
||||
|
||||
self.docker_client.api.get_adapter = replace_adapter_send
|
||||
|
||||
def set_version(self, version):
|
||||
self.function_arn = make_function_ver_arn(
|
||||
self.region, ACCOUNT_ID, self.function_name, version
|
||||
@ -418,8 +397,6 @@ class LambdaFunction(CloudFormationModel):
|
||||
|
||||
env_vars.update(self.environment_vars)
|
||||
|
||||
self.initiate_docker_client()
|
||||
|
||||
container = exit_code = None
|
||||
log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON)
|
||||
with _DockerDataVolumeContext(self) as data_vol:
|
||||
|
@ -1,6 +1,5 @@
|
||||
from __future__ import unicode_literals
|
||||
import re
|
||||
import requests.adapters
|
||||
from itertools import cycle
|
||||
import six
|
||||
import datetime
|
||||
@ -8,7 +7,6 @@ import time
|
||||
import uuid
|
||||
import logging
|
||||
import docker
|
||||
import functools
|
||||
import threading
|
||||
import dateutil.parser
|
||||
from boto3 import Session
|
||||
@ -30,8 +28,8 @@ from moto.ec2.exceptions import InvalidSubnetIdError
|
||||
from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES
|
||||
from moto.iam.exceptions import IAMNotFoundException
|
||||
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
|
||||
from moto.utilities.docker import DockerModel
|
||||
|
||||
_orig_adapter_send = requests.adapters.HTTPAdapter.send
|
||||
logger = logging.getLogger(__name__)
|
||||
COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(
|
||||
r"^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$"
|
||||
@ -311,7 +309,7 @@ class JobDefinition(CloudFormationModel):
|
||||
return backend.get_job_definition_by_arn(arn)
|
||||
|
||||
|
||||
class Job(threading.Thread, BaseModel):
|
||||
class Job(threading.Thread, BaseModel, DockerModel):
|
||||
def __init__(self, name, job_def, job_queue, log_backend, container_overrides):
|
||||
"""
|
||||
Docker Job
|
||||
@ -324,6 +322,7 @@ class Job(threading.Thread, BaseModel):
|
||||
:type log_backend: moto.logs.models.LogsBackend
|
||||
"""
|
||||
threading.Thread.__init__(self)
|
||||
DockerModel.__init__(self)
|
||||
|
||||
self.job_name = name
|
||||
self.job_id = str(uuid.uuid4())
|
||||
@ -342,24 +341,9 @@ class Job(threading.Thread, BaseModel):
|
||||
self.daemon = True
|
||||
self.name = "MOTO-BATCH-" + self.job_id
|
||||
|
||||
self.docker_client = docker.from_env()
|
||||
self._log_backend = log_backend
|
||||
self.log_stream_name = None
|
||||
|
||||
# Unfortunately mocking replaces this method w/o fallback enabled, so we
|
||||
# need to replace it if we detect it's been mocked
|
||||
if requests.adapters.HTTPAdapter.send != _orig_adapter_send:
|
||||
_orig_get_adapter = self.docker_client.api.get_adapter
|
||||
|
||||
def replace_adapter_send(*args, **kwargs):
|
||||
adapter = _orig_get_adapter(*args, **kwargs)
|
||||
|
||||
if isinstance(adapter, requests.adapters.HTTPAdapter):
|
||||
adapter.send = functools.partial(_orig_adapter_send, adapter)
|
||||
return adapter
|
||||
|
||||
self.docker_client.api.get_adapter = replace_adapter_send
|
||||
|
||||
def describe(self):
|
||||
result = {
|
||||
"jobDefinition": self.job_definition.arn,
|
||||
|
33
moto/utilities/docker.py
Normal file
33
moto/utilities/docker.py
Normal file
@ -0,0 +1,33 @@
|
||||
import docker
|
||||
import functools
|
||||
import requests.adapters
|
||||
|
||||
|
||||
_orig_adapter_send = requests.adapters.HTTPAdapter.send
|
||||
|
||||
|
||||
class DockerModel:
|
||||
def __init__(self):
|
||||
self.__docker_client = None
|
||||
|
||||
@property
|
||||
def docker_client(self):
|
||||
if self.__docker_client is None:
|
||||
# We should only initiate the Docker Client at runtime.
|
||||
# The docker.from_env() call will fall if Docker is not running
|
||||
self.__docker_client = docker.from_env()
|
||||
|
||||
# Unfortunately mocking replaces this method w/o fallback enabled, so we
|
||||
# need to replace it if we detect it's been mocked
|
||||
if requests.adapters.HTTPAdapter.send != _orig_adapter_send:
|
||||
_orig_get_adapter = self.docker_client.api.get_adapter
|
||||
|
||||
def replace_adapter_send(*args, **kwargs):
|
||||
adapter = _orig_get_adapter(*args, **kwargs)
|
||||
|
||||
if isinstance(adapter, requests.adapters.HTTPAdapter):
|
||||
adapter.send = functools.partial(_orig_adapter_send, adapter)
|
||||
return adapter
|
||||
|
||||
self.docker_client.api.get_adapter = replace_adapter_send
|
||||
return self.__docker_client
|
Loading…
Reference in New Issue
Block a user