Transcribe Medical Support (#3299)

* Transcribe first cut: Medical: start, get and delete jobs.

* Added list_medical_transcription_job() support to Transcribe.

* Support for medical vocabularies.

* Added transcribe to list of backends to fix server mode error.

* PR3299 requested changes: don't offer deprecated decorator, regionalize download_uri, create/use service-specific exceptions.

Co-authored-by: Joseph Weitekamp <jweite@amazon.com>
This commit is contained in:
jweite 2020-09-30 08:18:26 -04:00 committed by GitHub
parent ebb1c6bd68
commit 1dd5cf08a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 917 additions and 0 deletions

View File

@ -110,6 +110,7 @@ mock_sts = lazy_load(".sts", "mock_sts")
mock_sts_deprecated = lazy_load(".sts", "mock_sts_deprecated")
mock_swf = lazy_load(".swf", "mock_swf")
mock_swf_deprecated = lazy_load(".swf", "mock_swf_deprecated")
mock_transcribe = lazy_load(".transcribe", "mock_transcribe")
XRaySegment = lazy_load(".xray", "XRaySegment")
mock_xray = lazy_load(".xray", "mock_xray")
mock_xray_client = lazy_load(".xray", "mock_xray_client")

View File

@ -68,6 +68,7 @@ BACKENDS = {
"stepfunctions": ("stepfunctions", "stepfunction_backends"),
"sts": ("sts", "sts_backends"),
"swf": ("swf", "swf_backends"),
"transcribe": ("transcribe", "transcribe_backends"),
"xray": ("xray", "xray_backends"),
"kinesisvideo": ("kinesisvideo", "kinesisvideo_backends"),
"kinesis-video-archived-media": (

View File

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import transcribe_backends
transcribe_backend = transcribe_backends["us-east-1"]
mock_transcribe = transcribe_backend.decorator

View File

@ -0,0 +1,13 @@
from moto.core.exceptions import JsonRESTError
class ConflictException(JsonRESTError):
def __init__(self, message, **kwargs):
super(ConflictException, self).__init__("ConflictException", message, **kwargs)
class BadRequestException(JsonRESTError):
def __init__(self, message, **kwargs):
super(BadRequestException, self).__init__(
"BadRequestException", message, **kwargs
)

387
moto/transcribe/models.py Normal file
View File

@ -0,0 +1,387 @@
import uuid
from datetime import datetime, timedelta
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
from moto.sts.models import ACCOUNT_ID
from .exceptions import ConflictException, BadRequestException
class BaseObject(BaseModel):
def camelCase(self, key):
words = []
for i, word in enumerate(key.split("_")):
words.append(word.title())
return "".join(words)
def gen_response_object(self):
response_object = dict()
for key, value in self.__dict__.items():
if "_" in key:
response_object[self.camelCase(key)] = value
else:
response_object[key[0].upper() + key[1:]] = value
return response_object
@property
def response_object(self):
return self.gen_response_object()
class FakeMedicalTranscriptionJob(BaseObject):
def __init__(
self,
region_name,
medical_transcription_job_name,
language_code,
media_sample_rate_hertz,
media_format,
media,
output_bucket_name,
output_encryption_kms_key_id,
settings,
specialty,
type,
):
self._region_name = region_name
self.medical_transcription_job_name = medical_transcription_job_name
self.transcription_job_status = None
self.language_code = language_code
self.media_sample_rate_hertz = media_sample_rate_hertz
self.media_format = media_format
self.media = media
self.transcript = None
self.start_time = self.completion_time = None
self.creation_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.failure_reason = None
self.settings = settings or {
"ChannelIdentification": False,
"ShowAlternatives": False,
}
self.specialty = specialty
self.type = type
self._output_bucket_name = output_bucket_name
self._output_encryption_kms_key_id = output_encryption_kms_key_id
self.output_location_type = "CUSTOMER_BUCKET"
def response_object(self, response_type):
response_field_dict = {
"CREATE": [
"MedicalTranscriptionJobName",
"TranscriptionJobStatus",
"LanguageCode",
"MediaFormat",
"Media",
"StartTime",
"CreationTime",
"Specialty",
"Type",
],
"GET": [
"MedicalTranscriptionJobName",
"TranscriptionJobStatus",
"LanguageCode",
"MediaSampleRateHertz",
"MediaFormat",
"Media",
"Transcript",
"StartTime",
"CreationTime",
"CompletionTime",
"Settings",
"Specialty",
"Type",
],
"LIST": [
"MedicalTranscriptionJobName",
"CreationTime",
"StartTime",
"CompletionTime",
"LanguageCode",
"TranscriptionJobStatus",
"FailureReason",
"OutputLocationType",
"Specialty",
"Type",
],
}
response_fields = response_field_dict[response_type]
response_object = self.gen_response_object()
if response_type != "LIST":
return {
"MedicalTranscriptionJob": {
k: v
for k, v in response_object.items()
if k in response_fields and v is not None and v != [None]
}
}
else:
return {
k: v
for k, v in response_object.items()
if k in response_fields and v is not None and v != [None]
}
def advance_job_status(self):
# On each call advances the fake job status
if not self.transcription_job_status:
self.transcription_job_status = "QUEUED"
elif self.transcription_job_status == "QUEUED":
self.transcription_job_status = "IN_PROGRESS"
self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not self.media_sample_rate_hertz:
self.media_sample_rate_hertz = 44100
if not self.media_format:
file_ext = self.media["MediaFileUri"].split(".")[-1].lower()
self.media_format = (
file_ext if file_ext in ["mp3", "mp4", "wav", "flac"] else "mp3"
)
elif self.transcription_job_status == "IN_PROGRESS":
self.transcription_job_status = "COMPLETED"
self.completion_time = (datetime.now() + timedelta(seconds=10)).strftime(
"%Y-%m-%d %H:%M:%S"
)
self.transcript = {
"TranscriptFileUri": "https://s3.{}.amazonaws.com/{}/medical/{}.json".format(
self._region_name,
self._output_bucket_name,
self.medical_transcription_job_name,
)
}
class FakeMedicalVocabulary(BaseObject):
def __init__(
self, region_name, vocabulary_name, language_code, vocabulary_file_uri,
):
self._region_name = region_name
self.vocabulary_name = vocabulary_name
self.language_code = language_code
self.vocabulary_file_uri = vocabulary_file_uri
self.vocabulary_state = None
self.last_modified_time = None
self.failure_reason = None
self.download_uri = "https://s3.us-east-1.amazonaws.com/aws-transcribe-dictionary-model-{}-prod/{}/medical/{}/{}/input.txt".format(
region_name, ACCOUNT_ID, self.vocabulary_name, uuid.uuid4()
)
def response_object(self, response_type):
response_field_dict = {
"CREATE": [
"VocabularyName",
"LanguageCode",
"VocabularyState",
"LastModifiedTime",
"FailureReason",
],
"GET": [
"VocabularyName",
"LanguageCode",
"VocabularyState",
"LastModifiedTime",
"FailureReason",
"DownloadUri",
],
"LIST": [
"VocabularyName",
"LanguageCode",
"LastModifiedTime",
"VocabularyState",
],
}
response_fields = response_field_dict[response_type]
response_object = self.gen_response_object()
return {
k: v
for k, v in response_object.items()
if k in response_fields and v is not None and v != [None]
}
def advance_job_status(self):
# On each call advances the fake job status
if not self.vocabulary_state:
self.vocabulary_state = "PENDING"
self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elif self.vocabulary_state == "PENDING":
self.vocabulary_state = "READY"
self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
class TranscribeBackend(BaseBackend):
def __init__(self, region_name=None):
self.medical_transcriptions = {}
self.medical_vocabularies = {}
self.region_name = region_name
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def start_medical_transcription_job(self, **kwargs):
name = kwargs.get("medical_transcription_job_name")
if name in self.medical_transcriptions:
raise ConflictException(
message="The requested job name already exists. Use a different job name."
)
settings = kwargs.get("settings")
vocabulary_name = settings.get("VocabularyName") if settings else None
if vocabulary_name and vocabulary_name not in self.medical_vocabularies:
raise BadRequestException(
message="The requested vocabulary couldn't be found. Check the vocabulary name and try your request again."
)
transcription_job_object = FakeMedicalTranscriptionJob(
region_name=self.region_name,
medical_transcription_job_name=name,
language_code=kwargs.get("language_code"),
media_sample_rate_hertz=kwargs.get("media_sample_rate_hertz"),
media_format=kwargs.get("media_format"),
media=kwargs.get("media"),
output_bucket_name=kwargs.get("output_bucket_name"),
output_encryption_kms_key_id=kwargs.get("output_encryption_kms_key_id"),
settings=settings,
specialty=kwargs.get("specialty"),
type=kwargs.get("type"),
)
self.medical_transcriptions[name] = transcription_job_object
return transcription_job_object.response_object("CREATE")
def get_medical_transcription_job(self, medical_transcription_job_name):
try:
job = self.medical_transcriptions[medical_transcription_job_name]
job.advance_job_status() # Fakes advancement through statuses.
return job.response_object("GET")
except KeyError:
raise BadRequestException(
message="The requested job couldn't be found. Check the job name and try your request again."
)
def delete_medical_transcription_job(self, medical_transcription_job_name):
try:
del self.medical_transcriptions[medical_transcription_job_name]
except KeyError:
raise BadRequestException(
message="The requested job couldn't be found. Check the job name and try your request again.",
)
def list_medical_transcription_jobs(
self, status, job_name_contains, next_token, max_results
):
jobs = list(self.medical_transcriptions.values())
if status:
jobs = [job for job in jobs if job.transcription_job_status == status]
if job_name_contains:
jobs = [
job
for job in jobs
if job_name_contains in job.medical_transcription_job_name
]
start_offset = int(next_token) if next_token else 0
end_offset = start_offset + (
max_results if max_results else 100
) # Arbitrarily selected...
jobs_paginated = jobs[start_offset:end_offset]
response = {
"MedicalTranscriptionJobSummaries": [
job.response_object("LIST") for job in jobs_paginated
]
}
if end_offset < len(jobs):
response["NextToken"] = str(end_offset)
if status:
response["Status"] = status
return response
def create_medical_vocabulary(self, **kwargs):
vocabulary_name = kwargs.get("vocabulary_name")
language_code = kwargs.get("language_code")
vocabulary_file_uri = kwargs.get("vocabulary_file_uri")
if vocabulary_name in self.medical_vocabularies:
raise ConflictException(
message="The requested vocabulary name already exists. Use a different vocabulary name."
)
medical_vocabulary_object = FakeMedicalVocabulary(
region_name=self.region_name,
vocabulary_name=vocabulary_name,
language_code=language_code,
vocabulary_file_uri=vocabulary_file_uri,
)
self.medical_vocabularies[vocabulary_name] = medical_vocabulary_object
return medical_vocabulary_object.response_object("CREATE")
def get_medical_vocabulary(self, vocabulary_name):
try:
job = self.medical_vocabularies[vocabulary_name]
job.advance_job_status() # Fakes advancement through statuses.
return job.response_object("GET")
except KeyError:
raise BadRequestException(
message="The requested vocabulary couldn't be found. Check the vocabulary name and try your request again."
)
def delete_medical_vocabulary(self, vocabulary_name):
try:
del self.medical_vocabularies[vocabulary_name]
except KeyError:
raise BadRequestException(
message="The requested vocabulary couldn't be found. Check the vocabulary name and try your request again."
)
def list_medical_vocabularies(
self, state_equals, name_contains, next_token, max_results
):
vocabularies = list(self.medical_vocabularies.values())
if state_equals:
vocabularies = [
vocabulary
for vocabulary in vocabularies
if vocabulary.vocabulary_state == state_equals
]
if name_contains:
vocabularies = [
vocabulary
for vocabulary in vocabularies
if name_contains in vocabulary.vocabulary_name
]
start_offset = int(next_token) if next_token else 0
end_offset = start_offset + (
max_results if max_results else 100
) # Arbitrarily selected...
vocabularies_paginated = vocabularies[start_offset:end_offset]
response = {
"Vocabularies": [
vocabulary.response_object("LIST")
for vocabulary in vocabularies_paginated
]
}
if end_offset < len(vocabularies):
response["NextToken"] = str(end_offset)
if state_equals:
response["Status"] = state_equals
return response
transcribe_backends = {}
for region, ec2_backend in ec2_backends.items():
transcribe_backends[region] = TranscribeBackend(region_name=region)

View File

@ -0,0 +1,111 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from moto.core.utils import amzn_request_id
from .models import transcribe_backends
class TranscribeResponse(BaseResponse):
@property
def transcribe_backend(self):
return transcribe_backends[self.region]
@property
def request_params(self):
try:
return json.loads(self.body)
except ValueError:
return {}
@amzn_request_id
def start_medical_transcription_job(self):
name = self._get_param("MedicalTranscriptionJobName")
response = self.transcribe_backend.start_medical_transcription_job(
medical_transcription_job_name=name,
language_code=self._get_param("LanguageCode"),
media_sample_rate_hertz=self._get_param("MediaSampleRateHertz"),
media_format=self._get_param("MediaFormat"),
media=self._get_param("Media"),
output_bucket_name=self._get_param("OutputBucketName"),
output_encryption_kms_key_id=self._get_param("OutputEncryptionKMSKeyId"),
settings=self._get_param("Settings"),
specialty=self._get_param("Specialty"),
type=self._get_param("Type"),
)
return json.dumps(response)
@amzn_request_id
def list_medical_transcription_jobs(self):
status = self._get_param("Status")
job_name_contains = self._get_param("JobNameContains")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
response = self.transcribe_backend.list_medical_transcription_jobs(
status=status,
job_name_contains=job_name_contains,
next_token=next_token,
max_results=max_results,
)
return json.dumps(response)
@amzn_request_id
def get_medical_transcription_job(self):
medical_transcription_job_name = self._get_param("MedicalTranscriptionJobName")
response = self.transcribe_backend.get_medical_transcription_job(
medical_transcription_job_name=medical_transcription_job_name
)
return json.dumps(response)
@amzn_request_id
def delete_medical_transcription_job(self):
medical_transcription_job_name = self._get_param("MedicalTranscriptionJobName")
response = self.transcribe_backend.delete_medical_transcription_job(
medical_transcription_job_name=medical_transcription_job_name
)
return json.dumps(response)
@amzn_request_id
def create_medical_vocabulary(self):
vocabulary_name = self._get_param("VocabularyName")
language_code = self._get_param("LanguageCode")
vocabulary_file_uri = self._get_param("VocabularyFileUri")
response = self.transcribe_backend.create_medical_vocabulary(
vocabulary_name=vocabulary_name,
language_code=language_code,
vocabulary_file_uri=vocabulary_file_uri,
)
return json.dumps(response)
@amzn_request_id
def get_medical_vocabulary(self):
vocabulary_name = self._get_param("VocabularyName")
response = self.transcribe_backend.get_medical_vocabulary(
vocabulary_name=vocabulary_name
)
return json.dumps(response)
@amzn_request_id
def list_medical_vocabularies(self):
state_equals = self._get_param("StateEquals")
name_contains = self._get_param("NameContains")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
response = self.transcribe_backend.list_medical_vocabularies(
state_equals=state_equals,
name_contains=name_contains,
next_token=next_token,
max_results=max_results,
)
return json.dumps(response)
@amzn_request_id
def delete_medical_vocabulary(self):
vocabulary_name = self._get_param("VocabularyName")
response = self.transcribe_backend.delete_medical_vocabulary(
vocabulary_name=vocabulary_name
)
return json.dumps(response)

7
moto/transcribe/urls.py Normal file
View File

@ -0,0 +1,7 @@
from __future__ import unicode_literals
from .responses import TranscribeResponse
url_bases = ["https?://transcribe.(.+).amazonaws.com"]
url_paths = {"{0}/$": TranscribeResponse.dispatch}

View File

View File

@ -0,0 +1,391 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import boto3
import sure # noqa
from moto import mock_transcribe
@mock_transcribe
def test_run_medical_transcription_job_minimal_params():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
job_name = "MyJob"
args = {
"MedicalTranscriptionJobName": job_name,
"LanguageCode": "en-US",
"Media": {"MediaFileUri": "s3://my-bucket/my-media-file.wav",},
"OutputBucketName": "my-output-bucket",
"Specialty": "PRIMARYCARE",
"Type": "CONVERSATION",
}
resp = client.start_medical_transcription_job(**args)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
# CREATED
resp = client.get_medical_transcription_job(MedicalTranscriptionJobName=job_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
transcription_job = resp["MedicalTranscriptionJob"]
transcription_job["MedicalTranscriptionJobName"].should.equal(
args["MedicalTranscriptionJobName"]
)
transcription_job["TranscriptionJobStatus"].should.equal("QUEUED")
transcription_job["LanguageCode"].should.equal(args["LanguageCode"])
transcription_job["Media"].should.equal(args["Media"])
transcription_job.should.contain("CreationTime")
transcription_job.doesnt.contain("StartTime")
transcription_job.doesnt.contain("CompletionTime")
transcription_job.doesnt.contain("Transcript")
transcription_job["Settings"]["ChannelIdentification"].should.equal(False)
transcription_job["Settings"]["ShowAlternatives"].should.equal(False)
transcription_job["Specialty"].should.equal(args["Specialty"])
transcription_job["Type"].should.equal(args["Type"])
# IN_PROGRESS
resp = client.get_medical_transcription_job(MedicalTranscriptionJobName=job_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
transcription_job = resp["MedicalTranscriptionJob"]
transcription_job["TranscriptionJobStatus"].should.equal("IN_PROGRESS")
transcription_job["MediaFormat"].should.equal("wav")
transcription_job.should.contain("StartTime")
transcription_job.doesnt.contain("CompletionTime")
transcription_job.doesnt.contain("Transcript")
transcription_job["MediaSampleRateHertz"].should.equal(44100)
# COMPLETED
resp = client.get_medical_transcription_job(MedicalTranscriptionJobName=job_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
transcription_job = resp["MedicalTranscriptionJob"]
transcription_job["TranscriptionJobStatus"].should.equal("COMPLETED")
transcription_job.should.contain("CompletionTime")
transcription_job["Transcript"].should.equal(
{
"TranscriptFileUri": "https://s3.{}.amazonaws.com/{}/medical/{}.json".format(
region_name,
args["OutputBucketName"],
args["MedicalTranscriptionJobName"],
)
}
)
# Delete
client.delete_medical_transcription_job(MedicalTranscriptionJobName=job_name)
client.get_medical_transcription_job.when.called_with(
MedicalTranscriptionJobName=job_name
).should.throw(client.exceptions.BadRequestException)
@mock_transcribe
def test_run_medical_transcription_job_all_params():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
vocabulary_name = "MyMedicalVocabulary"
resp = client.create_medical_vocabulary(
VocabularyName=vocabulary_name,
LanguageCode="en-US",
VocabularyFileUri="https://s3.us-east-1.amazonaws.com/AWSDOC-EXAMPLE-BUCKET/vocab.txt",
)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
job_name = "MyJob2"
args = {
"MedicalTranscriptionJobName": job_name,
"LanguageCode": "en-US",
"MediaSampleRateHertz": 48000,
"MediaFormat": "flac",
"Media": {"MediaFileUri": "s3://my-bucket/my-media-file.dat",},
"OutputBucketName": "my-output-bucket",
"OutputEncryptionKMSKeyId": "arn:aws:kms:us-east-1:012345678901:key/37111b5e-8eff-4706-ae3a-d4f9d1d559fc",
"Settings": {
"ShowSpeakerLabels": True,
"MaxSpeakerLabels": 5,
"ChannelIdentification": True,
"ShowAlternatives": True,
"MaxAlternatives": 6,
"VocabularyName": vocabulary_name,
},
"Specialty": "PRIMARYCARE",
"Type": "CONVERSATION",
}
resp = client.start_medical_transcription_job(**args)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
# CREATED
resp = client.get_medical_transcription_job(MedicalTranscriptionJobName=job_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
transcription_job = resp["MedicalTranscriptionJob"]
transcription_job["MedicalTranscriptionJobName"].should.equal(
args["MedicalTranscriptionJobName"]
)
transcription_job["TranscriptionJobStatus"].should.equal("QUEUED")
transcription_job["LanguageCode"].should.equal(args["LanguageCode"])
transcription_job["Media"].should.equal(args["Media"])
transcription_job.should.contain("CreationTime")
transcription_job.doesnt.contain("StartTime")
transcription_job.doesnt.contain("CompletionTime")
transcription_job.doesnt.contain("Transcript")
transcription_job["Settings"]["ShowSpeakerLabels"].should.equal(
args["Settings"]["ShowSpeakerLabels"]
)
transcription_job["Settings"]["MaxSpeakerLabels"].should.equal(
args["Settings"]["MaxSpeakerLabels"]
)
transcription_job["Settings"]["ChannelIdentification"].should.equal(
args["Settings"]["ChannelIdentification"]
)
transcription_job["Settings"]["ShowAlternatives"].should.equal(
args["Settings"]["ShowAlternatives"]
)
transcription_job["Settings"]["MaxAlternatives"].should.equal(
args["Settings"]["MaxAlternatives"]
)
transcription_job["Settings"]["VocabularyName"].should.equal(
args["Settings"]["VocabularyName"]
)
transcription_job["Specialty"].should.equal(args["Specialty"])
transcription_job["Type"].should.equal(args["Type"])
# IN_PROGRESS
resp = client.get_medical_transcription_job(MedicalTranscriptionJobName=job_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
transcription_job = resp["MedicalTranscriptionJob"]
transcription_job["TranscriptionJobStatus"].should.equal("IN_PROGRESS")
transcription_job["MediaFormat"].should.equal("flac")
transcription_job.should.contain("StartTime")
transcription_job.doesnt.contain("CompletionTime")
transcription_job.doesnt.contain("Transcript")
transcription_job["MediaSampleRateHertz"].should.equal(48000)
# COMPLETED
resp = client.get_medical_transcription_job(MedicalTranscriptionJobName=job_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
transcription_job = resp["MedicalTranscriptionJob"]
transcription_job["TranscriptionJobStatus"].should.equal("COMPLETED")
transcription_job.should.contain("CompletionTime")
transcription_job["Transcript"].should.equal(
{
"TranscriptFileUri": "https://s3.{}.amazonaws.com/{}/medical/{}.json".format(
region_name,
args["OutputBucketName"],
args["MedicalTranscriptionJobName"],
)
}
)
@mock_transcribe
def test_get_nonexistent_medical_transcription_job():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
client.get_medical_transcription_job.when.called_with(
MedicalTranscriptionJobName="NonexistentJobName"
).should.throw(client.exceptions.BadRequestException)
@mock_transcribe
def test_run_medical_transcription_job_with_existing_job_name():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
job_name = "MyJob"
args = {
"MedicalTranscriptionJobName": job_name,
"LanguageCode": "en-US",
"Media": {"MediaFileUri": "s3://my-bucket/my-media-file.wav",},
"OutputBucketName": "my-output-bucket",
"Specialty": "PRIMARYCARE",
"Type": "CONVERSATION",
}
resp = client.start_medical_transcription_job(**args)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
client.start_medical_transcription_job.when.called_with(**args).should.throw(
client.exceptions.ConflictException
)
@mock_transcribe
def test_run_medical_transcription_job_nonexistent_vocabulary():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
job_name = "MyJob3"
args = {
"MedicalTranscriptionJobName": job_name,
"LanguageCode": "en-US",
"Media": {"MediaFileUri": "s3://my-bucket/my-media-file.dat",},
"OutputBucketName": "my-output-bucket",
"Settings": {"VocabularyName": "NonexistentVocabulary"},
"Specialty": "PRIMARYCARE",
"Type": "CONVERSATION",
}
client.start_medical_transcription_job.when.called_with(**args).should.throw(
client.exceptions.BadRequestException
)
@mock_transcribe
def test_list_medical_transcription_jobs():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
def run_job(index, target_status):
job_name = "Job_{}".format(index)
args = {
"MedicalTranscriptionJobName": job_name,
"LanguageCode": "en-US",
"Media": {"MediaFileUri": "s3://my-bucket/my-media-file.wav",},
"OutputBucketName": "my-output-bucket",
"Specialty": "PRIMARYCARE",
"Type": "CONVERSATION",
}
resp = client.start_medical_transcription_job(**args)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
# IMPLICITLY PROMOTE JOB STATUS TO QUEUED
resp = client.get_medical_transcription_job(
MedicalTranscriptionJobName=job_name
)
# IN_PROGRESS
if target_status in ["IN_PROGRESS", "COMPLETED"]:
resp = client.get_medical_transcription_job(
MedicalTranscriptionJobName=job_name
)
# COMPLETED
if target_status == "COMPLETED":
resp = client.get_medical_transcription_job(
MedicalTranscriptionJobName=job_name
)
# Run 5 pending jobs
for i in range(5):
run_job(i, "PENDING")
# Run 10 job to IN_PROGRESS
for i in range(5, 15):
run_job(i, "IN_PROGRESS")
# Run 15 job to COMPLETED
for i in range(15, 30):
run_job(i, "COMPLETED")
# List all
response = client.list_medical_transcription_jobs()
response.should.contain("MedicalTranscriptionJobSummaries")
len(response["MedicalTranscriptionJobSummaries"]).should.equal(30)
response.shouldnt.contain("NextToken")
response.shouldnt.contain("Status")
# List IN_PROGRESS
response = client.list_medical_transcription_jobs(Status="IN_PROGRESS")
response.should.contain("MedicalTranscriptionJobSummaries")
len(response["MedicalTranscriptionJobSummaries"]).should.equal(10)
response.shouldnt.contain("NextToken")
response.should.contain("Status")
response["Status"].should.equal("IN_PROGRESS")
# List JobName contains "8"
response = client.list_medical_transcription_jobs(JobNameContains="8")
response.should.contain("MedicalTranscriptionJobSummaries")
len(response["MedicalTranscriptionJobSummaries"]).should.equal(3)
response.shouldnt.contain("NextToken")
response.shouldnt.contain("Status")
# Pagination by 11
response = client.list_medical_transcription_jobs(MaxResults=11)
response.should.contain("MedicalTranscriptionJobSummaries")
len(response["MedicalTranscriptionJobSummaries"]).should.equal(11)
response.should.contain("NextToken")
response.shouldnt.contain("Status")
response = client.list_medical_transcription_jobs(
NextToken=response["NextToken"], MaxResults=11
)
response.should.contain("MedicalTranscriptionJobSummaries")
len(response["MedicalTranscriptionJobSummaries"]).should.equal(11)
response.should.contain("NextToken")
response = client.list_medical_transcription_jobs(
NextToken=response["NextToken"], MaxResults=11
)
response.should.contain("MedicalTranscriptionJobSummaries")
len(response["MedicalTranscriptionJobSummaries"]).should.equal(8)
response.shouldnt.contain("NextToken")
@mock_transcribe
def test_create_medical_vocabulary():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
vocabulary_name = "MyVocabulary"
resp = client.create_medical_vocabulary(
VocabularyName=vocabulary_name,
LanguageCode="en-US",
VocabularyFileUri="https://s3.us-east-1.amazonaws.com/AWSDOC-EXAMPLE-BUCKET/vocab.txt",
)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
# PENDING
resp = client.get_medical_vocabulary(VocabularyName=vocabulary_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["VocabularyName"].should.equal(vocabulary_name)
resp["LanguageCode"].should.equal("en-US")
resp["VocabularyState"].should.equal("PENDING")
resp.should.contain("LastModifiedTime")
resp.shouldnt.contain("FailureReason")
resp["DownloadUri"].should.contain(vocabulary_name)
# IN_PROGRESS
resp = client.get_medical_vocabulary(VocabularyName=vocabulary_name)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["VocabularyState"].should.equal("READY")
# Delete
client.delete_medical_vocabulary(VocabularyName=vocabulary_name)
client.get_medical_vocabulary.when.called_with(
VocabularyName=vocabulary_name
).should.throw(client.exceptions.BadRequestException)
@mock_transcribe
def test_get_nonexistent_medical_vocabulary():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
client.get_medical_vocabulary.when.called_with(
VocabularyName="NonexistentVocabularyName"
).should.throw(client.exceptions.BadRequestException)
@mock_transcribe
def test_create_medical_vocabulary_with_existing_vocabulary_name():
region_name = "us-east-1"
client = boto3.client("transcribe", region_name=region_name)
vocabulary_name = "MyVocabulary"
args = {
"VocabularyName": vocabulary_name,
"LanguageCode": "en-US",
"VocabularyFileUri": "https://s3.us-east-1.amazonaws.com/AWSDOC-EXAMPLE-BUCKET/vocab.txt",
}
resp = client.create_medical_vocabulary(**args)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
client.create_medical_vocabulary.when.called_with(**args).should.throw(
client.exceptions.ConflictException
)