在Neo4j图数据库的实际应用场景中,经常需要为大量节点创建关联关系,比如用户与订单的购买关系、文章与标签的归属关系等。当关联数据达到百万级规模时,使用单条Cypher语句逐次创建关系的方式会出现严重的性能瓶颈,甚至导致数据库响应超时。LOAD CSV是Neo4j内置的批量数据加载工具,能够直接读取本地或远程的CSV文件,配合Cypher语句快速完成百万级关系的批量创建,是处理大规模关联导入的首选方案。

LOAD CSV使用基础
前置准备
使用LOAD CSV前需要先准备好符合规范的CSV数据文件,关系导入的CSV文件至少需要包含三个核心字段:起始节点的唯一标识、结束节点的唯一标识、关系的属性(可选)。比如我们要导入用户购买商品的关系,CSV文件内容可以如下:
user_id,goods_id,order_time,amount 1001,2001,2024-03-01,2 1001,2002,2024-03-02,1 1002,2001,2024-03-01,3 1003,2003,2024-03-03,1
需要注意CSV文件的编码建议使用UTF-8,避免中文乱码问题。如果CSV文件放在Neo4j安装目录的import文件夹下,可以直接通过文件名访问;如果是其他路径或者远程文件,需要提供完整的URL地址。
基础语法说明
LOAD CSV的核心语法结构如下:
LOAD CSV WITH HEADERS FROM '文件路径' AS 行变量
MATCH (起始节点:标签 {唯一标识属性: 行变量.起始节点标识字段})
MATCH (结束节点:标签 {唯一标识属性: 行变量.结束节点标识字段})
MERGE (起始节点)-[关系名:关系类型 {属性1: 行变量.属性字段1}]->(结束节点)
其中WITH HEADERS表示CSV第一行是表头,可以通过行变量.字段名的方式获取对应值。如果没有表头则不需要该关键字,通过行变量[索引]的方式获取值,索引从0开始。
百万级关系导入完整示例
场景说明
假设我们已经在Neo4j中创建了User节点和Goods节点,User节点有userId唯一属性,Goods节点有goodsId唯一属性,现在需要导入100万条用户购买商品的关系,关系类型为PURCHASE,包含orderTime和amount两个属性。
导入步骤
第一步,将准备好的CSV文件放到Neo4j的import目录下,文件名为purchase_relations.csv。
第二步,执行以下Cypher语句完成关系批量创建:
// 先为User节点的userId属性创建索引,提升匹配效率
CREATE INDEX user_id_index IF NOT EXISTS FOR (u:User) ON (u.userId);
// 为Goods节点的goodsId属性创建索引
CREATE INDEX goods_id_index IF NOT EXISTS FOR (g:Goods) ON (g.goodsId);
// 执行LOAD CSV导入关系
LOAD CSV WITH HEADERS FROM 'file:///purchase_relations.csv' AS row
MATCH (u:User {userId: toInteger(row.user_id)})
MATCH (g:Goods {goodsId: toInteger(row.goods_id)})
MERGE (u)-[p:PURCHASE {orderTime: row.order_time, amount: toInteger(row.amount)}]->(g)
这里使用MERGE而不是CREATE是为了避免重复创建相同的关系,如果确认CSV中没有重复的关系数据,也可以替换为CREATE提升执行速度。
百万级导入优化技巧
提前创建索引
在进行MATCH匹配节点时,如果没有对应属性的索引,Neo4j会进行全表扫描,百万级数据下会极大地拖慢执行速度。因此导入前必须为起始节点和结束节点的匹配属性创建索引,这也是提升导入效率最核心的操作。
使用PERIODIC COMMIT分批提交
默认情况下LOAD CSV会将所有操作放在一个事务中执行,百万级数据会导致事务过大,占用大量内存甚至失败。可以使用PERIODIC COMMIT关键字设置每多少行提交一次事务,语法如下:
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM 'file:///purchase_relations.csv' AS row
MATCH (u:User {userId: toInteger(row.user_id)})
MATCH (g:Goods {goodsId: toInteger(row.goods_id)})
MERGE (u)-[p:PURCHASE {orderTime: row.order_time, amount: toInteger(row.amount)}]->(g)
上述语句表示每处理10000行数据就提交一次事务,避免单个事务过大,也可以根据实际情况调整分批的行数。
数据预处理
可以在导入前对CSV文件做预处理,比如将不需要的字段删除,对字段类型做统一处理,避免导入时做过多的类型转换操作。同时如果CSV中存在空值,需要提前处理,避免导入时出现匹配失败的问题。
常见问题说明
- 如果导入时出现找不到文件的错误,需要确认CSV文件的路径是否正确,Neo4j的import目录默认路径为安装目录下的import文件夹,也可以在配置文件中修改dbms.directories.import参数自定义路径。
- 如果导入速度仍然很慢,可以检查是否所有匹配属性都创建了索引,同时可以适当调大Neo4j的内存配置,提升批量处理的性能。
- 导入完成后可以通过
MATCH ()-[p:PURCHASE]->() RETURN count(p)语句验证导入的关系总数是否符合预期。