百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

python入门系列:多进程(python多进程进程池)

citgpt 2024-09-09 02:17 9 浏览 0 评论

多进程和多线程的区别

  • Python多线程的操作,由于有GIL锁的存在,使得其运行效率并不会很高,无法充分利用 多核cpu 的优势,只有在I/O密集形的任务逻辑中才能实现并发。
  • 使用多进程来编写同样消耗cpu(一般是计算)的逻辑,对于 多核cpu 来说效率会好很多。
  • 操作系统对进程的调度代价要比线程调度要大的多。

多线程和多进程使用案例对比

1.用多进程多线程两种方式来运算 斐波那契数列,这里都依赖 concurrent.futures 模块提供的线/进程池。

import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
def fib(n):
 return 1 if n <= 2 else fib(n-1) + fib(n-2)
if __name__ == '__main__':
 # with ProcessPoolExecutor(3) as executor:
 with ThreadPoolExecutor(3) as executor:
 all_task = [executor.submit(fib, n) for n in range(25, 35)]
 start_time = time.time()
 for future in as_completed(all_task):
 data = future.result()
 # todo
 
 end_time = time.time()
 
 
 print("time consuming by threads: {0}s".format(end_time-start_time))
 # print("time consuming by processes: {0}s".format(end_time-start_time))

两种方式的运行结果对比:

python入门系列:多进程(python多进程进程池)

# result:
# time consuming by threads: 4.823292016983032s
# time consuming by processes: 3.3890748023986816s

可以看到,对于高计算量的任务,多进程要比多线程更加高效。同时,从这个例子中还能看出,通过concurrent.futures模块使用线程池进程池的方式的接口和使用逻辑是一样的,不过在使用多进程时,对于Windows的操作平台,相关逻辑一定要放在main中,Linux不受约束。

2.用多进程多线程两种方式来模拟 I/O密集操作,I/O操作 的特点就是 cpu 要耗费大量的时间进行等待数据,这里用sleep()进行模拟即可。

整体的操作方式不变,修改过的逻辑如下:

def random_sleep(n):
 time.sleep(n)
 return n
 
...
# 8 个线程,每个休眠两秒,模拟 I/O
with ProcessPoolExecutor(8) as executor:
# with ThreadPoolExecutor(8) as executor:
 all_task = [executor.submit(random_sleep, 2) for i in range(30)]
# result:
# time consuming by threads: 8.002903699874878s
# time consuming by processes: 8.34946894645691s

多进程编程

直接使用

import time
import multiprocessing
def read(times):
 time.sleep(times)
 print("process reading...")
 return "read for {0}s".format(times)
def write(times):
 time.sleep(times)
 print("process writing...")
 return "write for {0}s".format(times)
if __name__ == '__main__':
 read_process = multiprocessing.Process(target=read, args=(1,))
 write_process = multiprocessing.Process(target=write, args=(2,))
 read_process.start()
 write_process.start()
 print("read_process id {rid}".format(rid=read_process.pid))
 print("write_process id {wid}".format(wid=write_process.pid))
 read_process.join()
 write_process.join()
 print("done")
 
# result:
# read_process id 7064
# write_process id 836
# process reading...
# process writing...
# done

可以看出,关于多线程的逻辑和多线程的使用方式以类似的,要注意在Windows操作系统上,和进程有关的逻辑要写在if __name__ == '__main__'中。其他的一些方法请参阅 官方文档。

使用原生进程池

import time
import multiprocessing
def read(times):
 time.sleep(times)
 print("process reading...")
 return "read for {0}s".format(times)
def write(times):
 time.sleep(times)
 print("process writing...")
 return "write for {0}s".format(times)
if __name__ == '__main__':
 # multiprocessing.cpu_count() 获取cpu的核心数
 pool = multiprocessing.Pool(multiprocessing.cpu_count())
 
 read_result = pool.apply_async(read, args=(2,))
 write_result = pool.apply_async(write, args=(3,))
 # 关闭进程池,不再接受新的任务提交,否则 join() 出错
 pool.close()
 # 等待进程池中提交的所有任务完成
 pool.join()
 print(read_result.get())
 print(write_result.get())
# result:
# process reading...
# process writing...
# read for 2s
# write for 3s

使用imap(),所有任务顺序执行:

pool = multiprocessing.Pool(multiprocessing.cpu_count())
 
for result in pool.imap(read, [2, 1, 3]):
 print(result)
 
# result:
# process reading...
# process reading...
# read for 2s
# read for 1s
# process reading...
# read for 3s

使用imap_unordered(),哪个任务先完成就先返回结果:

for result in pool.imap_unordered(read, [1, 5, 3]):
 print(result)
# process reading...
# read for 1s
# process reading...
# read for 3s
# process reading...
# read for 5s

使用concurrent.futures中的ProcessPoolExecutor

这个在多线程和多进程对比的时提到过,因为和多线程的使用方式一样,这里就不多赘述,可以参阅 官方文档 给出的例子

进程间通信

进程通信和线程通信有些区别,在线程通信中各种提供的锁的机制全局变量在这里不再适用,我们要选取新的工具来完成进程通信任务。

使用multiprocessing.Queue

使用逻辑是和多线程中的Queue是一样的,详细方法。这种通信方式不能用在通过Pool进程池创建的进程

import multiprocessing
import time
def plus(queue):
 for i in range(6):
 num = queue.get() + 1
 queue.put(num)
 print(num)
 time.sleep(1)
def subtract(queue):
 for i in range(6):
 num = queue.get() - 1
 queue.put(num)
 print(num)
 time.sleep(2)
if __name__ == '__main__':
 queue = multiprocessing.Queue(1)
 queue.put(0)
 plus_process = multiprocessing.Process(target=plus, args=(queue,))
 subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
 plus_process.start()
 subtract_process.start()
 
# result:
# 1
# 1
# 2
# 2
# 3
# 3
# 0
# 1
# 2
# 2
# 1
# 0

使用Manager()中的Queue

Manager()会返回一个在进程间进行同步管理的一个对象,它提供了多种在进程间共享数据的形式。

import multiprocessing
import time
def plus(queue):
 for i in range(6):
 num = queue.get() + 1
 queue.put(num)
 print(num)
 time.sleep(1)
def subtract(queue):
 for i in range(6):
 num = queue.get() - 1
 queue.put(num)
 print(num)
 time.sleep(2)
if __name__ == '__main__':
 queue = multiprocessing.Manager().Queue(1) # 创建方式有些奇特
 # queue = multiprocessing.Queue() # 这时用这个就行不通了
 pool = multiprocessing.Pool(2)
 queue.put(0)
 pool.apply_async(plus, args=(queue,))
 pool.apply_async(subtract, args=(queue,))
 pool.close()
 pool.join()
 
# result:
# 0
# 1
# 1
# 2
# 2
# 3
# -1
# 0
# 1
# 2
# 1
# 0

使用Manager()中的list()

多个进程可以共享全局的list,因为是进程间共享,所以用锁的机制保证它的安全性。这里的Manager().Lock不是前面线程级别的Lock,它可以保证进程间的同步。

import multiprocessing as mp
import time
def add_person(waiting_list, name_list, lock):
 lock.acquire()
 for name in name_list:
 waiting_list.append(name)
 time.sleep(1)
 print(waiting_list)
 lock.release()
def get_person(waiting_list, lock):
 lock.acquire()
 if waiting_list:
 name = waiting_list.pop(0)
 print("get {0}".format(name))
 lock.release()
if __name__ == '__main__':
 waiting_list = mp.Manager().list()
 lock = mp.Manager().Lock() # 使用 lock 限制进程对全局量的访问
 name_list = ["MetaTian", "Rity", "Anonymous"]
 add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
 get_process = mp.Process(target=get_person, args=(waiting_list, lock))
 add_process.start()
 get_process.start()
 add_process.join()
 get_process.join()
 print(waiting_list)
 
# result:
# ['MetaTian']
# ['MetaTian', 'Rity']
# ['MetaTian', 'Rity', 'Anonymous']
# get MetaTian
# ['Rity', 'Anonymous']

Manager()中还有更多的进程间通信的工具,可以参阅官方文档。

使用Pipe

Pipe只能适用于两个进程间的通信,它的性能高于Queue,Pipe()会返回两个Connection对象,使用这个对象可以在进程间进行数据的发送和接收,非常像前面讲过的socket对象。关于Connection

import multiprocessing
def plus(conn):
 default_num = 0
 for i in range(3):
 num = 0 if i == 0 else conn.recv()
 conn.send(num + 1)
 print("plus send: {0}".format(num+1))
def subtract(conn):
 for i in range(3):
 num = conn.recv()
 conn.send(num-1)
 print("subtract send: {0}".format(num-1))
if __name__ == '__main__':
 conn_plus, conn_sbtract = multiprocessing.Pipe()
 plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
 subtract_process = multiprocessing.Process(target=subtract, args=(conn_sbtract,))
 plus_process.start()
 subtract_process.start()
# result:
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0

send()可以连续发送数据,recv()将另一端发送的数据陆续取出,如果没有取到数据,则进入等待状态。

注:喜欢python + qun:839383765 可以获取Python各类免费最新入门学习资料!

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: