# chube_search.py
import asyncio
import os

import aiohttp

from chube_enums import Message
from chube_ws import Resolver, make_message
  6. API_KEY = os.environ.get('GOOGLE_SEARCH_API_KEY')
  7. URL = "https://www.googleapis.com/youtube/v3/search"
  8. base_query = [('part', 'snippet'), ('type', 'video'), ('videoEmbeddable', 'true'), ('safeSearch', 'none'), ('key', API_KEY)]
  9. async def search_processor(ws, data):
  10. async with aiohttp.ClientSession() as session:
  11. async with session.get(URL, params=base_query + [('q', data['q'])]) as response:
  12. json_data = await response.json()
  13. await ws.send(make_message(Message.SEARCH, json_data))
  14. def make_resolver():
  15. resolver = Resolver()
  16. resolver.register(Message.SEARCH, search_processor)
  17. return resolver
  18. if __name__ == "__main__":
  19. async def foo():
  20. async with aiohttp.ClientSession() as session:
  21. async with session.get(URL, params=base_query + [('q', "She")]) as response:
  22. json_data = await response.json()
  23. print(json_data)
  24. loop = asyncio.get_event_loop()
  25. loop.run_until_complete(foo())