# Source code for obci_readmanager.signal_processing.tags.tags_file_reader
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
"""Module provides a simple class that reads a tags XML file and returns its tags on demand."""
import xml.dom.minidom
from functools import cmp_to_key
from . import tag_utils
from . import tags_logging as logger
# Module-level logger used by TagsFileReader to report file-open and XML-parse errors.
LOGGER = logger.get_logger('tags_file_reader')
class TagsFileReader:
    """Read a tags XML file into memory and hand the parsed tags out on demand."""

    def __init__(self, p_tags_file_name):
        """Remember the tags file path and read the file immediately.

        Args:
            p_tags_file_name: path of the tags XML file to read.
        """
        self._tags_file_name = p_tags_file_name
        self._tags = []
        self.start_tags_reading()

    def start_tags_reading(self):
        """Read the tags file and store its tags in memory.

        The stored tag list is emptied first, so calling this method again
        re-reads the file instead of appending a second copy of every tag.
        A failure to open the file and an XML syntax error are logged,
        not raised.
        """
        # Imported explicitly so the ``except`` clause below does not rely on
        # ``xml.dom.minidom`` having loaded ``xml.parsers.expat`` as a side
        # effect of parsing.
        from xml.parsers.expat import ExpatError

        self._tags = []
        try:
            # Binary mode lets the XML parser decode the document according to
            # its own XML declaration instead of the platform locale encoding.
            l_tags_file = open(self._tags_file_name, 'rb')
        except IOError:
            LOGGER.error("Couldn`t open tags file.")
            return

        # The context manager closes the file whether or not parsing succeeds.
        with l_tags_file:
            try:
                self._parse_tags_file(l_tags_file)
            except ExpatError:
                LOGGER.error("An error occured while parsing tags xml file.")

    def get_tags(self):
        """Return the list of all tags read from the file.

        The returned object is the reader's own list (not a copy); it is
        sorted by 'start_timestamp' after a successful read and empty if
        nothing could be read.
        """
        return self._tags

    def _parse_tags_file(self, p_tags_file):
        """Parse the p_tags_file xml tags file and store its tags in memory.

        Every <tag> element below the first <tags> element is turned into a
        raw dict, converted with ``tag_utils.unpack_tag_from_dict`` and
        appended to the stored tags. The stored tags are then sorted by
        their 'start_timestamp' value.

        Args:
            p_tags_file: open file object containing the tags XML document.

        Raises:
            xml.parsers.expat.ExpatError: if the document is not well-formed.
            IndexError: if the document contains no <tags> element.
        """
        l_tags_doc = xml.dom.minidom.parse(p_tags_file)
        l_xml_root_element = l_tags_doc.getElementsByTagName("tags")[0]

        for i_tag_node in l_xml_root_element.getElementsByTagName("tag"):
            # Start with the attributes of the <tag> element itself.
            l_raw_tag = {
                i_key: i_tag_node.getAttribute(i_key)
                for i_key in ('length', 'name', 'position', 'channelNumber')
            }
            # Then add one entry per child element, keyed by the element name
            # and holding the value of its first child node. Non-element
            # children (e.g. whitespace text, comments) and empty elements
            # carry no value and are skipped.
            for i_node in i_tag_node.childNodes:
                if i_node.nodeType != i_node.ELEMENT_NODE:
                    continue
                if i_node.firstChild is None:
                    continue
                l_raw_tag[i_node.tagName] = i_node.firstChild.nodeValue
            self._tags.append(tag_utils.unpack_tag_from_dict(l_raw_tag))

        # The file is not guaranteed to list tags chronologically, so order
        # them here; a key function needs one lookup per tag instead of a
        # Python-level comparator call per comparison.
        self._tags.sort(key=lambda i_tag: i_tag['start_timestamp'])