chube.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. from threading import RLock
  2. from typing import Optional, Iterator, Dict, List
  3. import sys
  4. from itertools import cycle
  5. import chube_youtube
  6. from channel import Channel, Subscriber
  7. from chube_enums import *
  8. from chube_ws import Resolver, Message, start_server, make_message
  9. class Chueue:
  10. _lock: RLock
  11. _queue: List[int]
  12. _codes: Dict[int, str]
  13. _id_iter: Iterator[int]
  14. _played_queue: Optional[List[int]]
  15. _repeat_enabled: bool = False
  16. def __init__(self):
  17. self._lock = RLock()
  18. self._queue = []
  19. self._codes = dict()
  20. self._id_iter = cycle(range(sys.maxsize))
  21. self._played_queue = None
  22. def add(self, code):
  23. with self:
  24. song_id = next(self._id_iter)
  25. self._queue.append(song_id)
  26. self._codes[song_id] = code
  27. return song_id
  28. def remove(self, song_id):
  29. with self:
  30. self._queue.remove(song_id)
  31. self._codes.pop(song_id)
  32. def move(self, song_id, displacement):
  33. with self:
  34. i = self._queue.index(song_id)
  35. new_i = min(len(self._queue) - 1, max(0, i + displacement))
  36. self._queue.pop(i)
  37. self._queue.insert(new_i, song_id)
  38. return new_i - i
  39. def pop(self):
  40. with self:
  41. if len(self._queue) <= 0:
  42. if self._repeat_enabled and len(self._played_queue) > 0:
  43. self._queue = self._played_queue
  44. self._played_queue = []
  45. else:
  46. return None
  47. song_id = self._queue.pop(0)
  48. if self._repeat_enabled:
  49. code = self._codes[song_id]
  50. self._played_queue.append(song_id)
  51. else:
  52. code = self._codes.pop(song_id)
  53. return self.as_song(song_id, code)
  54. def set_repeat_enabled(self, enable, playback_song):
  55. with self:
  56. self._repeat_enabled = enable
  57. if enable:
  58. if playback_song is not None:
  59. self._played_queue = [playback_song["id"]]
  60. self._codes[playback_song["id"]] = playback_song["code"]
  61. else:
  62. self._played_queue = []
  63. else:
  64. self._played_queue = None
  65. def is_repeat_enabled(self):
  66. return self._repeat_enabled
  67. def as_song(self, song_id, code=None):
  68. if code is None:
  69. code = self._codes[song_id]
  70. return {"id": song_id, "code": code}
  71. def as_lists(self):
  72. with self:
  73. queue_as_list = list(map(self.as_song, self._queue))
  74. played_as_list = list(map(self.as_song, self._played_queue)) if self.is_repeat_enabled() else None
  75. return {"next": queue_as_list, "previous": played_as_list}
  76. def lock(self):
  77. self._lock.acquire()
  78. def unlock(self):
  79. self._lock.release()
  80. def __enter__(self):
  81. self.lock()
  82. def __exit__(self, exc_type, exc_val, exc_tb):
  83. self.unlock()
  84. def __len__(self):
  85. return len(self._queue)
  86. class Playback:
  87. _song: Optional[Dict] = None
  88. _state: PlayerState = PlayerState.LIST_END
  89. lock: RLock()
  90. def __init__(self):
  91. self.lock = RLock()
  92. def set_song(self, song):
  93. with self.lock:
  94. self._song = song
  95. def get_song(self):
  96. with self.lock:
  97. return self._song
  98. def get_song_id(self):
  99. with self.lock:
  100. if self._song is not None:
  101. return self._song["id"]
  102. else:
  103. return None
  104. def get_state(self):
  105. return self._state
  106. def set_state(self, state):
  107. self._state = state
  108. class Room:
  109. chueue: Chueue
  110. channel: Channel
  111. _controller: Optional[Subscriber]
  112. controller_lock: RLock
  113. playback: Playback
  114. def __init__(self):
  115. self.chueue = Chueue()
  116. self.channel = Channel()
  117. self.controller_lock = RLock()
  118. self.playback = Playback()
  119. self._controller = None
  120. def get_controller(self):
  121. return self._controller
  122. def set_controller(self, controller):
  123. self._controller = controller
  124. rooms: Dict[str, Room] = dict()
  125. async def request_state_processor(ws, data, path):
  126. room = rooms[path]
  127. await ws.send(make_message(Message.STATE, {
  128. "lists": room.chueue.as_lists(),
  129. "playing": room.playback.get_song(),
  130. "state": room.playback.get_state().value
  131. }))
  132. async def request_list_operation_processor(ws, data, path):
  133. room = rooms[path]
  134. chueue = room.chueue
  135. op = data["op"]
  136. message = None
  137. if op == QueueOp.ADD.value:
  138. kind = data["kind"]
  139. if kind == YoutubeResourceType.VIDEO.value:
  140. code = data["code"]
  141. song_id = chueue.add(code)
  142. message = make_message(Message.LIST_OPERATION,
  143. {"op": QueueOp.ADD.value, "items": [{"code": code, "id": song_id}]})
  144. elif kind == YoutubeResourceType.PLAYLIST.value:
  145. code = data["code"]
  146. playlist_items = await chube_youtube.get_all_playlist_items(code)
  147. response_items = []
  148. with room.chueue:
  149. for item in playlist_items["items"]:
  150. code = item["snippet"]["resourceId"]["videoId"]
  151. song_id = chueue.add(code)
  152. response_items.append({"code": code, "id": song_id, "snippet": item["snippet"]})
  153. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.value, "items": response_items})
  154. with room.playback.lock:
  155. if room.playback.get_state() == PlayerState.LIST_END:
  156. playing = chueue.pop()
  157. if playing is not None:
  158. room.playback.set_state(PlayerState.PLAYING)
  159. room.playback.set_song(playing)
  160. elif op == QueueOp.DEL.value:
  161. song_id = data["id"]
  162. chueue.remove(song_id)
  163. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.DEL.value, "items": [{"id": song_id}]})
  164. elif op == QueueOp.MOVE.value:
  165. song_id = data["id"]
  166. displacement = data["displacement"]
  167. actual_displacement = chueue.move(song_id, displacement)
  168. if actual_displacement != 0:
  169. message = make_message(Message.LIST_OPERATION,
  170. {"op": QueueOp.MOVE.value,
  171. "items": [{"id": song_id, "displacement": actual_displacement}]})
  172. if message is not None:
  173. await room.channel.send(message)
  174. async def media_action_processor(ws, data, path):
  175. room = rooms[path]
  176. action = data["action"]
  177. send_next = False
  178. if action == MediaAction.NEXT.value:
  179. current_id = data["current_id"]
  180. with room.playback.lock, room.chueue:
  181. old_song_id = room.playback.get_song_id()
  182. if old_song_id == current_id:
  183. send_next = True
  184. new_song = play_next_song(room)
  185. if new_song is None:
  186. new_song_id = None
  187. else:
  188. new_song_id = new_song["id"]
  189. if send_next:
  190. await room.channel.send(make_message(
  191. Message.MEDIA_ACTION,
  192. {"action": MediaAction.NEXT.value, "ended_id": old_song_id, "current_id": new_song_id}))
  193. if action == MediaAction.PLAY.value or send_next:
  194. send_play = False
  195. with room.playback.lock:
  196. if room.playback.get_state() == PlayerState.PAUSED:
  197. send_play = True
  198. room.playback.set_state(PlayerState.PLAYING)
  199. if send_play:
  200. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PLAY.value}))
  201. if action == MediaAction.PAUSE.value:
  202. send_pause = False
  203. with room.playback.lock:
  204. if room.playback.get_state() == PlayerState.PLAYING:
  205. send_pause = True
  206. room.playback.set_state(PlayerState.PAUSED)
  207. if send_pause:
  208. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PAUSE.value}))
  209. if action == MediaAction.REPEAT.value:
  210. enable = data["enable"]
  211. if room.chueue.is_repeat_enabled() != enable:
  212. with room.chueue:
  213. room.chueue.set_repeat_enabled(enable, room.playback.get_song())
  214. await room.channel.send(
  215. make_message(Message.MEDIA_ACTION, {"action": MediaAction.REPEAT.value, "enable": enable}))
  216. async def obtain_control_processor(ws, data, path):
  217. room = rooms[path]
  218. await obtain_control(ws, room)
  219. async def release_control_processor(ws, data, path):
  220. room = rooms[path]
  221. if len(room.channel.subscribers) > 1:
  222. await release_control(ws, room)
  223. else:
  224. pass
  225. def play_next_song(room):
  226. new_song = room.chueue.pop()
  227. room.playback.set_song(new_song)
  228. if new_song is None:
  229. room.playback.set_state(PlayerState.LIST_END)
  230. return new_song
  231. async def song_end_processor(ws, data, path):
  232. room = rooms[path]
  233. old_song_id = data["id"]
  234. with room.controller_lock, room.playback.lock:
  235. controller = room.get_controller()
  236. if controller is not None and controller.ws is ws and old_song_id == room.playback.get_song_id():
  237. new_song = play_next_song(room)
  238. if new_song is None:
  239. new_song_id = None
  240. else:
  241. new_song_id = new_song["id"]
  242. await room.channel.send(
  243. make_message(Message.SONG_END, {"ended_id": old_song_id, "current_id": new_song_id}))
  244. async def player_enabled_processor(ws, data, path):
  245. room = rooms[path]
  246. room.channel.subscribers[ws].player_enabled = data["enabled"]
  247. if data["enabled"]:
  248. with room.controller_lock:
  249. if room.get_controller() is None:
  250. await obtain_control(ws, room)
  251. else:
  252. await release_control(ws, room)
  253. # TODO change OBTAIN_CONTROL en RELEASE_CONTROL to one message
  254. # TODO There is some potential concurrent bug here, when the controller loses/releases control right before a song end.
  255. async def obtain_control(ws, room: Room):
  256. with room.controller_lock:
  257. controller = room.get_controller()
  258. if controller is None or controller.ws is not ws:
  259. room.set_controller(room.channel.subscribers[ws])
  260. await ws.send(make_message(Message.OBTAIN_CONTROL))
  261. if controller is not None:
  262. await controller.ws.send(make_message(Message.RELEASE_CONTROL))
  263. async def release_control(ws, room: Room):
  264. with room.controller_lock:
  265. controller = room.get_controller()
  266. if controller is not None and controller.ws is ws:
  267. controller = next(room.channel.get_player_enabled_subscribers(), None)
  268. room.set_controller(controller)
  269. if controller is not None:
  270. await controller.ws.send(make_message(Message.OBTAIN_CONTROL))
  271. if ws.open:
  272. await ws.send(make_message(Message.RELEASE_CONTROL))
  273. async def on_connect(ws, path):
  274. if path not in rooms:
  275. rooms[path] = Room()
  276. room = rooms[path]
  277. room.channel.subscribe(ws)
  278. async def on_disconnect(ws, path):
  279. room = rooms[path]
  280. room.channel.unsubscribe(ws)
  281. await release_control(ws, room)
  282. def make_resolver():
  283. resolver = Resolver()
  284. resolver.register(Message.STATE, request_state_processor)
  285. resolver.register(Message.LIST_OPERATION, request_list_operation_processor)
  286. resolver.register(Message.MEDIA_ACTION, media_action_processor)
  287. resolver.register(Message.PLAYER_ENABLED, player_enabled_processor)
  288. resolver.register(Message.OBTAIN_CONTROL, obtain_control_processor)
  289. resolver.register(Message.RELEASE_CONTROL, release_control_processor)
  290. resolver.register(Message.SONG_END, song_end_processor)
  291. search_resolver = chube_youtube.make_resolver()
  292. resolver.add_all(search_resolver)
  293. return resolver
  294. if __name__ == "__main__":
  295. player_resolver = make_resolver()
  296. start_server(player_resolver, on_connect, on_disconnect)