moto/tests/test_kinesisvideo/test_kinesisvideo.py
Toshiya Kawasaki 25161c0c18
Add kinesisvideo (#3271)
* kinesisvideo create_stream

* add kinesis video stream description

* add kinesisvideo describe_stream

* add kinesisvideo list_streams

* add kinesisvideo delete_stream

* remove unused comment

* remove duplicated definition

* add kinesis video exceptions

* pass region_name to kinesisvideo client in test

* fix kinesisvideo url path

* resolve conflict of kinesisvideo url and kinesis url

* specify region name to kinesisvideobackend

* Add get-dataendpoint to kinesisvideo

* include stream name in ResourceInUseException of kinesisvideo

* use ACCOUNT_ID from moto.core in kinesisvideo

* add server test for kinesisvideo

* split up kinesisvideo test
2020-09-02 08:51:51 +01:00

141 lines
4.8 KiB
Python

from __future__ import unicode_literals
import boto3
import sure # noqa
from nose.tools import assert_raises
from moto import mock_kinesisvideo
from botocore.exceptions import ClientError
import json
@mock_kinesisvideo
def test_create_stream():
    # Creating a stream must hand back a StreamARN that contains the
    # name the stream was created under.
    name = "my-stream"
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")

    # stream can be created
    response = kvs.create_stream(StreamName=name, DeviceName="random-device")
    response.should.have.key("StreamARN").which.should.contain(name)
@mock_kinesisvideo
def test_create_stream_with_same_name():
    # Issuing the same create_stream request twice must fail the second time.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    create_args = {"StreamName": "my-stream", "DeviceName": "random-device"}

    # The first request succeeds and takes the stream name.
    kvs.create_stream(**create_args)

    # cannot create with same stream name
    with assert_raises(ClientError):
        kvs.create_stream(**create_args)
@mock_kinesisvideo
def test_describe_stream():
    # A created stream can be described either by its name or by its ARN,
    # and both lookups report the same ARN / name / device name.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    name = "my-stream"
    device = "random-device"

    created = kvs.create_stream(StreamName=name, DeviceName=device)
    created.should.have.key("StreamARN").which.should.contain(name)
    arn = created["StreamARN"]

    # cannot create with existing stream name
    with assert_raises(ClientError):
        kvs.create_stream(StreamName=name, DeviceName=device)

    # Describe by name first, then by ARN; the expectations are identical.
    for lookup in ({"StreamName": name}, {"StreamARN": arn}):
        described = kvs.describe_stream(**lookup)
        described.should.have.key("StreamInfo")
        info = described["StreamInfo"]
        info.should.have.key("StreamARN").which.should.contain(name)
        info.should.have.key("StreamName").which.should.equal(name)
        info.should.have.key("DeviceName").which.should.equal(device)
@mock_kinesisvideo
def test_describe_stream_with_name_not_exist():
    # Describing a stream name that was never created must raise ClientError.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")

    # cannot describe with not exist stream name
    with assert_raises(ClientError):
        kvs.describe_stream(StreamName="not-exist-stream")
@mock_kinesisvideo
def test_list_streams():
    # After creating two streams, list_streams must report exactly two entries.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")

    # Create the streams in a fixed order, both on the same device name.
    for name in ("my-stream", "my-stream-2"):
        kvs.create_stream(StreamName=name, DeviceName="random-device")

    # streams can be listed
    listing = kvs.list_streams()
    listing.should.have.key("StreamInfoList")
    listing["StreamInfoList"].should.have.length_of(2)
@mock_kinesisvideo
def test_delete_stream():
    # Deleting one of two streams by ARN must leave a single stream listed.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    device = "random-device"

    kvs.create_stream(StreamName="my-stream", DeviceName=device)
    second = kvs.create_stream(StreamName="my-stream-2", DeviceName=device)

    # stream can be deleted
    kvs.delete_stream(StreamARN=second["StreamARN"])

    remaining = kvs.list_streams()["StreamInfoList"]
    remaining.should.have.length_of(1)
@mock_kinesisvideo
def test_delete_stream_with_arn_not_exist():
    # Deleting an ARN a second time, after its stream is already gone,
    # must raise ClientError.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    device = "random-device"

    kvs.create_stream(StreamName="my-stream", DeviceName=device)
    second = kvs.create_stream(StreamName="my-stream-2", DeviceName=device)

    # The first delete succeeds; afterwards the ARN refers to no stream.
    stale_arn = second["StreamARN"]
    kvs.delete_stream(StreamARN=stale_arn)

    # cannot delete with not exist stream
    with assert_raises(ClientError):
        kvs.delete_stream(StreamARN=stale_arn)
@mock_kinesisvideo
def test_data_endpoint():
    # get_data_endpoint for an existing stream must return a DataEndpoint key.
    kvs = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    name = "my-stream"

    # The stream has to exist before an endpoint is requested for it.
    kvs.create_stream(StreamName=name, DeviceName="random-device")

    # data-endpoint can be created
    endpoint = kvs.get_data_endpoint(StreamName=name, APIName="GET_MEDIA")
    endpoint.should.have.key("DataEndpoint")