Neo4j插入数据逐级提升速度4倍又4倍

embedded/2024/12/23 4:43:06/

语雀版:https://www.yuque.com/xw76/back/dtukgqfkfwg1d6yo

目录

    • 背景介绍
    • 初始方案Node()创建
    • 事务批量提交
    • 记录Node是否存在
    • 生成Cypher语句执行
    • 数据库参数优化
    • 切换成85k个三元组测试
    • 建索引(很显著!!!)
    • MATCH替代MERGE
    • CREATE替代MERGE
    • 总结
    • Other可能的优化

背景介绍

待插入数据的背景:

Wikipedia的数据,一个标题title就是一个页面,一个页面下有很多个子章节chapter(我这里称之为section),每个section都是由若干个块chunk组成的(我这里每个块100个token)。

如图所示,红色是title,棕色是section,绿色是chunk(显示的chunkID)

我的三元组,(s,p,o)是三元组主谓宾,

titleOrSection表示主语s是一个标题还是章节:为了标记label

titleName表示这个主语谓语是哪一个title页面下的:为了唯一区分相同的section,比如上面图里"Career"就出现了两个,但是它们的属性是不同的,一个是"Henry->Career",一个是"Adelina->Career"。就应该分开,举个例子,论文A包含章节"Approach",“Approach"包含"Retrieval Augmented Generation”,论文B也包含章节"Approach",“Approach"包含"Bacterial Culture”,如果没有区分"Approach",那论文A的节点向下探索就会找到论文B的章节块,所以这里会有一个titleName进行区分。

初始方案Node()创建

Node()创建,用的py2neo,是逻辑层面。

def create_node_and_relation_t1(data):for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]oLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + ostart_node = Node(sLabel, name=s, unique_name=nameAndTitleNameS)end_node = Node(oLabel, name=o, unique_name=nameAndTitleNameO)# 使用 MERGE 确保节点不会重复创建graph.merge(start_node, sLabel, "unique_name")# 根据标签和 unique_name 属性进行合并graph.merge(end_node, oLabel, "unique_name")relation = Relationship(start_node, p, end_node)graph.create(relation)

用的数据包含2697个三元组,用时:32s。

事务批量提交

事务开销大

  • 每次创建节点和关系都会产生网络开销
  • 频繁的单个事务提交会显著降低性能

批量操作不充分

  • 没有利用Neo4j的批量写入能力
  • 每个节点和关系都单独处理

每次执行 graph.run() 都会发送一个请求给数据库,如果是逐行执行,会非常慢。使用事务可以将多条语句合并到一个请求中,这样能大大减少请求次数。

加事务,batch

def create_node_and_relation_t2_batch(data, batch_size=1000):tx = graph.begin()for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]oLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + ostart_node = Node(sLabel, name=s, unique_name=nameAndTitleNameS)end_node = Node(oLabel, name=o, unique_name=nameAndTitleNameO)# 使用 MERGE 确保节点不会重复创建tx.merge(start_node, sLabel, "unique_name")  # 根据描述和 unique_name 属性进行合并tx.merge(end_node, oLabel, "unique_name")relation = Relationship(start_node, p, end_node)tx.create(relation)# 每 batch_size 条记录提交一次if i % batch_size == 0:graph.commit(tx)tx = graph.begin()  # 开始新的事务# 提交剩余的数据graph.commit(tx)

用的数据包含2697个三元组,用时:20s,快了12秒,快了37.5%。

不同的batchSize不一样的结果,为10的时候最好

记录Node是否存在

如果Node存在,就不用去new了

def create_node_and_relation_batch_exist(data, batch_size=1000):tx = graph.begin()existNode = {}for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]if p != "hasChunk" and p != "hasSection":logging.error(f"关系类型错误:{p}")breakoLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + oif nameAndTitleNameS in existNode:start_node = existNode[nameAndTitleNameS]else:start_node = Node(sLabel, name=s, unique_name=nameAndTitleNameS)existNode[nameAndTitleNameS] = start_nodeif nameAndTitleNameO in existNode:end_node = existNode[nameAndTitleNameO]else:end_node = Node(oLabel, name=o, unique_name=nameAndTitleNameO)existNode[nameAndTitleNameO] = end_noderelation = Relationship(start_node, p, end_node)tx.create(relation)if i % batch_size == 0:graph.commit(tx)tx = graph.begin()  # 开始新的事务graph.commit(tx)

用的数据包含2697个三元组,用时:8s,快了12秒,快了60%。比初始快了75%。

生成Cypher语句执行

cypher语句生成,依然每一个都得tx.run()一次。

tx.run()的批量参数,可惜我这里不适用

def create_cypher(data, batch_size=1000):# 开始一个事务tx = graph.begin()for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]oLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + oparams = {'s': s,'o': o,'nameAndTitleNameS': nameAndTitleNameS,'nameAndTitleNameO': nameAndTitleNameO}query = f"""MERGE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})MERGE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})MERGE (s)-[:{p}]->(o)"""tx.run(query, params)# 每 batch_size 条记录提交一次if i % batch_size == 0:graph.commit(tx)tx = graph.begin()  # 开始新的事务# 提交剩余的数据graph.commit(tx)

用的数据包含2697个三元组,用时:7s,快了1秒,快了12.5%。比初始快了78%。

数据库参数优化

数据库参数调整。没什么感觉,可能数据量比较小吧

server.memory.heap.initial_size=8g
server.memory.heap.max_size=16g
server.memory.pagecache.size=10g
dbms.memory.transaction.total.max=6g

ChatGPT推荐的

# 确保 neo4j.conf 中的内存配置足够大
dbms.memory.pagecache.size
# 禁用事务日志
dbms.tx_log.rotation.retention_policy=0
dbms.tx_log.rotation.retention_policy=100Mdbms.memory.heap.initial_size=8G
dbms.memory.heap.max_size=16G
dbms.memory.pagecache.size=4G

Claude推荐的

# 堆内存和页缓存
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=16G
dbms.memory.pagecache.size=8G# 并发和事务配置
dbms.tx_log.rotation.size=512M
dbms.tx_log.rotation.retention_policy=keep_latest 3# 索引和约束优化
dbms.index.operational_sampling_enabled=false
dbms.index.background_sampling_enabled=true# 批量导入优化
dbms.import.csv.legacy_quote_escaping=false
dbms.import.csv.multi_line_fields=false

切换成85k个三元组测试

事务批量提交+Node是否存在+数据库参数优化:用的数据包含85661个三元组,

batch_size = 1000 用时:09:25

batch_size = 10 用时:04:40

事务批量提交+Cypher语句+数据库参数优化:用的数据包含85661个三元组,

batch_size = 1000 用时:1:12:57

:::color5
Node是否存在完胜

:::

建索引(很显著!!!)

如果在 MERGE 操作中某些属性(如 unique_name)只是为了保证唯一性,但不会频繁变动,可以通过优化索引和减少查询字段来提高性能。

CREATE CONSTRAINT FOR (s:Title) REQUIRE s.unique_name IS UNIQUE;
CREATE CONSTRAINT FOR (s:Section) REQUIRE s.unique_name IS UNIQUE;
CREATE CONSTRAINT FOR (o:Chunk) REQUIRE o.unique_name IS UNIQUE;

事务批量提交+Node是否存在+数据库参数优化:用的数据包含85661个三元组

batch_size = 1000 用时:09:25

batch_size = 10 用时:04:50

事务批量提交+Cypher语句+数据库参数优化:用的数据包含85661个三元组

batch_size = 1000 用时:03:14

batch_size = 10 用时:02:06

:::color5
Cypher语句完胜,因为索引对Node是否存在没用

batch_size小一点会更好

:::

MATCH替代MERGE

合并操作:当前代码是基于 MERGE 来进行节点的创建和关系的建立。每次都在执行 MERGE 操作,会影响性能。如果确定节点已经存在,可以使用 MATCH 代替 MERGE。python代码里用内存记录是否存在,不用Neo4j记录,if nameAndTitleNameS in alreadyNode:

同步可以改第一种方案不去用MERGE来做判断,而是用内存,并记录存在的Node()对象,直接字典取。理论来说这个应该最快,但是这是py2neo,不是cypher语句层面

  • 对于每个源节点和目标节点,首先检查它是否已经在 alreadyNode 中。
  • 如果节点已经存在,使用 MATCH 语句来查找它。
  • 如果节点不存在,使用 MERGE 来创建它。
if nameAndTitleNameS in alreadyNode:s_query = f"""MATCH (s:{sLabel} {{unique_name: $nameAndTitleNameS}})"""
else:s_query = f"""MERGE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})"""alreadyNode.append(nameAndTitleNameS)if nameAndTitleNameO in alreadyNode:o_query = f"""MATCH (o:{oLabel} {{unique_name: $nameAndTitleNameO}})"""
else:o_query = f"""MERGE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})"""alreadyNode.append(nameAndTitleNameO)# MERGE 和 MATCH 之间需要有 with
query = f"""
{s_query}
WITH s
{o_query}
WITH s, o
CREATE (s)-[:{p}]->(o)
"""
tx.run(query, params)

batch_size = 1000 用时:04:11

batch_size = 10 用时:03:37

CREATE替代MERGE

使用MERGE或者MATCH,会随着KG里数据的变多而变得缓慢

CREATEMERGE 的区别:

  • **CREATE**:直接创建新的节点或关系,不做任何检查。如果节点或关系已经存在,CREATE 不会做任何检查,也不会返回已存在的节点。
  • **MERGE**:执行节点或关系的查找(MATCH),如果不存在则创建。如果节点已经存在,MERGE 会进行比 CREATE 更多的检查(例如,索引匹配等),这会增加查询的开销。

因为明确知道了是不存在的Node,可以直接CREATE,不用走MERGE的检查。如果你确定不会出现重复的节点或关系,CREATE 可能更快。

if nameAndTitleNameS in alreadyNode:s_query = f"""MATCH (s:{sLabel} {{unique_name: $nameAndTitleNameS}})"""
else:# s_query = f"""MERGE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})"""s_query = f"""CREATE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})"""alreadyNode.append(nameAndTitleNameS)if nameAndTitleNameO in alreadyNode:o_query = f"""MATCH (o:{oLabel} {{unique_name: $nameAndTitleNameO}})"""
else:# o_query = f"""MERGE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})"""o_query = f"""CREATE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})"""alreadyNode.append(nameAndTitleNameO)

batch_size = 10 用时:03:12

总结

当没有建立索引的时候:记录Node是否存在,是最快的。

当建立了索引之后:纯MERGE的Cypher是最快的,各种判断更换为MATCH和CREATE只会减慢速度(与理论分析有点相反)。

事务批量提交不是越大越好,设置为10最佳

以上总结都针对于我当前的环境和数据,建议选取少部分数据进行实验,针对上述提到的几种方案进行测试,找到适用的。

85647个三元组:用时02:06,平均680条/秒

469101个三元组:用时10:40,平均733条/秒

Other可能的优化

MATCH优化?能不能也存到内存里?每次MATCH也很花时间,能不能把match省略掉,比如能不能把这个节点存到字典里,如果key存在,就直接去取,然后再放到后面的query里?

网上搜的性能对比参考:

  • py2neo: 约100-500条/秒
  • neo4j-python-driver: 约1000-5000条/秒
  • 直接Cypher批量: 可达10000+条/秒

LOAD_CSV方式:如果节点有大量重复数据,先通过 LOAD CSV 等批量导入方式将数据导入到数据库,再通过 MATCHCREATEMERGE 来建立关系。

异步执行:如果你的应用允许,你可以考虑异步提交查询,将多个 tx.run(query, params) 放在异步队列中并并行执行。


http://www.ppmy.cn/embedded/147980.html

相关文章

Java 实现 AES 加密和解密

目录 前言 一、AES 加密算法简介 1.诞生背景 2.简介 二、AES 加密算法的核心原理 1.分组密码模式 2.密钥扩展 3.加密轮次与操作 三、AES 加密算法的优势 1.高效性 2.安全性 3.广泛的适用性 四、AES加密模式的几种方式 1.电子密码本模式(ECB&#xff0c…

【嵌入式开发笔记】OpenOCD到嵌入式调试

最近在把玩一块Risc-V的开发板,使用开发板调试时,需要用到专门的下载器和OpenOCD进行调试。 为了连接这个板子,费了九牛二虎之力。 这里简单记录一下自己的折腾经过吧。 0x00 环境准备 0x0001 调试背景 系统:Virtual Box Ub…

【软考高级】系统架构设计师复习笔记-精华版

文章目录 前言0 系统架构设计师0.1 考架构还是考系分0.2 架构核心知识0.3 架构教材变化 1 计算机操作系统1.1 cpu 组成1.2 内核的五大功能1.3 流水线技术1.4 段页式存储1.5 I/O 软件1.6 文件管理1.7 系统工程相关 2 嵌入式2.1 嵌入式技术2.2 板级支持包(BSP&#xf…

Windows开机黑屏|Windows开机黑屏只有鼠标|Windows开机不运行explorer

输入regedit打开注册表,进入以下路径 计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon 左键双击右侧的 Shell 查看是否设置为 Explorer.exe

MONI后台管理系统-swagger3(springdoc-openapi)集成

springdoc-openapi Java 库有助于使用 Spring Boot 项目自动生成 API 文档。springdoc-openapi 通过在运行时检查应用程序来根据 Spring 配置、类结构和各种注释推断 API 语义。 该库会自动生成 JSON/YAML 和 HTML 格式的页面文档。生成的文档可以使用swagger-api注释进行补充。…

聚观早报 | 百度回应进军短剧;iPad Air将升级OLED

聚观早报每日整理最值得关注的行业重点事件,帮助大家及时了解最新行业动态,每日读报,就读聚观365资讯简报。 整理丨Cutie 12月18日消息 百度回应进军短剧 iPad Air将升级OLED 三星Galax S25 Ultra配色细节 一加Ace 5系列存储规格 小米…

呼入机器人:24小时客户服务的未来趋势

呼入机器人:24小时客户服务的未来趋势 作者:开源大模型智能呼叫中心系统FreeAICC,Github:https://github.com/FreeIPCC/FreeAICC 在当今快节奏的商业环境中,客户服务已成为企业竞争的核心要素之一。随着人工智能技术…

Spring篇--xml方式整合第三方框架

Spring xml方式整合第三方框架 xml整合第三方框架有两种整合方案: ​ 不需要自定义名空间,不需要使用Spring的配置文件配置第三方框架本身内容,例如:MyBatis; ​ 需要引入第三方框架命名空间,需要使用…