Server: check ready state with threading.Event (#7479)

This commit is contained in:
Serhii Shevchuk 2024-03-17 22:31:31 +02:00 committed by GitHub
parent 114c09a7a3
commit c974a71ba1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,5 +1,4 @@
import time
from threading import Thread
from threading import Event, Thread
from typing import Optional
from werkzeug.serving import BaseWSGIServer, make_server
@ -11,20 +10,19 @@ class ThreadedMotoServer:
def __init__(
self, ip_address: str = "0.0.0.0", port: int = 5000, verbose: bool = True
):
self._port = port
self._thread: Optional[Thread] = None
self._ip_address = ip_address
self._server: Optional[BaseWSGIServer] = None
self._server_ready = False
self._server_ready_event = Event()
self._verbose = verbose
def _server_entry(self) -> None:
app = DomainDispatcherApplication(create_backend_app)
self._server = make_server(self._ip_address, self._port, app, True)
self._server_ready = True
self._server_ready_event.set()
self._server.serve_forever()
def start(self) -> None:
@ -34,11 +32,10 @@ class ThreadedMotoServer:
)
self._thread = Thread(target=self._server_entry, daemon=True)
self._thread.start()
while not self._server_ready:
time.sleep(0.1)
self._server_ready_event.wait()
def stop(self) -> None:
self._server_ready = False
self._server_ready_event.clear()
if self._server:
self._server.shutdown()