1 """Some sketches of implementing a WSGI 2 async API, in both sync and async servers"""
2
def example_app(environ):
    """Minimal WSGI 2 style application.

    Written as a generator: it yields exactly one response triple of
    ``(status, headers, body)`` and then finishes.  ``environ`` is accepted
    but not inspected.
    """
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    body = [b'hello world!']
    yield status, headers, body
5
6
def example_middleware(app):
    """Wrap ``app`` in a pass-through middleware and return the wrapper.

    The returned ``wrapped(environ)`` is a generator.  It first yields the
    result of ``app(environ)`` and expects to be sent back a
    ``(status, headers, body)`` triple; it then yields that triple with the
    body replaced by a ``process(body)`` generator.
    """

    def process(body_iter):
        """Relay chunks of ``body_iter`` one at a time.

        Only needed when the middleware wants to read or modify the body.
        Each round yields ``body_iter`` itself (asking whoever drives this
        generator for the next chunk) and then yields the chunk it was
        sent.  A sent value of ``None`` ends the generator.
        """
        chunk = yield body_iter      # ask for the first chunk
        while chunk is not None:
            # process/modify chunk here, then hand it on to the server
            yield chunk
            chunk = yield body_iter  # ask for the following chunk

    def wrapped(environ):
        # middleware can modify environ here
        response = yield app(environ)
        status, headers, body = response

        # ...and modify or replace status, headers, body here
        # (wrapping the body is only required if it must be altered)
        yield status, headers, process(body)

    return wrapped
29
30
def WSGI2(app):
    """Decorator that synchronously emulates WSGI2 futures under WSGI 1.

    ``app`` is called as ``app(environ)`` and must return a generator-like
    object.  The returned ``wsgi1_app(environ, start_response)`` drives it
    with a ``Coroutine`` until a ``(status, headers, body)`` tuple is
    yielded, calls ``start_response`` with the status and headers, pushes
    every ``bytes`` chunk of the body through the ``write`` callable that
    ``start_response`` returned, and finally returns an empty list.

    Values yielded by the app or the body are interpreted as follows:

    * a ``tuple`` (app phase) or ``bytes`` (body phase) is a result;
    * an object with a callable ``result`` attribute is treated as a
      future and resolved by calling ``result()`` in place;
    * an object with ``send`` and ``throw`` is pushed as a sub-generator;
    * anything else raises ``TypeError``.

    NOTE(review): the original sketch only said "a future of some sort";
    the duck-typed ``result`` check below is an assumption -- confirm it
    against the future type actually in use.
    """
    def is_generator_like(obj):
        # "has send/throw methods" in the original sketch
        return hasattr(obj, 'send') and hasattr(obj, 'throw')

    def is_future(obj):
        # Assumed duck type for "a future of some sort" -- TODO confirm.
        return callable(getattr(obj, 'result', None))

    def wsgi1_app(environ, start_response):
        def process_response(shb):
            s, h, body = shb
            write = start_response(s, h)

            def body_trampoline(routine, yielded):
                if type(yielded) is bytes:
                    # only accept from outermost middleware
                    if len(routine) == 1:
                        return routine.synchronously(write, yielded)
                    else:
                        # bytes from a nested body go to whoever asked
                        return routine.RETURN(yielded)
                else:
                    return base_trampoline(routine, yielded)

            if not is_generator_like(body):
                # plain iterables are wrapped so they accept send()
                body = (item for item in body)
            Coroutine(body, body_trampoline)()

        def app_trampoline(routine, yielded):
            if type(yielded) is tuple:
                return routine.RETURN(yielded)
            else:
                return base_trampoline(routine, yielded)

        def base_trampoline(routine, yielded):
            if is_future(yielded):
                # Block on the yielded future itself.  The sketch referred
                # to an undefined name ``future`` here.
                return routine.synchronously(yielded.result)
            elif is_generator_like(yielded):
                return routine.CALL(yielded)
            else:
                raise TypeError("Not a future, result, or generator:", yielded)

        # [add an executor to environ here]
        Coroutine(app(environ), app_trampoline, process_response)()
        # all output already went through write(), so nothing to iterate
        return []

    return wsgi1_app
70
71
72 def start_request(app, environ):
73 """Template for asynchronous request handler"""
74 def process_response(shb):
75 s, h, body = shb
76 [do an asynchronous start_response analog here]
77 def body_trampoline(routine, yielded):
78 if type(yielded) is bytes:
79 # only accept from outermost middleware
80 if len(routine)==1:
81 [arrange for the bytes to be sent out]
82 [arrange to invoke routine() when send is completed]
83 return routine.PAUSE
84 else:
85 return routine.RETURN(yielded)
86 else:
87 return base_trampoline(routine, yielded)
88
89 if not [body has send/throw methods]:
90 body = (item for item in body)
91 Coroutine(body, body_trampoline, [optional termination callback])()
92
93 def app_trampoline(routine, yielded):
94 if type(yielded) is tuple:
95 return routine.RETURN(yielded)
96 else:
97 return base_trampoline(routine, yielded)
98
99 def base_trampoline(routine, yielded):
100 if [yielded is a future of some sort]:
101 def done(f):
102 routine(*routine.synchronously(f.result))
103 future.add_done_callback(done)
104 return routine.PAUSE
105 elif [yielded has send/throw methods]:
106 return routine.CALL(yielded)
107 else:
108 raise TypeError("Not a future, result, or generator:", yielded)
109
110 # [add an executor to environ here]
111 Coroutine(app(environ), app_trampoline, process_response)()
112
113
class Coroutine:
    """A thread-like stack of collaborating generators.

    The object at the top of ``stack`` is the one currently running.  Each
    value it yields is passed to ``trampoline(routine, yielded)``, which
    decides what happens next by returning either

    * a falsy value (``PAUSE``): ``__call__`` returns the yielded value to
      its caller and the coroutine stays suspended until called again; or
    * a ``(value, exc_info)`` pair, normally built with ``CALL``,
      ``RETURN``, ``RESUME``, ``RAISE`` or ``synchronously``: the pair is
      sent (or, if ``exc_info`` is non-empty, thrown) into whatever is now
      on top of the stack.

    When the stack becomes empty the coroutine is finished: a pending
    exception is re-raised, otherwise ``callback(value)`` is returned.
    """

    # Falsy marker a trampoline returns to suspend the coroutine.
    PAUSE = ()

    def __init__(self, iterator, trampoline=lambda r, v: r.RETURN(v),
                 callback=lambda v: v):
        """Create a pseudo-thread around ``iterator`` without starting it.

        Call the instance to start or resume it.

        iterator   -- generator-like object (needs ``send``/``throw``)
        trampoline -- ``trampoline(routine, yielded)``; see the class
                      docstring.  The default returns every yielded value
                      to the caller of the generator that yielded it.
        callback   -- called with the final value once the stack is empty;
                      its result is what ``__call__`` returns.
        """
        self.stack = [iterator]
        self.trampoline = trampoline
        self.callback = callback

    def CALL(self, geniter):
        """Push ``geniter`` so it runs next, started with ``send(None)``."""
        self.stack.append(geniter)
        return None, ()

    def RETURN(self, value=None):
        """Pop the running generator and send ``value`` to its caller."""
        self.stack.pop()
        return value, ()

    def RESUME(self, value=None):
        """Send ``value`` straight back into the running generator."""
        return value, ()

    def RAISE(self, exc_info):
        """Throw ``exc_info`` into the running generator."""
        return None, exc_info

    def __len__(self):
        """Return the current depth of the generator stack."""
        return len(self.stack)

    def synchronously(self, func, *args, **kw):
        """Call ``func`` now and turn its outcome into a trampoline result.

        A normal return becomes ``RESUME(result)``; any exception becomes
        ``RAISE((type, exception, traceback))``.
        """
        try:
            return self.RESUME(func(*args, **kw))
        except BaseException as exc:
            # Deliberately broad: every failure is forwarded into the
            # coroutine rather than escaping from the trampoline.
            return self.RAISE((type(exc), exc, exc.__traceback__))

    def close(self):
        """Close and discard every generator on the stack, innermost first."""
        while self.stack:
            self.stack.pop().close()

    @staticmethod
    def _as_exception(exc_info):
        """Turn a ``(type, value, traceback)`` sequence into one exception."""
        exc_type, exc, tb = (tuple(exc_info) + (None, None))[:3]
        if exc is None:
            # Only a class (or a bare instance) was supplied.
            exc = exc_type() if isinstance(exc_type, type) else exc_type
        return exc.with_traceback(tb) if tb is not None else exc

    def __call__(self, value=None, exc_info=()):
        """Start or resume the coroutine.

        ``value`` is sent into the running generator, unless ``exc_info``
        is non-empty, in which case that exception is thrown in instead.

        Returns the yielded value when the trampoline pauses, or
        ``callback(final_value)`` once the stack is empty.  Raises the
        pending exception if the stack empties while one is outstanding;
        exceptions raised by the trampoline itself propagate unchanged.
        """
        stack = self.stack
        while stack:
            it = stack[-1]
            try:
                if exc_info:
                    # Clear before throwing so the triple is never reused.
                    pending, exc_info = exc_info, ()
                    rv = it.throw(self._as_exception(pending))
                else:
                    rv = it.send(value)
            except StopIteration as stop:
                # Finished normally: pass its return value up the stack.
                value = stop.value
                exc_info = ()
                stack.pop()
            except BaseException as exc:
                # Failed: drop the dead generator and propagate the error
                # to its caller.  Without the pop the error would be
                # thrown into the finished generator forever.
                value = None
                exc_info = (type(exc), exc, exc.__traceback__)
                stack.pop()
            else:
                switch = self.trampoline(self, rv)
                if switch:
                    value, exc_info = switch
                else:
                    return rv

        # Coroutine is entirely finished when the stack is empty
        if exc_info:
            pending, exc_info = exc_info, ()
            raise self._as_exception(pending)
        return self.callback(value)