Parcourir la source

Make subscriber and controller logic neater

niels il y a 4 ans
Parent
commit
7e6b320189
3 fichiers modifiés avec 33 ajouts et 30 suppressions
  1. 5 2
      channel.py
  2. 27 27
      chube.py
  3. 1 1
      chube_ws.py

+ 5 - 2
channel.py

@@ -18,10 +18,12 @@ class Channel:
         self.subscribers = dict()
 
     def subscribe(self, ws: WebSocketServerProtocol):
-        self.subscribers[ws] = (Subscriber(ws))
+        if ws not in self.subscribers:
+            self.subscribers[ws] = (Subscriber(ws))
 
     def unsubscribe(self, ws: WebSocketServerProtocol):
-        self.subscribers.pop(ws)
+        if ws in self.subscribers:
+            self.subscribers.pop(ws)
 
     def get_player_enabled_subscribers(self):
         return filter(lambda s: s.player_enabled, self.subscribers.values())
@@ -31,4 +33,5 @@ class Channel:
             if sub.ws.open:
                 await sub.ws.send(message)
             else:
+                print("closed ws still in channel")
                 self.unsubscribe(sub.ws)

+ 27 - 27
chube.py

@@ -54,7 +54,7 @@ class Chueue:
                     self._played_queue = []
                 else:
                     return None
-                
+
             song_id = self._queue.pop(0)
             if self._repeat_enabled:
                 code = self._codes[song_id]
@@ -137,7 +137,7 @@ class Playback:
 class Room:
     chueue: Chueue
     channel: Channel
-    controller: Optional[Subscriber]
+    _controller: Optional[Subscriber]
     controller_lock: RLock
     playback: Playback
 
@@ -146,7 +146,13 @@ class Room:
         self.channel = Channel()
         self.controller_lock = RLock()
         self.playback = Playback()
-        self.controller = None
+        self._controller = None
+
+    def get_controller(self):
+        return self._controller
+
+    def set_controller(self, controller):
+        self._controller = controller
 
 
 rooms: Dict[str, Room] = dict()
@@ -249,7 +255,8 @@ async def media_action_processor(ws, data, path):
         if room.chueue.is_repeat_enabled() != enable:
             with room.chueue:
                 room.chueue.set_repeat_enabled(enable, room.playback.get_song())
-            await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.REPEAT.value, "enable": enable}))
+            await room.channel.send(
+                make_message(Message.MEDIA_ACTION, {"action": MediaAction.REPEAT.value, "enable": enable}))
 
 
 async def obtain_control_processor(ws, data, path):
@@ -278,7 +285,8 @@ async def song_end_processor(ws, data, path):
     room = rooms[path]
     old_song_id = data["id"]
     with room.controller_lock, room.playback.lock:
-        if room.controller is not None and ws is room.controller.ws and old_song_id == room.playback.get_song_id():
+        controller = room.get_controller()
+        if controller is not None and controller.ws is ws and old_song_id == room.playback.get_song_id():
             new_song = play_next_song(room)
             if new_song is None:
                 new_song_id = None
@@ -293,7 +301,7 @@ async def player_enabled_processor(ws, data, path):
     room.channel.subscribers[ws].player_enabled = data["enabled"]
     if data["enabled"]:
         with room.controller_lock:
-            if room.controller is None:
+            if room.get_controller() is None:
                 await obtain_control(ws, room)
     else:
         await release_control(ws, room)
@@ -302,21 +310,24 @@ async def player_enabled_processor(ws, data, path):
 # TODO There is some potential concurrent bug here, when the controller loses/releases control right before a song end.
 async def obtain_control(ws, room: Room):
     with room.controller_lock:
-        if room.controller is None or room.controller.ws is not ws:
-            prev_controller = room.controller
-            room.controller = room.channel.subscribers[ws]
+        controller = room.get_controller()
+        if controller is None or controller.ws is not ws:
+            room.set_controller(room.channel.subscribers[ws])
             await ws.send(make_message(Message.OBTAIN_CONTROL))
-            if prev_controller is not None:
-                await prev_controller.ws.send(make_message(Message.RELEASE_CONTROL))
+            if controller is not None:
+                await controller.ws.send(make_message(Message.RELEASE_CONTROL))
 
 
 async def release_control(ws, room: Room):
     with room.controller_lock:
-        if room.controller is not None and room.controller.ws is ws:
-            room.controller = next(room.channel.get_player_enabled_subscribers(), None)
-            if room.controller is not None:
-                await room.controller.ws.send(make_message(Message.OBTAIN_CONTROL))
-            await ws.send(make_message(Message.RELEASE_CONTROL))
+        controller = room.get_controller()
+        if controller is not None and controller.ws is ws:
+            controller = next(room.channel.get_player_enabled_subscribers(), None)
+            room.set_controller(controller)
+            if controller is not None:
+                await controller.ws.send(make_message(Message.OBTAIN_CONTROL))
+            if ws.open:
+                await ws.send(make_message(Message.RELEASE_CONTROL))
 
 
 async def on_connect(ws, path):
@@ -330,9 +341,6 @@ async def on_disconnect(ws, path):
     room = rooms[path]
     room.channel.unsubscribe(ws)
     await release_control(ws, room)
-    # with room.controller_lock:
-    #     if room.controller is None:
-    #         room.playback.
 
 
 def make_resolver():
@@ -352,14 +360,6 @@ def make_resolver():
     return resolver
 
 
-def init_rooms():
-    # rooms["main"] = Room()
-    pass
-
-
 if __name__ == "__main__":
     player_resolver = make_resolver()
-
-    init_rooms()
-
     start_server(player_resolver, on_connect, on_disconnect)

+ 1 - 1
chube_ws.py

@@ -53,7 +53,7 @@ class Resolver:
                     message = await websocket.recv()
                     processor, body = self.resolve(message)
                     await processor(websocket, body, path)
-            except websockets.ConnectionClosedOK:
+            except websockets.ConnectionClosed:
                 await on_close_handler(websocket, path)
 
         return handler