From 6ea84fc8113c7df6fdf00dcb93dd9e1f29b42424 Mon Sep 17 00:00:00 2001 From: Luk Lu Date: Mon, 1 Jun 2020 16:42:04 +0800 Subject: [PATCH] init --- index.js | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++ package.json | 13 +++++ 2 files changed, 156 insertions(+) create mode 100644 index.js create mode 100644 package.json diff --git a/index.js b/index.js new file mode 100644 index 0000000..1225ebf --- /dev/null +++ b/index.js @@ -0,0 +1,143 @@ +const ws = require('ws') +const {randomBytes} = require('crypto') +const sleep = (ms)=>new Promise((resolve, reject)=>setTimeout(resolve, ms)) + +// 建议把 onopen, onerror, onclose, onmessage 保留给系统使用,例如reconnecting。用户使用 on(event, listener) + +const DAD = module.exports = class TicSocket extends ws { + constructor(address, protocols, options){ + let ws = super(address, protocols, options) + DAD.upgrade(ws) + } + + static upgrade(ws){ + ws.__proto__ = DAD.prototype + ws.sid = randomBytes(16).toString('hex') + ws.onMessage() + return ws + } + + static connectAsync(url) { + return new Promise(function(resolve, reject) { + let socket = new DAD(url) + socket.onopen = ()=>resolve(socket) + socket.onerror = (err)=>reject(err) + }) + } + + reconnectAsync(url){ + if ( this.readyState < 2 ) { + return Promise.resolve(this) + } + return new Promise((resolve, reject)=>{ + let socket = new DAD(url || this.url) + socket.once('open', ()=>resolve(socket)) + socket.once('error', (err)=>reject(err)) + }) + } + + notify({rpc, param, options, cb}={}){ // 发起同步的远程调用 + this.send(JSON.stringify({type:'NOTIFY', rpc, param, options, cb})) + } + + requestAsync( { rpc, param, ack, timeout = 5000 } = {} ) { // 发起异步的远程调用(rpc) + let randomEvent = randomBytes(16).toString('hex') +// console.log('randomEvent is randomized: ', randomEvent) + if ( typeof(ack) === 'function' ){ // 有回调 + this.send(JSON.stringify({type:'REQUEST', rpc, param, randomEvent}), ()=>{ +// console.log('in callback, randomEvent=', randomEvent) + this.once(randomEvent, ack) + setTimeout(()=>{ + if (this.eventNames().indexOf(randomEvent)>=0){ + this.removeAllListeners(randomEvent) + ack({_state:'timeout', error:'timeout happens'}) + } + }, timeout) + }) + } else { // 没有回调 + return new Promise((resolve, reject)=>{ + this.send(JSON.stringify({type:'REQUEST', rpc, param, randomEvent}), ()=>{ + this.once(randomEvent, resolve) + setTimeout(()=>{ + if (this.eventNames().indexOf(randomEvent)>=0){ + this.removeAllListeners(randomEvent) + resolve({_state:'timeout', error:'timeout happens'}) + } + }, timeout) + }) + }) + } + } + + onMessage(){ // 接收到远程消息 + this.onmessage = async ({data})=>{ // 不能直接定义成一个函数,而要现场赋值。因为里面用到了 this 还是指向 ws.prototype 而不是socket本身 +// console.log('onmessage 被调用') + try { + let obj = JSON.parse(data) +// console.log('message data parsed to obj=',obj) + switch (obj.type) { + case 'NOTIFY': // 接收到同步的远程调用 +// console.log(`NOTIFY message: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) + if (this.hasOwnProperty(obj.rpc)) // 作为rpc调用 + this[obj.rpc](obj.param) + else if (this.eventNames().indexOf(obj.rpc)>=0) // 作为event调用 + this.emit(obj.rpc, obj.param) + else + console.log('onmessage unknown notification') + case 'REQUEST': // 接收到异步的远程调用 +// console.log(`被动方 收到 REQUEST: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) + let result + if (this.hasOwnProperty(obj.rpc)) { + result = await this[obj.rpc](obj.param) + }else { + result = {_state:'error', error:'unknown rpc'} + } + if (obj.randomEvent){ + this.send(JSON.stringify({type:'REPLY', randomEvent: obj.randomEvent, result})) // 返回结果给远程 + } + case 'REPLY': // 接收到远程返回的结果 +// console.log('主动方 收到 REPLY: result=', obj.result) + this.emit(obj.randomEvent, obj.result) + default: + } + }catch(exception){ + console.error('Invalid socket message received!') + return + } + } + } + +} + +// (async ()=>{ + +// new TicSocket('http://localhost:9007') +// await sleep(20000) +// new TicSocket.Server({port:9007}) +// await sleep(20000) + +// })() + +/* +//TicSocket=require('./Ling/TicSocket.js'); +server = new TicSocket.Server({port:6000}); +var serverSocket; server.on('connection', (socket)=>{console.log('socket coming'); +serverSocket = TicSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason)) +.on('error', (errer)=>console.log('socket has error: ', error)) +}) + +TicSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{ + clientSocket.request('testfunc', {height:99}, (result)=>console.log('using request: ', result), 3000) + return clientSocket +}).then((clientSocket)=>{ + console.log('sleeping...') + sleep(2000) + return clientSocket +}).then(async (clientSocket)=>{ + let result = await clientSocket.requestAsync('testfunc', {height:99}, 3000).catch(console.log) + console.log('using requestAsync:', result) + process.exit() +}).catch((e)=>{ + console.log('connect error happens:', e) +}) +*/ \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..0da3826 --- /dev/null +++ b/package.json @@ -0,0 +1,13 @@ +{ + "name": "websocket", + "version": "0.1.0", + "private": true, + "dependencies": { + "ws": "^7.2.1" + }, + "devDependencies": {}, + "scripts": { + "setup": "npm install" + }, + "author": "" +}