Sunday, September 7, 2014

Wrapping my head around Python asyncio

Trying to understand Python's asyncio is a challenge.  First, I personally don't know which is more difficult: multi-threaded programming or event-driven programming.  Multi-threaded programming has the difficulty of finding and eliminating race conditions, deadlocks, and livelocks.  Event-driven programming has the difficulty of a non-intuitive flow of control and many layers of abstraction and indirection.  So where do we even start?  If you just start reading the official documentation on asyncio, you probably won't get too far.  Reading PEP 3156 won't get you much farther, though I do recommend studying both.

My main motivation for learning asyncio is probably a little unusual.  I wanted to write something like pexpect, without using pexpect.  In a nutshell, I wanted to interact with a child subprocess, perhaps more than once.  Python's subprocess module doesn't quite let you do this, even though Popen.communicate() looks as if it might.  The problem is that it is a "one-shot" communication: you feed it one string, it closes the child's stdin and waits for the process to exit, and then you are done.  But what if you need to answer multiple prompts from your child process?
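To make the "one-shot" limitation concrete, here is a minimal sketch.  The child script and the helper name one_shot are made up for illustration; only Popen.communicate() itself comes from the discussion above.

```python
import subprocess
import sys

# Hypothetical child: reads one line and echoes it back in upper case.
CHILD_CODE = "print(input().upper())"


def one_shot(text):
    """Send `text` to a child with communicate() and report what is left afterwards."""
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # communicate() writes the input, closes stdin, reads stdout until EOF,
    # and waits for the child to exit.
    out, _ = child.communicate(text.encode())
    return out.decode().strip(), child.stdin.closed, child.returncode


if __name__ == "__main__":
    print(one_shot("hello\n"))
```

By the time communicate() returns, the stdin pipe is already closed and the child has exited, so there is nothing left to send a second answer to.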

So where can we start?  I'm learning this too, so I'll introduce more examples as I go.  Let's begin with a small example of calling a subprocess using asyncio.  I won't explain it in detail in this post.

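I will, however, briefly explain what coroutines are.  In a nutshell, a coroutine is a way to factor out code that uses yield.  The reason you need one is that yield is "contagious": the very presence of yield in a function turns that function into a generator, so calling it no longer runs its body; it only returns a generator object.  So what do you do when you realize that some code that uses yield could be factored out into its own function?  The caller delegates to it with "yield from", which runs the inner generator to completion and hands back its return value.  A coroutine can be spotted by its use of the "yield from" syntax, which was added in Python 3.3 (it is an expression, not a separate keyword).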
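Here is a minimal sketch of that factoring, using plain generators and no asyncio at all.  The function names countdown, launch and launch_broken are invented for this illustration.

```python
def countdown(n):
    """A helper that uses yield, factored out of launch()."""
    while n > 0:
        yield n
        n -= 1
    return "liftoff"  # becomes the value of the `yield from` expression


def launch():
    yield "start"
    # Delegate to the helper: its yields pass straight through to our caller.
    result = yield from countdown(3)
    yield result


def launch_broken():
    yield "start"
    # Without `yield from`, this only creates a generator object;
    # the body of countdown() never runs.
    countdown(3)
    yield "done"


if __name__ == "__main__":
    print(list(launch()))
    print(list(launch_broken()))
```

Iterating over launch() produces "start", then 3, 2, 1 from the helper, then the helper's return value.  launch_broken() shows the trap: a plain call to a generator function silently does nothing.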


subproc_shell.py
"""
Took the example from the tulip project and modified it to make it more like pexpect.

Written in the generator-based coroutine style (@asyncio.coroutine and yield from).
"""

import asyncio
import os
import re
from asyncio.subprocess import PIPE, STDOUT


@asyncio.coroutine
def send_input(writer, input, que, regex):
    """
    The coroutine that will send its input to the input stream (usually stdin)

    :param writer: The stream where input will go (usually stdin)
    :param input: A list of bytes objects (not strings); it is reversed in place
    :param que: an asyncio.Queue used to check if we have what we need
    :param regex: a re.compile() object used to see if an item from the que matches
    :return: None
    """
    input.reverse()  # We have to reverse because we pop() from the end
    try:
        while input:
            item = yield from que.get()
            #print("Pulled from queue:", repr(item))
            if item is None:
                break
            # Note: match() only matches at the start of the chunk
            m = regex.match(item.decode())
            if m:
                line = input.pop()
                #print('sending {} bytes'.format(len(line)))
                writer.write(line)
                d = writer.drain()
                if d:
                    # writer.drain() returns a generator
                    yield from d
                    #print('resume writing')
        # Close stdin only once the loop is done; closing it right after the
        # first write would make every later write fail.
        writer.close()
    except asyncio.QueueEmpty:
        pass
    except BrokenPipeError:
        print('stdin: broken pipe error')
    except ConnectionResetError:
        print('stdin: connection reset error')
    except Exception as ex:
        print(ex)


@asyncio.coroutine
def log_errors(reader):
    """Print everything that arrives on the reader stream (usually stderr)."""
    while True:
        line = yield from reader.read(512)
        if not line:
            break
        print('ERROR', repr(line))


@asyncio.coroutine
def read_stdout(stdout, que):
    """
    The coroutine that reads non-blocking from a reader stream

    :param stdout: the stream we will read from
    :param que: an asyncio.Queue object we put chunks of output into
    """
    while True:
        line = yield from stdout.read(512)  # use this instead of readline() so we don't wait for a newline
        print('Received from child:', repr(line))
        que.put_nowait(line)  # put the chunk into the que, so it can be read by send_input()
        if not line:
            que.put_nowait(None)  # A sentinel so that when send_input() pulls this from the que, it will stop
            break


@asyncio.coroutine
def start(cmd, inp=None, queue=None, shell=True, wait=True, **kwargs):
    """
    Kicks off the subprocess

    :param cmd: str of the command to run
    :param inp: list of bytes objects to send to the child's stdin, or None
    :param queue: optional asyncio.Queue shared by read_stdout() and send_input()
    :param shell: if true, run cmd through the shell; otherwise exec it directly
    :param wait: if true, wait for the child to exit and print its exit code
    :param kwargs: extra arguments passed on to the subprocess constructor
    :return: the process object if wait is false, otherwise None
    """
    kwargs['stdout'] = PIPE
    kwargs['stderr'] = STDOUT  # merge stderr into stdout
    if inp is None and 'stdin' not in kwargs:
        kwargs['stdin'] = None
    else:
        kwargs['stdin'] = PIPE

    fnc = asyncio.create_subprocess_shell if shell else asyncio.create_subprocess_exec
    proc = yield from fnc(cmd, **kwargs)

    q = queue or asyncio.Queue()  # Stores our output from read_stdout and pops off (maybe) from send_input
    regex = re.compile("Reset counter")

    tasks = []
    if proc.stdout is not None:
        tasks.append(read_stdout(proc.stdout, q))
    else:
        print('No stdout')
    if inp is not None:
        tasks.append(send_input(proc.stdin, inp, q, regex))
    else:
        print('No stdin')

    # Disabled: stderr is redirected to stdout above, so proc.stderr is None
    if 0:
        if proc.stderr is not None:
            tasks.append(log_errors(proc.stderr))
        else:
            print('No stderr')

    if tasks:
        # feed stdin while consuming stdout to avoid hang
        # when stdin pipe is full
        yield from asyncio.wait(tasks)

    if wait:
        exitcode = yield from proc.wait()
        print("exit code: %s" % exitcode)
    else:
        return proc


def main():
    if os.name == 'nt':
        # Subprocess support on Windows needs the proactor event loop
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.get_event_loop()
    # Raw string so the backslashes in the Windows path are taken literally
    loop.run_until_complete(start(r'c:\python34\python.exe dummy.py', inp=[str(x).encode() for x in (3, 3, 0)]))
    loop.close()


if __name__ == '__main__':
    main()
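
For comparison, here is a much smaller sketch of the same "answer several prompts" idea.  It is not the listing above: it uses the async/await syntax that later Python versions added (the @asyncio.coroutine decorator was removed in Python 3.11), and it assumes Python 3.7 or newer for asyncio.run().  The child script is a hypothetical stand-in for dummy.py, which is not shown in this post, and the name answer_prompts is made up.

```python
import asyncio
import sys

# Hypothetical stand-in for dummy.py: asks three times, then prints the sum.
CHILD_CODE = """
import sys
total = 0
for _ in range(3):
    sys.stdout.write("Reset counter? ")
    sys.stdout.flush()
    total += int(sys.stdin.readline())
print("total={}".format(total))
"""


async def answer_prompts(answers):
    """Reply to each prompt in turn and return what the child prints at the end."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    for answer in answers:
        # Wait for the next prompt before replying.
        await proc.stdout.readuntil(b"? ")
        proc.stdin.write(answer + b"\n")
        await proc.stdin.drain()
    proc.stdin.close()
    rest = await proc.stdout.read()  # whatever the child prints after the last prompt
    await proc.wait()
    return rest.decode().strip()


if __name__ == "__main__":
    print(asyncio.run(answer_prompts([b"3", b"3", b"0"])))
```

Because this sketch knows exactly what the prompt ends with, it can read up to the prompt and reply in a single coroutine.  The queue-based design in subproc_shell.py is the more general shape, since the reader and the writer run as separate tasks.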
