16.2. 线程

线程线程(Thread,有时被称为轻量级进程)跟进程有些相似,不同的是所有线程运行在同一个进程中,共享运行环境。

线程有开始、顺序执行和结束3部分,有一个自己的指令指针,记录运行到什么地方。

线程的运行可能被抢占(中断)或暂时被挂起(睡眠),从而让其他线程运行,这叫作让步。

一个进程中的各个线程之间共享同一块数据空间,所以线程之间可以比进程之间更方便地共享数据和相互通信。

线程一般是并发执行的。正是由于这种并行和数据共享的机制,使得多个任务的合作变得可能。实际上,在单CPU系统中,真正的并发并不可能,每个线程会被安排成每次只运行一小会儿,然后就把CPU让出来,让其他线程运行。

在进程的整个运行过程中,每个线程都只做自己的事,需要时再跟其他线程共享运行结果。多个线程共同访问同一块数据不是完全没有危险的,由于访问数据的顺序不一样,因此有可能导致数据结果不一致的问题,这叫作竞态条件。大多数线程库都带有一系列同步原语,用于控制线程的执行和数据的访问

Python 的标准库提供了两个模块: _thread 和 threading , _thread 是低级模块, threading 是高级模
块,对 _thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。
启动一个线程就是把一个函数传入并创建 Thread 实例,然后调用 start() 开始执行:

16.2.1. 线程初探

#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:11
# filename: threading001.py
"""
threading.actice_count():返回当前处于活动状态的线程个数
threading.current_thread():返回当前的Thread对象
threading.main_thread():返回主线程对象,主线程是Python解释器启动的线程

"""
import threading

#当前线程对象
t = threading.current_thread()
#当前线程名
print(t.name)

# 返回当前处于活动状态的线程
print(threading.active_count())

# 主线程名
print(t.name)

输出内容:

MainThread
1
MainThread

16.2.2. 调用Thread类来创建多线程

代码示例1

#!/usr/bin/env python
# -*- coding:utf8 -*-
import threading
import time


# 新线程执行的代码

def loop():
    print("thread {} is running ....".format(threading.current_thread().name))
    n = 0
    while n < 5:
        n = n + 1
        print("thread {} >>> {}".format(threading.current_thread().name, n))
        time.sleep(1)
    print("thread {} ended".format(threading.current_thread().name))


print("thread {} is running ....".format(threading.current_thread().name))
t = threading.Thread(target=loop, name="Loopthread",)
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

代码示例2

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:18
# filename: Thread类实现多线程1.py
import threading
import time


# 线程体函数
def thread_bady():
    # 当前线程对象
    t = threading.current_thread()

    for n in range(5):
        # 当前线程名
        print("第{}次执行线程:{}".format(n, t.name))
        # 线程休眠,如果不休眠,线程对象t1结束后才会执行线程对象t2线程将
        time.sleep(1)
    print("线程:{}执行完成!".format(t.name))


# 主函数
def main():
    # 创建线程对象t1
    t1 = threading.Thread(target=thread_bady, name="hu_thread")
    # 启动线程t1
    t1.start()

    # 创建线程对象t2
    t2 = threading.Thread(target=thread_bady, name="xiaojian_thread")
    # 启动线程t2
    t2.start()


if __name__ == '__main__':
    main()

输出信息:

第0次执行线程:hu_thread
第0次执行线程:xiaojian_thread
第1次执行线程:hu_thread
第1次执行线程:xiaojian_thread
第2次执行线程:hu_thread
第2次执行线程:xiaojian_thread
第3次执行线程:hu_thread
第3次执行线程:xiaojian_thread
第4次执行线程:hu_thread
第4次执行线程:xiaojian_thread
线程:hu_thread执行完成!
线程:xiaojian_thread执行完成!
#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/17 17:14
# filename: 调用Thread类创建多线程.py
import threading

def action(max):
    for i in range(max):
        print(threading.current_thread().getName() + " " + str(i))

for i in range(100):
    print(threading.current_thread().getName() + " " + str(i))

    if i == 20:
        #创建并启动第一个线程
        t1 = threading.Thread(target=action, args=(10, ))
        t1.start()

        #创建并启动第二个线程
        t2 = threading.Thread(target=action, args=(10,))
        t2.start()
print("主线程执行完成!!")

代码示例3

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/7/30 22:08
# @filename: exp_thread_1.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import _thread
from time import sleep
from datetime import datetime

date_time_format = "%Y-%M-%d %H:%M:%S"


def date_time_str(date_time):
    """ 时间转为字符串"""
    return datetime.strftime(date_time, date_time_format)


def loop_one():
    print(f"-----线程1开始于:{date_time_str(datetime.now())}")
    print("------线程休眠4秒")
    sleep(4)
    print(f"-----线程1休眠结束,结束于:{date_time_str(datetime.now())}")


def loop_two():
    print(f"-----线程2开始于:{date_time_str(datetime.now())}")
    print("------线程休眠2秒")
    sleep(2)
    print(f"-----线程2休眠结束,结束于:{date_time_str(datetime.now())}")


def main():
    print(f"----所有线程开始时间:{date_time_str(datetime.now())}")
    _thread.start_new_thread(loop_one, ())
    _thread.start_new_thread(loop_two, ())
    sleep(6)
    print(f"-----所有线程结束时间:{date_time_str(datetime.now())}")


if __name__ == '__main__':
    main()

输出信息

----所有线程开始时间:2020-14-30 22:14:20
-----线程2开始于:2020-14-30 22:14:20
------线程休眠2秒
-----线程1开始于:2020-14-30 22:14:20
------线程休眠4秒
-----线程2休眠结束,结束于:2020-14-30 22:14:22
-----线程1休眠结束,结束于:2020-14-30 22:14:24
-----所有线程结束时间:2020-14-30 22:14:26

16.2.3. 继承Thread类创建多线程

代码示例1

import threading
import time
class MyThreading(threading.Thread):
    def __init__(self, conn):
        super(MyThreading, self).__init__()
        self.conn = conn
    def run(self):
        print('run task', self.conn)
        time.sleep(5)
t1 = MyThreading('t1')
t2 = MyThreading('t2')
t1.start()
t2.start()

代码示例2

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:27
# filename: 继承Thread类创建多线程.py
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, name=None):
        super(MyThread, self).__init__(name=name)

    # 线程体函数
    def run(self):
        # 当前线程对象
        t = threading.current_thread()
        for n in range(5):
            # 当前线程名
            print("第{}此执行线程:{}".format(n, t.name))
            # 线程休眠
            time.sleep(1)
        print("线程{}执行完毕!".format(t.name))


def main():
    # 创建线程对象t1
    t1 = MyThread(name="t1-thread")
    # 启动线程t1
    t1.start()

    # 创建线程对象t2
    t2 = MyThread(name="t2-thread")
    # 启动线程t2
    t2.start()


if __name__ == '__main__':
    main()

输出信息:

第0此执行线程:t1-thread
第0此执行线程:t2-thread
第1此执行线程:t2-thread
第1此执行线程:t1-thread
第2此执行线程:t1-thread
第2此执行线程:t2-thread
第3此执行线程:t2-thread
第3此执行线程:t1-thread
第4此执行线程:t1-thread
第4此执行线程:t2-thread
线程t2-thread执行完毕!
线程t1-thread执行完毕!
#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/17 17:23
# filename: 继承Thread类创建多线程.py
import threading
class FkThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.i = 0

    # 重写run()方法作为线程执行体
    def run(self):
        while self.i < 100:
            print(threading.current_thread().getName() + " " + str(self.i))
            self.i +=1


for i in range(100):
    print(threading.current_thread().getName() + " " + str(i))
    if i == 20:
        # 启动第一个线程
        ft1 = FkThread()
        ft1.start()

        # 启动第二个线程
        ft2 = FkThread()
        ft2.start()

16.2.4. 演示deamon属性的作用 后台线程

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 18:27
# filename: 04.deamon属性使用.py
import threading
import time


class myThread(threading.Thread):
    def __init__(self, mynum):
        super(myThread, self).__init__()
        self.mynum = mynum

    def run(self):
        time.sleep(1)
        for i in range(self.mynum, self.mynum + 5):
            print(str(i * i) + ";")


def main():
    """
    main()主函数运行结束时,ma和mb在后台运行,无法输出运行结果
    :return:
    """
    print("start............")
    ma = myThread(1)
    mb = myThread(16)
    ma.daemon = True
    mb.daemon = True
    ma.start()
    mb.start()
    print("end...........")


if __name__ == '__main__':
    main()

"""
start............
end...........
"""

16.2.5. 线程管理

等待线程结束

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:37
# filename: 等待线程结束.py
"""
join()方法,当前线程t1调用join()方法时,会阻塞当前线程,等到t1线程结束,如果t1线程结束
或者等待超时,则当前线程回到活动状态继续执行。
join(timeout=None)
参数timeout 设置超时时间,单位是秒。如果没有设置timeout时间,则可以一直等待

使用join()方法的场景是:一个线程依赖另一个线程的运行结果,所以调用另一个线程的join()方法等待它的运行完成
"""
import threading
import time

# 共享变量0
value = 0


# 线程体函数
def thread_body():
    global value
    # 当前线程对象
    print("ThreadA 开始.....")
    for n in range(2):
        print("ThreadA 执行.......")
        value += 1
        # 线程休眠
        time.sleep(1)
        print("ThreadA 结束.......")


def main():
    print("主线程 开始........")
    t1 = threading.Thread(target=thread_body, name="ThreadA")
    # 启动线程
    t1.start()
    # 主线程被阻塞,等待t1线程结束
    t1.join()
    print("value = {0}".format(value))
    print("主线程  结束.....")


if __name__ == '__main__':
    main()

输出信息:

主线程 开始........
ThreadA 开始.....
ThreadA 执行.......
ThreadA 结束.......
ThreadA 执行.......
ThreadA 结束.......
value = 2
主线程  结束.....

线程停止

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:51
# filename: 5.线程停止.py
"""
模拟一个下载程序,设置一个停止子进程的停止变量
"""
import threading
import time

# 线程停止变量
isrunning = True
count = 0


# 线程体函数
def thread_body():
    while isrunning:
        # 线程开始工作
        # TODO
        global count
        count += 1
        print("下载中:【{}】.......".format(count), file=open("download.log", "a",encoding="utf-8"))
        # 程序休眠
        time.sleep(0.5)
    print("执行完成!!,执行结果查看:'download.log'")


# 主函数
def main():
    # 创建线程对象t1
    t1 = threading.Thread(target=thread_body)
    # 启动线程t1
    t1.start()

    # 从键盘停止指令
    command = input("请输入停止指令:")
    if command == "exit":
        global isrunning
        isrunning = False


if __name__ == '__main__':
    main()

输出信息:

请输入停止指令:exit
执行完成!!,执行结果查看:'download.log'

16.2.6. 线程安全

代码示例1

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/7/30 22:08
# @filename: exp_thread_1.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import _thread
from time import sleep
from datetime import datetime

loops = [4, 2]
date_time_format = "%Y-%M-%d %H:%M:%S"


def date_time_str(date_time):
    """ 时间转为字符串"""
    return datetime.strftime(date_time, date_time_format)


def loop(nloop, n_sec, lock):
    print(f"-----线程({nloop})开始于:{date_time_str(datetime.now())},先休眠({n_sec})秒")
    sleep(n_sec)
    print(f"-----线程({nloop})休眠结束,结束于:{date_time_str(datetime.now())}")
    lock.release()


def main():
    print("-------所有线程开始执行---------")
    locks = []
    n_loops = range(len(loops))


    for i in n_loops:
        lock = _thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in n_loops:
        _thread.start_new_thread(loop, (i, loops[i], locks[i]))


    for i in n_loops:
        while locks[i].locked():
            pass

    print(f"----所有线程执行结束:{date_time_str(datetime.now())}")


if __name__ == '__main__':
    main()

代码示例2

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 15:12
# filename: 线程安全.py
import threading
import time


class TickDB:
    def __init__(self):
        # 机票的数量
        self.ticket_count = 5

    def get_ticket_count(self):
        '''
        :return: 获得当前机票的数量
        '''
        return self.ticket_count

    def sell_ticket(self, name):
        """
        :return: 销售机票
        """
        # TODO
        # 线程休眠,模拟等待用户付款
        time.sleep(1)
        self.ticket_count -= 1
        if self.ticket_count < 1:
            print("机票已经售完,请换乘其他航空..............")
        else:
            print("第{0}号票,已经售出,【购买者】乘客:{1},还剩下:{2}张票".format(self.ticket_count, name, self.ticket_count - 1))


# 创建TickDB对象
db = TickDB()


# 模拟选票线程体1
def thread_body1(name=None):
    # 声明为全局变量
    global db
    while True:
        curr_ticket_count = db.get_ticket_count()
        # 查询是否有票
        if curr_ticket_count > 0:
            db.sell_ticket(name)
        else:
            print("【{}】 您查询到的结果:无票".format(name))
            break


# 模拟选票线程体2
def thread_body2(name=None):
    # 声明为全局变量
    global db
    while True:
        curr_ticket_count = db.get_ticket_count()
        # 查询是否有票
        if curr_ticket_count > 0:
            db.sell_ticket(name)
        else:
            print("【{}】您查询到的结果:无票".format(name))
            break


def main():
    print("***************************************************************")
    print("*************** 欢迎来到XXX航空购票系统 ************************")
    print("***************************************************************")

    print("----------------------------------- t1开始购票--------------------------------------------------")
    # 创建线程对象t1
    t1 = threading.Thread(target=thread_body1, args=("t1",))
    # 启动线程t1
    t1.start()

    print("----------------------------------- t2开始购票--------------------------------------------------")
    # 创建线程对象t2
    t2 = threading.Thread(target=thread_body2, args=("t2",))
    # 启动线程t1
    t2.start()


if __name__ == '__main__':
    main()

输出信息:

***************************************************************
*************** 欢迎来到XXX航空购票系统 ************************
***************************************************************
----------------------------------- t1开始购票--------------------------------------------------
----------------------------------- t2开始购票--------------------------------------------------
第4号票,已经售出,【购买者】乘客:t1,还剩下:3张票
第3号票,已经售出,【购买者】乘客:t2,还剩下:2张票
第2号票,已经售出,【购买者】乘客:t1,还剩下:1张票
第1号票,已经售出,【购买者】乘客:t2,还剩下:0张票
机票已经售完,请换乘其他航空..............
【t2】您查询到的结果:无票
机票已经售完,请换乘其他航空..............
【t1】 您查询到的结果:无票

16.2.7. 什么是互斥锁

Lock

示例1

#!/usr/bin/env python
# -*- coding:utf8 -*-
# threading.Lock()
# 使用互斥锁可以防止多个线程同时读取内存的某一个区域,互斥锁保证了每个线程同一时间只有一个在使用内存资源

"""
从系统的角度来看。锁的作用其实是将多线程变回到单线程,这是以牺牲性能,来换取程序的准确性。

在代码设计中,应该最大化地避免使用锁。即使加了锁,也要让被保护的区域尽量地少,在满足准确性的同时实现性能最大化。
在代码中,有“加锁”操作,就一定要有与之对应的“解锁”操作,否则代码失去多线程的优势。

在Python中,使用threading.RLock类来创建锁。threading.RLock类有两个方法--acquire与release

* acquire负责开始对代码进行保护,在acquire之后的代码,都将只允许一个线程进行执行。

* release方法用于停止保护(即释放锁资源)。在release之后的代码又恢复到原来的样子,可以被多线程交叉执行。
"""

from threading import Thread, Lock
import time

'''
# 互斥锁的使用

#创建锁
mutex = threading.Lock()

#锁定
mutex.acquire([blocking])

#释放锁
mutex.release()

'''
# 计数器,总票数
num = 20


def task(arg):
    global num       # 使用全局变量
    mutex.acquire()  # 锁定线程,只有1个线程可以抢用
    time.sleep(0.5)
    num -= 1
    print("{}号用户【线程】,购买成功,剩余{}张电影票".format(arg, num))
    mutex.release()  # 释放,其他线程可以进行操作


if __name__ == '__main__':
    mutex = Lock()  # 创建锁
    t_l = []
    for i in range(10):
        t = Thread(target=task, args=(i,))
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

print("main thread end..!")

# 0号用户【线程】,购买成功,剩余19张电影票
# 1号用户【线程】,购买成功,剩余18张电影票
# 2号用户【线程】,购买成功,剩余17张电影票
# 3号用户【线程】,购买成功,剩余16张电影票
# 4号用户【线程】,购买成功,剩余15张电影票
# 5号用户【线程】,购买成功,剩余14张电影票
# 6号用户【线程】,购买成功,剩余13张电影票
# 7号用户【线程】,购买成功,剩余12张电影票
# 8号用户【线程】,购买成功,剩余11张电影票
# 9号用户【线程】,购买成功,剩余10张电影票
# main thread end..!

示例2

#!/usr/bin/env python
# -*- coding:utf8 -*-
import threading
import time


class myTread(threading.Thread):
    def run(self):
        global x
        lock.acquire()
        for i in range(3):
            x += 10
        time.sleep(1)
        print("{} result = {}".format(threading.Thread.getName(self), x))
        lock.release()


x = 0
lock = threading.RLock()


def main():
    thrs = []
    for item in range(5):
        thrs.append(myTread())

    for item in thrs:
        item.start()


if __name__ == '__main__':
    main()

"""
自定义一个带锁访问全局变量x的线程类myThread,在main()函数中初始化了5个线程来修改变量x,
但同一时刻只能由一个线程对x操作

Thread-1 result = 30
Thread-2 result = 60
Thread-3 result = 90
Thread-4 result = 120
Thread-5 result = 150
"""

代码示例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/23 13:44
# @filename: sample03_lock.py
# @Email:    1879324764@qq.com
# @Software: PyCharm

import threading
import time

data = 0
lock = threading.Lock()


def func():
    global data
    print("{} acquire lock ...".format(threading.currentThread().getName()))
    if lock.acquire():
        print("{} get the lock".format(threading.currentThread().getName()))
        data += 1
        time.sleep(2)
        print("{} release lock".format(threading.currentThread().getName()))
        lock.release()


t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()

互斥锁航空机票示例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 15:42
# filename: 7.多线程同步、互斥锁.py

import threading
import time


class TickDB:
    def __init__(self):
        # 机票的数量
        self.ticket_count = 5

    def get_ticket_count(self):
        '''
        :return: 获得当前机票的数量
        '''
        return self.ticket_count

    def sell_ticket(self, name):
        """
        :return: 销售机票
        """
        # TODO
        # 线程休眠,模拟等待用户付款
        time.sleep(1)
        self.ticket_count -= 1
        if self.ticket_count < 1:
            print("机票已经售完,请换乘其他航空..............")
        else:
            print("第{0}号票,已经售出,【购买者】乘客:{1},还剩下:{2}张票".format(self.ticket_count, name, self.ticket_count - 1))


# 创建TickDB对象
db = TickDB()
# 创建lock对象
lock = threading.Lock()


# 模拟选票线程体1
def thread_body1(name=None):
    # 声明为全局变量
    global db
    global lock
    while True:
        # 看这里!!开始锁定,加上小锁
        lock.acquire()
        curr_ticket_count = db.get_ticket_count()
        # 查询是否有票
        if curr_ticket_count > 0:
            db.sell_ticket(name)
        else:
            # 看这里,解锁,放开锁定
            lock.release()
            print("【{}】 您查询到的结果:无票".format(name))
            break
        # 解锁
        lock.release()
        time.sleep(1)

# 模拟选票线程体2
def thread_body2(name=None):
    # 声明为全局变量
    global db
    global lock
    while True:
        # 开始锁定,加上小锁
        lock.acquire()
        curr_ticket_count = db.get_ticket_count()
        # 查询是否有票
        if curr_ticket_count > 0:
            db.sell_ticket(name)
        else:
            # 看这里,解锁,放开锁定
            lock.release()
            print("【{}】您查询到的结果:无票".format(name))
            break
        #解锁
        lock.release()
        time.sleep(1)


def main():
    print("***************************************************************")
    print("*************** 欢迎来到XXX航空购票系统 ************************")
    print("***************************************************************")

    print("----------------------------------- t1开始购票--------------------------------------------------")
    # 创建线程对象t1
    t1 = threading.Thread(target=thread_body1, args=("t1",))
    # 启动线程t1
    t1.start()

    print("----------------------------------- t2开始购票--------------------------------------------------")
    # 创建线程对象t2
    t2 = threading.Thread(target=thread_body2, args=("t2",))
    # 启动线程t1
    t2.start()


if __name__ == '__main__':
    main()
***************************************************************
*************** 欢迎来到XXX航空购票系统 ************************
***************************************************************
----------------------------------- t1开始购票--------------------------------------------------
----------------------------------- t2开始购票--------------------------------------------------
第4号票,已经售出,【购买者】乘客:t1,还剩下:3张票
第3号票,已经售出,【购买者】乘客:t2,还剩下:2张票
第2号票,已经售出,【购买者】乘客:t1,还剩下:1张票
第1号票,已经售出,【购买者】乘客:t2,还剩下:0张票
机票已经售完,请换乘其他航空..............
【t2】您查询到的结果:无票
【t1】 您查询到的结果:无票
使用Thread对象的Lock和RLock可以实现简单的线程同步,
这两个对象都有acquire方法和release方法。
对于每次只允许一个线程操作的数据,可以将操作放到acquire和release方法之间。
多线程的优势在于可以同时运行多个任务,但当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况:一个列表里所有元素都是0,线程set从后向前把所有元素改成1,
而线程print负责从前往后读取列表并输出。
代码示例
#!/usr/bin/env python
#-*- coding:utf8 -*-
import threading
from time import sleep
from datetime import datetime

date_time_format = '%y-%M-%d %H:%M:%S'

class MyThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print("开启线程:" + self.name)
        #获取锁,用于线程同步
        threadLock.acquire()

        print_time(self.name, self.counter, 3)

        #释放锁,开启下一个线程
        threadLock.release()


def date_time_str(date_time):
    return datetime.strftime(date_time, date_time_format)

def print_time(threadName, delay, counter):
    while counter:
        sleep(delay)
        print("{} {}".format(threadName, date_time_str(datetime.now())))
        counter -=1


def main():
    #创建新线程
    thread1 = MyThread(1, "Thread-1", 1)
    thread2 = MyThread(2, "Thread-2", 3)

    #开启新线程
    thread1.start()
    thread2.start()

    # 添加线程到线程列表
    threads.append(thread1)
    threads.append(thread2)

    #等待所有线程完成
    for t in threads:
        t.join()
    print("退出主线程.......")

if __name__ == '__main__':
    threadLock = threading.Lock()
    threads = []
    main()

RLock

RLockRLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/23 13:44
# @filename: sample03_lock.py
# @Email:    1879324764@qq.com
# @Software: PyCharm

import threading
import time

lock = threading.RLock()


def func():
    # 第一次请求锁
    print("{} acquire lock ...".format(threading.currentThread().getName()))
    if lock.acquire():
        print("{} get the lock".format(threading.currentThread().getName()))
        time.sleep(2)

        #  第二次请求锁
        print("{} acquire lock agin...".format(threading.currentThread().getName()))
        if lock.acquire():
            print("{} get the lock".format(threading.currentThread().getName()))
            time.sleep(2)
        # 第一次释放锁
        print("{} release lock ....".format(threading.currentThread().getName()))
        lock.release()
        time.sleep(2)

        # 第二次释放锁
        print("{} release lock agin ....".format(threading.currentThread().getName()))
        lock.release()


t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()

16.2.8. 使用信号量同步多线程之间的执行顺序

1.纯粹的信号量(Semphore)

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 18:40
# filename: 05.使用信号量同步多线程之间的执行顺序.py
'''
信号量(semaphore)是一种带计数的线程同步机制,调用release函数时,计数器加1,调用acquire函数时,计数器减1.
当计数为0时,线程会自动阻塞,等待release被调用。

Python中存在两种信号量:
1.纯粹的信号量(Semphore)
2.带有边界的信号量(BoundedSemaphore)

1.纯粹的信号量(Semphore): 在调用release函数时,单纯地将计数器加1,不会检查加1后计算器是否超过上限
2.带有边界的信号量(BoundedSemaphore):在调用release函数时,会检查计数器是否超过上限,对计数器的上限进行校验,是一个更加安全的机制。

'''

import threading
import time
import random

semaphore = threading.Semaphore(0)  # 创建信号量


def consumer():
    """
    消费者
    :return:
    """
    print("consumer: 挂起...")
    semaphore.acquire()  # 计数器减1
    print("consumer:消费 {}".format(item))


def producer():
    global item  # 定义商品编号
    time.sleep(3)
    item = random.randint(1, 1000)              # 产生随机数并赋值给全局变量--商品编号
    print("producer :生产 {}".format(item))
    semaphore.release()  # 计数器加1


threads = []  # 定义列表收集线程
for i in range(0, 2):  # 循环完成生产者与消费者线程的建立
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    threads.append(t1)
    threads.append(t2)

    for t in threads:
        t.join()


'''
consumer: 挂起...
producer :生产 694
consumer:消费 694
consumer: 挂起...
producer :生产 939
consumer:消费 939
'''

2.带有边界的信号量(BoundedSemaphore)

将上述代码中的semaphore = threading.Semaphore(0)改为
semaphore = threading.BoundedSemaphore(2)  # 创建信号量为2.初始的时候item就有2个,消费者的可以消费的item有2个,
对item的取值进行判断,过滤掉初始值

需要将全局变量item放在外部。
    global item  # 定义商品编号
    item = random.randint(1, 1000)  # 产生随机数并赋值给全局变量--商品编号

16.2.9. 创建定时触发程序

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:12
# filename: 06.创建定时触发程序.py
import threading
import time


def timer1_headle():
    print("1  Timer headle!")  # 定时触发函数


def timer3_headle():
    print("3 Timer headle!")  # 定时触发函数


timer1 = threading.Timer(1, timer1_headle)  # 实例化定时器线程,1s后执行线程处理函数
timer3 = threading.Timer(3, timer3_headle)  # 实例化定时器线程,3s后执行线程处理函数
timer1.start()
timer3.start()

"""
1 Timer headle!
3 Timer headle!
"""

16.2.10. 循环定时触发程序

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:12
# filename: 07.循环定时触发程序.py
import threading
import time


def loop_timer_headle():
    '''
     定时循环触发函数
    :return:
    '''
    print("Timer headle!")
    global timer2
    timer2 = threading.Timer(1, loop_timer_headle)  # 创建定时器
    timer2.start()


timer2 = threading.Timer(1, loop_timer_headle)
timer2.start()

"""
Timer headle!
Timer headle!
Timer headle!
.......
"""

16.2.11. 设置定时间隔和结束定时器

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:12
# filename: 07.循环定时触发程序.py
import threading
import time

n = 0


def loop_timer_headle():
    '''
     定时循环触发函数
    :return:
    '''

    print("Timer headle!")
    time.sleep(2)
    global n
    n += 1

    global timer2
    timer2 = threading.Timer(1, loop_timer_headle)  # 创建定时器
    timer2.start()
    if n == 3:
        timer2.cancel()  # 结束定时器
        print("循环了3次了,要退出了...........")


timer2 = threading.Timer(1, loop_timer_headle)
timer2.start()

'''
Timer headle!
Timer headle!
Timer headle!
循环了3次了,要退出了...........
'''

16.2.12. 使用线程池提升运行效率

线程池实现主机端口扫描实例

本例使用的多线程模块需要用命令“pip3 install threadpool”进行安装。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/22 11:27
# @filename: sample01.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import threadpool
import os
import socket


def scan_port(num):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(1)
    ports = num * 4096
    thread_name = 'thread' + str(num)
    for port in range(ports, ports + 4096):
        result = s.connect_ex((ip, port))
        if result == 0:
            print("I am %s,port %d is openned!" % (thread_name, port))
    s.close()


ip = "127.0.0.1"
p = threadpool.ThreadPool(16)
num_list = list(range(16))
tasks = threadpool.makeRequests(scan_port, num_list)
for task in tasks:
    p.putRequest(task)
p.wait()
print("All subprocesses had finished!")
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:29
# filename: 08.使用线程池提升运行效率.py
"""

在需要频繁创建线程的系统中, 一般都会使用线程池技术。原因有两点:
    1.每一个线程的创建都是需要占用系统资源的, 是一件相对耗时的事情。同样在销毁线程时还需要回收线程资源。
    线程池技术, 可以省去创建与回收过程中所浪费的系统开销。

    2.在某些系统中需要为每个子任务来创建对应的线程(例如爬虫系统中的子链接)。
    这种情况会导致线程数量失控性暴涨, 直到程序崩溃。线程池技术可以很好地固定线程的数量保持程序稳定。


实现线程池
Python中,使用conncurrent.futures 模块下的ThreadPoolExecutor 类来实现线程池。在实例化时, 会将需要的线程个数传入。
系统就会为该线程池初始化相应个数的线程。线程池的使用有两种方式。

    * 抢占式: 线程池中的线程执行顺序不固定。该方式使用ThreadPooIExecutor 的submit方法实现。

    * 非抢占式: 线程将按照调用的顺序执行。此方式使用ThreadPoolExecutor 的map方法来实现。

从使用角度来看: 抢占式更灵活; 非抢占式更严格。


· 抢占式, 允许池中线程的处理函数不一样。如执行过程中某个线程出现异常, 也不影响其他线程。
· 非抢占式, 要求线程池中的线程必须执行同样的处理函数。而且一旦某个线程出现异常,其他线程也会停止。
"""
from concurrent.futures import ThreadPoolExecutor
import time


def printperson(p):
    '''
    定义线程池处理函数
    :param p:
    :return:
    '''
    print(p)
    time.sleep(2)


person = ["hujianli1", "hujianli2", "hujianli3"]

start_time = time.time()
for p in person:
    printperson(p)

end_time = time.time()
printperson("all spend time :{}".format(end_time - start_time))

"""
hujianli1
hujianli2
hujianli3
all spend time :6.00168251991272
"""

实现抢占线程池

start2 = time.time()
with ThreadPoolExecutor(3) as executor:
    for p in person:
        executor.submit(printperson, p)
end2 = time.time()
printperson("all spend time :{}".format(end2 - start2))

"""
hujianli1
hujianli2
hujianli3
all spend time :2.0018222332000732
"""

实现非抢占线程池

start3 = time.time()
with ThreadPoolExecutor(3) as executorl:
    executorl.map(printperson, person)
end3 = time.time()
printperson("all spend time :{}".format(end3 - start3))
"""
hujianli1
hujianli2
hujianli3
all spend time :2.001864433288574
"""

代码示例

from concurrent.futures import ThreadPoolExecutor
from threading import Thread, currentThread
from time import time


def task(i):
    print("{} 在执行任务{}".format(currentThread().name, i))
    time.sleep(1)


if __name__ == '__main__':
    pool = ThreadPoolExecutor(4)  # 进程池里有4个进程
    for i in range(20):  # 20个任务
        pool.submit(task, i)  # 进程池里当前执行的任务i,池子里的4个进程一次一次执行任务

抢占模式+回调函数

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/1/14 22:19
# filename: 09-1.线程池01.py
from concurrent.futures import ThreadPoolExecutor
import threading
import time

def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + " " + str(i))
        my_sum += 1
    return my_sum


# 创建一个包含两个线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池中提交一个任务,50作为action()函数的参数
    future1 = pool.submit(action, 50)
    # 向线程池中再提交一个任务,100作为action()函数的参数
    future2 = pool.submit(action, 100)

    def get_result(future):
        print(future.result())


    # 为future1添加线程完成的回调函数,该函数在线程任务结束时获取其返回值
    future1.add_done_callback(get_result)

    # 为future2添加线程完成的回调函数,该函数在线程任务结束时获取其返回值
    future2.add_done_callback(get_result)
    print("------------------------------------")

使用map()方法启动线程,并收集线程任务的返回值。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/1/14 22:19
# filename: 09-1.线程池02.py
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + " " + str(i))
        my_sum += 1
    return my_sum


# 创建一个包含4个线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
    # 使用线程执行map计算
    # 后面的元祖有3个元素,因此程序启动了3个线程来执行action函数
    results = pool.map(action, (50, 100, 150))
    print("---------------------------------------------")
    for i in results:
        print(i)

可供参考文献:

https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p08_perform_simple_parallel_programming.html

Python标准库 concurrent.futures — 启动并行任务

Python程序中的线程操作-concurrent模块

需要创建多少个线程才算合理

多线程的并发机制, 虽然可以提升程序效率, 但线程个数也不是越多越好。如要找到更优
的线程数量, 可以使用如下方法:
    (1)初始化一定数量的线程。
    (2)在多次实验中递增或递减线程数量, 测试运行性能。
    (3)确定最优的线程数量。

其中的第(1)步初始化线程的个数,可以先查看单个任务的CPU消耗,然后直接乘以百分比。而第(2)步,评估运行性能的方法,从外部观察每秒处理的任务数,算出批处理全部任务所用的时间。

16.2.13. 使用队列实现线程间通信

#!/usr/bin/env python
#-*- coding:utf8 -*-
'''
通常使用于生产者和消费者模式
'''
#导入队列模块
from queue import Queue
from threading import Thread
import time
import random

class Producer(Thread):
    def __init__(self, name, queue):
        Thread.__init__(self, name=name)
        self.data = queue

    def run(self):
        for i in range(5):
            print("生产者{} 将产品{}加入队列".format(self.getName(), i))
            self.data.put(i)
            time.sleep(random.random())

        print("生产者{}完成!".format(self.getName()))


class Consumer(Thread):
    def __init__(self, name, queue):
        Thread.__init__(self, name=name)
        self.data = queue

    def run(self):
        for i in range(5):
            val = self.data.get()
            print("消费者{} 将产品{}从队列中取出".format(self.getName(), val))
            time.sleep(random.random())

        print("消费者{}完成!".format(self.getName()))

if __name__ == '__main__':
    print("主线程开始".center(50, "-"))
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print("主线程结束".center(50, "-"))

输出信息

----------------------主线程开始-----------------------
生产者Producer 将产品0加入队列
消费者Consumer 将产品0从队列中取出
生产者Producer 将产品1加入队列
消费者Consumer 将产品1从队列中取出
生产者Producer 将产品2加入队列
消费者Consumer 将产品2从队列中取出
生产者Producer 将产品3加入队列
消费者Consumer 将产品3从队列中取出
生产者Producer 将产品4加入队列
生产者Producer完成!
消费者Consumer 将产品4从队列中取出
消费者Consumer完成!
----------------------主线程结束-----------------------
'''
队列在进程中的通信
'''
from multiprocessing import Process, Queue  # 导入进程和队列
import time


def write_task(q):
    if not q.full():
        for i in range(5):
            message = "消息" + str(i)
            q.put(message)
            print("写入:{}".format(message))


def read_task(q):
    time.sleep(1)
    while not q.empty():
        print("读取:{}".format(q.get(True, 2)))


if __name__ == '__main__':
    print("---主进程开始-----")
    q = Queue()
    pw = Process(target=write_task, args=(q,))
    pr = Process(target=read_task, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()
    print("---主进程结束----")

输出信息

---主进程开始-----
写入:消息0
写入:消息1
写入:消息2
写入:消息3
写入:消息4
读取:消息0
读取:消息1
读取:消息2
读取:消息3
读取:消息4
---主进程结束----
#!/usr/bin/env python
#-*- coding:utf8 -*-
import threading,time
import queue

q = queue.Queue(maxsize=5)       #设置maxsize=5,防止生产过快

def Producer(name):    #生产者
    count = 1
    while True:
        q.put("面包%s" % count)
        print("%s生产了面包%s"%(name,count))
        count +=1
        time.sleep(1)

def Consumer(name):         #消费者
    while True:
        print("[%s] 取到[%s] 并且吃了它..." %(name, q.get()))
        time.sleep(1)

#生成多个线程
p = threading.Thread(target=Producer,args=("derek",))
c = threading.Thread(target=Consumer,args=("chihuo1",))
c1 = threading.Thread(target=Consumer,args=("chihou2",))

p.start()
c.start()
c1.start()

输出信息

derek生产了面包1
[chihuo1] 取到[面包1] 并且吃了它...
derek生产了面包2
[chihuo1] 取到[面包2] 并且吃了它...
derek生产了面包3
[chihou2] 取到[面包3] 并且吃了它...
derek生产了面包4
[chihuo1] 取到[面包4] 并且吃了它...
derek生产了面包5
[chihou2] 取到[面包5] 并且吃了它...
derek生产了面包6
[chihuo1] 取到[面包6] 并且吃了它...
derek生产了面包7
[chihou2] 取到[面包7] 并且吃了它...

16.2.14. 使用Condition实现线程间通信

  • wait(timeout=None):使当前线程释放锁,然后当前线程处于阻塞状态,等待相同条件变量中其他线程唤醒或超时。

  • notify():唤醒相同条件变量中的一个线程;

  • notify_all():唤醒相同条件变量中的所有线程。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/25 9:58
# filename: 8.使用Condition实现线程间通信.py
import threading
import time
import random

# 创建条件变量对象
condition = threading.Condition()


class Stack:
    def __init__(self):
        # 堆栈指针初始值为0
        self.pointer = 0
        # 堆栈有5个数字的空间
        self.data = [-1, -1, -1, -1, -1]

    # 压栈方法
    def push(self, c):
        global condition
        condition.acquire()
        # 堆栈已满,不能压栈
        while self.pointer == len(self.data):
            # 等待其它线程把数据出栈
            condition.wait()
        # 通知其他线程把数据出栈
        condition.notify()
        # 数据压栈
        self.data[self.pointer] = c
        # 指针向上移动
        self.pointer += 1
        condition.release()

    # 出栈方法
    def pop(self):
        global condition
        condition.acquire()
        # 堆栈无数据,不能出栈
        while self.pointer == 0:
            # 等待其他线程把数据压栈
            condition.wait()
        # 通知其他线程压栈
        condition.notify()
        # 指针向下移动
        self.pointer -= 1
        data = self.data[self.pointer]
        condition.release()
        # 数据出栈
        return data


# 创建堆栈Stack对象
stack = Stack()

# 创建堆栈Stack对象

stack = Stack()


# 生产者线程体函数
def producer_thread_body():
    global stack  # 声明为全局变量
    # 产生10个数字
    for i in range(0, 10):
        # 把数字压栈
        stack.push(i)
        # 打印数字
        print('生产者:{0} 开始生产{1}'.format("hujianli", i))
        # 每产生一个数字线程就睡眠
        time.sleep(1)


# 消费者线程体函数
def consumer_thread_body():
    global stack  # 声明为全局变量
    # 从堆栈中读取数字
    for i in range(0, 10):
        # 从堆栈中读取数字
        x = stack.pop()
        # 打印数字
        print('消费者:{0} 开始消费{1}'.format("xiaojian", x))
        # 每消费一个数字线程就睡眠
        time.sleep(1)


# 主函数
def main():
    # 创建生产者线程对象producer
    producer = threading.Thread(target=producer_thread_body)
    # 启动生产者线程
    producer.start()
    # 创建消费者线程对象consumer
    consumer = threading.Thread(target=consumer_thread_body)
    # 启动消费者线程
    consumer.start()


if __name__ == '__main__':
    main()

输出信息

生产者:hujianli 开始生产0
消费者:xiaojian 开始消费0
生产者:hujianli 开始生产1
消费者:xiaojian 开始消费1
生产者:hujianli 开始生产2
消费者:xiaojian 开始消费2

16.2.15. 使用Event实现线程间通信

threading模块提供的Evernt可以实现线程间通信。 * wait(timeout=None)方法:阻塞当前线程,是线程进入等待状态。 * Event对象的set()方法,通知所有等待状态的线程恢复运行。

#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/25 11:50
# filename: 9.使用Event实现线程间通信.py

import threading
import time
import random

# 创建条件变量对象
event = threading.Event()


class Stack:
    def __init__(self):
        # 堆栈指针初始值为0
        self.pointer = 0
        # 堆栈有5个数字的空间
        self.data = [-1, -1, -1, -1, -1]

    # 压栈方法
    def push(self, c):
        global event

        # 堆栈已满,不能压栈
        while self.pointer == len(self.data):
            # 等待其它线程把数据出栈
            event.wait()
        # 通知其他线程把数据出栈
        event.set()
        # 数据压栈
        self.data[self.pointer] = c
        # 指针向上移动
        self.pointer += 1

    # 出栈方法
    def pop(self):
        global event

        # 堆栈无数据,不能出栈
        while self.pointer == 0:
            # 等待其他线程把数据压栈
            event.wait()
        # 通知其他线程压栈
        event.set()
        # 指针向下移动
        self.pointer -= 1

        # 数据出栈
        data = self.data[self.pointer]
        return data

# 创建堆栈Stack对象
stack = Stack()


# 生产者线程体函数
def producer_thread_body():
    global stack  # 声明为全局变量
    # 产生10个数字
    for i in range(0, 10):
        # 把数字压栈
        stack.push(i)
        # 打印数字
        print('生产者:{0} 开始生产{1}'.format("hujianli", i))
        # 每产生一个数字线程就睡眠
        time.sleep(1)


# 消费者线程体函数
def consumer_thread_body():
    global stack  # 声明为全局变量
    # 从堆栈中读取数字
    for i in range(0, 10):
        # 从堆栈中读取数字
        x = stack.pop()
        # 打印数字
        print('消费者:{0} 开始消费{1}'.format("xiaojian", x))
        # 每消费一个数字线程就睡眠
        time.sleep(1)


# 主函数
def main():
    # 创建生产者线程对象producer
    producer = threading.Thread(target=producer_thread_body)
    # 启动生产者线程
    producer.start()
    # 创建消费者线程对象consumer
    consumer = threading.Thread(target=consumer_thread_body)
    # 启动消费者线程
    consumer.start()


if __name__ == '__main__':
    main()

输出信息:

生产者:hujianli 开始生产0
消费者:xiaojian 开始消费0
生产者:hujianli 开始生产1
消费者:xiaojian 开始消费1
生产者:hujianli 开始生产2
消费者:xiaojian 开始消费2

模拟人物对话

#!/usr/bin/env python
#-*- coding:utf8 -*-
import threading
import time
class myThreada(threading.Thread):
    def run(self):
        evt.wait()
        time.sleep(1)
        print(self.name,":Good morning!")
        evt.clear()
        time.sleep(2)
        evt.set()
        time.sleep(2)
        evt.wait()
        print(self.name,":I'm fine,thank you.")

class myThreadb(threading.Thread):
    def run(self):
        print(self.name,":Good moring!")
        evt.set()
        time.sleep(2)
        evt.wait()
        print(self.name,": How are you?")
        evt.clear()
        time.sleep(2)
        evt.set()

evt = threading.Event()
def main():
    John = myThreada()
    John.name = "John"
    Smith = myThreadb()
    Smith.name = 'Smith'
    John.start()
    Smith.start()

if __name__ == '__main__':
    main()

输出信息

Smith :Good moring!
John :Good morning!
Smith : How are you?
John :I'm fine,thank you.