Feature: Resource State Transition (#4734)

This commit is contained in:
Bert Blommers 2022-05-01 11:45:59 +00:00 committed by GitHub
parent 3a6223083d
commit 12421068bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1175 additions and 154 deletions

View File

@ -30,10 +30,7 @@ import shlex
# Add any Sphinx extension module names here, as strings. They can be # Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones. # ones.
extensions = [ extensions = ["sphinx.ext.autodoc", "sphinx.ext.autosectionlabel"]
'sphinx.ext.autodoc',
'sphinx.ext.autosectionlabel'
]
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"] templates_path = ["_templates"]

View File

@ -0,0 +1,31 @@
.. _environment_variables:
.. role:: raw-html(raw)
:format: html
=======================
Environment Variables
=======================
The following is a non-exhaustive list of the environment variables that can be used to configure Moto.
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| Key | Value | Default | Explanation |
+===============================+==========+===========+=================================================================================================+
| TEST_SERVER_MODE | bool | False | Useful when you want to run decorated tests against an existing MotoServer. :raw-html:`<br />` |
| | | | All boto3-clients/resources created within the test will point to `http://localhost:5000`. |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| INITIAL_NO_AUTH_ACTION_COUNT | int | 0 | See :ref:`iam access control`. |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| DEFAULT_CONTAINER_REGISTRY | str | docker.io | Registry that contains the Docker containers. :raw-html:`<br />` |
| | | | Used by AWSLambda and Batch. |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| MOTO_ALLOW_NONEXISTENT_REGION | bool | False | |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| | | | |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+
| MOTO_S3_CUSTOM_ENDPOINTS | str | | See :ref:`s3`. |
+-------------------------------+----------+-----------+-------------------------------------------------------------------------------------------------+

View File

@ -0,0 +1,16 @@
.. _configuration:
======================
Configuration Options
======================
Moto has a variety of ways to configure the mock behaviour.
.. toctree::
:maxdepth: 1
environment_variables
state_transition/index
state_transition/models

View File

@ -0,0 +1,156 @@
.. _state transition:
.. role:: raw-html(raw)
:format: html
=============================
State Transitions
=============================
When developing against AWS, many API calls are asynchronous. Many resources will take some time to complete, and you'll need to write business logic to ensure the application can deal with all possible states. What is the desired behaviour when the status is `initializing`? What should happen when the status is finally `ready`? What should happen when the resource is still not `ready` after an hour?
Let's look at an example. Say you want to create a DAX cluster, and wait until it's available - or throw an error if this takes too long.
.. sourcecode:: python
def create_and_wait_for_cluster(name):
client.create_cluster(ClusterName=name, ...)
cluster_status = get_cluster_status(name)
while cluster_status != "available":
sleep()
if five_minutes_have_passed():
error()
cluster_status = get_cluster_status(name)
Because Moto handles everything in-memory, and no actual servers are created, there is no need to wait until the cluster is ready - it could be ready immediately. :raw-html:`<br />`
Not having to wait for a resource to be ready is of course the major benefit of using Moto, but it also means that the entire example above is impossible to test.
Moto exposes an API that can artificially delay these state transitions, allowing you to let Moto resemble the asynchronous nature of AWS as closely as you need.
Sticking with the example above, you may want to test what happens if the cluster takes 5 seconds to create:
.. sourcecode:: python
from moto.moto_api import state_manager
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 5})
create_and_wait_for_cluster("my_new_cluster")
In order to test what happens in the event of a timeout, we can order the cluster to only be ready after 10 minutes:
.. sourcecode:: python
from moto.moto_api import state_manager
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 600})
try:
create_and_wait_for_cluster("my_new_cluster")
except:
verify_the_correct_error_was_thrown()
In other tests, you may simply want the cluster to be ready as quickly as possible:
.. sourcecode:: python
from moto.moto_api import state_manager
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "immediate"})
So far we've seen two possible transitions:
- The state progresses immediately
- The state progresses after x seconds
There is a third possibility, where the state progresses after calling `describe_object` a specific number of times. :raw-html:`<br />`
This can be useful if you want to verify that the state does change, but you don't want your unit test to take too long.
.. note::
We will use the `boto3.client(..).describe_object` method as an example throughout this page. :raw-html:`<br />`
This should be seen as a agnostic version of service-specific methods to verify the status of a resource, such as `boto.client("dax").describe_clusters()` or `boto.client("support").describe_cases()`.
Changing the state after a certain number of invocations can be done like this:
.. sourcecode:: python
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "manual", "times": 3})
The transition is called `manual` because it requires you to manually invoke the `describe_object`-method before the status is progressed. :raw-html:`<br />`
To show how this would work in practice, let's look at an example test:
.. sourcecode:: python
client.create_cluster(ClusterName=name, ...)
# The first time we retrieve the status
status = client.describe_clusters(ClusterNames=[name])["Clusters"][0]["Status"]
assert status == "creating"
# Second time we retrieve the status
status = client.describe_clusters(ClusterNames=[name])["Clusters"][0]["Status"]
assert status == "creating"
# This is the third time that we're retrieving the status - this time it will advance to the next status
status = client.describe_clusters(ClusterNames=[name])["Clusters"][0]["Status"]
assert status == "available"
This should be done cleanly in a while-loop of-course, similar to the `create_and_wait_for_cluster` defined above - but this is a good way to showcase the behaviour.
Registered models
########################
:doc:`A list of all supported models can be found here. <models>`
Older versions of Moto may not support all models that are listed here. :raw-html:`<br />`
To see a list of supported models for your Moto-version, call the `get_registered_models`-method:
.. sourcecode:: python
with mock_all():
print(state_manager.get_registered_models())
Note the `mock_all`-decorator! Models are registered when the mock for that resource is started. If you call this method outside of a mock, you may see an empty list.
If you'd like to see state transition support for a resource that's not yet supported, feel free to open an issue or PR.
State Transitions in ServerMode
########################################
Configuration state transitions can be done in ServerMode as well, by making a HTTP request to the MotoAPI.
This is an example request for `dax::cluster` to wait 5 seconds before the cluster becomes ready:
.. sourcecode:: python
post_body = dict(model_name="dax::cluster", transition={"progression": "time", "duration": 5})
resp = requests.post("http://localhost:5000/moto-api/state-manager/set-transition", data=json.dumps(post_body))
An example request to see the currently configured transition for a specific model:
.. sourcecode:: python
requests.get("http://localhost:5000/moto-api/state-manager/get-transition?model_name=dax::cluster")
We will not list all configuration options here again, but all models and transitions types (as specified above) follow the same format.
Reset
########
It is possible to reset the state manager, and undo any custom transitions that were set. :raw-html:`<br />`
Using Python:
.. sourcecode:: python
from moto.moto_api import state_manager
state_manager.unset_transition(model_name="dax::cluster")
Or if you're using Moto in ServerMode:
.. sourcecode:: python
post_body = dict(model_name="dax::cluster")
resp = requests.post("http://localhost:5000/moto-api/state-manager/unset-transition", data=json.dumps(post_body))

View File

@ -0,0 +1,113 @@
.. _state transition_models:
.. role:: raw-html(raw)
:format: html
============================================
Supported Models for State Transitions
============================================
Service: Batch
-----------------
**Model**: `batch::job` :raw-html:`<br />`
Available States: :raw-html:`<br />`
"SUBMITTED" --> "PENDING" --> "RUNNABLE" --> "STARTING" --> "RUNNING" :raw-html:`<br />`
"RUNNING" --> SUCCEEDED|FAILED
Transition type: `immediate` :raw-html:`<br />`
Advancement:
When a user calls `submit_job`, Moto will go through a few steps to prepare the job, and when ready, execute that job in a Docker container.
There are some steps to go through while the status is `SUBMITTED`, there are some steps to follow when the status is `PENDING`, etcetera.
Moto will try to advance the status itself - the moment this succeeds, the next step is executed.
As the default transition is `immediate`, the status will advance immediately, and these steps will be executed as quickly as possible. This ensures that the job will be executed as quickly as possible.
Delaying the execution can be done as usual, by forcing Moto to wait x seconds before transitioning to the next stage. This can be useful if you need to 'catch' a job in a specific stage.
Service: Cloudfront
---------------------
**Model**: `cloudfront::distribution` :raw-html:`<br />`
Available States: :raw-html:`<br />`
"InProgress" --> "Deployed"
Transition type: Manual - describe the resource 1 time before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("cloudfront").get_distribution(..)` to advance a single distribution, or `boto3.client("cloudfront").list_distributions(..)` to advance all distributions.
Service: DAX
---------------
**Model**: `dax::cluster` :raw-html:`<br />`
Available States:
"creating" --> "available" :raw-html:`<br />`
"deleting" --> "deleted"
Transition type: Manual - describe the resource 4 times before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("dax").describe_clusters(..)`.
Service: Support
------------------
**Model**: `support::case` :raw-html:`<br />`
Available states:
"opened" --> "pending-customer-action" --> "reopened" --> "resolved" --> "unassigned" --> "work-in-progress" --> "opened"
Transition type: Manual - describe the resource 1 time before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("support").describe_cases(..)`
Service: Transcribe
---------------------
**Model**: `transcribe::vocabulary` :raw-html:`<br />`
Available states:
None --> "PENDING --> "READY"
Transition type: Manual - describe the resource 1 time before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("transcribe").get_vocabulary(..)`
**Model**: `transcribe::medicalvocabulary` :raw-html:`<br />`
Available states:
None --> "PENDING --> "READY"
Transition type: Manual - describe the resource 1 time before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("transcribe").get_medical_vocabulary(..)`
**Model**: `transcribe::transcriptionjob` :raw-html:`<br />`
Available states:
None --> "QUEUED" --> "IN_PROGRESS" --> "COMPLETED"
Transition type: Manual - describe the resource 1 time before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("transcribe").get_transcription_job(..)`
**Model**: `transcribe::medicaltranscriptionjob` :raw-html:`<br />`
Available states:
None --> "QUEUED" --> "IN_PROGRESS" --> "COMPLETED"
Transition type: Manual - describe the resource 1 time before the state advances :raw-html:`<br />`
Advancement:
Call `boto3.client("transcribe").get_medical_transcription_job(..)`

View File

@ -0,0 +1,78 @@
.. _new state transitions:
===============================
State Transition Management
===============================
When developing a model where the resource is not available immediately, such as EC2 instances, a configuration option is available to specify whether you want mocked resources to be available immediately (to speed up unit testing), or whether you want an artificial delay to more closely mimick AWS' behaviour where resources are only available/ready after some time.
See the user-documentation here: :ref:`state transition`
In order for a new model to support this behaviour out of the box, it needs to be configured and registered with the State Manager.
The following steps need to be taken for this to be effective:
- Extend the new model with the ManagedState-class
- Call the ManagedState-constructor with information on which state transitions are supported
- Decide when to advance the status
- Register the model with the StateManager
An example model could look like this:
.. sourcecode:: python
from moto.moto_api._internal.managed_state_model import ManagedState
class NewModel(ManagedState):
def __init__(self):
ManagedState.__init__(self,
# A unique name should be chosen to uniquely identify this model
# Any name is acceptable - a typical format would be 'API:type'
# Examples: 'S3::bucket', 'APIGateway::Method', 'DynamoDB::Table'
model_name="new::model",
# List all the possible status-transitions here
transitions=[("initializing", "starting"),
("starting", "ready")])
def to_json(self):
# ManagedState gives us a 'status'-attribute out of the box
# On the first iteration, this will be set to the first status of the first transition
return {
"name": ...,
"status": self.status,
...
}
from moto.moto_api import state_manager
class Backend():
def __init__():
# This is how we register the model, and specify the default transition-behaviour
# Typically this is done when constructing the Backend-class
state_manager.register_default_transition(
# This name should be the same as the name used in NewModel
model_name="new::model",
# Any transition-config is possible - this is a good default option though
transition={"progression": "immediate"},
)
def list_resources():
for ec2_instance in all_resources:
# For users who configured models of this type to transition manually, this is where we advance the status
# Say the transition is registered like so: {"progression": "manual", "times": 3}
#
# The user calls 'list_resources' 3 times, the advance-method is called 3 times, and the state manager advances the state after the 3rd time.
# This all happens out of the box - just make sure that the `advance()`-method is invoked when appropriate
#
# If the transition is set to progress immediately, this method does exactly nothing.
#
# If the user decides to change the progression to be time-based, where the status changed every y seconds, this method does exactly nothing.
# It will has to be called though, for people who do have the manual progression configured
model.advance()
return all_models
def describe_resource():
resource = ...
# Depending on the API, there may be different ways for the user to retrieve the same information
# Make sure that each way (describe, list, get_, ) calls the advance()-method, and the resource can actually progress to the next state
resource.advance()
return resource

View File

@ -36,6 +36,12 @@ Additional Resources
docs/iam docs/iam
docs/aws_config docs/aws_config
.. toctree::
:hidden:
:caption: Configuration
docs/configuration/index
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
:hidden: :hidden:
@ -66,3 +72,4 @@ Additional Resources
docs/contributing/development_tips/urls docs/contributing/development_tips/urls
docs/contributing/development_tips/tests docs/contributing/development_tips/tests
docs/contributing/development_tips/utilities docs/contributing/development_tips/utilities
docs/contributing/development_tips/new_state_transitions

View File

@ -1,5 +1,6 @@
import re import re
from itertools import cycle from itertools import cycle
from time import sleep
import datetime import datetime
import time import time
import uuid import uuid
@ -29,6 +30,8 @@ from moto.ec2._models.instance_types import INSTANCE_FAMILIES as EC2_INSTANCE_FA
from moto.iam.exceptions import IAMNotFoundException from moto.iam.exceptions import IAMNotFoundException
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.core.utils import unix_time_millis, BackendDict from moto.core.utils import unix_time_millis, BackendDict
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.docker_utilities import DockerModel from moto.utilities.docker_utilities import DockerModel
from moto import settings from moto import settings
@ -411,7 +414,7 @@ class JobDefinition(CloudFormationModel):
return backend.get_job_definition_by_arn(arn) return backend.get_job_definition_by_arn(arn)
class Job(threading.Thread, BaseModel, DockerModel): class Job(threading.Thread, BaseModel, DockerModel, ManagedState):
def __init__( def __init__(
self, self,
name, name,
@ -435,13 +438,22 @@ class Job(threading.Thread, BaseModel, DockerModel):
""" """
threading.Thread.__init__(self) threading.Thread.__init__(self)
DockerModel.__init__(self) DockerModel.__init__(self)
ManagedState.__init__(
self,
"batch::job",
[
("SUBMITTED", "PENDING"),
("PENDING", "RUNNABLE"),
("RUNNABLE", "STARTING"),
("STARTING", "RUNNING"),
],
)
self.job_name = name self.job_name = name
self.job_id = str(uuid.uuid4()) self.job_id = str(uuid.uuid4())
self.job_definition = job_def self.job_definition = job_def
self.container_overrides = container_overrides or {} self.container_overrides = container_overrides or {}
self.job_queue = job_queue self.job_queue = job_queue
self.job_state = "SUBMITTED" # One of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED
self.job_queue.jobs.append(self) self.job_queue.jobs.append(self)
self.job_created_at = datetime.datetime.now() self.job_created_at = datetime.datetime.now()
self.job_started_at = datetime.datetime(1970, 1, 1) self.job_started_at = datetime.datetime(1970, 1, 1)
@ -469,7 +481,7 @@ class Job(threading.Thread, BaseModel, DockerModel):
"jobId": self.job_id, "jobId": self.job_id,
"jobName": self.job_name, "jobName": self.job_name,
"createdAt": datetime2int_milliseconds(self.job_created_at), "createdAt": datetime2int_milliseconds(self.job_created_at),
"status": self.job_state, "status": self.status,
"jobDefinition": self.job_definition.arn, "jobDefinition": self.job_definition.arn,
} }
if self.job_stopped_reason is not None: if self.job_stopped_reason is not None:
@ -556,8 +568,13 @@ class Job(threading.Thread, BaseModel, DockerModel):
:return: :return:
""" """
try: try:
self.job_state = "PENDING" self.advance()
while self.status == "SUBMITTED":
# Wait until we've moved onto state 'PENDING'
sleep(0.5)
# Wait until all dependent jobs have finished
# If any of the dependent jobs have failed, not even start
if self.depends_on and not self._wait_for_dependencies(): if self.depends_on and not self._wait_for_dependencies():
return return
@ -590,7 +607,10 @@ class Job(threading.Thread, BaseModel, DockerModel):
] ]
name = "{0}-{1}".format(self.job_name, self.job_id) name = "{0}-{1}".format(self.job_name, self.job_id)
self.job_state = "RUNNABLE" self.advance()
while self.status == "PENDING":
# Wait until the state is no longer pending, but 'RUNNABLE'
sleep(0.5)
# TODO setup ecs container instance # TODO setup ecs container instance
self.job_started_at = datetime.datetime.now() self.job_started_at = datetime.datetime.now()
@ -619,7 +639,15 @@ class Job(threading.Thread, BaseModel, DockerModel):
run_kwargs["network_mode"] = network_mode run_kwargs["network_mode"] = network_mode
log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON) log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON)
self.job_state = "STARTING" self.advance()
while self.status == "RUNNABLE":
# Wait until the state is no longer runnable, but 'STARTING'
sleep(0.5)
self.advance()
while self.status == "STARTING":
# Wait until the state is no longer runnable, but 'RUNNING'
sleep(0.5)
container = self.docker_client.containers.run( container = self.docker_client.containers.run(
image, image,
cmd, cmd,
@ -632,7 +660,6 @@ class Job(threading.Thread, BaseModel, DockerModel):
extra_hosts=extra_hosts, extra_hosts=extra_hosts,
**run_kwargs, **run_kwargs,
) )
self.job_state = "RUNNING"
try: try:
container.reload() container.reload()
@ -730,10 +757,10 @@ class Job(threading.Thread, BaseModel, DockerModel):
def _mark_stopped(self, success=True): def _mark_stopped(self, success=True):
# Ensure that job_stopped/job_stopped_at-attributes are set first # Ensure that job_stopped/job_stopped_at-attributes are set first
# The describe-method needs them immediately when job_state is set # The describe-method needs them immediately when status is set
self.job_stopped = True self.job_stopped = True
self.job_stopped_at = datetime.datetime.now() self.job_stopped_at = datetime.datetime.now()
self.job_state = "SUCCEEDED" if success else "FAILED" self.status = "SUCCEEDED" if success else "FAILED"
self._stop_attempt() self._stop_attempt()
def _start_attempt(self): def _start_attempt(self):
@ -769,9 +796,9 @@ class Job(threading.Thread, BaseModel, DockerModel):
for dependent_id in dependent_ids: for dependent_id in dependent_ids:
if dependent_id in self.all_jobs: if dependent_id in self.all_jobs:
dependent_job = self.all_jobs[dependent_id] dependent_job = self.all_jobs[dependent_id]
if dependent_job.job_state == "SUCCEEDED": if dependent_job.status == "SUCCEEDED":
successful_dependencies.add(dependent_id) successful_dependencies.add(dependent_id)
if dependent_job.job_state == "FAILED": if dependent_job.status == "FAILED":
logger.error( logger.error(
"Terminating job {0} due to failed dependency {1}".format( "Terminating job {0} due to failed dependency {1}".format(
self.name, dependent_job.name self.name, dependent_job.name
@ -800,6 +827,10 @@ class BatchBackend(BaseBackend):
self._job_definitions = {} self._job_definitions = {}
self._jobs = {} self._jobs = {}
state_manager.register_default_transition(
"batch::job", transition={"progression": "manual", "times": 1}
)
@property @property
def iam_backend(self): def iam_backend(self):
""" """
@ -836,7 +867,7 @@ class BatchBackend(BaseBackend):
region_name = self.region_name region_name = self.region_name
for job in self._jobs.values(): for job in self._jobs.values():
if job.job_state not in ("FAILED", "SUCCEEDED"): if job.status not in ("FAILED", "SUCCEEDED"):
job.stop = True job.stop = True
# Try to join # Try to join
job.join(0.2) job.join(0.2)
@ -1574,7 +1605,7 @@ class BatchBackend(BaseBackend):
) )
for job in job_queue.jobs: for job in job_queue.jobs:
if job_status is not None and job.job_state != job_status: if job_status is not None and job.status != job_status:
continue continue
jobs.append(job) jobs.append(job)
@ -1583,7 +1614,7 @@ class BatchBackend(BaseBackend):
def cancel_job(self, job_id, reason): def cancel_job(self, job_id, reason):
job = self.get_job_by_id(job_id) job = self.get_job_by_id(job_id)
if job.job_state in ["SUBMITTED", "PENDING", "RUNNABLE"]: if job.status in ["SUBMITTED", "PENDING", "RUNNABLE"]:
job.terminate(reason) job.terminate(reason)
# No-Op for jobs that have already started - user has to explicitly terminate those # No-Op for jobs that have already started - user has to explicitly terminate those

View File

@ -2,6 +2,8 @@ import random
import string import string
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from uuid import uuid4 from uuid import uuid4
from .exceptions import ( from .exceptions import (
@ -129,7 +131,7 @@ class DistributionConfig:
self.is_ipv6_enabled = True self.is_ipv6_enabled = True
class Distribution(BaseModel): class Distribution(BaseModel, ManagedState):
@staticmethod @staticmethod
def random_id(uppercase=True): def random_id(uppercase=True):
ascii_set = string.ascii_uppercase if uppercase else string.ascii_lowercase ascii_set = string.ascii_uppercase if uppercase else string.ascii_lowercase
@ -140,6 +142,11 @@ class Distribution(BaseModel):
return resource_id return resource_id
def __init__(self, config): def __init__(self, config):
# Configured ManagedState
super().__init__(
"cloudfront::distribution", transitions=[("InProgress", "Deployed")]
)
# Configure internal properties
self.distribution_id = Distribution.random_id() self.distribution_id = Distribution.random_id()
self.arn = ( self.arn = (
f"arn:aws:cloudfront:{ACCOUNT_ID}:distribution/{self.distribution_id}" f"arn:aws:cloudfront:{ACCOUNT_ID}:distribution/{self.distribution_id}"
@ -152,16 +159,8 @@ class Distribution(BaseModel):
self.last_modified_time = "2021-11-27T10:34:26.802Z" self.last_modified_time = "2021-11-27T10:34:26.802Z"
self.in_progress_invalidation_batches = 0 self.in_progress_invalidation_batches = 0
self.has_active_trusted_key_groups = False self.has_active_trusted_key_groups = False
self.status = "InProgress"
self.domain_name = f"{Distribution.random_id(uppercase=False)}.cloudfront.net" self.domain_name = f"{Distribution.random_id(uppercase=False)}.cloudfront.net"
def advance(self):
"""
Advance the status of this Distribution, to mimick AWS' behaviour
"""
if self.status == "InProgress":
self.status = "Deployed"
@property @property
def location(self): def location(self):
return f"https://cloudfront.amazonaws.com/2020-05-31/distribution/{self.distribution_id}" return f"https://cloudfront.amazonaws.com/2020-05-31/distribution/{self.distribution_id}"
@ -175,6 +174,10 @@ class CloudFrontBackend(BaseBackend):
def __init__(self): def __init__(self):
self.distributions = dict() self.distributions = dict()
state_manager.register_default_transition(
"cloudfront::distribution", transition={"progression": "manual", "times": 1}
)
def create_distribution(self, distribution_config): def create_distribution(self, distribution_config):
""" """
This has been tested against an S3-distribution with the simplest possible configuration. This has been tested against an S3-distribution with the simplest possible configuration.

View File

@ -1,6 +1,8 @@
"""DAXBackend class with methods for supported APIs.""" """DAXBackend class with methods for supported APIs."""
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import BackendDict, get_random_hex, unix_time from moto.core.utils import BackendDict, get_random_hex, unix_time
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.tagging_service import TaggingService from moto.utilities.tagging_service import TaggingService
from moto.utilities.paginator import paginate from moto.utilities.paginator import paginate
@ -63,7 +65,7 @@ class DaxEndpoint:
return dct return dct
class DaxCluster(BaseModel): class DaxCluster(BaseModel, ManagedState):
def __init__( def __init__(
self, self,
region, region,
@ -74,12 +76,17 @@ class DaxCluster(BaseModel):
iam_role_arn, iam_role_arn,
sse_specification, sse_specification,
): ):
# Configure ManagedState
super().__init__(
model_name="dax::cluster",
transitions=[("creating", "available"), ("deleting", "deleted")],
)
# Set internal properties
self.name = name self.name = name
self.description = description self.description = description
self.arn = f"arn:aws:dax:{region}:{ACCOUNT_ID}:cache/{self.name}" self.arn = f"arn:aws:dax:{region}:{ACCOUNT_ID}:cache/{self.name}"
self.node_type = node_type self.node_type = node_type
self.replication_factor = replication_factor self.replication_factor = replication_factor
self.status = "creating"
self.cluster_hex = get_random_hex(6) self.cluster_hex = get_random_hex(6)
self.endpoint = DaxEndpoint( self.endpoint = DaxEndpoint(
name=name, cluster_hex=self.cluster_hex, region=region name=name, cluster_hex=self.cluster_hex, region=region
@ -94,10 +101,6 @@ class DaxCluster(BaseModel):
] ]
self.sse_specification = sse_specification self.sse_specification = sse_specification
# Internal counter to keep track of when this cluster is available/deleted
# Used in conjunction with `advance()`
self._tick = 0
def _create_new_node(self, idx): def _create_new_node(self, idx):
return DaxNode(endpoint=self.endpoint, name=self.name, index=idx) return DaxNode(endpoint=self.endpoint, name=self.name, index=idx)
@ -119,19 +122,6 @@ class DaxCluster(BaseModel):
def is_deleted(self): def is_deleted(self):
return self.status == "deleted" return self.status == "deleted"
def advance(self):
if self.status == "creating":
if self._tick < 3:
self._tick += 1
else:
self.status = "available"
self._tick = 0
if self.status == "deleting":
if self._tick < 3:
self._tick += 1
else:
self.status = "deleted"
def to_json(self): def to_json(self):
use_full_repr = self.status == "available" use_full_repr = self.status == "available"
dct = { dct = {
@ -166,6 +156,10 @@ class DAXBackend(BaseBackend):
self._clusters = dict() self._clusters = dict()
self._tagger = TaggingService() self._tagger = TaggingService()
state_manager.register_default_transition(
model_name="dax::cluster", transition={"progression": "manual", "times": 4}
)
@property @property
def clusters(self): def clusters(self):
self._clusters = { self._clusters = {

View File

@ -0,0 +1,7 @@
from moto.moto_api import _internal
"""
Global StateManager that everyone uses
Use this manager to configure how AWS models transition between states. (initializing -> starting, starting -> ready, etc.)
"""
state_manager = _internal.state_manager.StateManager()

View File

@ -1,4 +1,5 @@
from .models import moto_api_backend from .models import moto_api_backend
from .state_manager import StateManager # noqa
moto_api_backends = {"global": moto_api_backend} moto_api_backends = {"global": moto_api_backend}

View File

@ -0,0 +1,67 @@
from datetime import datetime, timedelta
from moto.moto_api import state_manager
class ManagedState:
"""
Subclass this class to configure state-transitions
"""
def __init__(self, model_name, transitions):
# Indicate the possible transitions for this model
# Example: [(initializing,queued), (queued, starting), (starting, ready)]
self._transitions = transitions
# Current status of this model. Implementations should call `status`
# The initial status is assumed to be the first transition
self._status, _ = transitions[0]
# Internal counter that keeps track of how often this model has been described
# Used for transition-type=manual
self._tick = 0
# Time when the status was last progressed to this model
# Used for transition-type=time
self._time_progressed = datetime.now()
# Name of this model. This will be used in the API
self.model_name = model_name
def advance(self):
self._tick += 1
@property
def status(self):
"""
Transitions the status as appropriate before returning
"""
transition_config = state_manager.get_transition(self.model_name)
if transition_config["progression"] == "immediate":
self._status = self._get_last_status(previous=self._status)
if transition_config["progression"] == "manual":
if self._tick >= transition_config["times"]:
self._status = self._get_next_status(previous=self._status)
self._tick = 0
if transition_config["progression"] == "time":
next_transition_at = self._time_progressed + timedelta(
seconds=transition_config["seconds"]
)
if datetime.now() > next_transition_at:
self._status = self._get_next_status(previous=self._status)
self._time_progressed = datetime.now()
return self._status
@status.setter
def status(self, value):
self._status = value
def _get_next_status(self, previous):
return next(
(nxt for prev, nxt in self._transitions if previous == prev), previous
)
def _get_last_status(self, previous):
next_state = self._get_next_status(previous)
while next_state != previous:
previous = next_state
next_state = self._get_next_status(previous)
return next_state

View File

@ -12,5 +12,20 @@ class MotoAPIBackend(BaseBackend):
backend.reset() backend.reset()
self.__init__() self.__init__()
def get_transition(self, model_name):
from moto.moto_api import state_manager
return state_manager.get_transition(model_name)
def set_transition(self, model_name, transition):
from moto.moto_api import state_manager
state_manager.set_transition(model_name, transition)
def unset_transition(self, model_name):
from moto.moto_api import state_manager
state_manager.unset_transition(model_name)
moto_api_backend = MotoAPIBackend() moto_api_backend = MotoAPIBackend()

View File

@ -65,3 +65,44 @@ class MotoAPIResponse(BaseResponse):
from flask import render_template from flask import render_template
return render_template("dashboard.html") return render_template("dashboard.html")
def get_transition(
self, request, full_url, headers
): # pylint: disable=unused-argument
from .models import moto_api_backend
qs_dict = dict(
x.split("=") for x in request.query_string.decode("utf-8").split("&")
)
model_name = qs_dict["model_name"]
resp = moto_api_backend.get_transition(model_name=model_name)
return 200, {}, json.dumps(resp)
def set_transition(
self, request, full_url, headers
): # pylint: disable=unused-argument
from .models import moto_api_backend
request_body_size = int(headers["Content-Length"])
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
body = json.loads(body)
model_name = body["model_name"]
transition = body["transition"]
moto_api_backend.set_transition(model_name, transition)
return 201, {}, ""
def unset_transition(
self, request, full_url, headers
): # pylint: disable=unused-argument
from .models import moto_api_backend
request_body_size = int(headers["Content-Length"])
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
body = json.loads(body)
model_name = body["model_name"]
moto_api_backend.unset_transition(model_name)
return 201, {}, ""

View File

@ -0,0 +1,42 @@
DEFAULT_TRANSITION = {"progression": "immediate"}
class StateManager:
def __init__(self):
self._default_transitions = dict()
self._transitions = dict()
def register_default_transition(self, model_name, transition):
"""
Register the default transition for a specific model.
This should only be called by Moto backends - use the `set_transition` method to override this default transition in your own tests.
"""
self._default_transitions[model_name] = transition
def set_transition(self, model_name, transition):
"""
Set a transition for a specific model. Any transition added here will take precedence over the default transition that was registered.
See https://docs.getmoto.org/en/latest/docs/configuration/state_transition/index.html for the possible transition-configurations.
"""
self._transitions[model_name] = transition
def unset_transition(self, model_name):
"""
Unset (remove) a custom transition that was set. This is a safe and idempotent operation.
The default transition that was registered will not be altered by this operation.
"""
self._transitions.pop(model_name, None)
def get_transition(self, model_name):
"""
Return the configuration for a specific model. This will return a user-specified configuration, a default configuration of none exists, or the default transition if none exists.
"""
if model_name in self._transitions:
return self._transitions[model_name]
if model_name in self._default_transitions:
return self._default_transitions[model_name]
return DEFAULT_TRANSITION
def get_registered_models(self):
return list(self._default_transitions.keys())

View File

@ -9,4 +9,7 @@ url_paths = {
"{0}/moto-api/data.json": response_instance.model_data, "{0}/moto-api/data.json": response_instance.model_data,
"{0}/moto-api/reset": response_instance.reset_response, "{0}/moto-api/reset": response_instance.reset_response,
"{0}/moto-api/reset-auth": response_instance.reset_auth_response, "{0}/moto-api/reset-auth": response_instance.reset_auth_response,
"{0}/moto-api/state-manager/get-transition": response_instance.get_transition,
"{0}/moto-api/state-manager/set-transition": response_instance.set_transition,
"{0}/moto-api/state-manager/unset-transition": response_instance.unset_transition,
} }

View File

@ -1,4 +1,6 @@
from moto.core import BaseBackend from moto.core import BaseBackend
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.utils import load_resource from moto.utilities.utils import load_resource
import datetime import datetime
import random import random
@ -8,12 +10,23 @@ checks_json = "resources/describe_trusted_advisor_checks.json"
ADVISOR_CHECKS = load_resource(__name__, checks_json) ADVISOR_CHECKS = load_resource(__name__, checks_json)
class SupportCase(object): class SupportCase(ManagedState):
def __init__(self, **kwargs): def __init__(self, **kwargs):
# Configure ManagedState
super().__init__(
"support::case",
transitions=[
("opened", "pending-customer-action"),
("pending-customer-action", "reopened"),
("reopened", "resolved"),
("resolved", "unassigned"),
("unassigned", "work-in-progress"),
("work-in-progress", "opened"),
],
)
self.case_id = kwargs.get("case_id") self.case_id = kwargs.get("case_id")
self.display_id = "foo_display_id" self.display_id = "foo_display_id"
self.subject = kwargs.get("subject") self.subject = kwargs.get("subject")
self.status = "opened"
self.service_code = kwargs.get("service_code") self.service_code = kwargs.get("service_code")
self.category_code = kwargs.get("category_code") self.category_code = kwargs.get("category_code")
self.severity_code = kwargs.get("severity_code") self.severity_code = kwargs.get("severity_code")
@ -54,6 +67,10 @@ class SupportBackend(BaseBackend):
self.check_status = {} self.check_status = {}
self.cases = {} self.cases = {}
state_manager.register_default_transition(
model_name="support::case", transition={"progression": "manual", "times": 1}
)
def reset(self): def reset(self):
region_name = self.region_name region_name = self.region_name
self.__dict__ = {} self.__dict__ = {}
@ -105,23 +122,7 @@ class SupportBackend(BaseBackend):
Fake an advancement through case statuses Fake an advancement through case statuses
""" """
if self.cases[case_id].status == "opened": self.cases[case_id].advance()
self.cases[case_id].status = "pending-customer-action"
elif self.cases[case_id].status == "pending-customer-action":
self.cases[case_id].status = "reopened"
elif self.cases[case_id].status == "reopened":
self.cases[case_id].status = "resolved"
elif self.cases[case_id].status == "resolved":
self.cases[case_id].status = "unassigned"
elif self.cases[case_id].status == "unassigned":
self.cases[case_id].status = "work-in-progress"
elif self.cases[case_id].status == "work-in-progress":
self.cases[case_id].status = "opened"
def advance_case_severity_codes(self, case_id): def advance_case_severity_codes(self, case_id):
""" """

View File

@ -2,6 +2,8 @@ import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict from moto.core.utils import BackendDict
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.sts.models import ACCOUNT_ID from moto.sts.models import ACCOUNT_ID
from .exceptions import ConflictException, BadRequestException from .exceptions import ConflictException, BadRequestException
@ -27,7 +29,7 @@ class BaseObject(BaseModel):
return self.gen_response_object() return self.gen_response_object()
class FakeTranscriptionJob(BaseObject): class FakeTranscriptionJob(BaseObject, ManagedState):
def __init__( def __init__(
self, self,
region_name, region_name,
@ -46,9 +48,17 @@ class FakeTranscriptionJob(BaseObject):
identify_language, identify_language,
language_options, language_options,
): ):
ManagedState.__init__(
self,
"transcribe::transcriptionjob",
transitions=[
(None, "QUEUED"),
("QUEUED", "IN_PROGRESS"),
("IN_PROGRESS", "COMPLETED"),
],
)
self._region_name = region_name self._region_name = region_name
self.transcription_job_name = transcription_job_name self.transcription_job_name = transcription_job_name
self.transcription_job_status = None
self.language_code = language_code self.language_code = language_code
self.media_sample_rate_hertz = media_sample_rate_hertz self.media_sample_rate_hertz = media_sample_rate_hertz
self.media_format = media_format self.media_format = media_format
@ -127,6 +137,7 @@ class FakeTranscriptionJob(BaseObject):
} }
response_fields = response_field_dict[response_type] response_fields = response_field_dict[response_type]
response_object = self.gen_response_object() response_object = self.gen_response_object()
response_object["TranscriptionJobStatus"] = self.status
if response_type != "LIST": if response_type != "LIST":
return { return {
"TranscriptionJob": { "TranscriptionJob": {
@ -142,13 +153,15 @@ class FakeTranscriptionJob(BaseObject):
if k in response_fields and v is not None and v != [None] if k in response_fields and v is not None and v != [None]
} }
def advance_job_status(self): def advance(self):
# On each call advances the fake job status old_status = self.status
super().advance()
new_status = self.status
if not self.transcription_job_status: if old_status == new_status:
self.transcription_job_status = "QUEUED" return
elif self.transcription_job_status == "QUEUED":
self.transcription_job_status = "IN_PROGRESS" if new_status == "IN_PROGRESS":
self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not self.media_sample_rate_hertz: if not self.media_sample_rate_hertz:
self.media_sample_rate_hertz = 44100 self.media_sample_rate_hertz = 44100
@ -165,8 +178,7 @@ class FakeTranscriptionJob(BaseObject):
self.language_code = self.language_options[0] self.language_code = self.language_options[0]
else: else:
self.language_code = "en-US" self.language_code = "en-US"
elif self.transcription_job_status == "IN_PROGRESS": elif new_status == "COMPLETED":
self.transcription_job_status = "COMPLETED"
self.completion_time = (datetime.now() + timedelta(seconds=10)).strftime( self.completion_time = (datetime.now() + timedelta(seconds=10)).strftime(
"%Y-%m-%d %H:%M:%S" "%Y-%m-%d %H:%M:%S"
) )
@ -197,16 +209,21 @@ class FakeTranscriptionJob(BaseObject):
self.transcript = {"TranscriptFileUri": transcript_file_uri} self.transcript = {"TranscriptFileUri": transcript_file_uri}
class FakeVocabulary(BaseObject): class FakeVocabulary(BaseObject, ManagedState):
def __init__( def __init__(
self, region_name, vocabulary_name, language_code, phrases, vocabulary_file_uri self, region_name, vocabulary_name, language_code, phrases, vocabulary_file_uri
): ):
# Configured ManagedState
super().__init__(
"transcribe::vocabulary",
transitions=[(None, "PENDING"), ("PENDING", "READY")],
)
# Configure internal properties
self._region_name = region_name self._region_name = region_name
self.vocabulary_name = vocabulary_name self.vocabulary_name = vocabulary_name
self.language_code = language_code self.language_code = language_code
self.phrases = phrases self.phrases = phrases
self.vocabulary_file_uri = vocabulary_file_uri self.vocabulary_file_uri = vocabulary_file_uri
self.vocabulary_state = None
self.last_modified_time = None self.last_modified_time = None
self.failure_reason = None self.failure_reason = None
self.download_uri = "https://s3.{0}.amazonaws.com/aws-transcribe-dictionary-model-{0}-prod/{1}/{2}/{3}/input.txt".format( # noqa: E501 self.download_uri = "https://s3.{0}.amazonaws.com/aws-transcribe-dictionary-model-{0}-prod/{1}/{2}/{3}/input.txt".format( # noqa: E501
@ -239,24 +256,23 @@ class FakeVocabulary(BaseObject):
} }
response_fields = response_field_dict[response_type] response_fields = response_field_dict[response_type]
response_object = self.gen_response_object() response_object = self.gen_response_object()
response_object["VocabularyState"] = self.status
return { return {
k: v k: v
for k, v in response_object.items() for k, v in response_object.items()
if k in response_fields and v is not None and v != [None] if k in response_fields and v is not None and v != [None]
} }
def advance_job_status(self): def advance(self):
# On each call advances the fake job status old_status = self.status
super().advance()
new_status = self.status
if not self.vocabulary_state: if old_status != new_status:
self.vocabulary_state = "PENDING"
self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elif self.vocabulary_state == "PENDING":
self.vocabulary_state = "READY"
self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
class FakeMedicalTranscriptionJob(BaseObject): class FakeMedicalTranscriptionJob(BaseObject, ManagedState):
def __init__( def __init__(
self, self,
region_name, region_name,
@ -271,9 +287,17 @@ class FakeMedicalTranscriptionJob(BaseObject):
specialty, specialty,
job_type, job_type,
): ):
ManagedState.__init__(
self,
"transcribe::medicaltranscriptionjob",
transitions=[
(None, "QUEUED"),
("QUEUED", "IN_PROGRESS"),
("IN_PROGRESS", "COMPLETED"),
],
)
self._region_name = region_name self._region_name = region_name
self.medical_transcription_job_name = medical_transcription_job_name self.medical_transcription_job_name = medical_transcription_job_name
self.transcription_job_status = None
self.language_code = language_code self.language_code = language_code
self.media_sample_rate_hertz = media_sample_rate_hertz self.media_sample_rate_hertz = media_sample_rate_hertz
self.media_format = media_format self.media_format = media_format
@ -335,6 +359,7 @@ class FakeMedicalTranscriptionJob(BaseObject):
} }
response_fields = response_field_dict[response_type] response_fields = response_field_dict[response_type]
response_object = self.gen_response_object() response_object = self.gen_response_object()
response_object["TranscriptionJobStatus"] = self.status
if response_type != "LIST": if response_type != "LIST":
return { return {
"MedicalTranscriptionJob": { "MedicalTranscriptionJob": {
@ -350,13 +375,15 @@ class FakeMedicalTranscriptionJob(BaseObject):
if k in response_fields and v is not None and v != [None] if k in response_fields and v is not None and v != [None]
} }
def advance_job_status(self): def advance(self):
# On each call advances the fake job status old_status = self.status
super().advance()
new_status = self.status
if not self.transcription_job_status: if old_status == new_status:
self.transcription_job_status = "QUEUED" return
elif self.transcription_job_status == "QUEUED":
self.transcription_job_status = "IN_PROGRESS" if new_status == "IN_PROGRESS":
self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not self.media_sample_rate_hertz: if not self.media_sample_rate_hertz:
self.media_sample_rate_hertz = 44100 self.media_sample_rate_hertz = 44100
@ -365,8 +392,7 @@ class FakeMedicalTranscriptionJob(BaseObject):
self.media_format = ( self.media_format = (
file_ext if file_ext in ["mp3", "mp4", "wav", "flac"] else "mp3" file_ext if file_ext in ["mp3", "mp4", "wav", "flac"] else "mp3"
) )
elif self.transcription_job_status == "IN_PROGRESS": elif new_status == "COMPLETED":
self.transcription_job_status = "COMPLETED"
self.completion_time = (datetime.now() + timedelta(seconds=10)).strftime( self.completion_time = (datetime.now() + timedelta(seconds=10)).strftime(
"%Y-%m-%d %H:%M:%S" "%Y-%m-%d %H:%M:%S"
) )
@ -379,63 +405,28 @@ class FakeMedicalTranscriptionJob(BaseObject):
} }
class FakeMedicalVocabulary(BaseObject): class FakeMedicalVocabulary(FakeVocabulary):
def __init__( def __init__(
self, region_name, vocabulary_name, language_code, vocabulary_file_uri self, region_name, vocabulary_name, language_code, vocabulary_file_uri
): ):
super().__init__(
region_name,
vocabulary_name,
language_code=language_code,
phrases=None,
vocabulary_file_uri=vocabulary_file_uri,
)
self.model_name = "transcribe::medicalvocabulary"
self._region_name = region_name self._region_name = region_name
self.vocabulary_name = vocabulary_name self.vocabulary_name = vocabulary_name
self.language_code = language_code self.language_code = language_code
self.vocabulary_file_uri = vocabulary_file_uri self.vocabulary_file_uri = vocabulary_file_uri
self.vocabulary_state = None
self.last_modified_time = None self.last_modified_time = None
self.failure_reason = None self.failure_reason = None
self.download_uri = "https://s3.us-east-1.amazonaws.com/aws-transcribe-dictionary-model-{}-prod/{}/medical/{}/{}/input.txt".format( # noqa: E501 self.download_uri = "https://s3.us-east-1.amazonaws.com/aws-transcribe-dictionary-model-{}-prod/{}/medical/{}/{}/input.txt".format( # noqa: E501
region_name, ACCOUNT_ID, self.vocabulary_name, uuid.uuid4() region_name, ACCOUNT_ID, self.vocabulary_name, uuid.uuid4()
) )
def response_object(self, response_type):
response_field_dict = {
"CREATE": [
"VocabularyName",
"LanguageCode",
"VocabularyState",
"LastModifiedTime",
"FailureReason",
],
"GET": [
"VocabularyName",
"LanguageCode",
"VocabularyState",
"LastModifiedTime",
"FailureReason",
"DownloadUri",
],
"LIST": [
"VocabularyName",
"LanguageCode",
"LastModifiedTime",
"VocabularyState",
],
}
response_fields = response_field_dict[response_type]
response_object = self.gen_response_object()
return {
k: v
for k, v in response_object.items()
if k in response_fields and v is not None and v != [None]
}
def advance_job_status(self):
# On each call advances the fake job status
if not self.vocabulary_state:
self.vocabulary_state = "PENDING"
self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elif self.vocabulary_state == "PENDING":
self.vocabulary_state = "READY"
self.last_modified_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
class TranscribeBackend(BaseBackend): class TranscribeBackend(BaseBackend):
def __init__(self, region_name=None): def __init__(self, region_name=None):
@ -445,6 +436,22 @@ class TranscribeBackend(BaseBackend):
self.vocabularies = {} self.vocabularies = {}
self.region_name = region_name self.region_name = region_name
state_manager.register_default_transition(
"transcribe::vocabulary", transition={"progression": "manual", "times": 1}
)
state_manager.register_default_transition(
"transcribe::medicalvocabulary",
transition={"progression": "manual", "times": 1},
)
state_manager.register_default_transition(
"transcribe::transcriptionjob",
transition={"progression": "manual", "times": 1},
)
state_manager.register_default_transition(
"transcribe::medicaltranscriptionjob",
transition={"progression": "manual", "times": 1},
)
def reset(self): def reset(self):
region_name = self.region_name region_name = self.region_name
self.__dict__ = {} self.__dict__ = {}
@ -534,7 +541,7 @@ class TranscribeBackend(BaseBackend):
def get_transcription_job(self, transcription_job_name): def get_transcription_job(self, transcription_job_name):
try: try:
job = self.transcriptions[transcription_job_name] job = self.transcriptions[transcription_job_name]
job.advance_job_status() # Fakes advancement through statuses. job.advance() # Fakes advancement through statuses.
return job.response_object("GET") return job.response_object("GET")
except KeyError: except KeyError:
raise BadRequestException( raise BadRequestException(
@ -545,7 +552,7 @@ class TranscribeBackend(BaseBackend):
def get_medical_transcription_job(self, medical_transcription_job_name): def get_medical_transcription_job(self, medical_transcription_job_name):
try: try:
job = self.medical_transcriptions[medical_transcription_job_name] job = self.medical_transcriptions[medical_transcription_job_name]
job.advance_job_status() # Fakes advancement through statuses. job.advance() # Fakes advancement through statuses.
return job.response_object("GET") return job.response_object("GET")
except KeyError: except KeyError:
raise BadRequestException( raise BadRequestException(
@ -577,7 +584,7 @@ class TranscribeBackend(BaseBackend):
jobs = list(self.transcriptions.values()) jobs = list(self.transcriptions.values())
if state_equals: if state_equals:
jobs = [job for job in jobs if job.transcription_job_status == state_equals] jobs = [job for job in jobs if job.status == state_equals]
if job_name_contains: if job_name_contains:
jobs = [ jobs = [
@ -607,7 +614,7 @@ class TranscribeBackend(BaseBackend):
jobs = list(self.medical_transcriptions.values()) jobs = list(self.medical_transcriptions.values())
if status: if status:
jobs = [job for job in jobs if job.transcription_job_status == status] jobs = [job for job in jobs if job.status == status]
if job_name_contains: if job_name_contains:
jobs = [ jobs = [
@ -698,7 +705,7 @@ class TranscribeBackend(BaseBackend):
def get_vocabulary(self, vocabulary_name): def get_vocabulary(self, vocabulary_name):
try: try:
job = self.vocabularies[vocabulary_name] job = self.vocabularies[vocabulary_name]
job.advance_job_status() # Fakes advancement through statuses. job.advance() # Fakes advancement through statuses.
return job.response_object("GET") return job.response_object("GET")
except KeyError: except KeyError:
raise BadRequestException( raise BadRequestException(
@ -709,7 +716,7 @@ class TranscribeBackend(BaseBackend):
def get_medical_vocabulary(self, vocabulary_name): def get_medical_vocabulary(self, vocabulary_name):
try: try:
job = self.medical_vocabularies[vocabulary_name] job = self.medical_vocabularies[vocabulary_name]
job.advance_job_status() # Fakes advancement through statuses. job.advance() # Fakes advancement through statuses.
return job.response_object("GET") return job.response_object("GET")
except KeyError: except KeyError:
raise BadRequestException( raise BadRequestException(
@ -740,7 +747,7 @@ class TranscribeBackend(BaseBackend):
vocabularies = [ vocabularies = [
vocabulary vocabulary
for vocabulary in vocabularies for vocabulary in vocabularies
if vocabulary.vocabulary_state == state_equals if vocabulary.status == state_equals
] ]
if name_contains: if name_contains:
@ -777,7 +784,7 @@ class TranscribeBackend(BaseBackend):
vocabularies = [ vocabularies = [
vocabulary vocabulary
for vocabulary in vocabularies for vocabulary in vocabularies
if vocabulary.vocabulary_state == state_equals if vocabulary.status == state_equals
] ]
if name_contains: if name_contains:

View File

@ -160,8 +160,8 @@ def test_list_jobs():
ec2_client, iam_client, _, _, batch_client = _get_clients() ec2_client, iam_client, _, _, batch_client = _get_clients()
_, _, _, iam_arn = _setup(ec2_client, iam_client) _, _, _, iam_arn = _setup(ec2_client, iam_client)
job_def_name = "sleep5" job_def_name = "sleep2"
commands = ["sleep", "5"] commands = ["sleep", "2"]
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
resp = batch_client.submit_job( resp = batch_client.submit_job(
@ -179,7 +179,10 @@ def test_list_jobs():
job.should.have.key("createdAt") job.should.have.key("createdAt")
job.should.have.key("jobDefinition") job.should.have.key("jobDefinition")
job.should.have.key("jobName") job.should.have.key("jobName")
job.should.have.key("status").which.should.be.within(["STARTING", "RUNNABLE"]) # This is async, so we can't be sure where we are in the process
job.should.have.key("status").within(
["SUBMITTED", "PENDING", "STARTING", "RUNNABLE", "RUNNING"]
)
batch_client.list_jobs(jobQueue=queue_arn, jobStatus="SUCCEEDED")[ batch_client.list_jobs(jobQueue=queue_arn, jobStatus="SUCCEEDED")[
"jobSummaryList" "jobSummaryList"
@ -299,30 +302,36 @@ def test_cancel_running_job():
jobName="test_job_name", jobQueue=queue_arn, jobDefinition=job_def_arn jobName="test_job_name", jobQueue=queue_arn, jobDefinition=job_def_arn
) )
job_id = resp["jobId"] job_id = resp["jobId"]
_wait_for_job_status(batch_client, job_id, "STARTING") _wait_for_job_statuses(
batch_client, job_id, statuses=["RUNNABLE", "STARTING", "RUNNING"]
)
batch_client.cancel_job(jobId=job_id, reason="test_cancel") batch_client.cancel_job(jobId=job_id, reason="test_cancel")
# We cancelled too late, the job was already running. Now we just wait for it to succeed # We cancelled too late, the job was already running. Now we just wait for it to succeed
_wait_for_job_status(batch_client, job_id, "SUCCEEDED") _wait_for_job_status(batch_client, job_id, "SUCCEEDED", seconds_to_wait=30)
resp = batch_client.describe_jobs(jobs=[job_id]) resp = batch_client.describe_jobs(jobs=[job_id])
resp["jobs"][0]["jobName"].should.equal("test_job_name") resp["jobs"][0]["jobName"].should.equal("test_job_name")
resp["jobs"][0].shouldnt.have.key("statusReason") resp["jobs"][0].shouldnt.have.key("statusReason")
def _wait_for_job_status(client, job_id, status, seconds_to_wait=60): def _wait_for_job_status(client, job_id, status, seconds_to_wait=30):
_wait_for_job_statuses(client, job_id, [status], seconds_to_wait)
def _wait_for_job_statuses(client, job_id, statuses, seconds_to_wait=30):
wait_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait) wait_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
last_job_status = None last_job_status = None
while datetime.datetime.now() < wait_time: while datetime.datetime.now() < wait_time:
resp = client.describe_jobs(jobs=[job_id]) resp = client.describe_jobs(jobs=[job_id])
last_job_status = resp["jobs"][0]["status"] last_job_status = resp["jobs"][0]["status"]
if last_job_status == status: if last_job_status in statuses:
break break
time.sleep(0.1) time.sleep(0.1)
else: else:
raise RuntimeError( raise RuntimeError(
"Time out waiting for job status {status}!\n Last status: {last_status}".format( "Time out waiting for job status {status}!\n Last status: {last_status}".format(
status=status, last_status=last_job_status status=statuses, last_status=last_job_status
) )
) )

View File

View File

@ -0,0 +1,64 @@
import json
import sure # noqa # pylint: disable=unused-import
from moto import server
def test_set_transition():
backend = server.create_backend_app("moto_api")
test_client = backend.test_client()
post_body = dict(
model_name="server::test1",
transition={"progression": "waiter", "wait_times": 3},
)
resp = test_client.post(
"http://localhost:5000/moto-api/state-manager/set-transition",
data=json.dumps(post_body),
)
resp.status_code.should.equal(201)
resp = test_client.get(
"http://localhost:5000/moto-api/state-manager/get-transition?model_name=server::test1"
)
resp.status_code.should.equal(200)
json.loads(resp.data).should.equal({"progression": "waiter", "wait_times": 3})
def test_unset_transition():
backend = server.create_backend_app("moto_api")
test_client = backend.test_client()
post_body = dict(
model_name="server::test2",
transition={"progression": "waiter", "wait_times": 3},
)
test_client.post(
"http://localhost:5000/moto-api/state-manager/set-transition",
data=json.dumps(post_body),
)
post_body = dict(model_name="server::test2")
resp = test_client.post(
"http://localhost:5000/moto-api/state-manager/unset-transition",
data=json.dumps(post_body),
)
resp.status_code.should.equal(201)
resp = test_client.get(
"http://localhost:5000/moto-api/state-manager/get-transition?model_name=server::test2"
)
resp.status_code.should.equal(200)
json.loads(resp.data).should.equal({"progression": "immediate"})
def test_get_default_transition():
backend = server.create_backend_app("moto_api")
test_client = backend.test_client()
resp = test_client.get(
"http://localhost:5000/moto-api/state-manager/get-transition?model_name=unknown"
)
resp.status_code.should.equal(200)
json.loads(resp.data).should.equal({"progression": "immediate"})

View File

@ -0,0 +1,63 @@
import json
import requests
import sure # noqa # pylint: disable=unused-import
from moto import settings
from unittest import SkipTest
def test_set_transition():
if not settings.TEST_SERVER_MODE:
raise SkipTest("We only want to test ServerMode here")
post_body = dict(
model_name="test_model0", transition={"progression": "waiter", "wait_times": 3}
)
resp = requests.post(
"http://localhost:5000/moto-api/state-manager/set-transition",
data=json.dumps(post_body),
)
resp.status_code.should.equal(201)
resp = requests.get(
"http://localhost:5000/moto-api/state-manager/get-transition?model_name=test_model0"
)
resp.status_code.should.equal(200)
json.loads(resp.content).should.equal({"progression": "waiter", "wait_times": 3})
def test_unset_transition():
if not settings.TEST_SERVER_MODE:
raise SkipTest("We only want to test ServerMode here")
post_body = dict(
model_name="test::model1", transition={"progression": "waiter", "wait_times": 3}
)
requests.post(
"http://localhost:5000/moto-api/state-manager/set-transition",
data=json.dumps(post_body),
)
post_body = dict(model_name="test::model1")
resp = requests.post(
"http://localhost:5000/moto-api/state-manager/unset-transition",
data=json.dumps(post_body),
)
resp = requests.get(
"http://localhost:5000/moto-api/state-manager/get-transition?model_name=test::model1"
)
resp.status_code.should.equal(200)
json.loads(resp.content).should.equal({"progression": "immediate"})
def test_get_default_transition():
if not settings.TEST_SERVER_MODE:
raise SkipTest("We only want to test ServerMode here")
resp = requests.get(
"http://localhost:5000/moto-api/state-manager/get-transition?model_name=unknown"
)
resp.status_code.should.equal(200)
json.loads(resp.content).should.equal({"progression": "immediate"})

View File

@ -0,0 +1,51 @@
from tests.test_batch import _get_clients, _setup
from tests.test_batch.test_batch_jobs import prepare_job, _wait_for_job_status
import sure # noqa # pylint: disable=unused-import
from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs, settings
from moto.moto_api import state_manager
from unittest import SkipTest
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_cancel_pending_job():
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't use state_manager in ServerMode directly")
ec2_client, iam_client, _, _, batch_client = _get_clients()
_, _, _, iam_arn = _setup(ec2_client, iam_client)
# We need to be able to cancel a job that has not been started yet
# Locally, our jobs start so fast that we can't cancel them in time
# So artificially delay the status progression
state_manager.set_transition(
"batch::job", transition={"progression": "time", "seconds": 2}
)
commands = ["echo", "hello"]
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, "test")
resp = batch_client.submit_job(
jobName="test_job_name",
jobQueue=queue_arn,
jobDefinition=job_def_arn,
)
job_id = resp["jobId"]
batch_client.cancel_job(jobId=job_id, reason="test_cancel")
_wait_for_job_status(batch_client, job_id, "FAILED", seconds_to_wait=20)
resp = batch_client.describe_jobs(jobs=[job_id])
resp["jobs"][0]["jobName"].should.equal("test_job_name")
resp["jobs"][0]["statusReason"].should.equal("test_cancel")
@mock_batch
def test_state_manager_should_return_registered_model():
state_manager.get_registered_models().should.contain("batch::job")

View File

@ -0,0 +1,148 @@
import sure # noqa # pylint: disable=unused-import
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.moto_api import state_manager
class ExampleModel(ManagedState):
def __init__(self):
super().__init__(
model_name="example::model", transitions=[("frist_status", "second_status")]
)
state_manager.register_default_transition(
model_name="example::model", transition={"progression": "manual", "times": 999}
)
def test_initial_state():
ExampleModel().status.should.equal("frist_status")
def test_advancing_without_specifying_configuration_does_nothing():
model = ExampleModel()
for _ in range(5):
model.status.should.equal("frist_status")
model.advance()
def test_advance_immediately():
model = ExampleModel()
model._transitions = [
("frist_status", "second"),
("second", "third"),
("third", "fourth"),
("fourth", "fifth"),
]
state_manager.set_transition(
model_name="example::model", transition={"progression": "immediate"}
)
model.status.should.equal("fifth")
model.advance()
model.status.should.equal("fifth")
def test_advance_x_times():
model = ExampleModel()
state_manager.set_transition(
model_name="example::model", transition={"progression": "manual", "times": 3}
)
for _ in range(2):
model.advance()
model.status.should.equal("frist_status")
# 3rd time is a charm
model.advance()
model.status.should.equal("second_status")
# Status is still the same if we keep asking for it
model.status.should.equal("second_status")
# Advancing more does not make a difference - there's nothing to advance to
model.advance()
model.status.should.equal("second_status")
def test_advance_multiple_stages():
model = ExampleModel()
model._transitions = [
("frist_status", "second"),
("second", "third"),
("third", "fourth"),
("fourth", "fifth"),
]
state_manager.set_transition(
model_name="example::model", transition={"progression": "manual", "times": 1}
)
model.status.should.equal("frist_status")
model.status.should.equal("frist_status")
model.advance()
model.status.should.equal("second")
model.status.should.equal("second")
model.advance()
model.status.should.equal("third")
model.status.should.equal("third")
model.advance()
model.status.should.equal("fourth")
model.status.should.equal("fourth")
model.advance()
model.status.should.equal("fifth")
model.status.should.equal("fifth")
def test_override_status():
model = ExampleModel()
model.status = "creating"
model._transitions = [("creating", "ready"), ("updating", "ready")]
state_manager.set_transition(
model_name="example::model", transition={"progression": "manual", "times": 1}
)
model.status.should.equal("creating")
model.advance()
model.status.should.equal("ready")
model.advance()
# We're still ready
model.status.should.equal("ready")
# Override status manually
model.status = "updating"
model.status.should.equal("updating")
model.advance()
model.status.should.equal("ready")
model.status.should.equal("ready")
model.advance()
model.status.should.equal("ready")
class SlowModel(ManagedState):
def __init__(self):
super().__init__(
model_name="example::slowmodel", transitions=[("first", "second")]
)
def test_realworld_delay():
model = SlowModel()
state_manager.set_transition(
model_name="example::slowmodel",
transition={"progression": "time", "seconds": 2},
)
model.status.should.equal("first")
# The status will stick to 'first' for a long time
# Advancing the model doesn't do anything, really
for _ in range(10):
model.advance()
model.status.should.equal("first")
import time
time.sleep(2)
# Status has only progressed after 2 seconds have passed
model.status.should.equal("second")

View File

@ -0,0 +1,76 @@
import sure # noqa # pylint: disable=unused-import
from moto.moto_api._internal.state_manager import StateManager
def test_public_api():
from moto.moto_api import state_manager
state_manager.should.be.a(StateManager)
def test_default_transition():
manager = StateManager()
manager.register_default_transition(
model_name="dax::cluster", transition={"progression": "manual"}
)
actual = manager.get_transition(model_name="dax::cluster")
actual.should.equal({"progression": "manual"})
def test_set_transition():
manager = StateManager()
manager.set_transition(
model_name="dax::cluster", transition={"progression": "waiter", "wait_times": 3}
)
actual = manager.get_transition(model_name="dax::cluster")
actual.should.equal({"progression": "waiter", "wait_times": 3})
def test_set_transition_overrides_default():
manager = StateManager()
manager.register_default_transition(
model_name="dax::cluster", transition={"progression": "manual"}
)
manager.set_transition(
model_name="dax::cluster", transition={"progression": "waiter", "wait_times": 3}
)
actual = manager.get_transition(model_name="dax::cluster")
actual.should.equal({"progression": "waiter", "wait_times": 3})
def test_unset_transition():
manager = StateManager()
manager.register_default_transition(
model_name="dax::cluster", transition={"progression": "manual"}
)
manager.set_transition(
model_name="dax::cluster", transition={"progression": "waiter", "wait_times": 3}
)
manager.unset_transition(model_name="dax::cluster")
actual = manager.get_transition(model_name="dax::cluster")
actual.should.equal({"progression": "manual"})
def test_get_default_transition():
manager = StateManager()
actual = manager.get_transition(model_name="unknown")
actual.should.equal({"progression": "immediate"})
def test_get_registered_models():
manager = StateManager()
manager.get_registered_models().should.equal([])