wo-core-rpcsocket/index.js
2020-06-01 16:42:04 +08:00

143 lines
5.1 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const ws = require('ws')
const {randomBytes} = require('crypto')
// Promise-based delay: the returned promise fulfils (with no value) once `ms` milliseconds have elapsed.
const sleep = (ms) => new Promise((wake) => { setTimeout(wake, ms) })
// Convention: onopen, onerror, onclose and onmessage are reserved for internal use
// (e.g. reconnecting). Application code should subscribe with on(event, listener).
/**
 * WebSocket subclass that layers a small JSON message protocol on top of the socket.
 *
 * Three message types travel over the wire, each as a JSON object with a `type` field:
 *  - NOTIFY : one-way call, no answer is sent back.
 *  - REQUEST: call whose result is sent back in a REPLY carrying the same `randomEvent`.
 *  - REPLY  : answer to an earlier REQUEST; delivered locally as an event named `randomEvent`.
 *
 * A remote call is served by a function stored as an OWN property of the socket
 * instance under the rpc name (e.g. `socket.testfunc = (param) => ...`).
 */
const DAD = module.exports = class TicSocket extends ws {
  /**
   * Create a socket and prepare it for the message protocol.
   * All arguments are forwarded unchanged to the parent constructor.
   */
  constructor(address, protocols, options){
    super(address, protocols, options)
    // Initialise in place instead of going through upgrade(): upgrade() re-points the
    // prototype at TicSocket, which would strip the methods of any subclass instance.
    this.sid = randomBytes(16).toString('hex')
    this.onMessage()
  }

  /**
   * Turn an existing socket object into a TicSocket in place: switch its prototype,
   * give it a random 32-hex-character `sid` and install the message handler.
   * @param {object} ws - socket to convert (e.g. one handed out by a server 'connection' event).
   * @returns {object} the same object, now a TicSocket.
   */
  static upgrade(ws){
    Object.setPrototypeOf(ws, DAD.prototype)
    ws.sid = randomBytes(16).toString('hex')
    ws.onMessage()
    return ws
  }

  /**
   * Open a new socket to `url`.
   * @param {string} url
   * @returns {Promise<TicSocket>} fulfils with the socket once it opens; rejects with
   *   whatever the socket passes to its onerror handler.
   */
  static connectAsync(url) {
    return new Promise((resolve, reject) => {
      const socket = new DAD(url)
      socket.onopen = () => resolve(socket)
      socket.onerror = (err) => reject(err)
    })
  }

  /**
   * Return a usable socket: this one while its readyState is below 2 (still connecting
   * or open), otherwise a NEW socket connected to `url` (default: this socket's url).
   * Listeners and rpc functions attached to this socket are not copied to the new one.
   * @param {string} [url]
   * @returns {Promise<TicSocket>}
   */
  reconnectAsync(url){
    if ( this.readyState < 2 ) {
      return Promise.resolve(this)
    }
    return new Promise((resolve, reject) => {
      const socket = new DAD(url || this.url)
      socket.once('open', () => resolve(socket))
      socket.once('error', (err) => reject(err))
    })
  }

  /**
   * Send a one-way NOTIFY message; no reply is expected.
   * The fields are serialised with JSON.stringify, so a function passed as `cb`
   * is dropped from the message rather than transmitted.
   */
  notify({rpc, param, options, cb}={}){
    this.send(JSON.stringify({type:'NOTIFY', rpc, param, options, cb}))
  }

  /**
   * Send a REQUEST and deliver the remote result.
   *
   * @param {object}   [request]
   * @param {string}   [request.rpc]     - name of the remote function.
   * @param {*}        [request.param]   - JSON-serialisable argument.
   * @param {Function} [request.ack]     - if given, called with the result and nothing is returned.
   * @param {number}   [request.timeout=5000] - milliseconds to wait for the reply.
   * @returns {Promise<*>|undefined} without `ack`, a promise for the result.
   *
   * The result is the remote value, or `{_state:'timeout', ...}` when no reply arrives
   * in time, or `{_state:'error', ...}` when the socket reports that sending failed.
   * The promise never rejects for those cases; it rejects only if send() throws.
   */
  requestAsync( { rpc, param, ack, timeout = 5000 } = {} ) {
    const randomEvent = randomBytes(16).toString('hex') // correlates the REPLY with this call

    // Runs one request/reply exchange and hands exactly one outcome to `deliver`.
    const exchange = (deliver) => {
      let timer
      let settled = false
      const release = () => {
        settled = true
        clearTimeout(timer) // do not keep the event loop alive after the exchange ends
        this.removeListener(randomEvent, settle)
      }
      const settle = (outcome) => {
        if (settled) return
        release()
        deliver(outcome)
      }
      // Listen BEFORE sending so a fast reply cannot arrive ahead of the listener.
      this.once(randomEvent, settle)
      timer = setTimeout(settle, timeout, {_state:'timeout', error:'timeout happens'})
      try {
        this.send(JSON.stringify({type:'REQUEST', rpc, param, randomEvent}), (err) => {
          // Report a failed send right away instead of letting the caller wait for the timeout.
          if (err) settle({_state:'error', error: (err && err.message) || String(err)})
        })
      } catch (err) {
        release()
        throw err
      }
    }

    if ( typeof(ack) === 'function' ){ // callback style
      exchange(ack)
      return undefined
    }
    return new Promise((resolve) => exchange(resolve)) // promise style
  }

  /**
   * Install the incoming-message handler on this socket.
   * The handler is an arrow function assigned per instance so that `this` inside it
   * is the socket itself.
   */
  onMessage(){
    // Only functions stored directly on the instance are callable from the remote side;
    // inherited methods and non-function properties (sid, url, ...) are not.
    const isLocalRpc = (name) =>
      Object.prototype.hasOwnProperty.call(this, name) && typeof this[name] === 'function'

    this.onmessage = async ({data}) => {
      let obj
      try {
        obj = JSON.parse(data)
      } catch (exception) {
        console.error('Invalid socket message received!')
        return
      }
      if (obj === null || typeof obj !== 'object') { // valid JSON, but not a message object
        console.error('Invalid socket message received!')
        return
      }

      try {
        switch (obj.type) {
          case 'NOTIFY': // one-way call from the remote side
            if (isLocalRpc(obj.rpc)) // serve as rpc
              await this[obj.rpc](obj.param) // awaited so a rejected handler is logged below
            else if (this.eventNames().indexOf(obj.rpc) >= 0) // serve as event
              // NOTE(review): the remote peer picks the event name, so it can trigger any
              // event that has listeners, including the socket's own lifecycle events.
              this.emit(obj.rpc, obj.param)
            else
              console.log('onmessage unknown notification')
            break
          case 'REQUEST': { // call from the remote side that expects a REPLY
            let result
            if (isLocalRpc(obj.rpc)) {
              try {
                result = await this[obj.rpc](obj.param)
              } catch (exception) {
                // Details stay in the local log; the requester gets a generic error
                // instead of waiting for its timeout.
                console.error('rpc handler failed:', exception)
                result = {_state:'error', error:'rpc failed'}
              }
            } else {
              result = {_state:'error', error:'unknown rpc'}
            }
            if (obj.randomEvent){
              this.send(JSON.stringify({type:'REPLY', randomEvent: obj.randomEvent, result})) // send the result back
            }
            break
          }
          case 'REPLY': // answer to a request issued by requestAsync
            // NOTE(review): the event name comes from the remote peer unchecked.
            this.emit(obj.randomEvent, obj.result)
            break
          default:
        }
      } catch (exception) {
        console.error('Failed to handle socket message:', exception)
      }
    }
  }
}
// (async ()=>{
// new TicSocket('http://localhost:9007')
// await sleep(20000)
// new TicSocket.Server({port:9007})
// await sleep(20000)
// })()
/*
//TicSocket=require('./Ling/TicSocket.js');
server = new TicSocket.Server({port:6000});
var serverSocket; server.on('connection', (socket)=>{console.log('socket coming');
serverSocket = TicSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason))
.on('error', (error)=>console.log('socket has error: ', error))
})
TicSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{
clientSocket.requestAsync({rpc:'testfunc', param:{height:99}, ack:(result)=>console.log('using request: ', result), timeout:3000})
return clientSocket
}).then((clientSocket)=>{
console.log('sleeping...')
return sleep(2000).then(()=>clientSocket)
}).then(async (clientSocket)=>{
let result = await clientSocket.requestAsync({rpc:'testfunc', param:{height:99}, timeout:3000}).catch(console.log)
console.log('using requestAsync:', result)
process.exit()
}).catch((e)=>{
console.log('connect error happens:', e)
})
*/