`

node.js使用axon模块

 
阅读更多

axon是一个很好的底层通信框架,在socket的基础上封装了很多功能。

它一个有四种模式:

push / pull
pub / sub
req / rep
pub-emitter / sub-emitter

 

1.push/pull模式:模式特点,单向单发,服务器发消息,客户端接受消息。而且是每条消息只被一个客户端接受到。

 

2.PUB/SUB模式:模式特点,单向群发,服务器发消息,客户端接受消息,而且是每条消息都会被每个客户端收到。

在axon模块里有两组API实现pub和sub消息推送。

pub & sub:

PubSocket.prototype.send = function(msg){
  var socks = this.socks;
  var len = socks.length;
  var sock;

  var buf = this.pack(arguments);

  for (var i = 0; i < len; i++) {
    sock = socks[i];
    if (sock.writable) sock.write(buf);
  }

  return this;
};

 这是PubSocket广播消息的方法,从方法里可以看出,它只是做了一个很简单的广播,没有做任何处理。

SubSocket.prototype.onmessage = function(sock){
  var subs = this.hasSubscriptions();
  var self = this;

  return function(buf){
    var msg = new Message(buf);

    if (subs) {
      var topic = msg.args[0];
      if (!self.matches(topic)) return debug('not subscribed to "%s"', topic);
    }

    self.emit.apply(self, ['message'].concat(msg.args));
  };
};

SubSocket.prototype.subscribe = function(re){
  debug('subscribe to "%s"', re);
  this.subscriptions.push(re = toRegExp(re));
  return re;
};

SubSocket.prototype.unsubscribe = function(re){
  debug('unsubscribe from "%s"', re);
  re = toRegExp(re);
  for (var i = 0; i < this.subscriptions.length; ++i) {
    if (this.subscriptions[i].toString() === re.toString()) {
      this.subscriptions.splice(i--, 1);
    }
  }
};

SubSocket.prototype.clearSubscriptions = function(){
  this.subscriptions = [];
};

 在sock.on('message', function(msg)的时候,如果添加了订阅项,它会只接收订阅相关的信息。如果没有添加任何订阅项,它会接收所有的信息。判断依据就是subscriptions里面的条目。完整的例子代码:

// 服务端
var axon = require('axon');
var sock = axon.socket('pub');

sock.bind(3000);
console.log('pub server started');

setInterval(function(){
  sock.send('hello');
}, 500);

// 客户端
var axon = require('axon');
var sock = axon.socket('sub');

sock.connect(3000);
sock.subscribe('user:login');
sock.subscribe('upload:*:progress');

sock.on('message', function(topic, msg){

});

 

PubEmitter / SubEmitter:

function PubEmitterSocket() {
  this.sock = new PubSocket;
  this.emit = this.sock.send.bind(this.sock);
  this.bind = this.sock.bind.bind(this.sock);
  this.connect = this.sock.connect.bind(this.sock);
  this.close = this.sock.close.bind(this.sock);
}

  PubEmitterSocket是建立在pub的基础之上的,在这之后只是重新声明了几个方法名。例如,emit等价于pub的send。另外就是,pub既可以做bind服务器端,也可以做connect客户端。但是,无论是做服务器还是客户端它都是消息的推送方,也就是它是发送消息的。

SubEmitterSocket.prototype.onmessage = function(){
  var listeners = this.listeners;
  var self = this;

  return function(buf){
    var msg = new Message(buf);
    var topic = msg.shift();

    for (var i = 0; i < listeners.length; ++i) {
      var listener = listeners[i];

      var m = listener.re.exec(topic);
      if (!m) continue;

      listener.fn.apply(this, m.slice(1).concat(msg.args));
    }
  }
};

SubEmitterSocket.prototype.on = function(event, fn){
  var re = this.sock.subscribe(event);
  this.listeners.push({
    event: event,
    re: re,
    fn: fn
  });
  return this;
};

SubEmitterSocket.prototype.off = function(event){
  for (var i = 0; i < this.listeners.length; ++i) {
    if (this.listeners[i].event === event) {
      this.sock.unsubscribe(this.listeners[i].re);
      this.listeners.splice(i--, 1);
    }
  }
  return this;
};

 在使用sub的时候,我们只能使用一种规则定制接收信息的方式,这显然是不够的。所以SubEmitterSocket增加了一个listeners数组,用来存储多套规则,这些规则也拥有他们自己对信息处理的方法。

另外就是,在把信息apply到listener的时候,listener.fn.apply(this, m.slice(1).concat(msg.args));貌似把信息转换成了json对象,这样获取到的信息可以直接用对象的方式使用。

完整的例子:

// 服务器
var axon = require('axon');
var sock = axon.socket('pub-emitter');

sock.connect(3000);

setInterval(function(){
  sock.emit('login', { name: 'tobi' });
}, 500);

//客户端
var axon = require('axon');
var sock = axon.socket('sub-emitter');

sock.bind(3000);

sock.on('user:login', function(user){
  console.log('%s signed in', user.name);
});

sock.on('user:*', function(action, user){
  console.log('%s %s', user.name, action);
});

sock.on('*', function(event){
  console.log(arguments);
});

 理论上来说,我们必然使用第二种方式。 

 

 3.REP/REQ模式:模式特点,双向应答,点对点,服务器发送消息,客户端接受消息,然后客户端发送,反正双向不受限制。

RPC进程间调用,可以理解为远程调用方法。

RepSocket.prototype.onmessage = function(sock){
  var self = this;

  return function (buf){
    var msg = new Message(buf);
    var args = msg.args;

    var id = args.pop();
    args.unshift('message');
    args.push(reply);
    self.emit.apply(self, args);

    function reply() {
      var fn = function(){};
      var args = slice(arguments);
      args[0] = args[0] || null;

      var hasCallback = 'function' == typeof args[args.length - 1];
      if (hasCallback) fn = args.pop();

      args.push(id);

      if (sock.writable) {
        sock.write(self.pack(args), function(){ fn(true) });
        return true;
      } else {
        debug('peer went away');
        process.nextTick(function(){ fn(false) });
        return false;
      }
    }
  };
};

 rep作为服务端,其实是消息的接收方和处理方。首先它收到的是一个字符串,所以第一步就是把这个字符串解析出对应的参数列表,然后处理这些参数列表,再最后又通过socket传回去。这里的args可以是多个参数。

ReqSocket.prototype.send = function(msg){
  var socks = this.socks;
  var len = socks.length;
  var sock = socks[this.n++ % len];
  var args = slice(arguments);

  if (sock) {
    var hasCallback = 'function' == typeof args[args.length - 1];
    if (!hasCallback) args.push(function(){});
    var fn = args.pop();
    fn.id = this.id();
    this.callbacks[fn.id] = fn;
    args.push(fn.id);
  }

  if (sock) {
    sock.write(this.pack(args));
  } else {
    debug('no connected peers');
    this.enqueue(args);
  }
};

ReqSocket.prototype.onmessage = function(){
  var self = this;
  return function(buf){
    var msg = new Message(buf);
    var id = msg.pop();
    var fn = self.callbacks[id];
    if (!fn) return debug('missing callback %s', id);
    fn.apply(null, msg.args);
    delete self.callbacks[id];
  };
};

 在req里面有两个方法,一个是发送请求,一个是接受结果。我需要用到的只是发送请求。貌似不用解释很多。

完整用法:

// 客户端
var axon = require('axon');
var sock = axon.socket('req');

sock.bind(3000);

sock.send('resize', img, function(res){

});

// 服务器
var axon = require('axon');
var sock = axon.socket('rep');

sock.connect(3000);

sock.on('message', function(task, img, reply){
  switch (task) {
    case 'resize':
      // resize the image
      reply(img);
      break;
  }
});

 首先是发送的时候,发送过来两个参数'resize', img,返回的时候返回了img这个参数 reply(img);。这些只是看代码猜的,还需要实际用的时候进行测试。

 

总结:node.js真心厉害,在我认为比较难的功能,尤其是通信这块,尽然用这么少的代码完成了大部分需要用到的通信模式。选择一个好的技术真的很重要

 

 

  • 大小: 33.7 KB
  • 大小: 13 KB
  • 大小: 6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics