wo-core-rpcsocket/index.js

138 lines
5.1 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const ws = require('ws')
const { randomBytes } = require('crypto')
// Returns a promise that fulfils (with no value) after `ms` milliseconds; it never rejects.
const sleep = (ms) =>
  new Promise((done) => {
    setTimeout(done, ms)
  })
// Recommendation: leave onopen, onerror, onclose and onmessage to internal (system) use, e.g. for reconnecting. User code should subscribe with on(event, listener).
/**
 * WebSocket subclass that layers a small JSON message protocol on top of `ws`.
 *
 * Every frame is a JSON object whose `rpcType` is one of:
 *  - 'SEND_NOTIFY'  : fire-and-forget call of `rpcHow` with argument `rpcWhat`
 *  - 'SEND_REQUEST' : call of `rpcHow` with `rpcWhat`; the result is sent back
 *                     in a 'SEND_RESULT' frame tagged with the same `randomEvent`
 *  - 'SEND_RESULT'  : reply to an earlier request; delivered locally by emitting
 *                     the `randomEvent` event with `rpcResult`
 *
 * Remote-callable handlers are functions stored as OWN properties of the socket
 * (e.g. `socket.testfunc = (rpcWhat) => ...`); prototype methods are not callable
 * remotely.
 */
const DAD = (module.exports = class RpcSocket extends ws {
  /**
   * Opens a client connection; the arguments are forwarded unchanged to the
   * `ws` constructor, then the RPC message handler is installed.
   */
  constructor(address, protocols, options) {
    super(address, protocols, options)
    DAD.upgrade(this)
  }

  /**
   * Turns an existing socket (for example one handed out by a server's
   * 'connection' event) into an RpcSocket in place.
   *
   * Assigns a random hex `sid` and takes over `onmessage` for RPC dispatching.
   *
   * @param {object} socket - socket to upgrade; it is mutated
   * @returns {object} the same socket, to allow chaining
   */
  static upgrade(socket) {
    // Only re-parent foreign sockets, so instances of RpcSocket subclasses keep their own prototype.
    if (!(socket instanceof DAD)) Object.setPrototypeOf(socket, DAD.prototype)
    socket.sid = randomBytes(16).toString('hex')

    // A handler must be an own property AND callable; otherwise data properties
    // such as `sid` would be "called" and throw.
    const isOwnHandler = (name) =>
      Object.prototype.hasOwnProperty.call(socket, name) && typeof socket[name] === 'function'

    socket.onmessage = async ({ data }) => {
      try {
        const { rpcType, rpcHow, rpcWhat, randomEvent, rpcResult } = JSON.parse(data)
        switch (rpcType) {
          case 'SEND_NOTIFY': {
            // Incoming notification: nothing is sent back.
            // NOTE(review): the remote peer chooses `rpcHow`, so it can trigger any event
            // that currently has listeners (including 'error' / 'close') — confirm this is intended.
            if (isOwnHandler(rpcHow)) socket[rpcHow](rpcWhat)
            else if (socket.eventNames().includes(rpcHow)) socket.emit(rpcHow, rpcWhat)
            else console.log('onmessage unknown notification')
            break // without this the handler ran a second time in the SEND_REQUEST case
          }
          case 'SEND_REQUEST': {
            // Incoming request: run the handler and return its (possibly async) result.
            const result = isOwnHandler(rpcHow)
              ? await socket[rpcHow](rpcWhat)
              : { _state: 'error', error: 'unknown rpcHow' }
            if (randomEvent) {
              socket.send(JSON.stringify({ rpcType: 'SEND_RESULT', randomEvent, rpcResult: result }))
            }
            break // the responder must not emit the reply event on itself
          }
          case 'SEND_RESULT':
            // Reply to one of our own requests: wake up the listener registered by sendRequest().
            socket.emit(randomEvent, rpcResult)
            break
          default:
          // Unknown rpcType: ignored.
        }
      } catch (exception) {
        // Covers malformed JSON as well as exceptions thrown by handlers; catching here
        // keeps this async listener from producing an unhandled rejection.
        console.error('Invalid socket message received!', exception)
      }
    }
    return socket
  }

  /**
   * Connects to `url`.
   *
   * @param {string} url
   * @returns {Promise<RpcSocket>} fulfils with the socket once it is open,
   *   rejects with the error passed to `onerror` if that fires first
   */
  static connectAsync(url) {
    return new Promise((resolve, reject) => {
      const socket = new DAD(url)
      socket.onopen = () => resolve(socket)
      socket.onerror = (err) => reject(err)
    })
  }

  /**
   * Returns a usable connection: this socket when it is still connecting or
   * open, otherwise a NEW RpcSocket connected to `url` (default: `this.url`).
   * Handlers and listeners attached to this socket are not copied to the new one.
   *
   * @param {string} [url]
   * @returns {Promise<RpcSocket>}
   */
  reconnectAsync(url) {
    // `ws` here is the module-level base class; readyState below CLOSING means CONNECTING or OPEN.
    if (this.readyState < ws.CLOSING) {
      return Promise.resolve(this)
    }
    return new Promise((resolve, reject) => {
      const socket = new DAD(url || this.url)
      socket.once('open', () => resolve(socket))
      socket.once('error', (err) => reject(err))
    })
  }

  /**
   * Sends a fire-and-forget call to the remote side.
   *
   * @param {object} [args]
   * @param {string} args.rpcHow - name of the remote handler or event
   * @param {*} args.rpcWhat - JSON-serialisable argument
   * @param {object} [args.options] - passed through to `send`
   * @param {Function} [args.cb] - passed through to `send`
   */
  sendNotify({ rpcHow, rpcWhat, options, cb } = {}) {
    this.send(JSON.stringify({ rpcType: 'SEND_NOTIFY', rpcHow, rpcWhat }), options, cb)
  }

  /**
   * Sends a request and delivers the remote result.
   *
   * The outcome is always delivered as a value: the remote `rpcResult`,
   * `{ _state: 'timeout', error }` when no reply arrives within `timeout` ms, or
   * `{ _state: 'error', error }` when the frame could not be sent.
   *
   * @param {object} [args]
   * @param {string} args.rpcHow - name of the remote handler
   * @param {*} args.rpcWhat - JSON-serialisable argument
   * @param {Function} [args.ack] - callback receiving the outcome; when given, nothing is returned
   * @param {number} [args.timeout=5000] - milliseconds to wait for the reply
   * @returns {Promise<*>|undefined} promise of the outcome when `ack` is not a function
   */
  sendRequest({ rpcHow, rpcWhat, ack, timeout = 5000 } = {}) {
    const randomEvent = randomBytes(16).toString('hex')
    const payload = JSON.stringify({ rpcType: 'SEND_REQUEST', rpcHow, rpcWhat, randomEvent })

    const dispatch = (deliver) => {
      let timer = null
      let settled = false
      // Single exit point: whichever of reply / timeout / send failure comes first wins.
      const settle = (rpcResult) => {
        if (settled) return
        settled = true
        clearTimeout(timer) // do not keep the process alive after the reply arrived
        this.removeListener(randomEvent, settle)
        deliver(rpcResult)
      }
      // Listen BEFORE sending so a fast reply can never be missed.
      this.once(randomEvent, settle)
      timer = setTimeout(
        () => settle({ _state: 'timeout', error: `RpcSocket sendRequest ${rpcHow} timeout` }),
        timeout
      )
      try {
        this.send(payload, (err) => {
          if (err) {
            settle({ _state: 'error', error: `RpcSocket sendRequest ${rpcHow} failed: ${err.message}` })
          }
        })
      } catch (err) {
        // send() threw synchronously: undo the bookkeeping and let the caller see the exception.
        settled = true
        clearTimeout(timer)
        this.removeListener(randomEvent, settle)
        throw err
      }
    }

    if (typeof ack === 'function') {
      // Callback style.
      dispatch(ack)
      return undefined
    }
    // Promise style; a synchronous throw from send() becomes a rejection.
    return new Promise((resolve) => dispatch(resolve))
  }
})
// (async ()=>{
// new RpcSocket('http://localhost:9007')
// await sleep(20000)
// new RpcSocket.Server({port:9007})
// await sleep(20000)
// })()
/*
// Usage example:
//RpcSocket=require('./Ling/RpcSocket.js');
server = new RpcSocket.Server({port:6000});
var serverSocket; server.on('connection', (socket)=>{console.log('socket coming');
serverSocket = RpcSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason))
.on('error', (error)=>console.log('socket has error: ', error))
})
RpcSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{
clientSocket.sendRequest({rpcHow:'testfunc', rpcWhat:{height:99}, ack:(rpcResult)=>console.log('using sendRequest with ack: ', rpcResult), timeout:3000})
return clientSocket
}).then(async (clientSocket)=>{
console.log('sleeping...')
await sleep(2000)
return clientSocket
}).then(async (clientSocket)=>{
let rpcResult = await clientSocket.sendRequest({rpcHow:'testfunc', rpcWhat:{height:99}, timeout:3000}).catch(console.log)
console.log('using sendRequest:', rpcResult)
process.exit()
}).catch((e)=>{
console.log('connect error happens:', e)
})
*/