长连接 、短连接、心跳机制与断线重连

news/2024/10/30 23:13:54/

在不同场景下要考虑长连接还是短连接,那么我们要先了解他。

短连接

概念

client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。

这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。上述可知,短连接一般只会在 client/server间传递一次请求操作。

短连接的优缺点

管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。

使用场景

通常浏览器访问服务器的时候就是短连接,也就是服务接口。

对于服务端来说,长连接会耗费服务端的资源,而且用户用浏览器访问服务端相对而言不是很频繁的

如果有几十万,上百万的连接,服务端的压力会非常大,甚至会崩溃。

所以对于并发量大,请求频率低的,建议使用短连接。

长连接

什么是长连接

client向server发起连接,server接受client连接,双方建立连接。

Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。

长连接的生命周期

正常情况下,一条TCP长连接建立后,只要双不提出关闭请求并且不出现异常情况,这条连接是一直存在的.操作系统不会自动去关闭它,甚至经过物理网络拓扑的改变之后仍然可以使用。

所以一条连接保持几天、几个月、几年或者更长时间都有可能,只要不出现异常情况或由用户(应用层)主动关闭。客户端和服务单可一直使用该连接进行数据通信。

长连接的优点

长连接可以省去较多的TCP建立和关闭的操作,减少网络阻塞的影响,

当发生错误时,可以在不关闭连接的情况下进行提示,

减少CPU及内存的使用,因为不需要经常的建立及关闭连接。

长连接的缺点

连接数过多时,影响服务端的性能和并发数量。

使用场景

数据库的连接就是采用TCP长连接.

RPC,远程服务调用,在服务器,一个服务进程频繁调用另一个服务进程,可使用长连接,减少连接花费的时间。

最近在维护物联网设备上报数据到服务端,采用的netty长连接,所以记录一下短连接和长连接的区别。

总结

1.对于长连接和短连接的使用是需要根据应用场景来判断的。

2.长连接并不是万能的,也是需要维护的。

长连接的实现

心跳机制

应用层协议大多都有HeartBeat机制,通常是客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。

并传输一些可能必要的数据。使用心跳包的典型协议是IM,比如QQ/MSN/飞信等协议。

在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项:SO_KEEPALIVE。

系统默认是设置的2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。

而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。

为什么需要心跳机制?

因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等,

会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的.

心跳机制即可解决此类问题。

TCP协议的KeepAlive机制

默认KeepAlive状态是不打开的。

需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,

并且可以设置三个参数:

tcp_keepalive_time  ,tcp_keepalive_probes  , tcp_keepalive_intvl

分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。

很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。

典型做法是LRU,把最久没有数据的连接给T掉。

通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。

如何实现心跳机制?

两种方式实现心跳机制:

  • 使用 TCP 协议层面的 keepalive 机制.

  • 在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  1. 它不是 TCP 的标准协议, 并且是默认关闭的.

  2. TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.

  3. TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,

这里主要介绍应用层方面实现心跳机制,使用netty实现心跳和断线重连。

netty实现心跳机制

netty对心跳机制提供了机制,实现的关键是IdleStateHandler先来看一下他的构造函数

    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}

实例化一个 IdleStateHandler 需要提供三个参数:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 读和写都超时. 即当在指定的时间间隔内没有读并且写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

netty心跳流程

1. 客户端成功连接服务端。

2.在客户端中的ChannelPipeline中加入IdleStateHandler,设置写事件触发事件为5s.

3.客户端超过5s未写数据,触发写事件,向服务端发送心跳包,

4.同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,减轻服务端的压力

5.超过三次,1过0s服务端都会收到来自客户端的心跳信息,服务端可以认为客户端挂了,可以close链路。

6.客户端恢复正常,发现链路已断,重新连接服务端。

代码实现

服务端handler:

package com.heartbreak.server;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;import java.util.Random;/*** @author janti* @date 2018/6/10 12:21*/
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {// 失败计数器:未收到client端发送的ping请求private int unRecPingTimes = 0;// 定义服务端没有收到心跳消息的最大次数private static final int MAX_UN_REC_PING_TIMES = 3;private Random random = new Random(System.currentTimeMillis());@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {if (msg!=null && msg.equals("Heartbeat")){System.out.println("客户端"+ctx.channel().remoteAddress()+"--心跳信息--");}else {System.out.println("客户端----请求消息----:"+msg);String resp = "商品的价格是:"+random.nextInt(1000);ctx.writeAndFlush(resp);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state()==IdleState.READER_IDLE){System.out.println("===服务端===(READER_IDLE 读超时)");// 失败计数器次数大于等于3次的时候,关闭链接,等待client重连if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {System.out.println("===服务端===(读超时,关闭chanel)");// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连ctx.close();} else {// 失败计数器加1unRecPingTimes++;}}else {super.userEventTriggered(ctx,evt);}}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);System.out.println("一个客户端已连接");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.out.println("一个客户端已断开连接");}
}

服务端server:

package com.heartbreak.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** @author tangj* @date 2018/6/10 10:46*/
public class HeartBeatServer {private static int port = 9817;public HeartBeatServer(int port) {this.port = port;}ServerBootstrap bootstrap = null;ChannelFuture f;// 检测chanel是否接受过心跳数据时间间隔(单位秒)private static final int READ_WAIT_SECONDS = 10;public static void main(String args[]) {HeartBeatServer heartBeatServer = new HeartBeatServer(port);heartBeatServer.startServer();}public void startServer() {EventLoopGroup bossgroup = new NioEventLoopGroup();EventLoopGroup workergroup = new NioEventLoopGroup();try {bootstrap = new ServerBootstrap();bootstrap.group(bossgroup, workergroup).channel(NioServerSocketChannel.class).childHandler(new HeartBeatServerInitializer());// 服务器绑定端口监听f = bootstrap.bind(port).sync();System.out.println("server start ,port: "+port);// 监听服务器关闭监听,此方法会阻塞f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossgroup.shutdownGracefully();workergroup.shutdownGracefully();}}private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 监听读操作,读超时时间为5秒,超过5秒关闭channel;pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", new HeartbeatServerHandler());}}}

 客户端handler

package com.heartbreak.client;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author tangj* @date 2018/6/11 22:55*/
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{private HeartBeatClient client;private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",CharsetUtil.UTF_8));public HeartBeatClientHandler(HeartBeatClient client) {this.client = client;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到服务端回复:"+msg);if (msg.equals("Heartbeat")) {ctx.write("has read message from server");ctx.flush();}ReferenceCountUtil.release(msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());}} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.err.println("客户端与服务端断开连接,断开的时间为:"+format.format(new Date()));// 定时线程 断线重连final EventLoop eventLoop = ctx.channel().eventLoop();eventLoop.schedule(new Runnable() {@Overridepublic void run() {client.doConncet();}}, 10, TimeUnit.SECONDS);}}

客户端启动:

package com.heartbreak.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author tangj* @date 2018/6/10 16:18*/
public class HeartBeatClient {private Random random = new Random();public Channel channel;public Bootstrap bootstrap;protected String host = "127.0.0.1";protected int port = 9817;public static void main(String args[]) throws Exception {HeartBeatClient client = new HeartBeatClient();client.run();client.sendData();}public void run() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new SimpleClientInitializer(HeartBeatClient.this));doConncet();} catch (Exception e) {e.printStackTrace();}}/*** 发送数据* @throws Exception*/public void sendData() throws Exception {BufferedReader in = new BufferedReader(new InputStreamReader(System.in));while (true){String cmd = in.readLine();switch (cmd){case "close" :channel.close();break;default:channel.writeAndFlush(in.readLine());break;}}}/*** 连接服务端*/public void doConncet() {if (channel != null && channel.isActive()) {return;}ChannelFuture channelFuture = bootstrap.connect(host, port);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture futureListener) throws Exception {if (channelFuture.isSuccess()) {channel = futureListener.channel();System.out.println("connect server successfully");} else {System.out.println("Failed to connect to server, try connect after 10s");futureListener.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doConncet();}}, 10, TimeUnit.SECONDS);}}});}private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {private HeartBeatClient client;public SimpleClientInitializer(HeartBeatClient client) {this.client = client;}@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new IdleStateHandler(0, 5, 0));pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("handler", new HeartBeatClientHandler(client));}}}

运行结果:

1.客户端长时间未发送心跳包,服务端关闭连接

server start ,port: 9817
一个客户端已连接
===服务端===(READER_IDLE 读超时)
===服务端===(READER_IDLE 读超时)
===服务端===(READER_IDLE 读超时)
===服务端===(READER_IDLE 读超时)
===服务端===(读超时,关闭chanel)
一个客户端已断开连接

2.客户端发送心跳包,服务端和客户端保持心跳信息

一个客户端已连接
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--

3.服务单宕机,断开连接,客户端进行重连

客户端与服务端断开连接,断开的时间为:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully

代码实例:资源包

最终指定基于Zookeeper + Kafka + Netty + Redis实现一个支持高并发高可用分布式集群部署的服务端,长连接客户端设备,消息发送到kafka供下游数据消费分析。


http://www.ppmy.cn/news/65814.html

相关文章

模式串匹配算法(朴素模式匹配与KMP)的机算与手算。

一.朴素模式匹配 1.机算 其实就是暴力匹配。 使用双指针 i (指向主串) j (指向模式串) 从主串 S 第一字符起,与模式串 T, 第一个字符比较&#xff0c;   ①若相同&#xff0c;则 i 与 j 统一向后移   ②若遇到 i 与 j 指向字符不同&#xff0c;回溯 i j 指针。继续如此&a…

JVM系列-第6章-方法区

方法区 栈、堆、方法区的交互关系 从线程共享与否的角度来看 ThreadLocal&#xff1a;如何保证多个线程在并发环境下的安全性&#xff1f;典型场景就是数据库连接管理&#xff0c;以及会话管理。 栈、堆、方法区的交互关系 下面涉及了对象的访问定位 Person 类的 .class …

如何压缩pdf文件大小?四种方法随意选择

如何压缩pdf文件大小&#xff1f;PDF文件格式由于其跨平台性&#xff0c;易于浏览、打印和传输等特点&#xff0c;在现代社会中广泛应用于各个领域。然而&#xff0c;随着PDF文件越来越大&#xff0c;传输及存储所需的时间也会变得越来越长&#xff0c;从而降低了工作效率。在这…

“技术开发最应该做什么?”,聊聊我在服务端开发5年的理解和收获

我们新推出大淘宝技术年度特刊《长期主义&#xff0c;往往从一些小事开始——工程师成长总结专题》&#xff0c;专题收录多位工程师真诚的心路历程与经验思考&#xff0c;覆盖终端、服务端、数据算法、技术质量等7大技术领域&#xff0c;欢迎一起沟通交流。 本文为此系列第二篇…

RK3588平台开发系列讲解(内存篇)Linux 伙伴系统数据结构

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、 页二、区三、内存节点沉淀、分享、成长,让自己和他人都能有所收获!😄 📢Linux 系统中,用来管理物理内存页面的伙伴系统,以及负责分配比页更小的内存对象的 SLAB 分配器了。 本篇将介绍伙伴系统相关数据结…

Swift的高级语法特性,如可选类型、闭包、枚举、泛型、协议、扩展等

Swift是一门现代化的编程语言&#xff0c;具有许多高级语法特性&#xff0c;下面我们来逐一介绍。 1. 可选类型&#xff08;Optional&#xff09; Swift中的可选类型是一种特殊的类型&#xff0c;它可以表示一个值是存在或不存在的。在声明一个可选类型时&#xff0c;需要在类…

Kyligence Zen产品体验-从人找数据到数据找人

目录 前言&#xff1a; 一、什么是Kyligence Zen&#xff1f; 1、个人总结 2、官方简介 二、1分钟打开新世界大门 个人总结&#xff1a; 1、注册 2、验证登录 三、上手初体验 1、快速上手&#xff08;入门&#xff09; 2、定制化应用 四、实战体验 综述&#xff1a; 1、卡…

性能测试入门实践路线图

我转行做软件测试工作已有六年多了&#xff0c; 从功能到自动化测试&#xff0c;然后负责性能测试团队和质量团队的技术专项治理&#xff0c;再到测试专家角色&#xff0c;负责整个技术项目的产品/运营和质量保障工作。 其中性能测试和线上稳定性保障&#xff0c;算是我最擅长…