future对象

Future对象其实在平时的工作中使用的并不是很多, 最近在研究fastapi以及底层的asyncio, 里面遇到了future对象和task对象, 仔细看看.

在python中, future可以指两个对象, 一个是concurrent.futures.Future, 一个是asyncio.Future. 两者的api差别不大. 本文主要介绍前者.

future对象主要用于在同步环境下实现异步操作. 举个例子

import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime


def task(n):
    print(f"task:{datetime.now()}")
    return n * n


def non_blocking_run(func, *args):
    executor = ThreadPoolExecutor(max_workers=3)
    future = executor.submit(func, *args)
    executor.shutdown(wait=False)
    return future


f = non_blocking_run(task, 3)
f.add_done_callback(lambda future: print(future.result()))  # 异步回调 9
time.sleep(3)  # do something business logic
print(f"main:{datetime.now()}")
print(f.result())  # 9

Executor

一个抽象类, 实现类(ThreadPoolExecutor)可以参考上面的例子, 更常见的写法是使用with上下文管理自动的shutdown, 例子参考最下面as_completed的例子.

Future

Executor.submit() 返回

主要方法: result(timeout=None) 阻塞直到结果返回, 或者超时, 如有异常, 正常触发.
exception(timeout=None) 阻塞直到结果返回, 或者超时, 如有异常, 返回异常对象, 否则, 返回None
cancel() 尝试取消任务, 如果任务已经开始, 则返回False, 否则返回True

函数

wait(fs, timeout=None, return_when=ALL_COMPLETED)

alt text
alt text
import concurrent.futures
import random
import time


def task(x):
    time.sleep(random.random())
    return x * x


with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for i in range(10):
        future = executor.submit(task, i)
        futures.append(future)

    done, not_done = concurrent.futures.wait(futures, timeout=0.5)
    print(f"done:{len(done)}")  # done:6
    print(f"not_done:{len(not_done)}")  # not_done:4

    for future in done:
        print(future.result())  # 81 36 9 0 49 4

concurrent.futures.as_completed(fs, timeout=None)

返回一个迭代器, 迭代器中的元素都是运行完的future对象, 下面是官网给的一个例子. 其中with可以理解为一种快捷方式. 自动调用executor.shutdown

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

全文完.

使用 Discussions 讨论 Github 上编辑 分享到 Twitter