合作伙伴工作台
注册

MQ消息推送

MQ消息推送是天翼物联网平台(AIoT)的消息队列服务,提供基于主题和消息缓存的可靠消息推送服务。北向应用可基于该服务按需获取设备上行消息。相比较HTTP消息推送,MQ消息推送服务提供消息缓存功能,可避免由于应用或网络故障导致的消息丢失。用户定制好消息主题后,配置相关数据源,MQ消息推送服务将会把相关设备上行消息转发至用户定制主题,北向应用基于平台提供的SDK,开发接收程序,获取设备消息。

后续MQ消息推送请点击菜单消息流转--目的地管理--MQ中配置,原在“MQ消息推送”中设置的数据源将暂时保留,仍能推送,但无法修改。若关闭数据源设置,将无法再次启用。请及时使用消息流转--目的地管理—MQ,点击查看操作方法

北向接收程序开发(Java)

依赖环境:JDK8、maven3.6及以上

1.下载SDK和Demo程序(右键->另存为),解压。 包中主要包括开发SDK(mq-msgpush-sdk-X.X.X.jar&mq-msgpush-sdk-pom.xml)、demo程序。

2.安装SDK到本地maven repository。

mvn install:install-file -Dfile=mq-msgpush-sdk-X.X.X.jar -DpomFile=mq-msgpush-sdk.pom

注意将上述指令中sdk版本号替换为相应版本;Windows系统下,使用CMD控制台,不要使用 Powershell。

3.参照demo程序中Demo.java进行开发。

MQ消息服务地址为:*.mq-msgpush.ctwing.cn: 16651

public class Demo {
    public static void main(String[] args) {
        String server = "*.mq-msgpush.ctwing.cn: 16651"; //消息服务地址,*为租户id
        String tenantId = "xxx";//租户ID
        String token = "xxx";//身份认证token串
        String certFilePath = ""; //直接填空字符串,CA证书,JDK已经内置相关根证书,无需指定

        //创建消息接收Listener
        IMsgListener msgListener = new IMsgListener() {

            @Override
            public void onMessage(String msg) {
                //接收消息
                System.out.println(msg);
                //消息处理...
                //为了提高效率,建议对消息进行异步处理(使用其它线程、发送到Kafka等)
            }
        };


        //创建消息接收类
        IMsgConsumer consumer = new MqMsgConsumer();
        try {
            //初始化
            /**
             * @param server  消息服务server地址
             * @param tenantId 租户Id
             * @param token    用户认证token
             * @param certFilePath 证书文件路径
             * @param topicNames   主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
             * @param msgListener 消息接收者
             * @return 是否初始化成功
             */
            consumer.init(server, tenantId, token, certFilePath, null, msgListener);

            //开始接收消息
            consumer.start();

            //程序退出时,停止接收、销毁
            //consumer.stop();
            //consumer.destroy();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("exit");
    }
}

北向接收程序开发(C#)

1.下载MQ消息推送C#.rar(右键->另存为),解压。包中主要包括动态链接库(MQDLL)、demo程序。

2.创建项目,引用下载的MQDLL动态数据库,并在NuGet中添加Pulsar.Client程序包,注:本示例引用的Pulsar.Client版本为2.6.2。

                                              


3.参照demo程序进行C#开发。注意要using MQDLL;

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQDLL;
 
namespace MQPushDemoC
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
 
        private void button1_Click(object sender, EventArgs e)
        {
            String server = "*.mq-msgpush.ctwing.cn: 16651"; //消息服务地址,*为租户id
            String tenantId = "";//租户ID
            String token = "";//MQ推送的用户认证token                                                                                                          
            String certFilePath = "";//证书文件路径,已内置,无需调整
            List<String> topicNames = new List<string>();
            topicNames.Add(""); //添加需要消费的主题
            IMsgConsumer consumer = new MqMsgConsumer();
            IMsgListener msgListener = new MsgListener();
 
            try
            {
                consumer.init(server, tenantId, token, certFilePath, topicNames, msgListener);
                consumer.start();
 
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            // 程序退出时,  停止接收、销毁
            // Stop();
            // Destroy();
        }
 
        class MsgListener : IMsgListener
        {
            public void onMessage(string msg)
            {
                Console.WriteLine(msg);
            }
        }
    }
}

北向接收程序开发(go)

1.下载SDK和Demo程序(右键->另存为),解压。包中主要包括开发SDK(com.ctiot.aep@0.0.1)、demo程序(mqClientTest)。

2.参照demo mqClientTest进行配置

配置go.mod,在其中加入依赖,按需修改sdk路径

replace com.ctiot.aep/mqClient => ../com.ctiot.aep@0.0.1/mqClient

require com.ctiot.aep/mqClient v0.0.1

3.参照demo程序中MqClientTest.go进行开发。

package main
 
import (
"fmt"
"log"
"time"
"com.ctiot.aep/mqClient"  //导入MQ消息接收辅助包
)
 
//消息接收回调
type MyMsgListener struct {
}
 
//收到消息后,会回调该方法,建议耗时操作异步处理
func (listener MyMsgListener) OnMessage(msg string) {
fmt.Println("receive:" + msg)
//对收到消息进行异步处理
}
 
func main() {
myListener := MyMsgListener{}  //回调接口
var tenantId string = "xxx" //租户id
var token string= "xxx"  //mq推送token
var serverAddr string = "*.mq-msgpush.ctwing.cn: 16651" //mq服务地址,*为租户id
var topics = []string{"xxx","xxx"} //mq消息topic,可多个
 
//启动MQ消息异步接收
go mqClient.Start(serverAddr,tenantId,token,topics,myListener)
 
for i := 0; i < 10000; i++ {
time.Sleep(10 * time.Second)
log.Println("sleep")
}

//停止MQ消息接收
mqClient.Stop()
log.Println("exit program")
}

北向接收程序开发(python)

开发环境:python版本:3.8.11,pulsar-client版本:2.7.0,CentOS 8 64 位 Linux。

1. 下载mq-msgpush-sdk-demo-python.rar(右键->另存为),解压。包中主要包括whl文件、demo程序。

2. 进入文件目录,打开终端输入以下命令:

pip install pulsar

pip install pulsar-client==2.7.0

之后安装whl文件,输入命令:

pip install mq_msgpush_sdk_python-0.1-py2.py3-none-any.whl

注意上述命令中.whl文件的名称需与所要安装的包名一致

3.参照demo程序中demo1.py进行开发。

MQ消息服务地址为:msgpush.ctwing.cn:16651。

import traceback
 
 
from sdk import IMsgListener1
from sdk import MqMsgConsumer1
import time
 
 
class Demo:
    class IMsgListenerObject(IMsgListener1.IMsgListener):
        def on_message(self, msg):
            print(msg)
 
    def __init__(self):
        self.topic_names: list = ['xxx']
        # 消息服务地址,*为租户id
        self.server = '*.mq-msgpush.ctwing.cn: 16651'
        self.tenant_id = 'xxx'
        # 身份认证token串
        self.token = 'xxxxx'
        # CA证书,JDK已经内置相关根证书,无需指定
        self.cert_file_path = ''
 
        # 创建消息接收Listener
        self.msg_listener = Demo.IMsgListenerObject()
 
    def main(self):
 
        try:
            # 初始
            '''
            @param server 消息服务server地址
            @param tenant_id 租户Id
            @param token 用户认证token
            @param certFilePath 证书文件路径
            @param topic_names 主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
            @param msg_listener 消息接收者
            @return 返回是否初始化成功
            '''
            MqMsgConsumer1.MqMsgConsumer.return_init(server=self.server, tenant_id=self.tenant_id, token=self.token,
                                                     cert_file_path=self.cert_file_path,
                                                     topic_names=self.topic_names, msg_listener=self.msg_listener)
            MqMsgConsumer1.MqMsgConsumer.start(topic_names=self.topic_names)
            # time.sleep(10)
            # MqMsgConsumer1.MqMsgConsumer.stop()
            # MqMsgConsumer1.MqMsgConsumer.destroy()
        except Exception:
            traceback.print_exc()
 
        print('exit')
 
 
if __name__ == '__main__':
    Demo().main()


北向接收程序开发(Nodejs)

开发环境:python版本:3.8.11,pulsar-client版本:2.7.0,CentOS 8 64 位 Linux。

1、 下载SDK和demo程序(右键->另存为),解压。包中主要包括demo程序,依赖模块(IMsgListenter.js,MqMsgConsumer.js)。

2、 创建项目,引用依赖模块,需要安装pulsar-client库 npm install pulsar-client。

3、 要求Node.js版本高于10.x。

3.参照demo程序进行开发。   

var http=require('http');
var sdk = require('./MqMsgConsumer');
var IMsgListener=require('./IMsgListener');
const { publicDecrypt } = require('crypto');
http.createServer(function(request, response){ //请求对象;响应对象
response.writeHead(200, {'Content-Type':'text/html; charset=utf-8'});
//http文件头,输出类型:html文本类型/ utf-8编码
if(request.url!=='/favicon.ico'){ //清除避免2次访问
console.log('开始访问');  //后台输出: 开始访问
var server=' *.mq-msgpush.ctwing.cn: 16651';// 消息服务地址,*为租户id
var tenantId = "";//租户ID
var token1 = "";// MQ推送的用户认证token
var topicNames=[];
topicNames.push("99"); //添加需要消费的主题
class MsgListener extends IMsgListener{
 onMessage(msg){
   console.log(msg);
 };
}
var msgListener=new MsgListener();
//初始化
sdk.init({
 token: token1,
 tenantId:tenantId,
 serviceUrl:server,
 topicNames:topicNames,
 msgListener:msgListener,
});
//开始消费
sdk.start();
//摧毁连接
//  sdk.destroy();

response.write('开始消费');
response.end(''); //必须带内容;至少引号('')空字符串
}
}).listen(8000);
console.log('Server running at http://127.0.0.1:8000/');


注意事项

1.接收程序启动,对相关定制主题进行一次消费后,该主题消息缓存才能生效。用户定制好主题后,应尽快启动接收程序,防止数据丢失。 2.目前每个租户最多能创建10个主题。


这篇文档是否帮助您解决了问题?
如果您愿意进一步帮助我们改进文档 ,请留下您的联系方式。