| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- #!/usr/bin/env python
- # WS server example that synchronizes state across clients
- import asyncio
- import functools
- import json
- import logging
- import os
- import random
- import websockets
- from http import HTTPStatus
- from color import Color
- from option import OptionCode, Option
- from game import Game
- from player import ErrorCode, StateCode, Player
- logging.basicConfig()
- games = dict() # code -> game
- sockets = dict() # code -> [(player, websocket), ...]
- users = ["Anna", "Bob", "Cynthia", "Daan", "Esoirulthayro", "Frank", "Gerben", "Hanna", "Ida", "Joost", "Karin", "Loes", "Max", "Nina"]
- class DogEncoder(json.JSONEncoder):
- def default(self, obj):
- if hasattr(obj, '__dict__'):
- return obj.__dict__
- return json.JSONEncoder.default(self, obj)
- MIME_TYPES = {
- "html": "text/html",
- "js": "text/javascript",
- "css": "text/css"
- }
- async def process_request(sever_root, path, request_headers):
- """Serves a file when doing a GET request with a valid path."""
- if "Upgrade" in request_headers:
- return # Probably a WebSocket connection
- path = '/dog.html'
- response_headers = [
- ('Server', 'asyncio websocket server'),
- ('Connection', 'close'),
- ]
- # Derive full system path
- full_path = os.path.realpath(os.path.join(sever_root, path[1:]))
- # Validate the path
- if os.path.commonpath((sever_root, full_path)) != sever_root or \
- not os.path.exists(full_path) or not os.path.isfile(full_path):
- print("HTTP GET {} 404 NOT FOUND".format(path))
- return HTTPStatus.NOT_FOUND, [], b'404 NOT FOUND'
- # Guess file content type
- extension = full_path.split(".")[-1]
- mime_type = MIME_TYPES.get(extension, "application/octet-stream")
- response_headers.append(('Content-Type', mime_type))
- # Read the whole file into memory and send it out
- body = open(full_path, 'rb').read()
- response_headers.append(('Content-Length', str(len(body))))
- print("HTTP GET {} 200 OK".format(path))
- return HTTPStatus.OK, response_headers, body
- async def notify(player_sockets):
- if player_sockets:
- await asyncio.wait([websocket.send(json.dumps(player, cls=DogEncoder)) for (player, websocket) in player_sockets])
- async def handler(websocket, path):
- player = Player(name=users[0])
- player.options = [
- Option(OptionCode.NEW_GAME, "Nieuw spel"),
- Option(OptionCode.JOIN_GAME, "Doe mee met een spel", game_code='3936')]
- player.message = "Wat wil je doen?"
- player.state = StateCode.START
- game_code = 0
- game = None
- try:
- await notify([(player, websocket)])
- async for message in websocket:
- option = Option(**json.loads(message))
- if not player.check_option(option):
- await notify([(player, websocket)])
- continue
- player.set_error(None)
- if option.code == OptionCode.NEW_GAME:
- game_code = 3936 # random.randint(1000, 9999);
- game = Game()
- games[game_code] = game
- if option.user_name is None:
- option.user_name = users[0]
- users.append(users[0])
- users.remove(users[0])
- player = game.join_player(option.user_name)
- player.game_code = game_code
- sockets[game_code] = [(player, websocket)]
- await notify(sockets[game_code])
- elif option.code == OptionCode.JOIN_GAME:
- game_code = option.game_code
- game = games.get(game_code) if game_code is not None and type(game_code) is int and game_code > 0 else None
- if game is None:
- player.message = f"onbekande code {game_code}"
- player.set_error(ErrorCode.UNKNOWN_CODE, game_code=game_code)
- await notify([(player, websocket)])
- continue
- if option.user_name is None:
- option.user_name = users[0]
- users.append(users[0])
- users.remove(users[0])
- player = game.join_player(option.user_name)
- player.game_code = game_code
- sockets[game_code].append((player, websocket))
- await notify(sockets[game_code])
- else:
- game = games[game_code]
- game.play_option(player, option)
- await notify(sockets[game_code])
- finally:
- if player is not None:
- if game is not None:
- game.unjoin_player(player)
- if game_code > 0:
- sockets[game_code].remove((player, websocket))
- await notify(sockets[game_code])
- if __name__ == "__main__":
- random.seed()
- static_handler = functools.partial(process_request, os.path.join(os.getcwd(), 'ui'))
- start_server = websockets.serve(handler, "localhost", 6789, process_request=static_handler)
- print("Running server at http://127.0.0.1:6789/")
- asyncio.get_event_loop().run_until_complete(start_server)
- asyncio.get_event_loop().run_forever()
|