83 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			83 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import boto3
 | |
| import sure  # noqa # pylint: disable=unused-import
 | |
| from moto import mock_kinesisvideoarchivedmedia
 | |
| from moto import mock_kinesisvideo
 | |
| from datetime import datetime, timedelta
 | |
| 
 | |
| 
 | |
| @mock_kinesisvideo
 | |
| @mock_kinesisvideoarchivedmedia
 | |
| def test_get_hls_streaming_session_url():
 | |
|     region_name = "ap-northeast-1"
 | |
|     kvs_client = boto3.client("kinesisvideo", region_name=region_name)
 | |
|     stream_name = "my-stream"
 | |
|     kvs_client.create_stream(StreamName=stream_name)
 | |
| 
 | |
|     api_name = "GET_HLS_STREAMING_SESSION_URL"
 | |
|     res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
 | |
|     data_endpoint = res["DataEndpoint"]
 | |
| 
 | |
|     client = boto3.client(
 | |
|         "kinesis-video-archived-media",
 | |
|         region_name=region_name,
 | |
|         endpoint_url=data_endpoint,
 | |
|     )
 | |
|     res = client.get_hls_streaming_session_url(StreamName=stream_name)
 | |
|     reg_exp = rf"^{data_endpoint}/hls/v1/getHLSMasterPlaylist.m3u8\?SessionToken\=.+$"
 | |
|     res.should.have.key("HLSStreamingSessionURL").which.should.match(reg_exp)
 | |
| 
 | |
| 
 | |
| @mock_kinesisvideo
 | |
| @mock_kinesisvideoarchivedmedia
 | |
| def test_get_dash_streaming_session_url():
 | |
|     region_name = "ap-northeast-1"
 | |
|     kvs_client = boto3.client("kinesisvideo", region_name=region_name)
 | |
|     stream_name = "my-stream"
 | |
|     kvs_client.create_stream(StreamName=stream_name)
 | |
| 
 | |
|     api_name = "GET_DASH_STREAMING_SESSION_URL"
 | |
|     res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
 | |
|     data_endpoint = res["DataEndpoint"]
 | |
| 
 | |
|     client = boto3.client(
 | |
|         "kinesis-video-archived-media",
 | |
|         region_name=region_name,
 | |
|         endpoint_url=data_endpoint,
 | |
|     )
 | |
|     res = client.get_dash_streaming_session_url(StreamName=stream_name)
 | |
|     reg_exp = rf"^{data_endpoint}/dash/v1/getDASHManifest.mpd\?SessionToken\=.+$"
 | |
|     res.should.have.key("DASHStreamingSessionURL").which.should.match(reg_exp)
 | |
| 
 | |
| 
 | |
| @mock_kinesisvideo
 | |
| @mock_kinesisvideoarchivedmedia
 | |
| def test_get_clip():
 | |
|     region_name = "ap-northeast-1"
 | |
|     kvs_client = boto3.client("kinesisvideo", region_name=region_name)
 | |
|     stream_name = "my-stream"
 | |
|     kvs_client.create_stream(StreamName=stream_name)
 | |
| 
 | |
|     api_name = "GET_DASH_STREAMING_SESSION_URL"
 | |
|     res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName=api_name)
 | |
|     data_endpoint = res["DataEndpoint"]
 | |
| 
 | |
|     client = boto3.client(
 | |
|         "kinesis-video-archived-media",
 | |
|         region_name=region_name,
 | |
|         endpoint_url=data_endpoint,
 | |
|     )
 | |
|     end_timestamp = datetime.utcnow() - timedelta(hours=1)
 | |
|     start_timestamp = end_timestamp - timedelta(minutes=5)
 | |
|     res = client.get_clip(
 | |
|         StreamName=stream_name,
 | |
|         ClipFragmentSelector={
 | |
|             "FragmentSelectorType": "PRODUCER_TIMESTAMP",
 | |
|             "TimestampRange": {
 | |
|                 "StartTimestamp": start_timestamp,
 | |
|                 "EndTimestamp": end_timestamp,
 | |
|             },
 | |
|         },
 | |
|     )
 | |
|     res.should.have.key("ContentType").which.should.match("video/mp4")
 | |
|     res.should.have.key("Payload")
 |