netty之SpringBoot+Netty+Elasticsearch收集日志信息数据存储

ops/2025/2/12 4:37:51/

前言


将大量的业务以及用户行为数据存储起来用于分析处理,但是由于数据量较大且需要具备可分析功能所以将数据存储到文件系统更为合理。尤其是一些互联网高并发级应用,往往数据库都采用分库分表设计,那么将这些分散的数据通过binlog汇总到一个统一的文件系统就显得非常有必要。

#开发环境
在这里插入图片描述
环境准备
windows安装包 下载
注意 es是以来java环境 所以需要安装jdk 支持1.7以上
es-hander下载可视化操作插件

@Document(indexName = "stack", type = "group_user")
public class User {@Idprivate String id;private String name;   //姓名private Integer age;   //年龄private String level;  //级别private Date entryDate;//时间private String mobile; //电话private String email;  //邮箱private String address;//地址public User(String id, String name, Integer age, String level, Date entryDate, String mobile, String email, String address) {this.id = id;this.name = name;this.age = age;this.level = level;this.entryDate = entryDate;this.mobile = mobile;this.email = email;this.address = address;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getLevel() {return level;}public void setLevel(String level) {this.level = level;}public Date getEntryDate() {return entryDate;}public void setEntryDate(Date entryDate) {this.entryDate = entryDate;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getEmail() {return email;}public void setEmail(String email) {this.email = email;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}
@Service("myServerHandler")
public class MyServerHandler extends ChannelInboundHandlerAdapter {private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);@Autowiredprivate UserService userService;/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();logger.info("链接报告开始");logger.info("链接报告信息:有一客户端链接到本服务端");logger.info("链接报告IP:{}", channel.localAddress().getHostString());logger.info("链接报告Port:{}", channel.localAddress().getPort());logger.info("链接报告完毕");}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("客户端断开链接{}", ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + JSON.toJSONString(msg));//接收数据写入到ElasticsearchTransportProtocol transportProtocol = (TransportProtocol) msg;userService.save((User) transportProtocol.getObj());}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();logger.info("异常信息:\r\n" + cause.getMessage());}}
@Component("nettyServer")
public class NettyServer {private Logger logger = LoggerFactory.getLogger(NettyServer.class);@Resourceprivate MyChannelInitializer myChannelInitializer;//配置服务端NIO线程组private final EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private final EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(InetSocketAddress address) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(myChannelInitializer);channelFuture = b.bind(address).syncUninterruptibly();channel = channelFuture.channel();} catch (Exception e) {logger.error(e.getMessage());} finally {if (null != channelFuture && channelFuture.isSuccess()) {logger.info("itstack-demo-netty server start done. {关注明哥,获取源码}");} else {logger.error("itstack-demo-netty server start error. {关注明哥,获取源码}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

public interface UserService {void save(User user);void deleteById(String id);User queryUserById(String id);Iterable<User> queryAll();Page<User> findByName(String name, PageRequest request);}

提供一个可拓展的操作实体表的接口


public interface UserRepository extends ElasticsearchRepository<User, String> {Page<User> findByName(String name, Pageable pageable);}
@Service("userService")
public class UserServiceImpl implements UserService {private UserRepository dataRepository;@Autowiredpublic void setDataRepository(UserRepository dataRepository) {this.dataRepository = dataRepository;}@Overridepublic void save(User user) {dataRepository.save(user);}@Overridepublic void deleteById(String id) {dataRepository.deleteById(id);}@Overridepublic User queryUserById(String id) {Optional<User> optionalUser = dataRepository.findById(id);return optionalUser.get();}@Overridepublic Iterable<User> queryAll() {return dataRepository.findAll();}@Overridepublic Page<User> findByName(String name, PageRequest request) {return dataRepository.findByName(name, request);}}
@RestController
public class NettyController {@Resourceprivate NettyServer nettyServer;@RequestMapping("/localAddress")public String localAddress() {return "nettyServer localAddress " + nettyServer.getChannel().localAddress();}}
@SpringBootApplication
public class Application implements CommandLineRunner {private Logger logger = LoggerFactory.getLogger(Application.class);@Value("${netty.host}")private String host;@Value("${netty.port}")private int port;@Resourceprivate NettyServer nettyServer;public static void main(String[] args) {System.setProperty("es.set.netty.runtime.available.processors", "false");SpringApplication.run(Application.class, args);}@Overridepublic void run(String... args) throws Exception {InetSocketAddress address = new InetSocketAddress(host, port);ChannelFuture channelFuture = nettyServer.bing(address);Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));channelFuture.channel().closeFuture().syncUninterruptibly();}}
## 服务端口
server.port = 8080## Netty服务端配置
netty.host = 127.0.0.1
netty.port = 7397## Elasticsearch配置{更换为自己的cluster-name、cluster-nodes}
spring.data.elasticsearch.cluster-name=es-itstack
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true

ApiTest.java *Netty客户端,用于向服务端发送数据

public class ApiTest {public static void main(String[] args) {System.out.println("hi 微信公众号:关注明哥");EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(TransportProtocol.class));channel.pipeline().addLast(new ObjEncoder(TransportProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {}});}});ChannelFuture f = b.connect("127.0.0.1", 7397).sync();System.out.println("itstack-demo-netty client start done. {关注明哥,获取源码}");TransportProtocol tp1 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李小明", 1, "T0-1", new Date(), "13566668888", "184172133@qq.com", "北京"));TransportProtocol tp2 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "张大明", 2, "T0-2", new Date(), "13566660001", "huahua@qq.com", "南京"));TransportProtocol tp3 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李书鹏", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));TransportProtocol tp4 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "韩小雪", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));TransportProtocol tp5 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "董叔飞", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "河北"));TransportProtocol tp6 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "候明相", 2, "T5-1", new Date(), "13566660002", "xiaobai@qq.com", "下花园"));TransportProtocol tp7 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "田明明", 2, "T3-1", new Date(), "13566660002", "xiaobai@qq.com", "东平"));TransportProtocol tp8 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "王大伟", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "西湖"));TransportProtocol tp9 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李雪明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "南昌"));TransportProtocol tp10 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "朱小飞", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "吉林"));TransportProtocol tp11 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "牛大明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "长春"));TransportProtocol tp12 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "关雪儿", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "深圳"));//向服务端发送信息f.channel().writeAndFlush(tp1);f.channel().writeAndFlush(tp2);f.channel().writeAndFlush(tp3);f.channel().writeAndFlush(tp4);f.channel().writeAndFlush(tp5);f.channel().writeAndFlush(tp6);f.channel().writeAndFlush(tp7);f.channel().writeAndFlush(tp8);f.channel().writeAndFlush(tp9);f.channel().writeAndFlush(tp10);f.channel().writeAndFlush(tp11);f.channel().writeAndFlush(tp12);f.channel().closeFuture().syncUninterruptibly();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}}

好了到这里就结束了netty之SpringBoot+Netty+Elasticsearch收集日志信息数据存储的学习,大家一定要跟着动手操作起来。需要的源码的 可si我获取;


http://www.ppmy.cn/ops/124089.html

相关文章

vite学习教程05、vite+vue2构建本地 SVG 图标

文章目录 前言一、构建本地SVG图标详细步骤1、安装开发依赖2、配置vite2.1、配置vite.config.js2.2、封装vite引入插件脚本 解决报错&#xff1a;can not find package fast-glob imported 二、实际应用应用1&#xff1a;未封装&#xff0c;直接vue应用应用2&#xff1a;封装vu…

BugReport中的App Processor wakeup字段意义

一、功耗字段意义&#xff1a; App processor wakeup:Netd基于xt_idletimer 待机下监视网络设备的收发工作状态&#xff0c;即当设备发生联网从休眠态变成为唤醒态时&#xff0c;会记录打醒者的uid(uid大于0)和网络类型(wifi或数据类型)、时间戳 实际日志&#xff1a;我们在B…

Linux: 网络: tcp_mem遭遇hard limit时,是否要上报警告?

tcp_mem: https://mzhan017.blog.csdn.net/article/details/142647143. 根据Linux内核的代码看,tcp_mem的设置是下面的默认值(按照当前系统所拥有内存容量的一个比例): static void __init tcp_init_mem(void) {unsigned long limit = nr_free_buffer_pages()

从零学编程- C语言-第18天

1.malloc 2.free 3.calloc 4.malloc 跟calloc 一个不能自动初始化一个能自动初始化 使用那个无所谓&#xff0c;看自己 calloc mallocmemset 5.realloc ​​​​​​​ ​​​​​​​ 6.申请空间是需要浪费时间的&#xff0c;频繁的添加空间耗时间&#xff0c;需要操作系…

mysql学习教程,从入门到精通,SQL 复制表(36)

1、SQL 复制表 在 SQL 中&#xff0c;复制表是一个常见的任务&#xff0c;通常用于备份、测试或数据迁移。下面是一个基本的指南&#xff0c;演示如何在不同的 SQL 数据库管理系统中复制表。 1.1. 使用 CREATE TABLE ... AS SELECT ... 语句 这种方法适用于大多数 SQL 数据库…

从零学编程-C语言-第17天

今天是学习C语言的第17天 时间&#xff1a;2024/10/6 21:16分 使用编译器&#xff1a;vs2019 此贴记录自己的成长 今天学习内容如下 1.自定义类型-结构体 结构体 枚举 联合 //结构体 struct stu {char name[20]; }s1, s2; 这里是全局变量 int main() {struct stu s1,s2 …

基于SpringBoot+Uniapp的家庭记账本微信小程序系统设计与实现

项目运行截图 展示效果图 展示效果图 展示效果图 展示效果图 展示效果图 5. 技术框架 5.1 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更…

数学建模算法与应用 第8章 时间序列分析

目录 8.1 确定性时间序列分析方法 Matlab代码示例&#xff1a;移动平均法提取趋势 8.2 平稳时间序列模型 Matlab代码示例&#xff1a;差分法与ADF检验 8.3 时间序列的Matlab相关工具箱及命令 Matlab代码示例&#xff1a;ARIMA模型的建立 8.4 ARIMA序列与季节性序列 Matl…