1 """Some sketches of implementing a WSGI 2 async API, in both sync and async servers""" 2 3 def example_app(environ): 4 yield '200 OK', [('Content-type','text/plain')], [b'hello world!'] 5 6 7 def example_middleware(app): 8 9 def wrapped(environ): 10 # middleware can modify environ here 11 status, headers, body = yield app(environ) 12 13 # and modify or replace s, h, body here 14 body = process(body) # ...only if you need to alter it 15 16 yield status, headers, body 17 18 def process(body_iter): 19 # This function is only needed if the middleware wants to read 20 # or modify the body in some way... 21 while True: 22 chunk = yield body_iter # fetch the next chunk from the body 23 if chunk is None: 24 break 25 # process/modify chunk here, then yield it to the server 26 yield chunk 27 28 return wrapped 29 30 31 def WSGI2(app): 32 """Decorator that synchronously emulates WSGI2 futures under WSGI 1""" 33 def wsgi1_app(environ, start_response): 34 def process_response(shb): 35 s, h, body = shb 36 write = start_response(s, h) 37 38 def body_trampoline(routine, yielded): 39 if type(yielded) is bytes: 40 # only accept from outermost middleware 41 if len(routine)==1: 42 return routine.synchronously(write, yielded) 43 else: 44 return routine.RETURN(yielded) 45 else: 46 return base_trampoline(routine, yielded) 47 if not [body has send/throw methods]: 48 body = (item for item in body) 49 Coroutine(body, body_trampoline)() 50 51 def app_trampoline(routine, yielded): 52 if type(yielded) is tuple: 53 return routine.RETURN(yielded) 54 else: 55 return base_trampoline(routine, yielded) 56 57 def base_trampoline(routine, yielded): 58 if [yielded is a future of some sort]: 59 return routine.synchronously(future.result) 60 elif [yielded has send/throw methods]: 61 return routine.CALL(yielded) 62 else: 63 raise TypeError("Not a future, result, or generator:", yielded) 64 65 # [add an executor to environ here] 66 Coroutine(app(environ), app_trampoline, process_response)() 67 return [] 68 69 return wsgi1_app 70 71 72 def start_request(app, environ): 73 """Template for asynchronous request handler""" 74 def process_response(shb): 75 s, h, body = shb 76 [do an asynchronous start_response analog here] 77 def body_trampoline(routine, yielded): 78 if type(yielded) is bytes: 79 # only accept from outermost middleware 80 if len(routine)==1: 81 [arrange for the bytes to be sent out] 82 [arrange to invoke routine() when send is completed] 83 return routine.PAUSE 84 else: 85 return routine.RETURN(yielded) 86 else: 87 return base_trampoline(routine, yielded) 88 89 if not [body has send/throw methods]: 90 body = (item for item in body) 91 Coroutine(body, body_trampoline, [optional termination callback])() 92 93 def app_trampoline(routine, yielded): 94 if type(yielded) is tuple: 95 return routine.RETURN(yielded) 96 else: 97 return base_trampoline(routine, yielded) 98 99 def base_trampoline(routine, yielded): 100 if [yielded is a future of some sort]: 101 def done(f): 102 routine(*routine.synchronously(f.result)) 103 future.add_done_callback(done) 104 return routine.PAUSE 105 elif [yielded has send/throw methods]: 106 return routine.CALL(yielded) 107 else: 108 raise TypeError("Not a future, result, or generator:", yielded) 109 110 # [add an executor to environ here] 111 Coroutine(app(environ), app_trampoline, process_response)() 112 113 114 class Coroutine: 115 """A thread-like stack of collaborating generators""" 116 117 def __init__(iterator, trampoline=lambda r,v:"return", callback=lambda v:v): 118 """Create and launch a general-purpose pseudo-thread""" 119 self.stack = [iterator] 120 self.trampoline = trampoline 121 self.callback = callback 122 123 PAUSE = () 124 125 def CALL(self, geniter): 126 self.stack.append(geniter) 127 return None, () 128 129 def RETURN(self, value=None): 130 self.stack.pop() 131 return value, () 132 133 def RESUME(self, value=None): 134 return value, () 135 136 def RAISE(self, exc_info): 137 return None, exc_info 138 139 def __len__(self): 140 return len(self.stack) 141 142 def synchronously(self, func, *args, **kw): 143 try: 144 return self.RESUME(func(*args, **kw)) 145 except BaseException: 146 return self.RAISE(sys.exc_info()) 147 148 def close(): 149 while self.stack: 150 self.stack.pop().close() 151 152 def __call__(self, value=None, exc_info=()): 153 stack = self.stack 154 while stack: 155 try: 156 it = stack[-1] 157 if exc_info: 158 try: 159 rv = it.throw(*exc_info) 160 finally: 161 exc_info = () 162 else: 163 rv = it.send(value) 164 except BaseException: 165 value = None 166 exc_info = sys.exc_info() 167 if exc_info[0] is StopIteration: 168 # pass return value up the stack 169 value, = exc_info[1].args or (None,) 170 exc_info = () # but not the error 171 stack.pop() 172 else: 173 switch = self.trampoline(self, rv) 174 if switch: 175 value, exc_info = switch 176 else: 177 return rv 178 179 # Coroutine is entirely finished when the stack is empty 180 if exc_info: 181 try: 182 raise exc_info[1] 183 finally: 184 exc_info = () 185 return self.callback(value)]]>]]>