# chube_ws.py (2.5 KB)
  1. import asyncio
  2. import json
  3. import os
  4. import websockets
  5. from chube_enums import Message
  6. PORT = os.environ.get("CHUBE_WS_PORT") or 3821 # CHU
  7. HOST = os.environ.get("CHUBE_WS_HOST") or "localhost"
  8. class Resolver:
  9. _registerDict: dict = {}
  10. def register(self, message: Message, handler):
  11. self._registerDict[message.name] = handler
  12. def unregister(self, message):
  13. return self._registerDict.pop(message.name)
  14. def resolve(self, data):
  15. message = json.loads(data)
  16. if not isinstance(message, dict):
  17. raise Exception("Received bytes is not a json object but a {}. {}".format(type(message), message))
  18. if "__message" not in message:
  19. raise Exception("Received message does not have required '__message' field. {}".format(message))
  20. message_type = message["__message"]
  21. if message_type not in self._registerDict:
  22. raise Exception("No handler for message type {}. {}".format(message_type, message))
  23. if "__body" not in message:
  24. return self._registerDict[message_type], None
  25. else:
  26. return self._registerDict[message_type], message["__body"]
  27. def make_handler(self, on_open=None, on_close=None):
  28. async def on_open_handler(websocket, path):
  29. if on_open is not None:
  30. await on_open(websocket, path)
  31. async def on_close_handler(websocket, path):
  32. if on_close is not None:
  33. await on_close(websocket, path)
  34. async def handler(websocket, path):
  35. await on_open_handler(websocket, path)
  36. try:
  37. while True:
  38. message = await websocket.recv()
  39. processor, body = self.resolve(message)
  40. await processor(websocket, body)
  41. except websockets.ConnectionClosedOK:
  42. await on_close_handler(websocket, path)
  43. return handler
  44. def add_all(self, search_resolver: "Resolver"):
  45. for message, handler in search_resolver._registerDict.items():
  46. self._registerDict[message] = handler
  47. def make_message(message_type, body=None):
  48. return json.dumps({"__message": message_type.name, "__body": body})
  49. def start_server(resolver: Resolver, on_new_connection, on_connection_close):
  50. ws_server = websockets.serve(
  51. resolver.make_handler(on_open=on_new_connection, on_close=on_connection_close),
  52. HOST, PORT)
  53. asyncio.get_event_loop().run_until_complete(ws_server)
  54. asyncio.get_event_loop().run_forever()