Http Task Queue 异步列队服务搭建记录

最近在开发一套 CRM 系统,需要用到列队服务做推送,然后看到了有前辈已经造好了轮子 开源的任务队列服务 HTQ 直接部署一下试试

首先我的服务器用的 Centos7 并没有装 nodejs
下载源码编译一下 ps:编译时间好 tm 长

开始编译安装 nodejs

wget https://nodejs.org/dist/v10.13.0/node-v10.13.0.tar.gz
tar zxvf node-v10.13.0.tar.gz
cd node-v10.13.0
./configure
make
make install

安装完毕后测试一下

[root@Centos htq]# node -v
v10.13.0
[root@Centos htq]# npm -v
6.4.1

ok

开始安装 HTQ

执行命令:

1
npm install

安装完毕后,首先修改一下config.json里面的 token

然后执行以下命令启动:

1
node htq.js

上面这种启动方式是临时运行的,关闭命令行窗口就会停止了。如果想一直在后台运行,则可:

1
nohup node htq.js > ~/htq.log 2>&1 &

如果想关闭退出,可运行:

1
killall -9 node

下面就是在业务里面使用了,直接 HTTP 投递任务

守护进程

采用 PM2 来守护进程和开机自启

安装 PM2

1
npm install pm2 -g

从 PM2 启动 HTQ 服务

1
pm2 start /home/htq/htq.js --name="HTQServer"

设置开机自启

1
pm2 startup

HTQ 的使用文档

添加队列

请求 URL:

  • http://server:5999/api/addQueue

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring队列名,英文+数字,不要出现中文
typestring类型。可以是”real_time”、”timing”、”variable” ,分别对应实时队列、定时队列、可变队列。如果是可变队列,则会对执行任务的返回结果有要求。请参考添加任务接口的备注说明部分
app_keystring请填写 config.json 里的 app_key 以供认证。初次安装 HTQ 可更改默认的值以提高安全性。更改后需要重启 HTQ 以生效
app_tokenstring请填写 config.json 里的 app_token 以供认证。初次安装 HTQ 可更改默认的值以提高安全性。更改后需要重启 HTQ 以生效
stepping_timeint步进时间,单位是秒。当队列类型是可变队列的时候请传此参数。可变队列的执行机制请参考下文的备注部分。了解执行机制能让你更清晰地知道该如何设置此值
max_time_intervalint最大时间间隔,当队列类型是可变队列的时候请传此参数。可变队列的执行机制请参考下文的备注部分 。了解执行机制能让你更清晰地知道该如何设置此值

成功返回示例

1
2
3
4
5
{
"error_code" : 0,
"message" : "添加成功"
}

失败返回示例

1
2
3
4
5
{
"error_code" : 1000,
"message" : "认证失败"
}

可变队列的执行机制

可变队列的某个任务进入队列后,其 url 会被触发访问。URL 执行的页面若返回字符串“reset”【 即直接在 url 的页面打印“reset”,如命令 print(“reset”)】,则任务的执行时间间隔会重置。若返回字符串”done”,则结束并删除当前任务.其他情况,URl 都会被延迟下一次执行。延迟的时间计算公式是:执行次数 X 步进时间(stepping_time) 。其中,延迟时间最大值为 max_time_interval。

所以,当 URl 没有被重置或者结束的时候,它会慢慢地增大下次执行时间,变得越来越慢,最后达到最大间隔时间 max_time_interval 后将一直保持这个执行时间间隔。直至得到重置命令或者结束命令.

添加任务到队列

请求 URL:

  • http://server:5999/api/addTask

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring
urlstringhttps://cuojue.org/test?test=123&ff=22
app_keystring请填写 config.json 里的 app_key 以供认证。初次安装 HTQ 可更改默认的值以提高安全性。更改后需要重启 HTQ 以生效
app_tokenstring请填写 config.json 里的 app_token 以供认证。初次安装 HTQ 可更改默认的值以提高安全性。更改后需要重启 HTQ 以生效
execute_timestring执行时间(可选),如”2016-08-21 10:21:12” 。当队列类型为定时队列的时候请传此参数。

成功返回示例

1
2
3
4
5
{
"error_code" : 0,
"message" : "添加成功"
}

失败返回示例

1
2
3
4
5
{
"error_code" : 1000,
"message" : "认证失败"
}

备注说明

当队列是可变队列时(type =’variable’ ),URL 执行的页面若返回字符串”reset”,则任务的执行时间间隔会重置。若返回字符串”done”,则结束并删除当前任务.

获取所有队列

请求 URL:

  • http://server:5999/api/allQueue

请求方式:

  • POST

参数:

参数名必选类型说明
app_keystring请填写 config.json 里的 app_key 以供认证。初次安装 HTQ 可更改默认的值以提高安全性。更改后需要重启 HTQ 以生效
app_tokenstring请填写 config.json 里的 app_token 以供认证

成功返回示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[
{
"queue_name": "test",
"attribute": {
"type": "real_time",
"stepping_time": "0",
"max_time_interval": "0"
}
},
{
"queue_name": "test2",
"attribute": {
"type": "real_time",
"stepping_time": "0",
"max_time_interval": "0"
}
}
]

获取某个队列的任务数

请求 URL:

  • http://server:5999/api/countQueue

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring
app_keystring
app_tokenstring

成功返回示例

1
2
3
4
{
"error_code": 0,
"count": 10
}

删除队列

请求 URL:

  • http://server:5999/api/deleteQueue

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring
app_keystring请填写 config.json 里的 app_key 以供认证
app_tokenstring请填写 config.json 里的 app_token 以供认证

成功返回示例

1
2
3
4
5
{
"error_code" : 0,
"message" : "删除成功"
}

失败返回示例

1
2
3
4
5
{
"error_code" : 1004,
"message" : "删除失败"
}

实现伪代码 PHP codeigniter

修改一下官方提供的 SDK 放到 libraries
Htq.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
<?php
defined('BASEPATH') OR exit('No direct script access allowed');
/**
*
*/
class Htq
{
private $htq_addr;
private $app_key;
private $app_token;

public function Connect($app_key , $app_token ,$htq_addr = 'http://127.0.0.1:5999')
{
$this->htq_addr = $htq_addr;
$this->app_key = $app_key;
$this->app_token = $app_token;
}

public function addQueue($queue_name,$type,$stepping_time = 0 , $max_time_interval = 0 ){
$post_data = array(
"app_key"=>$this->app_key,
"app_token"=>$this->app_token,
"queue_name"=>$queue_name,
"type"=>$type,
"stepping_time"=>$stepping_time,
"max_time_interval"=>$max_time_interval,
);
$url = $this->htq_addr . '/api/addQueue';
return $this->_post($url,$post_data);
}

public function deleteQueue($queue_name){
$post_data = array(
"app_key"=>$this->app_key,
"app_token"=>$this->app_token,
"queue_name"=>$queue_name,
);
$url = $this->htq_addr . '/api/deleteQueue';
return $this->_post($url,$post_data);
}

public function allQueue(){
$data = array(
"app_key"=>$this->app_key,
"app_token"=>$this->app_token,
);
$url = $this->htq_addr . '/api/allQueue';
return $this->_post($url,$data);
}

public function countQueue($queue_name){
$data = array(
"app_key"=>$this->app_key,
"app_token"=>$this->app_token,
"queue_name"=>$queue_name,
);
$url = $this->htq_addr . '/api/countQueue';
return $this->_post($url,$data);
}

public function addTask($queue_name , $url , $execute_time = 0 ){
$post_data = array(
"app_key"=>$this->app_key,
"app_token"=>$this->app_token,
"queue_name"=>$queue_name,
"url"=>$url,
"execute_time"=>$execute_time,
);
$url = $this->htq_addr . '/api/addTask';
return $this->_post($url,$post_data);
}


//post数据
protected function _post($url,$post_data){
$query_data = http_build_query($post_data);
$curl = curl_init();
curl_setopt($curl, CURLOPT_URL, $url);
curl_setopt($curl, CURLOPT_POST, 1 );
curl_setopt($curl, CURLOPT_POSTFIELDS, $query_data);
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, FALSE);
curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, FALSE);
$response = curl_exec($curl);
$result = json_decode($response,true);
$error = curl_error($curl);
return $error ? $error : $result;
}

//GET
protected function _get($url,$query_data){
$query_data = http_build_query($query_data);
echo $url = $url."?".$query_data ;
$curlObj = curl_init(); //初始化curl,
curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址
curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回
curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);
curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);
curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息
$response = curl_exec($curlObj); //执行
curl_close($curlObj); //关闭会话
return json_decode($response,true);
}
}

业务 model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//插入数据库通知
$msg['title'] = $title;
$msg['body'] = $body;
$msg['type'] = 1;
$msg['time'] = time();
$this->db->insert('push_msg', $msg);
$msgid = $this->db->insert_id();
//加入列队
$this->load->library('htq');
$this->load->helper('url');
$this->htq->Connect($this->system->system_cache['htq_config']['app_key'],$this->system->system_cache['htq_config']['app_token'],$this->system->system_cache['htq_config']['url']);
$this->htq->addQueue("push","real_time");
//查询所有推送人员信息
$_list = $this->db->select('push_channel,push_key')->where(array('lock'=>0,'role<='=>2))->get('admins');
while ($admin = $_list->unbuffered_row('array'))
{
if(in_array($admin['push_channel'],array('bark','ftqq'))){
//存在推送频道则加入推送列队
$push_channel = $admin['push_channel'];
$push_key = $admin['push_key'];
$this->htq->addTask("push",site_url("api/task/push/{$push_channel}/{$push_key}/{$msgid}"));
}
}

执行器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public function task($type,$channel,$key,$id)
{
$id = intval($id);
//列队任务执行
if($type=='push'){
//根据ID取出要推送的内容
$msg = $this->db->select('title,body')->where('id',$id)->get('push_msg',1)->row_array();
if($msg){
//发送推送通知
$this->load->model('push_model');
$ret = $this->push_model->push($channel,$key,'CRM 通知:'.$msg['title'],$msg['body']);
$this->echo(array('code'=>0,'push_msg'=>$ret));
}
}
}

参考内容:
http://blog.star7th.com/2016/09/2114.html
https://github.com/star7th/htq
https://www.showdoc.cc/htq?page_id=38897
https://www.jianshu.com/p/7d3f3fa056e8
https://blog.csdn.net/m0_37792354/article/details/80906113


Http Task Queue 异步列队服务搭建记录
https://cuojue.org/read/87.html
作者
WeiCN
发布于
2018年11月16日
许可协议