Contents
16.2. 线程¶
线程线程(Thread,有时被称为轻量级进程)跟进程有些相似,不同的是所有线程运行在同一个进程中,共享运行环境。
线程有开始、顺序执行和结束3部分,有一个自己的指令指针,记录运行到什么地方。
线程的运行可能被抢占(中断)或暂时被挂起(睡眠),从而让其他线程运行,这叫作让步。
一个进程中的各个线程之间共享同一块数据空间,所以线程之间可以比进程之间更方便地共享数据和相互通信。
线程一般是并发执行的。正是由于这种并行和数据共享的机制,使得多个任务的合作变得可能。实际上,在单CPU系统中,真正的并发并不可能,每个线程会被安排成每次只运行一小会儿,然后就把CPU让出来,让其他线程运行。
在进程的整个运行过程中,每个线程都只做自己的事,需要时再跟其他线程共享运行结果。多个线程共同访问同一块数据不是完全没有危险的,由于访问数据的顺序不一样,因此有可能导致数据结果不一致的问题,这叫作竞态条件。大多数线程库都带有一系列同步原语,用于控制线程的执行和数据的访问
Python 的标准库提供了两个模块: _thread 和 threading , _thread 是低级模块, threading 是高级模
块,对 _thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。
启动一个线程就是把一个函数传入并创建 Thread 实例,然后调用 start() 开始执行:
16.2.1. 线程初探¶
#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:11
# filename: threading001.py
"""
threading.actice_count():返回当前处于活动状态的线程个数
threading.current_thread():返回当前的Thread对象
threading.main_thread():返回主线程对象,主线程是Python解释器启动的线程
"""
import threading
#当前线程对象
t = threading.current_thread()
#当前线程名
print(t.name)
# 返回当前处于活动状态的线程
print(threading.active_count())
# 主线程名
print(t.name)
输出内容:
MainThread
1
MainThread
16.2.2. 调用Thread类来创建多线程¶
代码示例1¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
import threading
import time
# 新线程执行的代码
def loop():
print("thread {} is running ....".format(threading.current_thread().name))
n = 0
while n < 5:
n = n + 1
print("thread {} >>> {}".format(threading.current_thread().name, n))
time.sleep(1)
print("thread {} ended".format(threading.current_thread().name))
print("thread {} is running ....".format(threading.current_thread().name))
t = threading.Thread(target=loop, name="Loopthread",)
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
代码示例2¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:18
# filename: Thread类实现多线程1.py
import threading
import time
# 线程体函数
def thread_bady():
# 当前线程对象
t = threading.current_thread()
for n in range(5):
# 当前线程名
print("第{}次执行线程:{}".format(n, t.name))
# 线程休眠,如果不休眠,线程对象t1结束后才会执行线程对象t2线程将
time.sleep(1)
print("线程:{}执行完成!".format(t.name))
# 主函数
def main():
# 创建线程对象t1
t1 = threading.Thread(target=thread_bady, name="hu_thread")
# 启动线程t1
t1.start()
# 创建线程对象t2
t2 = threading.Thread(target=thread_bady, name="xiaojian_thread")
# 启动线程t2
t2.start()
if __name__ == '__main__':
main()
输出信息:
第0次执行线程:hu_thread
第0次执行线程:xiaojian_thread
第1次执行线程:hu_thread
第1次执行线程:xiaojian_thread
第2次执行线程:hu_thread
第2次执行线程:xiaojian_thread
第3次执行线程:hu_thread
第3次执行线程:xiaojian_thread
第4次执行线程:hu_thread
第4次执行线程:xiaojian_thread
线程:hu_thread执行完成!
线程:xiaojian_thread执行完成!
#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/17 17:14
# filename: 调用Thread类创建多线程.py
import threading
def action(max):
for i in range(max):
print(threading.current_thread().getName() + " " + str(i))
for i in range(100):
print(threading.current_thread().getName() + " " + str(i))
if i == 20:
#创建并启动第一个线程
t1 = threading.Thread(target=action, args=(10, ))
t1.start()
#创建并启动第二个线程
t2 = threading.Thread(target=action, args=(10,))
t2.start()
print("主线程执行完成!!")
代码示例3
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/7/30 22:08
# @filename: exp_thread_1.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import _thread
from time import sleep
from datetime import datetime
date_time_format = "%Y-%M-%d %H:%M:%S"
def date_time_str(date_time):
""" 时间转为字符串"""
return datetime.strftime(date_time, date_time_format)
def loop_one():
print(f"-----线程1开始于:{date_time_str(datetime.now())}")
print("------线程休眠4秒")
sleep(4)
print(f"-----线程1休眠结束,结束于:{date_time_str(datetime.now())}")
def loop_two():
print(f"-----线程2开始于:{date_time_str(datetime.now())}")
print("------线程休眠2秒")
sleep(2)
print(f"-----线程2休眠结束,结束于:{date_time_str(datetime.now())}")
def main():
print(f"----所有线程开始时间:{date_time_str(datetime.now())}")
_thread.start_new_thread(loop_one, ())
_thread.start_new_thread(loop_two, ())
sleep(6)
print(f"-----所有线程结束时间:{date_time_str(datetime.now())}")
if __name__ == '__main__':
main()
输出信息
----所有线程开始时间:2020-14-30 22:14:20
-----线程2开始于:2020-14-30 22:14:20
------线程休眠2秒
-----线程1开始于:2020-14-30 22:14:20
------线程休眠4秒
-----线程2休眠结束,结束于:2020-14-30 22:14:22
-----线程1休眠结束,结束于:2020-14-30 22:14:24
-----所有线程结束时间:2020-14-30 22:14:26
16.2.3. 继承Thread类创建多线程¶
代码示例1
import threading
import time
class MyThreading(threading.Thread):
def __init__(self, conn):
super(MyThreading, self).__init__()
self.conn = conn
def run(self):
print('run task', self.conn)
time.sleep(5)
t1 = MyThreading('t1')
t2 = MyThreading('t2')
t1.start()
t2.start()
代码示例2
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:27
# filename: 继承Thread类创建多线程.py
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name=None):
super(MyThread, self).__init__(name=name)
# 线程体函数
def run(self):
# 当前线程对象
t = threading.current_thread()
for n in range(5):
# 当前线程名
print("第{}此执行线程:{}".format(n, t.name))
# 线程休眠
time.sleep(1)
print("线程{}执行完毕!".format(t.name))
def main():
# 创建线程对象t1
t1 = MyThread(name="t1-thread")
# 启动线程t1
t1.start()
# 创建线程对象t2
t2 = MyThread(name="t2-thread")
# 启动线程t2
t2.start()
if __name__ == '__main__':
main()
输出信息:
第0此执行线程:t1-thread
第0此执行线程:t2-thread
第1此执行线程:t2-thread
第1此执行线程:t1-thread
第2此执行线程:t1-thread
第2此执行线程:t2-thread
第3此执行线程:t2-thread
第3此执行线程:t1-thread
第4此执行线程:t1-thread
第4此执行线程:t2-thread
线程t2-thread执行完毕!
线程t1-thread执行完毕!
#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/17 17:23
# filename: 继承Thread类创建多线程.py
import threading
class FkThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.i = 0
# 重写run()方法作为线程执行体
def run(self):
while self.i < 100:
print(threading.current_thread().getName() + " " + str(self.i))
self.i +=1
for i in range(100):
print(threading.current_thread().getName() + " " + str(i))
if i == 20:
# 启动第一个线程
ft1 = FkThread()
ft1.start()
# 启动第二个线程
ft2 = FkThread()
ft2.start()
16.2.4. 演示deamon属性的作用 后台线程¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 18:27
# filename: 04.deamon属性使用.py
import threading
import time
class myThread(threading.Thread):
def __init__(self, mynum):
super(myThread, self).__init__()
self.mynum = mynum
def run(self):
time.sleep(1)
for i in range(self.mynum, self.mynum + 5):
print(str(i * i) + ";")
def main():
"""
main()主函数运行结束时,ma和mb在后台运行,无法输出运行结果
:return:
"""
print("start............")
ma = myThread(1)
mb = myThread(16)
ma.daemon = True
mb.daemon = True
ma.start()
mb.start()
print("end...........")
if __name__ == '__main__':
main()
"""
start............
end...........
"""
16.2.5. 线程管理¶
等待线程结束¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:37
# filename: 等待线程结束.py
"""
join()方法,当前线程t1调用join()方法时,会阻塞当前线程,等到t1线程结束,如果t1线程结束
或者等待超时,则当前线程回到活动状态继续执行。
join(timeout=None)
参数timeout 设置超时时间,单位是秒。如果没有设置timeout时间,则可以一直等待
使用join()方法的场景是:一个线程依赖另一个线程的运行结果,所以调用另一个线程的join()方法等待它的运行完成
"""
import threading
import time
# 共享变量0
value = 0
# 线程体函数
def thread_body():
global value
# 当前线程对象
print("ThreadA 开始.....")
for n in range(2):
print("ThreadA 执行.......")
value += 1
# 线程休眠
time.sleep(1)
print("ThreadA 结束.......")
def main():
print("主线程 开始........")
t1 = threading.Thread(target=thread_body, name="ThreadA")
# 启动线程
t1.start()
# 主线程被阻塞,等待t1线程结束
t1.join()
print("value = {0}".format(value))
print("主线程 结束.....")
if __name__ == '__main__':
main()
输出信息:
主线程 开始........
ThreadA 开始.....
ThreadA 执行.......
ThreadA 结束.......
ThreadA 执行.......
ThreadA 结束.......
value = 2
主线程 结束.....
线程停止¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 14:51
# filename: 5.线程停止.py
"""
模拟一个下载程序,设置一个停止子进程的停止变量
"""
import threading
import time
# 线程停止变量
isrunning = True
count = 0
# 线程体函数
def thread_body():
while isrunning:
# 线程开始工作
# TODO
global count
count += 1
print("下载中:【{}】.......".format(count), file=open("download.log", "a",encoding="utf-8"))
# 程序休眠
time.sleep(0.5)
print("执行完成!!,执行结果查看:'download.log'")
# 主函数
def main():
# 创建线程对象t1
t1 = threading.Thread(target=thread_body)
# 启动线程t1
t1.start()
# 从键盘停止指令
command = input("请输入停止指令:")
if command == "exit":
global isrunning
isrunning = False
if __name__ == '__main__':
main()
输出信息:
请输入停止指令:exit
执行完成!!,执行结果查看:'download.log'
16.2.6. 线程安全¶
代码示例1
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/7/30 22:08
# @filename: exp_thread_1.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import _thread
from time import sleep
from datetime import datetime
loops = [4, 2]
date_time_format = "%Y-%M-%d %H:%M:%S"
def date_time_str(date_time):
""" 时间转为字符串"""
return datetime.strftime(date_time, date_time_format)
def loop(nloop, n_sec, lock):
print(f"-----线程({nloop})开始于:{date_time_str(datetime.now())},先休眠({n_sec})秒")
sleep(n_sec)
print(f"-----线程({nloop})休眠结束,结束于:{date_time_str(datetime.now())}")
lock.release()
def main():
print("-------所有线程开始执行---------")
locks = []
n_loops = range(len(loops))
for i in n_loops:
lock = _thread.allocate_lock()
lock.acquire()
locks.append(lock)
for i in n_loops:
_thread.start_new_thread(loop, (i, loops[i], locks[i]))
for i in n_loops:
while locks[i].locked():
pass
print(f"----所有线程执行结束:{date_time_str(datetime.now())}")
if __name__ == '__main__':
main()
代码示例2
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 15:12
# filename: 线程安全.py
import threading
import time
class TickDB:
def __init__(self):
# 机票的数量
self.ticket_count = 5
def get_ticket_count(self):
'''
:return: 获得当前机票的数量
'''
return self.ticket_count
def sell_ticket(self, name):
"""
:return: 销售机票
"""
# TODO
# 线程休眠,模拟等待用户付款
time.sleep(1)
self.ticket_count -= 1
if self.ticket_count < 1:
print("机票已经售完,请换乘其他航空..............")
else:
print("第{0}号票,已经售出,【购买者】乘客:{1},还剩下:{2}张票".format(self.ticket_count, name, self.ticket_count - 1))
# 创建TickDB对象
db = TickDB()
# 模拟选票线程体1
def thread_body1(name=None):
# 声明为全局变量
global db
while True:
curr_ticket_count = db.get_ticket_count()
# 查询是否有票
if curr_ticket_count > 0:
db.sell_ticket(name)
else:
print("【{}】 您查询到的结果:无票".format(name))
break
# 模拟选票线程体2
def thread_body2(name=None):
# 声明为全局变量
global db
while True:
curr_ticket_count = db.get_ticket_count()
# 查询是否有票
if curr_ticket_count > 0:
db.sell_ticket(name)
else:
print("【{}】您查询到的结果:无票".format(name))
break
def main():
print("***************************************************************")
print("*************** 欢迎来到XXX航空购票系统 ************************")
print("***************************************************************")
print("----------------------------------- t1开始购票--------------------------------------------------")
# 创建线程对象t1
t1 = threading.Thread(target=thread_body1, args=("t1",))
# 启动线程t1
t1.start()
print("----------------------------------- t2开始购票--------------------------------------------------")
# 创建线程对象t2
t2 = threading.Thread(target=thread_body2, args=("t2",))
# 启动线程t1
t2.start()
if __name__ == '__main__':
main()
输出信息:
***************************************************************
*************** 欢迎来到XXX航空购票系统 ************************
***************************************************************
----------------------------------- t1开始购票--------------------------------------------------
----------------------------------- t2开始购票--------------------------------------------------
第4号票,已经售出,【购买者】乘客:t1,还剩下:3张票
第3号票,已经售出,【购买者】乘客:t2,还剩下:2张票
第2号票,已经售出,【购买者】乘客:t1,还剩下:1张票
第1号票,已经售出,【购买者】乘客:t2,还剩下:0张票
机票已经售完,请换乘其他航空..............
【t2】您查询到的结果:无票
机票已经售完,请换乘其他航空..............
【t1】 您查询到的结果:无票
16.2.7. 什么是互斥锁¶
Lock¶
示例1
#!/usr/bin/env python
# -*- coding:utf8 -*-
# threading.Lock()
# 使用互斥锁可以防止多个线程同时读取内存的某一个区域,互斥锁保证了每个线程同一时间只有一个在使用内存资源
"""
从系统的角度来看。锁的作用其实是将多线程变回到单线程,这是以牺牲性能,来换取程序的准确性。
在代码设计中,应该最大化地避免使用锁。即使加了锁,也要让被保护的区域尽量地少,在满足准确性的同时实现性能最大化。
在代码中,有“加锁”操作,就一定要有与之对应的“解锁”操作,否则代码失去多线程的优势。
在Python中,使用threading.RLock类来创建锁。threading.RLock类有两个方法--acquire与release
* acquire负责开始对代码进行保护,在acquire之后的代码,都将只允许一个线程进行执行。
* release方法用于停止保护(即释放锁资源)。在release之后的代码又恢复到原来的样子,可以被多线程交叉执行。
"""
from threading import Thread, Lock
import time
'''
# 互斥锁的使用
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([blocking])
#释放锁
mutex.release()
'''
# 计数器,总票数
num = 20
def task(arg):
global num # 使用全局变量
mutex.acquire() # 锁定线程,只有1个线程可以抢用
time.sleep(0.5)
num -= 1
print("{}号用户【线程】,购买成功,剩余{}张电影票".format(arg, num))
mutex.release() # 释放,其他线程可以进行操作
if __name__ == '__main__':
mutex = Lock() # 创建锁
t_l = []
for i in range(10):
t = Thread(target=task, args=(i,))
t_l.append(t)
t.start()
for t in t_l:
t.join()
print("main thread end..!")
# 0号用户【线程】,购买成功,剩余19张电影票
# 1号用户【线程】,购买成功,剩余18张电影票
# 2号用户【线程】,购买成功,剩余17张电影票
# 3号用户【线程】,购买成功,剩余16张电影票
# 4号用户【线程】,购买成功,剩余15张电影票
# 5号用户【线程】,购买成功,剩余14张电影票
# 6号用户【线程】,购买成功,剩余13张电影票
# 7号用户【线程】,购买成功,剩余12张电影票
# 8号用户【线程】,购买成功,剩余11张电影票
# 9号用户【线程】,购买成功,剩余10张电影票
# main thread end..!
示例2
#!/usr/bin/env python
# -*- coding:utf8 -*-
import threading
import time
class myTread(threading.Thread):
def run(self):
global x
lock.acquire()
for i in range(3):
x += 10
time.sleep(1)
print("{} result = {}".format(threading.Thread.getName(self), x))
lock.release()
x = 0
lock = threading.RLock()
def main():
thrs = []
for item in range(5):
thrs.append(myTread())
for item in thrs:
item.start()
if __name__ == '__main__':
main()
"""
自定义一个带锁访问全局变量x的线程类myThread,在main()函数中初始化了5个线程来修改变量x,
但同一时刻只能由一个线程对x操作
Thread-1 result = 30
Thread-2 result = 60
Thread-3 result = 90
Thread-4 result = 120
Thread-5 result = 150
"""
代码示例
#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther: 18793
# @Date: 2020/6/23 13:44
# @filename: sample03_lock.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import threading
import time
data = 0
lock = threading.Lock()
def func():
global data
print("{} acquire lock ...".format(threading.currentThread().getName()))
if lock.acquire():
print("{} get the lock".format(threading.currentThread().getName()))
data += 1
time.sleep(2)
print("{} release lock".format(threading.currentThread().getName()))
lock.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
互斥锁航空机票示例¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/23 15:42
# filename: 7.多线程同步、互斥锁.py
import threading
import time
class TickDB:
def __init__(self):
# 机票的数量
self.ticket_count = 5
def get_ticket_count(self):
'''
:return: 获得当前机票的数量
'''
return self.ticket_count
def sell_ticket(self, name):
"""
:return: 销售机票
"""
# TODO
# 线程休眠,模拟等待用户付款
time.sleep(1)
self.ticket_count -= 1
if self.ticket_count < 1:
print("机票已经售完,请换乘其他航空..............")
else:
print("第{0}号票,已经售出,【购买者】乘客:{1},还剩下:{2}张票".format(self.ticket_count, name, self.ticket_count - 1))
# 创建TickDB对象
db = TickDB()
# 创建lock对象
lock = threading.Lock()
# 模拟选票线程体1
def thread_body1(name=None):
# 声明为全局变量
global db
global lock
while True:
# 看这里!!开始锁定,加上小锁
lock.acquire()
curr_ticket_count = db.get_ticket_count()
# 查询是否有票
if curr_ticket_count > 0:
db.sell_ticket(name)
else:
# 看这里,解锁,放开锁定
lock.release()
print("【{}】 您查询到的结果:无票".format(name))
break
# 解锁
lock.release()
time.sleep(1)
# 模拟选票线程体2
def thread_body2(name=None):
# 声明为全局变量
global db
global lock
while True:
# 开始锁定,加上小锁
lock.acquire()
curr_ticket_count = db.get_ticket_count()
# 查询是否有票
if curr_ticket_count > 0:
db.sell_ticket(name)
else:
# 看这里,解锁,放开锁定
lock.release()
print("【{}】您查询到的结果:无票".format(name))
break
#解锁
lock.release()
time.sleep(1)
def main():
print("***************************************************************")
print("*************** 欢迎来到XXX航空购票系统 ************************")
print("***************************************************************")
print("----------------------------------- t1开始购票--------------------------------------------------")
# 创建线程对象t1
t1 = threading.Thread(target=thread_body1, args=("t1",))
# 启动线程t1
t1.start()
print("----------------------------------- t2开始购票--------------------------------------------------")
# 创建线程对象t2
t2 = threading.Thread(target=thread_body2, args=("t2",))
# 启动线程t1
t2.start()
if __name__ == '__main__':
main()
***************************************************************
*************** 欢迎来到XXX航空购票系统 ************************
***************************************************************
----------------------------------- t1开始购票--------------------------------------------------
----------------------------------- t2开始购票--------------------------------------------------
第4号票,已经售出,【购买者】乘客:t1,还剩下:3张票
第3号票,已经售出,【购买者】乘客:t2,还剩下:2张票
第2号票,已经售出,【购买者】乘客:t1,还剩下:1张票
第1号票,已经售出,【购买者】乘客:t2,还剩下:0张票
机票已经售完,请换乘其他航空..............
【t2】您查询到的结果:无票
【t1】 您查询到的结果:无票
使用Thread对象的Lock和RLock可以实现简单的线程同步,
这两个对象都有acquire方法和release方法。
对于每次只允许一个线程操作的数据,可以将操作放到acquire和release方法之间。
多线程的优势在于可以同时运行多个任务,但当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况:一个列表里所有元素都是0,线程set从后向前把所有元素改成1,
而线程print负责从前往后读取列表并输出。
代码示例¶
#!/usr/bin/env python
#-*- coding:utf8 -*-
import threading
from time import sleep
from datetime import datetime
date_time_format = '%y-%M-%d %H:%M:%S'
class MyThread(threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print("开启线程:" + self.name)
#获取锁,用于线程同步
threadLock.acquire()
print_time(self.name, self.counter, 3)
#释放锁,开启下一个线程
threadLock.release()
def date_time_str(date_time):
return datetime.strftime(date_time, date_time_format)
def print_time(threadName, delay, counter):
while counter:
sleep(delay)
print("{} {}".format(threadName, date_time_str(datetime.now())))
counter -=1
def main():
#创建新线程
thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 3)
#开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
#等待所有线程完成
for t in threads:
t.join()
print("退出主线程.......")
if __name__ == '__main__':
threadLock = threading.Lock()
threads = []
main()
RLock¶
RLockRLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther: 18793
# @Date: 2020/6/23 13:44
# @filename: sample03_lock.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import threading
import time
lock = threading.RLock()
def func():
# 第一次请求锁
print("{} acquire lock ...".format(threading.currentThread().getName()))
if lock.acquire():
print("{} get the lock".format(threading.currentThread().getName()))
time.sleep(2)
# 第二次请求锁
print("{} acquire lock agin...".format(threading.currentThread().getName()))
if lock.acquire():
print("{} get the lock".format(threading.currentThread().getName()))
time.sleep(2)
# 第一次释放锁
print("{} release lock ....".format(threading.currentThread().getName()))
lock.release()
time.sleep(2)
# 第二次释放锁
print("{} release lock agin ....".format(threading.currentThread().getName()))
lock.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
16.2.8. 使用信号量同步多线程之间的执行顺序¶
1.纯粹的信号量(Semphore)
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 18:40
# filename: 05.使用信号量同步多线程之间的执行顺序.py
'''
信号量(semaphore)是一种带计数的线程同步机制,调用release函数时,计数器加1,调用acquire函数时,计数器减1.
当计数为0时,线程会自动阻塞,等待release被调用。
Python中存在两种信号量:
1.纯粹的信号量(Semphore)
2.带有边界的信号量(BoundedSemaphore)
1.纯粹的信号量(Semphore): 在调用release函数时,单纯地将计数器加1,不会检查加1后计算器是否超过上限
2.带有边界的信号量(BoundedSemaphore):在调用release函数时,会检查计数器是否超过上限,对计数器的上限进行校验,是一个更加安全的机制。
'''
import threading
import time
import random
semaphore = threading.Semaphore(0) # 创建信号量
def consumer():
"""
消费者
:return:
"""
print("consumer: 挂起...")
semaphore.acquire() # 计数器减1
print("consumer:消费 {}".format(item))
def producer():
global item # 定义商品编号
time.sleep(3)
item = random.randint(1, 1000) # 产生随机数并赋值给全局变量--商品编号
print("producer :生产 {}".format(item))
semaphore.release() # 计数器加1
threads = [] # 定义列表收集线程
for i in range(0, 2): # 循环完成生产者与消费者线程的建立
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
threads.append(t1)
threads.append(t2)
for t in threads:
t.join()
'''
consumer: 挂起...
producer :生产 694
consumer:消费 694
consumer: 挂起...
producer :生产 939
consumer:消费 939
'''
2.带有边界的信号量(BoundedSemaphore)
将上述代码中的semaphore = threading.Semaphore(0)改为
semaphore = threading.BoundedSemaphore(2) # 创建信号量为2.初始的时候item就有2个,消费者的可以消费的item有2个,
对item的取值进行判断,过滤掉初始值
需要将全局变量item放在外部。
global item # 定义商品编号
item = random.randint(1, 1000) # 产生随机数并赋值给全局变量--商品编号
16.2.9. 创建定时触发程序¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:12
# filename: 06.创建定时触发程序.py
import threading
import time
def timer1_headle():
print("1 Timer headle!") # 定时触发函数
def timer3_headle():
print("3 Timer headle!") # 定时触发函数
timer1 = threading.Timer(1, timer1_headle) # 实例化定时器线程,1s后执行线程处理函数
timer3 = threading.Timer(3, timer3_headle) # 实例化定时器线程,3s后执行线程处理函数
timer1.start()
timer3.start()
"""
1 Timer headle!
3 Timer headle!
"""
16.2.10. 循环定时触发程序¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:12
# filename: 07.循环定时触发程序.py
import threading
import time
def loop_timer_headle():
'''
定时循环触发函数
:return:
'''
print("Timer headle!")
global timer2
timer2 = threading.Timer(1, loop_timer_headle) # 创建定时器
timer2.start()
timer2 = threading.Timer(1, loop_timer_headle)
timer2.start()
"""
Timer headle!
Timer headle!
Timer headle!
.......
"""
16.2.11. 设置定时间隔和结束定时器¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:12
# filename: 07.循环定时触发程序.py
import threading
import time
n = 0
def loop_timer_headle():
'''
定时循环触发函数
:return:
'''
print("Timer headle!")
time.sleep(2)
global n
n += 1
global timer2
timer2 = threading.Timer(1, loop_timer_headle) # 创建定时器
timer2.start()
if n == 3:
timer2.cancel() # 结束定时器
print("循环了3次了,要退出了...........")
timer2 = threading.Timer(1, loop_timer_headle)
timer2.start()
'''
Timer headle!
Timer headle!
Timer headle!
循环了3次了,要退出了...........
'''
16.2.12. 使用线程池提升运行效率¶
线程池实现主机端口扫描实例¶
本例使用的多线程模块需要用命令“pip3 install threadpool”进行安装。
#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther: 18793
# @Date: 2020/6/22 11:27
# @filename: sample01.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import threadpool
import os
import socket
def scan_port(num):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
ports = num * 4096
thread_name = 'thread' + str(num)
for port in range(ports, ports + 4096):
result = s.connect_ex((ip, port))
if result == 0:
print("I am %s,port %d is openned!" % (thread_name, port))
s.close()
ip = "127.0.0.1"
p = threadpool.ThreadPool(16)
num_list = list(range(16))
tasks = threadpool.makeRequests(scan_port, num_list)
for task in tasks:
p.putRequest(task)
p.wait()
print("All subprocesses had finished!")
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 19:29
# filename: 08.使用线程池提升运行效率.py
"""
在需要频繁创建线程的系统中, 一般都会使用线程池技术。原因有两点:
1.每一个线程的创建都是需要占用系统资源的, 是一件相对耗时的事情。同样在销毁线程时还需要回收线程资源。
线程池技术, 可以省去创建与回收过程中所浪费的系统开销。
2.在某些系统中需要为每个子任务来创建对应的线程(例如爬虫系统中的子链接)。
这种情况会导致线程数量失控性暴涨, 直到程序崩溃。线程池技术可以很好地固定线程的数量保持程序稳定。
实现线程池
Python中,使用conncurrent.futures 模块下的ThreadPoolExecutor 类来实现线程池。在实例化时, 会将需要的线程个数传入。
系统就会为该线程池初始化相应个数的线程。线程池的使用有两种方式。
* 抢占式: 线程池中的线程执行顺序不固定。该方式使用ThreadPooIExecutor 的submit方法实现。
* 非抢占式: 线程将按照调用的顺序执行。此方式使用ThreadPoolExecutor 的map方法来实现。
从使用角度来看: 抢占式更灵活; 非抢占式更严格。
· 抢占式, 允许池中线程的处理函数不一样。如执行过程中某个线程出现异常, 也不影响其他线程。
· 非抢占式, 要求线程池中的线程必须执行同样的处理函数。而且一旦某个线程出现异常,其他线程也会停止。
"""
from concurrent.futures import ThreadPoolExecutor
import time
def printperson(p):
'''
定义线程池处理函数
:param p:
:return:
'''
print(p)
time.sleep(2)
person = ["hujianli1", "hujianli2", "hujianli3"]
start_time = time.time()
for p in person:
printperson(p)
end_time = time.time()
printperson("all spend time :{}".format(end_time - start_time))
"""
hujianli1
hujianli2
hujianli3
all spend time :6.00168251991272
"""
实现抢占线程池
start2 = time.time()
with ThreadPoolExecutor(3) as executor:
for p in person:
executor.submit(printperson, p)
end2 = time.time()
printperson("all spend time :{}".format(end2 - start2))
"""
hujianli1
hujianli2
hujianli3
all spend time :2.0018222332000732
"""
实现非抢占线程池
start3 = time.time()
with ThreadPoolExecutor(3) as executorl:
executorl.map(printperson, person)
end3 = time.time()
printperson("all spend time :{}".format(end3 - start3))
"""
hujianli1
hujianli2
hujianli3
all spend time :2.001864433288574
"""
代码示例
from concurrent.futures import ThreadPoolExecutor
from threading import Thread, currentThread
from time import time
def task(i):
print("{} 在执行任务{}".format(currentThread().name, i))
time.sleep(1)
if __name__ == '__main__':
pool = ThreadPoolExecutor(4) # 进程池里有4个进程
for i in range(20): # 20个任务
pool.submit(task, i) # 进程池里当前执行的任务i,池子里的4个进程一次一次执行任务
抢占模式+回调函数
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/1/14 22:19
# filename: 09-1.线程池01.py
from concurrent.futures import ThreadPoolExecutor
import threading
import time
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + " " + str(i))
my_sum += 1
return my_sum
# 创建一个包含两个线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池中提交一个任务,50作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池中再提交一个任务,100作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数,该函数在线程任务结束时获取其返回值
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数,该函数在线程任务结束时获取其返回值
future2.add_done_callback(get_result)
print("------------------------------------")
使用map()方法启动线程,并收集线程任务的返回值。
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/1/14 22:19
# filename: 09-1.线程池02.py
from concurrent.futures import ThreadPoolExecutor
import threading
import time
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + " " + str(i))
my_sum += 1
return my_sum
# 创建一个包含4个线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面的元祖有3个元素,因此程序启动了3个线程来执行action函数
results = pool.map(action, (50, 100, 150))
print("---------------------------------------------")
for i in results:
print(i)
可供参考文献:
Python标准库 concurrent.futures — 启动并行任务
需要创建多少个线程才算合理
多线程的并发机制, 虽然可以提升程序效率, 但线程个数也不是越多越好。如要找到更优
的线程数量, 可以使用如下方法:
(1)初始化一定数量的线程。
(2)在多次实验中递增或递减线程数量, 测试运行性能。
(3)确定最优的线程数量。
其中的第(1)步初始化线程的个数,可以先查看单个任务的CPU消耗,然后直接乘以百分比。而第(2)步,评估运行性能的方法,从外部观察每秒处理的任务数,算出批处理全部任务所用的时间。
16.2.13. 使用队列实现线程间通信¶
#!/usr/bin/env python
#-*- coding:utf8 -*-
'''
通常使用于生产者和消费者模式
'''
#导入队列模块
from queue import Queue
from threading import Thread
import time
import random
class Producer(Thread):
def __init__(self, name, queue):
Thread.__init__(self, name=name)
self.data = queue
def run(self):
for i in range(5):
print("生产者{} 将产品{}加入队列".format(self.getName(), i))
self.data.put(i)
time.sleep(random.random())
print("生产者{}完成!".format(self.getName()))
class Consumer(Thread):
def __init__(self, name, queue):
Thread.__init__(self, name=name)
self.data = queue
def run(self):
for i in range(5):
val = self.data.get()
print("消费者{} 将产品{}从队列中取出".format(self.getName(), val))
time.sleep(random.random())
print("消费者{}完成!".format(self.getName()))
if __name__ == '__main__':
print("主线程开始".center(50, "-"))
queue = Queue()
producer = Producer('Producer', queue)
consumer = Consumer('Consumer', queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print("主线程结束".center(50, "-"))
输出信息
----------------------主线程开始-----------------------
生产者Producer 将产品0加入队列
消费者Consumer 将产品0从队列中取出
生产者Producer 将产品1加入队列
消费者Consumer 将产品1从队列中取出
生产者Producer 将产品2加入队列
消费者Consumer 将产品2从队列中取出
生产者Producer 将产品3加入队列
消费者Consumer 将产品3从队列中取出
生产者Producer 将产品4加入队列
生产者Producer完成!
消费者Consumer 将产品4从队列中取出
消费者Consumer完成!
----------------------主线程结束-----------------------
'''
队列在进程中的通信
'''
from multiprocessing import Process, Queue # 导入进程和队列
import time
def write_task(q):
if not q.full():
for i in range(5):
message = "消息" + str(i)
q.put(message)
print("写入:{}".format(message))
def read_task(q):
time.sleep(1)
while not q.empty():
print("读取:{}".format(q.get(True, 2)))
if __name__ == '__main__':
print("---主进程开始-----")
q = Queue()
pw = Process(target=write_task, args=(q,))
pr = Process(target=read_task, args=(q,))
pw.start()
pr.start()
pw.join()
pr.join()
print("---主进程结束----")
输出信息
---主进程开始-----
写入:消息0
写入:消息1
写入:消息2
写入:消息3
写入:消息4
读取:消息0
读取:消息1
读取:消息2
读取:消息3
读取:消息4
---主进程结束----
#!/usr/bin/env python
#-*- coding:utf8 -*-
import threading,time
import queue
q = queue.Queue(maxsize=5) #设置maxsize=5,防止生产过快
def Producer(name): #生产者
count = 1
while True:
q.put("面包%s" % count)
print("%s生产了面包%s"%(name,count))
count +=1
time.sleep(1)
def Consumer(name): #消费者
while True:
print("[%s] 取到[%s] 并且吃了它..." %(name, q.get()))
time.sleep(1)
#生成多个线程
p = threading.Thread(target=Producer,args=("derek",))
c = threading.Thread(target=Consumer,args=("chihuo1",))
c1 = threading.Thread(target=Consumer,args=("chihou2",))
p.start()
c.start()
c1.start()
输出信息
derek生产了面包1
[chihuo1] 取到[面包1] 并且吃了它...
derek生产了面包2
[chihuo1] 取到[面包2] 并且吃了它...
derek生产了面包3
[chihou2] 取到[面包3] 并且吃了它...
derek生产了面包4
[chihuo1] 取到[面包4] 并且吃了它...
derek生产了面包5
[chihou2] 取到[面包5] 并且吃了它...
derek生产了面包6
[chihuo1] 取到[面包6] 并且吃了它...
derek生产了面包7
[chihou2] 取到[面包7] 并且吃了它...
16.2.14. 使用Condition实现线程间通信¶
wait(timeout=None):使当前线程释放锁,然后当前线程处于阻塞状态,等待相同条件变量中其他线程唤醒或超时。
notify():唤醒相同条件变量中的一个线程;
notify_all():唤醒相同条件变量中的所有线程。
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/25 9:58
# filename: 8.使用Condition实现线程间通信.py
import threading
import time
import random
# 创建条件变量对象
condition = threading.Condition()
class Stack:
def __init__(self):
# 堆栈指针初始值为0
self.pointer = 0
# 堆栈有5个数字的空间
self.data = [-1, -1, -1, -1, -1]
# 压栈方法
def push(self, c):
global condition
condition.acquire()
# 堆栈已满,不能压栈
while self.pointer == len(self.data):
# 等待其它线程把数据出栈
condition.wait()
# 通知其他线程把数据出栈
condition.notify()
# 数据压栈
self.data[self.pointer] = c
# 指针向上移动
self.pointer += 1
condition.release()
# 出栈方法
def pop(self):
global condition
condition.acquire()
# 堆栈无数据,不能出栈
while self.pointer == 0:
# 等待其他线程把数据压栈
condition.wait()
# 通知其他线程压栈
condition.notify()
# 指针向下移动
self.pointer -= 1
data = self.data[self.pointer]
condition.release()
# 数据出栈
return data
# 创建堆栈Stack对象
stack = Stack()
# 创建堆栈Stack对象
stack = Stack()
# 生产者线程体函数
def producer_thread_body():
global stack # 声明为全局变量
# 产生10个数字
for i in range(0, 10):
# 把数字压栈
stack.push(i)
# 打印数字
print('生产者:{0} 开始生产{1}'.format("hujianli", i))
# 每产生一个数字线程就睡眠
time.sleep(1)
# 消费者线程体函数
def consumer_thread_body():
global stack # 声明为全局变量
# 从堆栈中读取数字
for i in range(0, 10):
# 从堆栈中读取数字
x = stack.pop()
# 打印数字
print('消费者:{0} 开始消费{1}'.format("xiaojian", x))
# 每消费一个数字线程就睡眠
time.sleep(1)
# 主函数
def main():
# 创建生产者线程对象producer
producer = threading.Thread(target=producer_thread_body)
# 启动生产者线程
producer.start()
# 创建消费者线程对象consumer
consumer = threading.Thread(target=consumer_thread_body)
# 启动消费者线程
consumer.start()
if __name__ == '__main__':
main()
输出信息
生产者:hujianli 开始生产0
消费者:xiaojian 开始消费0
生产者:hujianli 开始生产1
消费者:xiaojian 开始消费1
生产者:hujianli 开始生产2
消费者:xiaojian 开始消费2
16.2.15. 使用Event实现线程间通信¶
threading模块提供的Evernt可以实现线程间通信。 * wait(timeout=None)方法:阻塞当前线程,是线程进入等待状态。 * Event对象的set()方法,通知所有等待状态的线程恢复运行。
#!/usr/bin/env python
#-*- coding:utf8 -*-
# auther; 18793
# Date:2019/6/25 11:50
# filename: 9.使用Event实现线程间通信.py
import threading
import time
import random
# 创建条件变量对象
event = threading.Event()
class Stack:
def __init__(self):
# 堆栈指针初始值为0
self.pointer = 0
# 堆栈有5个数字的空间
self.data = [-1, -1, -1, -1, -1]
# 压栈方法
def push(self, c):
global event
# 堆栈已满,不能压栈
while self.pointer == len(self.data):
# 等待其它线程把数据出栈
event.wait()
# 通知其他线程把数据出栈
event.set()
# 数据压栈
self.data[self.pointer] = c
# 指针向上移动
self.pointer += 1
# 出栈方法
def pop(self):
global event
# 堆栈无数据,不能出栈
while self.pointer == 0:
# 等待其他线程把数据压栈
event.wait()
# 通知其他线程压栈
event.set()
# 指针向下移动
self.pointer -= 1
# 数据出栈
data = self.data[self.pointer]
return data
# 创建堆栈Stack对象
stack = Stack()
# 生产者线程体函数
def producer_thread_body():
global stack # 声明为全局变量
# 产生10个数字
for i in range(0, 10):
# 把数字压栈
stack.push(i)
# 打印数字
print('生产者:{0} 开始生产{1}'.format("hujianli", i))
# 每产生一个数字线程就睡眠
time.sleep(1)
# 消费者线程体函数
def consumer_thread_body():
global stack # 声明为全局变量
# 从堆栈中读取数字
for i in range(0, 10):
# 从堆栈中读取数字
x = stack.pop()
# 打印数字
print('消费者:{0} 开始消费{1}'.format("xiaojian", x))
# 每消费一个数字线程就睡眠
time.sleep(1)
# 主函数
def main():
# 创建生产者线程对象producer
producer = threading.Thread(target=producer_thread_body)
# 启动生产者线程
producer.start()
# 创建消费者线程对象consumer
consumer = threading.Thread(target=consumer_thread_body)
# 启动消费者线程
consumer.start()
if __name__ == '__main__':
main()
输出信息:
生产者:hujianli 开始生产0
消费者:xiaojian 开始消费0
生产者:hujianli 开始生产1
消费者:xiaojian 开始消费1
生产者:hujianli 开始生产2
消费者:xiaojian 开始消费2
模拟人物对话¶
#!/usr/bin/env python
#-*- coding:utf8 -*-
import threading
import time
class myThreada(threading.Thread):
def run(self):
evt.wait()
time.sleep(1)
print(self.name,":Good morning!")
evt.clear()
time.sleep(2)
evt.set()
time.sleep(2)
evt.wait()
print(self.name,":I'm fine,thank you.")
class myThreadb(threading.Thread):
def run(self):
print(self.name,":Good moring!")
evt.set()
time.sleep(2)
evt.wait()
print(self.name,": How are you?")
evt.clear()
time.sleep(2)
evt.set()
evt = threading.Event()
def main():
John = myThreada()
John.name = "John"
Smith = myThreadb()
Smith.name = 'Smith'
John.start()
Smith.start()
if __name__ == '__main__':
main()
输出信息
Smith :Good moring!
John :Good morning!
Smith : How are you?
John :I'm fine,thank you.