合作伙伴工作台
注册

MQ消息推送

MQ消息推送是天翼物联网平台(AIoT)的消息队列服务,提供基于主题和消息缓存的可靠消息推送服务。北向应用可基于该服务按需获取设备上行消息。相比较HTTP消息推送,MQ消息推送服务提供消息缓存功能,可避免由于应用或网络故障导致的消息丢失。用户定制好消息主题后,配置相关数据源,MQ消息推送服务将会把相关设备上行消息转发至用户定制主题,北向应用基于平台提供的SDK,开发接收程序,获取设备消息。

服务开通

从左侧导航栏选择“MQ消息推送”菜单,点击右侧主界面“立即开通”按钮。进入MQ消息推送页面。

主题定制

服务开通后,进入MQ消息推送主页。该页面显示租户ID以及MQ消息服务权限token,该token是北向应用获取消息的凭证,后续基于SDK开发接收程序时需要使用。点击“添加topic”按钮,填写topic名称和描述信息,可创建主题。主页下方显示用户已经定制的topic列表。

配置数据源

从topic列表选择topic,点击操作列中“数据源”图标,进入“配置数据源”操作界面。在该界面可按产品和消息类型配置需要转发到该topic的上行消息。 数据源选择“全部信息”表示该租户下的所有类型的消息都推送到该主题。数据源选择“按规则选择”则可以按需求选择所要配置的产品以及对应的消息类型。 该界面右上角为配置数据源的开关,可以通过单击来打开或关闭,本页面无保存确认按钮,勾选后会立即保存配置到服务端。

北向接收程序开发(Java)

依赖环境:JDK8、maven3.6及以上

1.下载SDK和Demo程序(右键->另存为),解压。 包中主要包括开发SDK(mq-msgpush-sdk-X.X.X.jar&mq-msgpush-sdk-pom.xml)、demo程序。

2.安装SDK到本地maven repository。

mvn install:install-file -Dfile=mq-msgpush-sdk-X.X.X.jar -DpomFile=mq-msgpush-sdk.pom

注意将上述指令中sdk版本号替换为相应版本;Windows系统下,使用CMD控制台,不要使用 Powershell。

3.参照demo程序中Demo.java进行开发。

MQ消息服务地址为:msgpush.ctwing.cn:16651

public class Demo {
    public static void main(String[] args) {
        String server = "msgpush.ctwing.cn:16651"; //消息服务地址
        String tenantId = "xxx";//租户ID
        String token = "xxx";//身份认证token串
        String certFilePath = ""; //直接填空字符串,CA证书,JDK已经内置相关根证书,无需指定

        //创建消息接收Listener
        IMsgListener msgListener = new IMsgListener() {

            @Override
            public void onMessage(String msg) {
                //接收消息
                System.out.println(msg);
                //消息处理...
                //为了提高效率,建议对消息进行异步处理(使用其它线程、发送到Kafka等)
            }
        };


        //创建消息接收类
        IMsgConsumer consumer = new MqMsgConsumer();
        try {
            //初始化
            /**
             * @param server  消息服务server地址
             * @param tenantId 租户Id
             * @param token    用户认证token
             * @param certFilePath 证书文件路径
             * @param topicNames   主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
             * @param msgListener 消息接收者
             * @return 是否初始化成功
             */
            consumer.init(server, tenantId, token, certFilePath, null, msgListener);

            //开始接收消息
            consumer.start();

            //程序退出时,停止接收、销毁
            //consumer.stop();
            //consumer.destroy();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("exit");
    }
}

北向接收程序开发(C#)

1.下载MQ消息推送C#.rar(右键->另存为),解压。包中主要包括动态链接库(MQDLL)、demo程序。

2.创建项目,引用下载的MQDLL动态数据库,并在NuGet中添加Pulsar.Client程序包,注:本示例引用的Pulsar.Client版本为2.6.2。

                                              


3.参照demo程序进行C#开发。注意要using MQDLL;

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQDLL;
 
namespace MQPushDemoC
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
 
        private void button1_Click(object sender, EventArgs e)
        {
            String server = "msgpush.ctwing.cn:16651"; //消息服务server地址
            String tenantId = "";//租户ID
            String token = "";//MQ推送的用户认证token                                                                                                          
            String certFilePath = "";//证书文件路径,已内置,无需调整
            List<String> topicNames = new List<string>();
            topicNames.Add(""); //添加需要消费的主题
            IMsgConsumer consumer = new MqMsgConsumer();
            IMsgListener msgListener = new MsgListener();
 
            try
            {
                consumer.init(server, tenantId, token, certFilePath, topicNames, msgListener);
                consumer.start();
 
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            // 程序退出时,  停止接收、销毁
            // Stop();
            // Destroy();
        }
 
        class MsgListener : IMsgListener
        {
            public void onMessage(string msg)
            {
                Console.WriteLine(msg);
            }
        }
    }
}

北向接收程序开发(go)

1.下载SDK和Demo程序(右键->另存为),解压。包中主要包括开发SDK(com.ctiot.aep@0.0.1)、demo程序(mqClientTest)。

2.参照demo mqClientTest进行配置

配置go.mod,在其中加入依赖,按需修改sdk路径

replace com.ctiot.aep/mqClient => ../com.ctiot.aep@0.0.1/mqClient

require com.ctiot.aep/mqClient v0.0.1

3.参照demo程序中MqClientTest.go进行开发。

package main
 
import (
"fmt"
"log"
"time"
"com.ctiot.aep/mqClient"  //导入MQ消息接收辅助包
)
 
//消息接收回调
type MyMsgListener struct {
}
 
//收到消息后,会回调该方法,建议耗时操作异步处理
func (listener MyMsgListener) OnMessage(msg string) {
fmt.Println("receive:" + msg)
//对收到消息进行异步处理
}
 
func main() {
myListener := MyMsgListener{}  //回调接口
var tenantId string = "xxx" //租户id
var token string= "xxx"  //mq推送token
var serverAddr string = "msgpush.ctwing.cn:16651" //mq服务地址
var topics = []string{"xxx","xxx"} //mq消息topic,可多个
 
//启动MQ消息异步接收
go mqClient.Start(serverAddr,tenantId,token,topics,myListener)
 
for i := 0; i < 10000; i++ {
time.Sleep(10 * time.Second)
log.Println("sleep")
}

//停止MQ消息接收
mqClient.Stop()
log.Println("exit program")
}

北向接收程序开发(python)

开发环境:python版本:3.8.11,pulsar-client版本:2.7.0,CentOS 8 64 位 Linux。

1. 下载mq-msgpush-sdk-demo-python.rar(右键->另存为),解压。包中主要包括whl文件、demo程序。

2. 进入文件目录,打开终端输入以下命令:

pip install pulsar

pip install pulsar-client==2.7.0

之后安装whl文件,输入命令:

pip install mq_msgpush_sdk_python-0.1-py2.py3-none-any.whl

注意上述命令中.whl文件的名称需与所要安装的包名一致

3.参照demo程序中demo1.py进行开发。

MQ消息服务地址为:msgpush.ctwing.cn:16651。

import traceback
 
 
from sdk import IMsgListener1
from sdk import MqMsgConsumer1
import time
 
 
class Demo:
    class IMsgListenerObject(IMsgListener1.IMsgListener):
        def on_message(self, msg):
            print(msg)
 
    def __init__(self):
        self.topic_names: list = ['xxx']
        # 消息服务地址
        self.server = 'msgpush.ctwing.cn:16651'
        self.tenant_id = 'xxx'
        # 身份认证token串
        self.token = 'xxxxx'
        # CA证书,JDK已经内置相关根证书,无需指定
        self.cert_file_path = ''
 
        # 创建消息接收Listener
        self.msg_listener = Demo.IMsgListenerObject()
 
    def main(self):
 
        try:
            # 初始
            '''
            @param server 消息服务server地址
            @param tenant_id 租户Id
            @param token 用户认证token
            @param certFilePath 证书文件路径
            @param topic_names 主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
            @param msg_listener 消息接收者
            @return 返回是否初始化成功
            '''
            MqMsgConsumer1.MqMsgConsumer.return_init(server=self.server, tenant_id=self.tenant_id, token=self.token,
                                                     cert_file_path=self.cert_file_path,
                                                     topic_names=self.topic_names, msg_listener=self.msg_listener)
            MqMsgConsumer1.MqMsgConsumer.start(topic_names=self.topic_names)
            # time.sleep(10)
            # MqMsgConsumer1.MqMsgConsumer.stop()
            # MqMsgConsumer1.MqMsgConsumer.destroy()
        except Exception:
            traceback.print_exc()
 
        print('exit')
 
 
if __name__ == '__main__':
    Demo().main()



 


注意事项

1.接收程序启动,对相关定制主题进行一次消费后,该主题消息缓存才能生效。用户定制好主题后,应尽快启动接收程序,防止数据丢失。 2.目前每个租户最多能创建10个主题。


这篇文档是否帮助您解决了问题?
如果您愿意进一步帮助我们改进文档 ,请留下您的联系方式。