Skip to the content.

连接断开自动重试

rascal 内部通过 generic-pool 维护连接池,并使用 poolQueue (使用了 async queue )维护需要的发送回调 (err, channel),其内部调用 channelpublish 方法与 rabbitMQ 通信发送数据。 使用自动重连的机制就是当由于网络,rabbitMQ 宕机等原因导致连接断开会被 handleConnectionError 捕获,代码如下

// 绑定错误回调
function attachErrorHandlers(connection, config) {
    connection.removeAllListeners('error');
    var errorHandler = _.once(handleConnectionError.bind(null, connection, config));
    connection.once('error', errorHandler);
    connection.once('close', errorHandler);
  }
// 捕获连接错误
 function handleConnectionError(borked, config, err) {
    debug('Handling connection error: %s initially from connection: %s, %s', err.message, borked._rascal_id, connectionConfig.loggableUrl);
    self.emit('disconnect');
    pauseChannelAllocation();
    connection = undefined;
    self.emit('error', err);
    console.log('retry is ', connectionConfig.retry)
    connectionConfig.retry && self.init(function(err) {
      if (!err) return;
      var delay = timer.next();
      debug('Will attempt reconnection in in %dms', delay);
      return setTimeout(handleConnectionError.bind(null, borked, config, err), delay).unref();
    });
  }

此函数会当默认配置重试的时候,在一定延时后递归调用自身,实现断开重连。

消息发送预存队列

在尝试连接的这段时间内如何保证发送队列不会丢失数据? rascal 使用了 queue 来保存所有的发送调用并依次调用,当连接断开的时候队列的调用会被暂停,因为在连接断开的错误捕获函数 handleConnectionError 内部调用了 pauseChannelAllocation 函数,如下

  function pauseChannelAllocation() {
    channelCreator.pause();
    regularChannelPool && regularChannelPool.pause();
    confirmChannelPool && confirmChannelPool.pause();
  }

confirmChannelPool 是什么呢?可以理解成下面这样,

confirmChannelPool = {
      stats: stats,
      borrow: borrow,
      return: release,
      drain: drain,
      pause: poolQueue.pause.bind(poolQueue),
      resume: poolQueue.resume.bind(poolQueue),
    };

从上面源码可以看出来调用了 poolQueuepause 方法,该方法会使队列暂停工作(此时队列可能由于会有新加入的回调造成堆积),而连接恢复的时间会调用对应的 resume 方法来使队列继续工作,那么也就可以保证当集群恢复的时候不会丢失发送数据,因为发送队列在断开的时候进行已经暂停了。

队列核心代码如下:

// 定义队列
poolQueue = async.queue(function(__, cb) {
      pool.acquire().then(function(channel) {
        setImmediate(function() {
          cb(null, channel);
        });
      }).catch(cb);
    }, 1);
// 推送队列回调函数
poolQueue.push(null, function cb (err, channel) {
            // 这个next 是下面的borrowChannelFn 的参数(回调函数)
        if (err) return next(err);
        debug('Borrowed %s channel: %s. %o', mode, channel._rascal_id, stats());
        next(null, channel); 
});
// 实际调用的回调
borrowChannelFn(function(err, channel) {
      if (err) return emitter.emit('error', err, messageId);
      var errorHandler = _.once(handleChannelError.bind(null, channel, messageId, emitter, config));
      var returnHandler = emitter.emit.bind(emitter, 'return');
      addListeners(channel, errorHandler, returnHandler);
      try {
        publishFn(channel, buffer, publishConfig, function(err, ok) {
          ok ? returnChannel(channel, errorHandler, returnHandler)
             : deferReturnChannel(channel, errorHandler, returnHandler);
          if (err) return emitter.emit('error', err, messageId);
          emitter.emit('success', messageId);
        });
      } catch(err) {
        returnChannel(channel, errorHandler, returnHandler);
      };
});

BUG

测试发现当 pool.acquire() 拿到已经关闭的连接时(因为获取的同时连接被关闭),会正常走 next ,这时候会导致消息丢失,因为这个队列回调已经被调用了,而后续的 error 没有触发 error 事件,相关已经提了 MR

https://github.com/guidesmiths/rascal/pull/81

生产者顺序发送

使用 poolQueue 维护待发布消息的发布方法回调,依次进行调用,进而实现顺序发送。

生产者确认模式

publish 方法内部声明了 EventEmitter 实例,通过事件回调的形式处理 rabbitMQ 的生产者确定模式。

  function _publish(buffer, publishConfig, next) {
    var emitter = new EventEmitter(); // 声明 emitter
    var messageId = publishConfig.options.messageId;
    borrowChannelFn(function(err, channel) {
      if (err) return emitter.emit('error', err, messageId);
      var errorHandler = _.once(handleChannelError.bind(null, channel, messageId, emitter, config));
      var returnHandler = emitter.emit.bind(emitter, 'return');
      addListeners(channel, errorHandler, returnHandler);
      try {
        publishFn(channel, buffer, publishConfig, function(err, ok) {
          ok ? returnChannel(channel, errorHandler, returnHandler)
             : deferReturnChannel(channel, errorHandler, returnHandler);
          if (err) return emitter.emit('error', err, messageId);
          emitter.emit('success', messageId);
        });
      } catch(err) {
        returnChannel(channel, errorHandler, returnHandler);
        return emitter.emit('error', err, messageId);
      };
    });
    next(null, emitter); // 这里返回了这个 emitter
  }

使用的时候需要监听这几个事件,然后对应不同的事件需要自己决定选择重发或者别的方案。

broker.publish("p1", "some message", (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication.on("success", (messageId) => {
     console.log("Message id was: ", messageId)
  }).on("error", (err, messageId) => {
     console.error("Error was: ", err.message)
  }).on("return", (message) => {
     console.warn("Message was returned: ", message.properties.messageId)
  })
})

生产者自动重试

根据上面的确认机制, racal 本身并没有默认的确认模式下的消息失败处理机制,只是给了相关的事件回调,需要开发者自身处理。

消费者模型

调用 subscribe 返回的 SubscriberSession 是 EventEmitter 对象,所以可以通过监听 message 事件来收到通知,可以分析 subscribeLater 函数来找到触发事件的地方。

  this.subscribe = function(overrides, next) {
    var session = new SubscriberSession(sequentialChannelOperations, config);
    subscribeLater(session, _.defaultsDeep(overrides, config));
    return next(null, session);
  };

subscribeNow 调用 channel.consume 和 rabbitMQ 建立 socket 连接,并将 _onMessage 绑定到回调函数。

  function subscribeNow(session, config, next) {
    sequentialChannelOperations.push(function(done) {
      if (session.isCancelled()) {
        debug('Subscription to queue: %s has been cancelled', config.queue);
        return done();
      }
      debug('Subscribing to queue: %s', config.queue);
      vhost.getChannel(function(err, channel) {
        if (err) return done(err);
        if (config.prefetch) channel.prefetch(config.prefetch);
        var removeErrorHandlers = attachErrorHandlers(channel, session, config);
        var onMessage = _onMessage.bind(null, session, config, removeErrorHandlers);
        channel.consume(config.source, onMessage, config.options, function(err, response) {
          if (err) {
            debug('Error subscribing to %s using channel: %s. %s', config.source, channel._rascal_id, err.message);
            removeErrorHandlers();
            return session.isCancelled() ? done() : done(err);
          }
          session._open(channel, response.consumerTag, function(err) {
            if (err) return done(err);
            timer.reset();
            done();
          });
        });
      });
    }, next);
  }

onMessage 方法内部调用 emit('message') 触发事件。

  function _onMessage(session, config, removeErrorHandlers, message) {
    if (!message) return handleConsumerCancel(session, config, removeErrorHandlers);
    debug('Received message: %s from queue: %s', message.properties.messageId, config.queue);
    decorateWithRoutingHeaders(message);
    if (immediateNack(message)) return ackOrNack(session, message, true);
    decorateWithRedeliveries(message, function(err) {
      if (err) return handleRedeliveriesError(err, session, message);
      if (redeliveriesExceeded(message)) return handleRedeliveriesExceeded(session, message);
      getContent(message, config, function(err, content) {
        err ? handleContentError(session, message, err)
            : session.emit('message', message, content, ackOrNack.bind(null, session, message));
      });
    });
  }

消费者重连机制

channel 异常断开当时候,会被 handleChannelError 捕获并尝试重新调用 subscribeNow 来建立连接,如果失败会根据默认的重试配置在一定间隔后递归调用 handleChannelError 来无限尝试重连。

  function handleChannelError(session, config, removeErrorHandlers, attempts, err) {
    debug('Handling channel error: %s from %s using channel: %s', err.message, config.name, session._getRascalChannelId());
    if (removeErrorHandlers) removeErrorHandlers();
    session.emit('error', err);
    config.retry && subscribeNow(session, config, function(err) {
      if (!err) return;
      var delay = timer.next();
      debug('Will attempt resubscription(%d) to %s in %dms', attempts + 1, config.name, delay);
      return setTimeout(handleChannelError.bind(null, session, config, null, attempts + 1, err), delay).unref();
    });
  }

消息 ACK

肯定确认,用于消息正常处理或者因为幂等和其他原因忽略此消息的情况。

aclOrNack() 

否定确认,消息会被删除。如果配置了死信队列,会被保存到死信队列。

ackorNack(err) 

否定确认,要求在 defer 延迟后重发消息,消息会被重新放到队列前面。

ackorNack(err, { strategy: "nack", requeue: true, defer: 5000 })

否定确认,要求在 defer 延迟后重发消息,消息会被放到队列后面,原理是复制此条消息然后重新发送,使用确认模式,发送完成后 ack 源消息。如果配置了 attempts ,则达到 attempts 次数后,消息会 nack , 等同 ackorNack(err)

ackOrNack(err, { strategy: 'republish', defer: 1000, attempts: 10 })

否定确认,attempts 次数内尝试转发到指定的 publication,如果将该 publication 路由到延迟队列,延迟队列再配置死信 exchange,在消息超时后通过死信 exchange 路由重新到队列,功能和 republish 差不多,但是超时的状态和消息保存在额外队列中,不会因为应用重启等原因导致重启计时器。

ackOrNack(err, { strategy: 'forward', publication: 'some_exchange' , attempts: 10 })