moto/moto/kinesisvideo/responses.py
2023-03-23 09:17:40 -01:00

56 lines
2.2 KiB
Python

from moto.core.responses import BaseResponse
from .models import kinesisvideo_backends, KinesisVideoBackend
import json
class KinesisVideoResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="kinesisvideo")
@property
def kinesisvideo_backend(self) -> KinesisVideoBackend:
return kinesisvideo_backends[self.current_account][self.region]
def create_stream(self) -> str:
device_name = self._get_param("DeviceName")
stream_name = self._get_param("StreamName")
media_type = self._get_param("MediaType")
kms_key_id = self._get_param("KmsKeyId")
data_retention_in_hours = self._get_int_param("DataRetentionInHours")
tags = self._get_param("Tags")
stream_arn = self.kinesisvideo_backend.create_stream(
device_name=device_name,
stream_name=stream_name,
media_type=media_type,
kms_key_id=kms_key_id,
data_retention_in_hours=data_retention_in_hours,
tags=tags,
)
return json.dumps(dict(StreamARN=stream_arn))
def describe_stream(self) -> str:
stream_name = self._get_param("StreamName")
stream_arn = self._get_param("StreamARN")
stream_info = self.kinesisvideo_backend.describe_stream(
stream_name=stream_name, stream_arn=stream_arn
)
return json.dumps(dict(StreamInfo=stream_info))
def list_streams(self) -> str:
stream_info_list = self.kinesisvideo_backend.list_streams()
return json.dumps(dict(StreamInfoList=stream_info_list, NextToken=None))
def delete_stream(self) -> str:
stream_arn = self._get_param("StreamARN")
self.kinesisvideo_backend.delete_stream(stream_arn=stream_arn)
return json.dumps(dict())
def get_data_endpoint(self) -> str:
stream_name = self._get_param("StreamName")
stream_arn = self._get_param("StreamARN")
api_name = self._get_param("APIName")
data_endpoint = self.kinesisvideo_backend.get_data_endpoint(
stream_name=stream_name, stream_arn=stream_arn, api_name=api_name
)
return json.dumps(dict(DataEndpoint=data_endpoint))