Bastien Sevajol 6 lat temu
rodzic
commit
1e9506320d
2 zmienionych plików z 0 dodań i 177 usunięć
  1. 0 67
      aiopoc.py
  2. 0 110
      poc.py

+ 0 - 67
aiopoc.py Wyświetl plik

@@ -1,67 +0,0 @@
1
-import asyncio
2
-import json
3
-
4
-from sh import tail
5
-from asyncio import sleep
6
-from aiohttp import web
7
-import marshmallow
8
-
9
-from hapic import async as hapic
10
-from hapic.ext.aiohttp.context import AiohttpContext
11
-
12
-
13
-class OutputStreamItemSchema(marshmallow.Schema):
14
-    i = marshmallow.fields.Integer(required=True)
15
-
16
-
17
-# Python 3.6 async generator: http://rickyhan.com/jekyll/update/2018/01/27/python36.html
18
-# Python 3.5 solution: https://stackoverflow.com/questions/37549846/how-to-use-yield-inside-async-function
19
-
20
-
21
-class LinesAsyncGenerator:
22
-    def __init__(self):
23
-        self.iterable = tail("-f", "aiopocdata.txt", _iter=True)
24
-
25
-    async def __aiter__(self):
26
-        return self
27
-
28
-    async def __anext__(self):
29
-        line = next(self.iterable)
30
-
31
-        if 'STOP' in line:
32
-            raise StopAsyncIteration
33
-
34
-        await asyncio.sleep(0.025)
35
-        return json.loads(line)
36
-
37
-
38
-@hapic.with_api_doc()
39
-@hapic.output_stream(item_schema=OutputStreamItemSchema())
40
-def handle(request):
41
-    # response = web.StreamResponse(
42
-    #     status=200,
43
-    #     reason='OK',
44
-    #     headers={
45
-    #         'Content-Type': 'text/plain; charset=utf-8',
46
-    #     },
47
-    # )
48
-    #
49
-    # await response.prepare(request)
50
-    # response.enable_chunked_encoding()
51
-
52
-    # for line in tail("-f", "aiopocdata.txt", _iter=True):
53
-        # await response.write(line.encode('utf-8'))
54
-        # await sleep(0.1)
55
-
56
-    # return response
57
-
58
-    return LinesAsyncGenerator()
59
-
60
-
61
-app = web.Application()
62
-app.add_routes([
63
-    web.get('/', handle)
64
-])
65
-
66
-hapic.set_context(AiohttpContext(app))
67
-web.run_app(app)

+ 0 - 110
poc.py Wyświetl plik

@@ -1,110 +0,0 @@
1
-import asyncio
2
-import aiohttp
3
-import json
4
-from aiohttp import web
5
-import marshmallow
6
-from hapic import async as hapic
7
-from hapic.ext.aiohttp.context import AiohttpContext
8
-
9
-
10
-class UptimeHandlerStreamItem(marshmallow.Schema):
11
-    datetime = marshmallow.fields.String(required=True)
12
-    a_bool = marshmallow.fields.Boolean(required=True)
13
-    a_float = marshmallow.fields.Number(required=True)
14
-    an_int = marshmallow.fields.Integer(required=True)
15
-    text = marshmallow.fields.String(required=True)
16
-    server = marshmallow.fields.String(required=True)
17
-    zone = marshmallow.fields.String(required=True)
18
-
19
-
20
-class LineModel(object):
21
-    def __init__(
22
-        self,
23
-        *column_values
24
-    ):
25
-        self.datetime = column_values[0]
26
-        self.a_bool = column_values[1]
27
-        self.a_float = column_values[2]
28
-        self.an_int = column_values[3]
29
-        self.text = column_values[4]
30
-        self.server = column_values[5]
31
-        self.zone = column_values[6]
32
-
33
-
34
-class AsyncGenerator:
35
-    def __init__(self, session):
36
-        self._session = session
37
-        self._url = 'http://localhost:8086/query?chunk_size=1000&chunked=true'\
38
-                    '&db=resourceAux' \
39
-                    '&q=SELECT+%2A+FROM+resource_aux'
40
-        self._buffer = []
41
-        self._buffer_iter = iter(self._buffer)
42
-
43
-    async def __aiter__(self):
44
-        response = await self._session.get(self._url)
45
-        self._stream_reader = response.content
46
-        return self
47
-
48
-    async def __anext__(self):
49
-        try:
50
-            try:
51
-                # First, send next item
52
-                return next(self._buffer_iter)
53
-            # If no more item in buffer, or not started
54
-            except StopIteration:
55
-                # Read from incoming data
56
-                line = await self._stream_reader.readline()
57
-                # If end of received lines
58
-                if not line:
59
-                    # Break the iteration
60
-                    raise StopAsyncIteration()
61
-
62
-            # load values from received package of incomming data
63
-            data = json.loads(line.decode('utf-8'))
64
-            values = data['results'][0]['series'][0]['values']
65
-
66
-            # Prepare new buffer
67
-            self._buffer = [LineModel(*value) for value in values]
68
-            self._buffer_iter = iter(self._buffer)
69
-
70
-            # Send an item
71
-            return next(self._buffer_iter)
72
-
73
-        except StopAsyncIteration:
74
-            await self._session.close()
75
-            raise
76
-
77
-
78
-@hapic.with_api_doc()
79
-@hapic.output_stream(item_schema=UptimeHandlerStreamItem())
80
-async def uptime_handler(request):
81
-    try:
82
-        # NOTE: This session is currently closed in AsyncGenerator code
83
-        # it should be made otherwise in real code
84
-        session = aiohttp.ClientSession(loop=loop)
85
-        return AsyncGenerator(session)
86
-
87
-    except Exception as e:
88
-        # So you can observe on disconnects and such.
89
-        print(repr(e))
90
-        raise
91
-
92
-
93
-async def build_server(loop, address, port):
94
-    app = web.Application(loop=loop)
95
-    app.router.add_route('GET', "/uptime", uptime_handler)
96
-    hapic.set_context(AiohttpContext(app))
97
-
98
-    return await loop.create_server(app.make_handler(), address, port)
99
-
100
-
101
-if __name__ == '__main__':
102
-    loop = asyncio.get_event_loop()
103
-    loop.run_until_complete(build_server(loop, 'localhost', 9999))
104
-    print("Server ready!")
105
-
106
-    try:
107
-        loop.run_forever()
108
-    except KeyboardInterrupt:
109
-        print("Shutting Down!")
110
-        loop.close()