Rudimentary support for IVS (#6894)

This commit is contained in:
Pepijn 2023-10-10 09:39:55 +02:00 committed by GitHub
parent b2455e4211
commit 5cd288b42c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 565 additions and 3 deletions

View File

@ -4105,6 +4105,42 @@
- [X] update_thing_shadow
</details>
## ivs
<details>
<summary>20% implemented</summary>
- [X] batch_get_channel
- [ ] batch_get_stream_key
- [ ] batch_start_viewer_session_revocation
- [X] create_channel
- [ ] create_recording_configuration
- [ ] create_stream_key
- [X] delete_channel
- [ ] delete_playback_key_pair
- [ ] delete_recording_configuration
- [ ] delete_stream_key
- [X] get_channel
- [ ] get_playback_key_pair
- [ ] get_recording_configuration
- [ ] get_stream
- [ ] get_stream_key
- [ ] get_stream_session
- [ ] import_playback_key_pair
- [X] list_channels
- [ ] list_playback_key_pairs
- [ ] list_recording_configurations
- [ ] list_stream_keys
- [ ] list_stream_sessions
- [ ] list_streams
- [ ] list_tags_for_resource
- [ ] put_metadata
- [ ] start_viewer_session_revocation
- [ ] stop_stream
- [ ] tag_resource
- [ ] untag_resource
- [X] update_channel
</details>
## kinesis
<details>
<summary>93% implemented</summary>
@ -7481,7 +7517,6 @@
- iotthingsgraph
- iottwinmaker
- iotwireless
- ivs
- ivs-realtime
- ivschat
- kafka

View File

@ -0,0 +1,60 @@
.. _implementedservice_ivs:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
===
ivs
===
.. autoclass:: moto.ivs.models.IVSBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_ivs
def test_ivs_behaviour:
boto3.client("ivs")
...
|start-h3| Implemented features for this service |end-h3|
- [X] batch_get_channel
- [ ] batch_get_stream_key
- [ ] batch_start_viewer_session_revocation
- [X] create_channel
- [ ] create_recording_configuration
- [ ] create_stream_key
- [X] delete_channel
- [ ] delete_playback_key_pair
- [ ] delete_recording_configuration
- [ ] delete_stream_key
- [X] get_channel
- [ ] get_playback_key_pair
- [ ] get_recording_configuration
- [ ] get_stream
- [ ] get_stream_key
- [ ] get_stream_session
- [ ] import_playback_key_pair
- [X] list_channels
- [ ] list_playback_key_pairs
- [ ] list_recording_configurations
- [ ] list_stream_keys
- [ ] list_stream_sessions
- [ ] list_streams
- [ ] list_tags_for_resource
- [ ] put_metadata
- [ ] start_viewer_session_revocation
- [ ] stop_stream
- [ ] tag_resource
- [ ] untag_resource
- [X] update_channel

View File

@ -106,6 +106,7 @@ mock_iam = lazy_load(".iam", "mock_iam")
mock_identitystore = lazy_load(".identitystore", "mock_identitystore")
mock_iot = lazy_load(".iot", "mock_iot")
mock_iotdata = lazy_load(".iotdata", "mock_iotdata", boto3_name="iot-data")
mock_ivs = lazy_load(".ivs", "mock_ivs", boto3_name="ivs")
mock_kinesis = lazy_load(".kinesis", "mock_kinesis")
mock_kinesisvideo = lazy_load(".kinesisvideo", "mock_kinesisvideo")
mock_kinesisvideoarchivedmedia = lazy_load(

View File

@ -88,6 +88,7 @@ backend_url_patterns = [
("iot", re.compile("https?://iot\\.(.+)\\.amazonaws\\.com")),
("iot-data", re.compile("https?://data\\.iot\\.(.+)\\.amazonaws.com")),
("iot-data", re.compile("https?://data-ats\\.iot\\.(.+)\\.amazonaws.com")),
("ivs", re.compile("https?://ivs\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://kinesis\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://(.+)\\.control-kinesis\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://(.+)\\.data-kinesis\\.(.+)\\.amazonaws\\.com")),

5
moto/ivs/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""ivs module initialization; sets value for base decorator."""
from .models import ivs_backends
from ..core.models import base_decorator
mock_ivs = base_decorator(ivs_backends)

9
moto/ivs/exceptions.py Normal file
View File

@ -0,0 +1,9 @@
"""Exceptions raised by the ivs service."""
from moto.core.exceptions import JsonRESTError
class ResourceNotFoundException(JsonRESTError):
code = 404
def __init__(self, message: str):
super().__init__("ResourceNotFoundException", message)

134
moto/ivs/models.py Normal file
View File

@ -0,0 +1,134 @@
"""IVSBackend class with methods for supported APIs."""
from typing import Optional, Any, List, Dict, Tuple
from moto.core import BaseBackend, BackendDict
from moto.ivs.exceptions import ResourceNotFoundException
from moto.utilities.paginator import paginate
from moto.moto_api._internal import mock_random
class IVSBackend(BaseBackend):
"""Implementation of IVS APIs."""
PAGINATION_MODEL = {
"list_channels": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "arn",
},
}
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.channels: List[Dict[str, Any]] = []
def create_channel(
self,
authorized: bool,
insecure_ingest: bool,
latency_mode: str,
name: str,
preset: str,
recording_configuration_arn: str,
tags: Dict[str, str],
channel_type: str,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
channel_id = mock_random.get_random_string(12)
channel_arn = (
f"arn:aws:ivs:{self.region_name}:{self.account_id}:channel/{channel_id}"
)
channel = {
"arn": channel_arn,
"authorized": authorized,
"ingestEndpoint": "ingest.example.com",
"insecureIngest": insecure_ingest,
"latencyMode": latency_mode,
"name": name,
"playbackUrl": f"https://playback.example.com/{self.region_name}.{self.account_id}.{channel_id}.m3u8",
"preset": preset,
"recordingConfigurationArn": recording_configuration_arn,
"tags": tags,
"type": channel_type,
}
self.channels.append(channel)
stream_key_id = mock_random.get_random_string(12)
stream_key_arn = f"arn:aws:ivs:{self.region_name}:{self.account_id}:stream-key/{stream_key_id}"
stream_key = {
"arn": stream_key_arn,
"channelArn": channel_arn,
"tags": tags,
"value": f"sk_{self.region_name}_{mock_random.token_urlsafe(32)}",
}
return channel, stream_key
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc]
def list_channels(
self,
filter_by_name: Optional[str],
filter_by_recording_configuration_arn: Optional[str],
) -> List[Dict[str, Any]]:
if filter_by_name is not None:
channels = [
channel
for channel in self.channels
if channel["name"] == filter_by_name
]
elif filter_by_recording_configuration_arn is not None:
channels = [
channel
for channel in self.channels
if channel["recordingConfigurationArn"]
== filter_by_recording_configuration_arn
]
else:
channels = self.channels
return channels
def _find_channel(self, arn: str) -> Dict[str, Any]:
try:
return next(channel for channel in self.channels if channel["arn"] == arn)
except StopIteration:
raise ResourceNotFoundException(f"Resource: {arn} not found")
def get_channel(self, arn: str) -> Dict[str, Any]:
return self._find_channel(arn)
def batch_get_channel(
self, arns: List[str]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
return [channel for channel in self.channels if channel["arn"] in arns], []
def update_channel(
self,
arn: str,
authorized: Optional[bool],
insecure_ingest: Optional[bool],
latency_mode: Optional[str],
name: Optional[str],
preset: Optional[str],
recording_configuration_arn: Optional[str],
channel_type: Optional[str],
) -> Dict[str, Any]:
channel = self._find_channel(arn)
if authorized is not None:
channel["authorized"] = authorized
if insecure_ingest is not None:
channel["insecureIngest"] = insecure_ingest
if latency_mode is not None:
channel["latencyMode"] = latency_mode
if name is not None:
channel["name"] = name
if preset is not None:
channel["preset"] = preset
if recording_configuration_arn is not None:
channel["recordingConfigurationArn"] = recording_configuration_arn
if channel_type is not None:
channel["type"] = channel_type
return channel
def delete_channel(self, arn: str) -> None:
self._find_channel(arn)
self.channels = [channel for channel in self.channels if channel["arn"] != arn]
ivs_backends = BackendDict(IVSBackend, "ivs")

93
moto/ivs/responses.py Normal file
View File

@ -0,0 +1,93 @@
"""Handles incoming ivs requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import IVSBackend, ivs_backends
class IVSResponse(BaseResponse):
"""Handler for IVS requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="ivs")
@property
def ivs_backend(self) -> IVSBackend:
"""Return backend instance specific for this region."""
return ivs_backends[self.current_account][self.region]
def create_channel(self) -> str:
authorized = self._get_param("authorized", False)
insecure_ingest = self._get_param("insecureIngest", False)
latency_mode = self._get_param("latencyMode", "LOW")
name = self._get_param("name")
preset = self._get_param("preset", "")
recording_configuration_arn = self._get_param("recordingConfigurationArn", "")
tags = self._get_param("tags", {})
channel_type = self._get_param("type", "STANDARD")
channel, stream_key = self.ivs_backend.create_channel(
authorized=authorized,
insecure_ingest=insecure_ingest,
latency_mode=latency_mode,
name=name,
preset=preset,
recording_configuration_arn=recording_configuration_arn,
tags=tags,
channel_type=channel_type,
)
return json.dumps(dict(channel=channel, streamKey=stream_key))
def list_channels(self) -> str:
filter_by_name = self._get_param("filterByName")
filter_by_recording_configuration_arn = self._get_param(
"filterByRecordingConfigurationArn"
)
max_results = self._get_param("maxResults")
next_token = self._get_param("nextToken")
channels, next_token = self.ivs_backend.list_channels(
filter_by_name=filter_by_name,
filter_by_recording_configuration_arn=filter_by_recording_configuration_arn,
max_results=max_results,
next_token=next_token,
)
return json.dumps(dict(channels=channels, nextToken=next_token))
def get_channel(self) -> str:
arn = self._get_param("arn")
channel = self.ivs_backend.get_channel(
arn=arn,
)
return json.dumps(dict(channel=channel))
def batch_get_channel(self) -> str:
arns = self._get_param("arns")
channels, errors = self.ivs_backend.batch_get_channel(
arns=arns,
)
return json.dumps(dict(channels=channels, errors=errors))
def update_channel(self) -> str:
arn = self._get_param("arn")
authorized = self._get_param("authorized")
insecure_ingest = self._get_param("insecureIngest")
latency_mode = self._get_param("latencyMode")
name = self._get_param("name")
preset = self._get_param("preset")
recording_configuration_arn = self._get_param("recordingConfigurationArn")
channel_type = self._get_param("type")
channel = self.ivs_backend.update_channel(
arn=arn,
authorized=authorized,
insecure_ingest=insecure_ingest,
latency_mode=latency_mode,
name=name,
preset=preset,
recording_configuration_arn=recording_configuration_arn,
channel_type=channel_type,
)
return json.dumps(dict(channel=channel))
def delete_channel(self) -> None:
arn = self._get_param("arn")
self.ivs_backend.delete_channel(
arn=arn,
)

20
moto/ivs/urls.py Normal file
View File

@ -0,0 +1,20 @@
"""ivs base URL and path."""
from .responses import IVSResponse
url_bases = [
r"https?://ivs\.(.+)\.amazonaws\.com",
]
response = IVSResponse()
url_paths = {
"{0}/CreateChannel": response.dispatch,
"{0}/ListChannels": response.dispatch,
"{0}/GetChannel": response.dispatch,
"{0}/BatchGetChannel": response.dispatch,
"{0}/UpdateChannel": response.dispatch,
"{0}/DeleteChannel": response.dispatch,
}

View File

@ -1,6 +1,7 @@
from random import Random
import string
from uuid import UUID
from base64 import urlsafe_b64encode
HEX_CHARS = list(range(10)) + ["a", "b", "c", "d", "e", "f"]
@ -30,3 +31,8 @@ class MotoRandom(Random):
pool += string.digits
random_str = "".join([self.choice(pool) for i in range(length)])
return random_str.lower() if lower_case else random_str
# Based on https://github.com/python/cpython/blob/main/Lib/secrets.py
def token_urlsafe(self, nbytes: int) -> str:
randbytes = self.getrandbits(nbytes * 8).to_bytes(nbytes, "little")
return urlsafe_b64encode(randbytes).rstrip(b"=").decode("ascii")

View File

192
tests/test_ivs/test_ivs.py Normal file
View File

@ -0,0 +1,192 @@
"""Unit tests for ivs-supported APIs."""
import boto3
from botocore.exceptions import ClientError
from pytest import raises
from moto import mock_ivs
from re import fullmatch
@mock_ivs
def test_create_channel_with_name():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(name="foo")
assert create_response["channel"]["name"] == "foo"
@mock_ivs
def test_create_channel_defaults():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(name="foo")
assert create_response["channel"]["authorized"] is False
assert create_response["channel"]["insecureIngest"] is False
assert create_response["channel"]["latencyMode"] == "LOW"
assert create_response["channel"]["preset"] == ""
assert create_response["channel"]["recordingConfigurationArn"] == ""
assert create_response["channel"]["tags"] == {}
assert create_response["channel"]["type"] == "STANDARD"
assert create_response["streamKey"]["tags"] == {}
@mock_ivs
def test_create_channel_generated_values():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(name="foo")
assert fullmatch(r"arn:aws:ivs:.*:channel/.*", create_response["channel"]["arn"])
assert create_response["channel"]["ingestEndpoint"]
assert create_response["channel"]["playbackUrl"]
assert fullmatch(
r"arn:aws:ivs:.*:stream-key/.*", create_response["streamKey"]["arn"]
)
assert (
create_response["streamKey"]["channelArn"] == create_response["channel"]["arn"]
)
assert fullmatch(r"sk_.*", create_response["streamKey"]["value"])
@mock_ivs
def test_create_channel_with_name_and_recording_configuration():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(
name="foo",
recordingConfigurationArn="blah",
)
assert create_response["channel"]["name"] == "foo"
assert create_response["channel"]["recordingConfigurationArn"] == "blah"
@mock_ivs
def test_list_channels_empty():
client = boto3.client("ivs", region_name="eu-west-1")
list_response = client.list_channels()
assert list_response["channels"] == []
@mock_ivs
def test_list_channels_one():
client = boto3.client("ivs", region_name="eu-west-1")
client.create_channel(name="foo")
list_response = client.list_channels()
assert len(list_response["channels"]) == 1
assert list_response["channels"][0]["name"] == "foo"
@mock_ivs
def test_list_channels_two():
client = boto3.client("ivs", region_name="eu-west-1")
client.create_channel(name="foo")
client.create_channel(
name="bar",
recordingConfigurationArn="blah",
)
list_response = client.list_channels()
assert len(list_response["channels"]) == 2
assert list_response["channels"][0]["name"] == "foo"
assert list_response["channels"][1]["name"] == "bar"
@mock_ivs
def test_list_channels_by_name():
client = boto3.client("ivs", region_name="eu-west-1")
client.create_channel(name="foo")
client.create_channel(
name="bar",
recordingConfigurationArn="blah",
)
list_response = client.list_channels(filterByName="foo")
assert len(list_response["channels"]) == 1
assert list_response["channels"][0]["name"] == "foo"
@mock_ivs
def test_list_channels_by_recording_configuration():
client = boto3.client("ivs", region_name="eu-west-1")
client.create_channel(name="foo")
client.create_channel(
name="bar",
recordingConfigurationArn="blah",
)
list_response = client.list_channels(filterByRecordingConfigurationArn="blah")
assert len(list_response["channels"]) == 1
assert list_response["channels"][0]["name"] == "bar"
@mock_ivs
def test_list_channels_pagination():
client = boto3.client("ivs", region_name="eu-west-1")
client.create_channel(name="foo")
client.create_channel(name="bar")
first_list_response = client.list_channels(maxResults=1)
assert len(first_list_response["channels"]) == 1
assert "nextToken" in first_list_response
second_list_response = client.list_channels(
maxResults=1, nextToken=first_list_response["nextToken"]
)
assert len(second_list_response["channels"]) == 1
assert "nextToken" not in second_list_response
@mock_ivs
def test_get_channel_exists():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(name="foo")
get_response = client.get_channel(arn=create_response["channel"]["arn"])
assert get_response["channel"]["name"] == "foo"
@mock_ivs
def test_get_channel_not_exists():
client = boto3.client("ivs", region_name="eu-west-1")
with raises(ClientError) as exc:
client.get_channel(arn="nope")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_ivs
def test_batch_get_channel():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(name="foo")
batch_get_response = client.batch_get_channel(
arns=[create_response["channel"]["arn"]]
)
assert len(batch_get_response["channels"]) == 1
assert batch_get_response["channels"][0]["name"] == "foo"
@mock_ivs
def test_update_channel_exists():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(
name="foo",
recordingConfigurationArn="blah",
)
update_response = client.update_channel(
arn=create_response["channel"]["arn"],
name="bar",
)
assert update_response["channel"]["name"] == "bar"
assert update_response["channel"]["recordingConfigurationArn"] == "blah"
@mock_ivs
def test_update_channel_not_exists():
client = boto3.client("ivs", region_name="eu-west-1")
with raises(ClientError) as exc:
client.update_channel(arn="nope", name="bar")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_ivs
def test_delete_channel_exists():
client = boto3.client("ivs", region_name="eu-west-1")
create_response = client.create_channel(name="foo")
client.delete_channel(arn=create_response["channel"]["arn"])
list_response = client.list_channels()
assert list_response["channels"] == []
@mock_ivs
def test_delete_channel_not_exists():
client = boto3.client("ivs", region_name="eu-west-1")
with raises(ClientError) as exc:
client.delete_channel(arn="nope")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"

View File

@ -31,7 +31,13 @@ def test_semi_random_hex_strings():
# Ensure they are different
assert fixed_hex != random_hex
# Retrieving another 'fixed' UUID should not return a known UUID
second_hex = mock_random.uuid4()
# Retrieving another 'fixed' HEX should not return a known HEX
second_hex = mock_random.get_random_hex()
assert second_hex != random_hex
assert second_hex != fixed_hex
def test_semi_random_token_urlsafe():
mock_random.seed(42)
token = mock_random.token_urlsafe(32)
assert token == "nXmxo38xgBzRGmcG-0DWvVdSaEaQO7E-3lYkOenBuCM"