chube_ws.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import asyncio
  2. import json
  3. import websockets
  4. from chube_enums import Message
  5. PORT = 3821 # CHU
  6. class Resolver:
  7. _registerDict: dict = {}
  8. def register(self, message: Message, handler):
  9. self._registerDict[message.name] = handler
  10. def unregister(self, message):
  11. return self._registerDict.pop(message.name)
  12. def resolve(self, data):
  13. message = json.loads(data)
  14. if not isinstance(message, dict):
  15. raise Exception("Received bytes is not a json object but a {}. {}".format(type(message), message))
  16. if "__message" not in message:
  17. raise Exception("Received message does not have required '__message' field. {}".format(message))
  18. message_type = message["__message"]
  19. if message_type not in self._registerDict:
  20. raise Exception("No handler for message type {}. {}".format(message_type, message))
  21. if "__body" not in message:
  22. return self._registerDict[message_type], None
  23. else:
  24. return self._registerDict[message_type], message["__body"]
  25. def make_handler(self, on_open=None, on_close=None):
  26. async def on_open_handler(websocket, path):
  27. if on_open is not None:
  28. await on_open(websocket, path)
  29. async def on_close_handler(websocket, path):
  30. if on_close is not None:
  31. await on_close(websocket, path)
  32. async def handler(websocket, path):
  33. await on_open_handler(websocket, path)
  34. try:
  35. while True:
  36. message = await websocket.recv()
  37. processor, body = self.resolve(message)
  38. await processor(websocket, body)
  39. except websockets.ConnectionClosedOK:
  40. await on_close_handler(websocket, path)
  41. return handler
  42. def add_all(self, search_resolver: "Resolver"):
  43. for message, handler in search_resolver._registerDict.items():
  44. self._registerDict[message] = handler
  45. def make_message(message_type, body=None):
  46. return json.dumps({"__message": message_type.name, "__body": body})
  47. def start_server(resolver: Resolver, on_new_connection, on_connection_close):
  48. ws_server = websockets.serve(
  49. resolver.make_handler(on_open=on_new_connection, on_close=on_connection_close),
  50. "localhost", PORT)
  51. asyncio.get_event_loop().run_until_complete(ws_server)
  52. asyncio.get_event_loop().run_forever()