The PEAK Developers' Center   AsyncWSGISketch UserPreferences
 
HelpContents Search Diffs Info Edit Subscribe XML Print View

    1 """Some sketches of implementing a WSGI 2 async API, in both sync and async servers"""
    2 
    3 def example_app(environ):
    4     yield '200 OK', [('Content-type','text/plain')], [b'hello world!']
    5 
    6 
    7 def example_middleware(app):
    8 
    9     def wrapped(environ):
   10         # middleware can modify environ here
   11         status, headers, body = yield app(environ)
   12 
   13         # and modify or replace s, h, body here
   14         body = process(body)  # ...only if you need to alter it
   15 
   16         yield status, headers, body
   17 
   18     def process(body_iter):
   19         # This function is only needed if the middleware wants to read
   20         # or modify the body in some way...
   21         while True:
   22             chunk = yield body_iter  # fetch the next chunk from the body
   23             if chunk is None:
   24                 break
   25             # process/modify chunk here, then yield it to the server
   26             yield chunk
   27 
   28     return wrapped
   29 
   30 
   31 def WSGI2(app):
   32     """Decorator that synchronously emulates WSGI2 futures under WSGI 1"""
   33     def wsgi1_app(environ, start_response):
   34         def process_response(shb):
   35             s, h, body = shb
   36             write = start_response(s, h)
   37 
   38             def body_trampoline(routine, yielded):
   39                 if type(yielded) is bytes:
   40                      # only accept from outermost middleware
   41                      if len(routine)==1:
   42                          return routine.synchronously(write, yielded)
   43                      else:
   44                          return routine.RETURN(yielded)
   45                 else:
   46                     return base_trampoline(routine, yielded)
   47             if not [body has send/throw methods]:
   48                 body = (item for item in body)
   49             Coroutine(body, body_trampoline)()
   50 
   51         def app_trampoline(routine, yielded):
   52             if type(yielded) is tuple:
   53                 return routine.RETURN(yielded)
   54             else:
   55                 return base_trampoline(routine, yielded)
   56 
   57         def base_trampoline(routine, yielded):
   58             if [yielded is a future of some sort]:
   59                 return routine.synchronously(future.result)
   60             elif [yielded has send/throw methods]:
   61                 return routine.CALL(yielded)
   62             else:
   63                 raise TypeError("Not a future, result, or generator:", yielded)
   64 
   65         # [add an executor to environ here]
   66         Coroutine(app(environ), app_trampoline, process_response)()
   67         return []
   68 
   69     return wsgi1_app
   70 
   71 
   72 def start_request(app, environ):
   73     """Template for asynchronous request handler"""
   74     def process_response(shb):
   75         s, h, body = shb
   76         [do an asynchronous start_response analog here]
   77         def body_trampoline(routine, yielded):
   78             if type(yielded) is bytes:
   79                 # only accept from outermost middleware
   80                 if len(routine)==1:
   81                     [arrange for the bytes to be sent out]
   82                     [arrange to invoke routine() when send is completed]
   83                     return routine.PAUSE
   84                 else:
   85                      return routine.RETURN(yielded)
   86             else:
   87                 return base_trampoline(routine, yielded)
   88 
   89         if not [body has send/throw methods]:
   90             body = (item for item in body)
   91         Coroutine(body, body_trampoline, [optional termination callback])()
   92 
   93     def app_trampoline(routine, yielded):
   94         if type(yielded) is tuple:
   95             return routine.RETURN(yielded)
   96         else:
   97             return base_trampoline(routine, yielded)
   98 
   99     def base_trampoline(routine, yielded):
  100         if [yielded is a future of some sort]:
  101             def done(f):
  102                 routine(*routine.synchronously(f.result))
  103             future.add_done_callback(done)
  104             return routine.PAUSE
  105         elif [yielded has send/throw methods]:
  106             return routine.CALL(yielded)
  107         else:
  108             raise TypeError("Not a future, result, or generator:", yielded)
  109 
  110     # [add an executor to environ here]
  111     Coroutine(app(environ), app_trampoline, process_response)()
  112 
  113 
  114 class Coroutine:
  115     """A thread-like stack of collaborating generators"""
  116 
  117     def __init__(iterator, trampoline=lambda r,v:"return", callback=lambda v:v):
  118         """Create and launch a general-purpose pseudo-thread"""
  119         self.stack = [iterator]
  120         self.trampoline = trampoline
  121         self.callback = callback
  122 
  123     PAUSE = ()
  124 
  125     def CALL(self, geniter):
  126         self.stack.append(geniter)
  127         return None, ()
  128 
  129     def RETURN(self, value=None):
  130         self.stack.pop()
  131         return value, ()
  132 
  133     def RESUME(self, value=None):
  134         return value, ()
  135 
  136     def RAISE(self, exc_info):
  137         return None, exc_info
  138 
  139     def __len__(self):
  140         return len(self.stack)
  141 
  142     def synchronously(self, func, *args, **kw):
  143         try:
  144             return self.RESUME(func(*args, **kw))
  145         except BaseException:
  146             return self.RAISE(sys.exc_info())
  147 
  148     def close():
  149         while self.stack:
  150             self.stack.pop().close()
  151 
  152     def __call__(self, value=None, exc_info=()):
  153         stack = self.stack
  154         while stack:
  155             try:
  156                 it = stack[-1]
  157                 if exc_info:
  158                     try:
  159                         rv = it.throw(*exc_info)
  160                     finally:
  161                         exc_info = ()
  162                 else:
  163                     rv = it.send(value)
  164             except BaseException:
  165                 value = None
  166                 exc_info = sys.exc_info()
  167                 if exc_info[0] is StopIteration:
  168                     # pass return value up the stack
  169                     value, = exc_info[1].args or (None,)
  170                     exc_info = ()   # but not the error
  171                 stack.pop()
  172             else:
  173                 switch = self.trampoline(self, rv)
  174                 if switch:
  175                     value, exc_info = switch
  176                 else:
  177                     return rv
  178 
  179         # Coroutine is entirely finished when the stack is empty
  180         if exc_info:
  181             try:
  182                 raise exc_info[1]
  183             finally:
  184                 exc_info = ()
  185         return self.callback(value)

PythonPowered
EditText of this page (last modified 2011-01-10 09:58:13)
FindPage by browsing, title search , text search or an index
Or try one of these actions: AttachFile, DeletePage, LikePages, LocalSiteMap, SpellCheck