const ws = require('ws') const webtoken = require('wo-base-webtoken') const crypto = require('crypto') const my = { wsServer: undefined, // socketPool: {}, listeners: {}, heartbeat: false, heartbeatInterval: 30000, } module.exports = { initSocket ({ webServer, heartbeat, heartbeatInterval }) { my.wsServer = new ws.Server({ server: webServer }) //console.info({ _at: new Date().toJSON(), _from: 'Socket:initSocket', _type: 'CLOG', about: 'Base Socket Server is initialized.' }, '\n,') my.wsServer.on('connection', (socket, req) => { console.info( { _at: new Date().toJSON(), _from: 'basesocket:onConnection', _type: 'CLOG', about: `A socket is connecting from ${req.connection.remoteAddress}:${req.connection.remotePort}.`, }, '\n,' ) // socket.isAlive = true // socket.on('ping', () => { socket.isAlive = true }) // Most WebSocket server implementations, including the ws library, automatically respond to ping frames with pong frames. However, the server can listen for ping events using socket.on('ping', ...) if it wants to perform additional actions. socket.on('message', (data) => { let dataObj try { dataObj = JSON.parse(data) if (dataObj?.skevent === 'PING') { // socket.isAlive = true return } //console.log({ _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CLOG', usid: socket.usid, skevent: dataObj?.skevent }, '\n,') } catch (exception) { console.error({ _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CERROR', about: 'Unable to parse socket message', data }, '\n,') return } console.log('WebSocket-onMessage: dataObj', dataObj) if (['SOCKET_OWNER', 'SOCKET_OWNER_RECONNECT'].includes(dataObj.skevent)) { // 前端断线重连时,并不会自动再次提供 _passtoken。在前端的initSocket时,应当把_passtoken送过来。 const _passtokenSource = webtoken.verifyToken(dataObj._passtoken) console.log('WebSocket-onMessge: _passtokenSource', _passtokenSource) if (typeof _passtokenSource?.usid === 'string') { socket.appkey = _passtokenSource.appkey socket.usid = _passtokenSource.usid socket.commid = _passtokenSource.commid socket.skid = _passtokenSource.clid || socket.skid || 'skid' + crypto.randomBytes(16).toString('hex') // 注意,skid 这个名字 仅限在本文件内使用,在外部都使用 clid (client id) // my.socketPool[socket.skid] = socket console.log( { _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CLOG', skevent: dataObj.skevent, usid: socket.usid, skid: socket.skid, // socketPool: Object.keys(my.socketPool), clientsSize: my.wsServer.clients.size, }, '\n,' ) } } else { const listeners = my.listeners[dataObj.skevent] || [] for (const listener of listeners) { listener(dataObj) } } }) socket.on('close', () => { console.log({ _at: new Date().toJSON(), _from: 'basesocket:onClose', _type: 'CLOG', usid: socket?.usid, commid: socket?.commid, skid: socket?.skid }, '\n,') // don't know why, but this output happens too often without usid. //delete my.socketPool[socket?.usid] // 20240917 恍然大悟,同一个用户可以多次登录,多个socket会互相覆盖。如果仅仅根据 usid 来删除,其他尚在连接的socket也会被删了。 // delete my.socketPool[socket?.skid] }) }) my.wsServer.on('close', () => { }) // 一个全局的 heartbeat,不必给每个 socket 一个 heartbeat if (heartbeat) { console.log('WebSocket_heartbeat: starting...') setInterval(() => { my.wsServer.clients.forEach((socket) => { console.log('WebSocket_heartbeat: ', { usid: socket.usid, commid: socket.commid, skid: socket.skid, appkey: socket.appkey, readyState: socket.readyState }) if (socket.readyState !== ws.OPEN) { //socket.isAlive = false } else { //socket.ping() } }) console.log('WebSocket_heartbeat: clientsSize =', my.wsServer.clients.size) }, heartbeatInterval || my.heartbeatInterval) } return this }, removeUserSocket ({ clid } = {}) { my.wsServer.clients.forEach((socket) => { if (clid && socket.skid === clid) { delete socket.usid delete socket.commid // 要不要清除 socket.?clid 似乎不清除也没问题 } }) }, addListener (skevent, listener) { if (Array.isArray(my.listeners[skevent]) && typeof listener === 'function') { my.listeners[skevent].push(listener) } else { my.listeners[skevent] = [listener] } return this }, sendToAll (dataObj) { console.log('sendToAll: dataObj =', dataObj) my.wsServer.clients.forEach((socket) => { try { console.log('sendToOne: socket', { usid: socket.usid, skid: socket.skid }) if (socket.readyState === ws.OPEN) { socket.send(typeof dataObj !== 'string' ? JSON.stringify(dataObj) : dataObj) } else { console.warn({ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CWARN', msg: 'sendToOne: socket not open', dataObj }, '\n,') } } catch (expt) { console.error( { _at: new Date().toJSON(), _from: 'Socket:sendToAll', _type: 'CERROR', msg: 'sendToAll: Failed sending to unconnected socket', dataObj, usid: socket.usid, skid: socket.skid, }, '\n,' ) // delete my.socketPool[socket.skid] } }) }, sendToOne (dataObj = {}) { console.log('sendToOne: dataObj =', dataObj) my.wsServer.clients.forEach((socket) => { console.log('sendToOne: socket', { usid: socket.usid, skid: socket.skid, appkey: socket.appkey }) if ((dataObj.appkey && dataObj.appkey === socket.appkey) || !dataObj.appkey) { if ((dataObj.clid && socket.skid === dataObj.clid) || (!dataObj.clid && dataObj.usid && socket.usid === dataObj.usid)) { try { if (socket.readyState === ws.OPEN) { socket.send(typeof dataObj !== 'string' ? JSON.stringify(dataObj) : dataObj) } else { console.warn({ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CWARN', msg: 'sendToOne: socket not open', dataObj }, '\n,') } } catch (expt) { console.error({ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CERROR', msg: 'sendToOne: Failed sending to socket', dataObj }, '\n,') // delete my.socketPool[socket.skid] } } } }) }, }