diff --git a/index.js b/index.js index a43b839..07dd008 100644 --- a/index.js +++ b/index.js @@ -4,9 +4,7 @@ const sleep = (ms) => new Promise((resolve, reject) => setTimeout(resolve, ms)) // 建议把 onopen, onerror, onclose, onmessage 保留给系统使用,例如reconnecting。用户使用 on(event, listener) -const DAD = (module.exports = class RpcSocket extends ( - ws -) { +const DAD = (module.exports = class RpcSocket extends ws { constructor(address, protocols, options) { let ws = super(address, protocols, options) DAD.upgrade(ws) @@ -18,38 +16,25 @@ const DAD = (module.exports = class RpcSocket extends ( ws.onmessage = async ({ data }) => { // console.log('onmessage 被调用') try { - let obj = JSON.parse(data) + let { rpcType, rpcHow, rpcWhat, randomEvent, rpcResult } = JSON.parse(data) // console.log('message data parsed to obj=',obj) - switch (obj.type) { - case 'NOTIFY': // 接收到同步的远程调用 - // console.log(`NOTIFY message: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) - if (ws.hasOwnProperty(obj.rpc)) - // 作为rpc调用 - ws[obj.rpc](obj.param) - else if (ws.eventNames().indexOf(obj.rpc) >= 0) - // 作为event调用 - ws.emit(obj.rpc, obj.param) + switch (rpcType) { + case 'SEND_NOTIFY': // 接收到同步的远程调用 + // console.log(`SEND_REQUEST_SYNC message: rpcHow=${obj.rpcHow}, rpcWhat=${JSON.stringify(obj.rpcWhat)}`) + if (ws.hasOwnProperty(rpcHow)) ws[rpcHow](rpcWhat) + else if (ws.eventNames().indexOf(rpcHow) >= 0) ws.emit(rpcHow, rpcWhat) else console.log('onmessage unknown notification') - case 'REQUEST': // 接收到异步的远程调用 - // console.log(`被动方 收到 REQUEST: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) - let result - if (ws.hasOwnProperty(obj.rpc)) { - result = await ws[obj.rpc](obj.param) - } else { - result = { _state: 'error', error: 'unknown rpc' } + case 'SEND_REQUEST': // 接收到异步的远程调用 + // console.log(`被动方 收到 SEND_REQUEST_ASYNC: rpcHow=${obj.rpcHow}, rpcWhat=${JSON.stringify(obj.rpcWhat)}`) + let rpcResult + if (ws.hasOwnProperty(rpcHow)) rpcResult = await ws[rpcHow](rpcWhat) + else rpcResult = { _state: 'error', error: 'unknown rpcHow' } + if (randomEvent) { + ws.send(JSON.stringify({ rpcType: 'SEND_RESULT', randomEvent, rpcResult })) // 返回结果给远程 } - if (obj.randomEvent) { - ws.send( - JSON.stringify({ - type: 'REPLY', - randomEvent: obj.randomEvent, - result, - }) - ) // 返回结果给远程 - } - case 'REPLY': // 接收到远程返回的结果 - // console.log('主动方 收到 REPLY: result=', obj.result) - ws.emit(obj.randomEvent, obj.result) + case 'SEND_RESULT': // 接收到远程返回的结果 + // console.log('主动方 收到 SEND_RESULT: rpcResult=', obj.rpcResult) + ws.emit(randomEvent, rpcResult) default: } } catch (exception) { @@ -70,7 +55,7 @@ const DAD = (module.exports = class RpcSocket extends ( } reconnectAsync(url) { - if (this.readyState < 2) { + if (this.readyState < ws.CLOSING) { return Promise.resolve(this) } return new Promise((resolve, reject) => { @@ -80,36 +65,36 @@ const DAD = (module.exports = class RpcSocket extends ( }) } - notify({ rpc, param, options, cb } = {}) { + sendNotify({ rpcHow, rpcWhat, options, cb } = {}) { // 发起同步的远程调用 - this.send(JSON.stringify({ type: 'NOTIFY', rpc, param, options, cb })) + this.send(JSON.stringify({ rpcType: 'SEND_NOTIFY', rpcHow, rpcWhat }), options, cb) } - requestAsync({ rpc, param, ack, timeout = 5000 } = {}) { - // 发起异步的远程调用(rpc) + sendRequest({ rpcHow, rpcWhat, ack, timeout = 5000 } = {}) { + // 发起异步的远程调用 let randomEvent = randomBytes(16).toString('hex') // console.log('randomEvent is randomized: ', randomEvent) if (typeof ack === 'function') { // 有回调 - this.send(JSON.stringify({ type: 'REQUEST', rpc, param, randomEvent }), () => { + this.send(JSON.stringify({ rpcType: 'SEND_REQUEST', rpcHow, rpcWhat, randomEvent }), () => { // console.log('in callback, randomEvent=', randomEvent) this.once(randomEvent, ack) setTimeout(() => { if (this.eventNames().indexOf(randomEvent) >= 0) { this.removeAllListeners(randomEvent) - ack({ _state: 'timeout', error: 'timeout happens' }) + ack({ _state: 'timeout', error: `RpcSocket sendRequest ${rpcHow} timeout` }) } }, timeout) }) } else { // 没有回调 return new Promise((resolve, reject) => { - this.send(JSON.stringify({ type: 'REQUEST', rpc, param, randomEvent }), () => { + this.send(JSON.stringify({ rpcType: 'SEND_REQUEST', rpcHow, rpcWhat, randomEvent }), () => { this.once(randomEvent, resolve) setTimeout(() => { if (this.eventNames().indexOf(randomEvent) >= 0) { this.removeAllListeners(randomEvent) - resolve({ _state: 'timeout', error: 'timeout happens' }) + resolve({ _state: 'timeout', error: `RpcSocket sendRequest ${rpcHow} timeout` }) } }, timeout) }) @@ -136,15 +121,15 @@ serverSocket = RpcSocket.upgrade(socket).on('close', (code, reason)=>console.log }) RpcSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{ - clientSocket.request('testfunc', {height:99}, (result)=>console.log('using request: ', result), 3000) + clientSocket.request('testfunc', {height:99}, (rpcResult)=>console.log('using request: ', rpcResult), 3000) return clientSocket }).then((clientSocket)=>{ console.log('sleeping...') sleep(2000) return clientSocket }).then(async (clientSocket)=>{ - let result = await clientSocket.requestAsync('testfunc', {height:99}, 3000).catch(console.log) - console.log('using requestAsync:', result) + let rpcResult = await clientSocket.sendRequest('testfunc', {height:99}, 3000).catch(console.log) + console.log('using sendRequest:', rpcResult) process.exit() }).catch((e)=>{ console.log('connect error happens:', e)