工作中需要对工业设备进行数据采集,一般使用UDP和ModBus进行数据通信。UDP用来处理快速大量的实时数据(如振动数据等),ModBus用于慢变量数据(如温度、压力等),出于工作内容和兴趣,了解了一下OPC工业协议,以此记录学习。
常见的工业协议按照OSI(Open System Interconnect)参考模型分到了不同层次,粗略的总结,可以分类如下:
软件接口 | OPC |
应用层 | ModBus |
数据链路层 | CAN,ProfiBus |
物理层 | RS232,RS485 |
Client
from opcua import Client
from datetime import datetime
from opcua import ua
def set_value_to_OPC(tagID,value,time_str = None):
# tagID: 指对应OPCUA server中点对应的ID号
# value:需要写入的值
# time_str:写入值对应的UTC时间,格式为'%Y-%m-%d %H:%M:%S'
# client = Client("opc.tcp://127.0.0.1:48408/freeopcua/server/")
# client = Client("opc.tcp://172.16.102.105:48408/freeopcua/server/") #AWS 服务上地址
#通过tagID找到对应的tag,并写入值
# try:
# client.connect()
# tagID = tagID
# value = value
# tag = client.get_node("ns=2;i={}".format(tagID))
# if time_str == None:
# datavalue = value
# else:
# datavalue = ua.DataValue(value)
# date_time = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
# datavalue.SourceTimestamp = date_time
# tag.set_value(datavalue)
# # v.set_value(i)
# client.disconnect()
# except enumerate as e:
# print(e)
client = Client("opc.tcp://192.168.70.134:62551/DataAccessServer")
try:
print(client.connect())
tagID = tagID
value = value
# tag = client.get_node("ns=2;i={}".format(tagID))
tag = client.get_node("ns=2;s=1:IN7?Value")
print(tag.get_value())
# datavalue = ua.DataValue(56)
# tag.set_value(datavalue)
# print(tag.get_value())
dv = ua.DataValue(ua.Variant(125, ua.VariantType.Float))
#设置值,与数据类型
dv.ServerTimestamp = None
dv.SourceTimestamp = None
tag.set_value(dv)
# if time_str == None:
# datavalue = value
# else:
# datavalue = ua.DataValue(value)
# date_time = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
# datavalue.SourceTimestamp = date_time
# tag.set_value(datavalue)
client.disconnect()
except enumerate as e:
print(e)
#默认写入值
# set_value_to_OPC(410,5.6,'2021-10-12 08:06:00')
#指定数据类型写入值
set_value_to_OPC("1:IN7?Value",34)
Server
from opcua import ua, Server
from loguru import logger
from json_config import JsonConf
from datetime import datetime
class myObject():
def __init__(self,server,name,groups,quantity):
self.server = server
self.name = name
self.groups = groups
self.quantity = int(quantity)
self.Nodes = {}
self.create_Node()
def create_Node(self):
objects = server.get_objects_node()
# populating our address space
myobj = objects.add_object(idx, self.name)
for ID in self.groups:
self.Nodes[str(ID)] = []
subobj = myobj.add_object(idx, str(ID))
for addr in range(0,self.quantity):
myvar = subobj.add_variable(idx, str(addr), 0)
myvar.set_writable()
self.Nodes[str(ID)].append(myvar)
# myvar = myobj.add_variable(idx, "MyVariable", 6.7)
# myvar.set_writable() # Set MyVariable to be writable by clients
#t1 = threading.Thread(target=self.set_val) # 创建线程
#t1.setDaemon(True) # 设置为后台线程,这里默认是False,设置为True之后则主线程不用等待子线程
#t1.start() # 开启线程
# for ID in self.groups:
# try:
# for addr in range(0, self.quantity):
#
# datavalue = ua.DataValue(0)
# date_time = datetime.strptime('2021-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
# datavalue.SourceTimestamp = date_time
# # tag.set_value(datavalue)
# self.Nodes[str(ID)][addr].set_value(datavalue)
# self.Nodes[str(ID)][addr].set_value(0)
# except Exception as e:
# logger.info('{},{},{}'.format(self.name, ID, e))
if __name__ == "__main__":
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:48408/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
# uri = "http://examples.freeopcua.github.io"
uri = "WBOPC_UA"
idx = server.register_namespace(uri)
config_json = JsonConf.load()
# print(config_json)
for user in config_json['OPC_UA_Server']:
my_obj = myObject(server, user["name"], user["groups"],user["quantity"])
server.start()
© 版权声明
THE END