AWSLambda: Stream function-contents into container (#6769)
This commit is contained in:
parent
d05f7747f8
commit
97c2175f15
@ -2,7 +2,7 @@ import base64
|
|||||||
import time
|
import time
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import copy
|
import copy
|
||||||
import datetime
|
from datetime import datetime
|
||||||
from gzip import GzipFile
|
from gzip import GzipFile
|
||||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
||||||
from sys import platform
|
from sys import platform
|
||||||
@ -58,33 +58,27 @@ from moto.sqs import sqs_backends
|
|||||||
from moto.dynamodb import dynamodb_backends
|
from moto.dynamodb import dynamodb_backends
|
||||||
from moto.dynamodbstreams import dynamodbstreams_backends
|
from moto.dynamodbstreams import dynamodbstreams_backends
|
||||||
from moto.utilities.docker_utilities import DockerModel
|
from moto.utilities.docker_utilities import DockerModel
|
||||||
from tempfile import TemporaryDirectory
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def zip2tar(zip_bytes: bytes) -> bytes:
|
def zip2tar(zip_bytes: bytes) -> io.BytesIO:
|
||||||
with TemporaryDirectory() as td:
|
tarstream = io.BytesIO()
|
||||||
tarname = os.path.join(td, "data.tar")
|
timeshift = int((datetime.now() - datetime.utcnow()).total_seconds())
|
||||||
timeshift = int(
|
tarf = tarfile.TarFile(fileobj=tarstream, mode="w")
|
||||||
(datetime.datetime.now() - datetime.datetime.utcnow()).total_seconds()
|
with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zipf:
|
||||||
)
|
for zipinfo in zipf.infolist():
|
||||||
with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zipf, tarfile.TarFile(
|
if zipinfo.is_dir():
|
||||||
tarname, "w"
|
continue
|
||||||
) as tarf:
|
|
||||||
for zipinfo in zipf.infolist():
|
|
||||||
if zipinfo.filename[-1] == "/": # is_dir() is py3.6+
|
|
||||||
continue
|
|
||||||
|
|
||||||
tarinfo = tarfile.TarInfo(name=zipinfo.filename)
|
tarinfo = tarfile.TarInfo(name=zipinfo.filename)
|
||||||
tarinfo.size = zipinfo.file_size
|
tarinfo.size = zipinfo.file_size
|
||||||
tarinfo.mtime = calendar.timegm(zipinfo.date_time) - timeshift
|
tarinfo.mtime = calendar.timegm(zipinfo.date_time) - timeshift
|
||||||
infile = zipf.open(zipinfo.filename)
|
infile = zipf.open(zipinfo.filename)
|
||||||
tarf.addfile(tarinfo, infile)
|
tarf.addfile(tarinfo, infile)
|
||||||
|
|
||||||
with open(tarname, "rb") as f:
|
tarstream.seek(0)
|
||||||
tar_data = f.read()
|
return tarstream
|
||||||
return tar_data
|
|
||||||
|
|
||||||
|
|
||||||
class _VolumeRefCount:
|
class _VolumeRefCount:
|
||||||
@ -135,8 +129,8 @@ class _DockerDataVolumeContext:
|
|||||||
"busybox", "sleep 100", volumes=volumes, detach=True
|
"busybox", "sleep 100", volumes=volumes, detach=True
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
tar_bytes = zip2tar(self._lambda_func.code_bytes)
|
with zip2tar(self._lambda_func.code_bytes) as stream:
|
||||||
container.put_archive(settings.LAMBDA_DATA_DIR, tar_bytes)
|
container.put_archive(settings.LAMBDA_DATA_DIR, stream)
|
||||||
finally:
|
finally:
|
||||||
container.remove(force=True)
|
container.remove(force=True)
|
||||||
|
|
||||||
@ -256,7 +250,7 @@ class LayerVersion(CloudFormationModel):
|
|||||||
self.license_info = spec.get("LicenseInfo", "")
|
self.license_info = spec.get("LicenseInfo", "")
|
||||||
|
|
||||||
# auto-generated
|
# auto-generated
|
||||||
self.created_date = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
self.created_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
self.version: Optional[int] = None
|
self.version: Optional[int] = None
|
||||||
self._attached = False
|
self._attached = False
|
||||||
self._layer: Optional["Layer"] = None
|
self._layer: Optional["Layer"] = None
|
||||||
@ -475,9 +469,7 @@ class LambdaFunction(CloudFormationModel, DockerModel):
|
|||||||
|
|
||||||
# auto-generated
|
# auto-generated
|
||||||
self.version = version
|
self.version = version
|
||||||
self.last_modified = iso_8601_datetime_with_nanoseconds(
|
self.last_modified = iso_8601_datetime_with_nanoseconds(datetime.utcnow())
|
||||||
datetime.datetime.utcnow()
|
|
||||||
)
|
|
||||||
|
|
||||||
self._set_function_code(self.code)
|
self._set_function_code(self.code)
|
||||||
|
|
||||||
@ -499,9 +491,7 @@ class LambdaFunction(CloudFormationModel, DockerModel):
|
|||||||
self.region, self.account_id, self.function_name, version
|
self.region, self.account_id, self.function_name, version
|
||||||
)
|
)
|
||||||
self.version = version
|
self.version = version
|
||||||
self.last_modified = iso_8601_datetime_with_nanoseconds(
|
self.last_modified = iso_8601_datetime_with_nanoseconds(datetime.utcnow())
|
||||||
datetime.datetime.utcnow()
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def architectures(self) -> List[str]:
|
def architectures(self) -> List[str]:
|
||||||
@ -882,7 +872,7 @@ class LambdaFunction(CloudFormationModel, DockerModel):
|
|||||||
def save_logs(self, output: str) -> None:
|
def save_logs(self, output: str) -> None:
|
||||||
# Send output to "logs" backend
|
# Send output to "logs" backend
|
||||||
invoke_id = random.uuid4().hex
|
invoke_id = random.uuid4().hex
|
||||||
date = datetime.datetime.utcnow()
|
date = datetime.utcnow()
|
||||||
log_stream_name = (
|
log_stream_name = (
|
||||||
f"{date.year}/{date.month:02d}/{date.day:02d}/[{self.version}]{invoke_id}"
|
f"{date.year}/{date.month:02d}/{date.day:02d}/[{self.version}]{invoke_id}"
|
||||||
)
|
)
|
||||||
@ -1036,7 +1026,7 @@ class FunctionUrlConfig:
|
|||||||
self.function = function
|
self.function = function
|
||||||
self.config = config
|
self.config = config
|
||||||
self.url = f"https://{random.uuid4().hex}.lambda-url.{function.region}.on.aws"
|
self.url = f"https://{random.uuid4().hex}.lambda-url.{function.region}.on.aws"
|
||||||
self.created = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000+0000")
|
self.created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000+0000")
|
||||||
self.last_modified = self.created
|
self.last_modified = self.created
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, Any]:
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
@ -1055,7 +1045,7 @@ class FunctionUrlConfig:
|
|||||||
self.config["Cors"] = new_config["Cors"]
|
self.config["Cors"] = new_config["Cors"]
|
||||||
if new_config.get("AuthType"):
|
if new_config.get("AuthType"):
|
||||||
self.config["AuthType"] = new_config["AuthType"]
|
self.config["AuthType"] = new_config["AuthType"]
|
||||||
self.last_modified = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
|
self.last_modified = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
|
|
||||||
|
|
||||||
class EventSourceMapping(CloudFormationModel):
|
class EventSourceMapping(CloudFormationModel):
|
||||||
@ -1072,7 +1062,7 @@ class EventSourceMapping(CloudFormationModel):
|
|||||||
|
|
||||||
self.function_arn = spec["FunctionArn"]
|
self.function_arn = spec["FunctionArn"]
|
||||||
self.uuid = str(random.uuid4())
|
self.uuid = str(random.uuid4())
|
||||||
self.last_modified = time.mktime(datetime.datetime.utcnow().timetuple())
|
self.last_modified = time.mktime(datetime.utcnow().timetuple())
|
||||||
|
|
||||||
def _get_service_source_from_arn(self, event_source_arn: str) -> str:
|
def _get_service_source_from_arn(self, event_source_arn: str) -> str:
|
||||||
return event_source_arn.split(":")[2].lower()
|
return event_source_arn.split(":")[2].lower()
|
||||||
@ -1805,7 +1795,7 @@ class LambdaBackend(BaseBackend):
|
|||||||
elif key == "Enabled":
|
elif key == "Enabled":
|
||||||
esm.enabled = spec[key]
|
esm.enabled = spec[key]
|
||||||
|
|
||||||
esm.last_modified = time.mktime(datetime.datetime.utcnow().timetuple())
|
esm.last_modified = time.mktime(datetime.utcnow().timetuple())
|
||||||
return esm
|
return esm
|
||||||
|
|
||||||
def list_event_source_mappings(
|
def list_event_source_mappings(
|
||||||
|
Loading…
Reference in New Issue
Block a user