简单爬虫的通用步骤——多线程/多进程爬虫示例

前言

很久很久以前,我写了篇文章《简单爬虫的通用步骤》,这篇文章中对于多线程/多进程/分布式/增量爬虫没有具体例子进行解释,现在来填坑了。

  • 本文以及本系列文章只适用于小白入门,欢迎各路大神指点。
  • 欢迎署名转载

介绍

单线程爬虫就像是一个人处理一堆事情,没法同时处理很多事,效率单一。多线程爬虫可以比作好多人组了一个team来处理这一堆事情。多进程爬虫可以说是,有多个team(team里面可能有一个或多个人)来处理一堆事情。大多数情况下,多线程/多进程比单线程效率高得多。多进程相比于多线程,开销较大,在规模不大的问题处理上,可能多线程比多进程效率高,就好比team内部交流比跨team交流顺畅。

多线程基本操作

仅介绍本文中用到的多线程基本操作以供入门,需要更多进阶知识,请自行学习。

在python中,建议使用threading高级模块,而不是_thread模块。

from threading import Thread
import time


def func1(index):
    print("I'm thread {}".format(index))
    time.sleep(index)
    print("Thread {} end".format(index))


if __name__ == '__main__':
    start_time = time.time()
    thread_list = []
    # 实例化一个线程
    # target是目标函数,记得别加括号
    # args是目标函数的参数,当参数个数仅有一个时,记得后面加上','逗号
    for index in range(1,10):
        thread_list.append(Thread(target=func1, args=(index,)))
    # 用start启动线程
    for t in thread_list:
        t.start()
    # join表示等待线程执行结束
    for t in thread_list:
        t.join()
    print("用时:{:.2f}s".format(time.time()-start_time))

多线程通信没什么值得注意的,用全局变量也行,用类中的公有成员也行。

但要注意一点,多线程共享一个变量时,同时读写会造成数据错误。多进程会将变量各自复制一份到自己的进程空间,变量不进行共享;但是多线程是会共享同一个变量的。例如,两个线程同时对一个公共变量读写1,000,000次。

from threading import Thread
import time

public_count = 0


def func_decrease():
    global public_count
    for index in range(1000000):
        public_count -= 1


def func_increase():
    global public_count
    for index in range(1000000):
        public_count += 1


if __name__ == '__main__':
    start_time = time.time()
    increase_thread = Thread(target=func_increase)
    decrease_thread = Thread(target=func_decrease)

    increase_thread.start()
    decrease_thread.start()

    increase_thread.join()
    decrease_thread.join()
    print(public_count)

多次执行结果均不相同,且都不为0。当多个线程对同一变量进行读写(尤其是写操作)时,一定要加锁。以上程序就变为:

from threading import Thread, Lock
import time

public_count = 0
lock = Lock() #定义一个锁

def func_decrease():
    global public_count
    for index in range(1000000):
        lock.acquire() #获取锁
        public_count -= 1 #修改
        lock.release() #释放锁


def func_increase():
    global public_count
    for index in range(1000000):
        lock.acquire()
        public_count += 1
        lock.release()


if __name__ == '__main__':
    start_time = time.time()
    increase_thread = Thread(target=func_increase)
    decrease_thread = Thread(target=func_decrease)

    increase_thread.start()
    decrease_thread.start()

    increase_thread.join()
    decrease_thread.join()
    print(public_count)

这样执行结果就为0 了,加锁保证了这段代码的执行完整性,中间不会被打断。但是同时只有一个线程能获得锁,这样一来保证了数据安全,失去了多线程的优势。

注意:

使用完一定要调用lock.release()!!!!!!!!!!!避免其他线程等待事件过长造成死锁。

当多个线程拥有不同的锁,又等待其他线程释放其他锁时,会造成死锁现象。就像是两队人马交换人质,都喊着让对面给先放人,自己人回来后再放。这样一来,多个线程都陷入等待状态,等待其他线程释放锁。这就造成了死锁。

多进程基本操作

在unix/linux/mac中,使用fork()可以创建;但是在windows上,需要使用multiprocessing模块。这次主要说multiprocessing。

基本操作与threading类似,使用Process类跟Thread类一样的用法,同时有多线程通信Queue的使用方法(Pipe的使用方法自行查阅)可以看下面的程序:

from multiprocessing import Process, Queue
import time


def func1(index, q):
    print("I'm Process {}".format(index))
    q.put(index)
    time.sleep(index)
    print("Process {} end".format(index))


if __name__ == '__main__':
    start_time = time.time()
    Process_list = []
    q = Queue()
    # 实例化一个线程
    # target是目标函数,记得别加括号
    # args是目标函数的参数,当参数个数仅有一个时,记得后面加上','逗号
    for index in range(1, 14):
        Process_list.append(Process(target=func1, args=(index, q)))
    # 用start启动线程
    for t in Process_list:
        t.start()
    # join表示等待线程执行结束
    for t in Process_list:
        t.join()
    for index in range(1,14):
        print(q.get())
    print("用时:{:.2f}s".format(time.time() - start_time))

当需要启动大量进程时,可以使用进程池Pool类创建子进程。例如:

from multiprocessing import Process, Queue, Pool, Manager
import time


def func1(index, q):
    print("I'm Process {}".format(index))
    q.put(index)
    time.sleep(index)
    print("Process {} end".format(index))


if __name__ == '__main__':
    start_time = time.time()
    Process_list = []
    pool = Pool() #默认是CPU核心数
    q = Manager().Queue()
    # 实例化一个线程
    # target是目标函数,记得别加括号
    # args是目标函数的参数,当参数个数仅有一个时,记得后面加上','逗号
    for index in range(1, 14):
        #apply_async表示异步非阻塞,不用等执行完,随时根据系统调度来切换
        #pool.apply是阻塞的,等执行完才能执行下一个。
        pool.apply_async(func=func1,args=(index,q))
    #关闭进程池,不允许其他进程再加入
    pool.close()
    #等所有进程执行完
    pool.join()
    for index in range(1,14):
        print(q.get())
    print("用时:{:.2f}s".format(time.time() - start_time))

注意:使用进程池的时候,队列要用Manager().Queue()而不是Queue(),否则进程无法启动!!!!!!

程序示例

下面以爬取煎蛋网无聊图为例,代码结构可以参考《简单爬虫的通用步骤》中的多线程和多进程部分。

  • 在使用单线程(单线程爬虫示例)的时候,用时346S,网络IO不到1Mb/s,没有充分发挥网络的性能。
  • 使用多线程(多线程爬虫示例)的时候,用时68S,网络IO一直满负载,CPU/内存等空闲较大。
  • 使用多进程的时候(多进程爬虫示例),用时78S,网络IO略有空闲,CPU/内存等空闲较大。

上面提到过,多进程开销较多线程大,在网络IO或者磁盘IO密集型的爬虫中,多进程不一定能体现出优势。但是在计算密集型程序中,多进程优势大于多线程(因为python中有GIL,多线程无法利用多核优势)。

总结

(写的比较乱,想到哪儿写到哪儿)

有上对比结果可以看出,爬虫类(尤其是下载类爬虫)计算量不大的话,对网络性能和磁盘性能要求比较高,当然现在SSD比较普及,磁盘性能不是很大问题。所以,对于爬虫程序中的下载部分,可以开启多线程以充分发挥网络的性能,对于爬虫程序中的计算部分,可以开启较少的线程甚至是单线程运行。以达到总体效率最大。

但是有写爬虫抓取完数据后需要进行大量运算,这种情况下,需要分配较多的线程给计算部分,较少部分分配给下载部分。以达到总体效率最大。

要做到以上部分,需要预先对爬虫任务进行分析,确认自己的爬虫偏重于什么,性能瓶颈在哪儿。同样要针对自己的分析,进行合理的程序设计。

大家可以看到以上三个示例的代码相差不多,下载部分/计算部分/图片存储都可以复用,只是需要重新写一下线程/进程的安排就行了。

python中,多线程和多进程的代码差距不大,主要在进程通信上面;如果不涉及进程通信问题,甚至不需要改动代码,剩下的交给python处理。

多线程/多进程/分布式爬虫,结构区别不大,就是单个爬虫的运行环境是线程/进程还是单个主机。还有就是通信方式不同,多线程处于同一进程,通信比较方便,变量都共享;多进程需要跨进程通信,通过使用Queue或Pipe;分布式通过网络和redis这种数据库。

下篇文章实现一个简单的分布式爬虫。

发表评论

电子邮件地址不会被公开。 必填项已用*标注