本手册使用一台SX21作为ACP服务器,Ubuntu电脑作为客户端来读取、写入和订阅PLC变量功能。如下图所示:
下表概述了本手册各个产品组件
产品组件 | 描述说明 |
---|---|
DeviceManager_0.0.2.5 | 工智机插件安装管理器 |
SF1000_AcpPlcAccess_1.0.11_amd64.deb | 客户端安装deb组件 |
libacp_0.2.9_amd64.deb | AcpPlcAccess依赖文件 |
AcpPythonServer.projectarchive | PLC工程存档 |
Acp_python.zip | python示例程序 |
安装要求
安装过程
工智机安装acpplcaccess.deb组件
1.打开Device Manager软件,使用工智机固定网口与个人电脑通讯。
2.初次使用,需要安装ACP通讯服务。点击左下角“安装ACP服务”。
3.输入工智机固定IP地址、端口、用户名和密码后,点击“在线安装”。
IP地址 | 192.168.1.200 |
---|---|
端口 | 2224 |
用户名 | sinsegye |
密码 | 1 |
![]() |
等待安装完成。
4.安装完成后,点击“扫描”,即可扫描出固定网口192.168.1.200连接的工智机。
点击“本地”,进入设备管理页面。同时,左下角会显示“已连接”。
点击进入工智机后,左下角显示“已连接”状态。
输入用户名:sinsegye,密码:1,点击“在线安装”。
6.安装完成后,点击“软件”,在下拉菜单下选择“组件管理”
进入组件管理后,“本地”目录下为当前工智机安装的组件。
7.点击浏览,可以在线浏览可以安装的组件。
9.在组件中找到“SF1000-acpplcaccess”--点击“安装”, 会先将组件传送至工智机,完成传送后,点击确认安装。
安装完成后,点击“确认”重启生效。
10.安装完成后,可以在“本地”页面浏览到SF1000-acpplcaccess已安装成功。
安装Python 环境
本示例使用Widows系统安装Python 3.8.5版本。
2.1 Python 3.8.5版本安装包下载
官网下载Python 3.8(社区版)安装包;python-3.8.5-amd64.exe
2.2 安装包安装
2.3 .进入以下界面,按如图步骤执行
自定义安装路径:
勾选第一个选项:
点击“install”开始安装
安装成功,进入命令行输入python出现python版本号就证明安装成功
2.4安装第三方库PyQT5
pip install pyqt5 |
---|
更新安装
ACP服务器更新acpplcaccess组件
打开Device Mananger--组件管理--本地管理页面中,点击acpplcaccess,选择可更新的版本后,点击“更新”。
卸载过程
ACP服务器卸载acpplcaccess组件
打开Device Mananger--组件管理--本地管理页面中,选择acpplcaccess,点击“卸载”
点击“确定”
卸载成功后,点击“确定”重启生效。
卸载成功后,“本地”页面组件消失。
硬件:
1.工智机SX21
2.Ubuntu 20.04系统客户端
3.个人电脑
软件:
1.插件管理器DeviceManager_0.0.2.5
2.依赖库文件libacp_0.2.9_amd64.deb
3.服务端PLC工程存档AcpPythonServer.projectarchive
4.中科时代IDE MetaFacture V1.0.7.1
1.实验要求
a.工智机使用插件管理器成功安装acpplcaccess组件;
b.系统客户端安装python 环境。
2.实验原理图
3.实验步骤
3.1 解压官网下载的工程存档文件“AcpPythonServer.projectarchive”,登录工智机,下载运行程序。
3.2 将官网下载的python程序压缩包“ACP_python.zip”拷贝到客户端中,解压ACP_python.zip。
3.3. 进入ACP_python 文件夹,命令行输入
python ACP_test.py |
---|
3.4 在弹窗输入测试使用的工智机IP,点击“连接”如下图:
3.5 点击“批量读取”,读取PLC_PRG程序中的变量
3.6 点击“批量写入”,批量写入PLC_PRG程序中的变量。(参数配置值在ACPDataProcessing.py程序中已做测试配置)
查看工智机的PLC变量值,写入成功。
3.7 输入变量名,变量数据类型,点击“读取”,获取PLC_PRG程序中的变量值。
3.8 输入变量名,变量数据类型,点击“写入”,写入PLC_PRG程序中的变量值。
查看工智机的PLC变量值,写入成功。
3.9点击“订阅”,获取PLC_PRG程序中的变量数值不断在变化。
3.10点击“取消订阅”, PLC_PRG程序中的变量数值停止变更。
定义Acp_connection函数,使用AcpCreatConnHandler获得连接句柄。
def Acp_connection(host):
handler = wrapper.AcpCreatConnHandler(AcpConnType.TCP, host)
if isinstance(handler,int):
return handler
else:
pass
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
创建函数ReadValues,对字典中存在的key,value类型使用AcpReadTagValue进行读取。
详细参数说明见功能介绍。
xVar1 = PY_ST_BOOL()
bVar2 = PY_ST_BYTE() # 0A;
wVar3 = PY_ST_WORD()
dwVar4 = PY_ST_DWORD()
lwVar5 = PY_ST_LWORD()
siVar6 = PY_ST_SINT()
usiVar7 = PY_ST_USINT()
iVar8 = PY_ST_INT()
uiVar9 = PY_ST_UINT()
diVar10 = PY_ST_DINT()
udiVar11 = PY_ST_UDINT()
liVar12 = PY_ST_LINT()
rVar13 = PY_ST_REAL()
lrVar14 = PY_ST_LREAL()
sVar15 = (ctypes.c_char * 81)()
st_dict = {
'Application.PLC_PRG.xVar1': xVar1,
'Application.PLC_PRG.bVar2': bVar2,
'Application.PLC_PRG.wVar3': wVar3,
'Application.PLC_PRG.dwVar4': dwVar4,
'Application.PLC_PRG.lwVar5': lwVar5,
'Application.PLC_PRG.siVar6': siVar6,
'Application.PLC_PRG.usiVar7': usiVar7,
'Application.PLC_PRG.iVar8': iVar8,
'Application.PLC_PRG.uiVar9': uiVar9,
'Application.PLC_PRG.diVar10': diVar10,
'Application.PLC_PRG.udiVar11': udiVar11,
'Application.PLC_PRG.liVar12': liVar12,
'Application.PLC_PRG.rVar13': rVar13,
'Application.PLC_PRG.lrVar14': lrVar14,
'Application.PLC_PRG.sVar15': sVar15
}
def ReadValues(host,var_name,var_type):
handler = Acp_connection(host)
if isinstance(handler, int):
var_type = python_to_c_type(var_type) # 对string 类型的var_type 进行处理
var_name = find_match_in_list(list(st_dict.keys()), var_name) # 查找匹配的key 值
tagInfo = TagInfo(var_name, var_type)
result = wrapper.AcpReadTagValue(handler, tagInfo, 1000)
if type(var_type.value) == bytes:
return var_name, var_type.value.decode()
else:
return var_name, var_type.value
else:
print(f"连接失败,读参数失败!")
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
创建函数WriteValues,对字典中存在的key,value类型使用AcpWriteTagValue进行写入。
详细参数说明见功能介绍。
xVar1 = PY_ST_BOOL()
bVar2 = PY_ST_BYTE() # 0A;
wVar3 = PY_ST_WORD()
dwVar4 = PY_ST_DWORD()
lwVar5 = PY_ST_LWORD()
siVar6 = PY_ST_SINT()
usiVar7 = PY_ST_USINT()
iVar8 = PY_ST_INT()
uiVar9 = PY_ST_UINT()
diVar10 = PY_ST_DINT()
udiVar11 = PY_ST_UDINT()
liVar12 = PY_ST_LINT()
rVar13 = PY_ST_REAL()
lrVar14 = PY_ST_LREAL()
sVar15 = (ctypes.c_char * 81)()
st_dict = {
'Application.PLC_PRG.xVar1': xVar1,
'Application.PLC_PRG.bVar2': bVar2,
'Application.PLC_PRG.wVar3': wVar3,
'Application.PLC_PRG.dwVar4': dwVar4,
'Application.PLC_PRG.lwVar5': lwVar5,
'Application.PLC_PRG.siVar6': siVar6,
'Application.PLC_PRG.usiVar7': usiVar7,
'Application.PLC_PRG.iVar8': iVar8,
'Application.PLC_PRG.uiVar9': uiVar9,
'Application.PLC_PRG.diVar10': diVar10,
'Application.PLC_PRG.udiVar11': udiVar11,
'Application.PLC_PRG.liVar12': liVar12,
'Application.PLC_PRG.rVar13': rVar13,
'Application.PLC_PRG.lrVar14': lrVar14,
'Application.PLC_PRG.sVar15': sVar15
}
def WriteValues(host,var_name,var_type,var_value):
st_dict_1={}
handler = Acp_connection(host)
if isinstance(handler, int):
var_name = find_match_in_list(list(st_dict.keys()), var_name)
var_type = python_to_c_type(var_type) # 将python 类型转换成对应的 c 类型
var_value = rewrite_update_ctypes_var(var_type,var_value,type(var_type))
var_type.value = var_value
st_dict_1[var_name] = var_type
for var_name, var_type in st_dict_1.items():
tagInfo = TagInfo(var_name, var_type)
result = wrapper.AcpWriteTagValue(handler, tagInfo, 1000)
if type( var_type.value) == bytes:
return var_name, var_type.value.decode()
else:
return var_name, var_type.value
else:
print(f"连接失败,写参数失败!")
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
创建函数BatchReadVar,对arr数组中的变量进行批量读取,使用AcpReadTagList进行读取。
详细参数说明见功能介绍。
xVar1 = PY_ST_BOOL()
bVar2 = PY_ST_BYTE() # 0A;
wVar3 = PY_ST_WORD()
dwVar4 = PY_ST_DWORD()
lwVar5 = PY_ST_LWORD()
siVar6 = PY_ST_SINT()
usiVar7 = PY_ST_USINT()
iVar8 = PY_ST_INT()
uiVar9 = PY_ST_UINT()
diVar10 = PY_ST_DINT()
udiVar11 = PY_ST_UDINT()
liVar12 = PY_ST_LINT()
rVar13 = PY_ST_REAL()
lrVar14 = PY_ST_LREAL()
sVar15 = (ctypes.c_char * 81)()
st_dict = {
'Application.PLC_PRG.xVar1': xVar1,
'Application.PLC_PRG.bVar2': bVar2,
'Application.PLC_PRG.wVar3': wVar3,
'Application.PLC_PRG.dwVar4': dwVar4,
'Application.PLC_PRG.lwVar5': lwVar5,
'Application.PLC_PRG.siVar6': siVar6,
'Application.PLC_PRG.usiVar7': usiVar7,
'Application.PLC_PRG.iVar8': iVar8,
'Application.PLC_PRG.uiVar9': uiVar9,
'Application.PLC_PRG.diVar10': diVar10,
'Application.PLC_PRG.udiVar11': udiVar11,
'Application.PLC_PRG.liVar12': liVar12,
'Application.PLC_PRG.rVar13': rVar13,
'Application.PLC_PRG.lrVar14': lrVar14,
'Application.PLC_PRG.sVar15': sVar15
}
def BatchReadVar(host):
handler = Acp_connection(host)
tagNum = 15
tagInfoArray = TagInfo * tagNum
arr = tagInfoArray(
TagInfo('Application.PLC_PRG.xVar1', xVar1),
TagInfo('Application.PLC_PRG.bVar2', bVar2),
TagInfo('Application.PLC_PRG.wVar3', wVar3),
TagInfo('Application.PLC_PRG.dwVar4', dwVar4),
TagInfo('Application.PLC_PRG.lwVar5', lwVar5),
TagInfo('Application.PLC_PRG.siVar6', siVar6),
TagInfo('Application.PLC_PRG.usiVar7', usiVar7),
TagInfo('Application.PLC_PRG.iVar8', iVar8),
TagInfo('Application.PLC_PRG.uiVar9', uiVar9),
TagInfo('Application.PLC_PRG.diVar10', diVar10),
TagInfo('Application.PLC_PRG.udiVar11', udiVar11),
TagInfo('Application.PLC_PRG.liVar12', liVar12),
TagInfo('Application.PLC_PRG.rVar13', rVar13),
TagInfo('Application.PLC_PRG.lrVar14', lrVar14),
TagInfo('Application.PLC_PRG.sVar15', sVar15),
)
result = wrapper.AcpReadTagList(handler, arr[0], tagNum, 1000)
for key, value in st_dict.items():
tagInfo = TagInfo(key, value)
result = wrapper.AcpReadTagValue(handler, tagInfo, 1000)
if type(value.value) == bytes:
print(f"{key}: {value.value.decode()}")
else:
print(f"{key}: {value.value}")
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
创建函数BatchReadVar,对arr数组中的变量进行批量写入,使用AcpWriteTagList进行写入。
详细参数说明见功能介绍。
xVar1 = PY_ST_BOOL()
bVar2 = PY_ST_BYTE() # 0A;
wVar3 = PY_ST_WORD()
dwVar4 = PY_ST_DWORD()
lwVar5 = PY_ST_LWORD()
siVar6 = PY_ST_SINT()
usiVar7 = PY_ST_USINT()
iVar8 = PY_ST_INT()
uiVar9 = PY_ST_UINT()
diVar10 = PY_ST_DINT()
udiVar11 = PY_ST_UDINT()
liVar12 = PY_ST_LINT()
rVar13 = PY_ST_REAL()
lrVar14 = PY_ST_LREAL()
sVar15 = (ctypes.c_char * 81)()
Batch_xVar1 = PY_ST_BOOL(1)
Batch_bVar2 = PY_ST_BYTE(11)
Batch_wVar3 = PY_ST_WORD(11)
Batch_dwVar4 = PY_ST_DWORD(11)
Batch_lwVar5 = PY_ST_LWORD(11)
Batch_siVar6 = PY_ST_SINT(11)
Batch_usiVar7 = PY_ST_USINT(11)
Batch_iVar8 = PY_ST_INT(11)
Batch_uiVar9 = PY_ST_UINT(11)
Batch_diVar10 = PY_ST_DINT(11)
Batch_udiVar11 = PY_ST_UDINT(11)
Batch_liVar12 = PY_ST_LINT(11)
Batch_rVar13 = PY_ST_REAL(11)
Batch_lrVar14 = PY_ST_LREAL(11)
Batch_sVar15 = (ctypes.c_char * 81)(*b'123')
st_dict = {
'Application.PLC_PRG.xVar1': xVar1,
'Application.PLC_PRG.bVar2': bVar2,
'Application.PLC_PRG.wVar3': wVar3,
'Application.PLC_PRG.dwVar4': dwVar4,
'Application.PLC_PRG.lwVar5': lwVar5,
'Application.PLC_PRG.siVar6': siVar6,
'Application.PLC_PRG.usiVar7': usiVar7,
'Application.PLC_PRG.iVar8': iVar8,
'Application.PLC_PRG.uiVar9': uiVar9,
'Application.PLC_PRG.diVar10': diVar10,
'Application.PLC_PRG.udiVar11': udiVar11,
'Application.PLC_PRG.liVar12': liVar12,
'Application.PLC_PRG.rVar13': rVar13,
'Application.PLC_PRG.lrVar14': lrVar14,
'Application.PLC_PRG.sVar15': sVar15
}
def BatchWriteVar(host):
handler = Acp_connection(host)
tagNum = 15
tagInfoArray = TagInfo * tagNum
arr = tagInfoArray(
TagInfo('Application.PLC_PRG.xVar1', Batch_xVar1),
TagInfo('Application.PLC_PRG.bVar2', Batch_bVar2),
TagInfo('Application.PLC_PRG.wVar3', Batch_wVar3),
TagInfo('Application.PLC_PRG.dwVar4', Batch_dwVar4),
TagInfo('Application.PLC_PRG.lwVar5', Batch_lwVar5),
TagInfo('Application.PLC_PRG.siVar6', Batch_siVar6),
TagInfo('Application.PLC_PRG.usiVar7', Batch_usiVar7),
TagInfo('Application.PLC_PRG.iVar8', Batch_iVar8),
TagInfo('Application.PLC_PRG.uiVar9', Batch_uiVar9),
TagInfo('Application.PLC_PRG.diVar10', Batch_diVar10),
TagInfo('Application.PLC_PRG.udiVar11', Batch_udiVar11),
TagInfo('Application.PLC_PRG.liVar12', Batch_liVar12),
TagInfo('Application.PLC_PRG.rVar13', Batch_rVar13),
TagInfo('Application.PLC_PRG.lrVar14', Batch_lrVar14),
TagInfo('Application.PLC_PRG.sVar15', Batch_sVar15),
)
result = wrapper.AcpWriteTagList(handler, arr[0], tagNum, 1000)
BatchReadVar(host)
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
使用AcpRegisterNotifyCallBack注册回调函数py_callback;
创建订阅变量结构体,设定订阅变量路径,使用AcpAddTagsNotification添加订阅变量列表。
详细参数说明见功能介绍。
xVar1 = PY_ST_BOOL()
bVar2 = PY_ST_BYTE() # 0A;
wVar3 = PY_ST_WORD()
dwVar4 = PY_ST_DWORD()
lwVar5 = PY_ST_LWORD()
siVar6 = PY_ST_SINT()
usiVar7 = PY_ST_USINT()
iVar8 = PY_ST_INT()
uiVar9 = PY_ST_UINT()
diVar10 = PY_ST_DINT()
udiVar11 = PY_ST_UDINT()
liVar12 = PY_ST_LINT()
rVar13 = PY_ST_REAL()
lrVar14 = PY_ST_LREAL()
sVar15 = (ctypes.c_char * 81)()
GVL_dict={
'Application.GVL.BOOL1': xVar1,
'Application.GVL.BYTE2': bVar2,
'Application.GVL.WORD3': wVar3,
'Application.GVL.DWORD4':dwVar4,
'Application.GVL.LWORD5':lwVar5,
'Application.GVL.SINT6':siVar6,
'Application.GVL.USINT7':usiVar7,
'Application.GVL.INT8':iVar8,
'Application.GVL.UINT9':uiVar9,
'Application.GVL.DINT10':diVar10,
'Application.GVL.UDINT11':udiVar11,
'Application.GVL.LINT12': liVar12,
'Application.GVL.REAL13':rVar13,
'Application.GVL.LREAL14':lrVar14,
'Application.GVL.STRING15': sVar15,
'Application.GVL.glvar16.S1': dwVar4
}
def py_callback(pInfo, ctx):
global getSubId # 设置getSubId 为全局变量
getSubId=pInfo.contents.nReplySubId # 获取订阅id
print(f"C触发的回调: {hex(ctypes.addressof(pInfo.contents))} {pInfo.contents.enReplyType}")
if pInfo.contents.enReplyType == 3:
# 如果pInfo是指针类型:
tag_info = pInfo.contents # 解引用指针
array_ptr = tag_info.subOkTagsInfo
array_length = tag_info.subOkSize # 直接获取长度
for i in range(array_length):
subtag = array_ptr[i]
plc_name = subtag.tagDesc.szSymbolFullName.decode('utf-8')
py_Vartype = GVL_dict[plc_name]
data_ptr = subtag.tagValue.pValueData
data_size = subtag.tagValue.nValueSize
# print(f"plc_name:{plc_name},data_ptr:{data_ptr},data_size:{data_size}")
ctypes.memmove(ctypes.addressof(py_Vartype), data_ptr, data_size)
if py_Vartype==sVar15:
print(f"订阅值 {plc_name}: {py_Vartype.value.decode('utf-8')}")
else:
print(f"订阅值 {plc_name}: {py_Vartype.value}")
def ACP_StartSub(host): #,var_type
global Datatype,var_name
handler = Acp_connection(host)
print(f"TCP连接测试通过,句柄值: {hex(handler)}")
wrapper.AcpRegisterNotifyCallBack(handler, py_callback, None)
tagNum =16
tagInfoArray = TagInfo * tagNum
arr = tagInfoArray(
TagInfo('Application.GVL.BOOL1', xVar1),
TagInfo('Application.GVL.BYTE2', bVar2),
TagInfo('Application.GVL.WORD3', wVar3),
TagInfo('Application.GVL.DWORD4',dwVar4),
TagInfo('Application.GVL.LWORD5',lwVar5),
TagInfo('Application.GVL.SINT6',siVar6),
TagInfo('Application.GVL.USINT7',usiVar7),
TagInfo('Application.GVL.INT8',iVar8),
TagInfo('Application.GVL.UINT9',uiVar9),
TagInfo('Application.GVL.DINT10',diVar10),
TagInfo('Application.GVL.UDINT11',udiVar11),
TagInfo('Application.GVL.LINT12', liVar12),
TagInfo('Application.GVL.REAL13',rVar13),
TagInfo('Application.GVL.LREAL14',lrVar14),
TagInfo('Application.GVL.STRING15', sVar15),
TagInfo('Application.GVL.glvar16.S1', dwVar4),
)
reqid = 6
subParam = TagSubParam()
subParam.nIntervalMs = 16
subParam.nRequestId = reqid
wrapper.AcpAddTagsNotification(handler, arr[0], tagNum, subParam, 1000)
while True:
time.sleep(1.0)
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
通过指定订阅函数,获取订阅id (getSubId);
使用AcpDelTagsNotification取消订阅变量列表。
详细参数说明见功能介绍。
def ACP_StopSub(host):
handler = Acp_connection(host)
wrapper.AcpDelTagsNotification(handler, getSubId, 1000)
通过调用Acp_connection使用AcpCreatConnHandler获得连接句柄;
使用destroy_connection断开ACP连接。
详细参数说明见功能介绍。
def Dis_Connect(host):
handler = Acp_connection(host)
wrapper.destroy_connection(handler)
取消相应的注释来调用读取变量、写入变量和订阅变量的方法。
if __name__ == '__main__':
host="192.168.105.48" # 测试工智机IP
var_name=' Application.PLC_PRG.sVar15'
var_type="STRING"
var_value='TEST'
# ReadValues(host, var_name, var_type)
# WriteValues(host, var_name, var_type, var_value)
# BatchReadVar(host)
# BatchWriteVar(host)
# ACP_StartSub(host)
# ACP_StopSub(host)
Dis_Connect(host)
ST_BOOL
ST_BYTE
ST_WORD
ST_DWORD
ST_LWORD
ST_SINT
ST_USINT
ST_INT
ST_UINT
ST_DINT
ST_UDINT
ST_LINT
ST_REAL
ST_LREAL
ST_STRING
“””
* @brief 创建一个连接
* ConnParam 连接参数结构体
* return: 成功返回handler的句柄,失败返回nullptr
“””
def AcpCreatConnHandler(self, conn_type: ConnParam, conn_param: str) -> int:
param = ConnParam()
param.connType = conn_type.value
param.szConnParam = conn_param.encode('utf-8')
return self._dll.AcpCreatConnHandler(ctypes.byref(param))
“””
* @brief 读取变量
* handler: 连接句柄
* taginfo: 变量信息结构体,包含变量路径,读取变量地址和大小
* nTimeOutMs: 超时时间ms
* tagNum: 读取列表数量
* return: 成功返回0,失败返回-1或者错误码
““”
# 单一读
def AcpReadTagValue(self, handler: PY_VOIDP, tagInfo: TagInfo, timeout_ms: int): # 读变量列表
result = self._dll.AcpReadTagValue(handler, ctypes.byref(tagInfo), timeout_ms)
return result
# 批量读
def AcpReadTagList(self, handler: PY_VOIDP, tagInfo: TagInfo , tagNum: int, timeout_ms: int):
result = self._dll.AcpReadTagList(handler, ctypes.byref(tagInfo), tagNum ,timeout_ms)
return result
“””
* @brief 写入变量
* handler: 连接句柄
* taginfo: 变量信息结构体,包含变量路径,读取变量地址和大小
* nTimeOutMs: 超时时间ms
* tagNum: 读取列表数量
* return: 成功返回0,失败返回-1或者错误码
“”“
# 单一变量写
def AcpWriteTagValue(self, handler: PY_VOIDP, tagInfo: TagInfo, timeout_ms: int):
result = self._dll.AcpWriteTagValue(handler, ctypes.byref(tagInfo), timeout_ms)
return result
# 批量写
def AcpWriteTagList(self, handler: PY_VOIDP, tagInfo: TagInfo , tagNum: int, timeout_ms: int):
result = self._dll.AcpWriteTagList(handler, ctypes.byref(tagInfo), tagNum ,timeout_ms)
return result
“””
* @brief 注册回调函数
* handler: 连接句柄
* Callback: 注册的回调函数名称
* pContext:通用指针,允许你传递一个上下文数据到回调函数中
* return: 成功返回0,失败返回-1或者错误码
“””
def AcpRegisterNotifyCallBack(self, handler: PY_VOIDP, callback: callable , pContext : PY_VOIDP):
c_callback = NOTIFY_CALLBACK(callback)
self._callback_ref = c_callback
result = self._dll.AcpRegisterNotifyCallBack(handler, c_callback , pContext)
return result
“””
* @brief 移除订阅回调
* handler: 连接句柄
* return: 成功返回0,失败返回-1或者错误码
“””
# 删除回调注册
def AcpRemoveNotifyCallBack(self, handler: PY_VOIDP):
result = self._dll.AcpRemoveNotifyCallBack(handler)
return result
“””
* @brief 添加变量组订阅
* handler: 连接句柄
* taginfo: 变量信息结构体,包含订阅变量路径
* tagNum: 订阅变量数量
* timeout_ms: 超时时间ms
* return: 成功返回true,失败返回false
“””
# 订阅通知
def AcpAddTagsNotification(self, handler: PY_VOIDP, tagInfo: TagInfo , tagNum: int, tagSubParam : TagSubParam, timeout_ms: int):
result = self._dll.AcpAddTagsNotification(handler, ctypes.byref(tagInfo), tagNum , ctypes.byref(tagSubParam),timeout_ms)
return result
“””
* @brief 添加变量组订阅
* handler: 连接句柄
* nSubId: 订阅id
* timeout_ms: 超时时间ms
* return: 成功返回true,失败返回false
“””
# 取消订阅通知
def AcpDelTagsNotification(self, handler: PY_VOIDP, nSubId: int, timeout_ms: int):
result = self._dll.AcpDelTagsNotification(handler, nSubId , timeout_ms)
return result