MQ消息推送
MQ消息推送是天翼物联网平台(AIoT)的消息队列服务,提供基于主题和消息缓存的可靠消息推送服务。北向应用可基于该服务按需获取设备上行消息。相比较HTTP消息推送,MQ消息推送服务提供消息缓存功能,可避免由于应用或网络故障导致的消息丢失。用户定制好消息主题后,配置相关数据源,MQ消息推送服务将会把相关设备上行消息转发至用户定制主题,北向应用基于平台提供的SDK,开发接收程序,获取设备消息。
后续MQ消息推送请点击菜单消息流转--目的地管理--MQ中配置,原在“MQ消息推送”中设置的数据源将暂时保留,仍能推送,但无法修改。若关闭数据源设置,将无法再次启用。请及时使用消息流转--目的地管理—MQ,点击查看操作方法
北向接收程序开发(Java)
依赖环境:JDK8、maven3.6及以上
1.下载SDK和Demo程序(右键->另存为),解压。 包中主要包括开发SDK(mq-msgpush-sdk-X.X.X.jar&mq-msgpush-sdk-pom.xml)、demo程序。
2.安装SDK到本地maven repository。
mvn install:install-file -Dfile=mq-msgpush-sdk-X.X.X.jar -DpomFile=mq-msgpush-sdk.pom
注意将上述指令中sdk版本号替换为相应版本;Windows系统下,使用CMD控制台,不要使用 Powershell。
3.参照demo程序中Demo.java进行开发。
MQ消息服务地址为:*.mq-msgpush.ctwing.cn: 16651。
public class Demo {
public static void main(String[] args) {
String server = "*.mq-msgpush.ctwing.cn: 16651"; //消息服务地址,*为租户id
String tenantId = "xxx";//租户ID
String token = "xxx";//身份认证token串
String certFilePath = ""; //直接填空字符串,CA证书,JDK已经内置相关根证书,无需指定
//创建消息接收Listener
IMsgListener msgListener = new IMsgListener() {
@Override
public void onMessage(String msg) {
//接收消息
System.out.println(msg);
//消息处理...
//为了提高效率,建议对消息进行异步处理(使用其它线程、发送到Kafka等)
}
};
//创建消息接收类
IMsgConsumer consumer = new MqMsgConsumer();
try {
//初始化
/**
* @param server 消息服务server地址
* @param tenantId 租户Id
* @param token 用户认证token
* @param certFilePath 证书文件路径
* @param topicNames 主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
* @param msgListener 消息接收者
* @return 是否初始化成功
*/
consumer.init(server, tenantId, token, certFilePath, null, msgListener);
//开始接收消息
consumer.start();
//程序退出时,停止接收、销毁
//consumer.stop();
//consumer.destroy();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("exit");
}
}
北向接收程序开发(C#)
1.下载MQ消息推送C#.rar(右键->另存为),解压。包中主要包括动态链接库(MQDLL)、demo程序。
2.创建项目,引用下载的MQDLL动态数据库,并在NuGet中添加Pulsar.Client程序包,注:本示例引用的Pulsar.Client版本为2.6.2。
3.参照demo程序进行C#开发。注意要using MQDLL;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQDLL;
namespace MQPushDemoC
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
String server = "*.mq-msgpush.ctwing.cn: 16651"; //消息服务地址,*为租户id
String tenantId = "";//租户ID
String token = "";//MQ推送的用户认证token
String certFilePath = "";//证书文件路径,已内置,无需调整
List<String> topicNames = new List<string>();
topicNames.Add(""); //添加需要消费的主题
IMsgConsumer consumer = new MqMsgConsumer();
IMsgListener msgListener = new MsgListener();
try
{
consumer.init(server, tenantId, token, certFilePath, topicNames, msgListener);
consumer.start();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
// 程序退出时, 停止接收、销毁
// Stop();
// Destroy();
}
class MsgListener : IMsgListener
{
public void onMessage(string msg)
{
Console.WriteLine(msg);
}
}
}
}
北向接收程序开发(go)
1.下载SDK和Demo程序(右键->另存为),解压。包中主要包括开发SDK(com.ctiot.aep@0.0.1)、demo程序(mqClientTest)。
2.参照demo mqClientTest进行配置
配置go.mod,在其中加入依赖,按需修改sdk路径
replace com.ctiot.aep/mqClient => ../com.ctiot.aep@0.0.1/mqClient
require com.ctiot.aep/mqClient v0.0.1
3.参照demo程序中MqClientTest.go进行开发。
package main
import (
"fmt"
"log"
"time"
"com.ctiot.aep/mqClient" //导入MQ消息接收辅助包
)
//消息接收回调
type MyMsgListener struct {
}
//收到消息后,会回调该方法,建议耗时操作异步处理
func (listener MyMsgListener) OnMessage(msg string) {
fmt.Println("receive:" + msg)
//对收到消息进行异步处理
}
func main() {
myListener := MyMsgListener{} //回调接口
var tenantId string = "xxx" //租户id
var token string= "xxx" //mq推送token
var serverAddr string = "*.mq-msgpush.ctwing.cn: 16651" //mq服务地址,*为租户id
var topics = []string{"xxx","xxx"} //mq消息topic,可多个
//启动MQ消息异步接收
go mqClient.Start(serverAddr,tenantId,token,topics,myListener)
for i := 0; i < 10000; i++ {
time.Sleep(10 * time.Second)
log.Println("sleep")
}
//停止MQ消息接收
mqClient.Stop()
log.Println("exit program")
}
北向接收程序开发(python)
开发环境:python版本:3.8.11,pulsar-client版本:2.7.0,CentOS 8 64 位 Linux。
1. 下载mq-msgpush-sdk-demo-python.rar(右键->另存为),解压。包中主要包括whl文件、demo程序。
2. 进入文件目录,打开终端输入以下命令:
pip install pulsar
pip install pulsar-client==2.7.0
之后安装whl文件,输入命令:
pip install mq_msgpush_sdk_python-0.1-py2.py3-none-any.whl
注意上述命令中.whl文件的名称需与所要安装的包名一致
3.参照demo程序中demo1.py进行开发。
MQ消息服务地址为:msgpush.ctwing.cn:16651。
import traceback
from sdk import IMsgListener1
from sdk import MqMsgConsumer1
import time
class Demo:
class IMsgListenerObject(IMsgListener1.IMsgListener):
def on_message(self, msg):
print(msg)
def __init__(self):
self.topic_names: list = ['xxx']
# 消息服务地址,*为租户id
self.server = '*.mq-msgpush.ctwing.cn: 16651'
self.tenant_id = 'xxx'
# 身份认证token串
self.token = 'xxxxx'
# CA证书,JDK已经内置相关根证书,无需指定
self.cert_file_path = ''
# 创建消息接收Listener
self.msg_listener = Demo.IMsgListenerObject()
def main(self):
try:
# 初始
'''
@param server 消息服务server地址
@param tenant_id 租户Id
@param token 用户认证token
@param certFilePath 证书文件路径
@param topic_names 主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
@param msg_listener 消息接收者
@return 返回是否初始化成功
'''
MqMsgConsumer1.MqMsgConsumer.return_init(server=self.server, tenant_id=self.tenant_id, token=self.token,
cert_file_path=self.cert_file_path,
topic_names=self.topic_names, msg_listener=self.msg_listener)
MqMsgConsumer1.MqMsgConsumer.start(topic_names=self.topic_names)
# time.sleep(10)
# MqMsgConsumer1.MqMsgConsumer.stop()
# MqMsgConsumer1.MqMsgConsumer.destroy()
except Exception:
traceback.print_exc()
print('exit')
if __name__ == '__main__':
Demo().main()
北向接收程序开发(Nodejs)
开发环境:python版本:3.8.11,pulsar-client版本:2.7.0,CentOS 8 64 位 Linux。
1、 下载SDK和demo程序(右键->另存为),解压。包中主要包括demo程序,依赖模块(IMsgListenter.js,MqMsgConsumer.js)。
2、 创建项目,引用依赖模块,需要安装pulsar-client库 npm install pulsar-client。
3、 要求Node.js版本高于10.x。
3.参照demo程序进行开发。
var http=require('http');
var sdk = require('./MqMsgConsumer');
var IMsgListener=require('./IMsgListener');
const { publicDecrypt } = require('crypto');
http.createServer(function(request, response){ //请求对象;响应对象
response.writeHead(200, {'Content-Type':'text/html; charset=utf-8'});
//http文件头,输出类型:html文本类型/ utf-8编码
if(request.url!=='/favicon.ico'){ //清除避免2次访问
console.log('开始访问'); //后台输出: 开始访问
var server=' *.mq-msgpush.ctwing.cn: 16651';// 消息服务地址,*为租户id
var tenantId = "";//租户ID
var token1 = "";// MQ推送的用户认证token
var topicNames=[];
topicNames.push("99"); //添加需要消费的主题
class MsgListener extends IMsgListener{
onMessage(msg){
console.log(msg);
};
}
var msgListener=new MsgListener();
//初始化
sdk.init({
token: token1,
tenantId:tenantId,
serviceUrl:server,
topicNames:topicNames,
msgListener:msgListener,
});
//开始消费
sdk.start();
//摧毁连接
// sdk.destroy();
response.write('开始消费');
response.end(''); //必须带内容;至少引号('')空字符串
}
}).listen(8000);
console.log('Server running at http://127.0.0.1:8000/');
注意事项
1.接收程序启动,对相关定制主题进行一次消费后,该主题消息缓存才能生效。用户定制好主题后,应尽快启动接收程序,防止数据丢失。 2.目前每个租户最多能创建10个主题。