Android WebSocket工具类:重连、心跳、消息队列一站式解决方案

server/2025/3/11 0:38:08/
  1. 依赖库
    使用 OkHttp 的WebSocket支持。

在 build.gradle 中添加依赖:

implementation 'com.squareup.okhttp3:okhttp:4.9.3'
  1. WebSocket工具类实现
import okhttp3.*;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class WebSocketManager {private static final String TAG = "WebSocketManager";private OkHttpClient client;private WebSocket webSocket;private String wsUrl;private AtomicInteger reconnectCount = new AtomicInteger(0);private boolean isConnecting = false;private boolean isConnected = false;private Handler mainHandler = new Handler(Looper.getMainLooper());private Runnable heartbeatRunnable;private ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>(); // 线程安全的消息队列private ExecutorService executorService = Executors.newSingleThreadExecutor(); // 线程池private int maxReconnectCount = 5; // 最大重连次数private long reconnectInterval = 5000; // 重连间隔时间(毫秒)private long heartbeatInterval = 30000; // 心跳间隔时间(毫秒)private long heartbeatTimeout = 60000; // 心跳超时时间(毫秒)private WebSocketCallback callback;private WebSocketListener webSocketListener = new WebSocketListener() {@Overridepublic void onOpen(WebSocket webSocket, Response response) {Log.d(TAG, "WebSocket connected");isConnecting = false;isConnected = true;reconnectCount.set(0); // 重置重连次数startHeartbeat(); // 开始心跳sendQueuedMessages(); // 发送缓存的消息if (callback != null) {mainHandler.post(() -> callback.onConnected());}}@Overridepublic void onMessage(WebSocket webSocket, String text) {Log.d(TAG, "Received message: " + text);if (callback != null) {mainHandler.post(() -> callback.onMessage(text));}}@Overridepublic void onClosed(WebSocket webSocket, int code, String reason) {Log.d(TAG, "WebSocket closed: " + reason);isConnecting = false;isConnected = false;stopHeartbeat(); // 停止心跳if (callback != null) {mainHandler.post(() -> callback.onDisconnected());}}@Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {Log.e(TAG, "WebSocket failed: " + t.getMessage());isConnecting = false;isConnected = false;stopHeartbeat(); // 停止心跳if (callback != null) {mainHandler.post(() -> callback.onError(t));}reconnect(); // 尝试重连}};// 私有构造函数,使用Builder模式创建实例private WebSocketManager(Builder builder) {this.wsUrl = builder.wsUrl;this.maxReconnectCount = builder.maxReconnectCount;this.reconnectInterval = builder.reconnectInterval;this.heartbeatInterval = builder.heartbeatInterval;this.heartbeatTimeout = builder.heartbeatTimeout;client = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).writeTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).build();}// 连接WebSocketpublic void connect() {if (!isConnecting && !isConnected) {isConnecting = true;executorService.submit(() -> {Request request = new Request.Builder().url(wsUrl).build();webSocket = client.newWebSocket(request, webSocketListener);});}}// 断开连接public void disconnect() {if (webSocket != null) {executorService.submit(() -> webSocket.close(1000, "Normal closure"));}releaseResources();}// 发送消息public void sendMessage(String message) {if (webSocket != null && isConnected) {executorService.submit(() -> {boolean sent = webSocket.send(message);if (!sent) {// 发送失败,将消息加入队列messageQueue.offer(message);}});} else {// 网络未连接时,将消息加入队列messageQueue.offer(message);}}// 重连机制private void reconnect() {if (reconnectCount.get() < maxReconnectCount) {reconnectCount.incrementAndGet();mainHandler.postDelayed(() -> {Log.d(TAG, "Reconnecting... Attempt: " + reconnectCount.get());connect();}, reconnectInterval);} else {Log.e(TAG, "Max reconnection attempts reached");}}// 发送缓存的消息private void sendQueuedMessages() {executorService.submit(() -> {while (!messageQueue.isEmpty()) {String message = messageQueue.poll();if (message != null) {boolean sent = webSocket.send(message);if (!sent) {// 发送失败,将消息重新加入队列messageQueue.offer(message);break;}}}});}// 开始心跳private void startHeartbeat() {heartbeatRunnable = new Runnable() {@Overridepublic void run() {if (isConnected) {webSocket.send("Heartbeat"); // 发送心跳消息mainHandler.postDelayed(this, heartbeatInterval);}}};mainHandler.post(heartbeatRunnable);}// 停止心跳private void stopHeartbeat() {mainHandler.removeCallbacks(heartbeatRunnable);}// 释放资源private void releaseResources() {executorService.shutdown();mainHandler.removeCallbacksAndMessages(null);}// 设置回调public void setWebSocketCallback(WebSocketCallback callback) {this.callback = callback;}// Builder模式public static class Builder {private String wsUrl;private int maxReconnectCount = 5;private long reconnectInterval = 5000;private long heartbeatInterval = 30000;private long heartbeatTimeout = 60000;public Builder(String wsUrl) {this.wsUrl = wsUrl;}public Builder setMaxReconnectCount(int maxReconnectCount) {this.maxReconnectCount = maxReconnectCount;return this;}public Builder setReconnectInterval(long reconnectInterval) {this.reconnectInterval = reconnectInterval;return this;}public Builder setHeartbeatInterval(long heartbeatInterval) {this.heartbeatInterval = heartbeatInterval;return this;}public Builder setHeartbeatTimeout(long heartbeatTimeout) {this.heartbeatTimeout = heartbeatTimeout;return this;}public WebSocketManager build() {return new WebSocketManager(this);}}// 回调接口public interface WebSocketCallback {void onMessage(String message);void onConnected();void onDisconnected();void onError(Throwable t);}
}
  1. 功能说明
    3.1 连接状态管理
    增加 isConnecting 状态,区分连接中和已连接状态。

3.2 消息重发机制
在发送失败时,将消息重新加入队列,等待重连后发送。

3.3 动态心跳机制
支持动态调整心跳间隔和超时时间。

3.4 资源释放
在断开连接时,释放线程池和Handler资源,避免内存泄漏。

3.5 日志分级
通过 Log.d 和 Log.e 区分调试日志和错误日志。

  1. 使用示例
    4.1 初始化并连接WebSocket
WebSocketManager webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url").setMaxReconnectCount(10).setReconnectInterval(3000).setHeartbeatInterval(60000).setHeartbeatTimeout(120000).build();webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() {@Overridepublic void onMessage(String message) {Log.d(TAG, "Received: " + message);}@Overridepublic void onConnected() {Log.d(TAG, "Connected");}@Overridepublic void onDisconnected() {Log.d(TAG, "Disconnected");}@Overridepublic void onError(Throwable t) {Log.e(TAG, "Error: " + t.getMessage());}
});webSocketManager.connect();

4.2 发送消息

webSocketManager.sendMessage("Hello, WebSocket!");

4.3 断开连接

webSocketManager.disconnect();

4.4 处理生命周期

@Override
protected void onDestroy() {super.onDestroy();webSocketManager.disconnect();
}

在Activity中使用WebSocket工具类非常简单。我们需要确保WebSocket的生命周期与Activity的生命周期绑定,避免内存泄漏和资源浪费。以下是完整的示例代码,展示如何在Activity中初始化、使用和释放WebSocket工具类。

在Activity中使用WebSocket工具类

  1. Activity代码示例
import android.os.Bundle;
import android.util.Log;
import androidx.annotation.Nullable;
import androidx.appcompat.app.AppCompatActivity;public class MainActivity extends AppCompatActivity {private static final String TAG = "MainActivity";private WebSocketManager webSocketManager;@Overrideprotected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);// 初始化WebSocketManagerwebSocketManager = new WebSocketManager.Builder("ws://your-websocket-url").setMaxReconnectCount(10).setReconnectInterval(3000).setHeartbeatInterval(60000).setHeartbeatTimeout(120000).build();// 设置WebSocket回调webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() {@Overridepublic void onMessage(String message) {Log.d(TAG, "Received: " + message);// 在这里处理接收到的消息}@Overridepublic void onConnected() {Log.d(TAG, "Connected");// 在这里处理连接成功事件}@Overridepublic void onDisconnected() {Log.d(TAG, "Disconnected");// 在这里处理连接断开事件}@Overridepublic void onError(Throwable t) {Log.e(TAG, "Error: " + t.getMessage());// 在这里处理错误事件}});// 连接WebSocketwebSocketManager.connect();// 发送消息webSocketManager.sendMessage("Hello, WebSocket!");}@Overrideprotected void onDestroy() {super.onDestroy();// 断开WebSocket连接并释放资源if (webSocketManager != null) {webSocketManager.disconnect();}}
}
  1. 代码说明
    2.1 初始化WebSocketManager
    在 onCreate 方法中,使用 Builder 模式初始化 WebSocketManager,并设置最大重连次数、重连间隔、心跳间隔和超时时间。
webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url").setMaxReconnectCount(10).setReconnectInterval(3000).setHeartbeatInterval(60000).setHeartbeatTimeout(120000).build();

2.2 设置回调
通过 setWebSocketCallback 方法设置回调接口,处理WebSocket的连接、消息、断开和错误事件。

webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() {@Overridepublic void onMessage(String message) {Log.d(TAG, "Received: " + message);// 在这里处理接收到的消息}@Overridepublic void onConnected() {Log.d(TAG, "Connected");// 在这里处理连接成功事件}@Overridepublic void onDisconnected() {Log.d(TAG, "Disconnected");// 在这里处理连接断开事件}@Overridepublic void onError(Throwable t) {Log.e(TAG, "Error: " + t.getMessage());// 在这里处理错误事件}
});

2.3 连接WebSocket
在 onCreate 方法中调用 connect() 方法,启动WebSocket连接。

webSocketManager.connect();

2.4 发送消息
通过 sendMessage 方法发送消息。

webSocketManager.sendMessage("Hello, WebSocket!");

2.5 释放资源
在 onDestroy 方法中调用 disconnect() 方法,断开WebSocket连接并释放资源。

@Override
protected void onDestroy() {super.onDestroy();if (webSocketManager != null) {webSocketManager.disconnect();}
}
  1. 注意事项
    3.1 生命周期绑定
    确保WebSocket的生命周期与Activity的生命周期绑定,避免内存泄漏。

在 onDestroy 中释放资源。

3.2 线程安全
WebSocket的回调方法(如 onMessage)运行在后台线程,如果需要更新UI,请使用 Handler 或 runOnUiThread。

3.3 网络权限
在 AndroidManifest.xml 中添加网络权限:

<!--    网络权限--><uses-permission android:name="android.permission.INTERNET"/>
<!--    wifi权限--><uses-permission android:name="android.permission.ACCESS_WIFI_STATE"/><uses-permission android:name="android.permission.CHANGE_WIFI_STATE"/><uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/><uses-permission android:name="android.permission.CHANGE_NETWORK_STATE"/>

3.4 SSL/TLS支持
如果WebSocket服务器使用 wss 协议(即基于SSL/TLS的安全连接),OkHttp 会自动处理SSL/TLS握手,无需额外配置。

  1. 扩展功能
    4.1 动态调整心跳间隔
    可以根据服务器响应动态调整心跳间隔。例如,在收到服务器的心跳响应后,延长心跳间隔。
@Override
public void onMessage(String message) {if ("HeartbeatResponse".equals(message)) {// 动态调整心跳间隔webSocketManager.setHeartbeatInterval(120000);}
}

4.2 消息重发机制
在发送失败时,将消息加入队列,等待重连后重新发送。

webSocketManager.sendMessage("Hello, WebSocket!");

4.3 日志分级
通过 Log.d 和 Log.e 区分调试日志和错误日志,方便调试和生产环境使用。

  1. 总结
    在Activity中使用优化后的WebSocket工具类非常简单。通过生命周期绑定、回调接口和资源释放机制,可以确保WebSocket的高效运行和资源管理。以下是完整的流程:

初始化:在 onCreate 中初始化 WebSocketManager。

设置回调:处理连接、消息、断开和错误事件。

连接WebSocket:调用 connect() 方法。

发送消息:通过 sendMessage 方法发送消息。

释放资源:在 onDestroy 中调用 disconnect() 方法。


http://www.ppmy.cn/server/174054.html

相关文章

解析 SQL,就用 sqlparse!

文章目录 解析 SQL&#xff0c;就用 sqlparse&#xff01;一、背景&#xff1a;为什么你需要 sqlparse&#xff1f;二、什么是 sqlparse&#xff1f;三、如何安装 sqlparse&#xff1f;四、简单易用的库函数1\. parse(sql)2\. format(sql, **options)3\. split(sql)4\. get_typ…

阿里云 linux centos7安装nacos

1、查看自己的java版本 java -version 2、下载nacos版本 https://github.com/alibaba/nacos/releases?page2&spm5176.28103460.0.0.14675d271E2cB5 3、上传到 目录下 4、解压该文件 tar -zxvf nacos-server-x.x.x.tar.gz5、修改配置文件 打开 application.propert…

如何在Spring Boot中读取JAR包内resources目录下文件

精心整理了最新的面试资料和简历模板&#xff0c;有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 以下是如何在Spring Boot中读取JAR包内resources目录下文件的教程&#xff0c;分为多种方法及详细说明&#xff1a; 方法1&#xff1a;使用 ClassPathResour…

【C语言】--- 动态内存管理详解

动态内存管理 1.为什么需要动态内存分配2.malloc 和 free2.1 malloc2.2 free 3. calloc和realloc3.1calloc3.2 realloc 4.常见的动态内存的错误4.1对空指针解引用的操作4.2 对开辟的空间越界访问4.3 对非动态开辟内存使用free函数4.4 使用free函数释放动态开辟空间的一部分4.5…

用低代码平台集成人工智能:无需专业开发也能实现智能化

引言&#xff1a;人工智能的普及与企业需求 随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;越来越多的企业开始意识到其在提升运营效率、优化客户体验和推动业务创新方面的巨大潜力。从智能客服到自动化决策支持&#xff0c;从数据分析到个性化推荐&#x…

基于NLP的客户意见分析:从数据到洞察

友友们好! 我的新专栏《Python进阶》正式启动啦!这是一个专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会找到: ● 深入解析:每一篇文章都将…

NodeJS学习笔记

NodeJS软件安装 node环境安装&#xff1a; https://nodejs.org 安装好后的node通常在C:\Program Files\nodejs验证安装是否成功 node -v npm -v 进入REPL模式命令行模式 nodeNodeJS在REPL模式和编辑器使用 windos在dos下常用命令 windos命令&#xff1a; 1、cmd dos系统2、…

Spring Boot笔记(上)

01 概要 Spring Boot 是 Java 领域最流行的 快速开发框架&#xff0c;专为简化 Spring 应用的初始搭建和开发而设计。 一、Spring Boot 解决了什么问题&#xff1f; 传统 Spring 痛点 • 繁琐的 XML 配置 • 需要手动管理依赖版本 • 部署依赖外部 Web 服务器&#xff08;如 …