一个线程启动后,其会自行运行。但如果希望它们能同步运行,应该怎么做呢?
举个简单的例子,有两个线程 A 和 B,A 负责从网络上读取数据,保持到变量 X 中,B 负责处理变量X中的数据,这时线程 B 就需要和 A 同步。也就是说 B 需要等 A 给其一个信号,其才可以开始去做自己的事情。同样,B 完成了任务后也需要通知 A,告诉 A 变量 X 中间的数据已经处理完了,可以将新的数据放入 X 了。
图 1 表示了这个过程:
实现线程同步的方式有很多种,下面分别进行介绍。
1、thread.Lock线程锁
借助锁,可以得到一些排他的资源。例如,为某个资源 A 加上锁 L,如果要使用该资源,则必须得到锁 L,这个锁可以保证在任意时候,只有一个线程可以得到它。其他线程如果想得到已经被别的线程得到的锁,只能等待锁的拥有者主动释放锁。
这个锁类提供了 acquire() 和 release() 两个接口函数。release() 表示某线程已经完成了任务,其他线程可以开始自己的工作了;acquire() 表示某线程计划做某个工作,请在可以开始时通知它。
所以做某件工作可以用下面三步构成:
- acquire():等条件成熟了告诉某线程。
- do_the work():条件成熟了,开始工作。
- release():工作完成,告诉其他线程可以开始工作了。
现在回到前面的例子上,我们需要两个锁。一个用来写变量 X,用 write_lock 来表示;一个用来读变量 X,用 read_lock 来表示。写变量 X 的过程如下:
write_lock.acquire()
X=var
read_lock.release()
读变量 X 的过程如下:
read_lock.acquire()
var=X
write_lock.release()
下面是完整的代码。
import sys, time # 引入时间库
if sys.version_info.major == 2: # Python 2
import thread
else: # Python 3
import _thread as thread # 创建两个锁,一个用来读,一个用来写
read_lock = thread.allocate_lock()
write_lock = thread.allocate_lock()
X = 0 # 变量X,用来保存两个线程之间交换的数据
def write_thread_entry(): # 写线程的入口函数
global X, read_lock, write_lock
for i in range(2, 10, 1):
write_lock.acquire()
X = i
read_lock.release()
def read_thread_entry(): # 读线程的入口函数
global X, read_lock, write_lock
while True:
read_lock.acquire()
print("Processing X = %d" % X)
write_lock.release()
def start_threads(): # 启动线程
read_lock.acquire() # read_lock处于被占用状态
t1 = thread.start_new_thread(write_thread_entry, tuple())
t2 = thread.start_new_thread(read_thread_entry, tuple())
time.sleep(5)
if __name__=='__main__': # 如果是运行该脚本而不是import该文件
start_threads()
运行结果如下:
$ python lockDemo1.py # 运行脚本
Processing X = 2 # 程序第20行的输出
Processing X = 3 # 当前X的值
Processing X = 4
Processing X = 5
Processing X = 6
Processing X = 7
Processing X = 8
Processing X = 9
2、threading.Lock线程锁
threading 包也包含一个类 Lock,其提供的函数也是 acquire() 和 release(),这两个函数和 thread.Lock 类的函数一样。但是其并没有提供 allocate_lock() 接口函数来创建 Lock,需要我们用构造函数自己创建。
下面的代码使用几乎一样的方法实现了前面的例子:
import sys, time
import threading # 引入线程库
read_lock = threading.Lock()
write_lock = threading.Lock()
X = 0 # 变量X,读写线程交互信息的载体
def write_thread_entry():
global X, read_lock, write_lock
for i in range(2, 10, 1):
write_lock.acquire()
X = i
read_lock.release()
def read_thread_entry():
global X, read_lock, write_lock
while True:
read_lock.acquire()
print("Processing X = %d" % X)
write_lock.release()
def start_threads():
read_lock.acquire() # read_lock处于被占用状态
t1 = threading.Thread(target=write_thread_entry)
t1.setDaemon(True)
t1.start()
t2 = threading.Thread(target=read_thread_entry)
t2.setDaemon(True)
t2.start()
time.sleep(5)
if __name__=='__main__':
start_threads()
运行结果如下:
$ python lockDemo1.py
Processing X = 2 # 程序第16行的输出
Processing X = 3 # 显示当前X的值
Processing X = 4
Processing X = 5
Processing X = 6
Processing X = 7
Processing X = 8
Processing X = 9
3、threading.RLock可重入锁
前面介绍的 threading.Lock 有一个问题,就是对某个人 threading.Lock 对象在同一个线程内重复调用两次 acquire() 会发生锁死现象。
下面的代码演示了这个情况:
import sys, time
import threading
lock_obj1 = threading.Lock() # 创建锁对象
def thread_entry(): # 子线程入口函数
global lock_obj1 # 使用全局变量lock_obj1
print("Child Thread: thread_entry() Is Running")
lock_obj1.acquire() # 第一次调用acquire(),成功
print("Child Thread: acquire(1) Finished")
lock_obj1.acquire() # 第二次调用acquire(),阻塞
# 我们看不到下面的两句输出
print("Child Thread: acquire(2) Finished")
print("Child Thread: Quit")
def start_threads(): # 主线程
global lock_obj1
t1 = threading.Thread(target=thread_entry)
t1.setDaemon(True)
t1.start()
time.sleep(5)
print("Main Thread: Quit")
if __name__=='__main__': # 作为脚本执行
start_threads()
运行结果如下:
$ python lockDemo3.py
Child Thread: thread_entry() Is Running
Child Thread: acquire(1) Finished
Main Thread: Quit
可以看到子线程在第二次调用 acquire() 时卡住了。threading.RLock() 类就是为了解决这个问题的,其用法和 Threading.Lock() 基本相同,区别是如果某个线程调用了自己已经调用过的 RLock,不会被阻塞。
下面将前面代码的第三行:
lock_obj1 = threading.Lock()
修改为
lock_obj1 = threading.RLock()
其他都保持不变,再次运行后结果如下:
$ python lockDemo4.py
Child Thread: thread_entry() Is Running
Child Thread: acquire(1) Finished
Child Thread: acquire(2) Finished
Child Thread: Quit
Main Thread: Quit
这里还有一个特别的地方,就是 release() 被调用的次数要求和 acquire() 被调用的次数相同,否则其他的非所有者线程还是会被卡在 acquire() 上。
某个线程成功执行 acquire() 后,该线程就是该 threading.RLock 对象的所有者。threading.RLock 对象内部有一个成员变量 _RLock__count,其类型为整数。如 _RLock__count=0,表示其没有被任意线程所有,因而任意线程都可以成功执行 acquire() 来获得该 threading.RLock对象;如果 _RLock__count>0,则只有上次成功执行 acquire()的所有者线程才可以成功执行 acquire(),其他线程都会被阻塞。成功执行 acquire() 后,_RLock__count 的值会加一;而成功执行 release() 后,_RLo ck__count 的值会减一。
>>> import threading # 引入库
>>> lock_obj1 = threading.RLock() # 创建RLock对象
>>> lock_obj1._RLock__count # 查看当前的值
# =0表示还没有所有者
>>> lock_obj1.acquire() # 获得该RLock对象
True
>>> lock_obj1._RLock__count # 查看值
# 已经被acquire()一次了
>>> lock_obj1.acquire() # 继续获得RLock,同一个线程
1
>>> lock_obj1._RLock__count
2
>>> lock_obj1.release() # 释放一次,值应该减一
>>> lock_obj1._RLock__count
1
>>> lock_obj1.release()
>>> lock_obj1._RLock__count # =0表示没有所有者了
0
4、threading.Condition条件变量
条件变量提供两个接口,一个是 wait(),表示等待有线程调用 notify();另一个是 nodify(),表示激活处于等待的线程。
下面是一个简单的例子,其每隔 3 秒就将工作线程唤醒一次。注意在调用 wait() 和 notify() 之前要调用 acquire(),在调用 wait() 和 notify 之后要调用 release()。
import sys, time
import threading # 引入线程库
def thread_entry(id, condition_obj): # 线程入口函数
print("Worker Thread %d: thread_entry() Is Running" % id)
for round in range(3): # 循环3次
condition_obj.acquire() # 等待
condition_obj.wait()
print("Worker Thread %d: is Doing Work" % id)
condition_obj.release()
time.sleep(0.1*id)
print("Worker Thread %d : Quit" % id) # 线程结束
def start_threads(): # 创建线程
condition_obj = threading.Condition() # 创建condition
t1 = threading.Thread(target=thread_entry, args=(1, condition_obj))
t1.start() # 启动线程
time.sleep(0.1) # 休眠0.1秒
t2 = threading.Thread(target=thread_entry, args=(2, condition_obj))
t2.start()
for round in range(3):
time.sleep(2)
condition_obj.acquire()
condition_obj.notify_all() # 通知子线程开始工作
condition_obj.release()
if __name__=='__main__':
start_threads()
运行结果如下:
$ python conditionDemo1.py
Worker Thread 1: thread_entry() Is Running
Worker Thread 2: thread_entry() Is Running
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1 : Quit
Worker Thread 2 : Quit
如果觉得使用 acquire() 和 release() 比较麻烦,也可以用 with 语句,如
with condition_obj:
condition_obj.notify_all()
等效于:
condition_obj.acquire()
condition_obj.notify_all()
condition_obj.release()
现在代码可以写成:
import sys, time
import threading # 引入线程库
def thread_entry(id, condition_obj):
print("Worker Thread %d: thread_entry() Is Running" % id)
for round in range(3): # 循环3次
with condition_obj:
condition_obj.wait()
print("Worker Thread %d: is Doing Work" % id)
time.sleep(0.1*id)
print("Worker Thread %d : Quit" % id)
def start_threads():
condition_obj = threading.Condition()
t1 = threading.Thread(target=thread_entry, args=(1, condition_obj))
t1.start()
time.sleep(0.1)
t2 = threading.Thread(target=thread_entry, args=(2, condition_obj))
t2.start()
for round in range(3): # 循环3次
time.sleep(2)
with condition_obj: # 进入竞争区
condition_obj.notify_all() # 通知子线程
if __name__=='__main__':
start_threads()
5、threading.Semaphore信号量
Semaphore 和 Lock 的作用相似,其不同之处是 Lock 只能被一个线程获得,其他的线程都只能等待,而 Semaphore 可以被 N 个线程同时获得,N 也可以等于 1。
下面是其用法演示:
import sys, time
import threading # 引入线程库
def thread_entry(id, Semaphore_obj): # 线程入口函数
print("Worker Thread %d: thread_entry() Is Running" % id)
time.sleep(1.8)
for round in range(3): # 循环3次
Semaphore_obj.acquire()
print("Worker Thread %d: is Doing Work" % id)
time.sleep(0.1*id)
print("Worker Thread %d : Quit" % id)
def start_threads():
Semaphore_obj = threading.Semaphore(3) # 创建3个元素的sem
t1 = threading.Thread(target=thread_entry, args=(1, Semaphore_obj))
t1.start()
t2 = threading.Thread(target=thread_entry, args=(2, Semaphore_obj))
t2.start()
t3 = threading.Thread(target=thread_entry, args=(3, Semaphore_obj))
t3.start()
t4 = threading.Thread(target=thread_entry, args=(4, Semaphore_obj))
t4.start()
for round in range(9):
time.sleep(2)
print("Release() is Called") # 释放sem
Semaphore_obj.release()
print("Main Thread Quit")
if __name__=='__main__':
start_threads()
运行结果如下:
$ python semaphoreDemo1.py # 运行脚本
Worker Thread 1: thread_entry() Is Running # 子线程启动
Worker Thread 2: thread_entry() Is Running
Worker Thread 3: thread_entry() Is Running
Worker Thread 4: thread_entry() Is Running
Worker Thread 2: is Doing Work # 仅有3个线程可以并行工作
Worker Thread 1: is Doing Work
Worker Thread 3: is Doing Work
Release() is Called # 释放sem,这时有一个子线程可以工作了
Worker Thread 4: is Doing Work # 被第9行释放的sem唤醒
Release() is Called # 再次释放一个sem
Worker Thread 1: is Doing Work
Release() is Called
Worker Thread 2: is Doing Work
Release() is Called
Worker Thread 3: is Doing Work
Release() is Called
Worker Thread 4: is Doing Work
Release() is Called
Worker Thread 1: is Doing Work
Worker Thread 1 : Quit
Release() is Called
Worker Thread 2: is Doing Work
Worker Thread 2 : Quit
Release() is Called
Worker Thread 3: is Doing Work
Worker Thread 3 : Quit
Release() is Called
vMain Thread Quit
Worker Thread 4: is Doing Work
Worker Thread 4 : Quit
可以看到最开始有 3 个线程并行工作,到后来每调用一次 release() 就会有一个进程开始工作。Semephore 内部保存一个属性 _Semaphore__value 该值在初始化时设置,在上面的代码中设置为 3。每次调用 acquire() 时判断该值是否大于 0,如果大于 0,则将该值减一并立即返回;如果等于 0,则一直等待直到该值大于 0。而在 release() 时,其将 _Semaphore__value 值加一,这样原来阻塞在 acquire() 上的线程可能就会开始执行了。
下面介绍信号量的一些属性和方法。
1) _Semaphore__value属性
该属性值表示还有多少个线程可以得到该 Semaphore。其在 Semaphore 初始化时被初始化。该属性对 Python 2 有效,在 Python 3 中则被 _value 替代。下面是 Python 2 中的情况:
>>> import threading
>>> sem_obj = threading.Semaphore(3)
>>> sem_obj._Semaphore__value
3
>>> sem_obj.acquire()
True
>>> sem_obj._Semaphore__value
2
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
3
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
4
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
5
下面是 Python 3 中的情况:
>>> import threading
>>> sem_obj = threading.Semaphore(3)
>>> sem_obj._value
3
>>> sem_obj.acquire()
True
>>> sem_obj._value
2
>>> sem_obj.release()
>>> sem_obj._value
3
>>> sem_obj.release()
>>> sem_obj._value
4
>>> sem_obj.release()
>>> sem_obj._value
5
需要注意的是,该属性值可以大于初始值。如最开始设定的初始值为 3,但其通过调用 release() 可以达到 4 或 5。这在很多系统中是不会出现的现象。
2) acquire()得到信号量
该函数在 Python 2 中只有一个参数 blocking,如果为 1 表示一直等待;为 0 表示立刻返回。如果得到了信号量,返回值是 True,否则是 False。
>>> import threading
>>> sem_obj = threading.Semaphore(1)
>>> sem_obj._Semaphore__value
1
>>> sem_obj.acquire()
True
>>> sem_obj._Semaphore__value
0
>>> sem_obj.acquire(0) # 不等待,直接返回
False
在 Python 3 中,多了一个超时参数,表示最多等待的时间,单位为秒。
>>> import threading
>>> sem_obj = threading.Semaphore(1)
>>> sem_obj._value
1
>>> sem_obj.acquire()
True
>>> sem_obj._value
0
>>> sem_obj.acquire(1, 3) # 等待最多3秒
False
3) release()释放信号量
该函数没有任何参数,而且返回值也是 None。
>>> import threading # 引入threading库
>>> sem_obj = threading.Semaphore(1) # 创建Semaphore对象
>>> sem_obj.acquire()
True
>>> ret = sem_obj.release() # 返回值是None
>>> ret is None
True
6、threading.Event事件
Event 可以看作是某个开关状态,可以通过 set() 来闭合开关,也可以通过 clear() 来断开开关,还可以使用 wait() 来等待开关的闭合。
import sys, time
import threading # 引入线程库
def thread_entry(id, evt): # 线程入口函数
print("Child Thread %d Wait for event" % id)
evt.wait() # 等待可以执行
print("Child Thread %d Quit" % id) # 子线程退出
def start_threads():
event_obj1 = threading.Event() # 创建事件
thread1 = threading.Thread(target=thread_entry, args=(1, event_obj1))
thread1.start() # 启动子线程1
thread2 = threading.Thread(target=thread_entry, args=(2, event_obj1))
thread2.start() # 启动子线程2
time.sleep(0.8)
print("Active Thread Number = %d" % threading.active_count())
time.sleep(1.8)
event_obj1.set() # 允许子线程运行
print("Main Thread Quit")
if __name__=='__main__':
start_threads()
运行结果如下:
$ python eventDemo1.py
Child Thread 1 Wait for event
Child Thread 2 Wait for event
Active Thread Number = 3
Main Thread Quit
Child Thread 1 Quit
Child Thread 2 Quit
下面介绍 Event 对象的接口函数。
1) is_set():得到 Event 实例对象的状态
新创建的 Event 对象处于非 set 状态。注意:set 状态表示闭合开关。
>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
2) wait(timeout):等待Event实例对象变成set状态
该接口函数有超时参数,表示最多等待多少秒。不提供该参数表示一直等待直到 Event 实例对象变成 set 状态。如果超时,其返回值为 False,否则返回值为 True。
>>> event_obj = threading.Event() # 创建Event实例对象event_obj
>>> event_obj.set() # 设置为set状态(闭合开关)
>>> ret = event_obj.wait() # 等待,没有超时参数
>>> ret # 返回值为True
True
>>> event_obj.clear() # 清除set状态(断开开关)
>>> ret = event_obj.wait(0.1) # 超时时间为0.1秒
>>> ret # 返回值为False,表示超时返回
False
3) set():设置状态
这样所有 wait() 都会满足条件返回,相当于开关闭合。
>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
>>> event_obj.set()
>>> event_obj.is_set()
True
>>> event_obj.set() # 多次设置也没有问题
>>> event_obj.is_set()
True
4) clear():清除状态
这样所有 wait() 都会被阻塞,相当于开关断开。
>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
>>> event_obj.set()
>>> event_obj.is_set()
True
>>> event_obj.clear()
>>> event_obj.is_set()
False
>>> event_obj.clear()
>>> event_obj.is_set()
False