Python – OPCUA Client 和 Server

工作中需要对工业设备进行数据采集,一般使用UDPModBus进行数据通信。UDP用来处理快速大量的实时数据(如振动数据等),ModBus用于慢变量数据(如温度、压力等),出于工作内容和兴趣,了解了一下OPC工业协议,以此记录学习。

常见的工业协议按照OSI(Open System Interconnect)参考模型分到了不同层次,粗略的总结,可以分类如下:

软件接口OPC
应用层ModBus
数据链路层CAN,ProfiBus
物理层RS232,RS485

Client

from opcua import Client
from datetime import datetime
from opcua import ua

def set_value_to_OPC(tagID,value,time_str = None):
    # tagID: 指对应OPCUA server中点对应的ID号
    # value:需要写入的值
    # time_str:写入值对应的UTC时间,格式为'%Y-%m-%d %H:%M:%S'
    # client = Client("opc.tcp://127.0.0.1:48408/freeopcua/server/")
    # client = Client("opc.tcp://172.16.102.105:48408/freeopcua/server/") #AWS 服务上地址
    #通过tagID找到对应的tag,并写入值
    # try:
    #     client.connect()
    #     tagID = tagID
    #     value = value
    #     tag = client.get_node("ns=2;i={}".format(tagID))
    #     if time_str == None:
    #         datavalue = value
    #     else:
    #         datavalue = ua.DataValue(value)
    #         date_time = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    #         datavalue.SourceTimestamp = date_time
    #     tag.set_value(datavalue)
    #     # v.set_value(i)
    #     client.disconnect()
    # except enumerate as e:
    #     print(e)


    client = Client("opc.tcp://192.168.70.134:62551/DataAccessServer")

    try:
        print(client.connect())
        tagID = tagID
        value = value
        # tag = client.get_node("ns=2;i={}".format(tagID))
        tag = client.get_node("ns=2;s=1:IN7?Value")
        print(tag.get_value())

        # datavalue = ua.DataValue(56)
        # tag.set_value(datavalue)
        # print(tag.get_value())

        dv = ua.DataValue(ua.Variant(125, ua.VariantType.Float))
        #设置值,与数据类型
        dv.ServerTimestamp = None
        dv.SourceTimestamp = None
        tag.set_value(dv)

        # if time_str == None:
        #     datavalue = value
        # else:
        #     datavalue = ua.DataValue(value)
        #     date_time = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
        #     datavalue.SourceTimestamp = date_time
        # tag.set_value(datavalue)

        client.disconnect()
    except enumerate as e:
        print(e)

#默认写入值
# set_value_to_OPC(410,5.6,'2021-10-12 08:06:00')

#指定数据类型写入值
set_value_to_OPC("1:IN7?Value",34)



Server

from opcua import ua, Server
from loguru import logger
from json_config import JsonConf
from datetime import datetime

class myObject():
    def __init__(self,server,name,groups,quantity):
        self.server = server
        self.name = name
        self.groups = groups
        self.quantity = int(quantity)
        self.Nodes = {}
        self.create_Node()

    def create_Node(self):

        objects = server.get_objects_node()
        # populating our address space
        myobj = objects.add_object(idx, self.name)
        for ID in self.groups:
            self.Nodes[str(ID)] = []
            subobj = myobj.add_object(idx, str(ID))
            for addr in range(0,self.quantity):
                myvar = subobj.add_variable(idx, str(addr), 0)
                myvar.set_writable()
                self.Nodes[str(ID)].append(myvar)

        # myvar = myobj.add_variable(idx, "MyVariable", 6.7)
        # myvar.set_writable()  # Set MyVariable to be writable by clients
        #t1 = threading.Thread(target=self.set_val)  # 创建线程
        #t1.setDaemon(True)  # 设置为后台线程,这里默认是False,设置为True之后则主线程不用等待子线程
        #t1.start()  # 开启线程

        # for ID in self.groups:
        #     try:
        #         for addr in range(0, self.quantity):
        #
        #             datavalue = ua.DataValue(0)
        #             date_time = datetime.strptime('2021-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
        #             datavalue.SourceTimestamp = date_time
        #         # tag.set_value(datavalue)
        #             self.Nodes[str(ID)][addr].set_value(datavalue)
        #             self.Nodes[str(ID)][addr].set_value(0)
        #     except Exception as e:
        #         logger.info('{},{},{}'.format(self.name, ID, e))

if __name__ == "__main__":

    # setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:48408/freeopcua/server/")
    # setup our own namespace, not really necessary but should as spec
    # uri = "http://examples.freeopcua.github.io"
    uri = "WBOPC_UA"
    idx = server.register_namespace(uri)

    config_json = JsonConf.load()
    # print(config_json)
    for user in config_json['OPC_UA_Server']:
        my_obj = myObject(server, user["name"],  user["groups"],user["quantity"])

    server.start()

© 版权声明
THE END
喜欢就支持一下吧
点赞15 分享