线程
操作系统进行资源调度的最小单位。一个进程拥有至少一个线程。线程共享进程的内存空间(区别于进程,进程的空间彼此相互独立)。
线程创建方式
- 为Thread实例指定target参数
# coding: utf-8
import threading
import random
import time
def work():
for i in range(3):
# 输出(当前线程名称,i)
print(threading.current_thread().getName(), i)
# 让线程睡眠0.1秒,并让操作系统重新调度
time.sleep(0.1)
if __name__ == "__main__":
# 创建线程: target指定线程执行的方法
t1 = threading.Thread(target=work, name="T1")
t2 = threading.Thread(target=work, name="T2")
# 线程进入就绪状态,等待操作系统进行调度
t1.start()
t2.start()
# 输出结果
# T1 0
# T2 0
# T2 1
# T1 1
# T2 2
# T1 2
- 继承Thread类
# coding: utf-8
import threading
import random
import time
class T(threading.Thread):
# 重写父类__init__方法:可以在__init__方法中指定target,name,args,kwargs参数
# 指定了这几个参数后可以使用T(target=fuc)的方式创建线程
def __init__(self, thread_name):
threading.Thread.__init__(self)
self.name = thread_name
# 重写run方法
# 当实例化T后,没有为T制定target参数时,调用示例的start方法会执行run方法
def run(self):
for i in range(3):
print(self.name, i)
time.sleep(0.1)
if __name__ == "__main__":
# 实例化两个线程对象,并制定线程名称
t1 = T("T1")
t2 = T("T2")
# 线程进入就绪状态,等待操作系统调度
t1.start()
t2.start()
# 输出结果
# T1 0
# T2 0
# T2 1
# T1 1
# T2 2
# T1 2
线程安全
CPU调度线程的方式是: 轮流执行,每次执行的时间称为一个时间片。当程序在时间片内执行完毕或者时间片到期则换出当前线程,重新选择一个新的线程执行。由于线程共享进程空间,当一个进程拥有多个线程时,共享变量如果会因为线程的无序而出现覆盖的情况则称为是非线程安全,否则称为是线程安全的。在高并发的场景下非线程安全会引起业务上的问题。下面以一个并发购买商品的场景为例说明这个问题。
# coding: utf-8
import threading
import time
import random
def create_order():
"""
模拟下单
"""
global order
# 模拟下单的耗时
time.sleep(random.random())
order = order + 1
def acc():
"""
模拟商品扣减
思路:
1. 先判断商品库存是否足够
2. 创建订单
3. 构建商品库存
"""
global num
# 执行代码前,先睡眠, 让CPU调度
time.sleep(random.random())
# 防止超卖: 在没有锁的情况下, 会不起作用
if num > 0:
create_order()
num = num - 1
def buy():
"""
模拟连接请求:每个连接发送10次请求
"""
for i in range(10):
acc()
if __name__ == "__main__":
# 模拟5个并发连接,每个连接请求10次的场景
# 商品库存
num = 1
# 订单数量
order = 0
# 模拟5个并发连接
for i in range(5):
t = threading.Thread(target=buy)
t.start()
# 主线程等待所有的子线程结束后,打印num和order变量值
while True:
if threading.active_count() == 1:
print(num, order)
break
else:
pass
# 多次测试输出的结果
# -3 4
# -4 5
# 0 1
从输出结果可以看出,虽然加了限制超卖的代码,但是结果依然会出现超卖的情况。
- 普通锁(threading.Lock)
if num > 0:
create_order()
num = num - 1
# 以上代码片段称为临界资源,由多条语句组成,每个线程在执行到某一行代码时,可能会被换出CPU,这样就会发生多个线程同时在访问临界资源的情况。如果不加以限制,会出现变量值被覆盖的问题。如果在线程在访问临界资源时,先竞争一个锁,只有获取到锁的线程才能访问临界资源,访问完临界资源后再释放掉锁。这样也就可以保证同时只有一个线程访问临界资源。示例代码如下:
def acc(lock):
...
lock.acquire()
if num > 0:
create_order()
num = num - 1
lock.release()
def buy(lock):
"""
模拟连接请求:每个连接发送10次请求
"""
for i in range(10):
acc(lock)
if __name__ == "__main__":
...
lock = threading.Lock()
# 模拟5个并发连接
for i in range(5):
t = threading.Thread(target=buy, args=(lock,))
- 递归锁(threading.RLock)
普通锁只能申请一个资源对象,当线程想申请多个资源时需要使用递归锁。递归锁允许一个线程可以申请多个资源。每调用acquire一次相当于对一个资源进行上锁操作。下面以吃水果为例说明RLock的用法:有一副刀和叉。吃水果必须同时拿到刀和叉。
# coding: utf-8
import threading
from threading import Thread,RLock
import time
import random
def eat_fruit(lock):
for i in range(5):
# 申请"刀"资源
lock.acquire()
print(threading.current_thread().getName(),"拿刀")
# 如果采用Lock的话,上面申请"刀"后,再次调用会阻塞当前线程,导致死锁问题
lock.acquire()
print(threading.current_thread().getName(),"拿叉")
lock.release()
print(threading.current_thread().getName(),"释放叉")
lock.release()
print(threading.current_thread().getName(),"释放刀")
if __name__ == "__main__":
lock = RLock()
p1 = Thread(target=eat_fruit, name="T1", args=(lock, ))
p1.start()
while True:
if threading.active_count() == 1:
print("程序结束")
break
else:
pass
线程同步
- 条件对象(threading.Condition)
使用流程:
1. 调用Condition对象acquire方法获取锁
2. 如果满足一定条件执行逻辑,然后使用notify通知其它线程,否则使用wait将当前线程阻塞等待其它线程通知。
# coding:utf-8
import threading
import random
def prod(cv):
"""
生产者线程
"""
global msg
global count
while count > 0:
# 获取锁
if cv.acquire():
# 如果队列中消息还没有消费,阻塞当前线程,等待消费者通知
if msg:
cv.wait()
else:
# 模拟向队列中批量写入消息
for i in range(3):
s = random.randint(1,10)
print("(",count,")发送消息:", s)
msg.append(s)
# 消息写入完毕后通知消费者线程
cv.notify()
cv.release()
def consumer(cv):
"""
消费者线程
"""
global msg
global count
while count > 0:
# 获取锁
if cv.acquire():
# 当队列中存在消息时,进行消费
if msg:
while msg:
print("(",count,")接收到消息:", msg.pop())
count = count - 1
# 消费完通知生产者线程继续生产消息
cv.notify()
else:
# 如果队列中没有消息,阻塞当前线程等待生产者产生消息
cv.wait()
cv.release()
if __name__ == "__main__":
# 模拟生产者向队列中写消息,消费者从队列中进行消费的场景
# 生产者产生消息的次数
count = 2
# 保存消息的队列
msg = []
# 条件对象,用于线程同步
cv = threading.Condition()
p = threading.Thread(target=prod, args=(cv,))
c = threading.Thread(target=consumer, args=(cv,))
p.start()
c.start()
# 输出结果
# ( 2 )发送消息: 2
# ( 2 )发送消息: 6
# ( 2 )发送消息: 4
# ( 2 )接收到消息: 4
# ( 2 )接收到消息: 6
# ( 2 )接收到消息: 2
# ( 1 )发送消息: 6
# ( 1 )发送消息: 1
# ( 1 )发送消息: 8
# ( 1 )接收到消息: 8
# ( 1 )接收到消息: 1
# ( 1 )接收到消息: 6
- 事件对象(threading.Event)
一个线程发出事件信号,而其他线程等待该信号。
使用流程:
1. 创建threading.Event对象
2. 如果满足一定条件执行逻辑,然后使用set发送事件信号,此时其它线程阻塞中的线程将会被唤醒,否则使用wait将当前线程阻塞。
# coding:utf-8
import threading
import time
class Teacher(threading.Thread):
def __init__(self, eve):
super(Teacher, self).__init__()
self.e = eve
def run(self):
# 老师需要首先提问,所以线程被调度进CPU后直接执行
print("Teacher:1 + 1?")
# 提出问题后向Stu线程发送事件信号
self.e.set()
# 发送完信号后,阻塞线程等待学生回答
self.e.wait()
# 学生回答完问题后被唤醒,再次提问题
print("Teacher:1 + 2?")
# 发送完信号后结束程序
self.e.set()
class Stu(threading.Thread):
def __init__(self, eve):
super(Stu, self).__init__()
self.e = eve
def run(self):
# 如果Stu线程先被执行时,由于老师还没有提问,所以先将线程阻塞,等待信号唤醒
self.e.wait()
# 回答老师提出的问题
print("Stu:2")
# 向老师发送一回答问题的事件信号
self.e.set()
# 阻塞线程,等待老师再次提问
self.e.wait()
# 最后一次回答问题,回答完结束程序
print("Stu: 3")
if __name__ == "__main__":
# 模拟老师提问,学生回答问题的场景
# 由于set方法是唤醒所有其他睡眠的线程,所以Event对象适合处理两个线程协同的业务
# Teacher和Stu线程运行时,保证Teacher先执行,Stu收到Teacher的事件信号后再执行。
event = threading.Event()
t = Teacher(event)
s = Stu(event)
t.start()
s.start()
# 输出结果
# Teacher:1 + 1?
# Teacher:1 + 2?
# Stu:2
# Stu: 3