使用场景
整体架构

主要功能及产品组件
注: 本手册中用到的中科时代的软件包,均可以从官网的子页面获取。官网提供的版本可能比本手册中提到的版本更高,一般情况下这不会影响您按照本手册的例子执行相应的操作
| 产品组件 | 描述说明 | 
|---|---|
| SF4202_JsonDom_1.0.6_adm64.deb | JsonDom RTE组件 | 
| SF4202_JsonDom_1.0.1.1.library | 上位机程序使用的库文件 | 
| 产品组件 | 描述说明 | 
|---|---|
| SF4202_iotmqtt_1.0.8_amd64.deb | MQTT RTE 组件 | 
| SF4202_IoTMqtt_1.0.6.0.library | 上位机程序使用的库文件 | 
工智机端安装JsonDom RTE组件
打开Device Manager软件;
安装ACP服务(以工智机(192.168.105.54)为例),选择“在线安装”;

点击“扫描”,扫描到需要使用的工智机;

如果扫描不到工智机设备,可以输入工智机的IP地址手动添加;

选择工智机(192.168.105.54),点击“本地”,进入工智机组件管理页面;

进入工智机设备管理器页面,点击“安装RTE插件服务”

输入用户名和密码,点击“在线安装”

进入工智机设备管理器页面,点击“软件”下拉选择“组件管理”,进入如下页面;

点击“浏览”,选择“jsondom”查看版本进行安装;

传输完成后,弹窗选择确定安装;安装完成后,选择确认重启组件;
安装完成后,本地新增1.0.6版本的“jsondom”;

IDE侧部署JsonDom的library
MetaFacture中点击最上面的菜单栏 ”工具“ -- ”库存储“ ;

弹出的对话窗中点击”安装“ -- 选中SF4402_JsonDom_1.0.1.1.library -- 点击“打开”;


工程中双击“库管理器”  -- “添加库”  -- 双击“SF4402_JsonDom” ,加载库完成;

工智机端安装MQTT RTE组件
本例实验中会用到MQTT通信,所以需要安装MQTT RTE组件和库文件
打开Device Manager软件;
点击“扫描”,扫描到需要使用的工智机;

如果扫描不到工智机设备,可以输入工智机的IP地址手动添加;

选择工智机(192.168.105.54),点击“本地”,进入工智机组件管理页面;

进入工智机设备管理器页面,点击“软件”下拉选择“组件管理”,进入如下页面;

点击“浏览”,选择“mqtt”查看版本进行安装;

传输完成后,弹窗选择确定安装;安装完成后,选择确认重启组件;
安装完成后,本地新增1.0.6版本的“iotmqtt”;

IDE侧部署MQTT的library
MetaFacture中点击最上面的菜单栏“工具” -- “库存储” ;

弹出的对话窗中点击”安装“ -- 选中SF4202_IotMqtt_1.0.6.0.library -- 点击“打开”;


工程中双击“库管理器”  -- “添加库”  -- 双击“SF4601_IotMqtt” ,加载库完成;

升级工智机JsonDom RTE组件
打开Device Manager,选择工智机(192.168.105.54),进入组件管理页面;
点击“jsondom”组件,选择需要更新的版本,点击更新;

升级IDE侧JsonDom的library
MetaFacture中点击最上面的菜单栏 ”工具“ -- ”库存储“ ;

弹出的对话窗中点击”安装“ -- 选中SF4601_JsonDom_1.0.1.0.library -- 点击“打开”;


工程中双击“库管理器”  -- “添加库”  -- 双击“SF4601_JsonDom” ,加载库完成;

卸载工智机JsonDom RTE组件

4.2. 卸载IDE侧的JsonDom library
MetaFacture界面点击“工具” -- “库存储”;

对话框中选中安装的的SF4601_JsonDom库,点击“卸载”

本例软、硬件配置
硬件:
1.SX21工智机
2.Win11 PC
软件:
1.MetaFacutre V1.0.7.1
2.Spyder(Pyhton编译软件)
3.EMQX Broker
本例实验说明
结构体数据转为Json格式数据的实验步骤如下:
TYPE ST_SampleData :
STRUCT
	//基本数据类型
	bEnabled : BOOL;
    nCounter : INT;
    fTemperature : REAL;
    sName : STRING(20);
	
	//一维数组
    arrValues : ARRAY[0..2] OF DINT;
	
	//二维数组
    arrMatrix : ARRAY[0..1, 0..2] OF REAL;
	
	//时间类型
    dtTimestamp : DATE_AND_TIME;
    tDuration : TIME;
	
	 // 嵌套结构体
    stSubItem : ST_SubStruct;
    
    // 结构体数组
    arrStructs : ARRAY[0..1] OF ST_SubStruct;
END_STRUCT
END_TYPE
        TYPE ST_SubStruct :
STRUCT
    bStatus : BOOL;
    nValue : INT;
    sDescription : STRING(15);
END_STRUCT
END_TYPE
        stData : ST_SampleData; //存放测试数据
        // 初始化结构体数据
    stData.bEnabled := TRUE;
    stData.nCounter := 42;
    stData.fTemperature := 23.5;
    stData.sName := 'SampleDevice';
    
    // 一维数组
    stData.arrValues[0] := 10;
    stData.arrValues[1] := 20;
    stData.arrValues[2] := 30;
    
    // 二维数组
    stData.arrMatrix[0,0] := 1.1;
    stData.arrMatrix[0,1] := 1.2;
    stData.arrMatrix[0,2] := 1.3;
    stData.arrMatrix[1,0] := 2.1;
    stData.arrMatrix[1,1] := 2.2;
    stData.arrMatrix[1,2] := 2.3;
    
    // 时间类型
    stData.dtTimestamp := DT#2023-05-15-14:30:00;
    stData.tDuration := T#1H2M3S;
    
    // 嵌套结构体
    stData.stSubItem.bStatus := FALSE;
    stData.stSubItem.nValue := 99;
    stData.stSubItem.sDescription := 'Nested item';
    
    // 结构体数组
    stData.arrStructs[0].bStatus := TRUE;
    stData.arrStructs[0].nValue := 100;
    stData.arrStructs[0].sDescription := 'Array item 1';
    
    stData.arrStructs[1].bStatus := FALSE;
    stData.arrStructs[1].nValue := 200;
    stData.arrStructs[1].sDescription := 'Array item 2';
        sJsonBuffer : STRING(10000);   // JSON缓冲区
fbJsonDom : MetaCore_JsonDom.FB_JsonSaxWriter;
fbJsonDataType : MetaCore_JsonDom.FB_JsonReadWriteDataType;
        fbJsonDom.ResetDocument();
 fbJsonDataType.AddJsonKeyValueFromSymbol(
        fbJsonDom, 
        'data', 
        'ST_SampleData', 
        SIZEOF(stData), 
        ADR(stData)
);
nJsonLength := fbJsonDom.GetDocumentLength(); // 获取JSON文档长度
fbJsonDom.CopyDocument(sJsonBuffer, nJsonLength);
        右键设备树的“Application” ——>添加对象——>符号配置
勾选存放结构体数据和Json文本数据的变量


Json格式数据转换为结构体数据的实验步骤如下:
data : ST_SampleData; // 用于接收解析后的数据
        fbJsonDataType : MetaCore_JsonDom.FB_JsonReadWriteDataType;
         // 测试JSON转回结构体
    fbJsonDataType.SetSymbolFromJson(
        sJsonBuffer,
        SIZEOF(sJsonBuffer),  
        ADR(data),
        SIZEOF(data)
    );
        
将得到的Json数据通过MQTT通信传输并使用python获取数据的实验步骤如下:
// MQTT客户端
    Client : MetaCore_IoTMqtt.FB_IotMqttClient;
    MessageQueue : MetaCore_IoTMqtt.FB_IotMqttMessageQueue;
    Message : MetaCore_IoTMqtt.FB_IotMqttMessage;
    sWillingMessage : MetaCore_IoTMqtt.ST_IotMqttWill;
WillingPayload : STRING := 'PLC offline';
// 控制标志
run : BOOL := TRUE;
bPublish : BOOL := FALSE;  // 手动发布触发标志
bDataPublished : BOOL := FALSE;  //是否发布标志
bConnect : BOOL := FALSE;        // 手动连接触发标志
         // MQTT连接管理
Client(
      sClientId := 'PLC_Client',
      sHostName := '192.168.105.16',
      nHostPort := 1883,
      nKeepAlive := 5,
      stWill := sWillingMessage,
      ipMessageQueue := ADR(MessageQueue),
      bError => ,
      hrErrorCode => ,
      eConnectionState => ,
      bConnected => 
);
Client.Execute(bCo
// 发布消息
IF Client.bConnected AND bPublish THEN
	    Client.Publish(
        'plc/data',
        ADR(sJsonBuffer),
        nJsonLength,  // 使用实际JSON长度
        2,            // QoS=2确保可靠传输
        FALSE,
        FALSE
    );
    
// 发布后重置触发标志
bPublish := FALSE; 
bDataPublished := TRUE;
END_IF
        import paho.mqtt.client as mqtt
import json
from datetime import datetime
def on_connect(client, userdata, flags, rc):
    print(f" Connected to MQTT broker with code: {rc}")
    client.subscribe("plc/data")
def parse_duration(duration_str):
    """解析CODESYS的TIME格式(如T1H2M3S)"""
    if not isinstance(duration_str, str) or not duration_str.startswith('T'):
        return None
    
    hours = minutes = seconds = 0
    time_str = duration_str[1:]  # 去掉'T'
    
    if 'H' in time_str:
        hours = int(time_str.split('H')[0])
        time_str = time_str.split('H')[1]
    if 'M' in time_str:
        minutes = int(time_str.split('M')[0])
        time_str = time_str.split('M')[1]
    if 'S' in time_str:
        seconds = int(time_str.split('S')[0])
    
    return {
        'hours': hours,
        'minutes': minutes,
        'seconds': seconds,
        'total_seconds': hours*3600 + minutes*60 + seconds
    }
def on_message(client, userdata, msg):
    try:
        # 原始数据检查
        raw_data = msg.payload.decode('utf-8')
        print(f"\n Raw message length: {len(raw_data)} bytes")
        
        # 调试:打印前200个字符(避免日志过长)
        print(f" Sample data (first 200 chars):\n{raw_data[:200]}{'...' if len(raw_data)>200 else ''}")
        try:
            data = json.loads(raw_data)
        except json.JSONDecodeError as e:
            print(f" JSON解析失败!错误位置:{e.pos},可能原因:数据截断或格式错误")
            print(f" 错误附近的上下文:\n{raw_data[max(0, e.pos-50):e.pos+50]}")
            return
        # 美化打印完整JSON
        print("\n 完整JSON数据结构:")
        print(json.dumps(data, indent=2, ensure_ascii=False))
        # 提取数据字段
        plc_data = data.get('data', {})
        
        # 基本字段
        print("\n 基本数据:")
        print(f"• 设备名称: {plc_data.get('sName', 'N/A')}")
        print(f"• 使能状态: {'ON' if plc_data.get('bEnabled') else 'OFF'}")
        print(f"• 计数器值: {plc_data.get('nCounter', 0)}")
        print(f"• 温度: {plc_data.get('fTemperature', 0.0):.2f}°C")
        
        # 数组数据
        print("\n 数组数据:")
        print(f"• 一维数组: {plc_data.get('arrValues', [])}")
        print(f"• 二维数组: {plc_data.get('arrMatrix', [])}")
        # 智能时间处理
        print("\n 时间数据:")
        timestamp = plc_data.get('dtTimestamp')
        if timestamp:
            if isinstance(timestamp, (int, float)):  # Unix时间戳
                dt = datetime.fromtimestamp(timestamp)
                print(f"• 时间戳 (Unix): {timestamp} → {dt.isoformat()}")
            elif isinstance(timestamp, str):  # ISO格式
                try:
                    dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
                    print(f"• 时间戳 (ISO): {dt.isoformat()}")
                except ValueError:
                    print(f"• 无法解析的时间格式: {timestamp}")
        
        # 持续时间处理
        duration = plc_data.get('tDuration')
        if duration:
            if isinstance(duration, str):
                parsed = parse_duration(duration)
                if parsed:
                    print(f"• 持续时间: {duration}")
                    print(f"  → 合计: {parsed['total_seconds']}秒 (H:M:S = {parsed['hours']}:{parsed['minutes']}:{parsed['seconds']})")
                else:
                    print(f"• 非标准持续时间格式: {duration}")
            elif isinstance(duration, (int, float)):
                print(f"• 持续时间 (秒): {duration}")
        
        # 结构体数据
        print("\n 结构体数据:")
        if 'stSubItem' in plc_data:
            sub = plc_data['stSubItem']
            print(f"• 子结构体: Status={sub.get('bStatus')}, Value={sub.get('nValue')}, Desc='{sub.get('sDescription','')}'")
        
        if 'arrStructs' in plc_data:
            print("• 结构体数组:")
            for i, item in enumerate(plc_data['arrStructs']):
                print(f"  [{i}] Status={item.get('bStatus')}, Value={item.get('nValue')}, Desc='{item.get('sDescription','')}'")
    except Exception as e:
        print(f"\n‼️ 处理消息时发生严重错误: {str(e)}")
        import traceback
        traceback.print_exc()
# MQTT客户端配置
client = mqtt.Client(
    client_id="PythonSubscriber",
    protocol=mqtt.MQTTv311,
    transport="tcp"
)
# 回调绑定
client.on_connect = on_connect
client.on_message = on_message
# 连接设置
broker_ip = "192.168.105.16"
client.connect(broker_ip, 1883, 60)
print(f" 开始监听MQTT主题 plc/data,连接服务器: {broker_ip}...")
try:
    client.loop_forever()
except KeyboardInterrupt:
    print("\n手动停止订阅")
    client.disconnect()
        建立MQTT服务端MQTT Broker并登录(详细操作步骤见3.实验注意点)

运行Python代码
登录工智机设备,下载程序,程序运行后置bConnect为True,成功连接后将发布消息

Python端将收到主题为plc/data的消息,即转换后的Json文本并输出
这是Python端收到的前200个字符

这是Python端收到的完整的Json数据

这是Python端处理后的数据

可以看到MQTT服务端的连接情况,此时有两个客户端连接,分别是工智机端和Python端
主题数为1,即工程中发布消息的主题plc/data
订阅数为1.即Python端订阅了plc/data这个主题


安装EMQX(MQTT Broker)
将官网下载的emqx-5.3.2-windows-amd64.zip进行解压缩,自行解压到相应的文件夹中。

打开文件夹目录bin,在文件夹位置处输入“cmd”,回车。

在命令行中输入——>emqx start——>回车

启动完成

EMQX端口号:
| 端口号 | 说明 | 
|---|---|
| 1833 | TCP 端口 | 
| 8883 | WebSocket 端口 | 
| 8884 | WebSocket Secure 端口 | 
| 8883 | SSL/TLS 端口 | 
| 18083 | Broker的Dashboard访问端口 | 
本机访问:127.0.0.1:18083/
外部访问:192.168.110.147:18083/
(本例MQTT Broker使用的是127.0.0.1:18083/)
用户名:admin
密码:public

首次登录会提示修改密码,也可以选择跳过。

Dashboard界面,可以进行Broker的设置以及查看客户端连接情况、消息订阅等等。

功能块方法FB_JsonReadWriterDataType.AddJsonKeyValueFromSymbol介绍

参数介绍
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| fbWriter | FB_JsonSaxWriter | JSON文档写入器实例 | 
| sKey | STRING | 要添加的JSON字段名称(键) | 
| sDatatype | STRING | PLC变量的数据类型名称(如"ST_SampleData") | 
| nData | UINT | 目标变量的内存大小(字节数) | 
| pData | POINTER TO UDINT | 指向源变量的内存地址 | 
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| AddJsonKeyValueFromSymbol | BOOL | 当功能块被激活时,被置为True,一直保持到收到确认信号 | 
| hrErrorCode | UINT | 返回操作结果状态码 | 
功能块方法FB_JsonReadWriterDataType.SetSymbolFromJson介绍

参数介绍
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| pJson | STRING | 包含要解析的JSON文本 | 
| nSize | UDINT | 指定pJson字符串的实际长度(字节数) | 
| pData | POINTER TO UDINT | 指向目标变量的内存地址 | 
| nData | UINT | 指定目标变量的内存大小(字节数) | 
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| SetSymbolFromJson | BOOL | |
| hrErrorCode | UINT | 返回操作结果状态码 | 
功能块FB_JsonSaxWriter介绍

功能介绍
FB_JsonSaxWriter 是用于流式生成 JSON 文档的功能块,采用 SAX(Simple API for XML/JSON)模型,特别适合高效处理大尺寸数据或动态构建复杂 JSON 结构。其核心特性包括:
低内存消耗:通过逐步生成 JSON 内容,避免一次性加载全部数据到内存;
全面数据类型支持:可处理嵌套对象/数组及 PLC 特殊类型(如 TIME、DATE_AND_TIME);
精准状态控制:通过 initStatus状态码实时监控初始化与执行状态;
灵活文档构建:提供AddString等方法,支持结构化生成 JSON。
功能块方法FB_JsonSaxWriter.ResetDocument介绍

功能介绍
ResetDocument 是 FB_JsonSaxWriter 功能块的核心方法,用于 清空当前JSON文档并重置生成器状态,为构建新JSON文档做准备。
| 特性 | 说明 | 
|---|---|
| 初始化作用 | 清除内存中的JSON数据,将生成器恢复到初始状态 | 
| 必须首调用 | 在开始构建新JSON文档前必须调用 | 
| 不影响配置 | 保留之前设置的参数(如浮点数精度) | 
| 无参数设计 | 直接调用,无需输入/输出参数 | 
功能块FB_JsonSaxWriter.GetDocumentLength介绍

功能介绍
GetDocumentLength 用于 获取当前生成的JSON文档的精确字节长度(不含终止符\0),是内存管理和安全传输的关键方法。
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| GetDocumentLength | UINT | 返回当前JSON文档的长度(字节数) | 
| hrErrorCode | UINT | 返回操作结果状态码 | 
功能块FB_JsonSaxWriter.CopyDocument介绍

功能介绍
CopyDocument 用于 将生成的JSON文档复制到指定的字符串缓冲区,是获取最终JSON数据的核心方法。
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| pDoc | STRING | 目标缓冲区(接收JSON文本) | 
| nDoc | UDINT | 目标缓冲区的最大容量(需用SIZEOF(buffer)获取) | 
| 参数名称 | 参数类型 | 描述 | 
|---|---|---|
| CopyDocument | UDINT | 返回值 | 
| hrErrorCode | UINT | 返回操作结果状态码 | 
安装要求
中科时代出厂的工智机;
熟悉基础的Linux操作命令;、
安装过程
工智机端安装SF4601_JsonDom RTE组件
上传deb包到工智机Linux环境的/home/sinsegye目录下
上传完成后在工智机上执行命令安装(参考下方截图,如果模块文件名发生变化则命令行中的文件名做相应更改)
cd $HOME
sudo dpkg -i SF4601_JsonDom_1.0.3_amd64.deb
        
sudo nano /usr/local/etc/SinsegyeRTE/SinsegyeRTE.cfg
        [ComponentManager]
  Component.0=retainDeamon
  Component.1=CmpCanBusUtils
  Component.2=CmpSinsegyeLibs
  Component.3=SinsegyeCmp
  Component.4=jsondom
        sudo systemctl restart sinsegyerte.service
        更新安装
工智机端升级JsonDom RTE组件
上传升级版deb包到工智机Linux环境的/home/sinsegye目录下,上传方法参考附录;
上传完成后在工智机上执行命令安装(参考下方截图,如果模块文件名发生变化则命令行中的文件名做相应更改)
cd $HOME
sudo dpkg -i SF4601_JsonDom_1.0.3_amd64.deb
        sudo systemctl restart sinsegyerte.service
        卸载工智机JsonDom RTE组件
sudo dpkg -r jsondom
        sudo nano /usr/local/etc/SinsegyeRTE/SinsegyeRTE.cfg
        sudo systemctl restart sinsegyerte.service