Python实践 - 多线程(1)

rick    2018-07-14 21:28

Python实现多线程批量执行任务

当批量任务出现时, 多线程执行总是更好的选择, Python为我们提供了threading这个包方便我们创建和管理自己的线程

# 时间辅助函数
def dateToStr(dt):
    if type(dt) == datetime.datetime:
        return datetime.datetime.strftime(dt, '%Y-%m-%d')
    if type(dt) == str:
        return dt
    return None


def dateToDate(dt):
    if type(dt) == datetime.datetime:
        return dt
    if type(dt) == str:
        return datetime.datetime.strptime(dt, '%Y-%m-%d')


def dateIsMonthEnd(dt):
    if type(dt) == str:
        dt = dateToDate(dt)
    if (dt + datetime.timedelta(days=1)).month != dt.month:
        return False
    else:
        return True


def datetimeNow():
    return datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %X")


# 状态监视器
def statusWatcher(res):
    keyList = []
    for key in res:
        keyList.append(key)
    print("%s | " % datetimeNow() + "|".join(keyList))
    flag = 1
    while flag == 1:
        flag = 0
        valueList = []
        for key in res:
            valueList.append(res[key])
            # 当输出全部到结束标志位, 停止循环
            if res[key].strip() != "end":
                flag = 1
        # 每0.5秒动态刷新输出状态
        sys.stdout.write("\r%s | " % datetimeNow() + "|".join(valueList))
        sys.stdout.flush()
        time.sleep(0.5)
    print("\n")


# 子线程
def singleThread(dt, res):
    # 起始标志位
    res[dt] = "   go!    "
    time.sleep(1)
    # 随机的起始时间
    t = float(random.randrange(start=5, stop=10))
    while t < 10:
        # 更新状态字典
        res[dt] = "   %s    " % str(round(t, 2))
        # 没0.5秒更新
        time.sleep(0.5)
        t += 0.5
    res[dt] = "   end    "


if __name__ == '__main__':
    # 日期
    start_dt = dateToDate("2018-07-02")
    end_dt = dateToDate("2018-07-14")
    d = start_dt
    # 线程个数
    p_num = 5

    # 当起始日期小于结束日期时, 每次启动p_num个线程
    while d <= end_dt:
        # 初始化状态字典
        res = {}
        # 初始化线程池
        tList = []
        for i in range(p_num):
            tList.append(threading.Thread(target=singleThread, args=(dateToStr(d), res)))
            d += datetime.timedelta(days=1)
            if d > end_dt:
                break
        # 同时添加一个监视线程读取状态字典
        tList.append(threading.Thread(target=statusWatcher, args=(res,)))
        # 同时启动线程池中的线程
        for t in tList:
            t.start()
        # 等待线程池中所有线程执行结束
        for t in tList:
            t.join()

由于更新状态字典时每个线程只会改变自己的key的value, 因此不需要设置线程锁, 当需要改变同一个值时就需要设置线程锁来防止错误输出了, 执行结果如下, 每5个线程统一同时启动, 每0.5秒动态更新当前状态字典的状态, 当所有线程执行结束再开启下5(n)个线程直至总线程数结束

Views: 1.7K

[[total]] comments

Post your comment
  1. [[item.time]]
    [[item.user.username]] [[item.floor]]Floor
  2. Click to load more...
  3. Post your comment