WeiCN

Http Task Queue 异步列队服务搭建记录
最近在开发一套CRM系统,需要用到列队服务做推送,然后看到了有前辈已经造好了轮子 开源的任务队列服务HTQ 直接部...
扫描右侧二维码阅读全文
16
2018/11

Http Task Queue 异步列队服务搭建记录

最近在开发一套CRM系统,需要用到列队服务做推送,然后看到了有前辈已经造好了轮子 开源的任务队列服务HTQ 直接部署一下试试

首先我的服务器用的Centos7并没有装nodejs
下载源码编译一下 ps:编译时间好tm长

开始编译安装nodejs

wget https://nodejs.org/dist/v10.13.0/node-v10.13.0.tar.gz
tar zxvf node-v10.13.0.tar.gz
cd node-v10.13.0
./configure
make
make install

安装完毕后测试一下

[root@Centos htq]# node -v
v10.13.0
[root@Centos htq]# npm -v
6.4.1

ok

开始安装HTQ

执行命令:

npm install 

安装完毕后,首先修改一下config.json里面的token

然后执行以下命令启动:

node htq.js 

上面这种启动方式是临时运行的,关闭命令行窗口就会停止了。如果想一直在后台运行,则可:

nohup node htq.js > ~/htq.log 2>&1 &

如果想关闭退出,可运行:

killall -9 node 

下面就是在业务里面使用了,直接HTTP投递任务

守护进程

采用PM2来守护进程和开机自启

安装PM2

npm install pm2 -g

从PM2启动HTQ服务

pm2 start /home/htq/htq.js --name="HTQServer"

设置开机自启

pm2 startup

HTQ的使用文档

添加队列

请求URL:

  • http://server:5999/api/addQueue

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring队列名,英文+数字,不要出现中文
typestring类型。可以是"real_time"、"timing"、"variable" ,分别对应实时队列、定时队列、可变队列。如果是可变队列,则会对执行任务的返回结果有要求。请参考添加任务接口的备注说明部分
app_keystring请填写config.json里的app_key以供认证。初次安装HTQ可更改默认的值以提高安全性。更改后需要重启HTQ以生效
app_tokenstring请填写config.json里的app_token以供认证。初次安装HTQ可更改默认的值以提高安全性。更改后需要重启HTQ以生效
stepping_timeint步进时间,单位是秒。当队列类型是可变队列的时候请传此参数。可变队列的执行机制请参考下文的备注部分。了解执行机制能让你更清晰地知道该如何设置此值
max_time_intervalint最大时间间隔,当队列类型是可变队列的时候请传此参数。可变队列的执行机制请参考下文的备注部分 。了解执行机制能让你更清晰地知道该如何设置此值

成功返回示例

{ 
     "error_code" : 0, 
     "message" : "添加成功" 
 }

失败返回示例

{ 
     "error_code" : 1000, 
     "message" : "认证失败" 
 }

可变队列的执行机制

可变队列的某个任务进入队列后,其url会被触发访问。URL执行的页面若返回字符串“reset”【 即直接在url的页面打印“reset”,如命令 print(“reset”)】,则任务的执行时间间隔会重置。若返回字符串"done",则结束并删除当前任务.其他情况,URl都会被延迟下一次执行。延迟的时间计算公式是:执行次数 X 步进时间(stepping_time) 。其中,延迟时间最大值为max_time_interval。

所以,当URl没有被重置或者结束的时候,它会慢慢地增大下次执行时间,变得越来越慢,最后达到最大间隔时间max_time_interval后将一直保持这个执行时间间隔。直至得到重置命令或者结束命令.

添加任务到队列

请求URL:

  • http://server:5999/api/addTask

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring
urlstringhttps://cuojue.org/test?test=123&ff=22
app_keystring请填写config.json里的app_key以供认证。初次安装HTQ可更改默认的值以提高安全性。更改后需要重启HTQ以生效
app_tokenstring请填写config.json里的app_token以供认证。初次安装HTQ可更改默认的值以提高安全性。更改后需要重启HTQ以生效
execute_timestring执行时间(可选),如"2016-08-21 10:21:12" 。当队列类型为定时队列的时候请传此参数。

成功返回示例

{ 
     "error_code" : 0, 
     "message" : "添加成功" 
 }

失败返回示例

{ 
     "error_code" : 1000, 
     "message" : "认证失败" 
 }

备注说明

当队列是可变队列时(type ='variable' ),URL执行的页面若返回字符串"reset",则任务的执行时间间隔会重置。若返回字符串"done",则结束并删除当前任务.

获取所有队列

请求URL:

  • http://server:5999/api/allQueue

请求方式:

  • POST

参数:

参数名必选类型说明
app_keystring请填写config.json里的app_key以供认证。初次安装HTQ可更改默认的值以提高安全性。更改后需要重启HTQ以生效
app_tokenstring请填写config.json里的app_token以供认证

成功返回示例

[
    {
        "queue_name": "test",
        "attribute": {
            "type": "real_time",
            "stepping_time": "0",
            "max_time_interval": "0"
        }
    },
    {
        "queue_name": "test2",
        "attribute": {
            "type": "real_time",
            "stepping_time": "0",
            "max_time_interval": "0"
        }
    }
]

获取某个队列的任务数

请求URL:

  • http://server:5999/api/countQueue

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring
app_keystring
app_tokenstring

成功返回示例

{
    "error_code": 0,
    "count": 10
}

删除队列

请求URL:

  • http://server:5999/api/deleteQueue

请求方式:

  • POST

参数:

参数名必选类型说明
queue_namestring
app_keystring请填写config.json里的app_key以供认证
app_tokenstring请填写config.json里的app_token以供认证

成功返回示例

{ 
     "error_code" : 0, 
     "message" : "删除成功" 
 }

失败返回示例

{ 
     "error_code" : 1004, 
     "message" : "删除失败" 
 }

实现伪代码 PHP codeigniter

修改一下官方提供的SDK放到libraries
Htq.php

<?php
defined('BASEPATH') OR exit('No direct script access allowed');
/**
* 
*/
class Htq
{
    private $htq_addr;
    private $app_key;
    private $app_token;

    public function Connect($app_key , $app_token ,$htq_addr = 'http://127.0.0.1:5999')
    {
        $this->htq_addr = $htq_addr;
        $this->app_key = $app_key;
        $this->app_token = $app_token;
    }

    public function addQueue($queue_name,$type,$stepping_time = 0 , $max_time_interval = 0 ){
        $post_data = array(
            "app_key"=>$this->app_key,
            "app_token"=>$this->app_token,
            "queue_name"=>$queue_name,
            "type"=>$type,
            "stepping_time"=>$stepping_time,
            "max_time_interval"=>$max_time_interval,
            );
        $url = $this->htq_addr . '/api/addQueue';
        return $this->_post($url,$post_data);
    }

    public function deleteQueue($queue_name){
        $post_data = array(
            "app_key"=>$this->app_key,
            "app_token"=>$this->app_token,
            "queue_name"=>$queue_name,
            );
        $url = $this->htq_addr . '/api/deleteQueue';
        return $this->_post($url,$post_data);
    }

    public function allQueue(){
        $data = array(
            "app_key"=>$this->app_key,
            "app_token"=>$this->app_token,
            );
        $url = $this->htq_addr . '/api/allQueue';
        return $this->_post($url,$data);
    }

    public function countQueue($queue_name){
        $data = array(
            "app_key"=>$this->app_key,
            "app_token"=>$this->app_token,
            "queue_name"=>$queue_name,
            );
        $url = $this->htq_addr . '/api/countQueue';
        return $this->_post($url,$data);
    }

    public function addTask($queue_name , $url , $execute_time = 0 ){
        $post_data = array(
            "app_key"=>$this->app_key,
            "app_token"=>$this->app_token,
            "queue_name"=>$queue_name,
            "url"=>$url,
            "execute_time"=>$execute_time,
            );
        $url = $this->htq_addr . '/api/addTask';
        return $this->_post($url,$post_data);
    }


    //post数据
    protected function _post($url,$post_data){
        $query_data = http_build_query($post_data);
        $curl = curl_init();
        curl_setopt($curl, CURLOPT_URL, $url);
        curl_setopt($curl, CURLOPT_POST, 1 );
        curl_setopt($curl, CURLOPT_POSTFIELDS, $query_data);
        curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, FALSE);
        curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, FALSE);
        $response = curl_exec($curl);
        $result = json_decode($response,true);
        $error = curl_error($curl);
        return $error ? $error : $result;
    }

    //GET
    protected function _get($url,$query_data){
        $query_data = http_build_query($query_data);
        echo $url = $url."?".$query_data ;
        $curlObj = curl_init();    //初始化curl,
        curl_setopt($curlObj, CURLOPT_URL, $url);   //设置网址
        curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1);  //将curl_exec的结果返回
        curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);
        curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);   
        curl_setopt($curlObj, CURLOPT_HEADER, 0);         //是否输出返回头信息
        $response = curl_exec($curlObj);   //执行
        curl_close($curlObj);          //关闭会话
        return json_decode($response,true);
    }
}

业务model

//插入数据库通知
$msg['title'] = $title;
$msg['body'] = $body;
$msg['type'] = 1;
$msg['time'] = time();
$this->db->insert('push_msg', $msg);
$msgid = $this->db->insert_id();
//加入列队
$this->load->library('htq');
$this->load->helper('url');
$this->htq->Connect($this->system->system_cache['htq_config']['app_key'],$this->system->system_cache['htq_config']['app_token'],$this->system->system_cache['htq_config']['url']);
$this->htq->addQueue("push","real_time");
//查询所有推送人员信息
$_list = $this->db->select('push_channel,push_key')->where(array('lock'=>0,'role<='=>2))->get('admins');
while ($admin = $_list->unbuffered_row('array'))
{
    if(in_array($admin['push_channel'],array('bark','ftqq'))){
        //存在推送频道则加入推送列队
        $push_channel = $admin['push_channel'];
        $push_key = $admin['push_key'];
        $this->htq->addTask("push",site_url("api/task/push/{$push_channel}/{$push_key}/{$msgid}"));
    }
}

执行器

    public function task($type,$channel,$key,$id)
    {
        $id = intval($id);
        //列队任务执行
        if($type=='push'){
            //根据ID取出要推送的内容
            $msg = $this->db->select('title,body')->where('id',$id)->get('push_msg',1)->row_array();
            if($msg){
                //发送推送通知
                $this->load->model('push_model');
                $ret = $this->push_model->push($channel,$key,'CRM 通知:'.$msg['title'],$msg['body']);
                $this->echo(array('code'=>0,'push_msg'=>$ret));
            }
        }
    }

参考内容:
http://blog.star7th.com/2016/09/2114.html
https://github.com/star7th/htq
https://www.showdoc.cc/htq?page_id=38897
https://www.jianshu.com/p/7d3f3fa056e8
https://blog.csdn.net/m0_37792354/article/details/80906113

Last modification:November 17th, 2018 at 04:07 pm

Leave a Comment