chube.py 15 KB


  1. import logging
  2. import os
  3. import asyncio
  4. from threading import RLock
  5. from typing import Optional, Iterator, Dict, List
  6. import sys
  7. from itertools import cycle
  8. from websockets.asyncio.server import ServerConnection
  9. import chube_youtube
  10. from channel import Channel, Subscriber
  11. from chube_enums import *
  12. from chube_ws import Resolver, Message, start_server, make_message
  13. logger = logging.getLogger('chube')
  14. class Chueue:
  15. _lock: RLock
  16. _queue: List[int]
  17. _codes: Dict[int, str]
  18. _id_iter: Iterator[int]
  19. _played_queue: Optional[List[int]]
  20. _repeat_enabled: bool = False
  21. def __init__(self):
  22. self._lock = RLock()
  23. self._queue = []
  24. self._codes = dict()
  25. self._id_iter = cycle(range(sys.maxsize))
  26. self._played_queue = None
  27. def add(self, code):
  28. with self:
  29. song_id = next(self._id_iter)
  30. self._queue.append(song_id)
  31. self._codes[song_id] = code
  32. logger.debug("Added code %s id %d to chueue %s", code, song_id, self)
  33. return song_id
  34. def remove(self, song_id):
  35. with self:
  36. self._queue.remove(song_id)
  37. self._codes.pop(song_id)
  38. logger.debug("Removed id %d from chueue %s", song_id, self)
  39. def move(self, song_id, displacement):
  40. with self:
  41. i = self._queue.index(song_id)
  42. new_i = min(len(self._queue) - 1, max(0, i + displacement))
  43. self._queue.pop(i)
  44. self._queue.insert(new_i, song_id)
  45. logger.debug("Moved id %d from chueue %s. dx: %d, i0: %d, i1: %d", song_id, self, displacement, i, new_i)
  46. return new_i - i
  47. def pop(self):
  48. with self:
  49. if len(self._queue) <= 0:
  50. if self._repeat_enabled and len(self._played_queue) > 0:
  51. self._queue = self._played_queue
  52. self._played_queue = []
  53. logger.debug("Popped empty list, repeat enabled for chueue %s", self)
  54. else:
  55. logger.debug("Popped empty list, repeat disabled for chueue %s", self)
  56. return None
  57. song_id = self._queue.pop(0)
  58. if self._repeat_enabled:
  59. code = self._codes[song_id]
  60. self._played_queue.append(song_id)
  61. else:
  62. code = self._codes.pop(song_id)
  63. logger.debug("Popped id %d, repeat %s for chueue %s", song_id, self._repeat_enabled, self)
  64. return self.as_song(song_id, code)
  65. def set_repeat_enabled(self, enable, playback_song):
  66. with self:
  67. self._repeat_enabled = enable
  68. if enable:
  69. if playback_song is not None:
  70. self._played_queue = [playback_song["id"]]
  71. self._codes[playback_song["id"]] = playback_song["code"]
  72. else:
  73. self._played_queue = []
  74. logger.debug("Set repeat enabled")
  75. else:
  76. for song_id in self._played_queue:
  77. self._codes.pop(song_id)
  78. self._played_queue = None
  79. logger.debug("Set repeat disabled")
  80. def is_repeat_enabled(self):
  81. return self._repeat_enabled
  82. def as_song(self, song_id, code=None):
  83. if code is None:
  84. code = self._codes[song_id]
  85. return {"id": song_id, "code": code}
  86. def as_lists(self):
  87. with self:
  88. queue_as_list = list(map(self.as_song, self._queue))
  89. played_as_list = list(map(self.as_song, self._played_queue)) if self.is_repeat_enabled() else None
  90. return {"next": queue_as_list, "previous": played_as_list}
  91. def lock(self):
  92. self._lock.acquire()
  93. def unlock(self):
  94. self._lock.release()
  95. def __enter__(self):
  96. self.lock()
  97. def __exit__(self, exc_type, exc_val, exc_tb):
  98. self.unlock()
  99. def __len__(self):
  100. return len(self._queue)
  101. class Playback:
  102. _song: Optional[Dict] = None
  103. _state: PlayerState = PlayerState.LIST_END
  104. lock: RLock()
  105. def __init__(self):
  106. self.lock = RLock()
  107. def set_song(self, song):
  108. with self.lock:
  109. self._song = song
  110. if song is not None:
  111. logger.debug("Playback %s: Set song to %d", self, song["id"])
  112. else:
  113. logger.debug("Playback %s: finished last song", self)
  114. def get_song(self):
  115. with self.lock:
  116. return self._song
  117. def get_song_id(self):
  118. with self.lock:
  119. if self._song is not None:
  120. return self._song["id"]
  121. else:
  122. return None
  123. def get_state(self):
  124. return self._state
  125. def set_state(self, state):
  126. self._state = state
  127. logger.debug("Playback %s: Set state to %s", self, state)
  128. class Room:
  129. chueue: Chueue
  130. channel: Channel
  131. _controller: Optional[Subscriber]
  132. controller_lock: RLock
  133. playback: Playback
  134. def __init__(self):
  135. self.chueue = Chueue()
  136. self.channel = Channel()
  137. self.controller_lock = RLock()
  138. self.playback = Playback()
  139. self._controller = None
  140. def get_controller(self):
  141. return self._controller
  142. def set_controller(self, controller):
  143. logger.debug("Room %s: Set controller to %s", self, controller)
  144. self._controller = controller
  145. rooms: Dict[str, Room] = dict()
  146. async def request_state_processor(ws: ServerConnection, _, path):
  147. room = rooms[path]
  148. state = {
  149. "lists": room.chueue.as_lists(),
  150. "playing": room.playback.get_song(),
  151. "state": room.playback.get_state().value
  152. }
  153. logger.debug("Processor: State request, state: %s", state)
  154. await ws.send(make_message(Message.STATE, state))
  155. async def request_list_operation_processor(_, data, path):
  156. room = rooms[path]
  157. chueue = room.chueue
  158. op = data["op"]
  159. message = None
  160. if op == QueueOp.ADD.value:
  161. kind = data["kind"]
  162. if kind == YoutubeResourceType.VIDEO.value:
  163. code = data["code"]
  164. song_id = chueue.add(code)
  165. logger.debug("Processor: Added song %s / %d to chueue %s", code, song_id, chueue)
  166. message = make_message(Message.LIST_OPERATION,
  167. {"op": QueueOp.ADD.value, "items": [{"code": code, "id": song_id}]})
  168. elif kind == YoutubeResourceType.PLAYLIST.value:
  169. code = data["code"]
  170. playlist_items = await chube_youtube.get_all_playlist_items(code)
  171. response_items = []
  172. with room.chueue:
  173. for item in playlist_items["items"]:
  174. code = item["snippet"]["resourceId"]["videoId"]
  175. song_id = chueue.add(code)
  176. response_items.append({"code": code, "id": song_id, "snippet": item["snippet"]})
  177. logger.debug("Processor: Added playlist %s to chueue %s", code, chueue)
  178. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.value, "items": response_items})
  179. with room.playback.lock:
  180. if room.playback.get_state() == PlayerState.LIST_END:
  181. playing = chueue.pop()
  182. if playing is not None:
  183. room.playback.set_state(PlayerState.PLAYING)
  184. room.playback.set_song(playing)
  185. logger.debug("Processor: LIST_END ended")
  186. elif op == QueueOp.DEL.value:
  187. song_id = data["id"]
  188. chueue.remove(song_id)
  189. logger.debug("Processor: Deleted song %d from chueue %s", song_id, chueue)
  190. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.DEL.value, "items": [{"id": song_id}]})
  191. elif op == QueueOp.MOVE.value:
  192. song_id = data["id"]
  193. displacement = data["displacement"]
  194. actual_displacement = chueue.move(song_id, displacement)
  195. if actual_displacement != 0:
  196. logger.debug("Processor: Moved song %d by dx %d from chueue %s", song_id, actual_displacement, chueue)
  197. message = make_message(Message.LIST_OPERATION,
  198. {"op": QueueOp.MOVE.value,
  199. "items": [{"id": song_id, "displacement": actual_displacement}]})
  200. if message is not None:
  201. await room.channel.send(message)
  202. async def media_action_processor(_, data, path):
  203. room = rooms[path]
  204. action = data["action"]
  205. send_next = False
  206. if action == MediaAction.NEXT.value:
  207. current_id = data["current_id"]
  208. with room.playback.lock, room.chueue:
  209. old_song_id = room.playback.get_song_id()
  210. if old_song_id == current_id:
  211. send_next = True
  212. new_song = play_next_song(room)
  213. if new_song is None:
  214. new_song_id = None
  215. else:
  216. new_song_id = new_song["id"]
  217. if send_next:
  218. await room.channel.send(make_message(
  219. Message.MEDIA_ACTION,
  220. {"action": MediaAction.NEXT.value, "ended_id": old_song_id, "current_id": new_song_id}))
  221. if action == MediaAction.PLAY.value or send_next:
  222. send_play = False
  223. with room.playback.lock:
  224. if room.playback.get_state() == PlayerState.PAUSED:
  225. send_play = True
  226. room.playback.set_state(PlayerState.PLAYING)
  227. if send_play:
  228. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PLAY.value}))
  229. if action == MediaAction.PAUSE.value:
  230. send_pause = False
  231. with room.playback.lock:
  232. if room.playback.get_state() == PlayerState.PLAYING:
  233. send_pause = True
  234. room.playback.set_state(PlayerState.PAUSED)
  235. if send_pause:
  236. await room.channel.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PAUSE.value}))
  237. if action == MediaAction.REPEAT.value:
  238. enable = data["enable"]
  239. if room.chueue.is_repeat_enabled() != enable:
  240. with room.chueue:
  241. room.chueue.set_repeat_enabled(enable, room.playback.get_song())
  242. await room.channel.send(
  243. make_message(Message.MEDIA_ACTION, {"action": MediaAction.REPEAT.value, "enable": enable}))
  244. async def obtain_control_processor(ws, data, path):
  245. room = rooms[path]
  246. await obtain_control(ws, room)
  247. async def release_control_processor(ws: ServerConnection, data, path):
  248. room = rooms[path]
  249. if len(room.channel.subscribers) > 1:
  250. await release_control(ws, False, room)
  251. else:
  252. pass
  253. def play_next_song(room):
  254. new_song = room.chueue.pop()
  255. room.playback.set_song(new_song)
  256. if new_song is None:
  257. room.playback.set_state(PlayerState.LIST_END)
  258. return new_song
  259. async def song_end_processor(ws, data, path):
  260. room = rooms[path]
  261. old_song_id = data["id"]
  262. with room.controller_lock, room.playback.lock:
  263. controller = room.get_controller()
  264. if controller is not None and controller.ws is ws and old_song_id == room.playback.get_song_id():
  265. new_song = play_next_song(room)
  266. if new_song is None:
  267. new_song_id = None
  268. else:
  269. new_song_id = new_song["id"]
  270. await room.channel.send(
  271. make_message(Message.SONG_END, {"ended_id": old_song_id, "current_id": new_song_id}))
  272. async def player_enabled_processor(ws, data, path):
  273. room = rooms[path]
  274. room.channel.subscribers[ws].player_enabled = data["enabled"]
  275. if data["enabled"]:
  276. with room.controller_lock:
  277. if room.get_controller() is None:
  278. await obtain_control(ws, room)
  279. else:
  280. await release_control(ws, False, room)
  281. # TODO change OBTAIN_CONTROL en RELEASE_CONTROL to one message
  282. # TODO There is some potential concurrent bug here, when the controller loses/releases control right before a song end.
  283. async def obtain_control(ws: ServerConnection, room: Room):
  284. with room.controller_lock:
  285. controller = room.get_controller()
  286. if controller is None or controller.ws is not ws:
  287. room.set_controller(room.channel.subscribers[ws])
  288. await ws.send(make_message(Message.OBTAIN_CONTROL))
  289. if controller is not None:
  290. await controller.ws.send(make_message(Message.RELEASE_CONTROL))
  291. async def release_control(ws: ServerConnection, ws_disconnected: bool, room: Room):
  292. with room.controller_lock:
  293. controller = room.get_controller()
  294. if controller is not None and controller.ws is ws:
  295. controller = next(room.channel.get_player_enabled_subscribers(), None)
  296. room.set_controller(controller)
  297. if controller is not None:
  298. await controller.ws.send(make_message(Message.OBTAIN_CONTROL))
  299. if not ws_disconnected:
  300. await ws.send(make_message(Message.RELEASE_CONTROL))
  301. async def on_connect(ws, path):
  302. if path not in rooms:
  303. rooms[path] = Room()
  304. room = rooms[path]
  305. room.channel.subscribe(ws)
  306. print("Currently {} user{} {} using room {}".format(
  307. len(room.channel.subscribers),
  308. "s" if len(room.channel.subscribers) != 1 else "",
  309. "are" if len(room.channel.subscribers) != 1 else "is",
  310. path))
  311. async def on_disconnect(ws, path):
  312. room = rooms[path]
  313. room.channel.unsubscribe(ws)
  314. await release_control(ws, True, room)
  315. print("Currently {} user{} {} using room {}".format(
  316. len(room.channel.subscribers),
  317. "s" if len(room.channel.subscribers) != 1 else "",
  318. "are" if len(room.channel.subscribers) != 1 else "is",
  319. path))
  320. def make_resolver():
  321. resolver = Resolver()
  322. resolver.register(Message.STATE, request_state_processor)
  323. resolver.register(Message.LIST_OPERATION, request_list_operation_processor)
  324. resolver.register(Message.MEDIA_ACTION, media_action_processor)
  325. resolver.register(Message.PLAYER_ENABLED, player_enabled_processor)
  326. resolver.register(Message.OBTAIN_CONTROL, obtain_control_processor)
  327. resolver.register(Message.RELEASE_CONTROL, release_control_processor)
  328. resolver.register(Message.SONG_END, song_end_processor)
  329. search_resolver = chube_youtube.make_resolver()
  330. resolver.add_all(search_resolver)
  331. return resolver
  332. if __name__ == "__main__":
  333. # Logging
  334. log_level = os.environ.get('CHUBE_LOGLEVEL', 'INFO').upper()
  335. logger.setLevel(log_level)
  336. ch = logging.StreamHandler()
  337. ch.setLevel(log_level)
  338. formatter = logging.Formatter(
  339. fmt='%(asctime)s %(levelname)-8s %(message)s',
  340. datefmt='%Y-%m-%d %H:%M:%S')
  341. ch.setFormatter(formatter)
  342. logger.addHandler(ch)
  343. player_resolver = make_resolver()
  344. asyncio.run(start_server(player_resolver, on_connect, on_disconnect))