百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

python爬虫之多线程threading、多进程、协程aiohttp批量下载图片

citgpt 2024-09-09 02:17 10 浏览 0 评论

一、单线程常规下载

常规单线程执行脚本爬取壁纸图片,只爬取一页的图片。

import datetime
import re
import requests
from bs4 import BeautifulSoup

start = datetime.datetime.now()
j = 0
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'}

def pic_re(url):
    re = requests.get(url=url,headers=headers)
    return re.text

def pic_download(soup):
    list = soup.find(class_='contlistw mtw').find_all('li')
    for item in list:
        global j
        pic_name = item.find('img')['alt']
        pic_url = item.find('img')['lazysrc'].replace('.278.154.jpg','')
        pic_type = re.sub(r'h.*\d+.','', pic_url)
        #print(pic_name,pic_type,pic_url)
        filename = '{}.{}'.format(pic_name, pic_type)
        print('开始下载:'+filename)
        with open(filename, 'wb') as f:
            f.write(requests.get(pic_url,headers=headers).content)
        print(filename+' 下载完成')
        j += 1

def main():
    for i in range(1,2):
        url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
        html=pic_re(url)
        soup=BeautifulSoup(html,'lxml')
        pic_download(soup)
    date_all = (datetime.datetime.now() - start).total_seconds()
    print(f'总共{j}张图片,下载总用时:{date_all}s')

if __name__ == '__main__':
    main()

执行结果:

python爬虫之多线程threading、多进程、协程aiohttp批量下载图片

开始下载:站在油菜花地的小清新美女背影.jpg
站在油菜花地的小清新美女背影.jpg 下载完成
开始下载:穿大花袖子连衣裙的印度美女近照摄影.jpg
穿大花袖子连衣裙的印度美女近照摄影.jpg 下载完成
总共24张图片,下载总用时:485.885113s

进程已结束,退出代码0

结果,第一页24张图片,就下载差不多8分钟,排除网络等因素,还没有手动下载快。

二、多线程下载

上面的有两个循环,第一个是页面的循环,一页一页地加载,每页在单独循环单独下载图片。

所以有两个等待时间,第一个就是等待第一页下载完成,才会到第二页。第二个等待就是每页图片一张下载完才下载第二张。

综上,优化两点:

第一点,第一步提取所有图片链接保存,不用一页等一页的提取。

第二点,所有图片多线程同时下载,不用等一个一个下载。

1:创建列表,储存图片信息

只需要两个信息,图片名称,和图片链接,储存到 pic_list = [] 列表。

pic_list = []
def get_pic_list(soup):
    list = soup.find(class_='contlistw mtw').find_all('li')
    for item in list:
        global j
        pic_name = item.find('img')['alt']
        pic_url = item.find('img')['lazysrc'].replace('.278.154.jpg','')
        pic_type = re.sub(r'h.*\d+.','', pic_url)
        filename = 'pic\{}.{}'.format(pic_name, pic_type)
        pic_list.append([filename,pic_url])

2:读取列表,多线程同时下载

threading说明:

  • 创建空列表t_list,将三个子线程放入该列表,用于执行join,
  • 执行子线程(start),start方法开启一个新线程。把需要并行处理的代码放在run()方法中,start()方法启动线程将自动调用 run()方法。
  • 执行阻塞 (join),join函数可以理解为,如果某个子进程执行了join函数,那么在该子进程执行到join之前,父进程都会等待。

threading.Thread命令参数:

第一个为参数为函数,第二次参数为函数值。

  • 使用args 传递参数 threading.Thread(target=target, args=(10, 100, 100)),args参数为元组,所以只有一个参数,以 , 结尾,例:args=(10,)
  • 使用kwargs传递参数 threading.Thread(target=target, kwargs={“a”: 10, “b”:100, “c”: 100})
  • 同时使用 args 和 kwargs 传递参数 threading.Thread(target=target, args=(10, ), kwargs={“b”: 100,“c”: 100})

分别创建下载函数,和多线程函数。

代码如下:

def image_down(filename,image_url):
    re = requests.get(image_url,headers=headers)
    print('开始下载:' + filename)
    with open(filename, 'wb') as f:
    	f.write(re.content)
    	print(filename + ' 下载完成')
    	
def thread_down():
    t_list = []
    for url in pic_list:
        global j
        t = threading.Thread(target=image_down,
                             kwargs={'filename': url[0], 'image_url': url[1]})
        t_list.append(t)
        t.start()
        j += 1
    for t in t_list:
        t.join()

最终代码为:

import datetime
import re
import requests
from bs4 import BeautifulSoup
import threading

start = datetime.datetime.now()
j = 0
pic_list = []
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'}

def pic_re(url):
    re = requests.get(url=url,headers=headers)
    return re.text

def get_pic_list(soup):
    list = soup.find(class_='contlistw mtw').find_all('li')
    for item in list:
        global j
        pic_name = item.find('img')['alt']
        pic_url = item.find('img')['lazysrc'].replace('.278.154.jpg','')
        pic_type = re.sub(r'h.*\d+.','', pic_url)
        filename = 'pic\{}.{}'.format(pic_name, pic_type)
        pic_list.append([filename,pic_url])

def image_down(filename,image_url):
    re = requests.get(image_url,headers=headers)
    print('开始下载:' + filename)
    with open(filename, 'wb') as f:
    	f.write(re.content)
    	print(filename + ' 下载完成')

def thread_down():
    t_list = []
    for url in pic_list:
        global j
        t = threading.Thread(target=image_down,
                             kwargs={'filename': url[0], 'image_url': url[1]})
        t_list.append(t)
        t.start()
        j += 1
    for t in t_list:
        t.join()

def main():
    for i in range(1,24):
        url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
        html=pic_re(url)
        soup=BeautifulSoup(html,'lxml')
        get_pic_list(soup)
    thread_down()
    date_all = (datetime.datetime.now() - start).total_seconds()
    print(f'总共{j}张图片,下载总用时:{date_all}s')

if __name__ == '__main__':
    main()

执行结果,533张图片,只用了差不多3分钟就下载完了

开始下载:pic\超清4K长发少女,高清到毛孔都看的见强烈推荐.png
pic\超清4K长发少女,高清到毛孔都看的见强烈推荐.png 下载完成
开始下载:pic\超高清长发清纯学生妹街拍电脑背景.jpg
pic\超高清长发清纯学生妹街拍电脑背景.jpg 下载完成
开始下载:pic\图书馆的气质少女高清头像壁纸图片-真8K壁纸推荐.png
pic\图书馆的气质少女高清头像壁纸图片-真8K壁纸推荐.png 下载完成
总共533张图片,下载总用时:182.8669s

进程已结束,退出代码0


三、图片不完整解决

以上虽然速度上来了,但是查看图片有下载失败,如0kb,或者图片不完整,半截是灰色的。

原因多半是网络原因,壁纸多半都是大尺寸,容量也大,图片1M到20M不等,经常会加载不全就下载下来或网络访问失败。

对于网络失败的(0kb),让他返回重新访问。用re.status_code == 200判断即可。

对于下载不全的,用其他的方式下载。这里用

image = Image.open(BytesIO(re.content)

因为这种方法,如果图片下载不全,会报错异常OSError: image file is truncated (X bytes not processed),通过捕获异常,同样重新返回执行。

为了防止因为网络原因,陷入死循环,设置返回次数,超过规定次数,则停止返回。

改image_down函数即可。

from PIL import Image
from io import BytesIO

def image_down(filename,image_url):
    re = requests.get(image_url,headers=headers)
    if re.status_code == 200:
        try:
            print('开始下载:' + filename)
            image = Image.open(BytesIO(re.content))
            image.save(filename)
            print(filename + ' 下载完成')
        except OSError:
            count = 1
            print('图片不完整,重新下载')
            if count <= 5:
                return image_down(filename, image_url)
            else:
                print(filename + ' 下载失败')
            count += 1
    else:
        count = 1
        print('网络错误,重新下载')
        if count <= 5:
            return image_down(filename,image_url)
        else:
            print(filename+' 下载失败')
        count += 1

再次执行,图片全部下载完成,而且没有不全的图片了。

四、多进程下载

除了多线程之外,我们还可以使用多进程来提高爬虫速度。

在Python中multiprocessing提供了两个用于多进程的类,即Process和Pool类

  • multiprocessing.Process 无法批量开启子进程,可以直接用multiprocesssing.Queue等进行通信
  • multiprocessing.Pool 可以批量开启子进程,不能直接用multiprocessing.Queue进行通信,只能通过共享内存,或者用multiprocessing.Manager()进行进程间通信。

Pool仅在内存中分配正在执行的进程,而Process在内存中分配所有任务,因此,当任务数较小时,我们可以使用Process类;当任务数较大时,我们可以使用Pool。

1、Process(用于创建进程)

multiprocessing模块提供了一个Process类来代表一个进程对象。

在multiprocessing中,每一个进程都用一个Process类来表示。

构造方法:Process([group [, target [, name [, args [, kwargs]]]]])

  • start():启动进程,并调用该子进程中的p.run()
  • run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
  • is_alive():返回进程是否在运行。如果p仍然运行,返回True
  • join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

使用方法和多线程threading.Thread的使用方法差不多。

p = Process(target=run_proc, args=('test',))
p.start()
p.join()

如,前面的代码,增加def multiprocess_down()类,同样,同时创建多个子进程,加入列表,子进程并发跑。

from multiprocessing import Process


def multiprocess_down():
    p_list = []
    for url in pic_list:
        global j
        p = Process(target=image_down,
                             kwargs={'filename': url[0], 'image_url': url[1]})
        p_list.append(p)
        p.start()
        j += 1
    for p in p_list:
        p.join()
...
...

def main():
    for i in range(1,3):
        url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
        html=pic_re(url)
        soup=BeautifulSoup(html,'lxml')
        get_pic_list(soup)
    #thread_down()
    multiprocess_down()
    date_all = (datetime.datetime.now() - start).total_seconds()
    print(f'总共{j}张图片,下载总用时:{date_all}s')

执行结果

开始下载:pic\LED大屏幕前的欧美美女超清桌面壁纸下载.jpg
pic\LED大屏幕前的欧美美女超清桌面壁纸下载.jpg 下载完成
开始下载:pic\手捧窗帘的欧美时尚模特高清壁纸.jpg
pic\手捧窗帘的欧美时尚模特高清壁纸.jpg 下载完成
开始下载:pic\站在油菜花地的小清新美女背影.jpg
pic\站在油菜花地的小清新美女背影.jpg 下载完成
总共48张图片,下载总用时:172.714426s

进程已结束,退出代码0

可以看出,这种多进程,显然没有多线程速度块,虽然也用了多个子进程同时跑,速度提升还是没有多线程速度快。

2、Pool(用于创建管理进程池)

构造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes :要创建的进程数,如果省略,将默认使用cpu_count()返回的数量。
  • initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
  • initargs:是要传给initializer的参数组。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

方法:

  • apply(),

函数原型:apply(func[, args=()[, kwds={}]])

该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不再出现)

  • apply_async

函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

与apply用法一致,但它是非阻塞的且支持结果返回后进行回调

  • map()

函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回

注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程

  • map_async()

函数原型:map_async(func, iterable[, chunksize[, callback]])

与map用法一致,但是它是非阻塞的

  • close()

关闭进程池(pool),使其不再接受新的任务

  • terminal()

结束工作进程,不再处理未处理的任务

  • join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用

Pool.apply_async:异步执行,结果的顺序不能保证与调用的顺序相同

Pool.map:同步执行,阻塞直到返回完整的结果,

map和map_async一次调用一个作业列表,但是apply和apply_async只能调用一个作业。但是,apply_async是在后台并行执行作业的。

我这里用apply_async,也可以用map,但是map正常情况只能传一个参数,要用map传多个参数,用starmap即可。

from multiprocessing import Pool

def multipoll_down():
    p = Pool(multiprocessing.cpu_count())
    for url in pic_list:
        global j
        p.apply_async(image_down,[url[0],url[1]])
        #p.starmap(image_down, [(url[0], url[1]),])
        j += 1
    p.close()
    p.join()


def main():
    for i in range(1, 3):
        url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
        html = pic_re(url)
        soup = BeautifulSoup(html, 'lxml')
        get_pic_list(soup)
    # thread_down()
    # multiprocess_down()
    multipoll_down()
    date_all = (datetime.datetime.now() - start).total_seconds()
    print(f'总共{j}张图片,下载总用时:{date_all}s')

执行结果,速度和process子进程并行差不多(我电脑cpu最大8个进程而已)。

五、协程下载

1:aiohttp 和 asyncio库

这里需要用到两个库aiohttp 和 asyncio。

aiohttp :可以把这个库当作 requests库 的替代品,因为requets不支持异步默认,所以这里需要用aiohttp 库替代requests库。

asyncio:asyncio 是用来编写 并发 代码的库,使用 async/await 语法。

正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加async关键。

async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是sleep(5))消失后,也就是5秒到了再回来执行。

asyncio基本流程

声明协程函数,函数前加async:
async def function():

创建任务:
asyncio.create_task(coro, *, name=None)

两种执行任务,简单等待asyncio.wait和并发执行asyncio.gather
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
awaitable asyncio.gather(*aws, return_exceptions=False) 

运行 asyncio 程序:
asyncio.run(coro, *, debug=False)

Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作
asyncio.get_event_loop().run_until_complete(main())
import aiohttp
import asyncio

async def images_down(filename, image_url):
    async with aiohttp.ClientSession() as session:
        async with session.get(image_url,headers=headers) as resp:
            #print(filename,resp.status)
            if resp.status == 200:
                print('开始下载:' + filename)
                with open(filename, 'wb') as f:
               # 这个地方通过对流的处理,而不是一下子整体读取。
               # 一下子整个的读取,会导致批量下载图片的时候,一开始会出现资源浪费,只下载几张图片
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
                    print('下载完成:' + filename)
            else:
                print('网络错误,重新下载:'+ filename)
                return await images_down(filename,image_url)

async def aiohttp_down():
    for url in pic_list:
        global j
        task = [asyncio.create_task(images_down(url[0],url[1]))]
        j += 1
    #await asyncio.wait(task)
    await asyncio.gather(*task)
    
def main():
    for i in range(1, 3):
        url = 'https://desk.3gbizhi.com/deskMV/index_{}.html'.format(i)
        html = pic_re(url)
        soup = BeautifulSoup(html, 'lxml')
        get_pic_list(soup)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(aiohttp_down())
    #asyncio.run(aiohttp_down())
    date_all = (datetime.datetime.now() - start).total_seconds()
    print(f'总共{j}张图片,下载总用时:{date_all}s')

执行结果:

下载完成:pic\鸽子飞过欧美性感深V装欧美美女高清壁纸.jpg
下载完成:pic\穿黄色衣服在路边黄花的摄影电脑壁纸.jpg
下载完成:pic\棕色色调穿朝鲜传统服饰的韩国高颜值美女.jpg
下载完成:pic\欧美古城街道摄影的欧美美女.jpg
下载完成:pic\沙滩上捧着花散步的长发美女电脑壁纸.jpg
总共48张图片,下载总用时:18.869591s

进程已结束,退出代码0

48张,只用了18秒左右,协程速度最快。

2:遇到的问题

问题一:RuntimeError: Event loop is closed。

asyncio.run()执行报错,所以改成 asyncio.get_event_loop().run_until_complete(main()),就不报错了,有趣的是,官方是推荐使用asyncio.run()的方法。

问题二: 图片下载速度快,但是大尺寸图片下载不完整。

跟多线程遇到的问题一致,1M到20M的图片,显示不完全,小的图片则正常。

尝试解决方法1:

跟多线程一致的方法,或者判断Content-Length值返回循环下载

f os.path.getsize(filename) != int(resp.headers["Content-Length"]):
	return	await images_down(filename, image_url)

结果:直接陷入死循环,发现所有下载图片基本不可能和Content-Length一致,即使下载完整。

尝试解决方法2:

用块的方式,导入aiofiles包,通过设置块(content.iter_chunked)的值尝试循环写入。

async def async_http_download(filename, image_url):
     async with aiofiles.open(filename, 'wb') as fd:
         async with aiohttp.ClientSession() as session:
             async with session.get(image_url) as resp:
                 async for chunk in resp.content.iter_chunked(1024):
                     await fd.write(chunk)

结果,虽然不完整的减少,但是大尺寸的图,还是有,解决未果。

用aiohttp虽然代替requests库使用,估计功能还是没有requests库完整及全面,等后续在找解决方法。

六、总结

对于多任务爬虫来说,多线程、多进程、协程这几种方式处理效率的排序为:aiohttp协程 > 多线程 > 多进程。

但是aiohttp协程难度有点复杂,需要了解,而且本人目前没有解决协程下载大尺寸图片不完整的情况,还需要后续继续学习。

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: