代理 IP 的使用与管理
当你的爬虫遭遇 IP 封禁时,代理 IP 就成了必不可少的工具。本章将深入讲解代理 IP 的原理、类型选择,以及如何设计和实现一个实用的代理池管理系统。
代理 IP 基础
什么是代理 IP
代理服务器(Proxy Server)是一种位于客户端和目标服务器之间的中间服务器。当你通过代理发送请求时:
- 你的请求先发送到代理服务器
- 代理服务器代替你向目标服务器发送请求
- 目标服务器将响应返回给代理服务器
- 代理服务器再将响应转发给你
这样,目标服务器看到的是代理服务器的 IP,而不是你的真实 IP。
代理类型详解
按协议分类
| 类型 | 特点 | 使用场景 |
|---|---|---|
| HTTP 代理 | 只支持 HTTP 协议 | 普通网页爬取 |
| HTTPS 代理 | 支持 HTTPS 协议 | 加密网站爬取 |
| SOCKS4 代理 | 支持 TCP 连接 | 需要更底层控制 |
| SOCKS5 代理 | 支持 TCP/UDP,可认证 | 最灵活的代理类型 |
按匿名度分类
| 类型 | 特点 | 识别难度 |
|---|---|---|
| 透明代理 | 目标服务器能看到你的真实 IP | 容易被识别 |
| 匿名代理 | 隐藏真实 IP,但暴露代理身份 | 中等 |
| 高匿代理 | 完全隐藏真实 IP 和代理身份 | 较难识别 |
对于爬虫来说,高匿代理是首选。
按来源分类
| 类型 | 优点 | 缺点 |
|---|---|---|
| 免费代理 | 成本为零 | 不稳定、速度慢、可用率低 |
| 付费代理 | 稳定、速度快、可用率高 | 需要成本 |
| 自建代理 | 完全可控 | 需要服务器资源 |
代理提供商选择指南
选择代理提供商时需要考虑:
- IP 质量:是否是高匿代理,是否已被目标网站封禁
- IP 数量:IP 池的规模
- 地理分布:是否覆盖目标地区
- 稳定性:连接成功率和响应速度
- 价格:按流量计费还是按 IP 数计费
- API 支持:是否提供便捷的 API 获取代理
常见的代理类型:
- API 提取型:通过 API 获取代理 IP 列表
- 隧道代理型:固定入口,自动轮换 IP
- 动态转发型:每次请求自动更换 IP
代理池设计
为什么需要代理池
直接使用单个代理存在以下问题:
- 代理可能随时失效
- 单个 IP 容易被封禁
- 无法动态切换和管理
代理池可以解决这些问题:
- 统一管理多个代理
- 自动检测代理有效性
- 智能分配和轮换代理
- 记录代理质量评分
代理池架构设计
代理池工作流程:
核心接口设计
python
from abc import ABC, abstractmethod
from typing import Optional, List
from dataclasses import dataclass
from enum import Enum
class ProxyProtocol(Enum):
"""代理协议"""
HTTP = "http"
HTTPS = "https"
SOCKS5 = "socks5"
@dataclass
class ProxyInfo:
"""代理信息"""
host: str
port: int
protocol: ProxyProtocol = ProxyProtocol.HTTP
username: Optional[str] = None
password: Optional[str] = None
# 质量指标
success_count: int = 0
fail_count: int = 0
avg_response_time: float = 0.0
last_check_time: float = 0.0
@property
def url(self) -> str:
"""构建代理 URL"""
auth = ""
if self.username and self.password:
auth = f"{self.username}:{self.password}@"
return f"{self.protocol.value}://{auth}{self.host}:{self.port}"
@property
def score(self) -> float:
"""计算代理评分"""
total = self.success_count + self.fail_count
if total == 0:
return 0.5 # 未测试的代理给中等分数
success_rate = self.success_count / total
# 考虑响应时间,越快分数越高
time_score = max(0, 1 - self.avg_response_time / 10)
return success_rate * 0.7 + time_score * 0.3
class IProxyFetcher(ABC):
"""代理获取器接口"""
@abstractmethod
async def fetch(self) -> List[ProxyInfo]:
"""获取代理列表"""
pass
class IProxyChecker(ABC):
"""代理检测器接口"""
@abstractmethod
async def check(self, proxy: ProxyInfo) -> bool:
"""检测代理是否可用"""
pass
class IProxyPool(ABC):
"""代理池接口"""
@abstractmethod
async def get_proxy(self) -> Optional[ProxyInfo]:
"""获取一个可用代理"""
pass
@abstractmethod
async def return_proxy(self, proxy: ProxyInfo, success: bool):
"""归还代理并报告使用结果"""
pass
@abstractmethod
async def add_proxy(self, proxy: ProxyInfo):
"""添加代理"""
pass
@abstractmethod
async def remove_proxy(self, proxy: ProxyInfo):
"""移除代理"""
pass代理获取器实现
免费代理获取(仅供学习)
python
import httpx
from typing import List
from loguru import logger
class FreeProxyFetcher(IProxyFetcher):
"""
免费代理获取器
注意:免费代理质量较差,仅供学习测试使用
"""
async def fetch(self) -> List[ProxyInfo]:
"""从免费代理网站获取代理"""
proxies = []
# 示例:从 API 获取(这里用一个示例 API)
try:
async with httpx.AsyncClient(timeout=10) as client:
# 这是一个示例 URL,实际使用时替换为真实的代理 API
response = await client.get(
"https://api.proxyscrape.com/v2/"
"?request=getproxies&protocol=http&timeout=10000&country=all"
)
if response.status_code == 200:
lines = response.text.strip().split("\n")
for line in lines:
try:
host, port = line.strip().split(":")
proxies.append(ProxyInfo(
host=host,
port=int(port),
protocol=ProxyProtocol.HTTP
))
except ValueError:
continue
logger.info(f"获取到 {len(proxies)} 个免费代理")
except Exception as e:
logger.error(f"获取免费代理失败: {e}")
return proxiesAPI 代理获取器
python
class APIProxyFetcher(IProxyFetcher):
"""
API 代理获取器
从付费代理服务商的 API 获取代理
"""
def __init__(
self,
api_url: str,
api_key: Optional[str] = None,
count: int = 10
):
"""
初始化 API 代理获取器
Args:
api_url: API 地址
api_key: API 密钥
count: 每次获取数量
"""
self.api_url = api_url
self.api_key = api_key
self.count = count
async def fetch(self) -> List[ProxyInfo]:
"""从 API 获取代理"""
proxies = []
try:
params = {"num": self.count}
if self.api_key:
params["key"] = self.api_key
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(self.api_url, params=params)
data = response.json()
# 根据实际 API 返回格式解析
# 这里假设返回 {"data": [{"ip": "x.x.x.x", "port": 8080}, ...]}
for item in data.get("data", []):
proxies.append(ProxyInfo(
host=item["ip"],
port=item["port"],
protocol=ProxyProtocol(item.get("protocol", "http"))
))
logger.info(f"从 API 获取到 {len(proxies)} 个代理")
except Exception as e:
logger.error(f"从 API 获取代理失败: {e}")
return proxies代理检测器实现
python
import time
import httpx
from typing import Optional
class ProxyChecker(IProxyChecker):
"""
代理检测器
检测代理的可用性和响应速度
"""
# 用于检测的 URL 列表
CHECK_URLS = [
"https://httpbin.org/ip",
"https://api.ipify.org?format=json",
]
def __init__(self, timeout: int = 10):
"""
初始化检测器
Args:
timeout: 检测超时时间
"""
self.timeout = timeout
async def check(self, proxy: ProxyInfo) -> bool:
"""
检测代理是否可用
Args:
proxy: 代理信息
Returns:
代理是否可用
"""
start_time = time.time()
try:
async with httpx.AsyncClient(
proxies=proxy.url,
timeout=self.timeout
) as client:
for url in self.CHECK_URLS:
try:
response = await client.get(url)
if response.status_code == 200:
# 更新响应时间
response_time = time.time() - start_time
proxy.avg_response_time = (
proxy.avg_response_time * 0.7 +
response_time * 0.3
)
proxy.last_check_time = time.time()
logger.debug(
f"代理可用: {proxy.host}:{proxy.port}, "
f"响应时间: {response_time:.2f}s"
)
return True
except Exception:
continue
except Exception as e:
logger.debug(f"代理检测失败: {proxy.host}:{proxy.port} - {e}")
return False
async def check_batch(
self,
proxies: List[ProxyInfo],
concurrency: int = 20
) -> List[ProxyInfo]:
"""
批量检测代理
Args:
proxies: 代理列表
concurrency: 并发数
Returns:
可用的代理列表
"""
import asyncio
semaphore = asyncio.Semaphore(concurrency)
valid_proxies = []
async def check_one(proxy: ProxyInfo):
async with semaphore:
if await self.check(proxy):
valid_proxies.append(proxy)
tasks = [check_one(p) for p in proxies]
await asyncio.gather(*tasks, return_exceptions=True)
logger.info(f"检测完成: {len(valid_proxies)}/{len(proxies)} 可用")
return valid_proxies代理池实现
python
import asyncio
import random
import time
from typing import Optional, List, Dict
from collections import defaultdict
from loguru import logger
class ProxyPool(IProxyPool):
"""
代理池实现
特性:
- 自动获取和检测代理
- 基于评分的智能分配
- 自动淘汰失效代理
- 支持代理预热
"""
def __init__(
self,
fetcher: IProxyFetcher,
checker: IProxyChecker,
min_proxies: int = 10,
max_proxies: int = 100,
check_interval: int = 300,
max_fail_count: int = 3
):
"""
初始化代理池
Args:
fetcher: 代理获取器
checker: 代理检测器
min_proxies: 最小代理数量
max_proxies: 最大代理数量
check_interval: 检测间隔(秒)
max_fail_count: 最大失败次数
"""
self.fetcher = fetcher
self.checker = checker
self.min_proxies = min_proxies
self.max_proxies = max_proxies
self.check_interval = check_interval
self.max_fail_count = max_fail_count
# 代理存储
self._proxies: Dict[str, ProxyInfo] = {}
self._lock = asyncio.Lock()
# 后台任务
self._refresh_task: Optional[asyncio.Task] = None
self._running = False
def _proxy_key(self, proxy: ProxyInfo) -> str:
"""生成代理唯一标识"""
return f"{proxy.host}:{proxy.port}"
async def start(self):
"""启动代理池"""
self._running = True
# 初始获取代理
await self._refresh_proxies()
# 启动后台刷新任务
self._refresh_task = asyncio.create_task(self._refresh_loop())
logger.info("代理池已启动")
async def stop(self):
"""停止代理池"""
self._running = False
if self._refresh_task:
self._refresh_task.cancel()
try:
await self._refresh_task
except asyncio.CancelledError:
pass
logger.info("代理池已停止")
async def _refresh_loop(self):
"""后台刷新循环"""
while self._running:
try:
await asyncio.sleep(self.check_interval)
await self._refresh_proxies()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"代理刷新异常: {e}")
async def _refresh_proxies(self):
"""刷新代理"""
async with self._lock:
# 检查是否需要补充代理
if len(self._proxies) >= self.min_proxies:
return
logger.info(f"代理不足 ({len(self._proxies)}/{self.min_proxies}),开始获取...")
# 获取新代理
new_proxies = await self.fetcher.fetch()
# 检测代理
valid_proxies = await self.checker.check_batch(new_proxies)
# 添加到池中
for proxy in valid_proxies:
key = self._proxy_key(proxy)
if key not in self._proxies and len(self._proxies) < self.max_proxies:
self._proxies[key] = proxy
logger.info(f"代理池更新完成,当前数量: {len(self._proxies)}")
async def get_proxy(self) -> Optional[ProxyInfo]:
"""
获取一个可用代理
使用加权随机选择,评分高的代理被选中概率更大
"""
async with self._lock:
if not self._proxies:
logger.warning("代理池为空")
return None
# 计算权重
proxies = list(self._proxies.values())
weights = [max(p.score, 0.1) for p in proxies]
# 加权随机选择
selected = random.choices(proxies, weights=weights, k=1)[0]
logger.debug(f"分配代理: {selected.host}:{selected.port} (评分: {selected.score:.2f})")
return selected
async def return_proxy(self, proxy: ProxyInfo, success: bool):
"""
归还代理并报告使用结果
Args:
proxy: 代理信息
success: 使用是否成功
"""
async with self._lock:
key = self._proxy_key(proxy)
if key not in self._proxies:
return
stored_proxy = self._proxies[key]
if success:
stored_proxy.success_count += 1
else:
stored_proxy.fail_count += 1
# 检查是否需要淘汰
if stored_proxy.fail_count >= self.max_fail_count:
total = stored_proxy.success_count + stored_proxy.fail_count
if total > 5 and stored_proxy.score < 0.3:
del self._proxies[key]
logger.info(f"淘汰低质量代理: {proxy.host}:{proxy.port}")
async def add_proxy(self, proxy: ProxyInfo):
"""添加代理"""
async with self._lock:
key = self._proxy_key(proxy)
if key not in self._proxies and len(self._proxies) < self.max_proxies:
self._proxies[key] = proxy
async def remove_proxy(self, proxy: ProxyInfo):
"""移除代理"""
async with self._lock:
key = self._proxy_key(proxy)
if key in self._proxies:
del self._proxies[key]
@property
def size(self) -> int:
"""代理池大小"""
return len(self._proxies)
def get_stats(self) -> Dict:
"""获取统计信息"""
if not self._proxies:
return {"total": 0}
proxies = list(self._proxies.values())
scores = [p.score for p in proxies]
return {
"total": len(proxies),
"avg_score": sum(scores) / len(scores),
"max_score": max(scores),
"min_score": min(scores),
}代理与爬虫集成
使用 httpx 设置代理
python
import httpx
async def fetch_with_proxy(url: str, proxy_url: str) -> str:
"""使用代理发送请求"""
async with httpx.AsyncClient(proxies=proxy_url, timeout=30) as client:
response = await client.get(url)
return response.text
# 使用示例
proxy = "http://user:pass@127.0.0.1:8080"
content = await fetch_with_proxy("https://httpbin.org/ip", proxy)集成代理池的爬虫
python
class ProxiedCrawler:
"""
集成代理池的爬虫
自动管理代理的获取、轮换和报告
"""
def __init__(self, proxy_pool: ProxyPool):
self.proxy_pool = proxy_pool
async def fetch(self, url: str) -> Optional[str]:
"""使用代理获取页面"""
proxy = await self.proxy_pool.get_proxy()
if not proxy:
logger.warning("无可用代理")
return None
try:
async with httpx.AsyncClient(
proxies=proxy.url,
timeout=30
) as client:
response = await client.get(url)
response.raise_for_status()
# 报告成功
await self.proxy_pool.return_proxy(proxy, success=True)
return response.text
except Exception as e:
logger.warning(f"请求失败: {url} - {e}")
# 报告失败
await self.proxy_pool.return_proxy(proxy, success=False)
return None隧道代理使用
隧道代理是一种特殊的代理模式,你只需连接到固定的代理入口,每次请求自动分配不同的 IP。
python
class TunnelProxyClient:
"""
隧道代理客户端
特点:固定入口,自动轮换 IP
"""
def __init__(
self,
host: str,
port: int,
username: str,
password: str
):
self.proxy_url = f"http://{username}:{password}@{host}:{port}"
async def get(self, url: str, **kwargs) -> httpx.Response:
"""发送请求(自动使用隧道代理)"""
async with httpx.AsyncClient(
proxies=self.proxy_url,
timeout=30
) as client:
return await client.get(url, **kwargs)
# 使用示例
tunnel = TunnelProxyClient(
host="tunnel.example.com",
port=12345,
username="your_username",
password="your_password"
)
# 每次请求自动使用不同 IP
response1 = await tunnel.get("https://httpbin.org/ip")
response2 = await tunnel.get("https://httpbin.org/ip")代理使用最佳实践
IP 封禁机制分析
大多数网站都有反爬虫的 IP 封禁机制,常见的触发条件和处理方式:
常见 IP 封禁特点:
| 触发条件 | 响应码 | 封禁时长 | 解封方式 |
|---|---|---|---|
| 请求频率过高 | 429 | 几分钟~1小时 | 降低频率后自动解封 |
| 风控检测触发 | 403 | 数小时~1天 | 需更换IP |
| 严重违规 | 403/IP拉黑 | 数天~永久 | 需更换IP |
代理有效性检测器
使用 httpbin.org 等测试服务来验证代理的可用性:
python
import time
import httpx
from typing import Optional
from loguru import logger
class SiteProxyChecker(IProxyChecker):
"""
通用代理检测器
使用 httpbin.org 检测代理可用性和匿名度
"""
# 使用 httpbin.org 检测代理IP
CHECK_URL = "https://httpbin.org/ip"
# 通用请求头
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36",
"Accept": "application/json"
}
def __init__(self, timeout: int = 10):
self.timeout = timeout
async def check(self, proxy: ProxyInfo) -> bool:
"""
检测代理是否可用
判断标准:
- 请求成功(状态码200)
- 响应包含有效JSON
- 返回的IP与代理IP一致(验证代理生效)
"""
start_time = time.time()
try:
async with httpx.AsyncClient(
proxies=proxy.url,
timeout=self.timeout,
headers=self.HEADERS
) as client:
response = await client.get(self.CHECK_URL)
if response.status_code != 200:
logger.debug(f"代理状态码异常: {proxy.host}:{proxy.port} - {response.status_code}")
return False
data = response.json()
# 验证返回的IP(httpbin.org 返回 {"origin": "x.x.x.x"})
origin_ip = data.get("origin", "")
if not origin_ip:
logger.debug(f"代理响应异常: {proxy.host}:{proxy.port}")
return False
# 更新响应时间
response_time = time.time() - start_time
proxy.avg_response_time = (
proxy.avg_response_time * 0.7 + response_time * 0.3
)
proxy.last_check_time = time.time()
logger.debug(
f"代理可用: {proxy.host}:{proxy.port}, "
f"出口IP: {origin_ip}, 响应时间: {response_time:.2f}s"
)
return True
except Exception as e:
logger.debug(f"代理检测失败: {proxy.host}:{proxy.port} - {e}")
return False代理爬虫完整示例
下面展示一个完整的代理爬虫示例,使用 httpbin.org 作为测试目标:
python
import asyncio
import httpx
from typing import Optional, Dict, Any
from loguru import logger
from dataclasses import dataclass
@dataclass
class ProxyCrawlerConfig:
"""代理爬虫配置"""
# 代理池配置
min_proxies: int = 10
max_proxies: int = 50
# 请求配置
request_timeout: int = 30
max_retries: int = 3
retry_delay: float = 1.0
# 频率控制
request_interval: float = 0.5 # 请求间隔(秒)
class ProxyCrawler:
"""
代理爬虫
特性:
- 自动代理轮换
- 智能重试
- 频率控制
- 错误处理
"""
# 通用请求头
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
def __init__(
self,
proxy_pool: ProxyPool,
config: Optional[ProxyCrawlerConfig] = None
):
self.proxy_pool = proxy_pool
self.config = config or ProxyCrawlerConfig()
self._last_request_time = 0.0
async def _wait_for_rate_limit(self):
"""频率控制"""
import time
elapsed = time.time() - self._last_request_time
if elapsed < self.config.request_interval:
await asyncio.sleep(self.config.request_interval - elapsed)
self._last_request_time = time.time()
async def _request(
self,
url: str,
params: Optional[Dict] = None,
headers: Optional[Dict] = None
) -> Optional[Dict[str, Any]]:
"""
发送带代理的请求
自动处理代理轮换和重试
"""
await self._wait_for_rate_limit()
merged_headers = {**self.DEFAULT_HEADERS, **(headers or {})}
for attempt in range(self.config.max_retries):
proxy = await self.proxy_pool.get_proxy()
if not proxy:
logger.warning("无可用代理,使用直连")
proxy_url = None
else:
proxy_url = proxy.url
try:
async with httpx.AsyncClient(
proxies=proxy_url,
timeout=self.config.request_timeout,
headers=merged_headers
) as client:
response = await client.get(url, params=params)
# 处理响应
if response.status_code == 200:
if proxy:
await self.proxy_pool.return_proxy(proxy, success=True)
return response.json()
# HTTP错误
if response.status_code == 429:
logger.warning("请求频率过高,等待后重试")
if proxy:
await self.proxy_pool.return_proxy(proxy, success=False)
await asyncio.sleep(self.config.retry_delay * 2)
continue
if response.status_code in (403, 412):
logger.warning(f"IP被封禁 ({response.status_code}),切换代理")
if proxy:
await self.proxy_pool.return_proxy(proxy, success=False)
continue
# 其他错误
logger.warning(f"HTTP错误: {response.status_code}")
if proxy:
await self.proxy_pool.return_proxy(proxy, success=True)
return None
except httpx.TimeoutException:
logger.warning(f"请求超时,切换代理重试 (尝试 {attempt + 1})")
if proxy:
await self.proxy_pool.return_proxy(proxy, success=False)
except Exception as e:
logger.error(f"请求异常: {e}")
if proxy:
await self.proxy_pool.return_proxy(proxy, success=False)
logger.error(f"请求失败,已达最大重试次数: {url}")
return None
async def get_with_proxy(self, url: str) -> Optional[Dict[str, Any]]:
"""
使用代理获取URL
Args:
url: 目标URL
Returns:
响应数据
"""
return await self._request(url)
# 使用示例
async def main():
# 创建代理获取器
fetcher = APIProxyFetcher(
api_url="https://your-proxy-api.com/get",
api_key="your_api_key",
count=20
)
# 创建代理检测器
checker = SiteProxyChecker(timeout=10)
# 创建代理池
pool = ProxyPool(
fetcher=fetcher,
checker=checker,
min_proxies=10,
max_proxies=50
)
# 启动代理池
await pool.start()
try:
# 创建爬虫
crawler = ProxyCrawler(pool)
# 测试请求(使用 httpbin.org 验证代理生效)
result = await crawler.get_with_proxy("https://httpbin.org/ip")
if result:
print(f"当前出口IP: {result.get('origin')}")
# 测试 headers
result = await crawler.get_with_proxy("https://httpbin.org/headers")
if result:
headers = result.get("headers", {})
print(f"User-Agent: {headers.get('User-Agent', 'N/A')}")
# 获取代理池统计
stats = pool.get_stats()
logger.info(f"代理池统计: {stats}")
finally:
await pool.stop()
if __name__ == "__main__":
asyncio.run(main())代理使用最佳实践
代理使用的一些通用建议:
关键建议:
- 代理类型:大型网站对代理检测严格,推荐使用高匿住宅代理
- 请求频率:单IP建议 0.5-1 秒/请求,避免触发频率限制
- 完整请求头:必须携带 User-Agent、Accept 等头信息
- Cookie 携带:部分 API 需要登录态,代理请求也要携带 Cookie
- 失败处理:遇到 403/429 立即切换代理,避免 IP 被永久封禁
本章小结
本章我们学习了代理 IP 的完整知识体系:
- 代理基础:代理类型、匿名度、来源选择
- 代理池设计:获取器、检测器、分配器的接口设计
- 核心实现:代理获取、有效性检测、智能分配
- 爬虫集成:httpx 代理设置、自动轮换、隧道代理
- 最佳实践:代理检测、错误处理、使用建议
代理 IP 是大规模爬虫的基础设施,合理使用可以有效应对 IP 封禁。
下一章预告
下一章我们将学习「Playwright 浏览器自动化入门」。主要内容包括:
- Playwright 的安装和基本使用
- 页面导航和元素定位
- 等待策略和超时处理
- 截图和 PDF 导出
- 爬取 JavaScript 渲染的页面
浏览器自动化是应对复杂反爬的利器,让我们一起探索!