Python实现多线程批量执行任务
当批量任务出现时, 多线程执行总是更好的选择, Python为我们提供了threading这个包方便我们创建和管理自己的线程
# 时间辅助函数
def dateToStr(dt):
if type(dt) == datetime.datetime:
return datetime.datetime.strftime(dt, '%Y-%m-%d')
if type(dt) == str:
return dt
return None
def dateToDate(dt):
if type(dt) == datetime.datetime:
return dt
if type(dt) == str:
return datetime.datetime.strptime(dt, '%Y-%m-%d')
def dateIsMonthEnd(dt):
if type(dt) == str:
dt = dateToDate(dt)
if (dt + datetime.timedelta(days=1)).month != dt.month:
return False
else:
return True
def datetimeNow():
return datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %X")
# 状态监视器
def statusWatcher(res):
keyList = []
for key in res:
keyList.append(key)
print("%s | " % datetimeNow() + "|".join(keyList))
flag = 1
while flag == 1:
flag = 0
valueList = []
for key in res:
valueList.append(res[key])
# 当输出全部到结束标志位, 停止循环
if res[key].strip() != "end":
flag = 1
# 每0.5秒动态刷新输出状态
sys.stdout.write("\r%s | " % datetimeNow() + "|".join(valueList))
sys.stdout.flush()
time.sleep(0.5)
print("\n")
# 子线程
def singleThread(dt, res):
# 起始标志位
res[dt] = " go! "
time.sleep(1)
# 随机的起始时间
t = float(random.randrange(start=5, stop=10))
while t < 10:
# 更新状态字典
res[dt] = " %s " % str(round(t, 2))
# 没0.5秒更新
time.sleep(0.5)
t += 0.5
res[dt] = " end "
if __name__ == '__main__':
# 日期
start_dt = dateToDate("2018-07-02")
end_dt = dateToDate("2018-07-14")
d = start_dt
# 线程个数
p_num = 5
# 当起始日期小于结束日期时, 每次启动p_num个线程
while d <= end_dt:
# 初始化状态字典
res = {}
# 初始化线程池
tList = []
for i in range(p_num):
tList.append(threading.Thread(target=singleThread, args=(dateToStr(d), res)))
d += datetime.timedelta(days=1)
if d > end_dt:
break
# 同时添加一个监视线程读取状态字典
tList.append(threading.Thread(target=statusWatcher, args=(res,)))
# 同时启动线程池中的线程
for t in tList:
t.start()
# 等待线程池中所有线程执行结束
for t in tList:
t.join()
由于更新状态字典时每个线程只会改变自己的key的value, 因此不需要设置线程锁, 当需要改变同一个值时就需要设置线程锁来防止错误输出了, 执行结果如下, 每5个线程统一同时启动, 每0.5秒动态更新当前状态字典的状态, 当所有线程执行结束再开启下5(n)个线程直至总线程数结束