使用Ruoyi的定时任务组件结合XxlCrawler进行数据增量同步实战-以中国地震台网为例

server/2024/9/24 11:29:15/

目录

前言

一、数据增量更新机制

1、全量更新机制

2、增量更新机制

二、功能时序图设计

1、原始请求分析

2、业务时序图

三、后台定时任务的设计与实现

四、Ruoyi自动任务配置

1、Ruoyi自动任务配置

2、任务调度

 总结


前言

        在之前的相关文章中,发表文章列表:在Java中使用XxlCrawler时防止被反爬的几种方式,基于Java的XxlCrawler网络信息爬取实战-以中国地震台网为例。在这两篇博客当中,我们介绍了XxlCralwer组件,以及如何进行爬虫反爬对抗。这两篇博客都是讲解的全量更新,即第一次全部抓取数据。在我们实际的信息爬取过程当中,肯定会有增量更新的问题,比如定期更新。不要爬取全部的数据,而只是抓取更新的数据,在进行多次的增量更新之后,就能实现数据源的数据一致性。这样的需求很常见,但是在以往的博客中很少进行提及。

        本文就是在这样的需求背景下诞生的,我们需要使用XxlCrawler组件对中国地震外网的地震信息进行增量同步。在第一次全量获取数据后,后面在系统运行过程当中,不需要人工干预,我们采用自动任务的方式,将信息抓取的过程完全有程序来完成。文章首先讲解了一般数据同步的方法,然后使用代码的方式介绍在本文中使用的数据同步方法,如何进行重复数据去重,最后讲解如何进行入库,结合Ruoyi中的定时任务组件,讲解如何进行定时任务计划的制定和运行,讲解如何将定时任务和爬虫关联起来。对有对信息进行增量更新的需求有一定的参考意义。

一、数据增量更新机制

        要想实现数据的动态更新,一般包含两种更新方式。即全量更新方式和增量更新方式,全量更新模式顾名思义,就是每次进行数据同步时都是全量数据同步。增量更新是基于全量更新基础之上的,每次的数据同步都采用增量的数据同步,即将变化的数据,新增的或者修改的,删除的数据同步到下游系统。下面结合中国地震台网数据,使用两种更新方式进行说明。

1、全量更新机制

        全量更新通常应用在数据的首次同步上,往往第一次需要上游系统的所有数据。因此有必要对上游系统进行全量同步。同时也是因为首次同步时,下游系统中往往还没有数据,因此不需要考虑数据重复的问题。只要将爬取的数据进行新增即可。全量更新的技术难度较低,再此不再进行赘述。

2、增量更新机制

        在首次同步好了全量数据以后,要想实现增量更新,数据增量同步ETL每次只处理增、删、改的变化数据,减少大量非变化数据同步。与数据全量同步ETL相比,数据增量同步ETL可以用最少的资源提高数据同步效率。其大致的思路有以下几种:

        1. 时间戳:最常见的方式。但是在业务系统里,不是每张表都有时间戳。

        2. 触发器:可靠性较高。但是对业务系统数据库性能损耗较大。

        3. 全量对比得出增量数据更新:对源数据库的性能损耗较小。但是大数据量对比更新时,对工具的性能开销需求较大。

        4. 全量对比MD5方式:建立一个结构类似的MD5临时表,通过MD5校验码比对。

        5. 日志解析:常见的数据库具备日志归档等功能,从日志获取变化数据,通过代码来开发和管理。

        通过之前的台网数据抓取得到的数据可以了解到,在我们的数据表格中是包含了时间字段了,因此我们可以使用基于时间戳的方案来实现台网数据的增量更新。

二、功能时序图设计

        为了实现中国地震外网数据定时同步,这里我们采用面向对象分析(OOA)的模式进行。同时为了比较清晰的说明其同步机制,我们将其大致的业务调用时序图进行设计。本节即主要描述增量同步的时序图设计。

1、原始请求分析

        为了简单介绍相关接口及正确获取增量数据,首先我们来看一下中国地震台网的更新接口。在其历史查询的接口列表中。我们可以看到其可以实现日期查询,具体的功能查询界面如下:

        然后我们来看一下实际的请求接口,打开控制台的网络监控Tab页:

https://www.ceic.ac.cn/ajax/search?page=1&&start=2024-05-01&&end=&&jingdu1=&&jingdu2=&&weidu1=&&weidu2=&&height1=&&height2=&&zhenji1=&&zhenji2=&&callback=jQuery18005659035271001251_1714803161229&_=1714807031625

        可以很明显看到,它的开始日期参数放到了start中,在start中绑定了开始查询参数。 在我们接口请求中也是利用这个时间戳字段来实现信息的增量更新。

2、业务时序图

        为了实现信息请求接口的增量更新,同时在实际请求当中,可能会遇到的数据重复判断的问题,因此我们需要进行数据的去重。我们使用cata_id和epi_id进行去重处理。具体的业务时序图如下:

        从时序图来看,大致的数据同步过程分了10步,下面针对每一步来进行简单说明:

        1.1:首先从数据库中获取已经同步数据的最大日期作为同步基础时间戳。

        1.2:如果基础时间戳不为空,在此时间戳之上,我们采用N-1,这么做是为了避免数据历史数据没有及时同步,这里可能会有数据重复,因此一定要进行数据去重的机制保证数据的一致性。

        1.3-1.5:从数据库中获取同步时间戳和拼接系统请求时间戳的方法。

        1.6:构造XxlCrawler爬取器,设置页面处理对象,进行首页信息爬取。

        1.7-1.8:定义下一页爬取规则,实现自动爬取所有其它页面的数据。

        1.9:启动信息抓取器,进行爬取。

三、后台定时任务的设计与实现

        在明确了相关的定时任务之后,这里我们在Ruoyi的框架下进行开发,需要按照时序图设计相关的时序逻辑。完整的增量更新任务类关键代码如下:

package com.yelang.project.monitor.job.task;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.xuxueli.crawler.XxlCrawler;
import com.xuxueli.crawler.parser.strategy.NonPageParser;
import com.yelang.common.utils.DateUtils;
import com.yelang.common.utils.StringUtils;
import com.yelang.project.extend.earthquake.domain.crawler.CeicDateAdapter;
import com.yelang.project.extend.earthquake.domain.crawler.CeicEarthquake;
import com.yelang.project.extend.earthquake.domain.crawler.CeicEarthquakeCrawler;
import com.yelang.project.extend.earthquake.service.ICeicEarthquakeService;
@Component("ceicEqIncrementalUpdateTask")
public class CeicEqIncrementalUpdateTask {private Logger logger = LoggerFactory.getLogger(CeicEqIncrementalUpdateTask.class);private static final String USER_AGENT = "\"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36\"";private static final String commonUrl = "https://www.ceic.ac.cn/ajax/search?end=&&jingdu1=&&jingdu2=&&weidu1=&&weidu2=&&height1=&&height2=&&zhenji1=&&zhenji2=&_=";@Autowiredprivate ICeicEarthquakeService eqService;/*** 默认的增量更新方法,从数据库中获取当前更新的最大日期,以此为更新条件*/public void defaultIncrementalUpdate() {logger.info("执行无参方法");CeicEarthquake ceicEq = eqService.getMaxOtime();if (null == ceicEq) {logger.info("增量更新时间点为空!");return;}if (null != ceicEq) {Date yesterday = DateUtils.getPrevOneDay(ceicEq.getOTime());String yesterday_str = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD, yesterday);String firstUrl = commonUrl + System.currentTimeMillis() + "&&start=" + yesterday_str + "&&page=1";NonPageParser firstPageParse = new NonPageParser() {public void parse(String url, String pageSource) {dataParse(true, url, pageSource, yesterday_str);}};// 构造爬虫XxlCrawler crawler = buildXxlCrawler(firstPageParse, firstUrl);crawler.start(false);// 启动异步执行}}private void incrementalUpdate(String startDate, Integer pageNum) {String[] urlList = new String[pageNum - 1];for (int i = 0; i < pageNum - 1; i++) {urlList[i] = commonUrl + System.currentTimeMillis() + "&&start=" + startDate + "&&page=" + (2 + i);}NonPageParser dataParser = new NonPageParser() {public void parse(String url, String pageSource) {dataParse(false, url, pageSource, null);}};// 构造爬虫XxlCrawler crawler = this.buildXxlCrawler(dataParser, urlList);crawler.start(false);// 启动}private XxlCrawler buildXxlCrawler(NonPageParser pageParser, String... urls) {// 构造爬虫XxlCrawler crawler = new XxlCrawler.Builder().setUrls(urls)// 设置请求URL.setThreadCount(3)// 设置线程数.setPauseMillis(2000)// 设置暂停时间.setUserAgent(USER_AGENT)// 设置User-Agent.setIfPost(false).setFailRetryCount(3)// 重试三次.setPageParser(pageParser).build();return crawler;}private void dataParse(boolean isFirst, String url, String pageSource, String yesterday_str) {if (!StringUtils.isBlank(pageSource)) {pageSource = pageSource.substring(1, pageSource.length() - 1);Gson gson = new GsonBuilder().registerTypeAdapter(Date.class, new CeicDateAdapter()).create();CeicEarthquakeCrawler crawler = gson.fromJson(pageSource, CeicEarthquakeCrawler.class);Integer dataSize = crawler.getNum();// 获取数据总页数for (CeicEarthquake data : crawler.getShuju()) {String geom = "SRID=" + 4326 + ";POINT (" + data.getEpiLon() + " " + data.getEpiLat() + ")";// 拼接srid,实现动态写入data.setGeom(geom);}eqService.deduplication(crawler.getShuju());// 数据跟数据库去重后入库// 如果是首页且数据总数大于1,表示有多条,需要循环爬取if (isFirst && dataSize > 1) {logger.info("总页数:{}", crawler.getNum());incrementalUpdate(yesterday_str, dataSize);}}}
}

        自动任务的入口方法是defaultIncrementalUpdate(),在这里进行任务的调度。请注意数据转换的统一定义方法如下:

private void dataParse(boolean isFirst, String url, String pageSource, String yesterday_str) {if (!StringUtils.isBlank(pageSource)) {pageSource = pageSource.substring(1, pageSource.length() - 1);Gson gson = new GsonBuilder().registerTypeAdapter(Date.class, new CeicDateAdapter()).create();CeicEarthquakeCrawler crawler = gson.fromJson(pageSource, CeicEarthquakeCrawler.class);Integer dataSize = crawler.getNum();// 获取数据总页数for (CeicEarthquake data : crawler.getShuju()) {String geom = "SRID=" + 4326 + ";POINT (" + data.getEpiLon() + " " + data.getEpiLat() + ")";// 拼接srid,实现动态写入data.setGeom(geom);}eqService.deduplication(crawler.getShuju());// 数据跟数据库去重后入库// 如果是首页且数据总数大于1,表示有多条,需要循环爬取if (isFirst && dataSize > 1) {logger.info("总页数:{}", crawler.getNum());incrementalUpdate(yesterday_str, dataSize);}}}

        实现增量的逻辑是第一次请求的时候,通过返回的num来决定是否往下爬取,只有超过2页才需要进行剩余页面的请求。

private void incrementalUpdate(String startDate, Integer pageNum) {String[] urlList = new String[pageNum - 1];for (int i = 0; i < pageNum - 1; i++) {urlList[i] = commonUrl + System.currentTimeMillis() + "&&start=" + startDate + "&&page=" + (2 + i);}NonPageParser dataParser = new NonPageParser() {public void parse(String url, String pageSource) {dataParse(false, url, pageSource, null);}};// 构造爬虫XxlCrawler crawler = this.buildXxlCrawler(dataParser, urlList);crawler.start(false);// 启动}

        使用数据库的机制进行数据重复判断的逻辑如下:

@Override
public void deduplication(List<CeicEarthquake> eqDataList) {List<CeicEarthquake> saveList = new ArrayList<CeicEarthquake>();for (CeicEarthquake data : eqDataList) {Long size = this.getSizeByCataIdAndEpiId(data.getCataId(), data.getEpiId());if(size >= 1) {continue;//记录数大于1,表示数据中有记录}String geom = "SRID=" + 4326 +";POINT (" + data.getEpiLon()+ " "+data.getEpiLat()+")";//拼接srid,实现动态写入data.setGeom(geom);saveList.add(data);}if(saveList.size() > 0) {this.saveBatch(saveList, 300);}
}

四、Ruoyi自动任务配置

        这里采用Ruoyi进行自动任务配置,在自动任务框架驱动下进行数据的增量更新。因此需要我们进行任务的配置。本节将重点介绍Ruoyi的定时任务配置,以及如何关联到增量同步组件。

1、Ruoyi自动任务配置

        应用程序启动后,在系统监控中打开定时任务子菜单,可以看到系统中定义的所有定时任务列表。

        这里我已经定义了一个台网信息同步的定时任务,默认的是每天1点执行(时间频率请结合实际业务来进行配置),请不要给目标系统造成太大的异常流量。不要太频繁的发起访问。

        这里调用的ceicEqIncrementalUpdateTask.defaultIncrementalUpdate()。具体的调用参考如下:

        Bean调用示例:ryTask.ryParams('ry');

        Class类调用示例:com.yelang.quartz.task.RyTask.ryParams('ry')
 参数说明:支持字符串,布尔类型,长整型,浮点型,整型。

2、任务调度

        在任务创建好之后,我们可以进行任务开启,开启后,任务会自动在后台运行,在制定时间进行触发。由于我们配置的每天1点进行任务创建及运行,因此这里我们选择人工运行的模式,使用手动运行的模式。点击操作按钮中的执行一次。如下图所示:

        执行任务之前,我们来数据库中看一下数据的总条数是12459条:


select count(1)  from biz_ceic_earthquake;

        然后我们来执行任务调度,可以看到控制台进行了信息输出:

        在数据库中可以看到数据总数发生了变化,变成了12460(+1),成功将最新的数据同步到了数据库中。

 总结

        以上就是本文的主要内容, 本文需要使用XxlCrawler组件对中国地震外网的地震信息进行增量同步。在第一次全量获取数据后,后面在系统运行过程当中,不需要人工干预,我们采用自动任务的方式,将信息抓取的过程完全有程序来完成。文章首先讲解了一般数据同步的方法,然后使用代码的方式介绍在本文中使用的数据同步方法,如何进行重复数据去重,最后讲解如何进行入库,结合Ruoyi中的定时任务组件,讲解如何进行定时任务计划的制定和运行,讲解如何将定时任务和爬虫关联起来。对有对信息进行增量更新的需求有一定的参考意义。行文仓促,难免有不足之处,如有不足之处,欢迎各位专家朋友不吝赐教,万分感谢。


http://www.ppmy.cn/server/32290.html

相关文章

【protobuf】protobuf 开发 (二)

紧接着上一篇文章https://blog.csdn.net/qq_37387199/article/details/137890740 获取丢失的代码 拿到丢失的源代码需要去 Google 的 protobuf GitHub 仓库&#xff0c;地址在 https://github.com/protocolbuffers/protobuf 可以下载压缩包&#xff0c;也可以使用 Git 克隆。…

这书不错,古琴乐理实用教程(尹溧新编),有课学得通透。

通篇阅读后&#xff0c;发现这本书以古琴初习者、未系统接触过现代乐理的读者为对象&#xff0c;将复杂的古琴音乐理论简单化、通俗化。书中采用参照比较的方法、通俗易懂的语言、言简意赅的文字&#xff0c;并结合具体音乐作品将古琴研习中最主要的、最核心的理论知识进行简明…

多态的原理

前言:以下的内容均是在VS2019的环境中&#xff0c;32位平台下的 目录 1.多态的实现条件 虚函数重写的两个例外 一个题加深理解 总结 重载 重写 重定义区别 2.多态的实现原理 单继承 多继承 动态多态和静态多态 多态的好问题 1.多态的实现条件 虚函数&#xff1a;被…

微博一级评论爬虫

cookies需要替换成自己的 import requests import requests from lxml import etree import openpyxl from concurrent.futures.thread import ThreadPoolExecutor import re from datetime import datetime, timedelta from urllib import parse from jsonpath import jsonpa…

Django框架之模板层

一、模版语法 1、模版初识 &#xff08;1&#xff09;语法 {{ }}: 变量相关 {% %}: 逻辑相关 &#xff08;2&#xff09;变量 ① 传值 在Django的模板语言中按此语法使用&#xff1a; {{ 变量名 }}。 当模版引擎遇到一个变量&#xff0c;它将计算这个变量&#xff0c;然…

对命令模式的理解

目录 一、场景1、文本编辑器并不是一个好的例子&#xff0c;设备控制器才是2、设备控制器的demo 二、不用命令模式1、代码2、问题 三、使用命令模式1、代码2、当需求变化时2.1 新增代码2.2 优点 四、进一步思考1、省略对Command的建模可以吗&#xff1f;2、命令模式的价值 一、…

2024五一数学建模B题思路代码与论文分析

2024五一数学建模B题完整代码和成品论文获取↓↓↓↓↓ https://www.yuque.com/u42168770/qv6z0d/gyoz9ou5upvkv6nx?singleDoc# B题 未来新城交通需求规划与可达率问题需要建立的模型和算法: 1. 图论 2. 网络流模型 3. 线性规划/整数规划 4. 组合优化 5. 随机过程 6. …

C#调用skiasharp操作并绘制图片

之前学习ViewFaceCore时采用Panel控件和GDI将图片及识别出的人脸方框和关键点绘制出来&#xff0c;本文将其修改为基于SKControl和SKCanvas实现相同的显示效果并支持保存为本地图片。   新建Winform项目&#xff0c;在Nuget包管理器中搜索并安装一下SkiaSharp和ViewFaceCore…