Browse Source

output stream async

Bastien Sevajol 5 years ago
parent
commit
c497377ad3
7 changed files with 193 additions and 23 deletions
  1. 51 15
      aiopoc.py
  2. 1 0
      hapic/__init__.py
  3. 1 0
      hapic/async.py
  4. 48 0
      hapic/decorator.py
  5. 6 0
      hapic/description.py
  6. 56 8
      hapic/ext/aiohttp/context.py
  7. 30 0
      hapic/hapic.py

+ 51 - 15
aiopoc.py View File

@@ -1,25 +1,61 @@
1
+import asyncio
2
+import json
3
+
1 4
 from sh import tail
2 5
 from asyncio import sleep
3 6
 from aiohttp import web
7
+import marshmallow
8
+
9
+from hapic import async as hapic
10
+from hapic.ext.aiohttp.context import AiohttpContext
11
+
12
+
13
+class OutputStreamItemSchema(marshmallow.Schema):
14
+    i = marshmallow.fields.Integer(required=True)
15
+
16
+
17
+# Python 3.6 async generator: http://rickyhan.com/jekyll/update/2018/01/27/python36.html
18
+# Python 3.5 solution: https://stackoverflow.com/questions/37549846/how-to-use-yield-inside-async-function
19
+
4 20
 
21
+class LinesAsyncGenerator:
22
+    def __init__(self):
23
+        self.iterable = tail("-f", "aiopocdata.txt", _iter=True)
5 24
 
6
-async def handle(request):
7
-    response = web.StreamResponse(
8
-        status=200,
9
-        reason='OK',
10
-        headers={
11
-            'Content-Type': 'text/plain; charset=utf-8',
12
-        },
13
-    )
25
+    async def __aiter__(self):
26
+        return self
14 27
 
15
-    await response.prepare(request)
16
-    response.enable_chunked_encoding()
28
+    async def __anext__(self):
29
+        line = next(self.iterable)
17 30
 
18
-    for line in tail("-f", "aiopocdata.txt", _iter=True):
19
-        await response.write(line.encode('utf-8'))
20
-        await sleep(0.1)
31
+        if 'STOP' in line:
32
+            raise StopAsyncIteration
21 33
 
22
-    return response
34
+        await asyncio.sleep(0.025)
35
+        return json.loads(line)
36
+
37
+
38
+@hapic.with_api_doc()
39
+@hapic.output_stream(item_schema=OutputStreamItemSchema())
40
+def handle(request):
41
+    # response = web.StreamResponse(
42
+    #     status=200,
43
+    #     reason='OK',
44
+    #     headers={
45
+    #         'Content-Type': 'text/plain; charset=utf-8',
46
+    #     },
47
+    # )
48
+    #
49
+    # await response.prepare(request)
50
+    # response.enable_chunked_encoding()
51
+
52
+    # for line in tail("-f", "aiopocdata.txt", _iter=True):
53
+        # await response.write(line.encode('utf-8'))
54
+        # await sleep(0.1)
55
+
56
+    # return response
57
+
58
+    return LinesAsyncGenerator()
23 59
 
24 60
 
25 61
 app = web.Application()
@@ -27,5 +63,5 @@ app.add_routes([
27 63
     web.get('/', handle)
28 64
 ])
29 65
 
30
-
66
+hapic.set_context(AiohttpContext(app))
31 67
 web.run_app(app)

+ 1 - 0
hapic/__init__.py View File

@@ -20,3 +20,4 @@ set_context = _hapic_default.set_context
20 20
 reset_context = _hapic_default.reset_context
21 21
 add_documentation_view = _hapic_default.add_documentation_view
22 22
 handle_exception = _hapic_default.handle_exception
23
+output_stream = _hapic_default.output_stream

+ 1 - 0
hapic/async.py View File

@@ -18,3 +18,4 @@ set_context = _hapic_default.set_context
18 18
 reset_context = _hapic_default.reset_context
19 19
 add_documentation_view = _hapic_default.add_documentation_view
20 20
 handle_exception = _hapic_default.handle_exception
21
+output_stream = _hapic_default.output_stream

+ 48 - 0
hapic/decorator.py View File

@@ -313,6 +313,10 @@ class OutputBodyControllerWrapper(OutputControllerWrapper):
313 313
     pass
314 314
 
315 315
 
316
+# TODO BS 2018-07-23: This class is an async version of
317
+# OutputBodyControllerWrapper (ControllerWrapper.get_wrapper rewrite)
318
+# to permit async compatibility.
319
+# Please re-think about code refact. TAG: REFACT_ASYNC
316 320
 class AsyncOutputBodyControllerWrapper(OutputControllerWrapper):
317 321
     def get_wrapper(
318 322
         self,
@@ -332,6 +336,50 @@ class AsyncOutputBodyControllerWrapper(OutputControllerWrapper):
332 336
         return functools.update_wrapper(wrapper, func)
333 337
 
334 338
 
339
+class AsyncOutputStreamControllerWrapper(OutputControllerWrapper):
340
+    def get_wrapper(
341
+        self,
342
+        func: 'typing.Callable[..., typing.Any]',
343
+    ) -> 'typing.Callable[..., typing.Any]':
344
+        # async def wrapper(*args, **kwargs) -> typing.Any:
345
+        async def wrapper(*args, **kwargs) -> typing.Any:
346
+            # Note: Design of before_wrapped_func can be to update kwargs
347
+            # by reference here
348
+            replacement_response = self.before_wrapped_func(args, kwargs)
349
+            if replacement_response:
350
+                return replacement_response
351
+
352
+            stream_response = await self.context.get_stream_response_object(
353
+                args,
354
+                kwargs,
355
+            )
356
+            async for stream_item in self._execute_wrapped_function(
357
+                func,
358
+                args,
359
+                kwargs,
360
+            ):
361
+                serialized_item = self._get_serialized_item(stream_item)
362
+                await self.context.feed_stream_response(
363
+                    stream_response,
364
+                    serialized_item,
365
+                )
366
+
367
+            return stream_response
368
+
369
+        return functools.update_wrapper(wrapper, func)
370
+
371
+    def _get_serialized_item(
372
+        self,
373
+        item_object: typing.Any,
374
+    ) -> dict:
375
+        try:
376
+            return self.processor.process(item_object)
377
+        except ProcessException:
378
+            # TODO BS 2018-07-25: Must interrupt stream response: but how
379
+            # inform about error ?
380
+            raise NotImplementedError('todo')
381
+
382
+
335 383
 class OutputHeadersControllerWrapper(OutputControllerWrapper):
336 384
     pass
337 385
 

+ 6 - 0
hapic/description.py View File

@@ -38,6 +38,10 @@ class OutputBodyDescription(Description):
38 38
     pass
39 39
 
40 40
 
41
+class OutputStreamDescription(Description):
42
+    pass
43
+
44
+
41 45
 class OutputFileDescription(Description):
42 46
     pass
43 47
 
@@ -60,6 +64,7 @@ class ControllerDescription(object):
60 64
         input_forms: InputFormsDescription=None,
61 65
         input_files: InputFilesDescription=None,
62 66
         output_body: OutputBodyDescription=None,
67
+        output_stream: OutputStreamDescription=None,
63 68
         output_file: OutputFileDescription=None,
64 69
         output_headers: OutputHeadersDescription=None,
65 70
         errors: typing.List[ErrorDescription]=None,
@@ -72,6 +77,7 @@ class ControllerDescription(object):
72 77
         self.input_forms = input_forms
73 78
         self.input_files = input_files
74 79
         self.output_body = output_body
80
+        self.output_stream = output_stream
75 81
         self.output_file = output_file
76 82
         self.output_headers = output_headers
77 83
         self.errors = errors or []

+ 56 - 8
hapic/ext/aiohttp/context.py View File

@@ -1,5 +1,6 @@
1 1
 # coding: utf-8
2 2
 import asyncio
3
+import json
3 4
 import typing
4 5
 from http import HTTPStatus
5 6
 from json import JSONDecodeError
@@ -65,8 +66,10 @@ class AiohttpContext(BaseContext):
65 66
     def __init__(
66 67
         self,
67 68
         app: web.Application,
69
+        debug: bool = False,
68 70
     ) -> None:
69 71
         self._app = app
72
+        self._debug = debug
70 73
 
71 74
     @property
72 75
     def app(self) -> web.Application:
@@ -110,19 +113,22 @@ class AiohttpContext(BaseContext):
110 113
         self,
111 114
         decorated_controller: DecoratedController,
112 115
     ) -> RouteRepresentation:
113
-        pass
116
+        # TODO BS 2018-07-15: to do
117
+        raise NotImplementedError('todo')
114 118
 
115 119
     def get_swagger_path(
116 120
         self,
117 121
         contextualised_rule: str,
118 122
     ) -> str:
119
-        pass
123
+        # TODO BS 2018-07-15: to do
124
+        raise NotImplementedError('todo')
120 125
 
121 126
     def by_pass_output_wrapping(
122 127
         self,
123 128
         response: typing.Any,
124 129
     ) -> bool:
125
-        pass
130
+        # TODO BS 2018-07-15: to do
131
+        raise NotImplementedError('todo')
126 132
 
127 133
     def add_view(
128 134
         self,
@@ -130,30 +136,72 @@ class AiohttpContext(BaseContext):
130 136
         http_method: str,
131 137
         view_func: typing.Callable[..., typing.Any],
132 138
     ) -> None:
133
-        pass
139
+        # TODO BS 2018-07-15: to do
140
+        raise NotImplementedError('todo')
134 141
 
135 142
     def serve_directory(
136 143
         self,
137 144
         route_prefix: str,
138 145
         directory_path: str,
139 146
     ) -> None:
140
-        pass
147
+        # TODO BS 2018-07-15: to do
148
+        raise NotImplementedError('todo')
141 149
 
142 150
     def is_debug(
143 151
         self,
144 152
     ) -> bool:
145
-        pass
153
+        return self._debug
146 154
 
147 155
     def handle_exception(
148 156
         self,
149 157
         exception_class: typing.Type[Exception],
150 158
         http_code: int,
151 159
     ) -> None:
152
-        pass
160
+        # TODO BS 2018-07-15: to do
161
+        raise NotImplementedError('todo')
153 162
 
154 163
     def handle_exceptions(
155 164
         self,
156 165
         exception_classes: typing.List[typing.Type[Exception]],
157 166
         http_code: int,
158 167
     ) -> None:
159
-        pass
168
+        # TODO BS 2018-07-15: to do
169
+        raise NotImplementedError('todo')
170
+
171
+    async def get_stream_response_object(
172
+        self,
173
+        func_args,
174
+        func_kwargs,
175
+        http_code: HTTPStatus = HTTPStatus.OK,
176
+        headers: dict = None,
177
+    ) -> web.StreamResponse:
178
+        headers = headers or {
179
+            'Content-Type': 'text/plain; charset=utf-8',
180
+        }
181
+
182
+        response = web.StreamResponse(
183
+            status=http_code,
184
+            headers=headers,
185
+        )
186
+
187
+        try:
188
+            request = func_args[0]
189
+        except IndexError:
190
+            raise WorkflowException(
191
+                'Unable to get aiohttp request object',
192
+            )
193
+        request = typing.cast(Request, request)
194
+
195
+        await response.prepare(request)
196
+
197
+        return response
198
+
199
+    async def feed_stream_response(
200
+        self,
201
+        stream_response: web.StreamResponse,
202
+        serialized_item: dict,
203
+    ) -> None:
204
+        await stream_response.write(
205
+            # FIXME BS 2018-07-25: need \n :/
206
+            json.dumps(serialized_item).encode('utf-8') + b'\n',
207
+        )

+ 30 - 0
hapic/hapic.py View File

@@ -22,6 +22,7 @@ from hapic.decorator import InputQueryControllerWrapper
22 22
 from hapic.decorator import InputFilesControllerWrapper
23 23
 from hapic.decorator import OutputBodyControllerWrapper
24 24
 from hapic.decorator import AsyncOutputBodyControllerWrapper
25
+from hapic.decorator import AsyncOutputStreamControllerWrapper
25 26
 from hapic.decorator import OutputHeadersControllerWrapper
26 27
 from hapic.decorator import OutputFileControllerWrapper
27 28
 from hapic.description import InputBodyDescription
@@ -32,6 +33,7 @@ from hapic.description import InputPathDescription
32 33
 from hapic.description import InputQueryDescription
33 34
 from hapic.description import InputFilesDescription
34 35
 from hapic.description import OutputBodyDescription
36
+from hapic.description import OutputStreamDescription
35 37
 from hapic.description import OutputHeadersDescription
36 38
 from hapic.description import OutputFileDescription
37 39
 from hapic.doc import DocGenerator
@@ -170,6 +172,34 @@ class Hapic(object):
170 172
             return decoration.get_wrapper(func)
171 173
         return decorator
172 174
 
175
+    def output_stream(
176
+        self,
177
+        item_schema: typing.Any,
178
+        processor: ProcessorInterface = None,
179
+        context: ContextInterface = None,
180
+        error_http_code: HTTPStatus = HTTPStatus.INTERNAL_SERVER_ERROR,
181
+        default_http_code: HTTPStatus = HTTPStatus.OK,
182
+    ) -> typing.Callable[[typing.Callable[..., typing.Any]], typing.Any]:
183
+        processor = processor or MarshmallowOutputProcessor()
184
+        processor.schema = item_schema
185
+        context = context or self._context_getter
186
+
187
+        if self._async:
188
+            decoration = AsyncOutputStreamControllerWrapper(
189
+                context=context,
190
+                processor=processor,
191
+                error_http_code=error_http_code,
192
+                default_http_code=default_http_code,
193
+            )
194
+        else:
195
+            # TODO BS 2018-07-25: To do
196
+            raise NotImplementedError('todo')
197
+
198
+        def decorator(func):
199
+            self._buffer.output_stream = OutputStreamDescription(decoration)
200
+            return decoration.get_wrapper(func)
201
+        return decorator
202
+
173 203
     def output_headers(
174 204
         self,
175 205
         schema: typing.Any,