gmq 正在参加 2020 年度 OSC 中国开源项目评选,请投票支持!
gmq 在 2020 年度 OSC 中国开源项目评选 中已获得 {{ projectVoteCount }} 票,请投票支持!
投票让它出道
已投票
gmq 获得 2020 年度 OSC 中国开源项目评选「最佳人气项目」 !
gmq 获得 2020 年度 OSC 中国开源项目评选「最佳人气项目」「最积极运营项目」 !
gmq 获得 2020 年度 OSC 中国开源项目评选「最积极运营项目」 !

软件简介

1. 概述

gmq是基于redis提供的特性,使用go语言开发的一个简单易用的队列;关于 redis 使用特性可以参考之前本人写过一篇很简陋的文章 Redis 实现队列gmq的灵感和设计是基于有赞延迟队列设计,文章内容清晰而且很好理解,但是没有提供源码,在文章的最后也提到了一些未来架构方向; gmq不是简单按照有赞延迟队列的设计实现功能,在它的基础上,做了一些修改和优化,主要如下:

  • 功能上
    • 多种任务模式,不单单只是延迟队列;例如:延迟队列,普通队列,优先级队列
  • 架构上:
    • 添加 job 由dispatcher调度分配各个bucket,而不是由timer
    • 每个bucket维护一个timer,而不是所有bucket一个timer
    • timer每次扫描bucket到期job时,会一次性返回多个到期job,而不是每次只返回一个job
    • timer的扫描时钟由bucket中下个job到期时间决定,而不是每秒扫描一次

2. 应用场景

  • 延迟任务
    • 延迟任务,例如用户下订单一直处于未支付状态,半个小时候自动关闭订单
  • 异步任务
    • 异步任务,一般用于耗时操作,例如群发邮件等批量操作
  • 超时任务
    • 规定时间内(TTR)没有执行完毕或程序被意外中断,则消息重新回到队列再次被消费,一般用于数据比较敏感,不容丢失的
  • 优先级任务
    • 当多个任务同时产生时,按照任务设定等级优先被消费,例如a,b两种类型的job,优秀消费a,然后再消费b

3. 安装

3.1 源码运行

配置文件位于gmq/conf.ini,可以根据自己项目需求修改配置

cd $GOPATH/src # 进入gopath/src目录
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢,可能需要翻墙
go run main.go start

3.2 执行文件运行

cd $GOPATH/src/gmq
# 编译成可执行文件
go build 
# 启动
./gmq start
# 停止
./gmq stop

# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1  &
# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)
tail -f gmq.log

4. 客户端

目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq

运行

# php
# 生产者
php producer.php
# 消费者
php consumer.php

# python
# 生产者
python producer.py
# 消费者
python consumer.py

一条消息结构

{
    "id": "xxxx",	# 任务id,这个必须是一个唯一值,将作为redis的缓存键
    "topic": "xxx",  # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
    "body": "xxx",   # 消息内容
    "delay": "111",  # 延迟时间,单位秒
    "TTR": "11111",  # 执行超时时间,单位秒
    "status": 1,     # job执行状态,该字段由gmq生成
    "consumeNum":1,  # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}

延迟任务

 $data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '1800', // 单位秒,半个小时后执行
        'TTR'   => '0'
    ];

超时任务

 $data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
    ];

异步任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

优先级任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

5. gmq 流程图如下:

一个不规范的流程图

5.1 延迟时间 delay

  • 当 job.delay>0时,job 会被分配到 bucket 中,bucket 会有周期性扫描到期 job,如果到期,job 会被 bucket 移到ready queue,等待被消费
  • 当 job.delay=0 时,job 会直接加到ready queue,等待被消费

5.2 执行超时时间 TTR

参考第一个图的流程,当 job 被消费者读取后,如果job.TTR>0,即 job 设置了执行超时时间,那么 job 会在读取后会被添加到 TTRBucket(专门存放设置了超时时间的 job),并且设置job.delay = job.TTR,如果在 TTR 时间内没有得到消费者 ack 确认然后删除 job,job 将在 TTR 时间之后添加到ready queue,然后再次被消费(如果消费者在 TTR 时间之后才请求 ack,会得到失败的响应)

5.3 确认机制

主要和TTR的设置有关系,确认机制可以分为两种:

  • 当 job.TTR=0 时,消费者pop出 job 时,即会自动删除job pool中的 job 元数据
  • 当 job.TTR>0 时,即 job 执行超时时间,这个时间是指用户pop出 job 时开始到用户ack确认删除结束这段时间,如果在这段时间没有ACK,job 会被再次加入到ready queue,然后再次被消费,只有用户调用了ACK,才会去删除job pool中 job 元数据

6. web 监控

gmq提供了一个简单 web 监控平台(后期会提供根据 job.Id 追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000,例如:http://127.0.0.1:8000,界面如下: 后台模板来源于https://github.com/george518/PPGo_Job

7. 遇到问题

以下是开发遇到的问题,以及一些粗糙的解决方案

7.1 安全退出

如果强行中止gmq的运行,可能会导致一些数据丢失,例如下面一个例子:
gmq之timer定时器
如果发生上面的情况,就会出现 job 不在bucket中,也不在ready queue,这就出现了 job 丢失的情况,而且将没有任何机会去删除job pool中已丢失的 job,长久之后job pool可能会堆积很多的已丢失 job 的元数据;所以安全退出需要在接收到退出信号时,应该等待所有goroutine处理完手中的事情,然后再退出

7.1.1 gmq退出流程

gmq安全退出.png
首先gmq通过 context 传递关闭信号给dispatcher,dispatcher接收到信号会关闭dispatcher.closed,每个bucket会收到close信号,然后先退出timer检索,再退出bucket,dispatcher等待所有 bucket 退出后,然后退出

dispatcher退出顺序流程: timer -> bucket -> dispatcher

7.1.2 注意

不要使用kill -9 pid来强制杀死进程,因为系统无法捕获 SIGKILL 信号,导致 gmq 可能执行到一半就被强制中止,应该使用kill -15 pid,kill -1 pidkill -2 pid,各个数字对应信号如下:

7.2 智能定时器

  • 每一个bucket都会维护一个timer,不同于有赞设计,timer不是每秒轮询一次,而是根据bucket下一个 job 到期时间来设置timer的定时时间 ,这样的目的在于如果bucket没有 job 或 job 到期时间要很久才会发生,就可以减少不必要的轮询;
  • timer只有处理完一次业务后才会重置定时器,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了
  • 如果到期的时间很相近,timer就会频繁重置定时器时间,就目前使用来说,还没出现什么性能上的问题

7.3 原子性问题

我们知道 redis 的命令是排队执行,在一个复杂的业务中可能会多次执行 redis 命令,如果在大并发的场景下,这个业务有可能中间插入了其他业务的命令,导致出现各种各样的问题;
redis 保证整个事务原子性和一致性问题一般用multi/execlua脚本gmq在操作涉及复杂业务时使用的是lua脚本,因为lua 脚本除了有multi/exec的功能外,还有Pipepining功能(主要打包命令,减少和redis server通信次数),下面是一个gmq定时器扫描 bucket 集合到期 job 的 lua 脚本:

-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do 
	if k%2~=0 then
		local jobKey = string.format('%s:%s', ARGV[3], jobId)
		local status = redis.call('hget', jobKey, 'status')
		-- 检验job状态
		if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
			-- 先移除集合中到期的job,然后到期的job返回给timer
			local isDel = redis.call('zrem', KEYS[1], jobId)
			if isDel == 1 then
				table.insert(res, jobId)
			end
		end
	end
end

local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
	nextTime = -1
else
	nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
	if nextTime < 0 then
		nextTime = 1
	end
end

table.insert(res,1,nextTime)
return res

7.4 redis 连接池

可能一般 phper 写业务很少会接触到连接池,其实这是由 php 本身所决定他应用不大,当然在 php 的扩展swoole还是很有用处的
gmq的 redis 连接池是使用gomodule/redigo/redis自带连接池,它带来的好处是限制 redis 连接数,通过复用 redis 连接来减少开销,另外可以防止 tcp 被消耗完,这在生产者大量生成数据时会很有用

// gmq/mq/redis.go
Redis = &RedisDB{
    Pool: &redis.Pool{
        MaxIdle:     30,    // 最大空闲链接
        MaxActive:   10000, // 最大链接
        IdleTimeout: 240 * time.Second, // 空闲链接超时
        Wait:        true, // 当连接池耗尽时,是否阻塞等待
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
            if err != nil {
                return nil, err
            }
            return c, nil
		},
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
            	return nil
            }
        	_, err := c.Do("PING")
        	return err
        },
    },
}

8. 注意问题

  • job.id 在job pool是唯一的,它将作为 redis 的缓存键;gmq不自动为 job 生成唯一 id 值是为了用户可以根据自己生成的 job.id 来追踪 job 情况,如果 job.id 是重复的,push 时会报重复 id 的错误
  • bucket 数量不是越多越好,一般来说,添加到ready queue的速度取决与 redis 性能,而不是 bucket 数量

9. 使用中可能出现的问题

9.1 客户端出现大量的 TIME_WAIT 状态,并且新的连接被拒绝

netstat -anp | grep 9503 | wc -l
tcp        0      0 10.8.8.188:41482        10.8.8.185:9503         TIME_WAIT   -                   

这个是正常现象,由 tcp 四次挥手可以知道,当接收到 LAST_ACK 发出的 FIN 后会处于TIME_WAIT状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到 ACK,会等待 2MSL 时间,这个时间是为了再次发送 ACK,例如被动关闭方可能因为接收不到 ACK 而重传 FIN;另外也是为了旧数据过期,不影响到下一个链接;如果要避免大量TIME_WAIT的连接导致 tcp 被耗尽,一般方法如下:

  • 使用长连接
  • 配置文件,网上很多教程,就是让系统尽快的回收TIME_WAIT状态的连接
  • 使用连接池,当连接池耗尽时,阻塞等待,直到回收再利用

10. 相关链接

展开阅读全文

代码

的 Gitee 指数为
超过 的项目

评论 (1)

加载中
打分: 力荐
挺不错的小组件!实用性和稳定性有待大家反馈
2020/10/12 17:06
回复
举报
更多评论
暂无内容
发表于云计算专区
2016/06/22 11:34

windows配置rocketmq开发环境(idea-eclipse)

一、windows下配置jdk及Rocketmq环境变量: vim /root/.bashrc #在该文件添加一下内容     # Set RocketMQ Environment ROCKETMQ_HOME=/home/MyRocketMQ-bin # 此处为上述步骤中的maven编译后文件目录代码 ROCKETMQ_CLASSPATH=$ROCKETMQ_HOME/lib # 此处为rocketmq运行所依赖的jar的classpath ROCKETMQ_PATH=$ROCKETMQ_HOME/bin # 此处为rocketmq运行bin目录,加入到可执行命令 NAMESRV_ADDR=127.0.0.1:9876 # Set Java En...

0
3
发表于云计算专区
2016/06/30 22:56

windows配置rocketmq开发环境(idea-eclipse)

一、windows下配置jdk及Rocketmq环境变量: vim /root/.bashrc #在该文件添加一下内容     # Set RocketMQ Environment ROCKETMQ_HOME=/home/MyRocketMQ-bin # 此处为上述步骤中的maven编译后文件目录代码 ROCKETMQ_CLASSPATH=$ROCKETMQ_HOME/lib # 此处为rocketmq运行所依赖的jar的classpath ROCKETMQ_PATH=$ROCKETMQ_HOME/bin # 此处为rocketmq运行bin目录,加入到可执行命令 NAMESRV_ADDR=127.0.0.1:9876 # Set Java En...

0
0
2020/09/07 16:59

sql用逗号连接多张表对应哪个join?

交叉连接(cross join) https://blog.csdn.net/chanmufeng/article/details/78234654 https://www.cnblogs.com/gmq-sh/p/7581150.html 24 = 4*6

0
0
发表了博客
2015/01/19 16:49

java读取Excel内容

原文地址:http://www.cnblogs.com/gmq/archive/0001/01/01/1521496.html 首先下载poi.jar,将jar包放到项目下,并将需要读取的excel文件(本例子是.xls格式的excel文件)放入根目录即可。 import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.text.DecimalFormat; import java.text.SimpleDateF...

0
0
01/04 00:35

Linux Crontab内环境变量与Shell环境变量的关系及解决问题的办法

Linux Crontab内环境变量与Shell环境变量的关系及解决问题的办法 参考文章: (1)Linux Crontab内环境变量与Shell环境变量的关系及解决问题的办法 (2)https://www.cnblogs.com/gmq-sh/p/6971588.html 备忘一下。

0
0
发表了博客
2015/10/09 12:39

javaweb之动态读取 excel,导入excel

因为项目需求用到excel导入功能,技术有限花了很多时间终于弄出来了。借鉴了别人的方法,分享给有需要的人。 之前在搜索后看了很多人的方法,但是和我实际的需求不符合。 借鉴 http://www.cnblogs.com/gmq/archive/0001/01/01/1521496.html 有什么问题或者改良 可以联系我qq1136533879 为什么不符合呢,都是采用遍历的方法,直接读取表格内容,不取表头,固定列数的。 File file = new File(path); getData(file, 0); //p...

1
0
2018/02/27 14:47

hive gateway功能介绍

官网介绍: Because the Hive service does not have worker roles, another mechanism is needed to enable the automatic propagation of client configurations to the other hosts in your cluster. Gateway roles fulfill this function. Gateways in fact aren't really roles and do not have state, but they act as indicators for where client configurations should be placed. Hive gateways are created by defau...

0
0
发表于服务端专区
2017/10/21 16:25

spring mvc 中文乱码

最近做项目,springmvc的url中文参数乱码: 请求url: http://localhost:8080/supply/supply_list.htm?productName=测试&isHomePage= 在后端: @RequestMapping(value = SupplyURL.SUPPLY_LIST, method = RequestMethod.GET) public String toSupplyList (Model model, String productName) { PageBean<Supply> pagination = supplyFacade.querySupplyByPage(searchSupplyVO); model.addAttribu...

0
0
没有更多内容
加载失败,请刷新页面
点击加载更多
加载中
下一页
暂无内容
1 评论
17 收藏
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部