moto/tests/test_kinesisvideoarchivedmedia/test_kinesisvideoarchivedmedia.py
Toshiya Kawasaki c66812edba
Add kinesisvideo archived media (#3280)
* add get_hls_streaming_session_url

* add get_dash_streaming_session_url

* add get_clip

* add test_server for kinesisvideo archived media

* fix for lint

* fix for lint

* avoid testing kinesisvideoarchivedmedia with TEST_SERVER_MODE=true
2020-09-04 12:14:48 +01:00

87 lines
2.9 KiB
Python

from __future__ import unicode_literals
import boto3
import sure # noqa
from moto import mock_kinesisvideoarchivedmedia
from moto import mock_kinesisvideo
from datetime import datetime, timedelta
@mock_kinesisvideo
@mock_kinesisvideoarchivedmedia
def test_get_hls_streaming_session_url():
region_name = "ap-northeast-1"
kvs_client = boto3.client("kinesisvideo", region_name=region_name)
stream_name = "my-stream"
kvs_client.create_stream(StreamName=stream_name)
api_name = "GET_HLS_STREAMING_SESSION_URL"
res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
data_endpoint = res["DataEndpoint"]
client = boto3.client(
"kinesis-video-archived-media",
region_name=region_name,
endpoint_url=data_endpoint,
)
res = client.get_hls_streaming_session_url(StreamName=stream_name,)
reg_exp = "^{}/hls/v1/getHLSMasterPlaylist.m3u8\?SessionToken\=.+$".format(
data_endpoint
)
res.should.have.key("HLSStreamingSessionURL").which.should.match(reg_exp)
@mock_kinesisvideo
@mock_kinesisvideoarchivedmedia
def test_get_dash_streaming_session_url():
region_name = "ap-northeast-1"
kvs_client = boto3.client("kinesisvideo", region_name=region_name)
stream_name = "my-stream"
kvs_client.create_stream(StreamName=stream_name)
api_name = "GET_DASH_STREAMING_SESSION_URL"
res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
data_endpoint = res["DataEndpoint"]
client = boto3.client(
"kinesis-video-archived-media",
region_name=region_name,
endpoint_url=data_endpoint,
)
res = client.get_dash_streaming_session_url(StreamName=stream_name,)
reg_exp = "^{}/dash/v1/getDASHManifest.mpd\?SessionToken\=.+$".format(data_endpoint)
res.should.have.key("DASHStreamingSessionURL").which.should.match(reg_exp)
@mock_kinesisvideo
@mock_kinesisvideoarchivedmedia
def test_get_clip():
region_name = "ap-northeast-1"
kvs_client = boto3.client("kinesisvideo", region_name=region_name)
stream_name = "my-stream"
kvs_client.create_stream(StreamName=stream_name)
api_name = "GET_DASH_STREAMING_SESSION_URL"
res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
data_endpoint = res["DataEndpoint"]
client = boto3.client(
"kinesis-video-archived-media",
region_name=region_name,
endpoint_url=data_endpoint,
)
end_timestamp = datetime.utcnow() - timedelta(hours=1)
start_timestamp = end_timestamp - timedelta(minutes=5)
res = client.get_clip(
StreamName=stream_name,
ClipFragmentSelector={
"FragmentSelectorType": "PRODUCER_TIMESTAMP",
"TimestampRange": {
"StartTimestamp": start_timestamp,
"EndTimestamp": end_timestamp,
},
},
)
res.should.have.key("ContentType").which.should.match("video/mp4")
res.should.have.key("Payload")