百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

Python线程同步实现方式详解(python线程之间如何通信)

citgpt 2024-08-01 13:32 10 浏览 0 评论

一个线程启动后,其会自行运行。但如果希望它们能同步运行,应该怎么做呢?

举个简单的例子,有两个线程 A 和 B,A 负责从网络上读取数据,保持到变量 X 中,B 负责处理变量X中的数据,这时线程 B 就需要和 A 同步。也就是说 B 需要等 A 给其一个信号,其才可以开始去做自己的事情。同样,B 完成了任务后也需要通知 A,告诉 A 变量 X 中间的数据已经处理完了,可以将新的数据放入 X 了。

图 1 表示了这个过程:

实现线程同步的方式有很多种,下面分别进行介绍。

Python线程同步实现方式详解(python线程之间如何通信)

1、thread.Lock线程锁

借助锁,可以得到一些排他的资源。例如,为某个资源 A 加上锁 L,如果要使用该资源,则必须得到锁 L,这个锁可以保证在任意时候,只有一个线程可以得到它。其他线程如果想得到已经被别的线程得到的锁,只能等待锁的拥有者主动释放锁。

这个锁类提供了 acquire() 和 release() 两个接口函数。release() 表示某线程已经完成了任务,其他线程可以开始自己的工作了;acquire() 表示某线程计划做某个工作,请在可以开始时通知它。

所以做某件工作可以用下面三步构成:

  • acquire():等条件成熟了告诉某线程。
  • do_the work():条件成熟了,开始工作。
  • release():工作完成,告诉其他线程可以开始工作了。


现在回到前面的例子上,我们需要两个锁。一个用来写变量 X,用 write_lock 来表示;一个用来读变量 X,用 read_lock 来表示。写变量 X 的过程如下:

write_lock.acquire()
X=var
read_lock.release()


读变量 X 的过程如下:

read_lock.acquire()
var=X
write_lock.release()


下面是完整的代码。

import sys, time                        # 引入时间库
if sys.version_info.major == 2:         # Python 2
import thread
else:                                   # Python 3
import _thread as thread               # 创建两个锁,一个用来读,一个用来写
read_lock = thread.allocate_lock()
write_lock = thread.allocate_lock()
X = 0                                   # 变量X,用来保存两个线程之间交换的数据
def write_thread_entry():               # 写线程的入口函数
    global X, read_lock,  write_lock
    for i in range(2, 10, 1):
        write_lock.acquire()
        X = i
        read_lock.release()
def read_thread_entry():                # 读线程的入口函数
    global X, read_lock, write_lock
    while True:
        read_lock.acquire()
        print("Processing X = %d" % X)
        write_lock.release()
def start_threads():                    # 启动线程
    read_lock.acquire()                 # read_lock处于被占用状态
    t1 = thread.start_new_thread(write_thread_entry, tuple())
    t2 = thread.start_new_thread(read_thread_entry, tuple())
    time.sleep(5)
if __name__=='__main__':                # 如果是运行该脚本而不是import该文件
    start_threads()

运行结果如下:

$ python lockDemo1.py # 运行脚本
Processing X = 2 # 程序第20行的输出
Processing X = 3 # 当前X的值
Processing X = 4
Processing X = 5
Processing X = 6
Processing X = 7
Processing X = 8
Processing X = 9

2、threading.Lock线程锁

threading 包也包含一个类 Lock,其提供的函数也是 acquire() 和 release(),这两个函数和 thread.Lock 类的函数一样。但是其并没有提供 allocate_lock() 接口函数来创建 Lock,需要我们用构造函数自己创建。

下面的代码使用几乎一样的方法实现了前面的例子:

import sys, time
import threading                       # 引入线程库
read_lock = threading.Lock()
write_lock = threading.Lock()
X = 0                                  # 变量X,读写线程交互信息的载体
def write_thread_entry():
    global X, read_lock,  write_lock
    for i in range(2, 10, 1):
        write_lock.acquire()
        X = i
        read_lock.release()
def read_thread_entry():
    global X, read_lock, write_lock
    while True:
        read_lock.acquire()
        print("Processing X = %d" % X)
        write_lock.release()
def start_threads():
    read_lock.acquire()         # read_lock处于被占用状态
    t1 = threading.Thread(target=write_thread_entry)
    t1.setDaemon(True)
    t1.start()
    t2 = threading.Thread(target=read_thread_entry)
    t2.setDaemon(True)
    t2.start()
    time.sleep(5)
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python lockDemo1.py
Processing X = 2 # 程序第16行的输出
Processing X = 3 # 显示当前X的值
Processing X = 4
Processing X = 5
Processing X = 6
Processing X = 7
Processing X = 8
Processing X = 9

3、threading.RLock可重入锁

前面介绍的 threading.Lock 有一个问题,就是对某个人 threading.Lock 对象在同一个线程内重复调用两次 acquire() 会发生锁死现象。

下面的代码演示了这个情况:

import sys, time
import threading
lock_obj1 = threading.Lock()            # 创建锁对象
def thread_entry():                     # 子线程入口函数
    global lock_obj1                    # 使用全局变量lock_obj1
    print("Child Thread: thread_entry() Is Running")
    lock_obj1.acquire()                         # 第一次调用acquire(),成功
    print("Child Thread: acquire(1) Finished")
    lock_obj1.acquire()                         # 第二次调用acquire(),阻塞
            # 我们看不到下面的两句输出
    print("Child Thread: acquire(2) Finished")
    print("Child Thread: Quit")
def start_threads():                            # 主线程
    global lock_obj1
    t1 = threading.Thread(target=thread_entry)
    t1.setDaemon(True)
    t1.start()
    time.sleep(5)
print("Main Thread: Quit")
if __name__=='__main__':            # 作为脚本执行
    start_threads()

运行结果如下:

$ python lockDemo3.py
Child Thread: thread_entry() Is Running
Child Thread: acquire(1) Finished
Main Thread: Quit

可以看到子线程在第二次调用 acquire() 时卡住了。threading.RLock() 类就是为了解决这个问题的,其用法和 Threading.Lock() 基本相同,区别是如果某个线程调用了自己已经调用过的 RLock,不会被阻塞。

下面将前面代码的第三行:

lock_obj1 = threading.Lock()

修改为

lock_obj1 = threading.RLock()

其他都保持不变,再次运行后结果如下:

$ python lockDemo4.py
Child Thread: thread_entry() Is Running
Child Thread: acquire(1) Finished
Child Thread: acquire(2) Finished
Child Thread: Quit
Main Thread: Quit

这里还有一个特别的地方,就是 release() 被调用的次数要求和 acquire() 被调用的次数相同,否则其他的非所有者线程还是会被卡在 acquire() 上。

某个线程成功执行 acquire() 后,该线程就是该 threading.RLock 对象的所有者。threading.RLock 对象内部有一个成员变量 _RLock__count,其类型为整数。如 _RLock__count=0,表示其没有被任意线程所有,因而任意线程都可以成功执行 acquire() 来获得该 threading.RLock对象;如果 _RLock__count>0,则只有上次成功执行 acquire()的所有者线程才可以成功执行 acquire(),其他线程都会被阻塞。成功执行 acquire() 后,_RLock__count 的值会加一;而成功执行 release() 后,_RLo ck__count 的值会减一。

>>> import threading                   # 引入库
>>> lock_obj1 = threading.RLock()      # 创建RLock对象
>>> lock_obj1._RLock__count            # 查看当前的值
                                # =0表示还没有所有者
>>> lock_obj1.acquire()                # 获得该RLock对象
True
>>> lock_obj1._RLock__count            # 查看值
                                       # 已经被acquire()一次了
>>> lock_obj1.acquire()                # 继续获得RLock,同一个线程
1
>>> lock_obj1._RLock__count
2
>>> lock_obj1.release()                # 释放一次,值应该减一
>>> lock_obj1._RLock__count
1
>>> lock_obj1.release()
>>> lock_obj1._RLock__count            # =0表示没有所有者了
0

4、threading.Condition条件变量

条件变量提供两个接口,一个是 wait(),表示等待有线程调用 notify();另一个是 nodify(),表示激活处于等待的线程。

下面是一个简单的例子,其每隔 3 秒就将工作线程唤醒一次。注意在调用 wait() 和 notify() 之前要调用 acquire(),在调用 wait() 和 notify 之后要调用 release()。

import sys, time
import threading                                # 引入线程库
def thread_entry(id, condition_obj):            # 线程入口函数
    print("Worker Thread %d: thread_entry() Is Running" % id)
    for round in range(3):                      # 循环3次
        condition_obj.acquire()                 # 等待
        condition_obj.wait()
        print("Worker Thread %d: is Doing Work" % id)
        condition_obj.release()
        time.sleep(0.1*id)
    print("Worker Thread %d : Quit" % id)       # 线程结束
def start_threads():                            # 创建线程
    condition_obj = threading.Condition()       # 创建condition
    t1 = threading.Thread(target=thread_entry, args=(1, condition_obj))
    t1.start()                                  # 启动线程
    time.sleep(0.1)                             # 休眠0.1秒
    t2 = threading.Thread(target=thread_entry, args=(2, condition_obj))
    t2.start()
    for round in range(3):
        time.sleep(2)
        condition_obj.acquire()
        condition_obj.notify_all()               # 通知子线程开始工作
        condition_obj.release()
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python conditionDemo1.py
Worker Thread 1: thread_entry() Is Running
Worker Thread 2: thread_entry() Is Running
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1 : Quit
Worker Thread 2 : Quit


如果觉得使用 acquire() 和 release() 比较麻烦,也可以用 with 语句,如

with condition_obj:
condition_obj.notify_all()

等效于:

condition_obj.acquire()
condition_obj.notify_all()
condition_obj.release()

现在代码可以写成:

import sys, time
import threading                                        # 引入线程库
def thread_entry(id, condition_obj):
    print("Worker Thread %d: thread_entry() Is Running" % id)
    for round in range(3):                              # 循环3次
        with condition_obj:
        condition_obj.wait()
        print("Worker Thread %d: is Doing Work" % id)
time.sleep(0.1*id)
    print("Worker Thread %d : Quit" % id)
def start_threads():
condition_obj = threading.Condition()
    t1 = threading.Thread(target=thread_entry, args=(1, condition_obj))
    t1.start()
time.sleep(0.1)
    t2 = threading.Thread(target=thread_entry, args=(2, condition_obj))
    t2.start()
    for round in range(3):                              # 循环3次
time.sleep(2)
        with condition_obj:                             # 进入竞争区
        condition_obj.notify_all()                      # 通知子线程
if __name__=='__main__':
start_threads()

5、threading.Semaphore信号量

Semaphore 和 Lock 的作用相似,其不同之处是 Lock 只能被一个线程获得,其他的线程都只能等待,而 Semaphore 可以被 N 个线程同时获得,N 也可以等于 1。

下面是其用法演示:

import sys, time
import threading                        # 引入线程库
def thread_entry(id, Semaphore_obj):    # 线程入口函数
    print("Worker Thread %d: thread_entry() Is Running" % id)
    time.sleep(1.8)
    for round in range(3):              # 循环3次
        Semaphore_obj.acquire()
        print("Worker Thread %d: is Doing Work" % id)
        time.sleep(0.1*id)
    print("Worker Thread %d : Quit" % id)
def start_threads():
    Semaphore_obj = threading.Semaphore(3)      # 创建3个元素的sem
    t1 = threading.Thread(target=thread_entry, args=(1, Semaphore_obj))
    t1.start()
    t2 = threading.Thread(target=thread_entry, args=(2, Semaphore_obj))
    t2.start()
    t3 = threading.Thread(target=thread_entry, args=(3, Semaphore_obj))
    t3.start()
    t4 = threading.Thread(target=thread_entry, args=(4, Semaphore_obj))
    t4.start()
    for round in range(9):
        time.sleep(2)
        print("Release() is Called")      # 释放sem
        Semaphore_obj.release()
    print("Main Thread Quit")
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python semaphoreDemo1.py # 运行脚本
Worker Thread 1: thread_entry() Is Running # 子线程启动
Worker Thread 2: thread_entry() Is Running
Worker Thread 3: thread_entry() Is Running
Worker Thread 4: thread_entry() Is Running
Worker Thread 2: is Doing Work # 仅有3个线程可以并行工作
Worker Thread 1: is Doing Work
Worker Thread 3: is Doing Work
Release() is Called # 释放sem,这时有一个子线程可以工作了
Worker Thread 4: is Doing Work # 被第9行释放的sem唤醒
Release() is Called # 再次释放一个sem
Worker Thread 1: is Doing Work
Release() is Called
Worker Thread 2: is Doing Work
Release() is Called
Worker Thread 3: is Doing Work
Release() is Called
Worker Thread 4: is Doing Work
Release() is Called
Worker Thread 1: is Doing Work
Worker Thread 1 : Quit
Release() is Called
Worker Thread 2: is Doing Work
Worker Thread 2 : Quit
Release() is Called
Worker Thread 3: is Doing Work
Worker Thread 3 : Quit
Release() is Called
vMain Thread Quit
Worker Thread 4: is Doing Work
Worker Thread 4 : Quit

可以看到最开始有 3 个线程并行工作,到后来每调用一次 release() 就会有一个进程开始工作。Semephore 内部保存一个属性 _Semaphore__value 该值在初始化时设置,在上面的代码中设置为 3。每次调用 acquire() 时判断该值是否大于 0,如果大于 0,则将该值减一并立即返回;如果等于 0,则一直等待直到该值大于 0。而在 release() 时,其将 _Semaphore__value 值加一,这样原来阻塞在 acquire() 上的线程可能就会开始执行了。

下面介绍信号量的一些属性和方法。

1) _Semaphore__value属性

该属性值表示还有多少个线程可以得到该 Semaphore。其在 Semaphore 初始化时被初始化。该属性对 Python 2 有效,在 Python 3 中则被 _value 替代。下面是 Python 2 中的情况:

>>> import threading
>>> sem_obj = threading.Semaphore(3)
>>> sem_obj._Semaphore__value
3
>>> sem_obj.acquire()
True
>>> sem_obj._Semaphore__value
2
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
3
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
4
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
5

下面是 Python 3 中的情况:

>>> import threading
>>> sem_obj = threading.Semaphore(3)
>>> sem_obj._value
3
>>> sem_obj.acquire()
True
>>> sem_obj._value
2
>>> sem_obj.release()
>>> sem_obj._value
3
>>> sem_obj.release()
>>> sem_obj._value
4
>>> sem_obj.release()
>>> sem_obj._value
5

需要注意的是,该属性值可以大于初始值。如最开始设定的初始值为 3,但其通过调用 release() 可以达到 4 或 5。这在很多系统中是不会出现的现象。

2) acquire()得到信号量

该函数在 Python 2 中只有一个参数 blocking,如果为 1 表示一直等待;为 0 表示立刻返回。如果得到了信号量,返回值是 True,否则是 False。

>>> import threading
>>> sem_obj = threading.Semaphore(1)
>>> sem_obj._Semaphore__value
1
>>> sem_obj.acquire()
True
>>> sem_obj._Semaphore__value
0
>>> sem_obj.acquire(0)     # 不等待,直接返回
False

在 Python 3 中,多了一个超时参数,表示最多等待的时间,单位为秒。

>>> import threading
>>> sem_obj = threading.Semaphore(1)
>>> sem_obj._value
1
>>> sem_obj.acquire()
True
>>> sem_obj._value
0
>>> sem_obj.acquire(1, 3)     # 等待最多3秒
False

3) release()释放信号量

该函数没有任何参数,而且返回值也是 None。

>>> import threading                   # 引入threading库
>>> sem_obj = threading.Semaphore(1)   # 创建Semaphore对象
>>> sem_obj.acquire()
True
>>> ret = sem_obj.release()             # 返回值是None
>>> ret is None
True

6、threading.Event事件

Event 可以看作是某个开关状态,可以通过 set() 来闭合开关,也可以通过 clear() 来断开开关,还可以使用 wait() 来等待开关的闭合。

import sys, time
import threading                                        # 引入线程库
def thread_entry(id, evt):                              # 线程入口函数
    print("Child Thread %d Wait for event" % id)
    evt.wait()                                          # 等待可以执行
    print("Child Thread %d Quit" % id)  # 子线程退出
def start_threads():
    event_obj1 = threading.Event()      # 创建事件
    thread1 = threading.Thread(target=thread_entry, args=(1, event_obj1))
    thread1.start()                     # 启动子线程1
    thread2 = threading.Thread(target=thread_entry, args=(2, event_obj1))
    thread2.start()                     # 启动子线程2       
    time.sleep(0.8)
    print("Active Thread Number = %d" % threading.active_count())
    time.sleep(1.8)
    event_obj1.set()                    # 允许子线程运行
    print("Main Thread Quit")
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python eventDemo1.py
Child Thread 1 Wait for event
Child Thread 2 Wait for event
Active Thread Number = 3
Main Thread Quit
Child Thread 1 Quit
Child Thread 2 Quit


下面介绍 Event 对象的接口函数。

1) is_set():得到 Event 实例对象的状态

新创建的 Event 对象处于非 set 状态。注意:set 状态表示闭合开关。

>>> event_obj = threading.Event()
>>> event_obj.is_set()
False

2) wait(timeout):等待Event实例对象变成set状态

该接口函数有超时参数,表示最多等待多少秒。不提供该参数表示一直等待直到 Event 实例对象变成 set 状态。如果超时,其返回值为 False,否则返回值为 True。

>>> event_obj = threading.Event()     # 创建Event实例对象event_obj
>>> event_obj.set()                   # 设置为set状态(闭合开关)
>>> ret = event_obj.wait()            # 等待,没有超时参数
>>> ret                               # 返回值为True
True
>>> event_obj.clear()                 # 清除set状态(断开开关)
>>> ret = event_obj.wait(0.1)         # 超时时间为0.1秒
>>> ret                               # 返回值为False,表示超时返回
False

3) set():设置状态

这样所有 wait() 都会满足条件返回,相当于开关闭合。

>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
>>> event_obj.set()
>>> event_obj.is_set()
True
>>> event_obj.set()             # 多次设置也没有问题
>>> event_obj.is_set()
True

4) clear():清除状态

这样所有 wait() 都会被阻塞,相当于开关断开。

>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
>>> event_obj.set()
>>> event_obj.is_set()
True
>>> event_obj.clear()
>>> event_obj.is_set()
False
>>> event_obj.clear()
>>> event_obj.is_set()
False

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: