Techdebt: Simplify MQ routing (#7190)

This commit is contained in:
Bert Blommers 2024-01-05 23:40:46 +00:00 committed by GitHub
parent be04fe5e0c
commit 57bc607307
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 153 deletions

View File

@ -1,5 +1,4 @@
import json import json
from typing import Any
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
@ -11,58 +10,38 @@ class MQError(JsonRESTError):
class UnknownBroker(MQError): class UnknownBroker(MQError):
def __init__(self, broker_id: str): def __init__(self, broker_id: str):
super().__init__("NotFoundException", "Can't find requested broker") super().__init__("NotFoundException", "Can't find requested broker")
self.broker_id = broker_id
def get_body(
self, *args: Any, **kwargs: Any
) -> str: # pylint: disable=unused-argument
body = { body = {
"errorAttribute": "broker-id", "errorAttribute": "broker-id",
"message": f"Can't find requested broker [{self.broker_id}]. Make sure your broker exists.", "message": f"Can't find requested broker [{broker_id}]. Make sure your broker exists.",
} }
return json.dumps(body) self.description = json.dumps(body)
class UnknownConfiguration(MQError): class UnknownConfiguration(MQError):
def __init__(self, config_id: str): def __init__(self, config_id: str):
super().__init__("NotFoundException", "Can't find requested configuration") super().__init__("NotFoundException", "Can't find requested configuration")
self.config_id = config_id
def get_body(
self, *args: Any, **kwargs: Any
) -> str: # pylint: disable=unused-argument
body = { body = {
"errorAttribute": "configuration_id", "errorAttribute": "configuration_id",
"message": f"Can't find requested configuration [{self.config_id}]. Make sure your configuration exists.", "message": f"Can't find requested configuration [{config_id}]. Make sure your configuration exists.",
} }
return json.dumps(body) self.description = json.dumps(body)
class UnknownUser(MQError): class UnknownUser(MQError):
def __init__(self, username: str): def __init__(self, username: str):
super().__init__("NotFoundException", "Can't find requested user") super().__init__("NotFoundException", "Can't find requested user")
self.username = username
def get_body(
self, *args: Any, **kwargs: Any
) -> str: # pylint: disable=unused-argument
body = { body = {
"errorAttribute": "username", "errorAttribute": "username",
"message": f"Can't find requested user [{self.username}]. Make sure your user exists.", "message": f"Can't find requested user [{username}]. Make sure your user exists.",
} }
return json.dumps(body) self.description = json.dumps(body)
class UnknownEngineType(MQError): class UnknownEngineType(MQError):
def __init__(self, engine_type: str): def __init__(self, engine_type: str):
super().__init__("BadRequestException", "") super().__init__("BadRequestException", "")
self.engine_type = engine_type
def get_body(
self, *args: Any, **kwargs: Any
) -> str: # pylint: disable=unused-argument
body = { body = {
"errorAttribute": "engineType", "errorAttribute": "engineType",
"message": f"Broker engine type [{self.engine_type}] is invalid. Valid values are: [ACTIVEMQ]", "message": f"Broker engine type [{engine_type}] is invalid. Valid values are: [ACTIVEMQ]",
} }
return json.dumps(body) self.description = json.dumps(body)

View File

@ -1,9 +1,7 @@
"""Handles incoming mq requests, invokes methods, returns responses.""" """Handles incoming mq requests, invokes methods, returns responses."""
import json import json
from typing import Any
from urllib.parse import unquote from urllib.parse import unquote
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import MQBackend, mq_backends from .models import MQBackend, mq_backends
@ -20,67 +18,7 @@ class MQResponse(BaseResponse):
"""Return backend instance specific for this region.""" """Return backend instance specific for this region."""
return mq_backends[self.current_account][self.region] return mq_backends[self.current_account][self.region]
def broker(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def create_broker(self) -> str:
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.describe_broker()
if request.method == "DELETE":
return self.delete_broker()
if request.method == "PUT":
return self.update_broker()
def brokers(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_broker()
if request.method == "GET":
return self.list_brokers()
def configuration(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.describe_configuration()
if request.method == "PUT":
return self.update_configuration()
def configurations(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_configuration()
if request.method == "GET":
return self.list_configurations()
def configuration_revision(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.get_configuration_revision()
def tags(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_tags()
if request.method == "DELETE":
return self.delete_tags()
if request.method == "GET":
return self.list_tags()
def user(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_user()
if request.method == "GET":
return self.describe_user()
if request.method == "PUT":
return self.update_user()
if request.method == "DELETE":
return self.delete_user()
def users(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.list_users()
def create_broker(self) -> TYPE_RESPONSE:
params = json.loads(self.body) params = json.loads(self.body)
authentication_strategy = params.get("authenticationStrategy") authentication_strategy = params.get("authenticationStrategy")
auto_minor_version_upgrade = params.get("autoMinorVersionUpgrade") auto_minor_version_upgrade = params.get("autoMinorVersionUpgrade")
@ -122,9 +60,9 @@ class MQResponse(BaseResponse):
) )
# Lowercase members - boto3 will convert it into UpperCase # Lowercase members - boto3 will convert it into UpperCase
resp = {"brokerArn": broker_arn, "brokerId": broker_id} resp = {"brokerArn": broker_arn, "brokerId": broker_id}
return 200, {}, json.dumps(resp) return json.dumps(resp)
def update_broker(self) -> TYPE_RESPONSE: def update_broker(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
broker_id = self.path.split("/")[-1] broker_id = self.path.split("/")[-1]
authentication_strategy = params.get("authenticationStrategy") authentication_strategy = params.get("authenticationStrategy")
@ -150,32 +88,32 @@ class MQResponse(BaseResponse):
) )
return self.describe_broker() return self.describe_broker()
def delete_broker(self) -> TYPE_RESPONSE: def delete_broker(self) -> str:
broker_id = self.path.split("/")[-1] broker_id = self.path.split("/")[-1]
self.mq_backend.delete_broker(broker_id=broker_id) self.mq_backend.delete_broker(broker_id=broker_id)
return 200, {}, json.dumps(dict(brokerId=broker_id)) return json.dumps(dict(brokerId=broker_id))
def describe_broker(self) -> TYPE_RESPONSE: def describe_broker(self) -> str:
broker_id = self.path.split("/")[-1] broker_id = self.path.split("/")[-1]
broker = self.mq_backend.describe_broker(broker_id=broker_id) broker = self.mq_backend.describe_broker(broker_id=broker_id)
resp = broker.to_json() resp = broker.to_json()
resp["tags"] = self.mq_backend.list_tags(broker.arn) resp["tags"] = self.mq_backend.list_tags(broker.arn)
return 200, {}, json.dumps(resp) return json.dumps(resp)
def list_brokers(self) -> TYPE_RESPONSE: def list_brokers(self) -> str:
brokers = self.mq_backend.list_brokers() brokers = self.mq_backend.list_brokers()
return 200, {}, json.dumps(dict(brokerSummaries=[b.summary() for b in brokers])) return json.dumps(dict(brokerSummaries=[b.summary() for b in brokers]))
def create_user(self) -> TYPE_RESPONSE: def create_user(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
broker_id = self.path.split("/")[-3] broker_id = self.path.split("/")[-3]
username = self.path.split("/")[-1] username = self.path.split("/")[-1]
console_access = params.get("consoleAccess", False) console_access = params.get("consoleAccess", False)
groups = params.get("groups", []) groups = params.get("groups", [])
self.mq_backend.create_user(broker_id, username, console_access, groups) self.mq_backend.create_user(broker_id, username, console_access, groups)
return 200, {}, "{}" return "{}"
def update_user(self) -> TYPE_RESPONSE: def update_user(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
broker_id = self.path.split("/")[-3] broker_id = self.path.split("/")[-3]
username = self.path.split("/")[-1] username = self.path.split("/")[-1]
@ -187,30 +125,30 @@ class MQResponse(BaseResponse):
groups=groups, groups=groups,
username=username, username=username,
) )
return 200, {}, "{}" return "{}"
def describe_user(self) -> TYPE_RESPONSE: def describe_user(self) -> str:
broker_id = self.path.split("/")[-3] broker_id = self.path.split("/")[-3]
username = self.path.split("/")[-1] username = self.path.split("/")[-1]
user = self.mq_backend.describe_user(broker_id, username) user = self.mq_backend.describe_user(broker_id, username)
return 200, {}, json.dumps(user.to_json()) return json.dumps(user.to_json())
def delete_user(self) -> TYPE_RESPONSE: def delete_user(self) -> str:
broker_id = self.path.split("/")[-3] broker_id = self.path.split("/")[-3]
username = self.path.split("/")[-1] username = self.path.split("/")[-1]
self.mq_backend.delete_user(broker_id, username) self.mq_backend.delete_user(broker_id, username)
return 200, {}, "{}" return "{}"
def list_users(self) -> TYPE_RESPONSE: def list_users(self) -> str:
broker_id = self.path.split("/")[-2] broker_id = self.path.split("/")[-2]
users = self.mq_backend.list_users(broker_id=broker_id) users = self.mq_backend.list_users(broker_id=broker_id)
resp = { resp = {
"brokerId": broker_id, "brokerId": broker_id,
"users": [{"username": u.username} for u in users], "users": [{"username": u.username} for u in users],
} }
return 200, {}, json.dumps(resp) return json.dumps(resp)
def create_configuration(self) -> TYPE_RESPONSE: def create_configuration(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
name = params.get("name") name = params.get("name")
engine_type = params.get("engineType") engine_type = params.get("engineType")
@ -220,56 +158,54 @@ class MQResponse(BaseResponse):
config = self.mq_backend.create_configuration( config = self.mq_backend.create_configuration(
name, engine_type, engine_version, tags name, engine_type, engine_version, tags
) )
return 200, {}, json.dumps(config.to_json()) return json.dumps(config.to_json())
def describe_configuration(self) -> TYPE_RESPONSE: def describe_configuration(self) -> str:
config_id = self.path.split("/")[-1] config_id = self.path.split("/")[-1]
config = self.mq_backend.describe_configuration(config_id) config = self.mq_backend.describe_configuration(config_id)
resp = config.to_json() resp = config.to_json()
resp["tags"] = self.mq_backend.list_tags(config.arn) resp["tags"] = self.mq_backend.list_tags(config.arn)
return 200, {}, json.dumps(resp) return json.dumps(resp)
def list_configurations(self) -> TYPE_RESPONSE: def list_configurations(self) -> str:
configs = self.mq_backend.list_configurations() configs = self.mq_backend.list_configurations()
resp = {"configurations": [c.to_json() for c in configs]} resp = {"configurations": [c.to_json() for c in configs]}
return 200, {}, json.dumps(resp) return json.dumps(resp)
def update_configuration(self) -> TYPE_RESPONSE: def update_configuration(self) -> str:
config_id = self.path.split("/")[-1] config_id = self.path.split("/")[-1]
params = json.loads(self.body) params = json.loads(self.body)
data = params.get("data") data = params.get("data")
description = params.get("description") description = params.get("description")
config = self.mq_backend.update_configuration(config_id, data, description) config = self.mq_backend.update_configuration(config_id, data, description)
return 200, {}, json.dumps(config.to_json()) return json.dumps(config.to_json())
def get_configuration_revision(self) -> TYPE_RESPONSE: def describe_configuration_revision(self) -> str:
revision_id = self.path.split("/")[-1] revision_id = self.path.split("/")[-1]
config_id = self.path.split("/")[-3] config_id = self.path.split("/")[-3]
revision = self.mq_backend.describe_configuration_revision( revision = self.mq_backend.describe_configuration_revision(
config_id, revision_id config_id, revision_id
) )
return 200, {}, json.dumps(revision.to_json()) return json.dumps(revision.to_json())
def create_tags(self) -> TYPE_RESPONSE: def create_tags(self) -> str:
resource_arn = unquote(self.path.split("/")[-1]) resource_arn = unquote(self.path.split("/")[-1])
tags = json.loads(self.body).get("tags", {}) tags = json.loads(self.body).get("tags", {})
self.mq_backend.create_tags(resource_arn, tags) self.mq_backend.create_tags(resource_arn, tags)
return 200, {}, "{}" return "{}"
def delete_tags(self) -> TYPE_RESPONSE: def delete_tags(self) -> str:
resource_arn = unquote(self.path.split("/")[-1]) resource_arn = unquote(self.path.split("/")[-1])
tag_keys = self._get_param("tagKeys") tag_keys = self._get_param("tagKeys")
self.mq_backend.delete_tags(resource_arn, tag_keys) self.mq_backend.delete_tags(resource_arn, tag_keys)
return 200, {}, "{}" return "{}"
def list_tags(self) -> TYPE_RESPONSE: def list_tags(self) -> str:
resource_arn = unquote(self.path.split("/")[-1]) resource_arn = unquote(self.path.split("/")[-1])
tags = self.mq_backend.list_tags(resource_arn) tags = self.mq_backend.list_tags(resource_arn)
return 200, {}, json.dumps({"tags": tags}) return json.dumps({"tags": tags})
def reboot(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def reboot_broker(self) -> str:
self.setup_class(request, full_url, headers)
if request.method == "POST":
broker_id = self.path.split("/")[-2] broker_id = self.path.split("/")[-2]
self.mq_backend.reboot_broker(broker_id=broker_id) self.mq_backend.reboot_broker(broker_id=broker_id)
return 200, {}, "{}" return "{}"

View File

@ -7,25 +7,13 @@ url_bases = [
url_paths = { url_paths = {
"{0}/v1/brokers/(?P<broker_id>[^/]+)$": MQResponse.method_dispatch( "{0}/v1/brokers/(?P<broker_id>[^/]+)$": MQResponse.dispatch,
MQResponse.broker "{0}/v1/brokers/(?P<broker_id>[^/]+)/reboot$": MQResponse.dispatch,
), "{0}/v1/brokers/(?P<broker_id>[^/]+)/users$": MQResponse.dispatch,
"{0}/v1/brokers/(?P<broker_id>[^/]+)/reboot$": MQResponse.method_dispatch( "{0}/v1/brokers/(?P<broker_id>[^/]+)/users/(?P<user_name>[^/]+)$": MQResponse.dispatch,
MQResponse.reboot "{0}/v1/brokers$": MQResponse.dispatch,
), "{0}/v1/configurations$": MQResponse.dispatch,
"{0}/v1/brokers/(?P<broker_id>[^/]+)/users$": MQResponse.method_dispatch( "{0}/v1/configurations/(?P<config_id>[^/]+)$": MQResponse.dispatch,
MQResponse.users "{0}/v1/configurations/(?P<config_id>[^/]+)/revisions/(?P<revision_id>[^/]+)$": MQResponse.dispatch,
), "{0}/v1/tags/(?P<resource_arn>[^/]+)$": MQResponse.dispatch,
"{0}/v1/brokers/(?P<broker_id>[^/]+)/users/(?P<user_name>[^/]+)$": MQResponse.method_dispatch(
MQResponse.user
),
"{0}/v1/brokers$": MQResponse.method_dispatch(MQResponse.brokers),
"{0}/v1/configurations$": MQResponse.method_dispatch(MQResponse.configurations),
"{0}/v1/configurations/(?P<config_id>[^/]+)$": MQResponse.method_dispatch(
MQResponse.configuration
),
"{0}/v1/configurations/(?P<config_id>[^/]+)/revisions/(?P<revision_id>[^/]+)$": MQResponse.method_dispatch(
MQResponse.configuration_revision
),
"{0}/v1/tags/(?P<resource_arn>[^/]+)$": MQResponse.method_dispatch(MQResponse.tags),
} }