153 lines
5.1 KiB
JavaScript
153 lines
5.1 KiB
JavaScript
const ws = require('ws')
|
||
const { randomBytes } = require('crypto')
|
||
const sleep = (ms) => new Promise((resolve, reject) => setTimeout(resolve, ms))
|
||
|
||
// 建议把 onopen, onerror, onclose, onmessage 保留给系统使用,例如reconnecting。用户使用 on(event, listener)
|
||
|
||
const DAD = (module.exports = class RpcSocket extends (
|
||
ws
|
||
) {
|
||
constructor(address, protocols, options) {
|
||
let ws = super(address, protocols, options)
|
||
DAD.upgrade(ws)
|
||
}
|
||
|
||
static upgrade(ws) {
|
||
ws.__proto__ = DAD.prototype
|
||
ws.sid = randomBytes(16).toString('hex')
|
||
ws.onmessage = async ({ data }) => {
|
||
// console.log('onmessage 被调用')
|
||
try {
|
||
let obj = JSON.parse(data)
|
||
// console.log('message data parsed to obj=',obj)
|
||
switch (obj.type) {
|
||
case 'NOTIFY': // 接收到同步的远程调用
|
||
// console.log(`NOTIFY message: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`)
|
||
if (ws.hasOwnProperty(obj.rpc))
|
||
// 作为rpc调用
|
||
ws[obj.rpc](obj.param)
|
||
else if (ws.eventNames().indexOf(obj.rpc) >= 0)
|
||
// 作为event调用
|
||
ws.emit(obj.rpc, obj.param)
|
||
else console.log('onmessage unknown notification')
|
||
case 'REQUEST': // 接收到异步的远程调用
|
||
// console.log(`被动方 收到 REQUEST: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`)
|
||
let result
|
||
if (ws.hasOwnProperty(obj.rpc)) {
|
||
result = await ws[obj.rpc](obj.param)
|
||
} else {
|
||
result = { _state: 'error', error: 'unknown rpc' }
|
||
}
|
||
if (obj.randomEvent) {
|
||
ws.send(
|
||
JSON.stringify({
|
||
type: 'REPLY',
|
||
randomEvent: obj.randomEvent,
|
||
result,
|
||
})
|
||
) // 返回结果给远程
|
||
}
|
||
case 'REPLY': // 接收到远程返回的结果
|
||
// console.log('主动方 收到 REPLY: result=', obj.result)
|
||
ws.emit(obj.randomEvent, obj.result)
|
||
default:
|
||
}
|
||
} catch (exception) {
|
||
console.error('Invalid socket message received!')
|
||
return
|
||
}
|
||
}
|
||
|
||
return ws
|
||
}
|
||
|
||
static connectAsync(url) {
|
||
return new Promise(function (resolve, reject) {
|
||
let socket = new DAD(url)
|
||
socket.onopen = () => resolve(socket)
|
||
socket.onerror = (err) => reject(err)
|
||
})
|
||
}
|
||
|
||
reconnectAsync(url) {
|
||
if (this.readyState < 2) {
|
||
return Promise.resolve(this)
|
||
}
|
||
return new Promise((resolve, reject) => {
|
||
let socket = new DAD(url || this.url)
|
||
socket.once('open', () => resolve(socket))
|
||
socket.once('error', (err) => reject(err))
|
||
})
|
||
}
|
||
|
||
notify({ rpc, param, options, cb } = {}) {
|
||
// 发起同步的远程调用
|
||
this.send(JSON.stringify({ type: 'NOTIFY', rpc, param, options, cb }))
|
||
}
|
||
|
||
requestAsync({ rpc, param, ack, timeout = 5000 } = {}) {
|
||
// 发起异步的远程调用(rpc)
|
||
let randomEvent = randomBytes(16).toString('hex')
|
||
// console.log('randomEvent is randomized: ', randomEvent)
|
||
if (typeof ack === 'function') {
|
||
// 有回调
|
||
this.send(JSON.stringify({ type: 'REQUEST', rpc, param, randomEvent }), () => {
|
||
// console.log('in callback, randomEvent=', randomEvent)
|
||
this.once(randomEvent, ack)
|
||
setTimeout(() => {
|
||
if (this.eventNames().indexOf(randomEvent) >= 0) {
|
||
this.removeAllListeners(randomEvent)
|
||
ack({ _state: 'timeout', error: 'timeout happens' })
|
||
}
|
||
}, timeout)
|
||
})
|
||
} else {
|
||
// 没有回调
|
||
return new Promise((resolve, reject) => {
|
||
this.send(JSON.stringify({ type: 'REQUEST', rpc, param, randomEvent }), () => {
|
||
this.once(randomEvent, resolve)
|
||
setTimeout(() => {
|
||
if (this.eventNames().indexOf(randomEvent) >= 0) {
|
||
this.removeAllListeners(randomEvent)
|
||
resolve({ _state: 'timeout', error: 'timeout happens' })
|
||
}
|
||
}, timeout)
|
||
})
|
||
})
|
||
}
|
||
}
|
||
})
|
||
|
||
// (async ()=>{
|
||
|
||
// new RpcSocket('http://localhost:9007')
|
||
// await sleep(20000)
|
||
// new RpcSocket.Server({port:9007})
|
||
// await sleep(20000)
|
||
|
||
// })()
|
||
|
||
/*
|
||
//RpcSocket=require('./Ling/RpcSocket.js');
|
||
server = new RpcSocket.Server({port:6000});
|
||
var serverSocket; server.on('connection', (socket)=>{console.log('socket coming');
|
||
serverSocket = RpcSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason))
|
||
.on('error', (errer)=>console.log('socket has error: ', error))
|
||
})
|
||
|
||
RpcSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{
|
||
clientSocket.request('testfunc', {height:99}, (result)=>console.log('using request: ', result), 3000)
|
||
return clientSocket
|
||
}).then((clientSocket)=>{
|
||
console.log('sleeping...')
|
||
sleep(2000)
|
||
return clientSocket
|
||
}).then(async (clientSocket)=>{
|
||
let result = await clientSocket.requestAsync('testfunc', {height:99}, 3000).catch(console.log)
|
||
console.log('using requestAsync:', result)
|
||
process.exit()
|
||
}).catch((e)=>{
|
||
console.log('connect error happens:', e)
|
||
})
|
||
*/
|