Trying to understand Python's asyncio is a challenge. First, I honestly don't know which is more difficult: multithreaded programming or event-driven programming. Multithreaded programming has the difficulty of finding and eliminating race conditions, deadlocks, and livelocks. Event-driven programming has the difficulty of a non-intuitive flow of control and many layers of abstraction and indirection. So where do we even start? If you just start reading the official asyncio documentation, you probably won't get very far. Reading PEP 3156 won't get you much farther, though I do recommend studying both.
My main motivation for learning asyncio is probably a little unusual. I wanted to write something like pexpect, without using pexpect. In a nutshell, I wanted to interact with a child subprocess more than once. Python's subprocess module doesn't let you do this directly, even though Popen.communicate() may seem to. The problem is that communicate() is a "one-shot" exchange: it writes all of its input, closes the child's stdin, reads until EOF, and waits for the child to exit. You feed it one string and then you are done. But what if you need to answer multiple prompts from your child process, where each answer depends on what the child just printed?
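To see the one-shot behavior, here is a minimal sketch using a hypothetical inline child script, CHILD, that asks two questions. Both answers have to be handed to communicate() up front; there is no chance to read "first? " before deciding what to send.

```python
import subprocess
import sys

# Hypothetical child program: asks two questions, reading one answer per prompt.
CHILD = (
    "a = input('first? ')\n"
    "b = input('second? ')\n"
    "print('got', a, b)\n"
)

proc = subprocess.Popen(
    [sys.executable, "-c", CHILD],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    universal_newlines=True,  # exchange str instead of bytes
)
# communicate() writes all the input at once, closes stdin, reads stdout until
# EOF and waits for the child to exit. Both answers must be known in advance.
out, _ = proc.communicate("3\n0\n")
print(repr(out))
```

Note that the prompts and the final line all come back together in one string, only after the child has already finished.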
So where can we start? I'm learning this too, so I'll introduce more examples as I go. For now, let's make a small example of calling a subprocess using asyncio. I won't explain it in detail in this post.
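I will, however, briefly explain what coroutines are. In a nutshell, a coroutine is a way to factor out code that uses yield. This is needed because yield is "contagious": the mere presence of yield in a function turns that function into a generator. So what do you do when you realize that some code that uses yield should be factored out into its own function? Simply calling the new function only creates a generator object; the caller has to delegate to it with "yield from" (PEP 380, added in Python 3.3). That is why an asyncio coroutine in the Python 3.4 style can be spotted by its use of the new "yield from" expression, usually together with the @asyncio.coroutine decorator.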
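A minimal sketch with plain generators (no event loop involved) makes the point concrete. countdown() and launch() are hypothetical names: calling countdown(3) by itself would only build a generator object, while yield from re-yields each of its values and then evaluates to its return value. Generator-based asyncio coroutines relied on this same delegation.

```python
def countdown(n):
    # The yield below is what makes this a generator function.
    while n > 0:
        yield n
        n -= 1
    return "liftoff"  # becomes the value of the caller's `yield from` expression


def launch():
    # The yielding loop lives in countdown(); `yield from` passes every value
    # it produces straight through to our caller, then hands back its result.
    result = yield from countdown(3)
    yield result


print(list(launch()))  # → [3, 2, 1, 'liftoff']
```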
```python
"""
Took the example from the tulip project and modified it to make it more like pexpect.

Uses generator-based coroutines (@asyncio.coroutine / yield from), so it needs
Python 3.4 to 3.10; @asyncio.coroutine was removed in Python 3.11.
"""

import asyncio
import os
import re
import sys
from asyncio.subprocess import PIPE, STDOUT


@asyncio.coroutine
def send_input(writer, inp, que, regex):
    """
    The coroutine that sends its input to the input stream (usually stdin)

    :param writer: The stream where input will go (usually stdin)
    :param inp: A list of bytes objects (not strings), one per prompt
    :param que: an asyncio.Queue used to check if we have what we need
    :param regex: a re.compile() object used to see if an item from the que matches
    :return: None
    """
    pending = list(reversed(inp))  # reversed copy, so pop() returns the first answer first
    try:
        while pending:
            item = yield from que.get()
            if item is None:  # sentinel from read_stdout(): the child closed stdout
                break
            # search() rather than match(): the prompt need not start the chunk
            if regex.search(item.decode(errors='replace')):
                writer.write(pending.pop())
                yield from writer.drain()  # drain() is a coroutine; wait until the buffer flushes
        writer.close()
    except BrokenPipeError:
        print('stdin: broken pipe error')
    except ConnectionResetError:
        print('stdin: connection reset error')
    except Exception as ex:
        print(ex)


@asyncio.coroutine
def log_errors(reader):
    while True:
        line = yield from reader.read(512)
        if not line:
            break
        print('ERROR', repr(line))


@asyncio.coroutine
def read_stdout(stdout, que):
    """
    The coroutine that reads non-blocking from a reader stream
    :param stdout: the stream we will read from
    :param que: an asyncio.Queue object we put chunks into
    """
    while True:
        # read(512) instead of readline() so we don't stall on a prompt with no newline
        line = yield from stdout.read(512)
        print('Received from child:', repr(line))
        if not line:
            # EOF: a sentinel so that when send_input() pulls this from the que, it will stop
            que.put_nowait(None)
            break
        que.put_nowait(line)  # put the chunk into the que, so send_input() can check it


@asyncio.coroutine
def start(cmd, inp=None, queue=None, shell=True, wait=True, **kwargs):
    """
    Kicks off the subprocess
    :param cmd: str of the command to run
    :param inp: list of bytes to send, one item each time the prompt regex matches
    :param queue: optional asyncio.Queue shared by read_stdout() and send_input()
    :param shell: if True use create_subprocess_shell, else create_subprocess_exec
    :param wait: if True wait for the child and return its exit code, else return the process
    :param kwargs: extra keyword arguments for the subprocess factory
    :return: the exit code if wait is True, otherwise the Process object
    """
    kwargs['stdout'] = PIPE
    kwargs['stderr'] = STDOUT  # merge stderr into stdout
    if inp is not None:
        kwargs['stdin'] = PIPE
    else:
        kwargs.setdefault('stdin', None)

    fnc = asyncio.create_subprocess_shell if shell else asyncio.create_subprocess_exec
    proc = yield from fnc(cmd, **kwargs)

    # Stores our output from read_stdout() and pops off (maybe) from send_input()
    q = queue if queue is not None else asyncio.Queue()
    regex = re.compile("Reset counter")  # the prompt text we wait for before each answer

    tasks = []
    if proc.stdout is not None:
        tasks.append(read_stdout(proc.stdout, q))
    else:
        print('No stdout')
    if inp is not None:
        tasks.append(send_input(proc.stdin, inp, q, regex))
    else:
        print('No stdin')
    if proc.stderr is not None:  # None here, because stderr is merged into stdout above
        tasks.append(log_errors(proc.stderr))

    if tasks:
        # feed stdin while consuming stdout to avoid a hang when the stdin pipe is full;
        # gather() wraps each coroutine in a task and runs them concurrently
        yield from asyncio.gather(*tasks)

    if wait:
        exitcode = yield from proc.wait()
        print("exit code: %s" % exitcode)
        return exitcode
    return proc


def main():
    if os.name == 'nt':
        # subprocess pipes on Windows need the Proactor loop (the default only from 3.8)
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.get_event_loop()
    # quote the interpreter path in case it contains spaces; dummy.py is the child script
    cmd = '"{}" dummy.py'.format(sys.executable)
    # note: no trailing newline on the answers; append b'\n' if the child reads whole lines
    answers = [str(x).encode() for x in (3, 3, 0)]
    loop.run_until_complete(start(cmd, inp=answers))
    loop.close()


if __name__ == '__main__':
    main()
```
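On Python 3.7 or newer (and necessarily on 3.11+, where @asyncio.coroutine no longer exists), the same wait-for-a-prompt-then-answer idea can be written with async/await. This is a minimal sketch, not the code above translated line by line: it swaps the two-task queue design for one sequential loop. CHILD, PROMPT, expect() and interact() are hypothetical names, and the inline child script stands in for dummy.py, whose contents aren't shown here.

```python
import asyncio
import re
import sys

# Hypothetical stand-in for dummy.py: prompts twice, echoing each answer.
CHILD = r"""
import sys
for question in ("first? ", "second? "):
    sys.stdout.write(question)
    sys.stdout.flush()
    answer = sys.stdin.readline().strip()
    print("you said", answer, flush=True)
"""

PROMPT = re.compile(r"\? ")  # assumption: every prompt contains "? "


async def expect(stream, pattern, buffer=""):
    """Read until pattern matches; return (text up to the match, leftover)."""
    while True:
        match = pattern.search(buffer)
        if match:
            return buffer[:match.end()], buffer[match.end():]
        chunk = await stream.read(512)  # returns as soon as any bytes arrive
        if not chunk:
            raise EOFError("child closed stdout before the prompt: %r" % buffer)
        buffer += chunk.decode()  # fine for ASCII; multibyte text may split across chunks


async def interact(answers):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    transcript, leftover = [], ""
    for answer in answers:
        # Wait for the next prompt, then answer it: the pexpect-style step.
        seen, leftover = await expect(proc.stdout, PROMPT, leftover)
        transcript.append(seen)
        proc.stdin.write((answer + "\n").encode())
        await proc.stdin.drain()
    proc.stdin.close()
    rest = await proc.stdout.read()  # no size argument: read until EOF
    transcript.append(leftover + rest.decode())
    await proc.wait()
    return transcript


if __name__ == "__main__":
    print(asyncio.run(interact(["3", "0"])))
```

The sequential loop is easier to follow because each answer is written only after its prompt has been seen. It never reads and writes at the same time, though, so for a child that floods stdout while you push large input, concurrent reader and writer tasks, as in the listing above, remain the safer pattern.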