“租约”消队列

通常而言,一个队列 提供了 enqueue dequeue front 操作,从数据结构的角度看,这就够了,但是从队列服务的角度看,并不满足需求。

假设消费者使用 dequeue 取出消息后崩溃了,消息已经不在队列中,并且没有得到处理,无从恢复这条消息,相当于消息丢失了。

又假设消费者使用 front 查看消息,处理完后 dequeue 删除消息,这时就不怕消费者崩溃了,即时崩溃也不会丢失消息。这种方式的缺点是,如果有多于一个的消费者, front 会返回相同的消息,这要求消息的处理是 幂等 的,这带来额外的复杂性。

其实我们需要的是消息 租约 ,从队列取消息只是获得一定时间的使用权,在此期间消息对其它消费者不可见,如果不手动删除消息,租约到期后,消息自动对其它消费者可见。这样可以防止消费者崩溃,又可以同时有多个消费者。 这并不适合队列中的消息需要被严格有序处理的情况,这种队列只能有一个消费者

可以使用MySQL模拟这样的服务

;; 表定义
CREATE TABLE IF NOT EXISTS leaseQueue (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  data          VARCHAR(128),
  holder        SMALLINT DEFAULT 0,
  lease         TIMESTAMP
);

;; 入队列
INSERT INTO leaseQueue(data) VALUES(#{data});

;; 选出队尾消息,并且以5分钟为期租该消息,重复下面两个SQL,直到 UPDATE 影响的行数是1
SELECT id, data FROM leaseQueue ORDER BY id ASC limit 1;
UPDATE leaseQueue SET holder = holder + 1 WHERE id = #{id} AND TIMESTAMPDIFF(MINUTE, lease, NOW()) >= 5

;; 处理完后删除消息
DELETE FROM leaseQueue WHERE id = #{id}

Author: zzyongx

Created: 2016-01-19 二 17:10

Emacs 24.4.1 (Org mode 8.2.10)

Validate