Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

49 lines
1.5 KiB
Python
Raw Normal View History

"""ApiGatewayManagementApiBackend class with methods for supported APIs."""
from collections import defaultdict
from typing import Any, Dict
from moto.core import BaseBackend, BackendDict
from moto.core.utils import unix_time
class Connection:
def __init__(self) -> None:
self.connected_at = unix_time()
self.source_ip = "192.168.0.1"
self.user_agent = "Moto Mocks"
self.data = b""
def to_dict(self) -> Dict[str, Any]:
return {
"connectedAt": self.connected_at,
"lastActiveAt": unix_time(),
"identity": {
"sourceIp": self.source_ip,
"userAgent": self.user_agent,
},
}
class ApiGatewayManagementApiBackend(BaseBackend):
"""
Connecting to this API in ServerMode/Docker requires Python >= 3.8 and an up-to-date `werkzeug` version (>=2.3.x)
"""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.connections: Dict[str, Connection] = defaultdict(Connection)
def delete_connection(self, connection_id: str) -> None:
self.connections.pop(connection_id, None)
def get_connection(self, connection_id: str) -> Connection:
return self.connections[connection_id]
def post_to_connection(self, data: bytes, connection_id: str) -> None:
cnctn = self.get_connection(connection_id)
cnctn.data += data
apigatewaymanagementapi_backends = BackendDict(
ApiGatewayManagementApiBackend, "apigateway"
)