IOTData: Fix bug where publish() could only be called once (#5812)

This commit is contained in:
Bert Blommers 2023-01-03 21:21:52 +00:00 committed by GitHub
parent 031f89dee0
commit d68fb04ee1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 26 deletions

View File

@ -41,7 +41,7 @@ jobs:
- name: Run tests
run: |
pytest -sv tests/test_core ./tests/test_apigateway/test_apigateway_integration.py ./tests/test_s3/test_server.py
pytest -sv tests/test_core ./tests/test_apigateway/test_apigateway_integration.py tests/test_iotdata ./tests/test_s3/test_server.py
- name: Start MotoServer
run: |
@ -52,7 +52,7 @@ jobs:
env:
TEST_SERVER_MODE: ${{ true }}
run: |
pytest -sv tests/test_core tests/test_awslambda tests/test_cloudformation
pytest -sv tests/test_core tests/test_awslambda tests/test_cloudformation tests/test_iotdata
- name: "Stop MotoServer"
if: always()
run: |

View File

@ -8,6 +8,12 @@ class IoTDataPlaneResponse(BaseResponse):
def __init__(self):
super().__init__(service_name="iot-data")
def _get_action(self) -> str:
if self.path and self.path.startswith("/topics/"):
# Special usecase - there is no way identify this action, besides the URL
return "publish"
return super()._get_action()
@property
def iotdata_backend(self):
return iotdata_backends[self.current_account][self.region]
@ -30,18 +36,8 @@ class IoTDataPlaneResponse(BaseResponse):
payload = self.iotdata_backend.delete_thing_shadow(thing_name=thing_name)
return json.dumps(payload.to_dict())
def dispatch_publish(self, request, full_url, headers):
# This endpoint requires specialized handling because it has
# a uri parameter containing forward slashes that is not
# correctly url encoded when we're running in server mode.
# https://github.com/pallets/flask/issues/900
self.setup_class(request, full_url, headers)
self.querystring["Action"] = ["Publish"]
topic = self.path.partition("/topics/")[-1]
self.querystring["target"] = [unquote(topic)] if "%" in topic else [topic]
return self.call_action()
def publish(self):
topic = self._get_param("target")
topic = self.path.split("/topics/")[-1]
topic = unquote(topic) if "%" in topic else topic
self.iotdata_backend.publish(topic=topic, payload=self.body)
return json.dumps(dict())

View File

@ -10,18 +10,9 @@ response = IoTDataPlaneResponse()
url_paths = {
#
# Paths for :class:`moto.core.models.MockAWS`
#
# This route requires special handling.
"{0}/topics/(?P<topic>.*)$": response.dispatch_publish,
# The remaining routes can be handled by the default dispatcher.
"{0}/.*$": response.dispatch,
#
# (Flask) Paths for :class:`moto.core.models.ServerModeMockAWS`
#
# This route requires special handling.
"{0}/topics/<path:topic>$": response.dispatch_publish,
# The remaining routes can be handled by the default dispatcher.
"{0}/<path:route>$": response.dispatch,
}

View File

@ -110,12 +110,16 @@ def test_update():
def test_publish():
region_name = "ap-northeast-1"
client = boto3.client("iot-data", region_name=region_name)
client.publish(topic="test/topic", qos=1, payload=b"pl")
client.publish(topic="test/topic1", qos=1, payload=b"pl1")
client.publish(topic="test/topic2", qos=1, payload=b"pl2")
client.publish(topic="test/topic3", qos=1, payload=b"pl3")
if not settings.TEST_SERVER_MODE:
mock_backend = moto.iotdata.models.iotdata_backends[ACCOUNT_ID][region_name]
mock_backend.published_payloads.should.have.length_of(1)
mock_backend.published_payloads.should.contain(("test/topic", "pl"))
mock_backend.published_payloads.should.have.length_of(3)
mock_backend.published_payloads.should.contain(("test/topic1", "pl1"))
mock_backend.published_payloads.should.contain(("test/topic2", "pl2"))
mock_backend.published_payloads.should.contain(("test/topic3", "pl3"))
@mock_iot