chube.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. import logging
  2. import os
  3. import asyncio
  4. from threading import RLock
  5. from typing import Optional, Iterator, Dict, List
  6. import sys
  7. from itertools import cycle
  8. import chube_youtube
  9. from channel import Channel, Subscriber
  10. from chube_enums import *
  11. from chube_ws import Resolver, Message, start_server, make_message
  12. logger = logging.getLogger('chube')
  13. class Chueue:
  14. _lock: RLock
  15. _queue: List[int]
  16. _codes: Dict[int, str]
  17. _id_iter: Iterator[int]
  18. _played_queue: Optional[List[int]]
  19. _repeat_enabled: bool = False
  20. def __init__(self):
  21. self._lock = RLock()
  22. self._queue = []
  23. self._codes = dict()
  24. self._id_iter = cycle(range(sys.maxsize))
  25. self._played_queue = None
  26. def add(self, code):
  27. with self:
  28. song_id = next(self._id_iter)
  29. self._queue.append(song_id)
  30. self._codes[song_id] = code
  31. logger.debug("Added code %s id %d to chueue %s", code, song_id, self)
  32. return song_id
  33. def remove(self, song_id):
  34. with self:
  35. self._queue.remove(song_id)
  36. self._codes.pop(song_id)
  37. logger.debug("Removed id %d from chueue %s", song_id, self)
  38. def move(self, song_id, displacement):
  39. with self:
  40. i = self._queue.index(song_id)
  41. new_i = min(len(self._queue) - 1, max(0, i + displacement))
  42. self._queue.pop(i)
  43. self._queue.insert(new_i, song_id)
  44. logger.debug("Moved id %d from chueue %s. dx: %d, i0: %d, i1: %d", song_id, self, displacement, i, new_i)
  45. return new_i - i
  46. def pop(self):
  47. with self:
  48. if len(self._queue) <= 0:
  49. if self._repeat_enabled and len(self._played_queue) > 0:
  50. self._queue = self._played_queue
  51. self._played_queue = []
  52. logger.debug("Popped empty list, repeat enabled for chueue %s", self)
  53. else:
  54. logger.debug("Popped empty list, repeat disabled for chueue %s", self)
  55. return None
  56. song_id = self._queue.pop(0)
  57. if self._repeat_enabled:
  58. code = self._codes[song_id]
  59. self._played_queue.append(song_id)
  60. else:
  61. code = self._codes.pop(song_id)
  62. logger.debug("Popped id %d, repeat %s for chueue %s", song_id, self._repeat_enabled, self)
  63. return self.as_song(song_id, code)
  64. def set_repeat_enabled(self, enable, playback_song):
  65. with self:
  66. self._repeat_enabled = enable
  67. if enable:
  68. if playback_song is not None:
  69. self._played_queue = [playback_song["id"]]
  70. self._codes[playback_song["id"]] = playback_song["code"]
  71. else:
  72. self._played_queue = []
  73. logger.debug("Set repeat enabled")
  74. else:
  75. for song_id in self._played_queue:
  76. self._codes.pop(song_id)
  77. self._played_queue = None
  78. logger.debug("Set repeat disabled")
  79. def is_repeat_enabled(self):
  80. return self._repeat_enabled
  81. def as_song(self, song_id, code=None):
  82. if code is None:
  83. code = self._codes[song_id]
  84. return {"id": song_id, "code": code}
  85. def as_lists(self):
  86. with self:
  87. queue_as_list = list(map(self.as_song, self._queue))
  88. played_as_list = list(map(self.as_song, self._played_queue)) if self.is_repeat_enabled() else None
  89. return {"next": queue_as_list, "previous": played_as_list}
  90. def lock(self):
  91. self._lock.acquire()
  92. def unlock(self):
  93. self._lock.release()
  94. def __enter__(self):
  95. self.lock()
  96. def __exit__(self, exc_type, exc_val, exc_tb):
  97. self.unlock()
  98. def __len__(self):
  99. return len(self._queue)
  100. class Playback:
  101. _song: Optional[Dict] = None
  102. _state: PlayerState = PlayerState.LIST_END
  103. lock: RLock()
  104. def __init__(self):
  105. self.lock = RLock()
  106. def set_song(self, song):
  107. with self.lock:
  108. self._song = song
  109. logger.debug("Playback %s: Set song to %d", self, song["id"])
  110. def get_song(self):
  111. with self.lock:
  112. return self._song
  113. def get_song_id(self):
  114. with self.lock:
  115. if self._song is not None:
  116. return self._song["id"]
  117. else:
  118. return None
  119. def get_state(self):
  120. return self._state
  121. def set_state(self, state):
  122. self._state = state
  123. logger.debug("Playback %s: Set state to %s", self, state)
  124. class Room:
  125. chueue: Chueue
  126. channel: Channel
  127. _controller: Optional[Subscriber]
  128. controller_lock: RLock
  129. playback: Playback
  130. def __init__(self):
  131. self.chueue = Chueue()
  132. self.channel = Channel()
  133. self.controller_lock = RLock()
  134. self.playback = Playback()
  135. self._controller = None
  136. def get_controller(self):
  137. return self._controller
  138. def set_controller(self, controller):
  139. logger.debug("Room %s: Set controller to %s", self, controller)
  140. self._controller = controller
  141. rooms: Dict[str, Room] = dict()
  142. async def request_state_processor(ws, _, path):
  143. room = rooms[path]
  144. state = {
  145. "lists": room.chueue.as_lists(),
  146. "playing": room.playback.get_song(),
  147. "state": room.playback.get_state().value
  148. }
  149. logger.debug("Processor: State request, state: %s", state)
  150. await ws.send(make_message(Message.STATE, state))
  151. async def request_list_operation_processor(_, data, path):
  152. room = rooms[path]
  153. chueue = room.chueue
  154. op = data["op"]
  155. message = None
  156. if op == QueueOp.ADD.value:
  157. kind = data["kind"]
  158. if kind == YoutubeResourceType.VIDEO.value:
  159. code = data["code"]
  160. song_id = chueue.add(code)
  161. logger.debug("Processor: Added song %s / %d to chueue %s", code, song_id, chueue)
  162. message = make_message(Message.LIST_OPERATION,
  163. {"op": QueueOp.ADD.value, "items": [{"code": code, "id": song_id}]})
  164. elif kind == YoutubeResourceType.PLAYLIST.value:
  165. code = data["code"]
  166. playlist_items = await chube_youtube.get_all_playlist_items(code)
  167. response_items = []
  168. with room.chueue:
  169. for item in playlist_items["items"]:
  170. code = item["snippet"]["resourceId"]["videoId"]
  171. song_id = chueue.add(code)
  172. response_items.append({"code": code, "id": song_id, "snippet": item["snippet"]})
  173. logger.debug("Processor: Added playlist %s to chueue %s", code, chueue)
  174. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.value, "items": response_items})
  175. with room.playback.lock:
  176. if room.playback.get_state() == PlayerState.LIST_END:
  177. playing = chueue.pop()
  178. if playing is not None:
  179. room.playback.set_state(PlayerState.PLAYING)
  180. room.playback.set_song(playing)
  181. logger.debug("Processor: LIST_END ended")
  182. elif op == QueueOp.DEL.value:
  183. song_id = data["id"]
  184. chueue.remove(song_id)
  185. logger.debug("Processor: Deleted song %d from chueue %s", song_id, chueue)
  186. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.DEL.value, "items": [{"id": song_id}]})
  187. elif op == QueueOp.MOVE.value:
  188. song_id = data["id"]
  189. displacement = data["displacement"]
  190. actual_displacement = chueue.move(song_id, displacement)
  191. if actual_displacement != 0:
  192. logger.debug("Processor: Moved song %d by dx %d from chueue %s", song_id, actual_displacement, chueue)
  193. message = make_message(Message.LIST_OPERATION,
  194. {"op": QueueOp.MOVE.value,
  195. "items": [{"id": song_id, "displacement": actual_displacement}]})
  196. if message is not None:
  197. await room.channel.send(message)
  198. async def media_action_processor(_, data, path):
  199. room = rooms[path]
  200. action = data["action"]
  201. send_next = False
  202. if action == MediaAction.NEXT.value:
  203. current_id = data["current_id"]
  204. with room.playback.lock, room.chueue:
  205. old_song_id = room.playback.get_song_id()
  206. if old_song_id == current_id:
  207. send_next = True
  208. new_song = play_next_song(room)
  209. if new_song is None:
  210. new_song_id = None
  211. else:
  212. new_song_id = new_song["id"]
  213. if send_next:
  214. await room.channel.send(make_message(
  215. Message.MEDIA_ACTION,
  216. {"action": MediaAction.NEXT.value, "ended_id": old_song_id, "current_id": new_song_id}))
  217. if action == MediaAction.PLAY.value or send_next:
  218. send_play = False
  219. with room.playback.lock:
  220. if room.playback.get_state() == PlayerState.PAUSED:
  221. send_play = True
  222. room.playback.set_state(PlayerState.PLAYING)
  223. if send_play:
  224. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PLAY.value}))
  225. if action == MediaAction.PAUSE.value:
  226. send_pause = False
  227. with room.playback.lock:
  228. if room.playback.get_state() == PlayerState.PLAYING:
  229. send_pause = True
  230. room.playback.set_state(PlayerState.PAUSED)
  231. if send_pause:
  232. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PAUSE.value}))
  233. if action == MediaAction.REPEAT.value:
  234. enable = data["enable"]
  235. if room.chueue.is_repeat_enabled() != enable:
  236. with room.chueue:
  237. room.chueue.set_repeat_enabled(enable, room.playback.get_song())
  238. await room.channel.send(
  239. make_message(Message.MEDIA_ACTION, {"action": MediaAction.REPEAT.value, "enable": enable}))
  240. async def obtain_control_processor(ws, data, path):
  241. room = rooms[path]
  242. await obtain_control(ws, room)
  243. async def release_control_processor(ws, data, path):
  244. room = rooms[path]
  245. if len(room.channel.subscribers) > 1:
  246. await release_control(ws, room)
  247. else:
  248. pass
  249. def play_next_song(room):
  250. new_song = room.chueue.pop()
  251. room.playback.set_song(new_song)
  252. if new_song is None:
  253. room.playback.set_state(PlayerState.LIST_END)
  254. return new_song
  255. async def song_end_processor(ws, data, path):
  256. room = rooms[path]
  257. old_song_id = data["id"]
  258. with room.controller_lock, room.playback.lock:
  259. controller = room.get_controller()
  260. if controller is not None and controller.ws is ws and old_song_id == room.playback.get_song_id():
  261. new_song = play_next_song(room)
  262. if new_song is None:
  263. new_song_id = None
  264. else:
  265. new_song_id = new_song["id"]
  266. await room.channel.send(
  267. make_message(Message.SONG_END, {"ended_id": old_song_id, "current_id": new_song_id}))
  268. async def player_enabled_processor(ws, data, path):
  269. room = rooms[path]
  270. room.channel.subscribers[ws].player_enabled = data["enabled"]
  271. if data["enabled"]:
  272. with room.controller_lock:
  273. if room.get_controller() is None:
  274. await obtain_control(ws, room)
  275. else:
  276. await release_control(ws, room)
# TODO: merge OBTAIN_CONTROL and RELEASE_CONTROL into a single message.
# TODO: there is a potential concurrency bug here when the controller loses/releases control right before a song ends.
  279. async def obtain_control(ws, room: Room):
  280. with room.controller_lock:
  281. controller = room.get_controller()
  282. if controller is None or controller.ws is not ws:
  283. room.set_controller(room.channel.subscribers[ws])
  284. await ws.send(make_message(Message.OBTAIN_CONTROL))
  285. if controller is not None:
  286. await controller.ws.send(make_message(Message.RELEASE_CONTROL))
  287. async def release_control(ws, room: Room):
  288. with room.controller_lock:
  289. controller = room.get_controller()
  290. if controller is not None and controller.ws is ws:
  291. controller = next(room.channel.get_player_enabled_subscribers(), None)
  292. room.set_controller(controller)
  293. if controller is not None:
  294. await controller.ws.send(make_message(Message.OBTAIN_CONTROL))
  295. # if ws.open:
  296. await ws.send(make_message(Message.RELEASE_CONTROL))
  297. async def on_connect(ws, path):
  298. if path not in rooms:
  299. rooms[path] = Room()
  300. room = rooms[path]
  301. room.channel.subscribe(ws)
  302. print("Currently {} user{} {} using room {}".format(
  303. len(room.channel.subscribers),
  304. "s" if len(room.channel.subscribers) != 1 else "",
  305. "are" if len(room.channel.subscribers) != 1 else "is",
  306. path))
  307. async def on_disconnect(ws, path):
  308. room = rooms[path]
  309. room.channel.unsubscribe(ws)
  310. await release_control(ws, room)
  311. print("Currently {} user{} {} using room {}".format(
  312. len(room.channel.subscribers),
  313. "s" if len(room.channel.subscribers) != 1 else "",
  314. "are" if len(room.channel.subscribers) != 1 else "is",
  315. path))
  316. def make_resolver():
  317. resolver = Resolver()
  318. resolver.register(Message.STATE, request_state_processor)
  319. resolver.register(Message.LIST_OPERATION, request_list_operation_processor)
  320. resolver.register(Message.MEDIA_ACTION, media_action_processor)
  321. resolver.register(Message.PLAYER_ENABLED, player_enabled_processor)
  322. resolver.register(Message.OBTAIN_CONTROL, obtain_control_processor)
  323. resolver.register(Message.RELEASE_CONTROL, release_control_processor)
  324. resolver.register(Message.SONG_END, song_end_processor)
  325. search_resolver = chube_youtube.make_resolver()
  326. resolver.add_all(search_resolver)
  327. return resolver
  328. if __name__ == "__main__":
  329. # Logging
  330. log_level = os.environ.get('CHUBE_LOGLEVEL', 'INFO').upper()
  331. logger.setLevel(log_level)
  332. ch = logging.StreamHandler()
  333. ch.setLevel(log_level)
  334. formatter = logging.Formatter(
  335. fmt='%(asctime)s %(levelname)-8s %(message)s',
  336. datefmt='%Y-%m-%d %H:%M:%S')
  337. ch.setFormatter(formatter)
  338. logger.addHandler(ch)
  339. player_resolver = make_resolver()
  340. asyncio.run(start_server(player_resolver, on_connect, on_disconnect))