Skip to content

代理 IP 的使用与管理

当你的爬虫遭遇 IP 封禁时,代理 IP 就成了必不可少的工具。本章将深入讲解代理 IP 的原理、类型选择,以及如何设计和实现一个实用的代理池管理系统。

代理 IP 基础

什么是代理 IP

代理服务器(Proxy Server)是一种位于客户端和目标服务器之间的中间服务器。当你通过代理发送请求时:

  1. 你的请求先发送到代理服务器
  2. 代理服务器代替你向目标服务器发送请求
  3. 目标服务器将响应返回给代理服务器
  4. 代理服务器再将响应转发给你

这样,目标服务器看到的是代理服务器的 IP,而不是你的真实 IP。

代理类型详解

按协议分类

类型特点使用场景
HTTP 代理只支持 HTTP 协议普通网页爬取
HTTPS 代理支持 HTTPS 协议加密网站爬取
SOCKS4 代理支持 TCP 连接需要更底层控制
SOCKS5 代理支持 TCP/UDP,可认证最灵活的代理类型

按匿名度分类

类型特点识别难度
透明代理目标服务器能看到你的真实 IP容易被识别
匿名代理隐藏真实 IP,但暴露代理身份中等
高匿代理完全隐藏真实 IP 和代理身份较难识别

对于爬虫来说,高匿代理是首选。

按来源分类

类型优点缺点
免费代理成本为零不稳定、速度慢、可用率低
付费代理稳定、速度快、可用率高需要成本
自建代理完全可控需要服务器资源

代理提供商选择指南

选择代理提供商时需要考虑:

  1. IP 质量:是否是高匿代理,是否已被目标网站封禁
  2. IP 数量:IP 池的规模
  3. 地理分布:是否覆盖目标地区
  4. 稳定性:连接成功率和响应速度
  5. 价格:按流量计费还是按 IP 数计费
  6. API 支持:是否提供便捷的 API 获取代理

常见的代理类型:

  • API 提取型:通过 API 获取代理 IP 列表
  • 隧道代理型:固定入口,自动轮换 IP
  • 动态转发型:每次请求自动更换 IP

代理池设计

为什么需要代理池

直接使用单个代理存在以下问题:

  • 代理可能随时失效
  • 单个 IP 容易被封禁
  • 无法动态切换和管理

代理池可以解决这些问题:

  • 统一管理多个代理
  • 自动检测代理有效性
  • 智能分配和轮换代理
  • 记录代理质量评分

代理池架构设计

代理池工作流程

核心接口设计

python
from abc import ABC, abstractmethod
from typing import Optional, List
from dataclasses import dataclass
from enum import Enum


class ProxyProtocol(Enum):
    """代理协议"""
    HTTP = "http"
    HTTPS = "https"
    SOCKS5 = "socks5"


@dataclass
class ProxyInfo:
    """代理信息"""
    host: str
    port: int
    protocol: ProxyProtocol = ProxyProtocol.HTTP
    username: Optional[str] = None
    password: Optional[str] = None

    # 质量指标
    success_count: int = 0
    fail_count: int = 0
    avg_response_time: float = 0.0
    last_check_time: float = 0.0

    @property
    def url(self) -> str:
        """构建代理 URL"""
        auth = ""
        if self.username and self.password:
            auth = f"{self.username}:{self.password}@"
        return f"{self.protocol.value}://{auth}{self.host}:{self.port}"

    @property
    def score(self) -> float:
        """计算代理评分"""
        total = self.success_count + self.fail_count
        if total == 0:
            return 0.5  # 未测试的代理给中等分数
        success_rate = self.success_count / total
        # 考虑响应时间,越快分数越高
        time_score = max(0, 1 - self.avg_response_time / 10)
        return success_rate * 0.7 + time_score * 0.3


class IProxyFetcher(ABC):
    """代理获取器接口"""

    @abstractmethod
    async def fetch(self) -> List[ProxyInfo]:
        """获取代理列表"""
        pass


class IProxyChecker(ABC):
    """代理检测器接口"""

    @abstractmethod
    async def check(self, proxy: ProxyInfo) -> bool:
        """检测代理是否可用"""
        pass


class IProxyPool(ABC):
    """代理池接口"""

    @abstractmethod
    async def get_proxy(self) -> Optional[ProxyInfo]:
        """获取一个可用代理"""
        pass

    @abstractmethod
    async def return_proxy(self, proxy: ProxyInfo, success: bool):
        """归还代理并报告使用结果"""
        pass

    @abstractmethod
    async def add_proxy(self, proxy: ProxyInfo):
        """添加代理"""
        pass

    @abstractmethod
    async def remove_proxy(self, proxy: ProxyInfo):
        """移除代理"""
        pass

代理获取器实现

免费代理获取(仅供学习)

python
import httpx
from typing import List
from loguru import logger


class FreeProxyFetcher(IProxyFetcher):
    """
    免费代理获取器

    注意:免费代理质量较差,仅供学习测试使用
    """

    async def fetch(self) -> List[ProxyInfo]:
        """从免费代理网站获取代理"""
        proxies = []

        # 示例:从 API 获取(这里用一个示例 API)
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                # 这是一个示例 URL,实际使用时替换为真实的代理 API
                response = await client.get(
                    "https://api.proxyscrape.com/v2/"
                    "?request=getproxies&protocol=http&timeout=10000&country=all"
                )

                if response.status_code == 200:
                    lines = response.text.strip().split("\n")
                    for line in lines:
                        try:
                            host, port = line.strip().split(":")
                            proxies.append(ProxyInfo(
                                host=host,
                                port=int(port),
                                protocol=ProxyProtocol.HTTP
                            ))
                        except ValueError:
                            continue

                logger.info(f"获取到 {len(proxies)} 个免费代理")

        except Exception as e:
            logger.error(f"获取免费代理失败: {e}")

        return proxies

API 代理获取器

python
class APIProxyFetcher(IProxyFetcher):
    """
    API 代理获取器

    从付费代理服务商的 API 获取代理
    """

    def __init__(
        self,
        api_url: str,
        api_key: Optional[str] = None,
        count: int = 10
    ):
        """
        初始化 API 代理获取器

        Args:
            api_url: API 地址
            api_key: API 密钥
            count: 每次获取数量
        """
        self.api_url = api_url
        self.api_key = api_key
        self.count = count

    async def fetch(self) -> List[ProxyInfo]:
        """从 API 获取代理"""
        proxies = []

        try:
            params = {"num": self.count}
            if self.api_key:
                params["key"] = self.api_key

            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get(self.api_url, params=params)
                data = response.json()

                # 根据实际 API 返回格式解析
                # 这里假设返回 {"data": [{"ip": "x.x.x.x", "port": 8080}, ...]}
                for item in data.get("data", []):
                    proxies.append(ProxyInfo(
                        host=item["ip"],
                        port=item["port"],
                        protocol=ProxyProtocol(item.get("protocol", "http"))
                    ))

                logger.info(f"从 API 获取到 {len(proxies)} 个代理")

        except Exception as e:
            logger.error(f"从 API 获取代理失败: {e}")

        return proxies

代理检测器实现

python
import time
import httpx
from typing import Optional


class ProxyChecker(IProxyChecker):
    """
    代理检测器

    检测代理的可用性和响应速度
    """

    # 用于检测的 URL 列表
    CHECK_URLS = [
        "https://httpbin.org/ip",
        "https://api.ipify.org?format=json",
    ]

    def __init__(self, timeout: int = 10):
        """
        初始化检测器

        Args:
            timeout: 检测超时时间
        """
        self.timeout = timeout

    async def check(self, proxy: ProxyInfo) -> bool:
        """
        检测代理是否可用

        Args:
            proxy: 代理信息

        Returns:
            代理是否可用
        """
        start_time = time.time()

        try:
            async with httpx.AsyncClient(
                proxies=proxy.url,
                timeout=self.timeout
            ) as client:
                for url in self.CHECK_URLS:
                    try:
                        response = await client.get(url)
                        if response.status_code == 200:
                            # 更新响应时间
                            response_time = time.time() - start_time
                            proxy.avg_response_time = (
                                proxy.avg_response_time * 0.7 +
                                response_time * 0.3
                            )
                            proxy.last_check_time = time.time()
                            logger.debug(
                                f"代理可用: {proxy.host}:{proxy.port}, "
                                f"响应时间: {response_time:.2f}s"
                            )
                            return True
                    except Exception:
                        continue

        except Exception as e:
            logger.debug(f"代理检测失败: {proxy.host}:{proxy.port} - {e}")

        return False

    async def check_batch(
        self,
        proxies: List[ProxyInfo],
        concurrency: int = 20
    ) -> List[ProxyInfo]:
        """
        批量检测代理

        Args:
            proxies: 代理列表
            concurrency: 并发数

        Returns:
            可用的代理列表
        """
        import asyncio

        semaphore = asyncio.Semaphore(concurrency)
        valid_proxies = []

        async def check_one(proxy: ProxyInfo):
            async with semaphore:
                if await self.check(proxy):
                    valid_proxies.append(proxy)

        tasks = [check_one(p) for p in proxies]
        await asyncio.gather(*tasks, return_exceptions=True)

        logger.info(f"检测完成: {len(valid_proxies)}/{len(proxies)} 可用")
        return valid_proxies

代理池实现

python
import asyncio
import random
import time
from typing import Optional, List, Dict
from collections import defaultdict
from loguru import logger


class ProxyPool(IProxyPool):
    """
    代理池实现

    特性:
    - 自动获取和检测代理
    - 基于评分的智能分配
    - 自动淘汰失效代理
    - 支持代理预热
    """

    def __init__(
        self,
        fetcher: IProxyFetcher,
        checker: IProxyChecker,
        min_proxies: int = 10,
        max_proxies: int = 100,
        check_interval: int = 300,
        max_fail_count: int = 3
    ):
        """
        初始化代理池

        Args:
            fetcher: 代理获取器
            checker: 代理检测器
            min_proxies: 最小代理数量
            max_proxies: 最大代理数量
            check_interval: 检测间隔(秒)
            max_fail_count: 最大失败次数
        """
        self.fetcher = fetcher
        self.checker = checker
        self.min_proxies = min_proxies
        self.max_proxies = max_proxies
        self.check_interval = check_interval
        self.max_fail_count = max_fail_count

        # 代理存储
        self._proxies: Dict[str, ProxyInfo] = {}
        self._lock = asyncio.Lock()

        # 后台任务
        self._refresh_task: Optional[asyncio.Task] = None
        self._running = False

    def _proxy_key(self, proxy: ProxyInfo) -> str:
        """生成代理唯一标识"""
        return f"{proxy.host}:{proxy.port}"

    async def start(self):
        """启动代理池"""
        self._running = True

        # 初始获取代理
        await self._refresh_proxies()

        # 启动后台刷新任务
        self._refresh_task = asyncio.create_task(self._refresh_loop())

        logger.info("代理池已启动")

    async def stop(self):
        """停止代理池"""
        self._running = False

        if self._refresh_task:
            self._refresh_task.cancel()
            try:
                await self._refresh_task
            except asyncio.CancelledError:
                pass

        logger.info("代理池已停止")

    async def _refresh_loop(self):
        """后台刷新循环"""
        while self._running:
            try:
                await asyncio.sleep(self.check_interval)
                await self._refresh_proxies()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"代理刷新异常: {e}")

    async def _refresh_proxies(self):
        """刷新代理"""
        async with self._lock:
            # 检查是否需要补充代理
            if len(self._proxies) >= self.min_proxies:
                return

            logger.info(f"代理不足 ({len(self._proxies)}/{self.min_proxies}),开始获取...")

            # 获取新代理
            new_proxies = await self.fetcher.fetch()

            # 检测代理
            valid_proxies = await self.checker.check_batch(new_proxies)

            # 添加到池中
            for proxy in valid_proxies:
                key = self._proxy_key(proxy)
                if key not in self._proxies and len(self._proxies) < self.max_proxies:
                    self._proxies[key] = proxy

            logger.info(f"代理池更新完成,当前数量: {len(self._proxies)}")

    async def get_proxy(self) -> Optional[ProxyInfo]:
        """
        获取一个可用代理

        使用加权随机选择,评分高的代理被选中概率更大
        """
        async with self._lock:
            if not self._proxies:
                logger.warning("代理池为空")
                return None

            # 计算权重
            proxies = list(self._proxies.values())
            weights = [max(p.score, 0.1) for p in proxies]

            # 加权随机选择
            selected = random.choices(proxies, weights=weights, k=1)[0]

            logger.debug(f"分配代理: {selected.host}:{selected.port} (评分: {selected.score:.2f})")
            return selected

    async def return_proxy(self, proxy: ProxyInfo, success: bool):
        """
        归还代理并报告使用结果

        Args:
            proxy: 代理信息
            success: 使用是否成功
        """
        async with self._lock:
            key = self._proxy_key(proxy)

            if key not in self._proxies:
                return

            stored_proxy = self._proxies[key]

            if success:
                stored_proxy.success_count += 1
            else:
                stored_proxy.fail_count += 1

                # 检查是否需要淘汰
                if stored_proxy.fail_count >= self.max_fail_count:
                    total = stored_proxy.success_count + stored_proxy.fail_count
                    if total > 5 and stored_proxy.score < 0.3:
                        del self._proxies[key]
                        logger.info(f"淘汰低质量代理: {proxy.host}:{proxy.port}")

    async def add_proxy(self, proxy: ProxyInfo):
        """添加代理"""
        async with self._lock:
            key = self._proxy_key(proxy)
            if key not in self._proxies and len(self._proxies) < self.max_proxies:
                self._proxies[key] = proxy

    async def remove_proxy(self, proxy: ProxyInfo):
        """移除代理"""
        async with self._lock:
            key = self._proxy_key(proxy)
            if key in self._proxies:
                del self._proxies[key]

    @property
    def size(self) -> int:
        """代理池大小"""
        return len(self._proxies)

    def get_stats(self) -> Dict:
        """获取统计信息"""
        if not self._proxies:
            return {"total": 0}

        proxies = list(self._proxies.values())
        scores = [p.score for p in proxies]

        return {
            "total": len(proxies),
            "avg_score": sum(scores) / len(scores),
            "max_score": max(scores),
            "min_score": min(scores),
        }

代理与爬虫集成

使用 httpx 设置代理

python
import httpx

async def fetch_with_proxy(url: str, proxy_url: str) -> str:
    """使用代理发送请求"""
    async with httpx.AsyncClient(proxies=proxy_url, timeout=30) as client:
        response = await client.get(url)
        return response.text


# 使用示例
proxy = "http://user:pass@127.0.0.1:8080"
content = await fetch_with_proxy("https://httpbin.org/ip", proxy)

集成代理池的爬虫

python
class ProxiedCrawler:
    """
    集成代理池的爬虫

    自动管理代理的获取、轮换和报告
    """

    def __init__(self, proxy_pool: ProxyPool):
        self.proxy_pool = proxy_pool

    async def fetch(self, url: str) -> Optional[str]:
        """使用代理获取页面"""
        proxy = await self.proxy_pool.get_proxy()

        if not proxy:
            logger.warning("无可用代理")
            return None

        try:
            async with httpx.AsyncClient(
                proxies=proxy.url,
                timeout=30
            ) as client:
                response = await client.get(url)
                response.raise_for_status()

                # 报告成功
                await self.proxy_pool.return_proxy(proxy, success=True)
                return response.text

        except Exception as e:
            logger.warning(f"请求失败: {url} - {e}")
            # 报告失败
            await self.proxy_pool.return_proxy(proxy, success=False)
            return None

隧道代理使用

隧道代理是一种特殊的代理模式,你只需连接到固定的代理入口,每次请求自动分配不同的 IP。

python
class TunnelProxyClient:
    """
    隧道代理客户端

    特点:固定入口,自动轮换 IP
    """

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str
    ):
        self.proxy_url = f"http://{username}:{password}@{host}:{port}"

    async def get(self, url: str, **kwargs) -> httpx.Response:
        """发送请求(自动使用隧道代理)"""
        async with httpx.AsyncClient(
            proxies=self.proxy_url,
            timeout=30
        ) as client:
            return await client.get(url, **kwargs)


# 使用示例
tunnel = TunnelProxyClient(
    host="tunnel.example.com",
    port=12345,
    username="your_username",
    password="your_password"
)

# 每次请求自动使用不同 IP
response1 = await tunnel.get("https://httpbin.org/ip")
response2 = await tunnel.get("https://httpbin.org/ip")

代理使用最佳实践

IP 封禁机制分析

大多数网站都有反爬虫的 IP 封禁机制,常见的触发条件和处理方式:

常见 IP 封禁特点

触发条件响应码封禁时长解封方式
请求频率过高429几分钟~1小时降低频率后自动解封
风控检测触发403数小时~1天需更换IP
严重违规403/IP拉黑数天~永久需更换IP

代理有效性检测器

使用 httpbin.org 等测试服务来验证代理的可用性:

python
import time
import httpx
from typing import Optional
from loguru import logger


class SiteProxyChecker(IProxyChecker):
    """
    通用代理检测器

    使用 httpbin.org 检测代理可用性和匿名度
    """

    # 使用 httpbin.org 检测代理IP
    CHECK_URL = "https://httpbin.org/ip"

    # 通用请求头
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/131.0.0.0 Safari/537.36",
        "Accept": "application/json"
    }

    def __init__(self, timeout: int = 10):
        self.timeout = timeout

    async def check(self, proxy: ProxyInfo) -> bool:
        """
        检测代理是否可用

        判断标准:
        - 请求成功(状态码200)
        - 响应包含有效JSON
        - 返回的IP与代理IP一致(验证代理生效)
        """
        start_time = time.time()

        try:
            async with httpx.AsyncClient(
                proxies=proxy.url,
                timeout=self.timeout,
                headers=self.HEADERS
            ) as client:
                response = await client.get(self.CHECK_URL)

                if response.status_code != 200:
                    logger.debug(f"代理状态码异常: {proxy.host}:{proxy.port} - {response.status_code}")
                    return False

                data = response.json()

                # 验证返回的IP(httpbin.org 返回 {"origin": "x.x.x.x"})
                origin_ip = data.get("origin", "")
                if not origin_ip:
                    logger.debug(f"代理响应异常: {proxy.host}:{proxy.port}")
                    return False

                # 更新响应时间
                response_time = time.time() - start_time
                proxy.avg_response_time = (
                    proxy.avg_response_time * 0.7 + response_time * 0.3
                )
                proxy.last_check_time = time.time()

                logger.debug(
                    f"代理可用: {proxy.host}:{proxy.port}, "
                    f"出口IP: {origin_ip}, 响应时间: {response_time:.2f}s"
                )
                return True

        except Exception as e:
            logger.debug(f"代理检测失败: {proxy.host}:{proxy.port} - {e}")
            return False

代理爬虫完整示例

下面展示一个完整的代理爬虫示例,使用 httpbin.org 作为测试目标:

python
import asyncio
import httpx
from typing import Optional, Dict, Any
from loguru import logger
from dataclasses import dataclass


@dataclass
class ProxyCrawlerConfig:
    """代理爬虫配置"""
    # 代理池配置
    min_proxies: int = 10
    max_proxies: int = 50

    # 请求配置
    request_timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 1.0

    # 频率控制
    request_interval: float = 0.5  # 请求间隔(秒)


class ProxyCrawler:
    """
    代理爬虫

    特性:
    - 自动代理轮换
    - 智能重试
    - 频率控制
    - 错误处理
    """

    # 通用请求头
    DEFAULT_HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/131.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }

    def __init__(
        self,
        proxy_pool: ProxyPool,
        config: Optional[ProxyCrawlerConfig] = None
    ):
        self.proxy_pool = proxy_pool
        self.config = config or ProxyCrawlerConfig()
        self._last_request_time = 0.0

    async def _wait_for_rate_limit(self):
        """频率控制"""
        import time
        elapsed = time.time() - self._last_request_time
        if elapsed < self.config.request_interval:
            await asyncio.sleep(self.config.request_interval - elapsed)
        self._last_request_time = time.time()

    async def _request(
        self,
        url: str,
        params: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Optional[Dict[str, Any]]:
        """
        发送带代理的请求

        自动处理代理轮换和重试
        """
        await self._wait_for_rate_limit()

        merged_headers = {**self.DEFAULT_HEADERS, **(headers or {})}

        for attempt in range(self.config.max_retries):
            proxy = await self.proxy_pool.get_proxy()
            if not proxy:
                logger.warning("无可用代理,使用直连")
                proxy_url = None
            else:
                proxy_url = proxy.url

            try:
                async with httpx.AsyncClient(
                    proxies=proxy_url,
                    timeout=self.config.request_timeout,
                    headers=merged_headers
                ) as client:
                    response = await client.get(url, params=params)

                    # 处理响应
                    if response.status_code == 200:
                        if proxy:
                            await self.proxy_pool.return_proxy(proxy, success=True)
                        return response.json()

                    # HTTP错误
                    if response.status_code == 429:
                        logger.warning("请求频率过高,等待后重试")
                        if proxy:
                            await self.proxy_pool.return_proxy(proxy, success=False)
                        await asyncio.sleep(self.config.retry_delay * 2)
                        continue

                    if response.status_code in (403, 412):
                        logger.warning(f"IP被封禁 ({response.status_code}),切换代理")
                        if proxy:
                            await self.proxy_pool.return_proxy(proxy, success=False)
                        continue

                    # 其他错误
                    logger.warning(f"HTTP错误: {response.status_code}")
                    if proxy:
                        await self.proxy_pool.return_proxy(proxy, success=True)
                    return None

            except httpx.TimeoutException:
                logger.warning(f"请求超时,切换代理重试 (尝试 {attempt + 1})")
                if proxy:
                    await self.proxy_pool.return_proxy(proxy, success=False)
            except Exception as e:
                logger.error(f"请求异常: {e}")
                if proxy:
                    await self.proxy_pool.return_proxy(proxy, success=False)

        logger.error(f"请求失败,已达最大重试次数: {url}")
        return None

    async def get_with_proxy(self, url: str) -> Optional[Dict[str, Any]]:
        """
        使用代理获取URL

        Args:
            url: 目标URL

        Returns:
            响应数据
        """
        return await self._request(url)


# 使用示例
async def main():
    # 创建代理获取器
    fetcher = APIProxyFetcher(
        api_url="https://your-proxy-api.com/get",
        api_key="your_api_key",
        count=20
    )

    # 创建代理检测器
    checker = SiteProxyChecker(timeout=10)

    # 创建代理池
    pool = ProxyPool(
        fetcher=fetcher,
        checker=checker,
        min_proxies=10,
        max_proxies=50
    )

    # 启动代理池
    await pool.start()

    try:
        # 创建爬虫
        crawler = ProxyCrawler(pool)

        # 测试请求(使用 httpbin.org 验证代理生效)
        result = await crawler.get_with_proxy("https://httpbin.org/ip")
        if result:
            print(f"当前出口IP: {result.get('origin')}")

        # 测试 headers
        result = await crawler.get_with_proxy("https://httpbin.org/headers")
        if result:
            headers = result.get("headers", {})
            print(f"User-Agent: {headers.get('User-Agent', 'N/A')}")

        # 获取代理池统计
        stats = pool.get_stats()
        logger.info(f"代理池统计: {stats}")

    finally:
        await pool.stop()


if __name__ == "__main__":
    asyncio.run(main())

代理使用最佳实践

代理使用的一些通用建议:

关键建议

  1. 代理类型:大型网站对代理检测严格,推荐使用高匿住宅代理
  2. 请求频率:单IP建议 0.5-1 秒/请求,避免触发频率限制
  3. 完整请求头:必须携带 User-Agent、Accept 等头信息
  4. Cookie 携带:部分 API 需要登录态,代理请求也要携带 Cookie
  5. 失败处理:遇到 403/429 立即切换代理,避免 IP 被永久封禁

本章小结

本章我们学习了代理 IP 的完整知识体系:

  1. 代理基础:代理类型、匿名度、来源选择
  2. 代理池设计:获取器、检测器、分配器的接口设计
  3. 核心实现:代理获取、有效性检测、智能分配
  4. 爬虫集成:httpx 代理设置、自动轮换、隧道代理
  5. 最佳实践:代理检测、错误处理、使用建议

代理 IP 是大规模爬虫的基础设施,合理使用可以有效应对 IP 封禁。


下一章预告

下一章我们将学习「Playwright 浏览器自动化入门」。主要内容包括:

  • Playwright 的安装和基本使用
  • 页面导航和元素定位
  • 等待策略和超时处理
  • 截图和 PDF 导出
  • 爬取 JavaScript 渲染的页面

浏览器自动化是应对复杂反爬的利器,让我们一起探索!