concurrent
concurrent的库,在多进程、多线程方面,进行了更高的抽象,可以非常方便的从多线程切换到多进程等,在多进程模式下,没有GIL全局锁的限制。这样,感觉python还非常不错,至少比php,自己从底层来搞这些方便多了,标准库真的是好用。
设置的work量可以比任务数多,也可以少,不一定非要相等。这样,能重份得利用。
既然submit返回的对象是一种封装,那它应该也会耗费内存。如果有上亿的任务?那这些返回的对象本身也会占据大量的内存吧?
资源
正文
submit
submit后会立即返回一个对象,类似与js的promise对象一样。调用result()方法时,才会卡住。
#coding=utf-8
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 2, 10) # 会直接返回结果
print(future.result()) # 这一步,才会取结果
测试时效
测试各部分代码执行速度,submit本身不耗费时间,取结果才耗费。如下 :
executor.submit提交了,也开始执行了,但是不会被卡住。主线程会继续往下走。只有在调用result()方法时,才会被卡住。如果有一个任务,卡得很久,那么主线程就会被卡很久,其他的线程并不会卡住,还在执行任务,完成一个,执行下一个。等主线程卡的大任务结束了,调用其他其他任务的result,会立马返回。
#coding=utf-8
import time
from concurrent.futures import ThreadPoolExecutor
def task():
time.sleep(5)
return 6
with ThreadPoolExecutor(max_workers=1) as executor:
print('start')
a = executor.submit(task) # 直接返回,结果会在 调用result方法的时候,卡在那
print('after submit')
time.sleep(4) # 上面提交了,可能也执行了,但是不会被卡住
print('%s %s'%(time.time(),a.result())) # 可以 a以及执行了,但是调用result,会卡在这。
print('get result')
一般的用法
提交的任务超过线程总数时,这是一般的用法。
with ThreadPoolExecutor(max_workers=10) as executor:
print('start')
result = []
for i in range(100):
result.append(executor.submit(task))
print('after submit')
print([x.result() for x in result])
print('get result')
map
map,其实相当于对submit的封装,原来需要先遍历,然后收集 submit的结果,然后再对submit的结果进行调result方法获取结果。封装在一起了。
整体结果是有序的。
估计让任务随机延时,但是最终的结果,还是有序的。这对保证整体有序还是非常有帮助的。有些任务延时较长,卡完后,但是后面的任务会立马输出结果。
#coding=utf-8
import time
import random
from concurrent.futures import ThreadPoolExecutor
def taskn(n):
delay = random.randint(1,10)
time.sleep(delay)
# time.sleep(5)
return n
with ThreadPoolExecutor(max_workers=10) as executor:
result = executor.map(taskn,range(100))
print(result)
for item in result:
print('%s %s'%(time.time(),item))
卡住效果演示
#coding=utf-8
import time
from concurrent.futures import ThreadPoolExecutor
def nowtime():
return time.strftime('%H:%M:%S')
def taskn(n):
start = nowtime()
delay = n%10
time.sleep(delay)
end = nowtime()
return 'id:%d 领取时间:%s 返回时间:%s 延时时间:%s'%(n,start,end,delay)
with ThreadPoolExecutor(max_workers=10) as executor:
result = executor.map(taskn,range(100))
for item in result:
print('打印时间:%s %s'%(nowtime(),item))
更直接方式
直接让id=1的任务延时100秒,这个时候,它会卡住整个任务,当他完成了,其他的任务也早已完成了。故,会立马全部输出。这个更能看出这个库的作用。
#coding=utf-8
import time
from concurrent.futures import ThreadPoolExecutor
def nowtime():
return time.strftime('%H:%M:%S')
def taskn(n):
start = nowtime()
delay = n%10
if n == 1:
delay = 100
time.sleep(delay)
end = nowtime()
return 'id:%d 领取时间:%s 返回时间:%s 延时时间:%s'%(n,start,end,delay)
with ThreadPoolExecutor(max_workers=10) as executor:
result = executor.map(taskn,range(100))
for item in result:
print('打印时间:%s %s'%(nowtime(),item))
ProcessPoolExecutor
将上面的结果换成ProcessPoolExecutor,即可切换成多进程的方式。