moto/tests/test_mediastore/test_mediastore.py
Franck Ndame d9177f382e
Implementation of core AWS Mediastore endpoints (#3825)
* write boilerplate code

* generate boilerplate code with scaffold script

* create mediapackage channel

* remove duplicate mediapackage reference

* remove Channel key from mediapackage response

* describe_channel endpoint added

* create_origin_endpoint-added

* keys changed to camel case to fix issue

* minor changes to clean up

* minor clean up again

* implement & test delete_channel

* delete origin endpoint created; WIP-tests failing

* fix delete_origin_endpoint issue

* refactor function call

* delete origin endpoint completed; test_server tests added

* implement and test describe_origin_endpoint

* update origin endpoint added

* remove print statements

* implement test_list_origin_endpoint_succeeds

* create test name changed

* create test name changed

* changes after flake8 and black run

* url assertion added to describe origin endpoint test

* region dependent url enabled

* initial commit; WIP

* create container added, still WIP

* create_container working

* test_create_channel_succeeds

* test_describe_container_succeeds

* get_lifecycle_policy added; error tests added

* changes to pass linting

* added exception for container but no policy

* linting

* put_container_policy added

* put_metric_policy added

* list_containers added

* resolved linting

* test_describe_container_raises_error_if_container_does_not_exist

* added __init__ file

* __init__ added to mediapackage as well

* Mediastore (#20)

* initial commit; WIP

* create container added, still WIP

* create_container working

* test_create_channel_succeeds

* test_describe_container_succeeds

* get_lifecycle_policy added; error tests added

* changes to pass linting

* added exception for container but no policy

* linting

* put_container_policy added

* put_metric_policy added

* list_containers added

* resolved linting

* test_describe_container_raises_error_if_container_does_not_exist

* added __init__ file

* __init__ added to mediapackage as well

Co-authored-by: FranckNdame <franck.mpouli@yahoo.com>

* test_server fixed; resolved rebasing mix ups on tests

* [FIX] Ensures MediaConnect create_flow sets a valid sourceArn

* code clean up

Co-authored-by: Anya <anya.champaneria@capablue.com>
Co-authored-by: AnyaChamp <71766808+AnyaChamp@users.noreply.github.com>
2021-04-08 16:51:50 +01:00

195 lines
7.7 KiB
Python

from __future__ import unicode_literals
import boto3
import sure # noqa
import pytest
from moto import mock_mediastore
from botocore.exceptions import ClientError
# AWS region name passed as ``region_name`` when the tests in this module
# create their boto3 MediaStore clients.
region = "eu-west-1"
@mock_mediastore
def test_create_container_succeeds():
    """create_container answers HTTP 200 with a container in CREATING state.

    Also checks that the returned ARN is built from the container name and
    that the name matches the one requested.
    """
    mediastore = boto3.client("mediastore", region_name=region)

    result = mediastore.create_container(
        ContainerName="Awesome container!",
        Tags=[{"Key": "customer"}],
    )
    created = result["Container"]
    status_code = result["ResponseMetadata"]["HTTPStatusCode"]
    expected_arn = "arn:aws:mediastore:container:{}".format(created["Name"])

    status_code.should.equal(200)
    created["ARN"].should.equal(expected_arn)
    created["Name"].should.equal("Awesome container!")
    created["Status"].should.equal("CREATING")
@mock_mediastore
def test_describe_container_succeeds():
    """describe_container answers HTTP 200 and reports the container ACTIVE.

    The container is created first, then looked up by the name returned
    from the create call.
    """
    mediastore = boto3.client("mediastore", region_name=region)
    created = mediastore.create_container(
        ContainerName="Awesome container!",
        Tags=[{"Key": "customer"}],
    )
    name = created["Container"]["Name"]

    described = mediastore.describe_container(ContainerName=name)

    described["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    details = described["Container"]
    expected_arn = "arn:aws:mediastore:container:{}".format(details["Name"])
    details["ARN"].should.equal(expected_arn)
    details["Name"].should.equal("Awesome container!")
    details["Status"].should.equal("ACTIVE")
@mock_mediastore
def test_list_containers_succeeds():
    """list_containers returns one entry per container created so far.

    A container is created and the listing is checked after each creation,
    so the expected count grows from 1 to 2.
    """
    mediastore = boto3.client("mediastore", region_name=region)
    names = ("Awesome container!", "Awesome container2!")

    for expected_count, name in enumerate(names, start=1):
        mediastore.create_container(
            ContainerName=name,
            Tags=[{"Key": "customer"}],
        )
        listing = mediastore.list_containers(
            NextToken="next-token",
            MaxResults=123,
        )
        found = listing["Containers"]
        len(found).should.equal(expected_count)
@mock_mediastore
def test_describe_container_raises_error_if_container_does_not_exist():
    """describe_container on an unknown name raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)

    with pytest.raises(ClientError) as exc_info:
        mediastore.describe_container(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_put_lifecycle_policy_succeeds():
    """A lifecycle policy stored with put_lifecycle_policy can be read back.

    get_lifecycle_policy must answer HTTP 200 and return the stored policy.
    """
    mediastore = boto3.client("mediastore", region_name=region)
    created = mediastore.create_container(
        ContainerName="container-name",
        Tags=[{"Key": "customer"}],
    )
    name = created["Container"]["Name"]

    mediastore.put_lifecycle_policy(
        ContainerName=name,
        LifecyclePolicy="lifecycle-policy",
    )
    fetched = mediastore.get_lifecycle_policy(ContainerName=name)

    fetched["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    fetched["LifecyclePolicy"].should.equal("lifecycle-policy")
@mock_mediastore
def test_put_lifecycle_policy_raises_error_if_container_does_not_exist():
    """put_lifecycle_policy on an unknown container raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)
    request = {
        "ContainerName": "container-name",
        "LifecyclePolicy": "lifecycle-policy",
    }

    with pytest.raises(ClientError) as exc_info:
        mediastore.put_lifecycle_policy(**request)

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_get_lifecycle_policy_raises_error_if_container_does_not_exist():
    """get_lifecycle_policy on an unknown container raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)

    with pytest.raises(ClientError) as exc_info:
        mediastore.get_lifecycle_policy(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_get_lifecycle_policy_raises_error_if_container_does_not_have_lifecycle_policy():
    """get_lifecycle_policy raises PolicyNotFoundException when none was stored.

    The container exists, but no lifecycle policy has been put on it.
    """
    mediastore = boto3.client("mediastore", region_name=region)
    mediastore.create_container(
        ContainerName="container-name",
        Tags=[{"Key": "customer"}],
    )

    with pytest.raises(ClientError) as exc_info:
        mediastore.get_lifecycle_policy(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("PolicyNotFoundException")
@mock_mediastore
def test_put_container_policy_succeeds():
    """A container policy stored with put_container_policy can be read back.

    get_container_policy must answer HTTP 200 and return the stored policy.
    """
    client = boto3.client("mediastore", region_name=region)
    container_response = client.create_container(
        ContainerName="container-name", Tags=[{"Key": "customer"}]
    )
    container = container_response["Container"]
    # The put result is intentionally not bound: it was never inspected and
    # was immediately shadowed by the get response below.
    client.put_container_policy(
        ContainerName=container["Name"], Policy="container-policy"
    )
    response = client.get_container_policy(ContainerName=container["Name"])
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    response["Policy"].should.equal("container-policy")
@mock_mediastore
def test_put_container_policy_raises_error_if_container_does_not_exist():
    """put_container_policy on an unknown container raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)
    request = {
        "ContainerName": "container-name",
        "Policy": "container-policy",
    }

    with pytest.raises(ClientError) as exc_info:
        mediastore.put_container_policy(**request)

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_get_container_policy_raises_error_if_container_does_not_exist():
    """get_container_policy on an unknown container raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)

    with pytest.raises(ClientError) as exc_info:
        mediastore.get_container_policy(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_get_container_policy_raises_error_if_container_does_not_have_container_policy():
    """get_container_policy raises PolicyNotFoundException when none was stored.

    The container exists, but no container policy has been put on it.
    """
    mediastore = boto3.client("mediastore", region_name=region)
    mediastore.create_container(
        ContainerName="container-name",
        Tags=[{"Key": "customer"}],
    )

    with pytest.raises(ClientError) as exc_info:
        mediastore.get_container_policy(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("PolicyNotFoundException")
@mock_mediastore
def test_put_metric_policy_succeeds():
    """A metric policy stored with put_metric_policy can be read back.

    get_metric_policy must answer HTTP 200 and return the stored policy.
    """
    client = boto3.client("mediastore", region_name=region)
    container_response = client.create_container(
        ContainerName="container-name", Tags=[{"Key": "customer"}]
    )
    container = container_response["Container"]
    # The put result is intentionally not bound: it was never inspected and
    # was immediately shadowed by the get response below.
    client.put_metric_policy(
        ContainerName=container["Name"],
        MetricPolicy={"ContainerLevelMetrics": "ENABLED"},
    )
    response = client.get_metric_policy(ContainerName=container["Name"])
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
    response["MetricPolicy"].should.equal({"ContainerLevelMetrics": "ENABLED"})
@mock_mediastore
def test_put_metric_policy_raises_error_if_container_does_not_exist():
    """put_metric_policy on an unknown container raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)
    request = {
        "ContainerName": "container-name",
        "MetricPolicy": {"ContainerLevelMetrics": "ENABLED"},
    }

    with pytest.raises(ClientError) as exc_info:
        mediastore.put_metric_policy(**request)

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_get_metric_policy_raises_error_if_container_does_not_exist():
    """get_metric_policy on an unknown container raises ResourceNotFoundException."""
    mediastore = boto3.client("mediastore", region_name=region)

    with pytest.raises(ClientError) as exc_info:
        mediastore.get_metric_policy(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("ResourceNotFoundException")
@mock_mediastore
def test_get_metric_policy_raises_error_if_container_does_not_have_metric_policy():
    """get_metric_policy raises PolicyNotFoundException when none was stored.

    The container exists, but no metric policy has been put on it.
    """
    mediastore = boto3.client("mediastore", region_name=region)
    mediastore.create_container(
        ContainerName="container-name",
        Tags=[{"Key": "customer"}],
    )

    with pytest.raises(ClientError) as exc_info:
        mediastore.get_metric_policy(ContainerName="container-name")

    error_code = exc_info.value.response["Error"]["Code"]
    error_code.should.equal("PolicyNotFoundException")