Service: Scheduler (#6197)

This commit is contained in:
Bert Blommers 2023-04-10 16:00:15 +00:00 committed by GitHub
parent 01e2d11ed3
commit b4346e2eea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 783 additions and 2 deletions

View File

@ -6172,6 +6172,24 @@
- [ ] update_workteam
</details>
## scheduler
<details>
<summary>100% implemented</summary>
- [X] create_schedule
- [X] create_schedule_group
- [X] delete_schedule
- [X] delete_schedule_group
- [X] get_schedule
- [X] get_schedule_group
- [X] list_schedule_groups
- [X] list_schedules
- [X] list_tags_for_resource
- [X] tag_resource
- [X] untag_resource
- [X] update_schedule
</details>
## sdb
<details>
<summary>50% implemented</summary>
@ -7081,7 +7099,6 @@
- sagemaker-metrics
- sagemaker-runtime
- savingsplans
- scheduler
- schemas
- securityhub
- securitylake

View File

@ -0,0 +1,58 @@
.. _implementedservice_scheduler:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
=========
scheduler
=========
.. autoclass:: moto.scheduler.models.EventBridgeSchedulerBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_scheduler
def test_scheduler_behaviour:
boto3.client("scheduler")
...
|start-h3| Implemented features for this service |end-h3|
- [X] create_schedule
The ClientToken parameter is not yet implemented
- [X] create_schedule_group
The ClientToken parameter is not yet implemented
- [X] delete_schedule
- [X] delete_schedule_group
- [X] get_schedule
- [X] get_schedule_group
- [X] list_schedule_groups
The MaxResults-parameter and pagination options are not yet implemented
- [ ] list_schedules
- [X] list_tags_for_resource
- [X] tag_resource
- [X] untag_resource
- [X] update_schedule
The ClientToken is not yet implemented

View File

@ -143,6 +143,7 @@ mock_route53resolver = lazy_load(
mock_s3 = lazy_load(".s3", "mock_s3")
mock_s3control = lazy_load(".s3control", "mock_s3control")
mock_sagemaker = lazy_load(".sagemaker", "mock_sagemaker")
mock_scheduler = lazy_load(".scheduler", "mock_scheduler")
mock_sdb = lazy_load(".sdb", "mock_sdb")
mock_secretsmanager = lazy_load(".secretsmanager", "mock_secretsmanager")
mock_servicequotas = lazy_load(

View File

@ -148,6 +148,7 @@ backend_url_patterns = [
re.compile("https?://([0-9]+)\\.s3-control\\.(.+)\\.amazonaws\\.com"),
),
("sagemaker", re.compile("https?://api\\.sagemaker\\.(.+)\\.amazonaws.com")),
("scheduler", re.compile("https?://scheduler\\.(.+)\\.amazonaws\\.com")),
("sdb", re.compile("https?://sdb\\.(.+)\\.amazonaws\\.com")),
("secretsmanager", re.compile("https?://secretsmanager\\.(.+)\\.amazonaws\\.com")),
(

View File

@ -0,0 +1,5 @@
"""scheduler module initialization; sets value for base decorator."""
from .models import scheduler_backends
from ..core.models import base_decorator
mock_scheduler = base_decorator(scheduler_backends)

View File

@ -0,0 +1,12 @@
"""Exceptions raised by the scheduler service."""
from moto.core.exceptions import JsonRESTError
class ScheduleNotFound(JsonRESTError):
def __init__(self) -> None:
super().__init__("ResourceNotFoundException", "Schedule not found")
class ScheduleGroupNotFound(JsonRESTError):
def __init__(self) -> None:
super().__init__("ResourceNotFoundException", "ScheduleGroup not found")

240
moto/scheduler/models.py Normal file
View File

@ -0,0 +1,240 @@
"""EventBridgeSchedulerBackend class with methods for supported APIs."""
from typing import Any, Dict, List, Iterable, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import unix_time
from moto.utilities.tagging_service import TaggingService
from .exceptions import ScheduleNotFound, ScheduleGroupNotFound
class Schedule(BaseModel):
def __init__(
self,
region: str,
account_id: str,
group_name: str,
name: str,
description: Optional[str],
schedule_expression: str,
schedule_expression_timezone: Optional[str],
flexible_time_window: Dict[str, Any],
target: Dict[str, Any],
state: Optional[str],
kms_key_arn: Optional[str],
start_date: Optional[str],
end_date: Optional[str],
):
self.name = name
self.group_name = group_name
self.description = description
self.arn = (
f"arn:aws:scheduler:{region}:{account_id}:schedule/{group_name}/{name}"
)
self.schedule_expression = schedule_expression
self.schedule_expression_timezone = schedule_expression_timezone
self.flexible_time_window = flexible_time_window
self.target = Schedule.validate_target(target)
self.state = state or "ENABLED"
self.kms_key_arn = kms_key_arn
self.start_date = start_date
self.end_date = end_date
@staticmethod
def validate_target(target: Dict[str, Any]) -> Dict[str, Any]: # type: ignore[misc]
if "RetryPolicy" not in target:
target["RetryPolicy"] = {
"MaximumEventAgeInSeconds": 86400,
"MaximumRetryAttempts": 185,
}
return target
def to_dict(self, short: bool = False) -> Dict[str, Any]:
dct: Dict[str, Any] = {
"Arn": self.arn,
"Name": self.name,
"GroupName": self.group_name,
"Description": self.description,
"ScheduleExpression": self.schedule_expression,
"ScheduleExpressionTimezone": self.schedule_expression_timezone,
"FlexibleTimeWindow": self.flexible_time_window,
"Target": self.target,
"State": self.state,
"KmsKeyArn": self.kms_key_arn,
"StartDate": self.start_date,
"EndDate": self.end_date,
}
if short:
dct["Target"] = {"Arn": dct["Target"]["Arn"]}
return dct
class ScheduleGroup(BaseModel):
def __init__(self, region: str, account_id: str, name: str):
self.name = name
self.arn = f"arn:aws:scheduler:{region}:{account_id}:schedule-group/{name}"
self.schedules: Dict[str, Schedule] = dict()
self.created_on = None if self.name == "default" else unix_time()
self.last_modified = None if self.name == "default" else unix_time()
def add_schedule(self, schedule: Schedule) -> None:
self.schedules[schedule.name] = schedule
def get_schedule(self, name: str) -> Schedule:
if name not in self.schedules:
raise ScheduleNotFound
return self.schedules[name]
def delete_schedule(self, name: str) -> None:
self.schedules.pop(name)
def to_dict(self) -> Dict[str, Any]:
return {
"Arn": self.arn,
"CreationDate": self.created_on,
"LastModificationDate": self.last_modified,
"Name": self.name,
"State": "ACTIVE",
}
class EventBridgeSchedulerBackend(BaseBackend):
"""Implementation of EventBridgeScheduler APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.schedules: List[Schedule] = list()
self.schedule_groups = {
"default": ScheduleGroup(
region=region_name, account_id=account_id, name="default"
)
}
self.tagger = TaggingService()
def create_schedule(
self,
description: str,
end_date: str,
flexible_time_window: Dict[str, Any],
group_name: str,
kms_key_arn: str,
name: str,
schedule_expression: str,
schedule_expression_timezone: str,
start_date: str,
state: str,
target: Dict[str, Any],
) -> Schedule:
"""
The ClientToken parameter is not yet implemented
"""
group = self.schedule_groups[group_name or "default"]
schedule = Schedule(
region=self.region_name,
account_id=self.account_id,
group_name=group.name,
name=name,
description=description,
schedule_expression=schedule_expression,
schedule_expression_timezone=schedule_expression_timezone,
flexible_time_window=flexible_time_window,
target=target,
state=state,
kms_key_arn=kms_key_arn,
start_date=start_date,
end_date=end_date,
)
group.add_schedule(schedule)
return schedule
def get_schedule(self, group_name: Optional[str], name: str) -> Schedule:
group = self.get_schedule_group(group_name)
return group.get_schedule(name)
def delete_schedule(self, group_name: Optional[str], name: str) -> None:
group = self.get_schedule_group(group_name)
group.delete_schedule(name)
def update_schedule(
self,
description: str,
end_date: str,
flexible_time_window: Dict[str, Any],
group_name: str,
kms_key_arn: str,
name: str,
schedule_expression: str,
schedule_expression_timezone: str,
start_date: str,
state: str,
target: Dict[str, Any],
) -> Schedule:
"""
The ClientToken is not yet implemented
"""
schedule = self.get_schedule(group_name=group_name, name=name)
schedule.schedule_expression = schedule_expression
schedule.schedule_expression_timezone = schedule_expression_timezone
schedule.flexible_time_window = flexible_time_window
schedule.target = Schedule.validate_target(target)
schedule.description = description
schedule.state = state
schedule.kms_key_arn = kms_key_arn
schedule.start_date = start_date
schedule.end_date = end_date
return schedule
def list_schedules(
self, group_names: Optional[str], state: Optional[str]
) -> Iterable[Schedule]:
"""
The following parameters are not yet implemented: MaxResults, NamePrefix, NextToken
"""
results = []
for group in self.schedule_groups.values():
if not group_names or group.name in group_names:
for schedule in group.schedules.values():
if not state or schedule.state == state:
results.append(schedule)
return results
def create_schedule_group(
self, name: str, tags: List[Dict[str, str]]
) -> ScheduleGroup:
"""
The ClientToken parameter is not yet implemented
"""
group = ScheduleGroup(
region=self.region_name, account_id=self.account_id, name=name
)
self.schedule_groups[name] = group
self.tagger.tag_resource(group.arn, tags)
return group
def get_schedule_group(self, group_name: Optional[str]) -> ScheduleGroup:
if (group_name or "default") not in self.schedule_groups:
raise ScheduleGroupNotFound
return self.schedule_groups[group_name or "default"]
def list_schedule_groups(self) -> Iterable[ScheduleGroup]:
"""
The MaxResults-parameter and pagination options are not yet implemented
"""
return self.schedule_groups.values()
def delete_schedule_group(self, name: Optional[str]) -> None:
self.schedule_groups.pop(name or "default")
def list_tags_for_resource(
self, resource_arn: str
) -> Dict[str, List[Dict[str, str]]]:
return self.tagger.list_tags_for_resource(resource_arn)
def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None:
self.tagger.tag_resource(resource_arn, tags)
def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
scheduler_backends = BackendDict(EventBridgeSchedulerBackend, "scheduler")

142
moto/scheduler/responses.py Normal file
View File

@ -0,0 +1,142 @@
"""Handles incoming scheduler requests, invokes methods, returns responses."""
import json
from typing import Any
from urllib.parse import unquote
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import scheduler_backends, EventBridgeSchedulerBackend
class EventBridgeSchedulerResponse(BaseResponse):
"""Handler for EventBridgeScheduler requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="scheduler")
@property
def scheduler_backend(self) -> EventBridgeSchedulerBackend:
"""Return backend instance specific for this region."""
return scheduler_backends[self.current_account][self.region]
def create_schedule(self) -> str:
description = self._get_param("Description")
end_date = self._get_param("EndDate")
flexible_time_window = self._get_param("FlexibleTimeWindow")
group_name = self._get_param("GroupName")
kms_key_arn = self._get_param("KmsKeyArn")
name = self.uri.split("/")[-1]
schedule_expression = self._get_param("ScheduleExpression")
schedule_expression_timezone = self._get_param("ScheduleExpressionTimezone")
start_date = self._get_param("StartDate")
state = self._get_param("State")
target = self._get_param("Target")
schedule = self.scheduler_backend.create_schedule(
description=description,
end_date=end_date,
flexible_time_window=flexible_time_window,
group_name=group_name,
kms_key_arn=kms_key_arn,
name=name,
schedule_expression=schedule_expression,
schedule_expression_timezone=schedule_expression_timezone,
start_date=start_date,
state=state,
target=target,
)
return json.dumps(dict(ScheduleArn=schedule.arn))
def get_schedule(self) -> str:
group_name = self._get_param("groupName")
full_url = self.uri.split("?")[0]
name = full_url.split("/")[-1]
schedule = self.scheduler_backend.get_schedule(group_name, name)
return json.dumps(schedule.to_dict())
def delete_schedule(self) -> str:
group_name = self._get_param("groupName")
name = self.uri.split("?")[0].split("/")[-1]
self.scheduler_backend.delete_schedule(group_name, name)
return "{}"
def update_schedule(self) -> str:
group_name = self._get_param("groupName")
name = self.uri.split("?")[0].split("/")[-1]
description = self._get_param("Description")
end_date = self._get_param("EndDate")
flexible_time_window = self._get_param("FlexibleTimeWindow")
kms_key_arn = self._get_param("KmsKeyArn")
schedule_expression = self._get_param("ScheduleExpression")
schedule_expression_timezone = self._get_param("ScheduleExpressionTimezone")
start_date = self._get_param("StartDate")
state = self._get_param("State")
target = self._get_param("Target")
schedule = self.scheduler_backend.update_schedule(
description=description,
end_date=end_date,
flexible_time_window=flexible_time_window,
group_name=group_name,
kms_key_arn=kms_key_arn,
name=name,
schedule_expression=schedule_expression,
schedule_expression_timezone=schedule_expression_timezone,
start_date=start_date,
state=state,
target=target,
)
return json.dumps(dict(ScheduleArn=schedule.arn))
def list_schedules(self) -> str:
group_names = self.querystring.get("ScheduleGroup")
state = self._get_param("State")
schedules = self.scheduler_backend.list_schedules(group_names, state)
return json.dumps({"Schedules": [sch.to_dict(short=True) for sch in schedules]})
def create_schedule_group(self) -> str:
name = self._get_param("Name")
tags = self._get_param("Tags")
schedule_group = self.scheduler_backend.create_schedule_group(
name=name,
tags=tags,
)
return json.dumps(dict(ScheduleGroupArn=schedule_group.arn))
def get_schedule_group(self) -> str:
group_name = self.uri.split("?")[0].split("/")[-1]
group = self.scheduler_backend.get_schedule_group(group_name)
return json.dumps(group.to_dict())
def delete_schedule_group(self) -> str:
group_name = self.uri.split("?")[0].split("/")[-1]
self.scheduler_backend.delete_schedule_group(group_name)
return "{}"
def list_schedule_groups(self) -> str:
schedule_groups = self.scheduler_backend.list_schedule_groups()
return json.dumps(dict(ScheduleGroups=[sg.to_dict() for sg in schedule_groups]))
def list_tags_for_resource(self) -> TYPE_RESPONSE:
resource_arn = unquote(self.uri.split("/tags/")[-1])
tags = self.scheduler_backend.list_tags_for_resource(resource_arn)
return 200, {}, json.dumps(tags)
def tag_resource(self) -> TYPE_RESPONSE:
resource_arn = unquote(self.uri.split("/tags/")[-1])
tags = json.loads(self.body)["Tags"]
self.scheduler_backend.tag_resource(resource_arn, tags)
return 200, {}, "{}"
def untag_resource(self) -> TYPE_RESPONSE:
resource_arn = unquote(self.uri.split("?")[0].split("/tags/")[-1])
tag_keys = self.querystring.get("TagKeys")
self.scheduler_backend.untag_resource(resource_arn, tag_keys) # type: ignore
return 200, {}, "{}"
def tags(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE:
super().setup_class(request, full_url, headers)
if request.method == "POST":
return self.tag_resource()
elif request.method == "DELETE":
return self.untag_resource()
else:
return self.list_tags_for_resource()

19
moto/scheduler/urls.py Normal file
View File

@ -0,0 +1,19 @@
"""scheduler base URL and path."""
from .responses import EventBridgeSchedulerResponse
url_bases = [
r"https?://scheduler\.(.+)\.amazonaws\.com",
]
response = EventBridgeSchedulerResponse()
url_paths = {
"{0}/schedules$": response.dispatch,
"{0}/schedules/(?P<name>[^/]+)$": response.dispatch,
"{0}/schedule-groups$": response.dispatch,
"{0}/schedule-groups/(?P<name>[^/]+)$": response.dispatch,
"{0}/tags/(?P<ResourceArn>.+)$": response.tags,
"{0}/tags/arn:aws:scheduler:(?P<region_name>[^/]+):(?P<account_id>[^/]+):schedule/(?P<group_name>[^/]+)/(?P<schedule_name>[^/]+)/?$": response.tags,
}

View File

@ -235,7 +235,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/p*,moto/q*,moto/rdsdata
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/i*,moto/k*,moto/l*,moto/m*,moto/n*,moto/o*,moto/p*,moto/q*,moto/rdsdata,moto/scheduler
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract

View File

@ -494,6 +494,9 @@ s3:
- TestAccS3ObjectsDataSource_fetchOwner
sagemaker:
- TestAccSageMakerPrebuiltECRImageDataSource
scheduler:
- TestAccSchedulerSchedule_
- TestAccSchedulerScheduleGroup_
secretsmanager:
- TestAccSecretsManagerSecretDataSource_basic
- TestAccSecretsManagerSecretPolicy_

View File

View File

@ -0,0 +1,49 @@
"""Unit tests for scheduler-supported APIs."""
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_scheduler
from moto.core import DEFAULT_ACCOUNT_ID
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_scheduler
def test_create_get_delete_schedule_group():
client = boto3.client("scheduler", region_name="eu-west-1")
arn = client.create_schedule_group(Name="sg")["ScheduleGroupArn"]
assert arn == f"arn:aws:scheduler:eu-west-1:{DEFAULT_ACCOUNT_ID}:schedule-group/sg"
group = client.get_schedule_group(Name="sg")
assert group["Arn"] == arn
assert group["Name"] == "sg"
assert group["State"] == "ACTIVE"
client.delete_schedule_group(Name="sg")
with pytest.raises(ClientError) as exc:
client.get_schedule_group(Name="sg")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
@mock_scheduler
def test_list_schedule_groups():
client = boto3.client("scheduler", region_name="ap-southeast-1")
# The default group is always active
groups = client.list_schedule_groups()["ScheduleGroups"]
assert len(groups) == 1
assert (
groups[0]["Arn"]
== f"arn:aws:scheduler:ap-southeast-1:{DEFAULT_ACCOUNT_ID}:schedule-group/default"
)
arn1 = client.create_schedule_group(Name="sg")["ScheduleGroupArn"]
groups = client.list_schedule_groups()["ScheduleGroups"]
assert len(groups) == 2
assert groups[1]["Arn"] == arn1

View File

@ -0,0 +1,167 @@
"""Unit tests for scheduler-supported APIs."""
import boto3
import pytest
from botocore.client import ClientError
from moto import mock_scheduler
from moto.core import DEFAULT_ACCOUNT_ID
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_scheduler
def test_create_get_schedule():
client = boto3.client("scheduler", region_name="eu-west-1")
arn = client.create_schedule(
Name="my-schedule",
ScheduleExpression="some cron",
FlexibleTimeWindow={
"MaximumWindowInMinutes": 4,
"Mode": "OFF",
},
Target={
"Arn": "not supported yet",
"RoleArn": "n/a",
},
)["ScheduleArn"]
assert (
arn
== f"arn:aws:scheduler:eu-west-1:{DEFAULT_ACCOUNT_ID}:schedule/default/my-schedule"
)
resp = client.get_schedule(Name="my-schedule")
assert resp["Arn"] == arn
assert resp["Name"] == "my-schedule"
assert resp["ScheduleExpression"] == "some cron"
assert resp["FlexibleTimeWindow"] == {
"MaximumWindowInMinutes": 4,
"Mode": "OFF",
}
assert resp["Target"] == {
"Arn": "not supported yet",
"RoleArn": "n/a",
"RetryPolicy": {"MaximumEventAgeInSeconds": 86400, "MaximumRetryAttempts": 185},
}
@mock_scheduler
def test_create_get_delete__in_different_group():
client = boto3.client("scheduler", region_name="eu-west-1")
client.create_schedule_group(Name="sg")
schedule_arn = client.create_schedule(
Name="my-schedule",
GroupName="sg",
ScheduleExpression="some cron",
FlexibleTimeWindow={
"MaximumWindowInMinutes": 4,
"Mode": "OFF",
},
Target={
"Arn": "not supported yet",
"RoleArn": "n/a",
},
)["ScheduleArn"]
assert (
schedule_arn
== "arn:aws:scheduler:eu-west-1:123456789012:schedule/sg/my-schedule"
)
schedule = client.get_schedule(GroupName="sg", Name="my-schedule")
assert schedule["Arn"] == schedule_arn
client.delete_schedule(GroupName="sg", Name="my-schedule")
with pytest.raises(ClientError) as exc:
client.get_schedule(GroupName="sg", Name="my-schedule")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
@mock_scheduler
def test_update_schedule():
client = boto3.client("scheduler", region_name="eu-west-1")
client.create_schedule(
Name="my-schedule",
ScheduleExpression="some cron",
FlexibleTimeWindow={
"MaximumWindowInMinutes": 4,
"Mode": "OFF",
},
Target={
"Arn": "not supported yet",
"RoleArn": "n/a",
},
)
client.update_schedule(
Name="my-schedule",
Description="new desc",
ScheduleExpression="new cron",
FlexibleTimeWindow={
"MaximumWindowInMinutes": 4,
"Mode": "OFF",
},
State="DISABLED",
Target={
"Arn": "different arn",
"RoleArn": "n/a",
},
)
schedule = client.get_schedule(Name="my-schedule")
assert schedule["Description"] == "new desc"
assert schedule["ScheduleExpression"] == "new cron"
assert schedule["State"] == "DISABLED"
assert schedule["Target"] == {
"Arn": "different arn",
"RoleArn": "n/a",
"RetryPolicy": {"MaximumEventAgeInSeconds": 86400, "MaximumRetryAttempts": 185},
}
@mock_scheduler
def test_get_schedule_for_unknown_group():
client = boto3.client("scheduler", region_name="eu-west-1")
with pytest.raises(ClientError) as exc:
client.get_schedule(GroupName="unknown", Name="my-schedule")
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
@mock_scheduler
def test_list_schedules():
client = boto3.client("scheduler", region_name="eu-west-1")
schedules = client.list_schedules()["Schedules"]
assert schedules == []
client.create_schedule_group(Name="group2")
for group in ["default", "group2"]:
for schedule in ["sch1", "sch2"]:
for state in ["ENABLED", "DISABLED"]:
client.create_schedule(
Name=f"{schedule}_{state}",
GroupName=group,
State=state,
ScheduleExpression="some cron",
FlexibleTimeWindow={"MaximumWindowInMinutes": 4, "Mode": "OFF"},
Target={"Arn": "not supported yet", "RoleArn": "n/a"},
)
schedules = client.list_schedules()["Schedules"]
assert len(schedules) == 8
# The ListSchedules command should not return the entire Target-dictionary
assert schedules[0]["Target"] == {"Arn": "not supported yet"}
schedules = client.list_schedules(GroupName="group2")["Schedules"]
assert len(schedules) == 4
schedules = client.list_schedules(State="ENABLED")["Schedules"]
assert len(schedules) == 4

View File

@ -0,0 +1,57 @@
import boto3
from moto import mock_scheduler
@mock_scheduler
def test_schedule_tags():
client = boto3.client("scheduler", "us-east-1")
arn = client.create_schedule(
Name="my-schedule",
ScheduleExpression="some cron",
FlexibleTimeWindow={
"MaximumWindowInMinutes": 4,
"Mode": "OFF",
},
Target={
"Arn": "not supported yet",
"RoleArn": "n/a",
},
)["ScheduleArn"]
resp = client.list_tags_for_resource(ResourceArn=arn)
assert resp["Tags"] == []
client.tag_resource(
ResourceArn=arn,
Tags=[{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}],
)
resp = client.list_tags_for_resource(ResourceArn=arn)
assert resp["Tags"] == [{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]
client.untag_resource(ResourceArn=arn, TagKeys=["k1"])
resp = client.list_tags_for_resource(ResourceArn=arn)
assert resp["Tags"] == [{"Key": "k2", "Value": "v2"}]
@mock_scheduler
def test_schedule_group_tags():
client = boto3.client("scheduler", "us-east-1")
arn = client.create_schedule_group(
Name="my-schedule", Tags=[{"Key": "k1", "Value": "v1"}]
)["ScheduleGroupArn"]
resp = client.list_tags_for_resource(ResourceArn=arn)
assert resp["Tags"] == [{"Key": "k1", "Value": "v1"}]
client.tag_resource(ResourceArn=arn, Tags=[{"Key": "k2", "Value": "v2"}])
resp = client.list_tags_for_resource(ResourceArn=arn)
assert resp["Tags"] == [{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]
client.untag_resource(ResourceArn=arn, TagKeys=["k1"])
resp = client.list_tags_for_resource(ResourceArn=arn)
assert resp["Tags"] == [{"Key": "k2", "Value": "v2"}]

View File

@ -0,0 +1,10 @@
from moto import server as server
def test_list_tags():
test_client = server.create_backend_app("scheduler").test_client()
res = test_client.get(
"/tags/arn%3Aaws%3Ascheduler%3Aus-east-1%3A123456789012%3Aschedule%2Fdefault%2Fmy-schedule"
)
assert res.data == b'{"Tags": []}'