2021-11-03 21:00:42 +00:00
|
|
|
import json
|
2017-02-24 00:43:48 +00:00
|
|
|
import os
|
|
|
|
|
2022-02-04 21:10:46 +00:00
|
|
|
from functools import lru_cache
|
|
|
|
|
|
|
|
|
2019-10-31 15:44:26 +00:00
|
|
|
# Flag read from the TEST_SERVER_MODE environment variable; only the value
# "true" (compared case-insensitively) switches it on.
TEST_SERVER_MODE = os.getenv("TEST_SERVER_MODE", "0").lower() == "true"
|
|
|
|
# Value of the INITIAL_NO_AUTH_ACTION_COUNT environment variable converted to
# a float; infinity when the variable is not set.
INITIAL_NO_AUTH_ACTION_COUNT = float(os.getenv("INITIAL_NO_AUTH_ACTION_COUNT", "inf"))
|
2021-02-18 08:58:20 +00:00
|
|
|
# Container registry name taken from the DEFAULT_CONTAINER_REGISTRY environment
# variable, falling back to "docker.io" when it is not set.
DEFAULT_CONTAINER_REGISTRY = os.getenv("DEFAULT_CONTAINER_REGISTRY", "docker.io")
|
2020-12-03 18:32:06 +00:00
|
|
|
|
2021-03-26 16:51:19 +00:00
|
|
|
# True only when the S3_IGNORE_SUBDOMAIN_BUCKETNAME environment variable is
# exactly "1" or "true" (the comparison is case-sensitive).
S3_IGNORE_SUBDOMAIN_BUCKETNAME = (
    os.getenv("S3_IGNORE_SUBDOMAIN_BUCKETNAME", "") in ("1", "true")
)
|
|
|
|
|
2021-04-10 07:13:20 +00:00
|
|
|
# Number of seconds to wait before a new certificate in ACM is "validated".
# Overridable through the MOTO_ACM_VALIDATION_WAIT environment variable
# (defaults to 60).
ACM_VALIDATION_WAIT = int(os.getenv("MOTO_ACM_VALIDATION_WAIT", "60"))
|
|
|
|
|
2020-12-03 18:32:06 +00:00
|
|
|
|
|
|
|
def get_sf_execution_history_type():
    """
    Select which kind of execution history events `get_execution_history` returns.

    :returns: str - the value of the SF_EXECUTION_HISTORY_TYPE environment
        variable, or "SUCCESS" when it is not set. Currently supported values
        are SUCCESS and FAILURE.
    """
    history_type = os.getenv("SF_EXECUTION_HISTORY_TYPE", "SUCCESS")
    return history_type
|
2021-06-10 09:48:28 +00:00
|
|
|
|
|
|
|
|
2021-11-17 21:02:14 +00:00
|
|
|
def get_s3_custom_endpoints():
    """
    Return the custom S3 endpoints configured via MOTO_S3_CUSTOM_ENDPOINTS.

    The environment variable holds a comma-separated list. Whitespace around
    each entry is stripped and blank entries (e.g. from a trailing or doubled
    comma) are dropped, so the result never contains an empty string.

    :returns: list of endpoint strings; empty when the variable is unset or
        contains no usable entries.
    """
    raw_value = os.environ.get("MOTO_S3_CUSTOM_ENDPOINTS")
    if not raw_value:
        return []
    stripped = (entry.strip() for entry in raw_value.split(","))
    return [entry for entry in stripped if entry]
|
|
|
|
|
|
|
|
|
2021-07-07 15:38:50 +00:00
|
|
|
# 5 MiB expressed in bytes (5 * 1024 * 1024 == 5242880).
S3_UPLOAD_PART_MIN_SIZE = 5 * 1024 * 1024
|
|
|
|
|
|
|
|
|
2021-06-10 09:48:28 +00:00
|
|
|
def get_s3_default_key_buffer_size():
    """
    Return the default S3 key buffer size as an integer.

    Reads the MOTO_S3_DEFAULT_KEY_BUFFER_SIZE environment variable; when it is
    not set, the value is S3_UPLOAD_PART_MIN_SIZE minus 1024.

    :returns: int
    """
    fallback_size = S3_UPLOAD_PART_MIN_SIZE - 1024
    configured_size = os.getenv("MOTO_S3_DEFAULT_KEY_BUFFER_SIZE", fallback_size)
    return int(configured_size)
|
2021-10-21 10:05:10 +00:00
|
|
|
|
|
|
|
|
|
|
|
def ecs_new_arn_format():
    """
    Report whether the new ECS ARN format is enabled.

    Controlled by the MOTO_ECS_NEW_ARN environment variable.

    :returns: bool - True unless the variable is set to 'false' (any casing).
    """
    # Enabled by default - 'false' is the only value that disables it
    setting = os.getenv("MOTO_ECS_NEW_ARN", "true")
    return setting.lower() != "false"
|
2021-11-03 21:00:42 +00:00
|
|
|
|
|
|
|
|
2021-12-24 21:02:45 +00:00
|
|
|
def allow_unknown_region():
    """
    Report whether nonexistent regions are allowed.

    Controlled by the MOTO_ALLOW_NONEXISTENT_REGION environment variable.

    :returns: bool - True only when the variable is set to 'true' (any casing).
    """
    setting = os.getenv("MOTO_ALLOW_NONEXISTENT_REGION", "false")
    return setting.lower() == "true"
|
|
|
|
|
|
|
|
|
2021-11-03 21:00:42 +00:00
|
|
|
def moto_server_port():
    """
    Return the server port as a string.

    :returns: str - the MOTO_PORT environment variable, or "5000" when the
        variable is unset or empty.
    """
    configured_port = os.getenv("MOTO_PORT")
    return configured_port if configured_port else "5000"
|
2021-11-03 21:00:42 +00:00
|
|
|
|
|
|
|
|
2022-02-04 21:10:46 +00:00
|
|
|
@lru_cache()
def moto_server_host():
    """
    Return the base URL of the host.

    Outside Docker this is "http://host.docker.internal"; inside Docker the
    value comes from get_docker_host(). The result is memoised by lru_cache,
    so it is computed only on the first call.

    :returns: str
    """
    if not is_docker():
        return "http://host.docker.internal"
    return get_docker_host()
|
|
|
|
|
|
|
|
|
2022-02-24 23:53:43 +00:00
|
|
|
def moto_lambda_image():
    """
    Return the Docker image name configured for Lambda.

    :returns: str - the MOTO_DOCKER_LAMBDA_IMAGE environment variable, or
        "lambci/lambda" when it is not set.
    """
    image_name = os.getenv("MOTO_DOCKER_LAMBDA_IMAGE", "lambci/lambda")
    return image_name
|
|
|
|
|
|
|
|
|
2022-01-27 12:04:03 +00:00
|
|
|
def moto_network_name():
    """
    Return the configured Docker network name.

    :returns: the MOTO_DOCKER_NETWORK_NAME environment variable, or None when
        it is not set.
    """
    network_name = os.getenv("MOTO_DOCKER_NETWORK_NAME")
    return network_name
|
|
|
|
|
|
|
|
|
|
|
|
def moto_network_mode():
    """
    Return the configured Docker network mode.

    :returns: the MOTO_DOCKER_NETWORK_MODE environment variable, or None when
        it is not set.
    """
    network_mode = os.getenv("MOTO_DOCKER_NETWORK_MODE")
    return network_mode
|
2021-11-03 21:00:42 +00:00
|
|
|
|
|
|
|
|
2022-01-27 19:34:49 +00:00
|
|
|
def test_server_mode_endpoint():
    """
    Return the endpoint URL used in server mode.

    :returns: str - the TEST_SERVER_MODE_ENDPOINT environment variable, or
        "http://localhost:<port>" (port taken from moto_server_port()) when
        the variable is not set.
    """
    # Built up front, so moto_server_port() is consulted on every call
    default_endpoint = f"http://localhost:{moto_server_port()}"
    return os.getenv("TEST_SERVER_MODE_ENDPOINT", default_endpoint)
|
|
|
|
|
|
|
|
|
2021-11-03 21:00:42 +00:00
|
|
|
def is_docker():
    """
    Detect whether the current process appears to run inside a Docker container.

    The check succeeds when "/.dockerenv" exists, or when "/proc/self/cgroup"
    is a regular file with at least one line containing "docker".

    :returns: bool
    """
    if os.path.exists("/.dockerenv"):
        return True
    path = "/proc/self/cgroup"
    if not os.path.isfile(path):
        return False
    # Use a context manager so the file handle is closed deterministically
    with open(path) as cgroup_file:
        return any("docker" in line for line in cgroup_file)
|
|
|
|
|
|
|
|
|
|
|
|
def get_docker_host():
    """
    Determine the URL of this container's IP address via the Docker API.

    Runs curl against the /run/docker.sock unix socket to fetch the JSON
    description of the container named by $HOSTNAME, then picks an IP address:

    - the address on the network returned by moto_network_name(), when such a
      name is configured and present in the container's networks;
    - otherwise the container's top-level "IPAddress" (printing a warning if a
      network name was configured but could not be found).

    This is deliberately best-effort: any failure (curl unavailable, empty or
    unparsable response, missing keys) prints a warning and falls back to
    "http://host.docker.internal".

    :returns: str - "http://<ip>" or "http://host.docker.internal"
    """
    try:
        cmd = "curl -s --unix-socket /run/docker.sock http://docker/containers/$HOSTNAME/json"
        # Close the pipe explicitly rather than leaving it to garbage collection
        with os.popen(cmd) as pipe:
            container_info = pipe.read()
        network_settings = json.loads(container_info)["NetworkSettings"]
        network_name = moto_network_name()
        if network_name and network_name in network_settings["Networks"]:
            _ip = network_settings["Networks"][network_name]["IPAddress"]
        else:
            _ip = network_settings["IPAddress"]
            if network_name:
                print(
                    f"WARNING - Moto couldn't find network '{network_name}' - defaulting to {_ip}"
                )
        return f"http://{_ip}"
    except Exception as e:  # noqa
        print(
            "WARNING - Unable to parse Docker API response. Defaulting to 'host.docker.internal'"
        )
        print(f"{type(e)}::{e}")
        return "http://host.docker.internal"
|