Feature: Elastic Transcoder (#3810)

This commit is contained in:
Bert Blommers 2021-08-28 07:30:40 +01:00 committed by GitHub
parent a6606cbf42
commit 6d0bcba791
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 731 additions and 6 deletions

View File

@ -4220,23 +4220,23 @@
## elastictranscoder
<details>
<summary>0% implemented</summary>
<summary>29% implemented</summary>
- [ ] cancel_job
- [ ] create_job
- [ ] create_pipeline
- [X] create_pipeline
- [ ] create_preset
- [ ] delete_pipeline
- [X] delete_pipeline
- [ ] delete_preset
- [ ] list_jobs_by_pipeline
- [ ] list_jobs_by_status
- [ ] list_pipelines
- [X] list_pipelines
- [ ] list_presets
- [ ] read_job
- [ ] read_pipeline
- [X] read_pipeline
- [ ] read_preset
- [ ] test_role
- [ ] update_pipeline
- [X] update_pipeline
- [ ] update_pipeline_notifications
- [ ] update_pipeline_status
</details>

View File

@ -58,6 +58,7 @@ mock_ecr = lazy_load(".ecr", "mock_ecr")
mock_ecr_deprecated = lazy_load(".ecr", "mock_ecr_deprecated")
mock_ecs = lazy_load(".ecs", "mock_ecs")
mock_ecs_deprecated = lazy_load(".ecs", "mock_ecs_deprecated")
mock_elastictranscoder = lazy_load(".elastictranscoder", "mock_elastictranscoder")
mock_elb = lazy_load(".elb", "mock_elb")
mock_elb_deprecated = lazy_load(".elb", "mock_elb_deprecated")
mock_elbv2 = lazy_load(".elbv2", "mock_elbv2")

View File

@ -30,6 +30,7 @@ BACKENDS = {
"ecr": ("ecr", "ecr_backends"),
"ecs": ("ecs", "ecs_backends"),
"elasticbeanstalk": ("elasticbeanstalk", "eb_backends"),
"elastictranscoder": ("elastictranscoder", "elastictranscoder_backends"),
"elb": ("elb", "elb_backends"),
"elbv2": ("elbv2", "elbv2_backends"),
"emr": ("emr", "emr_backends"),

View File

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import elastictranscoder_backends
from ..core.models import base_decorator
elastictranscoder_backend = elastictranscoder_backends["us-east-1"]
mock_elastictranscoder = base_decorator(elastictranscoder_backends)

View File

@ -0,0 +1,136 @@
from __future__ import unicode_literals
from boto3 import Session
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
import random
import string
class Pipeline(BaseModel):
def __init__(
self,
region,
name,
input_bucket,
output_bucket,
role,
content_config,
thumbnail_config,
):
a = "".join(random.choice(string.digits) for _ in range(13))
b = "".join(random.choice(string.ascii_lowercase) for _ in range(6))
self.id = "{}-{}".format(a, b)
self.name = name
self.arn = "arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
region, ACCOUNT_ID, self.id
)
self.status = "Active"
self.input_bucket = input_bucket
self.output_bucket = output_bucket or content_config["Bucket"]
self.role = role
self.content_config = content_config or {"Bucket": self.output_bucket}
if "Permissions" not in self.content_config:
self.content_config["Permissions"] = []
self.thumbnail_config = thumbnail_config or {"Bucket": self.output_bucket}
if "Permissions" not in self.thumbnail_config:
self.thumbnail_config["Permissions"] = []
def update(self, name, input_bucket, role):
if name:
self.name = name
if input_bucket:
self.input_bucket = input_bucket
if role:
self.role = role
def to_dict(self):
return {
"Id": self.id,
"Name": self.name,
"Arn": self.arn,
"Status": self.status,
"InputBucket": self.input_bucket,
"OutputBucket": self.output_bucket,
"Role": self.role,
"Notifications": {
"Progressing": "",
"Completed": "",
"Warning": "",
"Error": "",
},
"ContentConfig": self.content_config,
"ThumbnailConfig": self.thumbnail_config,
}
class ElasticTranscoderBackend(BaseBackend):
def __init__(self, region_name=None):
super(ElasticTranscoderBackend, self).__init__()
self.region_name = region_name
self.pipelines = {}
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_pipeline(
self,
name,
input_bucket,
output_bucket,
role,
aws_kms_key_arn,
notifications,
content_config,
thumbnail_config,
):
pipeline = Pipeline(
self.region_name,
name,
input_bucket,
output_bucket,
role,
content_config,
thumbnail_config,
)
self.pipelines[pipeline.id] = pipeline
warnings = []
return pipeline, warnings
def list_pipelines(self):
return [p.to_dict() for _, p in self.pipelines.items()]
def read_pipeline(self, id):
return self.pipelines[id]
def update_pipeline(
self,
id,
name,
input_bucket,
role,
aws_kms_key_arn,
notifications,
content_config,
thumbnail_config,
):
pipeline = self.read_pipeline(id)
pipeline.update(name, input_bucket, role)
warnings = []
return pipeline, warnings
def delete_pipeline(self, pipeline_id):
self.pipelines.pop(pipeline_id)
elastictranscoder_backends = {}
for region in Session().get_available_regions("elastictranscoder"):
elastictranscoder_backends[region] = ElasticTranscoderBackend(region)
for region in Session().get_available_regions(
"elastictranscoder", partition_name="aws-us-gov"
):
elastictranscoder_backends[region] = ElasticTranscoderBackend(region)
for region in Session().get_available_regions(
"elastictranscoder", partition_name="aws-cn"
):
elastictranscoder_backends[region] = ElasticTranscoderBackend(region)

View File

@ -0,0 +1,147 @@
from __future__ import unicode_literals
from moto.core import ACCOUNT_ID
from moto.core.responses import BaseResponse
from .models import elastictranscoder_backends
import json
import re
class ElasticTranscoderResponse(BaseResponse):
SERVICE_NAME = "elastictranscoder"
@property
def elastictranscoder_backend(self):
return elastictranscoder_backends[self.region]
def pipelines(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_pipeline()
elif request.method == "GET":
return self.list_pipelines()
else:
return self.update_pipeline()
def individual_pipeline(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.read_pipeline()
elif request.method == "DELETE":
return self.delete_pipeline()
else:
return self.update_pipeline()
def create_pipeline(self):
name = self._get_param("Name")
input_bucket = self._get_param("InputBucket")
output_bucket = self._get_param("OutputBucket")
role = self._get_param("Role")
aws_kms_key_arn = self._get_param("AwsKmsKeyArn")
notifications = self._get_param("Notifications")
content_config = self._get_param("ContentConfig")
thumbnail_config = self._get_param("ThumbnailConfig")
if not role:
return self.err("Role cannot be blank")
if not re.match("^arn:aws:iam::[0-9]+:role/.+", role):
return self.err("Role ARN is invalid: {}".format(role))
if not output_bucket and not content_config:
return self.err(
"[OutputBucket and ContentConfig:Bucket are not allowed to both be null.]"
)
if output_bucket and content_config:
return self.err("[OutputBucket and ContentConfig are mutually exclusive.]")
if content_config and not thumbnail_config:
return self.err(
"[ThumbnailConfig:Bucket is not allowed to be null if ContentConfig is specified.]"
)
pipeline, warnings = self.elastictranscoder_backend.create_pipeline(
name=name,
input_bucket=input_bucket,
output_bucket=output_bucket,
role=role,
aws_kms_key_arn=aws_kms_key_arn,
notifications=notifications,
content_config=content_config,
thumbnail_config=thumbnail_config,
)
return (
201,
{"status": 201},
json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}),
)
def list_pipelines(self):
return (
200,
{},
json.dumps({"Pipelines": self.elastictranscoder_backend.list_pipelines()}),
)
def validate_pipeline_id(self, pipeline_id):
r = "^\\d{13}-\\w{6}$"
if not re.match(r, pipeline_id):
msg = "1 validation error detected: Value '{}' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: {}".format(
pipeline_id, r
)
return self.err(msg)
try:
self.elastictranscoder_backend.read_pipeline(pipeline_id)
return None
except KeyError:
msg = "The specified pipeline was not found: account={}, pipelineId={}.".format(
ACCOUNT_ID, pipeline_id
)
return (
404,
{"status": 404, "x-amzn-ErrorType": "ResourceNotFoundException"},
json.dumps({"message": msg}),
)
def read_pipeline(self):
_id = self.path.rsplit("/", 1)[-1]
err = self.validate_pipeline_id(_id)
if err:
return err
pipeline = self.elastictranscoder_backend.read_pipeline(_id)
return 200, {}, json.dumps({"Pipeline": pipeline.to_dict()})
def update_pipeline(self):
_id = self.path.rsplit("/", 1)[-1]
name = self._get_param("Name")
input_bucket = self._get_param("InputBucket")
role = self._get_param("Role")
aws_kms_key_arn = self._get_param("AwsKmsKeyArn")
notifications = self._get_param("Notifications")
content_config = self._get_param("ContentConfig")
thumbnail_config = self._get_param("ThumbnailConfig")
err = self.validate_pipeline_id(_id)
if err:
return err
pipeline, warnings = self.elastictranscoder_backend.update_pipeline(
id=_id,
name=name,
input_bucket=input_bucket,
role=role,
aws_kms_key_arn=aws_kms_key_arn,
notifications=notifications,
content_config=content_config,
thumbnail_config=thumbnail_config,
)
return (
200,
{},
json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}),
)
def delete_pipeline(self):
_id = self.path.rsplit("/", 1)[-1]
self.elastictranscoder_backend.delete_pipeline(pipeline_id=_id)
return 200, {}, "{}"
def err(self, msg):
return (
400,
{"status": 400, "x-amzn-ErrorType": "ValidationException"},
json.dumps({"message": msg}),
)

View File

@ -0,0 +1,15 @@
from __future__ import unicode_literals
from .responses import ElasticTranscoderResponse
url_bases = [
"https?://elastictranscoder.(.+).amazonaws.com",
]
response = ElasticTranscoderResponse()
url_paths = {
r"{0}/(?P<api_version>[^/]+)/pipelines/?$": response.pipelines,
r"{0}/(?P<api_version>[^/]+)/pipelines/(?P<pipeline_id>[^/]+)/?$": response.individual_pipeline,
}

View File

View File

@ -0,0 +1,400 @@
from __future__ import unicode_literals
from botocore.exceptions import ClientError
import boto3
import sure # noqa
import pytest
from moto import mock_elastictranscoder
from moto.core import ACCOUNT_ID
@mock_elastictranscoder
def test_create_simple_pipeline():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
response = client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
Role=role,
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(201)
pipeline = response["Pipeline"]
pipeline.should.have.key("Id")
pipeline.should.have.key("Name").being.equal("testpipeline")
pipeline.should.have.key("Arn").being.equal(
"arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
region, ACCOUNT_ID, pipeline["Id"]
)
)
pipeline.should.have.key("Status").being.equal("Active")
pipeline.should.have.key("InputBucket").being.equal("inputtest")
pipeline.should.have.key("OutputBucket").being.equal("outputtest")
pipeline.should.have.key("Role").being.equal(role)
pipeline.should.have.key("Notifications").being.equal(
{"Progressing": "", "Completed": "", "Warning": "", "Error": ""}
)
pipeline.should.have.key("ContentConfig")
pipeline["ContentConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ContentConfig"].should.have.key("Permissions").being.equal([])
pipeline.should.have.key("ThumbnailConfig")
pipeline["ThumbnailConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ThumbnailConfig"].should.have.key("Permissions").being.equal([])
response.should.have.key("Warnings").being.equal([])
@mock_elastictranscoder
def test_create_pipeline_with_content_config():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
response = client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
ContentConfig={"Bucket": "outputtest"},
ThumbnailConfig={"Bucket": "outputtest"},
Role=role,
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(201)
pipeline = response["Pipeline"]
pipeline.should.have.key("Id")
pipeline.should.have.key("Name").being.equal("testpipeline")
pipeline.should.have.key("Arn").being.equal(
"arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
region, ACCOUNT_ID, pipeline["Id"]
)
)
pipeline.should.have.key("Status").being.equal("Active")
pipeline.should.have.key("InputBucket").being.equal("inputtest")
pipeline.should.have.key("OutputBucket").being.equal("outputtest")
pipeline.should.have.key("Role").being.equal(role)
pipeline.should.have.key("Notifications").being.equal(
{"Progressing": "", "Completed": "", "Warning": "", "Error": ""}
)
pipeline.should.have.key("ContentConfig")
pipeline["ContentConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ContentConfig"].should.have.key("Permissions").being.equal([])
pipeline.should.have.key("ThumbnailConfig")
pipeline["ThumbnailConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ThumbnailConfig"].should.have.key("Permissions").being.equal([])
@mock_elastictranscoder
def test_create_pipeline_with_outputbucket_and_content_config():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
with pytest.raises(ClientError) as ex:
client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
ContentConfig={"Bucket": "configoutputtest"},
Role=role,
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"[OutputBucket and ContentConfig are mutually exclusive.]"
)
@mock_elastictranscoder
def test_create_pipeline_without_thumbnail_config():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
with pytest.raises(ClientError) as ex:
client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
ContentConfig={"Bucket": "outputtest"},
Role=role,
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"[ThumbnailConfig:Bucket is not allowed to be null if ContentConfig is specified.]"
)
@mock_elastictranscoder
def test_create_pipeline_without_role():
client = boto3.client("elastictranscoder", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.create_pipeline(Name="testpipeline", InputBucket="inputtest", Role="")
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal("Role cannot be blank")
@mock_elastictranscoder
def test_create_pipeline_with_invalid_role():
client = boto3.client("elastictranscoder", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.create_pipeline(
Name="testpipeline", InputBucket="inputtest", Role="asdf"
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal("Role ARN is invalid: asdf")
@mock_elastictranscoder
def test_create_pipeline_without_output():
client = boto3.client("elastictranscoder", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
Role=create_role_name("nonexistingrole"),
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"[OutputBucket and ContentConfig:Bucket are not allowed to both be null.]"
)
@mock_elastictranscoder
def test_list_pipelines():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
Role=role,
)
response = client.list_pipelines()
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response.should.have.key("Pipelines").being.length_of(1)
pipeline = response["Pipelines"][0]
pipeline.should.have.key("Id")
pipeline.should.have.key("Name").being.equal("testpipeline")
pipeline.should.have.key("Arn").being.equal(
"arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
region, ACCOUNT_ID, pipeline["Id"]
)
)
pipeline.should.have.key("Status").being.equal("Active")
pipeline.should.have.key("InputBucket").being.equal("inputtest")
pipeline.should.have.key("OutputBucket").being.equal("outputtest")
pipeline.should.have.key("Role").being.equal(role)
pipeline.should.have.key("Notifications").being.equal(
{"Progressing": "", "Completed": "", "Warning": "", "Error": ""}
)
pipeline.should.have.key("ContentConfig")
pipeline["ContentConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ContentConfig"].should.have.key("Permissions").being.equal([])
pipeline.should.have.key("ThumbnailConfig")
pipeline["ThumbnailConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ThumbnailConfig"].should.have.key("Permissions").being.equal([])
@mock_elastictranscoder
def test_read_pipeline():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
pipeline = client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
Role=role,
)["Pipeline"]
response = client.read_pipeline(Id=pipeline["Id"])
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response.should.have.key("Pipeline")
pipeline = response["Pipeline"]
pipeline.should.have.key("Id")
pipeline.should.have.key("Name").being.equal("testpipeline")
pipeline.should.have.key("Arn").being.equal(
"arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
region, ACCOUNT_ID, pipeline["Id"]
)
)
pipeline.should.have.key("Status").being.equal("Active")
pipeline.should.have.key("InputBucket").being.equal("inputtest")
pipeline.should.have.key("OutputBucket").being.equal("outputtest")
pipeline.should.have.key("Role").being.equal(role)
pipeline.should.have.key("Notifications").being.equal(
{"Progressing": "", "Completed": "", "Warning": "", "Error": ""}
)
pipeline.should.have.key("ContentConfig")
pipeline["ContentConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ContentConfig"].should.have.key("Permissions").being.equal([])
pipeline.should.have.key("ThumbnailConfig")
pipeline["ThumbnailConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ThumbnailConfig"].should.have.key("Permissions").being.equal([])
@mock_elastictranscoder
def test_read_unknown_pipeline_format():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
with pytest.raises(ClientError) as ex:
client.read_pipeline(Id="unknown-pipeline")
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value 'unknown-pipeline' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: ^\\d{13}-\\w{6}$"
)
@mock_elastictranscoder
def test_read_nonexisting_pipeline_format():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
pipeline_id = "0000000000000-abcdef"
with pytest.raises(ClientError) as ex:
client.read_pipeline(Id=pipeline_id)
err = ex.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"The specified pipeline was not found: account={}, pipelineId={}.".format(
ACCOUNT_ID, pipeline_id
)
)
@mock_elastictranscoder
def test_update_pipeline_name():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
pipeline = client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
Role=role,
)["Pipeline"]
response = client.update_pipeline(Id=pipeline["Id"], Name="newtestpipeline")
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response.should.have.key("Pipeline")
pipeline = response["Pipeline"]
pipeline.should.have.key("Id")
pipeline.should.have.key("Name").being.equal("newtestpipeline")
pipeline.should.have.key("Arn").being.equal(
"arn:aws:elastictranscoder:{}:{}:pipeline/{}".format(
region, ACCOUNT_ID, pipeline["Id"]
)
)
pipeline.should.have.key("Status").being.equal("Active")
pipeline.should.have.key("InputBucket").being.equal("inputtest")
pipeline.should.have.key("OutputBucket").being.equal("outputtest")
pipeline.should.have.key("Role").being.equal(role)
pipeline.should.have.key("Notifications").being.equal(
{"Progressing": "", "Completed": "", "Warning": "", "Error": ""}
)
pipeline.should.have.key("ContentConfig")
pipeline["ContentConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ContentConfig"].should.have.key("Permissions").being.equal([])
pipeline.should.have.key("ThumbnailConfig")
pipeline["ThumbnailConfig"].should.have.key("Bucket").being.equal("outputtest")
pipeline["ThumbnailConfig"].should.have.key("Permissions").being.equal([])
@mock_elastictranscoder
def test_update_pipeline_input_and_role():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
newrole = create_role_name("newrole")
pipeline = client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
Role=role,
)["Pipeline"]
response = client.update_pipeline(
Id=pipeline["Id"], InputBucket="inputtest2", Role=newrole
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response.should.have.key("Pipeline")
pipeline = response["Pipeline"]
pipeline.should.have.key("Id")
pipeline.should.have.key("Name").being.equal("testpipeline")
pipeline.should.have.key("InputBucket").being.equal("inputtest2")
pipeline.should.have.key("Role").being.equal(newrole)
@mock_elastictranscoder
def test_update_pipeline_with_invalid_id():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
with pytest.raises(ClientError) as ex:
client.update_pipeline(Id="unknown-pipeline")
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value 'unknown-pipeline' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: ^\\d{13}-\\w{6}$"
)
@mock_elastictranscoder
def test_update_nonexisting_pipeline():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
pipeline_id = "0000000000000-abcdef"
with pytest.raises(ClientError) as ex:
client.read_pipeline(Id=pipeline_id)
err = ex.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"The specified pipeline was not found: account={}, pipelineId={}.".format(
ACCOUNT_ID, pipeline_id
)
)
@mock_elastictranscoder
def test_delete_pipeline():
region = "us-east-1"
client = boto3.client("elastictranscoder", region_name=region)
role = create_role_name("nonexistingrole")
pipeline = client.create_pipeline(
Name="testpipeline",
InputBucket="inputtest",
OutputBucket="outputtest",
Role=role,
)["Pipeline"]
client.delete_pipeline(Id=pipeline["Id"])
response = client.list_pipelines()
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response.should.have.key("Pipelines").being.length_of(0)
def create_role_name(name):
return "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, name)

View File

@ -0,0 +1,19 @@
from __future__ import unicode_literals
import sure # noqa
import moto.server as server
from moto import mock_elastictranscoder
"""
Test the different server responses
"""
@mock_elastictranscoder
def test_elastictranscoder_list():
backend = server.create_backend_app("elastictranscoder")
test_client = backend.test_client()
res = test_client.get("/2012-09-25/pipelines")
res.data.should.contain(b"Pipelines")