chube.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. from threading import RLock
  2. from typing import Optional, Iterator, Dict, List
  3. import sys
  4. from itertools import cycle
  5. import chube_youtube
  6. from channel import Channel, Subscriber
  7. from chube_enums import *
  8. from chube_ws import Resolver, Message, start_server, make_message
  9. class Chueue:
  10. _lock: RLock
  11. _queue: List[int]
  12. _codes: Dict[int, str]
  13. _id_iter: Iterator[int]
  14. def __init__(self):
  15. self._lock = RLock()
  16. self._queue = []
  17. self._codes = dict()
  18. self._id_iter = cycle(range(sys.maxsize))
  19. def add(self, code):
  20. with self:
  21. song_id = next(self._id_iter)
  22. self._queue.append(song_id)
  23. self._codes[song_id] = code
  24. return song_id
  25. def remove(self, song_id):
  26. with self:
  27. self._queue.remove(song_id)
  28. self._codes.pop(song_id)
  29. def move(self, song_id, displacement):
  30. with self:
  31. i = self._queue.index(song_id)
  32. new_i = min(len(self._queue) - 1, max(0, i + displacement))
  33. self._queue.pop(i)
  34. self._queue.insert(new_i, song_id)
  35. return new_i - i
  36. def pop(self):
  37. with self:
  38. if len(self._queue) > 0:
  39. song_id = self._queue.pop(0)
  40. code = self._codes.pop(song_id)
  41. return self.as_song(song_id, code)
  42. else:
  43. return None
  44. def as_song(self, song_id, code=None):
  45. if code is None:
  46. code = self._codes[song_id]
  47. return {"id": song_id, "code": code}
  48. def as_list(self):
  49. with self:
  50. res = list(map(self.as_song, self._queue))
  51. return res
  52. def lock(self):
  53. self._lock.acquire()
  54. def unlock(self):
  55. self._lock.release()
  56. def __enter__(self):
  57. self.lock()
  58. def __exit__(self, exc_type, exc_val, exc_tb):
  59. self.unlock()
  60. def __len__(self):
  61. return len(self._queue)
  62. class Playback:
  63. _song: Optional[Dict] = None
  64. _state: PlayerState = PlayerState.LIST_END
  65. lock: RLock()
  66. def __init__(self):
  67. self.lock = RLock()
  68. def set_song(self, song):
  69. with self.lock:
  70. self._song = song
  71. def get_song(self):
  72. with self.lock:
  73. return self._song
  74. def get_song_id(self):
  75. with self.lock:
  76. if self._song is not None:
  77. return self._song["id"]
  78. else:
  79. return None
  80. def get_state(self):
  81. return self._state
  82. def set_state(self, state):
  83. self._state = state
  84. class Room:
  85. chueue: Chueue
  86. channel: Channel
  87. controller: Optional[Subscriber]
  88. controller_lock: RLock
  89. playback: Playback
  90. def __init__(self):
  91. self.chueue = Chueue()
  92. self.channel = Channel()
  93. self.controller_lock = RLock()
  94. self.playback = Playback()
  95. self.controller = None
  96. rooms: Dict[str, Room] = dict()
  97. async def request_state_processor(ws, data, path):
  98. room = rooms[path]
  99. await ws.send(make_message(Message.STATE, {
  100. "list": room.chueue.as_list(),
  101. "playing": room.playback.get_song(),
  102. "state": room.playback.get_state().value
  103. }))
  104. async def request_list_operation_processor(ws, data, path):
  105. room = rooms[path]
  106. chueue = room.chueue
  107. op = data["op"]
  108. message = None
  109. if op == QueueOp.ADD.value:
  110. kind = data["kind"]
  111. if kind == YoutubeResourceType.VIDEO.value:
  112. code = data["code"]
  113. song_id = chueue.add(code)
  114. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.value, "items": [{"code": code, "id": song_id}]})
  115. elif kind == YoutubeResourceType.PLAYLIST.value:
  116. code = data["code"]
  117. playlist_items = await chube_youtube.get_all_playlist_items(code)
  118. response_items = []
  119. with room.chueue:
  120. for item in playlist_items["items"]:
  121. code = item["snippet"]["resourceId"]["videoId"]
  122. song_id = chueue.add(code)
  123. response_items.append({"code": code, "id": song_id, "snippet": item["snippet"]})
  124. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.value, "items": response_items})
  125. with room.playback.lock:
  126. if room.playback.get_state() == PlayerState.LIST_END:
  127. playing = chueue.pop()
  128. if playing is not None:
  129. room.playback.set_state(PlayerState.PLAYING)
  130. room.playback.set_song(playing)
  131. elif op == QueueOp.DEL.value:
  132. song_id = data["id"]
  133. chueue.remove(song_id)
  134. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.DEL.value, "items": [{"id": song_id}]})
  135. elif op == QueueOp.MOVE.value:
  136. song_id = data["id"]
  137. displacement = data["displacement"]
  138. actual_displacement = chueue.move(song_id, displacement)
  139. if actual_displacement != 0:
  140. message = make_message(Message.LIST_OPERATION,
  141. {"op": QueueOp.MOVE.value, "items": [{"id": song_id, "displacement": actual_displacement}]})
  142. if message is not None:
  143. await room.channel.send(message)
  144. async def media_action_processor(ws, data, path):
  145. room = rooms[path]
  146. action = data["action"]
  147. send_next = False
  148. if action == MediaAction.NEXT.value:
  149. current_id = data["current_id"]
  150. with room.playback.lock, room.chueue:
  151. old_song_id = room.playback.get_song_id()
  152. if old_song_id == current_id:
  153. send_next = True
  154. new_song = play_next_song(room)
  155. if new_song is None:
  156. new_song_id = None
  157. else:
  158. new_song_id = new_song["id"]
  159. if send_next:
  160. await room.channel.send(make_message(
  161. Message.MEDIA_ACTION,
  162. {"action": MediaAction.NEXT.value, "ended_id": old_song_id, "current_id": new_song_id}))
  163. if action == MediaAction.PLAY.value or send_next:
  164. send_play = False
  165. with room.playback.lock:
  166. if room.playback.get_state() == PlayerState.PAUSED:
  167. send_play = True
  168. room.playback.set_state(PlayerState.PLAYING)
  169. if send_play:
  170. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PLAY.value}))
  171. if action == MediaAction.PAUSE.value:
  172. send_pause = False
  173. with room.playback.lock:
  174. if room.playback.get_state() == PlayerState.PLAYING:
  175. send_pause = True
  176. room.playback.set_state(PlayerState.PAUSED)
  177. if send_pause:
  178. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PAUSE.value}))
  179. async def obtain_control_processor(ws, data, path):
  180. room = rooms[path]
  181. await obtain_control(ws, room)
  182. async def release_control_processor(ws, data, path):
  183. room = rooms[path]
  184. if len(room.channel.subscribers) > 1:
  185. await release_control(ws, room)
  186. else:
  187. pass
  188. # TODO error here
  189. def play_next_song(room):
  190. new_song = room.chueue.pop()
  191. room.playback.set_song(new_song)
  192. if new_song is None:
  193. room.playback.set_state(PlayerState.LIST_END)
  194. return new_song
  195. async def song_end_processor(ws, data, path):
  196. room = rooms[path]
  197. old_song_id = data["id"]
  198. with room.controller_lock, room.playback.lock:
  199. if room.controller is not None and ws is room.controller.ws and old_song_id == room.playback.get_song_id():
  200. new_song = play_next_song(room)
  201. if new_song is None:
  202. new_song_id = None
  203. else:
  204. new_song_id = new_song["id"]
  205. await room.channel.send(
  206. make_message(Message.SONG_END, {"ended_id": old_song_id, "current_id": new_song_id}))
  207. async def player_enabled_processor(ws, data, path):
  208. room = rooms[path]
  209. room.channel.subscribers[ws].player_enabled = data["enabled"]
  210. if data["enabled"]:
  211. with room.controller_lock:
  212. if room.controller is None:
  213. await obtain_control(ws, room)
  214. else:
  215. await release_control(ws, room)
  216. # TODO There is some potential concurrent bug here, when the controller loses/releases control right before a song end.
  217. async def obtain_control(ws, room: Room):
  218. with room.controller_lock:
  219. if room.controller is None or room.controller.ws is not ws:
  220. prev_controller = room.controller
  221. room.controller = room.channel.subscribers[ws]
  222. await ws.send(make_message(Message.OBTAIN_CONTROL))
  223. if prev_controller is not None:
  224. await prev_controller.ws.send(make_message(Message.RELEASE_CONTROL))
  225. async def release_control(ws, room: Room):
  226. with room.controller_lock:
  227. if room.controller is not None and room.controller.ws is ws:
  228. room.controller = next(room.channel.get_player_enabled_subscribers(), None)
  229. if room.controller is not None:
  230. await room.controller.ws.send(make_message(Message.OBTAIN_CONTROL))
  231. await ws.send(make_message(Message.RELEASE_CONTROL))
  232. async def on_connect(ws, path):
  233. if path not in rooms:
  234. rooms[path] = Room()
  235. room = rooms[path]
  236. room.channel.subscribe(ws)
  237. async def on_disconnect(ws, path):
  238. room = rooms[path]
  239. room.channel.unsubscribe(ws)
  240. await release_control(ws, room)
  241. # with room.controller_lock:
  242. # if room.controller is None:
  243. # room.playback.
  244. def make_resolver():
  245. resolver = Resolver()
  246. resolver.register(Message.STATE, request_state_processor)
  247. resolver.register(Message.LIST_OPERATION, request_list_operation_processor)
  248. resolver.register(Message.MEDIA_ACTION, media_action_processor)
  249. resolver.register(Message.PLAYER_ENABLED, player_enabled_processor)
  250. resolver.register(Message.OBTAIN_CONTROL, obtain_control_processor)
  251. resolver.register(Message.RELEASE_CONTROL, release_control_processor)
  252. resolver.register(Message.SONG_END, song_end_processor)
  253. search_resolver = chube_youtube.make_resolver()
  254. resolver.add_all(search_resolver)
  255. return resolver
  256. def init_rooms():
  257. # rooms["main"] = Room()
  258. pass
  259. if __name__ == "__main__":
  260. player_resolver = make_resolver()
  261. init_rooms()
  262. start_server(player_resolver, on_connect, on_disconnect)