Skip to main content

最佳 websocket 封装

看到公众号一篇文章封装 websocket,完美支持断网重连、自动心跳,确实比之前封装的简单清晰,故以篇文章封装为主。

回顾 WebSocket API

const ws = new WebSocket('ws://localhost:8080/test')
ws.onopen = function () {
console.log('WebSocket 连接已经建立。')
ws.send('Hello, server!')
}
ws.onmessage = function (event) {
console.log('收到服务器消息:', event.data)
}
ws.onerror = function (event) {
console.error('WebSocket 连接出现错误:', event)
}
ws.onclose = function () {
console.log('WebSocket 连接已经关闭。')
}

封装成功演示

一个好的封装就要它的使用方法和官方 Api 完全一致,零学习成本,上手即用!

import { WebSocketClient } from '@/utils/dataDispatcher/WebSocketClient'

// 创建实例
const ws = new WebSocketClient('ws://localhost:3200')
// 自定义方法-连接
ws.connect()
// 原生方法
ws.onopen(() => {})
// 原生方法
ws.onclose(() => {})
// 原生方法
ws.onerror(() => {})
// 原生方法
ws.onmessage(() => {
// 原生方法
ws.send('自定义发送的数据')
})
// 自定义方法-关闭连接
ws.close()

开始封装

基本框架搭建

首先实现一个基本的 WebSocket 客户端,具有以下功能:

  • 初始化连接并处理各种 WebSocket 事件(打开、消息、关闭、错误)。
  • 发送消息到服务器。
  • 关闭连接。
WebSocketClient.ts
export class WebSocketClient {
// #socket链接
private url = ''
// #socket实例
private socket: WebSocket | null = null

constructor(url: string) {
super()
this.url = url
}

// >消息发送
public send(message: string): void {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(message)
} else {
console.error('[WebSocket] 未连接')
}
}

// !初始化连接,并为其设置事件处理函数
public connect(): void {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
return
}
this.socket = new WebSocket(this.url)

// !websocket连接成功
this.socket.onopen = event => {
console.log(`连接成功,等待服务端数据推送[onopen]...`)
}

this.socket.onmessage = event => {}

this.socket.onclose = event => {
console.log(`连接断开[onclose]...`)
}

this.socket.onerror = event => {
console.log(`连接异常[onerror]...`)
}
}

// >关闭连接
// 手动关闭 WebSocket 连接并将 socket 设置为 null。
public close(): void {
if (this.socket) {
this.socket.close()
this.socket = null
}
}
}

断网重连封装

实现自动断网重连的机制,其核心逻辑在于以下几个方面:

  • 「记录重连次数」:通过 reconnectAttempts 属性记录当前已经尝试重连的次数。
  • 「设置最大重连次数」:通过 maxReconnectAttempts 属性设置允许的最大重连次数。
  • 「重连逻辑」:在 onclose 事件中调用重连处理函数 handleReconnect。
  • 「重连间隔」:通过 reconnectInterval 属性设置每次重连的间隔时间,可以在每次重连时增加间隔以实现指数退避。
  • 「停止重连」:通过 stopWs 属性记录是否彻底终止 ws,当 stopWs 为 true 时,不再进行重连操作。
WebSocketClient.ts
export class WebSocketClient {
private url = ''
private socket: WebSocket | null = null
// #重连次数
private reconnectAttempts = 0
// #最大重连数
private maxReconnectAttempts = 5
// #重连间隔
private reconnectInterval = 10000
// #彻底终止ws
private stopWs = false

constructor(url: string) {
super()
this.url = url
}

public send(message: string): void {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(message)
} else {
console.error('[WebSocket] 未连接')
}
}

public connect(): void {
if (this.reconnectAttempts === 0) {
console.log(`初始化连接中...`)
}
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
return
}
this.socket = new WebSocket(this.url)

this.socket.onopen = (event: Event) => {
this.stopWs = false
// 重置重连尝试成功连接
this.reconnectAttempts = 0
console.log(`连接成功,等待服务端数据推送[onopen]...`)
}

this.socket.onmessage = event => {}

// 在连接关闭(onclose)和出现错误(onerror)时调用重连逻辑。
this.socket.onclose = (event: CloseEvent) => {
if (this.reconnectAttempts === 0) {
console.log(`连接断开[onclose]...`)
}
if (!this.stopWs) {
this.handleReconnect()
}
}

// 关于 WebSocket 调用 onerror 的逻辑这里重点说明下!!!
// 一般情况下,触发了 onerror 事件,可以预期很快就会触发 close 事件,比如网络异常导致连接异常,然后ws关闭了连接。
// 但是也有一些情况是仅触发 onerror 方法,比如连接成功后 server 端 close 掉连接,然后 client 端接到RST包这种情况。
// 此处只考虑一般情况下网络异常的情况才调用重连逻辑,因此不需要重复写 handleReconnect 方法。
this.socket.onerror = event => {
if (this.reconnectAttempts === 0) {
console.log(`连接异常[onerror]...`)
}
}
}

// > 断网重连逻辑
// 该方法会递增 reconnectAttempts,检查是否达到最大重连次数,
// 如果没有达到,则在指定的重连间隔后再次调用 connect 方法尝试重连。
private handleReconnect(): void {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++
console.log(`尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)

// 重连间隔可以增加,例如指数退避:
// this.reconnectInterval * this.reconnectAttempts
setTimeout(() => {
this.connect()
}, this.reconnectInterval)
} else {
console.log(`最大重连失败,终止重连: ${this.url}`)
}
}

public close(): void {
if (this.socket) {
this.stopWs = true
this.socket.close()
this.socket = null
}
}
}

自动心跳封装

自动心跳(Automatic Heartbeat)是一种在网络通信中常用的机制,用于维持连接的活跃状态,检测连接是否仍然有效,并及时发现和处理连接断开或故障的情况。

心跳机制通过定期发送“心跳”消息(通常是一个简单的 ping 或者 pong 消息)来确认连接双方的状态。

实现自动心跳的基本思路:

  • 「发送心跳消息」:在 WebSocket 连接建立后,启动一个定时器,定期发送心跳消息到服务器。
  • 「接收心跳响应」:服务器收到心跳消息后返回响应,客户端接收到响应后重置定时器。
  • 「检测心跳超时」:如果在指定时间内没有收到心跳响应,则认为连接断开,进行重连。
  • 「停止心跳检测」:在连接断开或发生错误时停止心跳检测。
WebSocketClient.ts
export class WebSocketClient {
private url = ''
private socket: WebSocket | null = null
private reconnectAttempts = 0
private maxReconnectAttempts = 5
private reconnectInterval = 10000
private stopWs = false
// #发送心跳数据间隔
private heartbeatInterval = 1000 * 30
// #计时器id
private heartbeatTimer?: NodeJS.Timeout

constructor(url: string) {
super()
this.url = url
}

public send(message: string): void {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(message)
} else {
console.error('[WebSocket] 未连接')
}
}

public connect(): void {
if (this.reconnectAttempts === 0) {
console.log('WebSocket', `初始化连接中...`)
}
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
return
}
this.socket = new WebSocket(this.url)

this.socket.onopen = event => {
this.stopWs = false
this.reconnectAttempts = 0
// 在连接成功时启动心跳检测
this.startHeartbeat()
console.log(`连接成功,等待服务端数据推送[onopen]...`)
}

this.socket.onmessage = event => {
// 客户端收到响应后重新启动心跳检测
this.startHeartbeat()
}

this.socket.onclose = event => {
if (this.reconnectAttempts === 0) {
console.log(`连接断开[onclose]...`)
}
if (!this.stopWs) {
// 如果重连成功会调用 startHeartbeat,如果重连都失败会调用 closeHeartbeat。
this.handleReconnect()
}
}

// 此处考虑只触发 onerror 不触发 onclose 的情况,因此要单独调用 closeHeartbeat。
this.socket.onerror = event => {
if (this.reconnectAttempts === 0) {
console.log(`连接异常[onerror]...`)
}
this.closeHeartbeat()
}
}

// 重连逻辑中会调用 closeHeartbeat 停止心跳检测
private handleReconnect(): void {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++
console.log('WebSocket', `尝试重连...`)
setTimeout(() => {
this.connect()
}, this.reconnectInterval)
} else {
this.closeHeartbeat()
console.log(`最大重连失败,终止重连: ${this.url}`)
}
}

public close(): void {
if (this.socket) {
this.stopWs = true
this.socket.close()
this.socket = null
}
this.closeHeartbeat()
}

// >开始心跳检测 -> 定时发送心跳消息
private startHeartbeat(): void {
if (this.stopWs) return
// 先停止当前的心跳检测
if (this.heartbeatTimer) {
this.closeHeartbeat()
}
this.heartbeatTimer = setInterval(() => {
if (this.socket) {
this.socket.send(JSON.stringify({ type: 'heartBeat', data: {} }))
console.log('WebSocket', '发送心跳数据...')
} else {
console.error('[WebSocket] 未连接')
}
}, this.heartbeatInterval)
}

// >关闭心跳
private closeHeartbeat(): void {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = undefined
}
}

如何触发原生函数

现在已经基本完成了功能的封装,那么如何在外部调用原生的 websocket Api 呢?非常简单,借助几个自定义的生命周期函数即可!

WebSocketClient.ts
import { EventDispatcher } from './utils'

// 继承 EventDispatcher 获得 dispatchEvent 和 addEventListener 方法
export class WebSocketClient extends EventDispatcher {
//...

// >生命周期钩子
onopen(callBack: Function) {
this.addEventListener('open', callBack)
}
onmessage(callBack: Function) {
this.addEventListener('message', callBack)
}
onclose(callBack: Function) {
this.addEventListener('close', callBack)
}
onerror(callBack: Function) {
this.addEventListener('error', callBack)
}

public connect(): void {
// ...
this.socket.onopen = event => {
// ...
this.dispatchEvent('open', event)
}

this.socket.onmessage = event => {
// ...
this.dispatchEvent('message', event)
}

this.socket.onclose = event => {
// ...
this.dispatchEvent('close', event)
}

this.socket.onerror = event => {
// ...
this.dispatchEvent('error', event)
}
}

public close(): void {
if (this.socket) {
// ...
this.removeEventListener('open')
this.removeEventListener('message')
this.removeEventListener('close')
this.removeEventListener('error')
}
// ...
}

// ...
}

当原生的 onclose、onopen 方法触发时,会通过 dispatchEvent 触发相应的调度,进而触发通过 addEventListener 绑定的生命周期函数!

utils.ts
export class EventDispatcher {
private listeners: { [type: string]: Function[] } = {}

protected addEventListener(type: string, listener: Function) {
if (!this.listeners[type]) {
this.listeners[type] = []
}
if (this.listeners[type].indexOf(listener) === -1) {
this.listeners[type].push(listener)
}
}

protected removeEventListener(type: string) {
this.listeners[type] = []
}

protected dispatchEvent(type: string, data: any) {
const listenerArray = this.listeners[type] || []
if (listenerArray.length === 0) return
listenerArray.forEach(listener => {
listener.call(this, data)
})
}
}

优化 console 输出

定义 Log 类,将 WebSocketClient 中的 console.log 替换成 this.log,更优雅的显示输出信息。

utils.ts
export class Log {
private static console = true
log(title: string, text: string) {
if (!Log.console) return
if (import.meta.env.MODE === 'production') return
const color = '#ff4d4f'
console.log(
`%c ${title} %c ${text} %c`,
`background:${color};border:1px solid ${color}; padding: 1px; border-radius: 2px 0 0 2px; color: #fff;`,
`border:1px solid ${color}; padding: 1px; border-radius: 0 2px 2px 0; color: ${color};`,
'background:transparent'
)
}
logErr(text: string) {
console.error(text)
}
closeConsole() {
Log.console = false
}
}

// 继承 Log
export class EventDispatcher extends Log {
// ...
}

完整代码 TS 版

WebSocketClient.ts
import { EventDispatcher } from './utils'

export class WebSocketClient extends EventDispatcher {
private url = ''
private socket: WebSocket | null = null
private reconnectAttempts = 0
private maxReconnectAttempts = 5
private reconnectInterval = 10000
private heartbeatInterval = 1000 * 30
private heartbeatTimer?: NodeJS.Timeout
private stopWs = false

constructor(url: string) {
super()
this.url = url
}

onopen(callBack: Function) {
this.addEventListener('open', callBack)
}
onmessage(callBack: Function) {
this.addEventListener('message', callBack)
}
onclose(callBack: Function) {
this.addEventListener('close', callBack)
}
onerror(callBack: Function) {
this.addEventListener('error', callBack)
}

public send(message: string): void {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(message)
} else {
this.logErr('[WebSocket] 未连接')
}
}

public connect(): void {
if (this.reconnectAttempts === 0) {
this.log('WebSocket', `初始化连接中... ${this.url}`)
}
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
return
}
this.socket = new WebSocket(this.url)

this.socket.onopen = event => {
this.stopWs = false
this.reconnectAttempts = 0
this.startHeartbeat()
this.log('WebSocket', `连接成功,等待服务端数据推送[onopen]... ${this.url}`)
this.dispatchEvent('open', event)
}

this.socket.onmessage = event => {
this.dispatchEvent('message', event)
this.startHeartbeat()
}

this.socket.onclose = event => {
if (this.reconnectAttempts === 0) {
this.log('WebSocket', `连接断开[onclose]... ${this.url}`)
}
if (!this.stopWs) {
this.handleReconnect()
}
this.dispatchEvent('close', event)
}

this.socket.onerror = event => {
if (this.reconnectAttempts === 0) {
this.log('WebSocket', `连接异常[onerror]... ${this.url}`)
}
this.closeHeartbeat()
this.dispatchEvent('error', event)
}
}

private handleReconnect(): void {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++
this.log('WebSocket', `尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts}) ${this.url}`)
setTimeout(() => {
this.connect()
}, this.reconnectInterval)
} else {
this.closeHeartbeat()
this.log('WebSocket', `最大重连失败,终止重连: ${this.url}`)
}
}

public close(): void {
if (this.socket) {
this.stopWs = true
this.socket.close()
this.socket = null
this.removeEventListener('open')
this.removeEventListener('message')
this.removeEventListener('close')
this.removeEventListener('error')
}
this.closeHeartbeat()
}

private startHeartbeat(): void {
if (this.stopWs) return
if (this.heartbeatTimer) {
this.closeHeartbeat()
}
this.heartbeatTimer = setInterval(() => {
if (this.socket) {
this.socket.send(JSON.stringify({ type: 'heartBeat', data: {} }))
this.log('WebSocket', '送心跳数据...')
} else {
this.logErr('[WebSocket] 未连接')
}
}, this.heartbeatInterval)
}

private closeHeartbeat(): void {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = undefined
}
}
utils.ts
export class Log {
private static console = true
log(title: string, text: string) {
if (!Log.console) return
if (import.meta.env.MODE === 'production') return
const color = '#ff4d4f'
console.log(
`%c ${title} %c ${text} %c`,
`background:${color};border:1px solid ${color}; padding: 1px; border-radius: 2px 0 0 2px; color: #fff;`,
`border:1px solid ${color}; padding: 1px; border-radius: 0 2px 2px 0; color: ${color};`,
'background:transparent'
)
}
closeConsole() {
Log.console = false
}
}

export class EventDispatcher extends Log {
private listeners: { [type: string]: Function[] } = {}

protected addEventListener(type: string, listener: Function) {
if (!this.listeners[type]) {
this.listeners[type] = []
}
if (this.listeners[type].indexOf(listener) === -1) {
this.listeners[type].push(listener)
}
}

protected removeEventListener(type: string) {
this.listeners[type] = []
}

protected dispatchEvent(type: string, data: any) {
const listenerArray = this.listeners[type] || []
if (listenerArray.length === 0) return
listenerArray.forEach(listener => {
listener.call(this, data)
})
}
}

完整代码 JS 版

WebSocketClient.js
export class WebSocketClient extends EventDispatcher {
url = ''
socket = null
reconnectAttempts = 0
maxReconnectAttempts = 5
reconnectInterval = 10000
heartbeatInterval = 1000 * 30
heartbeatTimer = undefined
stopWs = false

constructor(url) {
super()
this.url = url
}

onopen(callBack) {
this.addEventListener('open', callBack)
}
onmessage(callBack) {
this.addEventListener('message', callBack)
}
onclose(callBack) {
this.addEventListener('close', callBack)
}
onerror(callBack) {
this.addEventListener('error', callBack)
}

send(message) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(message)
} else {
this.logErr('[WebSocket] 未连接')
}
}

connect() {
if (this.reconnectAttempts === 0) {
this.log('WebSocket', `初始化连接中... ${this.url}`)
}
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
return
}
this.socket = new WebSocket(this.url)

this.socket.onopen = event => {
this.stopWs = false
this.reconnectAttempts = 0
this.startHeartbeat()
this.log('WebSocket', `连接成功,等待服务端数据推送[onopen]... ${this.url}`)
this.dispatchEvent('open', event)
}

this.socket.onmessage = event => {
this.dispatchEvent('message', event)
this.startHeartbeat()
}

this.socket.onclose = event => {
if (this.reconnectAttempts === 0) {
this.log('WebSocket', `连接断开[onclose]... ${this.url}`)
}
if (!this.stopWs) {
this.handleReconnect()
}
this.dispatchEvent('close', event)
}

this.socket.onerror = event => {
if (this.reconnectAttempts === 0) {
this.log('WebSocket', `连接异常[onerror]... ${this.url}`)
}
this.closeHeartbeat()
this.dispatchEvent('error', event)
}
}

handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++
this.log('WebSocket', `尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts}) ${this.url}`)
setTimeout(() => {
this.connect()
}, this.reconnectInterval)
} else {
this.closeHeartbeat()
this.log('WebSocket', `最大重连失败,终止重连: ${this.url}`)
}
}

close() {
if (this.socket) {
this.stopWs = true
this.socket.close()
this.socket = null
this.removeEventListener('open')
this.removeEventListener('message')
this.removeEventListener('close')
this.removeEventListener('error')
}
this.closeHeartbeat()
}

startHeartbeat() {
if (this.stopWs) return
if (this.heartbeatTimer) {
this.closeHeartbeat()
}
this.heartbeatTimer = setInterval(() => {
if (this.socket) {
this.socket.send(JSON.stringify({ type: 'heartBeat', data: {} }))
this.log('WebSocket', '送心跳数据...')
} else {
this.logErr('[WebSocket] 未连接')
}
}, this.heartbeatInterval)
}

closeHeartbeat() {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = undefined
}
}
utils.js
export class Log {
static console = true
log(title, text) {
if (!Log.console) return
if (import.meta.env.MODE === 'production') return
const color = '#ff4d4f'
console.log(
`%c ${title} %c ${text} %c`,
`background:${color};border:1px solid ${color}; padding: 1px; border-radius: 2px 0 0 2px; color: #fff;`,
`border:1px solid ${color}; padding: 1px; border-radius: 0 2px 2px 0; color: ${color};`,
'background:transparent'
)
}
closeConsole() {
Log.console = false
}
}

export class EventDispatcher extends Log {
listeners = {}

addEventListener(type, listener) {
if (!this.listeners[type]) {
this.listeners[type] = []
}
if (this.listeners[type].indexOf(listener) === -1) {
this.listeners[type].push(listener)
}
}

removeEventListener(type) {
this.listeners[type] = []
}

dispatchEvent(type, data) {
const listenerArray = this.listeners[type] || []
if (listenerArray.length === 0) return
listenerArray.forEach(listener => {
listener.call(this, data)
})
}
}

效果演示

后端服务创建

先使用 node 创建一个后端服务,安装 ws 库:

npm install ws

创建 node index.js 文件,引入 WebSocket 服务器:

index.js
const WebSocket = require('ws')

const wss = new WebSocket.Server({ port: 3200 })

console.log('服务运行在http://localhost:3200/')

wss.on('connection', ws => {
console.log('[服务器]:客官您来了~里边请')
ws.send(`[websocket云端]您已经连接云端!数据推送中!`)
let index = 1
let interval
// 两分钟后开始推送数据
setTimeout(() => {
interval = setInterval(() => {
ws.send(`[websocket]数据推送第${index}`)
index++
}, 1000 * 10)
}, 120000)

ws.on('close', () => {
interval && clearInterval(interval)
console.log('[服务器]:客官下次再来呢~')
})
})

启动服务:

node index.js

前端 WebSocket 测试

const ws = new WebSocketClient('ws://localhost:3200')

// 连接
ws.connect()
ws.onclose(() => {})
ws.onerror(() => {})
ws.onmessage(() => {
ws.send('自定义发送的数据')
})
ws.onopen(() => {})

启动项目,控制台已经有了提示:

打开 NetWork 面板,可以看到心跳检测和数据交互:

当我们断开服务端的时候,断网重连被自动触发: