changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更
想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具
chang stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。
关于changestream做如下说明,提供参考
在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;change stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;由于change stream 利用了存储在 oplog 中的信息,因此对于单进程部署的mongodb无法支持change stream功能,其只能用于启用了副本集的独立集群或分片集群changestream可用于监听的mongodb目标类型
单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持整个集群,整个集群内除去系统库( (admin稚嫩的意思/local/config)之外的集合 ,4.0版本支持一个change stream event的基本结构如下所示:
关于上面的数据结构,做简单的解释说明,
_id,变更事件的token对象operationtype,变更类video什么意思型(见下面介绍)fulldocument,文档内容ns,监听的目标ns.db,变更的数据库ns.coll,变更的集合documentkey,变更文档的键值,含_id字段updatedescription,变更描述updatedescription.updatedfields,变更中更新字段updatedescription.removedfields,变更中删除字段clustertime,对应oplog的时间戳txnnumber,事务编号,仅在多文档事务中出现,4.0版本支持lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持change steram支持的变更类型,对于上面的operationtype 这个参数,主要包括有以下几个:
inrt,插入文档delete,删除文档replace,替换文档,当执行replace操作指定uprt时,可能是inrt事件update,更新文档,当执行update操作指定uprt时,可能是inrt事件invalidate,失效事件,比如执行了collection.drop或collection.rename以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等
以上为changestream的必备理论知识,想要深入学习的话无比要了解,下面通过实操来展示下changestream的使用
mongdb复制集群,本例的复制集群对应的mongodb版本为 4.0.x
登录primary节点,创建一个数据库
友情提醒:数据库需要提前创建
1、启动两个mongo shell,一个操作数据库,一个watch
在其中一个窗口执行如下命令,开启监听
2、在另一个窗口下,给上面的articledb插入一条数据
数据写入成功后,在第一个窗口下,执行下面的命令:
说明已经成功监听到新增的数据,修改、删除事件可以做类似的操作即可
以上先通过shell窗口展示了一下changestream的使用效果,接下来,将通过程序演示下如何在客户端集成并使用changestream
这段程序聪明绝顶主要分为几个核心部分,做如下解释说明,
连接mogodb服务端及相关配置通过pipline开启watch监听监听到特定数据库下集合的数据变化,然后打印出变化的数据启动这段程序,观察控制台日志数据
在未对articledb数据库下的comment集合做任何操作之前,由于watch为检测到任何数据变化,所以无法进入到while循环中,接下来,从shell端给comment集合新增一条数据,然后再次观察控制台数据变化
可以看到,控制台很快就检测到变化的数据
以下为完整的日志数据
{ operationtype=operationtype{value=’inrt’}, resumetoken={“_data”: “8262138891000000022b022c0100296e5a1004b9065629412942f8852d592b9fd441b946645f696400646213889158b116a29c3fd1140004”}, namespace=articledb.comment, destinationnamespace=null, fulldocument=document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, urid=1010, nickname=marry}}, documentkey={“_id”: {“$oid”: “6213889158b116a29c3fd114”}}, clustertime=timestamp{value=7067142396626075650, conds=1645447313, inc=2}, updatedescription=null, txnnumber=null, lsid=null}
至于在业务中的具体使用,可以结合自身的情况,举例来说,应用程序只想监听修改数据的事件,那么就可以在修改数据事件的监听逻辑中,解析变化后的数据做后续的操作
springboot整合changestream
在实际开发中,更通用的场景是整合到springboot工程中使用,有过一定的开发经验的同学应该很容易想到核心的逻辑长什么样了,和canal的客户端操作类似,需要在一个配置类去监听即可
本例演示的是基于上文搭建的mongodb复制集群
简单的添加2个用接口测试的方法
启动本工程,然后浏览器调用下查询所有数据的接口,数据能正常返回,说明工程的基础结构就完成了
mongomessagelistener 类 ,顾名思义,该类用于监听特定数据库下的集合数据变化使用的,在实际开发中,该类的作用也是非常重要的,类似于许多中间件的客户端监听程序,当监听到数据变化后,做出后续的业务响应,比如,数据入库、推送消息到kafka、发送相关的事件等等
changestream 类 ,事件注册类,即开篇中提到的那几种事件类型的操作等
mongoconfig 配置messagelistenercontainer 容器的相关参数
3个类添加完成后,再次启动程序,并观察控制台数据日志
测试1:通过shell窗口登录primary节点,并给comment集合添加一条数据
几乎是实时的监听到事件操作的数据变化,下面是完整的输出日志
测试2:通过shell窗口删除上面新增的这条数据
如果一个系统的数据需要迁移到另一个系统,可以考虑使用mongodb changestream这种方式,试想,如果老系统数据非常杂乱,并且文档中存在一些脏数据时,为了确保迁移后的数据能较快的投产,通过关于儿童的诗句应用程序的方式,能够原始的数据做类似etl的处理,这样更加方便
如果您的系统对数据监管较为严格,可以考虑使用changestream这种方式,订阅特定事件的数据操作,比如修改和删除数据的事件,然后及时的发送告警通知
我们知道,mongodb作为一款性能优秀的分布式文档型数据库,其实是可以存储海量数据的,在一些大数据场景下,比如下游其他的应用采用大数据技术,需要对mongo中的数据做轨迹行为分析,changestream就是一种不错的选择,当监听到特定事件的数据变化时,向消息队列,比如kafka推送相应的消息,下游相关的大数据应用就可以做后续的业务处理了
到此这篇关于springboot整合mongodbchangestream的示例代码的文章就介绍到这了,更多相关springboot整合mongodbchangestream内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!
本文发布于:2023-04-05 20:19:50,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/cc2329e2cad33cb4776bbfdddb904491.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:springboot整合mongodb changestream的示例代码.doc
本文 PDF 下载地址:springboot整合mongodb changestream的示例代码.pdf
留言与评论(共有 0 条评论) |