MediaConnect: add_flow_sources and update_flow_source (#5322)

* Implement mediaconnect

Implement add_flow_sources and update_flow_source.

* MediaConnect - add missing URLs

Co-authored-by: Bert Blommers <info@bertblommers.nl>
This commit is contained in:
donfiguerres 2022-08-05 02:20:22 +08:00 committed by GitHub
parent 12d58bbf29
commit 9629f09af6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 209 additions and 3 deletions

View File

@ -3843,11 +3843,11 @@
## mediaconnect
<details>
<summary>40% implemented</summary>
<summary>46% implemented</summary>
- [ ] add_flow_media_streams
- [X] add_flow_outputs
- [ ] add_flow_sources
- [x] add_flow_sources
- [X] add_flow_vpc_interfaces
- [X] create_flow
- [X] delete_flow
@ -3874,7 +3874,7 @@
- [ ] update_flow_entitlement
- [ ] update_flow_media_stream
- [ ] update_flow_output
- [ ] update_flow_source
- [x] update_flow_source
</details>
## medialive

View File

@ -241,6 +241,71 @@ class MediaConnectBackend(BaseBackend):
)
return flow_arn, output_name
def add_flow_sources(self, flow_arn, sources):
if flow_arn not in self._flows:
raise NotFoundException(
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn]
for source in sources:
source_id = uuid4().hex
name = source["name"]
arn = f"arn:aws:mediaconnect:{self.region_name}:{get_account_id()}:source:{source_id}:{name}"
source["sourceArn"] = arn
flow.sources = sources
return flow_arn, sources
def update_flow_source(
self,
flow_arn,
source_arn,
decryption,
description,
entitlement_arn,
ingest_port,
max_bitrate,
max_latency,
max_sync_buffer,
media_stream_source_configurations,
min_latency,
protocol,
sender_control_port,
sender_ip_address,
stream_id,
vpc_interface_name,
whitelist_cidr,
):
if flow_arn not in self._flows:
raise NotFoundException(
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn]
source = next(
iter(
[source for source in flow.sources if source["sourceArn"] == source_arn]
),
{},
)
if source:
source["decryption"] = decryption
source["description"] = description
source["entitlementArn"] = entitlement_arn
source["ingestPort"] = ingest_port
source["maxBitrate"] = max_bitrate
source["maxLatency"] = max_latency
source["maxSyncBuffer"] = max_sync_buffer
source[
"mediaStreamSourceConfigurations"
] = media_stream_source_configurations
source["minLatency"] = min_latency
source["protocol"] = protocol
source["senderControlPort"] = sender_control_port
source["senderIpAddress"] = sender_ip_address
source["streamId"] = stream_id
source["vpcInterfaceName"] = vpc_interface_name
source["whitelistCidr"] = whitelist_cidr
return flow_arn, source
# add methods from here

View File

@ -111,3 +111,52 @@ class MediaConnectResponse(BaseResponse):
flow_arn=flow_arn, output_name=output_name
)
return json.dumps(dict(flow_arn=flow_arn, output_name=output_name))
def add_flow_sources(self):
flow_arn = unquote(self._get_param("flowArn"))
sources = self._get_param("sources")
flow_arn, sources = self.mediaconnect_backend.add_flow_sources(
flow_arn=flow_arn, sources=sources
)
return json.dumps(dict(flow_arn=flow_arn, sources=sources))
def update_flow_source(self):
flow_arn = unquote(self._get_param("flowArn"))
source_arn = unquote(self._get_param("sourceArn"))
description = self._get_param("description")
decryption = self._get_param("decryption")
entitlement_arn = self._get_param("entitlementArn")
ingest_port = self._get_param("ingestPort")
max_bitrate = self._get_param("maxBitrate")
max_latency = self._get_param("maxLatency")
max_sync_buffer = self._get_param("maxSyncbuffer")
media_stream_source_configurations = self._get_param(
"mediaStreamSourceConfigurations"
)
min_latency = self._get_param("minLatency")
protocol = self._get_param("protocol")
sender_control_port = self._get_param("senderControlPort")
sender_ip_address = self._get_param("senderIpAddress")
stream_id = self._get_param("streamId")
vpc_interface_name = self._get_param("vpcInterfaceName")
whitelist_cidr = self._get_param("whitelistCidr")
flow_arn, source = self.mediaconnect_backend.update_flow_source(
flow_arn=flow_arn,
source_arn=source_arn,
decryption=decryption,
description=description,
entitlement_arn=entitlement_arn,
ingest_port=ingest_port,
max_bitrate=max_bitrate,
max_latency=max_latency,
max_sync_buffer=max_sync_buffer,
media_stream_source_configurations=media_stream_source_configurations,
min_latency=min_latency,
protocol=protocol,
sender_control_port=sender_control_port,
sender_ip_address=sender_ip_address,
stream_id=stream_id,
vpc_interface_name=vpc_interface_name,
whitelist_cidr=whitelist_cidr,
)
return json.dumps(dict(flow_arn=flow_arn, source=source))

View File

@ -13,6 +13,8 @@ url_paths = {
"{0}/v1/flows/(?P<flowarn>[^/.]+)": response.dispatch,
"{0}/v1/flows/(?P<flowarn>[^/.]+)/vpcInterfaces": response.dispatch,
"{0}/v1/flows/(?P<flowarn>[^/.]+)/vpcInterfaces/(?P<vpcinterfacename>[^/.]+)": response.dispatch,
"{0}/v1/flows/(?P<flowarn>[^/.]+)/source": response.dispatch,
"{0}/v1/flows/(?P<flowarn>[^/.]+)/source/(?P<sourcearn>[^/.]+)": response.dispatch,
"{0}/v1/flows/(?P<flowarn>[^/.]+)/outputs": response.dispatch,
"{0}/v1/flows/(?P<flowarn>[^/.]+)/outputs/(?P<outputarn>[^/.]+)": response.dispatch,
"{0}/v1/flows/start/(?P<flowarn>[^/.]+)": response.dispatch,

View File

@ -381,3 +381,93 @@ def test_remove_flow_output_succeeds():
describe_response = client.describe_flow(FlowArn=flow_arn)
len(describe_response["Flow"]["Outputs"]).should.equal(0)
@mock_mediaconnect
def test_add_flow_sources_fails():
client = boto3.client("mediaconnect", region_name=region)
flow_arn = "unknown-flow"
with pytest.raises(ClientError) as err:
client.add_flow_sources(FlowArn=flow_arn, Sources=[])
err = err.value.response["Error"]
err["Code"].should.equal("NotFoundException")
err["Message"].should.equal("flow with arn=unknown-flow not found")
@mock_mediaconnect
def test_add_flow_sources_succeeds():
client = boto3.client("mediaconnect", region_name=region)
channel_config = _create_flow_config("test-Flow-1")
create_response = client.create_flow(**channel_config)
create_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
create_response["Flow"]["Status"].should.equal("STANDBY")
flow_arn = create_response["Flow"]["FlowArn"]
client.add_flow_sources(
FlowArn=flow_arn,
Sources=[
{
"Description": "string",
"Name": "string",
"Protocol": "rist",
"SenderControlPort": 123,
}
],
)
describe_response = client.describe_flow(FlowArn=flow_arn)
describe_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(describe_response["Flow"]["Sources"]).should.equal(1)
@mock_mediaconnect
def test_update_flow_source_fails():
client = boto3.client("mediaconnect", region_name=region)
flow_arn = "unknown-flow"
source_arn = "unknown-source"
channel_config = _create_flow_config("test-Flow-1")
client.create_flow(**channel_config)
with pytest.raises(ClientError) as err:
client.update_flow_source(
FlowArn=flow_arn, SourceArn=source_arn, Description="new description"
)
err = err.value.response["Error"]
err["Code"].should.equal("NotFoundException")
err["Message"].should.equal("flow with arn=unknown-flow not found")
@mock_mediaconnect
def test_update_flow_source_succeeds():
client = boto3.client("mediaconnect", region_name=region)
channel_config = _create_flow_config("test-Flow-1")
create_response = client.create_flow(**channel_config)
create_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
create_response["Flow"]["Status"].should.equal("STANDBY")
flow_arn = create_response["Flow"]["FlowArn"]
add_response = client.add_flow_sources(
FlowArn=flow_arn,
Sources=[
{
"Description": "string",
"Name": "string",
"Protocol": "rist",
"SenderControlPort": 123,
}
],
)
describe_response = client.describe_flow(FlowArn=flow_arn)
describe_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
len(describe_response["Flow"]["Sources"]).should.equal(1)
source_arn = add_response["Sources"][0]["SourceArn"]
update_response = client.update_flow_source(
FlowArn=flow_arn, SourceArn=source_arn, Description="new description"
)
update_response["Source"]["Description"].should.equal("new description")