爬虫——将数据保存到MongoDB中

embedded/2025/3/30 18:00:37/

目录

deepseek推荐的,爬虫mongodbmysql效果要好一些,所以就尝试一下

mongodb_3">安装mongodb

肯定是线程的mongodb比较方便啊,所以就搞了个docker的mongodb,只要将数据库挂在出来就行,不过一些配置要搞一下。

mongod.conf

storage:dbPath: /data/dbwiredTiger:engineConfig:cacheSizeGB: 1journalCompressor: snappycollectionConfig:blockCompressor: snappy
operationProfiling:mode: slowOpslowOpThresholdMs: 100
replication:oplogSizeMB: 100

【细节讲解】

  • dbPath/data/db指定数据库文件的存储路径,这个也是需要挂载出来的地方
  • wiredTiger :配置 WiredTiger 存储引擎的相关参数。
    • engineConfig:引擎配置。
      • cacheSizeGB:指定缓存大小为 1 GB,用于控制内存使用。.
      • journalCompressor:指定snappy算法来压缩日志
    • collectionConfig:集合配置。
      • blockCompressor指定snappy算法来压缩集合数据块,以提高存储效率。
  • operationProfiling:操作分析配置。
    • modeslowOp仅记录执行时间超过慢操作阈值的操作。
    • slowOpThresholdMs100执行时间超过 100 毫秒的操作将被记录。
  • replication
    • oplogSizeMB100设置操作日志(oplog)的大小为 100 MB。

init-mongo.js

db.createUser({user: "admin1",pwd: "admin1",roles: [{ role: "readWrite", db: "mydb" },{ role: "clusterMonitor", db: "admin" }]
});

这个就简单了,创建用户,以及赋予用户权限。

docker-yml文件

为了方便部署,直接使用docker-compose一键部署就行了

version: '1.0'services:mongodb:image: mongo  # 使用官方特定版本container_name: mongodbrestart: unless-stopped  # 自动重启策略ports:- "27017:27017"environment:MONGO_INITDB_ROOT_USERNAME: adminMONGO_INITDB_ROOT_PASSWORD: admin1# 更改为强密码MONGO_INITDB_DATABASE: mydb  # 初始化数据库# 性能优化参数MONGO_WIREDTIGER_CACHE_SIZE: 1  # 更准确的环境变量名MONGO_OPLOG_SIZE: 100volumes:- /e/Temp/mongodb/mongo_data:/data/db  # 数据持久化- /e/Temp/mongodb/init-mongo.js:/docker-entrypoint-initdb.d/init-mongo.js  # 初始化脚本- /e/Temp/mongodb/mongod.conf:/etc/mongo/mongod.conf  # 自定义配置文件command: ["--config", "/etc/mongo/mongod.conf"]  # 加载自定义配置deploy:resources:limits:cpus: '0.5'memory: 512M  # 根据需求适当增加reservations:memory: 256M

【细节讲解】

  • volumes:这个挂载路径需要根据自己的意愿进行配置。
  • restart: unless-stopped,自动重启策略,可以使容器随着docker重启而重启。
# 指定yml
docker-compose -f mongo_docker.yml up -d

mongodb_89">编写mongodb

有现成的库pymongo,不过还是要我们编写一些内容,也就是自己定义增删改查

mongodb_92">初始化mongodb

from pymongo import MongoClient, errors
from pymongo.errors import BulkWriteError
from pymongo.server_api import ServerApi
from pymongo import UpdateOneclass MongoDBManager:def __init__(self,mongo_uri="mongodb://admin:admin1@localhost:27017/amazon?authSource=mydb",db_name="mydb",collection_name="products",retry_attempts=3,retry_delay=2):""":param mongo_uri: MongoDB 连接字符串:param db_name: 数据库名称(不存在时自动创建):param collection_name: 集合名称(不存在时自动创建):param retry_attempts: 连接重试次数:param retry_delay: 重试间隔(秒)"""self.mongo_uri = mongo_uriself.db_name = db_nameself.collection_name = collection_nameself.retry_attempts = retry_attemptsself.retry_delay = retry_delay# 初始化连接self.client = Noneself.db = Noneself.collection = Noneself._connect()

在一开始的初始化mongo对象的时候,尝试连接一下数据库,如果数据库ok的情况下,那么就可以连接。

class MongoDBManager:# ...def _connect(self):# 尝试连接就尝试3次吧,也就是self.retry_attemptsfor attemp in range(self.retry_attemps):try:self.client = MongoClient(self.mongo_uri,serverSelectionTimeoutMS=5000,server_api=ServerApi('1'),)# 初始化数据库和集合(不会实际创建,直到插入数据)self.db = self.client[self.db_name]self.collection = self.db[self.collection_name]# 检查/创建索引(幂等操作)self._ensure_indexes()print(f"成功连接 MongoDB | 数据库: {self.db_name} | 集合: {self.collection_name}")returnexcept errors.ServerSelectionTimeoutError as e:print(f"连接尝试 {attempt + 1}/{self.retry_attempts} 失败: {str(e)}")if attempt < self.retry_attempts - 1:print(f"{self.retry_delay}秒后重试...")time.sleep(self.retry_delay)else:raise RuntimeError(f"无法连接 MongoDB,请检查服务状态或连接字符串") from eexcept errors.OperationFailure as e:if "Authentication failed" in str(e):raise RuntimeError("MongoDB 认证失败,请检查用户名密码") from eelse:raisedef _ensure_indexes(self):"""创建必要索引(幂等操作)"""try:# 创建唯一索引(如果已存在会自动跳过)self.collection.create_index([("ASIN", 1)],unique=True,name="asin_unique_idx",)except errors.OperationFailure as e:if "already exists" not in str(e):print(f"创建索引失败: {str(e)}")raise

【细节讲解】

  • ServerApi:用于指定客户端与 MongoDB 服务器交互时所遵循的稳定 API 版本。表示使用服务器 API 的第一个版本。
  • create_index:在 ASIN 字段上创建一个唯一索引,以确保该字段的值在集合中不重复,并为该索引指定名称,判断是否连接成功。当然也可以使用其他的验证连接的方式,比如心跳检测、验证用户权限、集合存在性验证、服务级健康检查等等。

mongodb提供了多种的方法,比如insertOne()insertMany()bulkWrite()upsert()。接下来就是用上面这些来写入。

class MongoDBManager:# ...def insert_data(self, data):"""插入数据(自动处理初次创建集合)"""try:if isinstance(data, list):result = self.collection.insert_many(data, ordered=False)return len(result.inserted_ids)else:result = self.collection.insert_one(data)return result.inserted_idexcept errors.BulkWriteError as e:handled_errors = [err for err in e.details['writeErrors']if err['code'] != 11000  # 忽略重复键错误(11000)]if handled_errors:raisereturn len(e.details['nInserted'])except errors.DuplicateKeyError:print("重复数据已跳过")return 0def upsert_batch(self, records, key_field="ASIN", batch_size=50000, max_retries=3):"""批量插入或更新数据:param records: 数据列表:param key_field: 用于更新的唯一键字段:param batch_size: 每批处理的数据量:param max_retries: 最大重试次数:return: 插入或更新的文档数量"""if not records:return 0, 0total_success = 0total_fail = 0operations = []for record in records:try:query = {key_field: record[key_field]}operations.append(UpdateOne(query, {"$set": record}, upsert=True))except KeyError:print(f"记录缺少关键字段 {key_field}: {record}")continuefor i in range(0, len(operations), batch_size):batch_ops = operations[i:i + batch_size]retries = 0while retries < max_retries:try:result = self.collection.bulk_write(batch_ops, ordered=False)return result.upserted_count + result.modified_count, 0except BulkWriteError as e:success = e.details['nInserted'] + e.details['nModified']total_success += successtotal_fail += (len(batch_ops) - success)print(f"部分操作失败: {len(batch_ops) - success} 条, 第 {retries + 1} 次重试")except Exception as e:total_fail += len(batch_ops)print(f"批量操作失败: {str(e)}")traceback.print_exc()return total_success, total_fail

【细节讲解】

  • insert_data:这里提供两种方法,当传递进来的是列表的时候,就调用insert_many,否则就调用insert_one()
  • upsert_batch:这里就使用了bulkWrite()+upsert()方法
    • UpdateOne(query, {"$set": record}, upsert=True):对每个数据构建更新操作,设置upsert=True,查询条件匹配到一个时执行更新操作,没有匹配到任何文档,则会根据提供的更新内容创建一个新文档。
    • batch_size:执行分批操作。

    def delete_data(self, query):"""删除符合条件的数据:param query: 删除条件(字典格式,例如 {"ASIN": "B0XXXXXXX"}):return: 删除的文档数量"""try:result = self.collection.delete_data(query)return result.deleted_countexcept errors.PyMongoError as e:print(f"删除数据时发生错误: {str(e)}")return 0

【细节讲解】

  • delete_data:方法已支持根据任意条件删除数据,但是关键的是如何构造。
    • 等值条件删除:当满足特定值时,就执行删除,比如query = {"ASIN": "B0001"} # 精确匹配
    • 范围条件删除:符合某一个范围时,就执行删除,比如query = {"price": {"$gt": 100}} # $gt 表示大于
    • 多条件同时满足:同时满足两个以上的,就执行删除,比如query = {"$and": [{"category": "Electronics"}, {"stock": 0}]},当然也能写成下面这种方式:
    query = {
    "category": "Electronics",
    "stock": 0}
    
    • 满足任一条件(OR):满足其中之一的条件即可,比如query = {"$or": [{"category": "Electronics"}, {"price": {"$lt": 10}]}
    • 正则表达式匹配:通过正则表达,获取对应的内容再删除,比如query = {"title": {"$regex": "Wireless", "$options": "i"}}

class MongoDBManager:# ...# ------------ 改 ------------def update_data(self, query, update_data, upsert=False):"""更新符合条件的数据:param query: 查询条件(字典格式):param update_data: 更新操作(需使用MongoDB更新操作符,例如 {"$set": {"Price": 99.99}}):param upsert: 如果不存在是否插入新文档:return: 修改的文档数量"""try:result = self.collection.update_many(query,update_data,upsert=upsert)return result.modified_countexcept errors.PyMongoError as e:print(f"更新数据时发生错误: {str(e)}")return 0

【细节讲解】

  • 批量更新(update_many):也就是更新所有匹配查询条件的文档,除了这个还有update_one():更新第一个匹配查询条件的文档、replace_one():替换整个文档。
  • 对于爬虫来说,一股脑更新!

class MongoDBManager:# ...# ------------ 查 ------------def find_data(self, query=None, projection=None, limit=0):"""查询符合条件的文档:param query: 查询条件(字典格式):param projection: 返回字段控制(例如 {"ASIN": 1, "Price": 1}):param limit: 返回结果数量限制(0表示无限制):return: 匹配的文档列表"""try:if query is None:query = {}cursor = self.collection.find(query, projection).limit(limit)return list(cursor)except errors.PyMongoError as e:print(f"查询数据时发生错误: {str(e)}")return []

mongodb_331">将爬虫的数据保存到mongodb

我在上一篇中,通过搜索数据获取翻页等操作,搞到了对应的数据,然后仅仅是简单的保存成csv文件,当然也可以通过自己编写对应的管理器来管理csv文件中的数据内容,不过还是没有现成的数据库好用,哈哈哈哈哈哈。

save_to_mongo

这里要在AmazonBrowser类里面写一个保存的步骤,也就是save_to_mongo

class AmazonBrowser:def __init__(self, mongo_manager):# ...self.mongo_manager = mongo_manager # 新增这一步def save_to_mongodb(self, batch_size=50000):"""将数据保存到MongoDB"""if self.df.empty:print("没有数据需要保存")returnclean_df = self._preprocess_data()total_records = len(clean_df)total_batches = (total_records // batch_size) + 1total_success = 0total_fail = 0print(f"开始写入 {total_records} 条数据,分 {total_batches} 批处理")records = clean_df.to_dict(orient='records')for i in range(0, len(records), batch_size):batch = records[i:i + batch_size]success, fail = self.mongo_manager.upsert_batch(batch)total_success += successtotal_fail += failprint(f"进度: {min(i + batch_size, total_records)}/{total_records} | 成功: {success} | 失败: {fail}")print(f"写入完成!总成功: {total_success} | 总失败: {total_fail}")return total_success, total_faildef _preprocess_data(self):"""数据预处理"""# TODO: 实现数据预处理逻辑df = self.df.copy()# 处理空值df = df.replace({pd.NA: None, '': None})# 去重(保留最后出现的记录)df = df.drop_duplicates(subset=['ASIN'], keep='last')return df

【细节讲解】

  • _preprocess_data:在数据保存之前,先对爬取的数据进行数据清洗,确保没有问题。
  • self.mongo_manager.upsert_batch(batch):这就是执行数据保存了。

补充

async def main():target_url = "https://www.amazon.com/"max_attempts = 2scraper = Nonewith MongoDBManager() as mongo_manager:for attempt in range(max_attempts):try:scraper = AmazonBrowser(target_url, mongo_manager)await scraper.run()if scraper.get_dataframe() is not None:success, fail = scraper.save_to_mongodb()print(f"数据保存结果: 成功 {success} 条,失败 {fail} 条")breakexcept playwright._impl._errors.TimeoutError:if attempt == max_attempts:raise  # 最后一次尝试仍失败则抛出异常print(f"第{attempt}次尝试超时,20秒后重试...")if scraper:await scraper.close()  # 关闭当前实例释放资源await asyncio.sleep(20)except Exception as e:if scraper:await scraper.close()raise e  # 其他异常直接抛出
if __name__ == '__main__':asyncio.run(main())

在这里插入图片描述

就酱~
在这里插入图片描述

文章来源:https://blog.csdn.net/qq_24680545/article/details/146425168
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ppmy.cn/embedded/176065.html

相关文章

如何理解java中Stream流?

在Java中&#xff0c;Stream 是 Java 8 引入的一个强大API&#xff0c;用于处理集合&#xff08;如 List、Set、Map 等&#xff09;数据的流式操作。它提供了一种声明式、函数式的编程风格&#xff0c;可以高效地进行过滤、映射、排序、聚合等操作。 Stream 的核心概念 流&…

Docker入门篇4:查看容器资源、查看容器详细信息、查看容器日志、查看容器内运行的进程

大家好我是木木&#xff0c;在当今快速发展的云计算与云原生时代&#xff0c;容器化技术蓬勃兴起&#xff0c;Docker 作为实现容器化的主流工具之一&#xff0c;为开发者和运维人员带来了极大的便捷 。下面我们一起开始入门第四篇&#xff1a;查看容器资源、查看容器详细信息、…

华为ipd流程华为流程体系管理华为数字化转型流程数字化管理解决方案介绍81页精品PPT

华为流程体系最佳实践主要包括构建完善的流程框架&#xff0c;明确各层级流程要素与职责&#xff0c;梳理涵盖研发、采购、营销、服务、资产管理等多领域的流程&#xff0c;通过梳理业务场景和核心能力搭建差异化流程框架&#xff0c;采用自上而下与自下而上相结合的建模方法&a…

win关闭高危端口和其造成ensp的一些问题

Windows操作系统开放TCP135&#xff0c;445&#xff0c;139&#xff0c;UDP137&#xff0c;138等高危端口。 1、关闭135端口 第一步运行dcomcnfg&#xff0c;打开“组件服务”→“计算机”&#xff0c;在“我的电脑”上右键点击&#xff0c;选“属性”&#xff1b;然后点默认属…

022-spdlog

spdlog 以下是从原理到代码实现的全方位spdlog技术调研结果&#xff0c;结合核心架构、优化策略和完整代码示例&#xff1a; 一、核心架构设计原理 spdlog三级架构 &#xff08;图示说明&#xff1a;spdlog采用三级结构实现日志系统解耦&#xff09; Registry管理中枢 全局…

hackmyvm-Icecream

arp-scan -l nmap -sS -v 192.168.222.106 enum4linux 192.168.222.106 445端口 smbmap -H 192.168.222.106 icecream为只读模式 smbclient \\192.168.222.106\icecream 反弹shell(上传put php-reverse-shell.php) 开启监听 nc -lnvp 1234 拿到webshell cat /etc/passwd 9000端…

前端对接生成式AI接口(类ChatGPT)问题汇总

文章目录 前端实现对话流问题总结流式数据传输问题后台Response Headers问题大量数据分段接收问题多个流时间戳&#xff08;Time&#xff09;相同导致被合并的问题 中止对话问题复制问题部署上线问题&#xff08;Nginx缓冲导致&#xff09; 前端实现对话流问题总结 流式数据传…

Vue.js 中的 Tree Shaking:优化你的应用性能

在现代前端开发中&#xff0c;应用的性能优化是一个永恒的话题。随着项目规模的增大&#xff0c;JavaScript 文件的体积也会随之膨胀&#xff0c;导致加载时间变长&#xff0c;用户体验下降。为了解决这个问题&#xff0c;Tree Shaking 应运而生。本文将详细介绍什么是 Tree Sh…