diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3d3c395 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +.DS_Store +node_modules/ +npm-debug.log* +yarn-debug.log* +yarn-error.log* +/test/unit/coverage/ +/test/e2e/reports/ +selenium-debug.log + +# Editor directories and files +.idea +.vscode +*.suo +*.ntvs* +*.njsproj +*.sln +/package-lock.json diff --git a/.prettierrc.js b/.prettierrc.js new file mode 100644 index 0000000..e001ecd --- /dev/null +++ b/.prettierrc.js @@ -0,0 +1,16 @@ +/* +对 VSCode Prettier 有效;建议一直要有本配置文件,否则不同版本的 Prettier 的默认配置会不同,例如 TrailingComma +对 VSCode Prettier Standard 无效,似乎是集成了不能修改的配置。 +*/ +module.exports = { + printWidth: 160, // default 80 + tabWidth: 2, // default 2 + useTabs: false, + semi: false, // default true + singleQuote: true, // default false + trailingComma: 'es5', // none (default in v 1.*), es5 (default in v2.0.0), all + bracketSpacing: true, // default true + jsxBracketSameLine: false, // default false + arrowParens: 'always', // avoid (default in v1.9.0), always (default since v2.0.0) + quoteProps: 'as-needed', // as-needed (default), consistent, preserve +} diff --git a/index.js b/index.js index 1225ebf..a43b839 100644 --- a/index.js +++ b/index.js @@ -1,132 +1,141 @@ const ws = require('ws') -const {randomBytes} = require('crypto') -const sleep = (ms)=>new Promise((resolve, reject)=>setTimeout(resolve, ms)) +const { randomBytes } = require('crypto') +const sleep = (ms) => new Promise((resolve, reject) => setTimeout(resolve, ms)) // 建议把 onopen, onerror, onclose, onmessage 保留给系统使用,例如reconnecting。用户使用 on(event, listener) -const DAD = module.exports = class TicSocket extends ws { - constructor(address, protocols, options){ +const DAD = (module.exports = class RpcSocket extends ( + ws +) { + constructor(address, protocols, options) { let ws = super(address, protocols, options) DAD.upgrade(ws) } - static upgrade(ws){ + static upgrade(ws) { ws.__proto__ = DAD.prototype ws.sid = randomBytes(16).toString('hex') - ws.onMessage() + ws.onmessage = async ({ data }) => { + // console.log('onmessage 被调用') + try { + let obj = JSON.parse(data) + // console.log('message data parsed to obj=',obj) + switch (obj.type) { + case 'NOTIFY': // 接收到同步的远程调用 + // console.log(`NOTIFY message: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) + if (ws.hasOwnProperty(obj.rpc)) + // 作为rpc调用 + ws[obj.rpc](obj.param) + else if (ws.eventNames().indexOf(obj.rpc) >= 0) + // 作为event调用 + ws.emit(obj.rpc, obj.param) + else console.log('onmessage unknown notification') + case 'REQUEST': // 接收到异步的远程调用 + // console.log(`被动方 收到 REQUEST: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) + let result + if (ws.hasOwnProperty(obj.rpc)) { + result = await ws[obj.rpc](obj.param) + } else { + result = { _state: 'error', error: 'unknown rpc' } + } + if (obj.randomEvent) { + ws.send( + JSON.stringify({ + type: 'REPLY', + randomEvent: obj.randomEvent, + result, + }) + ) // 返回结果给远程 + } + case 'REPLY': // 接收到远程返回的结果 + // console.log('主动方 收到 REPLY: result=', obj.result) + ws.emit(obj.randomEvent, obj.result) + default: + } + } catch (exception) { + console.error('Invalid socket message received!') + return + } + } + return ws } - + static connectAsync(url) { - return new Promise(function(resolve, reject) { + return new Promise(function (resolve, reject) { let socket = new DAD(url) - socket.onopen = ()=>resolve(socket) - socket.onerror = (err)=>reject(err) + socket.onopen = () => resolve(socket) + socket.onerror = (err) => reject(err) }) } - - reconnectAsync(url){ - if ( this.readyState < 2 ) { - return Promise.resolve(this) - } - return new Promise((resolve, reject)=>{ - let socket = new DAD(url || this.url) - socket.once('open', ()=>resolve(socket)) - socket.once('error', (err)=>reject(err)) - }) - } - - notify({rpc, param, options, cb}={}){ // 发起同步的远程调用 - this.send(JSON.stringify({type:'NOTIFY', rpc, param, options, cb})) - } - requestAsync( { rpc, param, ack, timeout = 5000 } = {} ) { // 发起异步的远程调用(rpc) + reconnectAsync(url) { + if (this.readyState < 2) { + return Promise.resolve(this) + } + return new Promise((resolve, reject) => { + let socket = new DAD(url || this.url) + socket.once('open', () => resolve(socket)) + socket.once('error', (err) => reject(err)) + }) + } + + notify({ rpc, param, options, cb } = {}) { + // 发起同步的远程调用 + this.send(JSON.stringify({ type: 'NOTIFY', rpc, param, options, cb })) + } + + requestAsync({ rpc, param, ack, timeout = 5000 } = {}) { + // 发起异步的远程调用(rpc) let randomEvent = randomBytes(16).toString('hex') -// console.log('randomEvent is randomized: ', randomEvent) - if ( typeof(ack) === 'function' ){ // 有回调 - this.send(JSON.stringify({type:'REQUEST', rpc, param, randomEvent}), ()=>{ -// console.log('in callback, randomEvent=', randomEvent) + // console.log('randomEvent is randomized: ', randomEvent) + if (typeof ack === 'function') { + // 有回调 + this.send(JSON.stringify({ type: 'REQUEST', rpc, param, randomEvent }), () => { + // console.log('in callback, randomEvent=', randomEvent) this.once(randomEvent, ack) - setTimeout(()=>{ - if (this.eventNames().indexOf(randomEvent)>=0){ + setTimeout(() => { + if (this.eventNames().indexOf(randomEvent) >= 0) { this.removeAllListeners(randomEvent) - ack({_state:'timeout', error:'timeout happens'}) + ack({ _state: 'timeout', error: 'timeout happens' }) } }, timeout) }) - } else { // 没有回调 - return new Promise((resolve, reject)=>{ - this.send(JSON.stringify({type:'REQUEST', rpc, param, randomEvent}), ()=>{ + } else { + // 没有回调 + return new Promise((resolve, reject) => { + this.send(JSON.stringify({ type: 'REQUEST', rpc, param, randomEvent }), () => { this.once(randomEvent, resolve) - setTimeout(()=>{ - if (this.eventNames().indexOf(randomEvent)>=0){ + setTimeout(() => { + if (this.eventNames().indexOf(randomEvent) >= 0) { this.removeAllListeners(randomEvent) - resolve({_state:'timeout', error:'timeout happens'}) + resolve({ _state: 'timeout', error: 'timeout happens' }) } }, timeout) }) }) } } - - onMessage(){ // 接收到远程消息 - this.onmessage = async ({data})=>{ // 不能直接定义成一个函数,而要现场赋值。因为里面用到了 this 还是指向 ws.prototype 而不是socket本身 -// console.log('onmessage 被调用') - try { - let obj = JSON.parse(data) -// console.log('message data parsed to obj=',obj) - switch (obj.type) { - case 'NOTIFY': // 接收到同步的远程调用 -// console.log(`NOTIFY message: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) - if (this.hasOwnProperty(obj.rpc)) // 作为rpc调用 - this[obj.rpc](obj.param) - else if (this.eventNames().indexOf(obj.rpc)>=0) // 作为event调用 - this.emit(obj.rpc, obj.param) - else - console.log('onmessage unknown notification') - case 'REQUEST': // 接收到异步的远程调用 -// console.log(`被动方 收到 REQUEST: rpc=${obj.rpc}, param=${JSON.stringify(obj.param)}`) - let result - if (this.hasOwnProperty(obj.rpc)) { - result = await this[obj.rpc](obj.param) - }else { - result = {_state:'error', error:'unknown rpc'} - } - if (obj.randomEvent){ - this.send(JSON.stringify({type:'REPLY', randomEvent: obj.randomEvent, result})) // 返回结果给远程 - } - case 'REPLY': // 接收到远程返回的结果 -// console.log('主动方 收到 REPLY: result=', obj.result) - this.emit(obj.randomEvent, obj.result) - default: - } - }catch(exception){ - console.error('Invalid socket message received!') - return - } - } - } - -} +}) // (async ()=>{ -// new TicSocket('http://localhost:9007') +// new RpcSocket('http://localhost:9007') // await sleep(20000) -// new TicSocket.Server({port:9007}) +// new RpcSocket.Server({port:9007}) // await sleep(20000) // })() /* -//TicSocket=require('./Ling/TicSocket.js'); -server = new TicSocket.Server({port:6000}); +//RpcSocket=require('./Ling/RpcSocket.js'); +server = new RpcSocket.Server({port:6000}); var serverSocket; server.on('connection', (socket)=>{console.log('socket coming'); -serverSocket = TicSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason)) +serverSocket = RpcSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason)) .on('error', (errer)=>console.log('socket has error: ', error)) }) -TicSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{ +RpcSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{ clientSocket.request('testfunc', {height:99}, (result)=>console.log('using request: ', result), 3000) return clientSocket }).then((clientSocket)=>{ @@ -140,4 +149,4 @@ TicSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{ }).catch((e)=>{ console.log('connect error happens:', e) }) -*/ \ No newline at end of file +*/ diff --git a/package.json b/package.json index 0da3826..88a1947 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "websocket", + "name": "so.rpcsocket", "version": "0.1.0", "private": true, "dependencies": {