moto/tests/test_kinesisvideoarchivedmedia/test_kinesisvideoarchivedmedia.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

86 lines
2.7 KiB
Python
Raw Normal View History

from datetime import timedelta
import boto3
from moto import mock_kinesisvideo, mock_kinesisvideoarchivedmedia
from moto.core.utils import utcnow
@mock_kinesisvideo
@mock_kinesisvideoarchivedmedia
def test_get_hls_streaming_session_url():
region_name = "ap-northeast-1"
kvs_client = boto3.client("kinesisvideo", region_name=region_name)
stream_name = "my-stream"
kvs_client.create_stream(StreamName=stream_name)
api_name = "GET_HLS_STREAMING_SESSION_URL"
res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
data_endpoint = res["DataEndpoint"]
client = boto3.client(
"kinesis-video-archived-media",
region_name=region_name,
endpoint_url=data_endpoint,
)
res = client.get_hls_streaming_session_url(StreamName=stream_name)
assert res["HLSStreamingSessionURL"].startswith(
f"{data_endpoint}/hls/v1/getHLSMasterPlaylist.m3u8?SessionToken="
)
@mock_kinesisvideo
@mock_kinesisvideoarchivedmedia
def test_get_dash_streaming_session_url():
region_name = "ap-northeast-1"
kvs_client = boto3.client("kinesisvideo", region_name=region_name)
stream_name = "my-stream"
kvs_client.create_stream(StreamName=stream_name)
api_name = "GET_DASH_STREAMING_SESSION_URL"
res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
data_endpoint = res["DataEndpoint"]
client = boto3.client(
"kinesis-video-archived-media",
region_name=region_name,
endpoint_url=data_endpoint,
)
res = client.get_dash_streaming_session_url(StreamName=stream_name)
assert res["DASHStreamingSessionURL"].startswith(
f"{data_endpoint}/dash/v1/getDASHManifest.mpd?SessionToken="
)
@mock_kinesisvideo
@mock_kinesisvideoarchivedmedia
def test_get_clip():
region_name = "ap-northeast-1"
kvs_client = boto3.client("kinesisvideo", region_name=region_name)
stream_name = "my-stream"
kvs_client.create_stream(StreamName=stream_name)
api_name = "GET_DASH_STREAMING_SESSION_URL"
res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
data_endpoint = res["DataEndpoint"]
client = boto3.client(
"kinesis-video-archived-media",
region_name=region_name,
endpoint_url=data_endpoint,
)
end_timestamp = utcnow() - timedelta(hours=1)
start_timestamp = end_timestamp - timedelta(minutes=5)
res = client.get_clip(
StreamName=stream_name,
ClipFragmentSelector={
"FragmentSelectorType": "PRODUCER_TIMESTAMP",
"TimestampRange": {
"StartTimestamp": start_timestamp,
"EndTimestamp": end_timestamp,
},
},
)
assert res["ContentType"] == "video/mp4"
assert "Payload" in res