poc.py 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import asyncio
  2. import aiohttp
  3. import json
  4. from aiohttp import web
  5. import marshmallow
  6. from hapic import async as hapic
  7. from hapic.ext.aiohttp.context import AiohttpContext
  8. class UptimeHandlerStreamItem(marshmallow.Schema):
  9. datetime = marshmallow.fields.String(required=True)
  10. a_bool = marshmallow.fields.Boolean(required=True)
  11. a_float = marshmallow.fields.Number(required=True)
  12. an_int = marshmallow.fields.Integer(required=True)
  13. text = marshmallow.fields.String(required=True)
  14. server = marshmallow.fields.String(required=True)
  15. zone = marshmallow.fields.String(required=True)
  16. class LineModel(object):
  17. def __init__(
  18. self,
  19. *column_values
  20. ):
  21. self.datetime = column_values[0]
  22. self.a_bool = column_values[1]
  23. self.a_float = column_values[2]
  24. self.an_int = column_values[3]
  25. self.text = column_values[4]
  26. self.server = column_values[5]
  27. self.zone = column_values[6]
  28. class AsyncGenerator:
  29. def __init__(self, session):
  30. self._session = session
  31. self._url = 'http://localhost:8086/query?chunk_size=1000&chunked=true'\
  32. '&db=resourceAux' \
  33. '&q=SELECT+%2A+FROM+resource_aux'
  34. self._buffer = []
  35. self._buffer_iter = iter(self._buffer)
  36. async def __aiter__(self):
  37. response = await self._session.get(self._url)
  38. self._stream_reader = response.content
  39. return self
  40. async def __anext__(self):
  41. try:
  42. try:
  43. # First, send next item
  44. return next(self._buffer_iter)
  45. # If no more item in buffer, or not started
  46. except StopIteration:
  47. # Read from incoming data
  48. line = await self._stream_reader.readline()
  49. # If end of received lines
  50. if not line:
  51. # Break the iteration
  52. raise StopAsyncIteration()
  53. # load values from received package of incomming data
  54. data = json.loads(line.decode('utf-8'))
  55. values = data['results'][0]['series'][0]['values']
  56. # Prepare new buffer
  57. self._buffer = [LineModel(*value) for value in values]
  58. self._buffer_iter = iter(self._buffer)
  59. # Send an item
  60. return next(self._buffer_iter)
  61. except StopAsyncIteration:
  62. await self._session.close()
  63. raise
  64. @hapic.with_api_doc()
  65. @hapic.output_stream(item_schema=UptimeHandlerStreamItem())
  66. async def uptime_handler(request):
  67. try:
  68. # NOTE: This session is currently closed in AsyncGenerator code
  69. # it should be made otherwise in real code
  70. session = aiohttp.ClientSession(loop=loop)
  71. return AsyncGenerator(session)
  72. except Exception as e:
  73. # So you can observe on disconnects and such.
  74. print(repr(e))
  75. raise
  76. async def build_server(loop, address, port):
  77. app = web.Application(loop=loop)
  78. app.router.add_route('GET', "/uptime", uptime_handler)
  79. hapic.set_context(AiohttpContext(app))
  80. return await loop.create_server(app.make_handler(), address, port)
  81. if __name__ == '__main__':
  82. loop = asyncio.get_event_loop()
  83. loop.run_until_complete(build_server(loop, 'localhost', 9999))
  84. print("Server ready!")
  85. try:
  86. loop.run_forever()
  87. except KeyboardInterrupt:
  88. print("Shutting Down!")
  89. loop.close()