123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import asyncio
- import json
- import logging
- import os
- import pathlib
- import ssl
- from websockets.asyncio.server import serve
- from websockets.exceptions import ConnectionClosed
- from chube_enums import Message
logger = logging.getLogger('chube')

# Listening endpoint. NOTE: PORT is a str when it comes from the environment
# and the int 38210 otherwise.
PORT = os.environ.get("CHUBE_WS_PORT") or 38210  # CHU
HOST = os.environ.get("CHUBE_WS_HOST") or "localhost"

# WSS stays enabled unless CHUBE_NO_WSS is exactly '1'.
ENABLE_WSS = os.environ.get("CHUBE_NO_WSS") != '1'
CERT_PATH = os.environ.get("CHUBE_CERT_PATH")
KEY_PATH = os.environ.get("CHUBE_KEY_PATH")

# Fail at import time when WSS is requested without usable TLS material.
if ENABLE_WSS and (CERT_PATH is None or not os.path.isfile(CERT_PATH)):
    raise Exception("WSS is enabled but no valid certificate is provided. To disable WSS provide the CHUBE_NO_WSS=1 "
                    "environment variable.\nProvided certificate path is {}".format(CERT_PATH))
# The None check must be on KEY_PATH itself: os.path.isfile(None) raises
# TypeError, which would hide the explanatory message below.
if ENABLE_WSS and (KEY_PATH is None or not os.path.isfile(KEY_PATH)):
    raise Exception("WSS is enabled but no valid key is provided. To disable WSS provide the CHUBE_NO_WSS=1 "
                    "environment variable.\nProvided key path is {}".format(KEY_PATH))
class MessageResolveException(Exception):
    """Raised when an incoming message cannot be resolved to a registered handler."""
class Resolver:
    """Registry of message handlers plus the websocket loop that dispatches to them.

    Incoming frames are expected to be JSON objects shaped like
    ``{"__message": <type value>, "__body": <optional payload>}``.
    """

    _registerDict: dict

    def __init__(self):
        # One registry per instance. A class-level dict would be shared by
        # every Resolver, which would make add_all() a no-op.
        self._registerDict = {}

    def register(self, message: "Message", handler):
        """Associate ``handler`` with ``message.value``, replacing any previous handler."""
        self._registerDict[message.value] = handler

    def unregister(self, message):
        """Remove and return the handler for ``message.value``.

        Raises:
            KeyError: if no handler is registered for that value.
        """
        return self._registerDict.pop(message.value)

    def resolve(self, data):
        """Parse a raw frame and look up its handler.

        Args:
            data: JSON text (or bytes) received from the socket.

        Returns:
            A ``(handler, body)`` tuple; ``body`` is ``None`` when the frame
            has no ``"__body"`` field.

        Raises:
            MessageResolveException: if the frame is not valid JSON, is not a
                JSON object, lacks ``"__message"``, or names a type with no
                registered handler.
        """
        try:
            message = json.loads(data)
        except ValueError as err:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            raise MessageResolveException("Received bytes is not valid json. {}".format(data)) from err
        if not isinstance(message, dict):
            raise MessageResolveException("Received bytes is not a json object but a {}. {}".format(type(message), message))
        if "__message" not in message:
            raise MessageResolveException("Received message does not have required '__message' field. {}".format(message))
        message_type = message["__message"]
        try:
            handler = self._registerDict[message_type]
        except (KeyError, TypeError):
            # TypeError covers unhashable type values such as a JSON list or object.
            raise MessageResolveException(
                "No handler for message type {}. {}".format(message_type, message)) from None
        return handler, message.get("__body")

    def make_handler(self, on_open=None, on_close=None):
        """Build the per-connection coroutine to pass to the websocket server.

        Args:
            on_open: optional ``async (websocket, path)`` callback awaited
                before the first frame is read.
            on_close: optional ``async (websocket, path)`` callback awaited
                once the receive loop ends, for whatever reason.

        Returns:
            An ``async handler(websocket)`` that reads frames, resolves each
            one and awaits the registered handler as
            ``handler(websocket, body, path)``. Callbacks and handlers receive
            the lower-cased request path.
        """
        async def handler(websocket):
            path = websocket.request.path
            lowered_path = path.lower()
            if on_open is not None:
                await on_open(websocket, lowered_path)
            try:
                while True:
                    message = await websocket.recv()
                    logger.debug(f"{path} WebsocketMessage {{{message}}}")
                    processor, body = self.resolve(message)
                    await processor(websocket, body, lowered_path)
            except MessageResolveException as e:
                # An unresolvable frame ends the connection after being logged.
                logger.exception(e)
            except ConnectionClosed:
                # The peer going away is the normal way out of the loop.
                pass
            finally:
                # Always notify, including after a resolve failure or a handler
                # exception, so per-connection state can be cleaned up.
                if on_close is not None:
                    await on_close(websocket, lowered_path)
        return handler

    def add_all(self, search_resolver: "Resolver"):
        """Copy every registration from ``search_resolver``, overwriting clashes."""
        self._registerDict.update(search_resolver._registerDict)
def make_message(message_type, body=None):
    """Serialize an outgoing message to JSON text.

    The result is an object with ``"__message"`` set to ``message_type.value``
    and ``"__body"`` set to ``body`` (``null`` when omitted).
    """
    envelope = {"__message": message_type.value, "__body": body}
    return json.dumps(envelope)
def make_message_from_json_string(message_type, raw_body: str):
    """Build an outgoing message around a body that is already JSON text.

    Args:
        message_type: object whose ``value`` becomes the ``"__message"`` field;
            it is always emitted as a JSON string.
        raw_body: JSON text inserted verbatim as the ``"__body"`` field. It is
            not validated here.

    Returns:
        The message envelope as JSON text.
    """
    # Encode the type through json.dumps so quotes, backslashes and control
    # characters are escaped. ensure_ascii=False leaves other text unchanged.
    type_field = json.dumps(str(message_type.value), ensure_ascii=False)
    return "{{\"__message\": {}, \"__body\": {}}}".format(type_field, raw_body)
async def start_server(resolver: Resolver, on_new_connection, on_connection_close):
    """Serve websocket connections on HOST:PORT until the task is stopped.

    Args:
        resolver: supplies the per-connection handler via ``make_handler``.
        on_new_connection: passed to ``make_handler`` as ``on_open``.
        on_connection_close: passed to ``make_handler`` as ``on_close``.

    When ENABLE_WSS is true, a TLS server context is loaded from CERT_PATH
    and KEY_PATH; otherwise the server runs without TLS.
    """
    tls = None
    if ENABLE_WSS:
        tls = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls.load_cert_chain(pathlib.Path(CERT_PATH), pathlib.Path(KEY_PATH))

    connection_handler = resolver.make_handler(
        on_open=on_new_connection,
        on_close=on_connection_close,
    )
    async with serve(connection_handler, HOST, PORT, ssl=tls) as server:
        await server.serve_forever()
|