Web Crawler Tutorial 4: Implementing a High-Efficiency Crawler
In modern crawler development, improving crawl efficiency is a perennial topic. This article takes a close look at Python's three concurrency models: multiprocessing, multithreading, and coroutines. It examines how each applies to crawler development and compares their performance with a hands-on example.
1. Concurrency Overview
1.1 Multiprocessing
Multiprocessing runs multiple independent processes concurrently at the operating-system level; each process has its own memory space and system resources.
Pros:
- Makes full use of multi-core CPUs
- Bypasses Python's Global Interpreter Lock (GIL)
- Memory isolation between processes, which is safer
 
Cons:
- Creating and switching processes is expensive
- Inter-process communication is relatively complex (see the Queue sketch below)
- Consumes more system resources
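As a minimal sketch of why inter-process communication takes extra machinery (the worker and queue names here are illustrative, not part of the later examples): each process has its own memory, so results must be passed back explicitly, for instance through a multiprocessing.Queue.
```python
import multiprocessing

def square_worker(nums, result_queue):
    # Runs in a separate process with its own memory; results must be
    # sent back to the parent explicitly through the queue.
    for n in nums:
        result_queue.put(n * n)

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=square_worker, args=([1, 2, 3], queue))
    p.start()
    p.join()
    # Draining here is safe only because the child has already exited
    # after flushing its few small items into the queue.
    while not queue.empty():
        print(queue.get())
```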
 
1.2 Multithreading
Multithreading creates multiple threads within one process; the threads share the process's memory space.
Pros:
- Relatively low resource usage
- Threads share memory, making communication between them easy
- Well suited to I/O-bound tasks
 
Cons:
- Limited by Python's GIL, so it is hard to fully utilize multi-core CPUs
- Thread safety must be considered (see the Lock sketch below)
- Relatively difficult to debug
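A minimal sketch of the thread-safety point, with illustrative names: `counter += 1` is a read-modify-write sequence, so without the lock the concurrent increments below can lose updates.
```python
import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        # The lock makes the read-modify-write of the shared
        # module-level variable atomic across threads.
        with lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # Always 400000 with the lock; without it, the total can be lower.
```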
 
1.3 Coroutines
A coroutine is a lightweight, user-space unit of execution that achieves concurrency through cooperative multitasking.
Pros:
- Extremely low system overhead
- Handles I/O-bound tasks very efficiently
- A simple, easy-to-understand programming model
 
Cons:
- Not suitable for CPU-bound tasks
- Requires library support (such as asyncio)
- A long-running blocking I/O call can stall the event loop (see the to_thread sketch below)
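To illustrate the last point, a common workaround (a sketch, not taken from this article's crawler code) is asyncio.to_thread from Python 3.9+, which moves the blocking call onto a worker thread so the event loop stays responsive:
```python
import asyncio
import time

def blocking_io():
    # A synchronous call like this would freeze the event loop
    # if executed directly inside a coroutine.
    time.sleep(2)
    return "done"

async def heartbeat():
    for _ in range(4):
        print("event loop is still responsive")
        await asyncio.sleep(0.5)

async def main():
    # to_thread runs the blocking call in a thread pool, so the
    # heartbeat keeps printing while blocking_io sleeps.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), heartbeat())
    print(result)

asyncio.run(main())
```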
 
2. The Evolution of Concurrency in Python
A quick aside: the latest Python 3.13 can already be built as a free-threaded (GIL-free) interpreter, so Python may really be about to take off 🛫.
The milestones of concurrent programming in Python:
- Early days: single-threaded, synchronous code
- Python 2.x: the threading module adds multithreading
- Python 2.6+: the multiprocessing module adds multiprocessing
- Python 3.4+: the asyncio module introduces coroutines
- Python 3.5+: the async/await syntax makes coroutines easier to write
 
This evolution reflects the community's ongoing pursuit of concurrency models that are both more efficient and easier to use.
3. Minimal Examples
3.1 Multiprocessing Example
```python
import multiprocessing
import time
def worker(num):
    print(f"Worker {num} started")
    time.sleep(2)
    print(f"Worker {num} finished")
# The __main__ guard is required on platforms that spawn processes (Windows/macOS).
if __name__ == "__main__":
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print("All processes completed")
```
3.2 Multithreading Example
```python
import threading
import time
def worker(num):
    print(f"Thread {num} started")
    time.sleep(2)
    print(f"Thread {num} finished")
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print("All threads completed")
```
3.3 Coroutine Example
```python
import asyncio
async def worker(num):
    print(f"Coroutine {num} started")
    await asyncio.sleep(2)
    print(f"Coroutine {num} finished")
async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    await asyncio.gather(*tasks)
asyncio.run(main())
print("All coroutines completed")
```
All three examples above take about 2 seconds to run, but their memory footprints differ: multiprocessing uses the most memory, multithreading less, and coroutines the least.
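If you want to check the memory claim yourself, one rough approach (a sketch, assuming the third-party psutil package is installed) is to sample the resident set size of the current process plus its children while an example runs:
```python
import psutil

def total_rss_mb():
    # Sum resident memory of this process and any live child processes,
    # e.g. the workers spawned by the multiprocessing example.
    proc = psutil.Process()
    rss = proc.memory_info().rss
    for child in proc.children(recursive=True):
        rss += child.memory_info().rss
    return rss / 1024 / 1024

print(f"approx. total RSS: {total_rss_mb():.1f} MB")
```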
4. Application in Crawlers
In crawler development, each of the three concurrency models has its place:
- Multiprocessing: best when you need to bypass the GIL and use multiple CPU cores, e.g. large-scale data processing.
- Multithreading: good for I/O-bound tasks, such as crawling several web pages at once.
- Coroutines: the best fit for massive concurrent I/O, such as high-concurrency network requests.
 
5. Hands-On Comparison
We will implement the same crawler task (fetching cryptocurrency data from Yahoo Finance) three ways: with multiprocessing, multithreading, and coroutines, and then compare their performance.
If you are not familiar with Yahoo Finance's cryptocurrency data, go back and review the chapter 09_爬虫入门实战2_动态数据提取.md.
5.1 Multiprocessing Version
```python
# -*- coding: utf-8 -*-
import csv
import time
from typing import Any, Dict, List
from multiprocessing import Pool, cpu_count
import requests
from common import SymbolContent, make_req_params_and_headers
HOST = "https://query1.finance.yahoo.com"
SYMBOL_QUERY_API_URI = "/v1/finance/screener"
PAGE_SIZE = 100  # allowed values: 25, 50, 100
def parse_symbol_content(quote_item: Dict) -> SymbolContent:
    """
    Extract the fields of one quote item.
    :param quote_item:
    :return:
    """
    symbol_content = SymbolContent()
    symbol_content.symbol = quote_item["symbol"]
    symbol_content.name = quote_item["shortName"]
    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
    symbol_content.market_price = quote_item["marketCap"]["fmt"]
    return symbol_content
def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
    """
    Shared request helper.
    :param page_start: pagination offset
    :param page_size: number of items per page
    :return:
    """
    # print(f"[send_request] page_start:{page_start}")
    req_url = HOST + SYMBOL_QUERY_API_URI
    common_params, headers, common_payload_data = make_req_params_and_headers()
    # set the per-request pagination parameters
    common_payload_data["offset"] = page_start
    common_payload_data["size"] = page_size
    response = requests.post(url=req_url, params=common_params, json=common_payload_data, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Request failed, reason: {response.text}")
    return response.json()
def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
    """
    Fetch currency data for a single page.
    :param page_start: Page start index.
    :return: List of SymbolContent for the page.
    """
    try:
        response_dict: Dict = send_request(page_start=page_start, page_size=PAGE_SIZE)
        symbol_data_list: List[SymbolContent] = [
            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
        ]
        return symbol_data_list
    except Exception as e:
        print(f"Error fetching data for page_start={page_start}: {e}")
        return []
def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
    """
    Fetch currency data using multiprocessing.
    :param max_total_count: Maximum total count of currencies.
    :return: List of all SymbolContent.
    """
    with Pool(processes=cpu_count()) as pool:
        page_starts = list(range(0, max_total_count, PAGE_SIZE))
        print(f"Issuing {len(page_starts)} network requests in total")
        results = pool.map(fetch_currency_data_single, page_starts)
        # Flatten the list of lists into a single list
    return [item for sublist in results for item in sublist]
def get_max_total_count() -> int:
    """
    Get the total number of listed currencies.
    :return:
    """
    print("Fetching the total currency count")
    try:
        response_dict: Dict = send_request(page_start=0, page_size=PAGE_SIZE)
        total_num: int = response_dict["finance"]["result"][0]["total"]
        print(f"Found {total_num} currencies")
        return total_num
    except Exception as e:
        print("Error:", e)
        return 0
def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
    """
    Save the collected data to a CSV file.
    :param save_file_name: output file name
    :param currency_data_list:
    :return:
    """
    with open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # write the header row
        writer.writerow(SymbolContent.get_fields())
        # write one row per currency
        for symbol in currency_data_list:
            writer.writerow([symbol.symbol, symbol.name, symbol.price, symbol.change_price, symbol.change_percent,
                             symbol.market_price])
def run_crawler_mp(save_file_name: str) -> None:
    """
    Main crawler flow (multiprocessing version).
    :param save_file_name:
    :return:
    """
    # step 1: get the total record count
    max_total: int = get_max_total_count()
    # step 2: fetch and parse every page into the data container
    data_list: List[SymbolContent] = fetch_currency_data_list(max_total)
    # step 3: save the collected data to CSV
    save_data_to_csv(save_file_name, data_list)
if __name__ == '__main__':
    start_time = time.time()
    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
    run_crawler_mp(save_csv_file_name)
    end_time = time.time()
    print(f"Multiprocessing run took {end_time - start_time} seconds")
```
5.2 Multithreaded Version
```python
# -*- coding: utf-8 -*-
import csv
import time
from os import cpu_count
from typing import Any, Dict, List
from concurrent.futures import ThreadPoolExecutor
import requests
from common import SymbolContent, make_req_params_and_headers
HOST = "https://query1.finance.yahoo.com"
SYMBOL_QUERY_API_URI = "/v1/finance/screener"
PAGE_SIZE = 100  # allowed values: 25, 50, 100
def parse_symbol_content(quote_item: Dict) -> SymbolContent:
    """
    Extract the fields of one quote item.
    :param quote_item:
    :return:
    """
    symbol_content = SymbolContent()
    symbol_content.symbol = quote_item["symbol"]
    symbol_content.name = quote_item["shortName"]
    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
    symbol_content.market_price = quote_item["marketCap"]["fmt"]
    return symbol_content
def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
    """
    Shared request helper.
    :param page_start: pagination offset
    :param page_size: number of items per page
    :return:
    """
    # print(f"[send_request] page_start:{page_start}")
    req_url = HOST + SYMBOL_QUERY_API_URI
    common_params, headers, common_payload_data = make_req_params_and_headers()
    # set the per-request pagination parameters
    common_payload_data["offset"] = page_start
    common_payload_data["size"] = page_size
    response = requests.post(url=req_url, params=common_params, json=common_payload_data, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Request failed, reason: {response.text}")
    return response.json()
def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
    """
    Fetch currency data for a single page.
    :param page_start: Page start index.
    :return: List of SymbolContent for the page.
    """
    try:
        response_dict: Dict = send_request(page_start=page_start, page_size=PAGE_SIZE)
        return [
            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
        ]
    except Exception as e:
        print(f"Error fetching data for page_start={page_start}: {e}")
        return []
def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
    """
    Fetch currency data using multithreading.
    :param max_total_count: Maximum total count of currencies.
    :return: List of all SymbolContent.
    """
    # os.cpu_count() can return None on some platforms; fall back to 4
    with ThreadPoolExecutor(max_workers=(cpu_count() or 4) * 2) as executor:
        page_starts = list(range(0, max_total_count, PAGE_SIZE))
        print(f"Issuing {len(page_starts)} network requests in total")
        # executor.map keeps results in submission order
        results = list(executor.map(fetch_currency_data_single, page_starts))
        # flatten the list of lists
        return [item for sublist in results for item in sublist]
def get_max_total_count() -> int:
    """
    Get the total number of listed currencies.
    :return:
    """
    print("Fetching the total currency count")
    try:
        response_dict: Dict = send_request(page_start=0, page_size=PAGE_SIZE)
        total_num: int = response_dict["finance"]["result"][0]["total"]
        print(f"Found {total_num} currencies")
        return total_num
    except Exception as e:
        print("Error:", e)
        return 0
def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
    """
    Save the collected data to a CSV file.
    :param save_file_name: output file name
    :param currency_data_list:
    :return:
    """
    with open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # write the header row
        writer.writerow(SymbolContent.get_fields())
        # write one row per currency
        for symbol in currency_data_list:
            writer.writerow([symbol.symbol, symbol.name, symbol.price, symbol.change_price, symbol.change_percent,
                             symbol.market_price])
def run_crawler_mt(save_file_name: str) -> None:
    """
    Main crawler flow (multithreaded version).
    :param save_file_name:
    :return:
    """
    # step 1: get the total record count
    max_total: int = get_max_total_count()
    # step 2: fetch and parse every page into the data container
    data_list: List[SymbolContent] = fetch_currency_data_list(max_total)
    # step 3: save the collected data to CSV
    save_data_to_csv(save_file_name, data_list)
if __name__ == '__main__':
    start_time = time.time()
    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
    run_crawler_mt(save_csv_file_name)
    end_time = time.time()
    print(f"Multithreading run took {end_time - start_time} seconds")
```
5.3 Coroutine Version
```python
# -*- coding: utf-8 -*-
import asyncio
import time
from typing import Any, Dict, List
import aiofiles
import httpx
from common import SymbolContent, make_req_params_and_headers
HOST = "https://query1.finance.yahoo.com"
SYMBOL_QUERY_API_URI = "/v1/finance/screener"
PAGE_SIZE = 100  # allowed values: 25, 50, 100
def parse_symbol_content(quote_item: Dict) -> SymbolContent:
    """
    Extract the fields of one quote item.
    :param quote_item:
    :return:
    """
    symbol_content = SymbolContent()
    symbol_content.symbol = quote_item["symbol"]
    symbol_content.name = quote_item["shortName"]
    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
    symbol_content.market_price = quote_item["marketCap"]["fmt"]
    return symbol_content
async def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
    """
    Shared request helper.
    :param page_start: pagination offset
    :param page_size: number of items per page
    :return:
    """
    # print(f"[send_request] page_start:{page_start}")
    req_url = HOST + SYMBOL_QUERY_API_URI
    common_params, headers, common_payload_data = make_req_params_and_headers()
    # set the per-request pagination parameters
    common_payload_data["offset"] = page_start
    common_payload_data["size"] = page_size
    async with httpx.AsyncClient() as client:
        response = await client.post(url=req_url, params=common_params, json=common_payload_data, headers=headers,
                                     timeout=30)
    if response.status_code != 200:
        raise Exception(f"Request failed, reason: {response.text}")
    return response.json()
async def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
    """
    Fetch currency data for a single page.
    :param page_start: Page start index.
    :return: List of SymbolContent for the page.
    """
    try:
        response_dict: Dict = await send_request(page_start=page_start, page_size=PAGE_SIZE)
        return [
            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
        ]
    except Exception as e:
        print(f"Error fetching data for page_start={page_start}: {e}")
        return []
async def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
    """
    Fetch currency data using asyncio.
    :param max_total_count: Maximum total count of currencies.
    :return: List of all SymbolContent.
    """
    page_starts = list(range(0, max_total_count, PAGE_SIZE))
    print(f"Issuing {len(page_starts)} network requests in total")
    tasks = [fetch_currency_data_single(page_start) for page_start in page_starts]
    results = await asyncio.gather(*tasks)
    # flatten the list of lists
    return [item for sublist in results for item in sublist]
async def get_max_total_count() -> int:
    """
    Get the total number of listed currencies.
    :return:
    """
    print("Fetching the total currency count")
    try:
        response_dict: Dict = await send_request(page_start=0, page_size=PAGE_SIZE)
        total_num: int = response_dict["finance"]["result"][0]["total"]
        print(f"Found {total_num} currencies")
        return total_num
    except Exception as e:
        print("Error:", e)
        return 0
async def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
    """
    Save the collected data to a CSV file.
    :param save_file_name: output file name
    :param currency_data_list:
    :return:
    """
    async with aiofiles.open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
        # csv.writer cannot be used with an async file object,
        # so rows are written manually as comma-joined strings.
        # write the header row
        await file.write(','.join(SymbolContent.get_fields()) + '\n')
        # write one row per currency
        for symbol in currency_data_list:
            await file.write(f"{symbol.symbol},{symbol.name},{symbol.price},{symbol.change_price},{symbol.change_percent},{symbol.market_price}\n")
async def run_crawler_async(save_file_name: str) -> None:
    """
    Main crawler flow (async coroutine version).
    :param save_file_name:
    :return:
    """
    # step 1: get the total record count
    max_total: int = await get_max_total_count()
    # step 2: fetch and parse every page into the data container
    data_list: List[SymbolContent] = await fetch_currency_data_list(max_total)
    # step 3: save the collected data to CSV
    await save_data_to_csv(save_file_name, data_list)
async def main():
    """
    Entry point.
    :return:
    """
    start_time = time.time()
    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
    await run_crawler_async(save_csv_file_name)
    end_time = time.time()
    print(f"asyncio coroutine run took {end_time - start_time} seconds")
if __name__ == '__main__':
    asyncio.run(main())
```
Path to the source code above: 11_爬虫入门实战4_高效率的爬虫实现
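One refinement worth calling out (a sketch, not part of the scripts above): send_request opens a new httpx.AsyncClient for every request. Sharing a single client reuses pooled connections, and an asyncio.Semaphore caps how many requests are in flight so the target server is not hammered. The URLs and the limit of 20 below are placeholders.
```python
import asyncio
import httpx

async def fetch_page(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> str:
    # The shared client reuses pooled connections; the semaphore caps
    # how many requests run concurrently.
    async with sem:
        response = await client.get(url, timeout=30)
        response.raise_for_status()
        return response.text

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(100)]  # placeholder URLs
    sem = asyncio.Semaphore(20)  # arbitrary cap of 20 in-flight requests
    async with httpx.AsyncClient() as client:
        pages = await asyncio.gather(*(fetch_page(client, sem, url) for url in urls))
    print(f"fetched {len(pages)} pages")

asyncio.run(main())
```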
5.4 Execution Times
5.4.1 Multithreading Run Time
Fetching the total currency count
Found 9967 currencies
Issuing 100 network requests in total
Multithreading run took 7.992658853530884 seconds
5.4.2 Multiprocessing Run Time
Fetching the total currency count
Found 9967 currencies
Issuing 100 network requests in total
Multiprocessing run took 17.447596073150635 seconds
5.4.3 Coroutine Run Time
Fetching the total currency count
Found 9967 currencies
Issuing 100 network requests in total
asyncio coroutine run took 4.690491199493408 seconds
6. Summary of the Code Above
I personally prefer async coroutines: the code reads almost like synchronous code, yet delivers high efficiency.
6.1. Multithreading (run_crawler_multi_thread.py)
When to use: I/O-bound tasks such as network requests, because a thread waiting on I/O (e.g. a network response) yields the CPU to other threads. How it works:
- Uses ThreadPoolExecutor to manage a pool of threads.
- Hands each task (fetching one page of currency data) to a thread in the pool.
- Uses executor.map to fetch pages in parallel; it handles task distribution and result collection automatically (an as_completed-based alternative is sketched below).
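As a possible alternative to executor.map (a sketch, not part of the original scripts; it assumes fetch_currency_data_single from the multithreaded version above is in scope): submit plus as_completed reports progress as each page finishes, at the cost of losing submission order.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_all_pages(page_starts):
    results = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        # Map each future back to its page offset for progress logging.
        futures = {executor.submit(fetch_currency_data_single, start): start
                   for start in page_starts}
        for future in as_completed(futures):
            page_start = futures[future]
            rows = future.result()
            print(f"page_start={page_start} done, {len(rows)} rows")
            results.extend(rows)
    return results
```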
 
6.2. Multiprocessing (run_crawler_multi_process.py)
When to use: CPU-bound tasks. In this case it handles an I/O-bound workload, which is usually not the best fit because inter-process communication is costly. How it works:
- Uses multiprocessing.Pool to create a process pool.
- As in the threaded version, uses pool.map to fetch pages in parallel.
- Data crosses process boundaries via serialization (pickling) and deserialization, which adds extra overhead.
 
6.3. Coroutines (run_crawler_multi_coroutine.py)
When to use: ideal for I/O-bound tasks such as network requests. Coroutines improve throughput through an event loop and non-blocking I/O. How it works:
- Uses the asyncio library to schedule the coroutines.
- Uses httpx.AsyncClient for asynchronous HTTP requests, so waiting on a response never blocks the rest of the program.
- Uses asyncio.gather to run the page-fetching coroutines concurrently.
 
Summary
- Multithreading and multiprocessing both handle concurrent tasks, but for heavy network I/O they are usually less efficient than coroutines.
- Coroutines offer the best efficiency and resource utilization, especially for network-I/O-bound workloads.
- When choosing a concurrency strategy, weigh the task type (CPU-bound vs. I/O-bound), system resources (such as CPU core count), and program complexity.