chube.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from threading import RLock
  2. import sys
  3. from itertools import cycle
  4. import chube_search
  5. from channel import Channel
  6. from chube_enums import *
  7. from chube_ws import Resolver, Message, start_server, make_message
  8. class Chueue:
  9. _lock = RLock()
  10. _queue = []
  11. _codes = dict()
  12. _id_iter = cycle(range(sys.maxsize))
  13. def add(self, code):
  14. with self:
  15. song_id = next(self._id_iter)
  16. self._queue.append(song_id)
  17. self._codes[song_id] = code
  18. return song_id
  19. def remove(self, song_id):
  20. with self:
  21. self._queue.remove(song_id)
  22. self._codes.pop(song_id)
  23. def move(self, song_id, displacement):
  24. with self:
  25. i = self._queue.index(song_id)
  26. new_i = min(len(self._queue) - 1, max(0, i + displacement))
  27. self._queue.pop(i)
  28. self._queue.insert(new_i, song_id)
  29. return new_i - i
  30. def pop(self):
  31. with self:
  32. if len(self._queue) > 0:
  33. song_id = self._queue.pop()
  34. code = self._codes.pop(song_id)
  35. return self.as_song(song_id, code)
  36. else:
  37. return None
  38. def as_song(self, song_id, code=None):
  39. if code is None:
  40. code = self._codes[song_id]
  41. return {"id": song_id, "code": code}
  42. def as_list(self):
  43. with self:
  44. res = list(map(self.as_song, self._queue))
  45. return res
  46. def lock(self):
  47. self._lock.acquire()
  48. def unlock(self):
  49. self._lock.release()
  50. def __enter__(self):
  51. self.lock()
  52. def __exit__(self, exc_type, exc_val, exc_tb):
  53. self.unlock()
  54. def __len__(self):
  55. return len(self._queue)
  56. class Playback:
  57. _song = None
  58. _state: PlayerState = PlayerState.LIST_END
  59. lock = RLock()
  60. def set_song(self, song):
  61. with self.lock:
  62. self._song = song
  63. def get_song(self):
  64. with self.lock:
  65. return self._song
  66. def get_song_id(self):
  67. with self.lock:
  68. if self._song is not None:
  69. return self._song["id"]
  70. else:
  71. return None
  72. def get_state(self):
  73. return self._state
  74. def set_state(self, state):
  75. self._state = state
  76. class Room:
  77. chueue = Chueue()
  78. channel = Channel()
  79. controller = None
  80. controller_lock = RLock()
  81. playback = Playback()
  82. rooms = {}
  83. async def request_state_processor(ws, data):
  84. room = rooms["main"]
  85. await ws.send(make_message(Message.STATE, {
  86. "list": room.chueue.as_list(),
  87. "playing": room.playback.get_song(),
  88. "state": room.playback.get_state().name
  89. }))
  90. async def request_list_operation_processor(ws, data):
  91. room = rooms["main"]
  92. chueue = room.chueue
  93. op = data["op"]
  94. message = None
  95. if op == QueueOp.ADD.name:
  96. code = data["code"]
  97. song_id = chueue.add(code)
  98. with room.playback.lock:
  99. if room.playback.get_state() == PlayerState.LIST_END:
  100. room.playback.set_state(PlayerState.PLAYING)
  101. room.playback.set_song(chueue.pop())
  102. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.ADD.name, "code": code, "id": song_id})
  103. elif op == QueueOp.DEL.name:
  104. song_id = data["id"]
  105. chueue.remove(song_id)
  106. message = make_message(Message.LIST_OPERATION, {"op": QueueOp.DEL.name, "id": song_id})
  107. elif op == QueueOp.MOVE.name:
  108. song_id = data["id"]
  109. displacement = data["displacement"]
  110. actual_displacement = chueue.move(song_id, displacement)
  111. if actual_displacement != 0:
  112. message = make_message(Message.LIST_OPERATION,
  113. {"op": QueueOp.MOVE.name, "id": song_id, "displacement": actual_displacement})
  114. if message is not None:
  115. await room.channel.send(message)
  116. async def obtain_control_processor(ws, data):
  117. room = rooms["main"]
  118. await obtain_control(ws, room)
  119. async def release_control_processor(ws, data):
  120. room = rooms["main"]
  121. if len(room.channel.subscribers) > 1:
  122. await release_control(ws, room)
  123. else:
  124. pass
  125. # TODO error here
  126. async def song_end_processor(ws, data):
  127. room = rooms["main"]
  128. old_song_id = data["id"]
  129. with room.controller_lock, room.playback.lock:
  130. if ws is room.controller and old_song_id == room.playback.get_song_id():
  131. new_song = room.chueue.pop()
  132. room.playback.set_song(new_song)
  133. if new_song is None:
  134. room.playback.set_state(PlayerState.LIST_END)
  135. new_song_id = None
  136. else:
  137. new_song_id = new_song["id"]
  138. await room.channel.send(make_message(Message.SONG_END, {"ended_id": old_song_id, "current_id": new_song_id}))
  139. async def playback_processor(ws, data):
  140. room = rooms["main"]
# TODO: There is a potential concurrency bug here when the controller loses/releases control right before a song ends.
  142. async def obtain_control(ws, room):
  143. with room.controller_lock:
  144. if room.controller is not ws:
  145. prev_controller = room.controller
  146. room.controller = ws
  147. await ws.send(make_message(Message.OBTAIN_CONTROL))
  148. if prev_controller is not None:
  149. await prev_controller.send(make_message(Message.RELEASE_CONTROL))
  150. async def release_control(ws, room):
  151. with room.controller_lock:
  152. if room.controller is ws:
  153. subs = room.channel.subscribers
  154. if len(subs) > 0:
  155. room.controller = subs[0]
  156. await room.controller.send(make_message(Message.OBTAIN_CONTROL))
  157. else:
  158. room.controller = None
  159. await ws.send(make_message(Message.RELEASE_CONTROL))
  160. async def on_connect(ws, path):
  161. room = rooms["main"]
  162. room.channel.subscribe(ws)
  163. with room.controller_lock:
  164. if room.controller is None:
  165. await obtain_control(ws, room)
  166. # with room.playback.lock:
  167. # if room.playback.get_state() == PlayerState.WAITING_FOR_CLIENTS:
  168. # room.playback.set_state(PlayerState.PLAYING)
  169. # # TODO maybe send a play message on channel.
  170. # # await ws.send(make_message(Message.MEDIA_ACTION, {"action": MediaAction.PLAY}))
  171. async def on_disconnect(ws, path):
  172. room = rooms["main"]
  173. room.channel.unsubscribe(ws)
  174. await release_control(ws, room)
  175. # with room.controller_lock:
  176. # if room.controller is None:
  177. # room.playback.
  178. def make_resolver():
  179. resolver = Resolver()
  180. resolver.register(Message.STATE, request_state_processor)
  181. resolver.register(Message.LIST_OPERATION, request_list_operation_processor)
  182. resolver.register(Message.OBTAIN_CONTROL, obtain_control_processor)
  183. resolver.register(Message.RELEASE_CONTROL, release_control_processor)
  184. resolver.register(Message.SONG_END, song_end_processor)
  185. search_resolver = chube_search.make_resolver()
  186. resolver.add_all(search_resolver)
  187. return resolver
  188. def init_rooms():
  189. rooms["main"] = Room()
  190. if __name__ == "__main__":
  191. player_resolver = make_resolver()
  192. init_rooms()
  193. start_server(player_resolver, on_connect, on_disconnect)