爬虫入门实战3 数据存储实现
数据存储,顾名思义,将我们爬取的数据存储下来,这一章将带着大家一步步的实现一个数据存储的过程 我们将会使用到两种存储方式,一种是存储到本地文件,另一种是存储到数据库中。 本地文件又可以分为2种,一种是存储到csv文件,另一种是存储到json文件。
你不妨思考一下,如果是你,你会怎么设计你的爬虫程序支持不同的存储方式呢?
前置准备
在正式讨论数据存储之前,我们先来回顾一下在 动态数据这一章 是如何获取动态数据。
在动态数据获取这一章,我们是不是爬取了一个雅虎财经方面的加密货币数据,然后将数据保存到CSV文件中了,当时对数据存储没有做过的介绍。
这一章,我们将会详细的讲解数据存储的过程,以及如何将数据存储到数据库中。
回顾上一章我们存储动态数据的容器类定义:
from typing import List
class SymbolContent:
symbol: str = ""
name: str = ""
price: str = "" # 价格(盘中)
change_price: str = "" # 跌涨价格
change_percent: str = "" # 跌涨幅
market_price: str = "" # 市值
@classmethod
def get_fields(cls) -> List[str]:
return [key for key in cls.__dict__.keys() if not key.startswith('__') and key != "get_fields"]
def __str__(self):
return f"""
Symbol: {self.symbol}
Name: {self.name}
Price: {self.price}
Change Price: {self.change_price}
Change Percent: {self.change_percent}
Market Price: {self.market_price}
"""
从上面的容器类定义可以看见,我们已经将爬取到的数据有意识的转换成一个python结构化数据了,但是呢,这个数据是存储在内存中的,我们需要将这个数据存储到文件或者数据库中,这样我们才能更好的利用这些数据。
那么问题来了,当前的SymbolContent类,要支持存储到文件或db中,要做一转换工作,打个比方,如何转成json格式,如何转成python的dict,这些都是我们需要考虑的问题。
幸好的是,在python这边,有一个库,它非常适合做这件事情,那就是Pydantic库,这个库可以帮助我们将python的数据结构转换成json格式,dict格式,有了dict和json,其实存储CSV、存储数据库都是非常方便的了。
Pydantic的基本使用介绍
Pydantic是一个数据验证和序列化库,它可以帮助我们定义数据模型,然后将数据模型转换成json格式,dict格式,这样我们就可以很方便的将数据存储到文件或者数据库中。
使用Pydantic最简单的demo
# 导入Pydantic库的BaseModel基类
from pydantic import BaseModel
# 定义你的模型类
class User(BaseModel):
id: int
name: str
age: int
# 实例化你的模型类
user = User(id=1, name="小明", age=18)
# 将模型类转换成dict
user_dict = user.model_dump()
print(type(user_dict), user_dict) # <class 'dict'> {'id': 1, 'name': '小明', 'age': 18}
# 将模型类转换成json
user_json = user.model_dump_json()
print(type(user_json), user_json) # <class 'str'> {"id": 1, "name": "小明", "age": 18}
相信你从上面例子中可以看出,Pydantic的使用是非常简单的,只需要定义你的模型类,然后实例化你的模型类,然后调用model_dump()方法就可以将模型类转换成dict格式,调用model_dump_json()方法就可以将模型类转换成json格式。
pydantic真的非常非常好用,尤其是在python这门动态类型的脚本语言,使用pydantic能够加强你代码的类型检查,让你的代码更加健壮。更多的使用方法,可以参考Pydantic官方文档
如果python是你的主力语言的话, 强力推荐!
学习目标
- 设计一个数据存储抽象类
- 将数据存储到CSV文件
- 将数据存储到JSON文件
- 封装一个mysql数据库操作类
- 将数据存储到数据库
- 改造动态数据章节的存储代码,支持不同类型存储
设计原理
流程设计
存储层类图设计
类图是软件工程的统一建模语言一种静态结构图,该图描述了系统的类集合,类的属性和类之间的关系。
类图是面向对象式的建模。他们一般都被用于概念建模(conceptual modelling)的系统分类的应用程序,并可将模型建模转译成代码。
一个类有三个区域
- 最上面是类名称
- 中间部分包含类的属性
- 底部部分包含类的方法
- 为了进一步描述系统的行为,这些类图可以辅之以状态图或UML状态机。
代码实现过程
后面源码都用python的异步编程来写吧,这样可以更好的提高代码的执行效率,提高爬虫的效率。而且我比较擅长异步编程,哈哈哈
SymbolContent类转换成Pydantic模型类
from pydantic import BaseModel, Field
class SymbolContent(BaseModel):
symbol: str = Field(default="", title="Symbol")
name: str = Field(default="", title="Name")
price: str = Field(default="", title="价格盘中")
change_price: str = Field(default="", title="跌涨价格")
change_percent: str = Field(default="", title="跌涨幅")
market_price: str = Field(default="", title="市值")
可以看到 SymbolContent
模型类,相较于原来的变化不是很大,只是增加了一些Pydantic的Field字段,这些字段可以帮助我们定义字段的默认值,字段的标题等信息。 另外差异比较大的是现在的 SymbolContent
类,继承了 BaseModel
类,这样我们就可以使用Pydantic的一些方法了。
设计存储抽象类
抽象的好处:
一个好的抽象可以将大量实现细节隐藏在一个干净,简单易懂的外观下面。一个好的抽象也可以广泛用于各类不同应用。比起重复造很多轮子,重用抽象不仅更有效率,而且有助于开发高质量的软件。抽象组件的质量改进将使所有使用它的应用受益。
例如,高级编程语言是一种抽象,隐藏了机器码、CPU 寄存器和系统调用。SQL 也是一种抽象,隐藏了复杂的磁盘 / 内存数据结构、来自其他客户端的并发请求、崩溃后的不一致性。当然在用高级语言编程时,我们仍然用到了机器码;只不过没有 直接(directly) 使用罢了,正是因为编程语言的抽象,我们才不必去考虑这些实现细节。
抽象可以帮助我们将系统的复杂度控制在可管理的水平,不过,找到好的抽象是非常困难的。在分布式系统领域虽然有许多好的算法,但我们并不清楚它们应该打包成什么样抽象。
from ddia
从类图的我们很轻松就能把存储抽象类给定义出来,代码定义如下:
from abc import ABC, abstractmethod
from common import SymbolContent
class AbstractStore(ABC):
@abstractmethod
async def save(self, save_item: SymbolContent):
"""
存储数据
:param save_item:
:return:
"""
raise NotImplementedError
其中save方法就是抽象方法,我们需要在子类中实现这个方法,这样我们就可以将数据存储到不同的地方了。
这里多说一点关于python这边的抽象类,
- python这边提供了一个abc模块,我们可以使用这个模块来定义抽象类,这样我们就可以在子类中实现抽象方法了。
- @abstractmethod装饰器是一个抽象方法的标志,如果一个类中有抽象方法,那么这个类就是一个抽象类,抽象类不能被实例化,只能被继承。
- raise NotImplementedError是一个异常,如果子类没有实现抽象方法,那么就会抛出这个异常。
存储到CSV文件
首先,从类图看,我们需要实现一个存储到CSV文件的类,这个类需要继承我们的抽象类,然后实现抽象方法,代码如下:
import csv
import pathlib
import time
from typing import Dict
import aiofiles
from abstract_store import AbstractStore
from common import SymbolContent
class CsvStoreImpl(AbstractStore):
def __init__(self):
self.csv_store_path = "data/csv"
def make_save_file_name(self) -> str:
"""
make save file name
:return:
"""
return f"{self.csv_store_path}/symbol_content_{int(time.time())}.csv"
async def save(self, save_item: SymbolContent):
"""
save data to csv
:param save_item:
:return:
"""
pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name()
async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
f.fileno()
writer = csv.writer(f)
save_item_dict: Dict = save_item.model_dump()
if await f.tell() == 0:
await writer.writerow(save_item_dict.keys())
await writer.writerow(save_item_dict.values())
这个类的实现非常简单,首先我们需要实现save方法,这个方法是抽象方法,我们需要将数据存储到CSV文件中,这里我们使用了aiofiles库来异步写文件,这样可以提高写文件的效率。
另外可以看到函数签名中save_item
的这个参数的类型是SymbolContent
,这个是我们定义的数据模型类,这个类是我们爬取到的数据,我们需要将这个数据存储到CSV文件中。
我们是用pydantic的model_dump()方法将数据转换成dict格式,然后使用csv库将数据写入到CSV文件中。使用起来非常的方便。
存储到JSON文件
存储到JSON文件的实现和存储到CSV文件的实现非常类似,只是存储的格式不同,代码如下:
import json
import os
import pathlib
import time
import aiofiles
from abstract_store import AbstractStore
from common import SymbolContent
class JsonStoreImpl(AbstractStore):
def __init__(self):
self.json_store_path = "data/json"
def make_save_file_name(self) -> str:
"""
make save file name
:return:
"""
return f"{self.json_store_path}/symbol_content_{int(time.time())}.json"
async def save(self, save_item: SymbolContent):
"""
save data to json
:param save_item:
:return:
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name()
save_data_list = []
# todo 如果这里涉及并发写入,需要加锁, 可以查看MediaCrawler项目中的实现方式
# 先判断文件是否存在,如果存在则读取文件内容放到save_data_list中,然后再将新的数据添加到save_data_list中
if os.path.exists(save_file_name):
async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file:
save_data_list = json.loads(await file.read())
save_data_list.append(save_item.model_dump())
# 将数据写入到文件中
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data_list, ensure_ascii=False))
存储数据到json的实现思路是每一次存储数据的时候,我们都将数据读取出来,然后将新的数据添加到数据中,然后再将数据写入到文件中,这样可以保证数据不会丢失。 (因为json特殊的数据结构的原因,它无法像存储csv那样可以在文件末尾行追加写入)
存储到数据库
封装一个mysql数据库操作类
与mysql数据库交互,我们需要使用到一个库,这个库就是aiomysql,这个库是一个异步的mysql库,可以帮助我们异步的操作mysql数据库,这样可以提高数据库的操作效率。
由于aiomyql提供的方法比较偏底层,并且在我们连接数据库查询到数据之后,我们还要进行部分数据转换,所以为了使用方面,我们可以封装一些基础的数据库操作方法,这样我们在存储数据的时候,就可以直接调用这些方法,而不用关心底层的实现。
主要是增、删、改、查,下面给出我用了几年的一个封装aiomysql代码,供大家参考。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Name : 程序员阿江-Relakkes
# @Time : 2024/6/7 17:08
# @Desc :
import os
from typing import Any, Dict, List, Optional, Union
import aiomysql
class AsyncMysqlDB:
def __init__(self, pool: aiomysql.Pool) -> None:
self.__pool = pool
async def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]:
"""
从给定的 SQL 中查询记录,返回的是一个列表
:param sql: 查询的sql
:param args: sql中传递动态参数列表
:return:
"""
async with self.__pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
data = await cur.fetchall()
return data or []
async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]:
"""
从给定的 SQL 中查询记录,返回的是符合条件的第一个结果
:param sql: 查询的sql
:param args:sql中传递动态参数列表
:return:
"""
async with self.__pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
data = await cur.fetchone()
return data
async def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int:
"""
表中插入数据
:param table_name: 表名
:param item: 一条记录的字典信息
:return:
"""
fields = list(item.keys())
values = list(item.values())
fields = [f'`{field}`' for field in fields]
fieldstr = ','.join(fields)
valstr = ','.join(['%s'] * len(item))
sql = "INSERT INTO %s (%s) VALUES(%s)" % (table_name, fieldstr, valstr)
async with self.__pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, values)
lastrowid = cur.lastrowid
return lastrowid
async def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str,
value_where: Union[str, int, float]) -> int:
"""
更新指定表的记录
:param table_name: 表名
:param updates: 需要更新的字段和值的 key - value 映射
:param field_where: update 语句 where 条件中的字段名
:param value_where: update 语句 where 条件中的字段值
:return:
"""
upsets = []
values = []
for k, v in updates.items():
s = '`%s`=%%s' % k
upsets.append(s)
values.append(v)
upsets = ','.join(upsets)
sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
table_name,
upsets,
field_where, value_where,
)
async with self.__pool.acquire() as conn:
async with conn.cursor() as cur:
rows = await cur.execute(sql, values)
return rows
async def execute(self, sql: str, *args: Union[str, int]) -> int:
"""
需要更新、写入等操作的 excute 执行语句
:param sql:
:param args:
:return:
"""
async with self.__pool.acquire() as conn:
async with conn.cursor() as cur:
rows = await cur.execute(sql, args)
return rows
class MysqlConnect:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(MysqlConnect, cls).__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.db: Optional[AsyncMysqlDB] = None
async def async_init(self):
if not hasattr(self, 'db') or self.db is None:
pool = await aiomysql.create_pool(
**self.mysql_conn_config,
autocommit=True,
)
self.db: AsyncMysqlDB = AsyncMysqlDB(pool)
return self
@property
def mysql_conn_config(self) -> Dict[str, str]:
return {
"host": os.getenv("MYSQL_HOST", "localhost"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USER", "root"),
"password": os.getenv("MYSQL_PASSWORD", "123456"),
"db": os.getenv("MYSQL_DB", "crawler_turorial"),
}
def get_db(self) -> AsyncMysqlDB:
return self.db
从AsyncMysqlDB
这个类的构造函数__init__看,它接收一个aiomysql.Pool对象,这个对象是一个连接池对象,我们可以通过这个连接池对象来获取数据库连接,然后执行我们的sql语句。 这样做的好处是,我们创建了一个连接池,可以减少数据库连接的开销,提高数据库的操作效率。
这个类中有几个方法,分别是query、get_first、item_to_table、update_table、execute,这几个方法分别是查询、查询第一个、插入、更新、执行sql语句的方法,这几个方法是我们在存储数据的时候经常会用到的方法,我们可以直接调用这些方法,而不用关心底层的实现。
MysqlConnect
这个类是一个单例模式,它的作用是创建一个数据库连接对象,我们可以通过这个对象来操作数据库,这样可以减少数据库连接的开销。
如果想查看AsyncMysqlDB的更多用法,可以查看我在这里写的使用示例:test_mysql_async_db.py
DB存储数据实现
存储到数据库的实现和存储到文件的实现有一些不同,我们需要先将数据存储到内存中(save_item这个模型类),然后再将数据存储到数据库中,代码如下:
from typing import Optional
from abstract_store import AbstractStore
from async_db import MysqlConnect, AsyncMysqlDB
from common import SymbolContent
class DbStoreImpl(AbstractStore):
def __init__(self):
self.db: Optional[AsyncMysqlDB] = None
async def save(self, save_item: SymbolContent):
"""
save data to db
:param save_item:
:return:
"""
self.db = (await MysqlConnect().async_init()).get_db()
from sqls import (insert_symbol_content,
query_symbol_content_by_symbol,
update_symbol_content)
# 查询是否存在
exist_item = await query_symbol_content_by_symbol(self.db, save_item.symbol)
if exist_item.symbol:
# 更新
await update_symbol_content(self.db, save_item)
else:
# 插入
await insert_symbol_content(self.db, save_item)
从 DbStoreImpl
类的save方法的逻辑看,其实非常的简单,就是先查询数据库中是否存在这个数据,如果存在则更新,如果不存在则插入,这样就可以保证数据不会重复。
sqls文件定义:
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Name : 程序员阿江-Relakkes
# @Time : 2024/6/7 17:09
# @Desc :
from async_db import AsyncMysqlDB
from common import SymbolContent
async def insert_symbol_content(db: AsyncMysqlDB, symbol_content: SymbolContent) -> int:
"""
插入数据
:param db:
:param symbol_content:
:return:
"""
item = symbol_content.model_dump()
return await db.item_to_table("symbol_content", item)
async def update_symbol_content(db: AsyncMysqlDB, symbol_content: SymbolContent) -> int:
"""
更新数据
:param db:
:param symbol_content:
:return:
"""
item = symbol_content.model_dump()
return await db.update_table("symbol_content", item, "symbol", symbol_content.symbol)
async def query_symbol_content_by_symbol(db: AsyncMysqlDB, symbol: str) -> SymbolContent:
"""
查询数据
:param db:
:param symbol:
:return:
"""
sql = f"select * from symbol_content where symbol = '{symbol}'"
rows = await db.query(sql)
if len(rows) > 0:
return SymbolContent(**rows[0])
return SymbolContent()
存储数据到数据库的好处太多了,比如数据的持久化,数据的查询,数据的分析等等,这些都是存储到文件无法实现的。
所以在我们平时做数据爬取的时候,我建议大家将数据存储到数据库中,这样可以更好的利用数据。
动态数据爬虫章节主代码修改
修改动态数据章节的代码,支持将加密货币数据存储到不同的地方
代码改动在run_crawler
中, 我们从网页中抓取到数据后,遍历每一个数据,然后将数据存储到指定的存储介质中,代码如下
async def run_crawler(data_save_type: str) -> None:
"""
爬虫主流程
:param data_save_type: 数据存储的类型,支持csv、json、db
:return:
"""
# step1 获取最大数据总量
max_total: int = await get_max_total_count()
# step2 遍历每一页数据并解析存储到数据容器中
data_list: List[SymbolContent] = await fetch_currency_data_list(max_total)
# step3 将数据保存到指定存储介质中
for data_item in data_list:
await StoreFactory.get_store(data_save_type).save(data_item)
if __name__ == '__main__':
_data_save_type = "csv" # 可选配置(csv、json、db)
asyncio.run(run_crawler(_data_save_type))
从使用层面看,我们指定了存储类型,根据存储工厂类的设计,我们可以很方便的切换存储类型,这样我们就可以将数据存储到不同的地方了。 下面给出存储工厂类的实现(设计模式中的简单工厂):
class StoreFactory:
@staticmethod
def get_store(store_type: str) -> AbstractStore:
if store_type == "csv":
return CsvStoreImpl()
elif store_type == "json":
return JsonStoreImpl()
elif store_type == "db":
return DbStoreImpl()
else:
raise ValueError(f"Unknown store type: {store_type}")
大家有没有发现一问题,从09章动态数据提取到这一章,我们的代码是不是越来越规范了,越来越好维护了
这就是我们学习的意义,良好的抽象能力,良好的设计能力,能够让我们的代码更加的优雅,更加的易维护。
依赖安装 & 代码运行
依赖安装
# 依赖安装,进入到`10_爬虫入门实战3_数据存储实现`目录下,运行以下命令
# 创建python虚拟环境
python3 -m venv venv
# 激活虚拟环境
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
代码运行
# 如果是保存的mysql中的话,需要创建数据库和表结构,可以查看源代码目录下的sql文件
python main.py
- 如果是以
csv
的方式存储数据,可以查看data/csv
目录下的文件, - 如果是
json
的方式存储数据,可以查看data/json
目录下的文件, - 如果是
db
的方式存储数据,可以查看数据库中的数据。