一、单进程,单线程引起等待
import requests

def fetch_async(url):
    """Issue an HTTP GET for ``url`` and hand back the ``requests`` response.

    Despite the name, the call is synchronous: it returns only once
    ``requests.get`` has returned.
    """
    return requests.get(url)

# Pages to download in this single-process, single-thread baseline.
url_list = [
    'http://www.github.com',
    'http://www.baidu.com',
    'http://www.bing.com',
]

# Each call returns before the next one starts, so the requests run
# strictly one after another.
for url in url_list:
    fetch_async(url)
    
多线程执行
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_async(url):
    """Fetch ``url`` with ``requests.get`` and return the response object.

    Intended to be submitted to a thread pool; the function itself is an
    ordinary synchronous call.
    """
    return requests.get(url)

url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']

# Leaving the ``with`` block calls ``pool.shutdown(wait=True)``, so the pool
# is shut down even when ``submit`` raises part-way through the loop.
with ThreadPoolExecutor(5) as pool:
    for url in url_list:
        pool.submit(fetch_async, url)


多线程+回调函数执行
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_async(url):
    """Download ``url`` via ``requests.get`` and return the response.

    The returned response becomes the result of the future created by
    ``pool.submit`` in the script below.
    """
    return requests.get(url)

def callback(future):
    """Print the result of a completed future.

    ``future.result()`` is called directly, so an exception raised by the
    submitted call propagates out of this function.
    """
    outcome = future.result()
    print(outcome)

url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']

# The ``with`` block guarantees ``pool.shutdown(wait=True)`` on exit, even if
# submitting or registering a callback raises.
with ThreadPoolExecutor(5) as pool:
    for url in url_list:
        v = pool.submit(fetch_async, url)
        # ``callback`` receives the finished future as its only argument.
        v.add_done_callback(callback)

多进程执行
from concurrent.futures import ProcessPoolExecutor
import requests

def fetch_async(url):
    """Perform ``requests.get(url)`` and return the resulting response.

    Defined at module level so that it can be submitted to the process
    pool created under the ``__main__`` guard below.
    """
    return requests.get(url)

if __name__ == '__main__':
    url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']
    # Exiting the ``with`` block performs ``pool.shutdown(wait=True)``, so the
    # worker processes are cleaned up even if ``submit`` raises.
    with ProcessPoolExecutor(5) as pool:
        for url in url_list:
            pool.submit(fetch_async, url)
    
示例2:
from concurrent.futures import ThreadPoolExecutor
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']


def load_url(url):
    """Download ``url`` and print the size of its body in bytes.

    ``res.content`` is the raw body as bytes. ``res.text`` is the decoded
    string, so its length is a character count and does not match the
    "bytes" wording of the message for non-ASCII pages.
    """
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.content)))

executor = ThreadPoolExecutor(max_workers=3)

for url in urls:
    # ``submit`` returns a Future straight away; its state is printed
    # immediately after submission.
    future = executor.submit(load_url, url)
    is_finished = future.done()
    print(is_finished)

# Printed without waiting for the submitted downloads.
print('主线程')

结果:
False
False
False
主线程
http://www.163.com page is 646107 bytes
https://www.baidu.com/ page is 2443 bytes
https://github.com/ page is 51203 bytes

我们使用submit方法来往线程池中加入一个task,submit返回一个Future对象,对于Future对象可以简单地理解为一个在未来完成的操作。由于线程池异步提交了任务,主线程并不会等待线程池里创建的线程执行完毕,所以执行了print('主线程'),相应的线程池中创建的线程并没有执行完毕,故future.done()返回结果为False
    
示例3
from concurrent.futures import ThreadPoolExecutor
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    """Download ``url`` and print the size of its body in bytes.

    The size is taken from ``res.content`` (raw bytes). ``len(res.text)``
    would count decoded characters, which contradicts the "bytes" wording
    of the message.
    """
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.content)))

executor = ThreadPoolExecutor(max_workers=3)

# Same calling convention as the built-in ``map``. The iterator it returns
# is discarded here, so the results are never read.
executor.map(load_url, urls)

print('主线程')

示例4
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']


def load_url(url):
    """Download ``url`` and print the size of its body in bytes.

    Uses ``res.content`` (raw bytes) rather than ``res.text`` (decoded
    characters) so that the number matches the "bytes" wording.
    """
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.content)))

executor = ThreadPoolExecutor(max_workers=3)

# One future per URL, kept so they can all be handed to ``wait``.
f_list = [executor.submit(load_url, url) for url in urls]

# No ``return_when`` is given, so ``wait`` uses its default.
print(wait(f_list))
print('主线程')

注: wait方法会返回一个具名元组(named tuple),其中包含两个set(集合):一个是done(已完成的),另一个是not_done(未完成的)。
使用wait方法的一个优势就是获得更大的自由度:
它的return_when参数可以取三个值:FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED,默认为ALL_COMPLETED。
如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成,再继续执行主线程。


示例5
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']


def load_url(url):
    """Download ``url`` and print the size of its body in bytes.

    ``len(res.content)`` counts bytes; the previous ``len(res.text)``
    counted decoded characters while the message said "bytes".
    """
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.content)))

from concurrent.futures import FIRST_COMPLETED

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in urls:
    future = executor.submit(load_url, url)
    f_list.append(future)

# Use the documented constant instead of a hand-typed string. With
# FIRST_COMPLETED, ``wait`` returns as soon as any one future finishes
# instead of waiting for every task in the pool.
print(wait(f_list, return_when=FIRST_COMPLETED))
print('主线程')


多进程+回调函数
from concurrent.futures import ProcessPoolExecutor
import requests

def fetch_async(url):
    """Run ``requests.get(url)`` and return the response.

    Submitted to the process pool below; the response it returns is what
    ``future.result()`` yields in ``callback``.
    """
    return requests.get(url)

def callback(future):
    """Print whatever the finished ``future`` produced.

    Any exception stored in the future is re-raised by ``result()``.
    """
    value = future.result()
    print(value)

url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']
if __name__ == '__main__':
    # The ``with`` block runs ``pool.shutdown(wait=True)`` on exit, so the
    # pool is shut down even if a submit/add_done_callback call raises.
    with ProcessPoolExecutor(5) as pool:
        for url in url_list:
            v = pool.submit(fetch_async, url)
            v.add_done_callback(callback)
    
    
二、异步io库asyncio,asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO
示例1
import asyncio

async def func1():
    """Print a start marker, wait five seconds with ``asyncio.sleep``, print an end marker.

    Written as a native coroutine: the ``@asyncio.coroutine`` decorator and
    ``yield from`` style was removed from asyncio in Python 3.11.
    """
    print('before...func1......')
    await asyncio.sleep(5)
    print('end...func1......')

async def func2():
    """Print a start marker, wait five seconds with ``asyncio.sleep``, print an end marker.

    Native ``async def`` replaces the ``@asyncio.coroutine`` decorator,
    which no longer exists in Python 3.11+.
    """
    print('before...func2......')
    await asyncio.sleep(5)
    print('end...func2......')

tasks = [func1(), func2()]


async def _run_all():
    """Await every coroutine in ``tasks`` concurrently."""
    await asyncio.gather(*tasks)


# ``asyncio.run`` creates a fresh event loop, runs the coroutine to completion
# and closes the loop. Calling ``asyncio.get_event_loop()`` at module level
# with no running loop is deprecated in recent Python versions.
asyncio.run(_run_all())

示例2
import asyncio

async def fetch_async(host, url='/'):
    """Send a bare HTTP/1.0 GET to ``host`` on port 80 and print the raw reply.

    Args:
        host: Host name to connect to; also sent as the ``Host`` header.
        url: Request path, ``'/'`` by default.

    The stream writer is closed in a ``finally`` block so the connection is
    released even when writing or reading fails.
    """
    print(host, url)
    reader, writer = await asyncio.open_connection(host, 80)
    try:
        request_header_content = """GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n""".format(url, host)
        request_header_content = bytes(request_header_content, encoding='utf-8')

        writer.write(request_header_content)
        await writer.drain()
        # ``read()`` with no size reads until EOF.
        text = await reader.read()
        print(host, url, text)
    finally:
        writer.close()
        await writer.wait_closed()

tasks = [
    fetch_async('www.cnblogs.com', '/wupeiqi/'),
    fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]


async def _gather_tasks():
    """Run every coroutine in ``tasks`` concurrently and return their results in order."""
    return await asyncio.gather(*tasks)


# ``asyncio.run`` owns the event loop's whole lifecycle (create, run, close),
# replacing the deprecated module-level ``asyncio.get_event_loop()`` call.
results = asyncio.run(_gather_tasks())


示例3
import threading
import asyncio

async def hello():
    """Print the current thread, wait five seconds with ``asyncio.sleep``, print it again.

    Uses ``async def``/``await`` (``@asyncio.coroutine`` was removed in
    Python 3.11) and ``threading.current_thread()`` in place of the
    deprecated ``currentThread`` alias.
    """
    print('Hello world! (%s)' % threading.current_thread())
    await asyncio.sleep(5)
    print('Hello again! (%s)' % threading.current_thread())

tasks = [hello(), hello()]


async def _run_all():
    """Await both ``hello`` coroutines concurrently."""
    # ``asyncio.wait`` no longer accepts bare coroutine objects (removed in
    # Python 3.11); ``gather`` wraps them in tasks itself.
    await asyncio.gather(*tasks)


# ``asyncio.run`` creates, runs and closes the event loop in one call.
asyncio.run(_run_all())


示例4:
import time
import asyncio

def now():
    """Return the current time in seconds since the epoch, as given by ``time.time()``."""
    return time.time()
async def do_some_work(x):
    """Print ``x`` and return a completion message containing it.

    The ``async`` keyword makes this a coroutine function; the body does
    not await anything.
    """
    print('Waiting: ', x)
    message = 'Done after {0}'.format(x)
    return message

def callback(future):
    """Done-callback: print the finished future's result after a ``Callback:`` label."""
    outcome = future.result()
    print('Callback: ', outcome)

async def _main():
    """Wrap ``do_some_work(2)`` in a task, attach ``callback`` and wait for it."""
    # Inside a running loop ``ensure_future`` schedules the coroutine on that
    # loop; no module-level ``get_event_loop()`` call is needed.
    task = asyncio.ensure_future(do_some_work(2))
    task.add_done_callback(callback)
    await task


start = now()
# ``asyncio.run`` creates the loop, runs ``_main`` to completion and closes
# the loop afterwards (the loop was previously never closed).
asyncio.run(_main())
print('TIME: ', now() - start)


aiohttp+asyncio模块

import aiohttp
import asyncio

async def fetch_async(url):
    """GET ``url`` with aiohttp and print the response object.

    ``async with`` releases the response when the block exits, including
    when printing raises, replacing the manual ``response.close()``.
    Native ``async def`` replaces ``@asyncio.coroutine``, which was removed
    in Python 3.11.
    """
    print(url)
    async with aiohttp.request('GET', url) as response:
        print(url, response)

tasks = [fetch_async('http://www.baidu.com'), fetch_async('http://www.chouti.com')]


async def _gather_tasks():
    """Run both fetches concurrently and return their results in order."""
    return await asyncio.gather(*tasks)


# ``asyncio.run`` replaces the deprecated module-level
# ``asyncio.get_event_loop()`` call and closes the loop when done.
results = asyncio.run(_gather_tasks())


asyncio+requests模块
import asyncio
import requests

async def fetch_async(func, *args):
    """Run the blocking call ``func(*args)`` in the loop's default executor and print its result.

    Args:
        func: A blocking callable (``requests.get`` in the script below) whose
            return value has ``url`` and ``content`` attributes.
        *args: Positional arguments forwarded to ``func``.
    """
    # Inside a coroutine the running loop is the right one to use.
    loop = asyncio.get_running_loop()
    # ``None`` selects the loop's default executor.
    response = await loop.run_in_executor(None, func, *args)
    print(response.url, response.content)

tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]


async def _gather_tasks():
    """Run every coroutine in ``tasks`` concurrently and return their results in order."""
    return await asyncio.gather(*tasks)


# ``asyncio.run`` creates, runs and closes the event loop, replacing the
# deprecated module-level ``asyncio.get_event_loop()`` call.
results = asyncio.run(_gather_tasks())


gevent+requests模块
# Patch the standard library BEFORE importing requests. gevent's documentation
# says to monkey-patch as early as possible, ahead of other imports; patching
# after ``requests`` has already been imported is a documented source of
# MonkeyPatchWarning and related errors.
from gevent import monkey

monkey.patch_all()

import gevent
import requests

def fetch_async(method, url, req_kwargs):
    """Send one HTTP request through ``requests`` and print the outcome.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Target URL.
        req_kwargs: Dict of extra keyword arguments expanded into the
            ``requests.request`` call.
    """
    print(method, url, req_kwargs)
    reply = requests.request(method=method, url=url, **req_kwargs)
    print(reply.url, reply.content)

# Spawn one greenlet per site, then block until all of them have finished.
greenlets = [
    gevent.spawn(fetch_async, method='get', url=site, req_kwargs={})
    for site in (
        'https://www.python.org/',
        'https://www.baidu.com/',
        'https://www.sina.com/',
    )
]
gevent.joinall(greenlets)


grequests模块
import grequests

request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]

# Building the list sends nothing; ``grequests.map`` is what issues the
# requests. It returns one entry per request, in order, with ``None`` for a
# request that failed.
response_list = grequests.map(request_list)
print(response_list)

twisted模块
from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    """Stop the Twisted reactor; ``arg`` (the value passed by the deferred) is ignored."""
    reactor.stop()

def callback(contents):
    """Print the value delivered by a deferred (the result of ``getPage`` in the script below)."""
    print(contents)

url_list = ['http://www.bing.com', 'http://www.baidu.com',]

# One deferred per URL; each prints its page through ``callback``.
deferred_list = []
for url in url_list:
    deferred = getPage(url.encode('utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

# ``all_done`` is attached with ``addBoth``, so it runs on success or failure
# of the combined DeferredList and stops the reactor.
dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()


tornado模块
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

def handle_response(response):
    """Print the response body, or the error if ``response.error`` is truthy."""
    failure = response.error
    if failure:
        print("Error:", failure)
        return
    print(response.body)

def func():
    """Start one asynchronous fetch per URL and route each outcome to ``handle_response``.

    ``AsyncHTTPClient.fetch`` no longer accepts a callback argument
    (removed in Tornado 6.0), so the response is taken from the future that
    ``fetch`` returns.
    """
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]

    def _deliver(future):
        # NOTE(review): with raise_error=False, HTTP error statuses arrive as
        # a response with ``error`` set; other failures are presumably still
        # raised by ``result()`` - confirm against the installed Tornado.
        try:
            response = future.result()
        except Exception as exc:
            print("Error:", exc)
        else:
            handle_response(response)

    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        future = http_client.fetch(HTTPRequest(url), raise_error=False)
        future.add_done_callback(_deliver)

# Schedule ``func`` on the current IOLoop, then start the loop.
io_loop = ioloop.IOLoop.current()
io_loop.add_callback(func)
io_loop.start()


twisted更多
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse

def one_done(arg):
    """Print the value delivered by the deferred, then stop the reactor."""
    print(arg)
    reactor.stop()

# Form-encode the payload and convert it to bytes for the request body.
post_data = urllib.parse.urlencode({'check_data': 'adf'}).encode('utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}

response = getPage(
    'http://dig.chouti.com/login'.encode('utf8'),
    method='POST'.encode('utf8'),
    postdata=post_data,
    cookies={},
    headers=headers,
)
# ``addBoth`` runs ``one_done`` whether the request succeeds or fails.
response.addBoth(one_done)

reactor.run()



以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】
三、牛逼的select模块
服务端例子
import socket
import select

def _listening_socket(port):
    """Create a socket bound to ``0.0.0.0`` on ``port`` and put it in listening mode."""
    sock = socket.socket()
    sock.bind(('0.0.0.0', port))
    sock.listen()
    return sock


sk1 = _listening_socket(8001)
sk2 = _listening_socket(8002)
sk3 = _listening_socket(8003)

# The three listening sockets handed to ``select`` in the loop below.
inputs = [sk1, sk2, sk3]

while True:
    # Wait up to 1 second for any listening socket to become readable (a
    # client is waiting to be accepted) or to report an error condition.
    r_list, w_list, e_list = select.select(inputs, [], inputs, 1)
    for sk in r_list:
        conn, address = sk.accept()
        # The ``with`` block closes the client connection even when
        # ``sendall`` raises, so a failed send does not leak the socket.
        with conn:
            conn.sendall(bytes('hello', encoding='utf-8'))
    for sk in e_list:
        # Stop watching the errored socket and release its file descriptor.
        inputs.remove(sk)
        sk.close()
        
解释:    
    # select内部自动监听sk1,sk2,sk3三个对象,监听三个句柄是否发生变化,把发生变化的元素放入r_list中。
    # 如果有人连接sk1,则r_list = [sk1]
    # 如果有人连接sk1和sk2,则r_list = [sk1,sk2]
    # select中第1个参数表示inputs中发生变化的句柄放入r_list。
    # select中第2个参数是需要监听"可写"状态的句柄列表,其中可写的句柄会放入w_list;这里传入的是[],所以w_list始终为空。
    # select中第3个参数表示inputs中发生错误的句柄放入e_list。
    # 第4个参数1是超时时间(秒):select最多阻塞1秒,如果期间没有任何句柄就绪,则返回三个空列表。
    # 当有用户连接时,r_list里面的内容形如:[<socket.socket fd=220, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]
     
客户端
import socket
# The ``with`` block closes the socket on exit, including when ``connect`` or
# ``recv`` raises, so the descriptor is never leaked.
with socket.socket() as obj:
    obj.connect(('127.0.0.1', 8001))
    # Read at most 1024 bytes and decode them as UTF-8 text.
    content = obj.recv(1024).decode('utf-8')
    print(content)