【MongoDB集群基于Oplog日志,实现数据监听】

devtools/2024/10/19 5:28:08/

使用背景

项目后台使用mongoDb,门户页使用elasticsearch,需求是将mongoDb的数据同步到elasticsearch中。于是联想到mysql有基础binlog开发的canal。mg是否有类似binlog这种机制实现的实时监控同步工具。

方案

  1. MongoShake(项目地址:https://github.com/alibaba/MongoShake)
  2. ChangeStream (monggodb的3.6版本之后推出的)

MongoShake

  MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志, 对MongoDB的数据进行复制,后续通过操作日志实现特定需求。
  MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。


阿里官方文档: https://developer.aliyun.com/article/763827?spm=a2c6h.13813017.content3.1.3eb032a4yx0ZoM#slide-11
项目地址: https://github.com/alibaba/MongoShake

在这里插入图片描述

ChangeStream

changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更
想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具

关于changestream做如下说明,提供参考

  • 在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;
  • 但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;
  • Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;
  • 由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群

changestream可用于监听的mongodb目标类型

  • 单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持
  • 单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持
  • 整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持

一个Change Stream Event的基本结构如下所示:

{_id : { <BSON Object> },"operationType" : "<operation>","fullDocument" : { <document> },"ns" : {"db" : "<database>","coll" : "<collection"},"documentKey" : { "_id" : <ObjectId> },"updateDescription" : {"updatedFields" : { <document> },"removedFields" : [ "<field>", ... ]}"clusterTime" : <Timestamp>,"txnNumber" : <NumberLong>,"lsid" : {"id" : <UUID>,"uid" : <BinData>}
}

关于上面的数据结构,做简单的解释说明,

  • _id,变更事件的Token对象
  • operationType,变更类型(见下面介绍)
  • fullDocument,文档内容
  • ns,监听的目标
  • ns.db,变更的数据库
  • ns.coll,变更的集合
  • documentKey,变更文档的键值,含_id字段
  • updateDescription,变更描述
  • updateDescription.updatedFields,变更中更新字段
  • updateDescription.removedFields,变更中删除字段
  • clusterTime,对应oplog的时间戳
  • txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持
  • lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持

Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:

  • insert,插入文档
  • delete,删除文档
  • replace,替换文档,当执行replace操作指定upsert时,可能是insert事件
  • update,更新文档,当执行update操作指定upsert时,可能是insert事件
  • invalidate,失效事件,比如执行了collection.drop或collection.rename
    以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等

Java客户端操作changestream

1、引入maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

2、注册MessageListenerContainer

package com.iflytek.databus.conf;import com.iflytek.databus.entity.OriginalOplogCkpt;
import com.iflytek.databus.init.DocumnetMessageListener;
import com.mongodb.client.model.changestream.FullDocument;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;import java.util.concurrent.Executor;import static org.springframework.data.mongodb.core.aggregation.Aggregation.match;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
import static org.springframework.data.mongodb.core.query.Criteria.where;/*** @author xyding6* @Classname MongoConfig* @Description TODO* @date 2023/2/21*/
@Configuration
@Slf4j
public class MongoConfig {@Value("${spring.data.mongodb.database}")String mongoDatabase;@BeanMessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener,Executor asyncExecutor) {
//        Executor executor = Executors.newSingleThreadExecutor();MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, asyncExecutor) {@Overridepublic boolean isAutoStartup() {return true;}};//oplog_check_pointOriginalOplogCkpt originalOplogCkpt = template.findById("oplog_check_point", OriginalOplogCkpt.class, "original_oplog_ckpt");ChangeStreamRequest.ChangeStreamRequestBuilder<Document> documentChangeStreamRequestBuilder = ChangeStreamRequest.builder(documnetMessageListener).database(mongoDatabase)
//                .collection("original_aydoc_test")  //需要监听的集合名,不指定默认监听数据库.filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete"))))  //过滤需要监听的操作类型,可以根据需求指定过滤条件.fullDocumentLookup(FullDocument.UPDATE_LOOKUP);//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息if(originalOplogCkpt != null && StringUtils.isNotBlank(originalOplogCkpt.getResumeToken())){log.info("==mongo监听从位点处恢复执行,位点resumeToken:{}==",originalOplogCkpt.getResumeToken());documentChangeStreamRequestBuilder.resumeToken(BsonDocument.parse(originalOplogCkpt.getResumeToken()));}ChangeStreamRequest request = documentChangeStreamRequestBuilder.build();//TODO 测试暂不开启messageListenerContainer.register(request, Document.class);return messageListenerContainer;}
}

这一步是注册自己编写的监听器,oplog_check_point这是本地mg的一个集合,用来记录每次位点的信息。

3. 编写MessageListener

package com.iflytek.databus.init;import com.google.common.collect.Lists;
import com.iflytek.databus.entity.OriginalOplogCkpt;
import com.iflytek.databus.service.TransToEsService;
import com.iflytek.databus.utils.EntityUtils;
import com.iflytek.databus.utils.GsonUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** @author xyding6* @Classname DocumnetMessageListener* @Description TODO* @date 2023/2/21*/
@Component
@Slf4j
public class DocumnetMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {@AutowiredRestHighLevelClient client;@AutowiredMongoTemplate mongoTemplate;@AutowiredTransToEsService transToEsService;@Overridepublic void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {try {//数据库String databaseName = message.getProperties().getDatabaseName();//集合名称String collectionName = message.getProperties().getCollectionName();//操作类型String operationType = message.getRaw().getOperationType().getValue();//位点tokenString resumeToken = message.getRaw().getResumeToken().toJson();//消息Document fullDocument = message.getRaw().getFullDocument();if(Lists.newArrayList("original_oplog_ckpt","original_mq_log","original_sys_login_log").contains(collectionName)){return;}log.info("Received Message in collection: {},message raw: {}, message body:{}",message.getProperties().getCollectionName(), message.getRaw(), message.getBody());//消息主键String messageId = message.getRaw().getDocumentKey().getObjectId("_id").getValue().toHexString();Class clazz = Class.forName("com.iflytek.databus.entity."+ EntityUtils.toHump(collectionName.replace("original_","")));//1. 处理消息  TODO 考虑放入消息中间件消费if(Lists.newArrayList(OperationType.INSERT.getValue(),OperationType.UPDATE.getValue(),OperationType.REPLACE.getValue()).contains(operationType)){Object claszzObj = GsonUtils.fromJsonToBean(fullDocument, clazz);transToEsService.mongoToEs(collectionName, Lists.newArrayList(claszzObj),false);}if(OperationType.DELETE.getValue().equals(operationType)){transToEsService.oplogDelTrigger(collectionName,messageId,clazz);}//2. 每次处理消息成功,更新resumeToken,以便于故障重启后能继续同步消息OriginalOplogCkpt originalOplogCkpt = new OriginalOplogCkpt();originalOplogCkpt.setId("oplog_check_point");originalOplogCkpt.setMessageId(messageId);originalOplogCkpt.setDatabaseName(databaseName);originalOplogCkpt.setCollectionName(collectionName);originalOplogCkpt.setOperationType(operationType);originalOplogCkpt.setResumeToken(resumeToken);originalOplogCkpt.setCreateTime(new Date());mongoTemplate.save(originalOplogCkpt);} catch (Exception e) {e.printStackTrace();log.error("Mongo消息实时监控处理失败,错误信息:{}",e.getStackTrace());}}
}

消息监听器中可以去实现自己对消息处理的逻辑,这里我的需求是将mg的变化同步到es中。

4. 动态的启停changestream

package com.iflytek.databus.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.stereotype.Service;/*** @author xyding6* @Classname ChangeStreamService* @Description TODO* @date 2023/2/24*/
@Service
@Slf4j
public class ChangeStreamService {@Autowiredprivate MessageListenerContainer messageListenerContainer;public void start(){log.info("ChangeStreamService start");messageListenerContainer.start();}public void stop(){log.info("ChangeStreamService stop MessageListenerContainer");messageListenerContainer.stop();}
}

可以在容器运行过程中,去操作changestream的启停。自己可以编写Controller来进行调用。

到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,如果项目对性能和延迟性要求不是太高可以选中changestream进行实现,否则还是推荐使用阿里开源的MongoShake.


http://www.ppmy.cn/devtools/27228.html

相关文章

mongodb 分片集群认证

增加认证 副本间认证外部使用认证 如果是开启状态,先关闭路由,再关闭配置服务,最后关闭分片数据复本集中的每个mongod&#xff0c;从次节点开始。直到副本集的所 有成员都离线&#xff0c;包括任何仲裁者。主节点必须是最后一个成员关闭以避免潜在的回滚.最好通过 db.shutdow…

LangChain入门2 RAG详解

RAG概述 一个典型的RAG应用程序,它有两个主要组件&#xff1a; 索引&#xff1a;从源中获取数据并对其进行索引的管道。这通常在脱机情况下发生。检索和生成&#xff1a;在运行时接受用户查询&#xff0c;并从索引中检索相关数据&#xff0c;然后将其传递给模型。 从原始数据…

Ansible playbook之循环

1.标准Loops 当我们想安装10个软件包的时候&#xff0c;为了避免写10个task来安装&#xff0c;我们可以直接使用标准的loops简单快速实现10个软件包的安装&#xff0c;下面例子是分别打印了one two这两个值&#xff1a; #1.编写loop.yaml [rootansible01 ansible]# cat loops.…

【数据结构】链表经典算法OJ题目练习

203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; 思路1&#xff1a;遍历原链表&#xff0c;将val链表释放掉 思路2&#xff1a;创建新链表 206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 思路1&#xff1a;创建新链表&#xff0c;将原链表中的节点拿过来…

商超物联网方案-Hotspot Service和客流分析方案概述

商超物联网方案-Hotspot Service和客流分析方案概述 场景概述 大型商场、大型综合体在相互竞争及线上消费的影响下&#xff0c;利润增长缓慢&#xff0c;迫切需要通过提供个性化服务提升顾客购物体验&#xff0c;促进利润增长。 向不同顾客推送其感兴趣的广告&#xff0c;不仅…

启明云端2.4寸屏+ESP32-S3+小型智能调速电动家用除草机案例 触控三档调速,能显示电压故障码

今天给大家分享个启明云端2.4寸屏ESP32-S3小型智能调速电动家用除草机案例&#xff0c;国外有草坪文化&#xff0c;这个机器能智能触控三档调速&#xff0c;带屏能显示电压故障码&#xff0c;数显档位&#xff08;3档最大&#xff09;&#xff0c;触控屏&#xff0c;长按3秒就能…

三种滤波(EKF、UKF、CKF)的对比,含MATLAB源代码

使用MATLAB模拟三维的滤波,包含扩展卡尔曼滤波EKF、无迹卡尔曼滤波UKF、容积卡尔曼滤波CKF。 状态更新和观测更新均为非线性的,模拟一定强度的机动性,可用于卡尔曼滤波方法的对比学习,自己修改成需要的运动模型后,可以用于组合导航(GPS+DVL形式)。 运行结果 真值的三轴…

深入了解Java中的Thread类

在Java编程中&#xff0c;Thread类是一个核心的类&#xff0c;用于创建和管理线程。线程是程序执行的最小单元&#xff0c;多线程编程可以提高程序的并发性和效率。 本文将深入介绍Java中的Thread类&#xff0c;包括其基本概念、创建线程的方法、线程状态的转换、线程同步与通…