chube.py 7.8 KB


  1. from threading import RLock
  2. from typing import Optional, Iterator, Dict, List
  3. import sys
  4. from itertools import cycle
  5. from websockets import WebSocketServerProtocol
  6. import chube_search
  7. from channel import Channel, Subscriber
  8. from chube_enums import *
  9. from chube_ws import Resolver, Message, start_server, make_message
  10. class Chueue:
  11. _lock: RLock
  12. _queue: List[int]
  13. _codes: Dict[int, str]
  14. _id_iter: Iterator[int]
  15. def __init__(self):
  16. self._lock = RLock()
  17. self._queue = []
  18. self._codes = dict()
  19. self._id_iter = cycle(range(sys.maxsize))
  20. def add(self, code):
  21. with self:
  22. song_id = next(self._id_iter)
  23. self._queue.append(song_id)
  24. self._codes[song_id] = code
  25. return song_id
  26. def remove(self, song_id):
  27. with self:
  28. self._queue.remove(song_id)
  29. self._codes.pop(song_id)
  30. def move(self, song_id, displacement):
  31. with self:
  32. i = self._queue.index(song_id)
  33. new_i = min(len(self._queue) - 1, max(0, i + displacement))
  34. self._queue.pop(i)
  35. self._queue.insert(new_i, song_id)
  36. return new_i - i
  37. def pop(self):
  38. with self:
  39. if len(self._queue) > 0:
  40. song_id = self._queue.pop()
  41. code = self._codes.pop(song_id)
  42. return self.as_song(song_id, code)
  43. else:
  44. return None
  45. def as_song(self, song_id, code=None):
  46. if code is None:
  47. code = self._codes[song_id]
  48. return {"id": song_id, "code": code}
  49. def as_list(self):
  50. with self:
  51. res = list(map(self.as_song, self._queue))
  52. return res
  53. def lock(self):
  54. self._lock.acquire()
  55. def unlock(self):
  56. self._lock.release()
  57. def __enter__(self):
  58. self.lock()
  59. def __exit__(self, exc_type, exc_val, exc_tb):
  60. self.unlock()
  61. def __len__(self):
  62. return len(self._queue)
  63. class Playback:
  64. _song: Optional[Dict] = None
  65. _state: PlayerState = PlayerState.LIST_END
  66. lock: RLock()
  67. def __init__(self):
  68. self.lock = RLock()
  69. def set_song(self, song):
  70. with self.lock:
  71. self._song = song
  72. def get_song(self):
  73. with self.lock:
  74. return self._song
  75. def get_song_id(self):
  76. with self.lock:
  77. if self._song is not None:
  78. return self._song["id"]
  79. else:
  80. return None
  81. def get_state(self):
  82. return self._state
  83. def set_state(self, state):
  84. self._state = state
  85. class Room:
  86. chueue: Chueue
  87. channel: Channel
  88. controller: Optional[Subscriber]
  89. controller_lock: RLock
  90. playback: Playback
  91. def __init__(self):
  92. self.chueue = Chueue()
  93. self.channel = Channel()
  94. self.controller_lock = RLock()
  95. self.playback = Playback()
  96. self.controller = None
  97. rooms: Dict[str, Room] = dict()
  98. async def request_state_processor(ws, data, path):
  99. room = rooms[path]
  100. await ws.send(make_message(Message.STATE, {
  101. "list": room.chueue.as_list(),
  102. "playing": room.playback.get_song(),
  103. "state": room.playback.get_state().name
  104. }))
  105. async def request_list_operation_processor(ws, data, path):
  106. room = rooms[path]
  107. chueue = room.chueue
  108. op = data["op"]
  109. message = None
  110. if op == QueueOp.ADD.name:
  111. code = data["code"]
  112. song_id = chueue.add(code)
  113. with room.playback.lock:
  114. if room.playback.get_state() == PlayerState.LIST_END:
  115. room.playback.set_state(PlayerState.PLAYING)
  116. room.playback.set_song(chueue.pop())
  117. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.name, "code": code, "id": song_id})
  118. elif op == QueueOp.DEL.name:
  119. song_id = data["id"]
  120. chueue.remove(song_id)
  121. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.DEL.name, "id": song_id})
  122. elif op == QueueOp.MOVE.name:
  123. song_id = data["id"]
  124. displacement = data["displacement"]
  125. actual_displacement = chueue.move(song_id, displacement)
  126. if actual_displacement != 0:
  127. message = make_message(Message.LIST_OPERATION,
  128. {"op": QueueOp.MOVE.name, "id": song_id, "displacement": actual_displacement})
  129. if message is not None:
  130. await room.channel.send(message)
  131. async def obtain_control_processor(ws, data, path):
  132. room = rooms[path]
  133. await obtain_control(ws, room)
  134. async def release_control_processor(ws, data, path):
  135. room = rooms[path]
  136. if len(room.channel.subscribers) > 1:
  137. await release_control(ws, room)
  138. else:
  139. pass
  140. # TODO error here
  141. async def song_end_processor(ws, data, path):
  142. room = rooms[path]
  143. old_song_id = data["id"]
  144. with room.controller_lock, room.playback.lock:
  145. if room.controller is not None and ws is room.controller.ws and old_song_id == room.playback.get_song_id():
  146. new_song = room.chueue.pop()
  147. room.playback.set_song(new_song)
  148. if new_song is None:
  149. room.playback.set_state(PlayerState.LIST_END)
  150. new_song_id = None
  151. else:
  152. new_song_id = new_song["id"]
  153. await room.channel.send(
  154. make_message(Message.SONG_END, {"ended_id": old_song_id, "current_id": new_song_id}))
  155. async def playback_processor(ws, data, path):
  156. room = rooms[path]
  157. async def player_enabled_processor(ws, data, path):
  158. room = rooms[path]
  159. room.channel.subscribers[ws].player_enabled = data["enabled"]
  160. if data["enabled"]:
  161. with room.controller_lock:
  162. if room.controller is None:
  163. await obtain_control(ws, room)
  164. else:
  165. await release_control(ws, room)
# TODO: There is a potential concurrency bug here when the controller loses or releases control right before a song ends.
  167. async def obtain_control(ws, room: Room):
  168. with room.controller_lock:
  169. if room.controller is None or room.controller.ws is not ws:
  170. prev_controller = room.controller
  171. room.controller = room.channel.subscribers[ws]
  172. await ws.send(make_message(Message.OBTAIN_CONTROL))
  173. if prev_controller is not None:
  174. await prev_controller.ws.send(make_message(Message.RELEASE_CONTROL))
  175. async def release_control(ws, room: Room):
  176. with room.controller_lock:
  177. if room.controller is not None and room.controller.ws is ws:
  178. room.controller = next(room.channel.get_player_enabled_subscribers(), None)
  179. if room.controller is not None:
  180. await room.controller.ws.send(make_message(Message.OBTAIN_CONTROL))
  181. await ws.send(make_message(Message.RELEASE_CONTROL))
  182. async def on_connect(ws, path):
  183. if path not in rooms:
  184. rooms[path] = Room()
  185. room = rooms[path]
  186. room.channel.subscribe(ws)
  187. async def on_disconnect(ws, path):
  188. room = rooms[path]
  189. room.channel.unsubscribe(ws)
  190. await release_control(ws, room)
  191. # with room.controller_lock:
  192. # if room.controller is None:
  193. # room.playback.
  194. def make_resolver():
  195. resolver = Resolver()
  196. resolver.register(Message.STATE, request_state_processor)
  197. resolver.register(Message.LIST_OPERATION, request_list_operation_processor)
  198. resolver.register(Message.PLAYER_ENABLED, player_enabled_processor)
  199. resolver.register(Message.OBTAIN_CONTROL, obtain_control_processor)
  200. resolver.register(Message.RELEASE_CONTROL, release_control_processor)
  201. resolver.register(Message.SONG_END, song_end_processor)
  202. search_resolver = chube_search.make_resolver()
  203. resolver.add_all(search_resolver)
  204. return resolver
  205. def init_rooms():
  206. # rooms["main"] = Room()
  207. pass
  208. if __name__ == "__main__":
  209. player_resolver = make_resolver()
  210. init_rooms()
  211. start_server(player_resolver, on_connect, on_disconnect)