Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement XML exporter #270

Merged
merged 5 commits into from
Aug 15, 2016
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 93 additions & 132 deletions opcua/common/xmlexporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,73 @@ def __init__(self, server, node_set_attrs):
self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attrs))
self.server = server

def export_xml(self, node_list, xmlpath):
self.logger.info('Exporting XML file to %s', xmlpath)
def build_etree(self, node_list):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhere I would like to be able to add a list of namespace to etree too

Copy link
Contributor Author

@zerox1212 zerox1212 Aug 14, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not used the namespace stuff. How can I get a list of the namespaces in the server?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, only add a method add_namespaces(list of string and maybe idx). So the users or GUI take care of consistency

"""
Create an XML etree object from a list of nodes
Args:
node_list: list of Node objects for export

Returns:
"""
self.logger.info('Building XML etree')

# add all namespace uris to the XML etree
self._get_xml_namespace_uris() # TODO not done yet

# add all nodes in the list to an XML etree
# add all required aliases to the XML etree
self._get_xml_aliases() # TODO not done yet

# add all nodes in the list to the XML etree
for node in node_list:
self.node_to_xml(node)
self.node_to_etree(node)

def export_xml(self, xmlpath):
"""
Write the XML etree in the exporter object to a file
Args:
xmlpath: string representing the path/file name

Returns:
"""
# try to write the XML etree to a file
self.logger.info('Exporting XML file to %s', xmlpath)
try:
self.etree.write(xmlpath, short_empty_elements=False)
except TypeError as e: # TODO where to find which exceptions etree.write() raises?
self.logger.error("Error writing XML to file: ", e)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I better to raise exception than catching

def dump_xml(self):
self.logger.info('Dumping XML file to console')
def dump_etree(self):
"""
Dump etree to console for debugging
Returns:
"""
self.logger.info('Dumping XML etree to console')
Et.dump(self.etree)

def node_to_xml(self, node):
def node_to_etree(self, node):
"""
Add the necessary XML sub elements to the etree for exporting the node
Args:
node: Node object which will be added to XML etree

Returns:
"""
node_class = node.get_node_class()

if node_class is ua.NodeClass.Object:
self.add_object(node)
elif node_class is ua.NodeClass.UaObjectType:
self.add_object_type(node)
self.add_object_xml(node)
elif node_class is ua.NodeClass.ObjectType:
self.add_object_type_xml(node)
elif node_class is ua.NodeClass.Variable:
self.add_variable(node)
self.add_variable_xml(node)
elif node_class is ua.NodeClass.VariableType:
self.add_variable_type(node)
self.add_variable_type_xml(node)
elif node_class is ua.NodeClass.RefernceType:
self.add_reference(node)
self.add_reference_xml(node)
elif node_class is ua.NodeClass.DataType:
self.add_datatype(node)
self.add_datatype_xml(node)
elif node_class is ua.NodeClass.Method:
self.add_method(node)
self.add_method_xml(node)
else:
self.logger.info("Not implemented node type: %s ", node_class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. but do a rename of 'add_xxx" :-)


Expand All @@ -70,39 +103,28 @@ def _get_node(self, obj):
# node.TypeDefinition = ua.NodeId.from_string(obj.typedef)
# return node

def to_nodeid(self, nodeid):
# TODO migrate this function to _get_xml_nodeid and delete
pass
def add_object_xml(self, obj):
"""
Add a UA object element to the XML tree
"""
browsename = obj.get_browse_name().to_string()
nodeid = obj.nodeid.to_string()

# ORIGINAL CODE FROM IMPORTER
# if not nodeid:
# return ua.NodeId(ua.ObjectIds.String)
# elif "=" in nodeid:
# return ua.NodeId.from_string(nodeid)
# elif hasattr(ua.ObjectIds, nodeid):
# return ua.NodeId(getattr(ua.ObjectIds, nodeid))
# else:
# if nodeid in self.parser.aliases:
# nodeid = self.parser.aliases[nodeid]
# else:
# nodeid = "i={}".format(getattr(ua.ObjectIds, nodeid))
# return ua.NodeId.from_string(nodeid)

def add_object(self, obj):
pass
displayname = obj.get_display_name().Text.decode(encoding='UTF8')

# ORIGINAL CODE FROM IMPORTER
# node = self._get_node(obj)
# attrs = ua.ObjectAttributes()
# if obj.desc:
# attrs.Description = ua.LocalizedText(obj.desc)
# attrs.DisplayName = ua.LocalizedText(obj.displayname)
# attrs.EventNotifier = obj.eventnotifier
# node.NodeAttributes = attrs
# self.server.add_nodes([node])
# self._add_refs(obj)
refs = obj.get_references()

obj_el = Et.SubElement(self.etree.getroot(),
'UAObject',
BrowseName=browsename,
NodeId=nodeid)

def add_object_type(self, obj):
disp_el = Et.SubElement(obj_el, 'DisplayName', )
disp_el.text = displayname

self._add_ref_sub_els(obj_el, refs)

def add_object_type_xml(self, obj):
pass

# ORIGINAL CODE FROM IMPORTER
Expand All @@ -116,22 +138,22 @@ def add_object_type(self, obj):
# self.server.add_nodes([node])
# self._add_refs(obj)

def add_variable(self, obj):
def add_variable_xml(self, obj):
"""
Add a UA variable element to the XML tree
"""
browsename = self._get_xml_browsename(obj)
browsename = obj.get_browse_name().to_string()
datatype = o_ids.ObjectIdNames[obj.get_data_type().Identifier]
nodeid = self._get_xml_nodeid(obj)
parent = self._get_xml_parent(obj)
acccesslevel = str(obj.get_access_level().val)
useraccesslevel = str(obj.get_user_access_level().val)
nodeid = obj.nodeid.to_string()
parent = obj.get_parent().nodeid.to_string()
acccesslevel = str(obj.get_attribute(ua.AttributeIds.AccessLevel).Value.Value)
useraccesslevel = str(obj.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value)

displayname = obj.get_display_name().Text.decode(encoding='UTF8')

value = str(obj.get_value())

refs = [] # TODO get list of refs here
refs = obj.get_references() # [] # TODO get list of refs here

var_el = Et.SubElement(self.etree.getroot(),
'UAVariable',
Expand All @@ -145,76 +167,14 @@ def add_variable(self, obj):
disp_el = Et.SubElement(var_el, 'DisplayName', )
disp_el.text = displayname

refs_el = Et.SubElement(var_el, 'References')

# TODO creating XML ref elements not done yet
for ref in refs:
refx_el = Et.SubElement(refs_el, 'Reference', ReferenceType=ref)
self._add_ref_sub_els(var_el, refs)

val_el = Et.SubElement(var_el, 'Value')

valx_el = Et.SubElement(val_el, 'uax:' + datatype)
valx_el.text = value

# ORIGINAL CODE FROM IMPORTER
# node = self._get_node(obj)
# attrs = ua.VariableAttributes()
# if obj.desc:
# attrs.Description = ua.LocalizedText(obj.desc)
# attrs.DisplayName = ua.LocalizedText(obj.displayname)
# attrs.DataType = self.to_nodeid(obj.datatype)
# # if obj.value and len(obj.value) == 1:
# if obj.value is not None:
# attrs.Value = self._add_variable_value(obj, )
# if obj.rank:
# attrs.ValueRank = obj.rank
# if obj.accesslevel:
# attrs.AccessLevel = obj.accesslevel
# if obj.useraccesslevel:
# attrs.UserAccessLevel = obj.useraccesslevel
# if obj.minsample:
# attrs.MinimumSamplingInterval = obj.minsample
# if obj.dimensions:
# attrs.ArrayDimensions = obj.dimensions
# node.NodeAttributes = attrs
# self.server.add_nodes([node])
# self._add_refs(obj)

def _add_variable_value(self, obj):
"""
Returns the value for a Variable based on the objects valuetype.
"""
# MAY NOT BE NEEDED
pass

# ORIGINAL CODE FROM IMPORTER
# if obj.valuetype == 'ListOfLocalizedText':
# return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)
# elif obj.valuetype == 'EnumValueType':
# values = []
# for ev in obj.value:
# enum_value = ua.EnumValueType()
# enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
# enum_value.Description = ua.LocalizedText(ev['Description'])
# enum_value.Value = int(ev['Value'])
# values.append(enum_value)
# return values
# elif obj.valuetype == 'Argument':
# values = []
# for arg in obj.value:
# argument = ua.Argument()
# argument.Name = arg['Name']
# argument.Description = ua.LocalizedText(arg['Description'])
# argument.DataType = self.to_nodeid(arg['DataType'])
# argument.ValueRank = int(arg['ValueRank'])
# argument.ArrayDimensions = arg['ArrayDimensions']
# values.append(argument)
# return values
#
# return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))


def add_variable_type(self, obj):
def add_variable_type_xml(self, obj):
pass

# ORIGINAL CODE FROM IMPORTER
Expand All @@ -236,7 +196,7 @@ def add_variable_type(self, obj):
# self.server.add_nodes([node])
# self._add_refs(obj)

def add_method(self, obj):
def add_method_xml(self, obj):
pass

# ORIGINAL CODE FROM IMPORTER
Expand All @@ -257,7 +217,7 @@ def add_method(self, obj):
# self.server.add_nodes([node])
# self._add_refs(obj)

def add_reference(self, obj):
def add_reference_xml(self, obj):
# MAY NOT BE NEEDED
pass

Expand All @@ -277,7 +237,7 @@ def add_reference(self, obj):
# self.server.add_nodes([node])
# self._add_refs(obj)

def add_datatype(self, obj):
def add_datatype_xml(self, obj):
pass

# ORIGINAL CODE FROM IMPORTER
Expand Down Expand Up @@ -310,19 +270,20 @@ def _add_refs(self, obj):
# refs.append(ref)
# self.server.add_references(refs)

@staticmethod
def _get_xml_nodeid(obj):
"""
Convert a UA NodeId object to a formatted string for XML
:param obj:
:return:
"""
return 'ns=' + str(obj.nodeid.NamespaceIndex) + ';' + str(obj.nodeid.Identifier)
def _get_xml_namespace_uris(self):
# TODO name space uris should be exported
pass

def _get_xml_aliases(self):
# TODO aliases need to be created at the top of the xml
pass

@staticmethod
def _get_xml_browsename(obj):
bn = obj.get_browse_name()
return str(bn.NamespaceIndex) + ':' + bn.Name
def _add_ref_sub_els(parent_el, refs):
refs_el = Et.SubElement(parent_el, 'References')

def _get_xml_parent(self, obj):
return self._get_xml_nodeid(obj.get_parent())
for ref in refs:
ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
ref_forward = str(ref.IsForward)
refx_el = Et.SubElement(refs_el, 'Reference', IsForward=ref_forward, ReferenceType=ref_name)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I look at UAExpert exports it doesn't always include "IsForward". How do I know if this is required or not?

Copy link
Member

@oroulet oroulet Aug 13, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to check attributes values against the default and only export if they are different.

How to get default is another question:

  • Hard coded in exporter
  • Modified attrs in ui
  • Create ua.ObjectAttributes or ua.VariableTypeAttribute depending on noce class and compare attribute from node in address space and value of attributes in the created object ( I think I vote for that one)

refx_el.text = ref.NodeId.to_string()