# Source code for mss_dataserver.geometry.inventory_parser

# -*- coding: utf-8 -*-
 # This file is part of mss_dataserver.
 # If you use mss_dataserver in any program or publication, please inform and
 # acknowledge its authors.
 # mss_dataserver is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 # mss_dataserver is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 # You should have received a copy of the GNU General Public License
 # along with mss_dataserver. If not, see <http://www.gnu.org/licenses/>.
 # Copyright 2019 Stefan Mertl

''' The inventory parser module.

This module contains parser classes to read inventory data from files.
'''

import logging
from lxml import etree

from mss_dataserver.geometry.inventory import Inventory
from mss_dataserver.geometry.inventory import Network
from mss_dataserver.geometry.inventory import Array
from mss_dataserver.geometry.inventory import Station
from mss_dataserver.geometry.inventory import Channel
from mss_dataserver.geometry.inventory import Recorder
from mss_dataserver.geometry.inventory import RecorderStream
from mss_dataserver.geometry.inventory import RecorderStreamParameter
from mss_dataserver.geometry.inventory import Sensor
from mss_dataserver.geometry.inventory import SensorComponent
from mss_dataserver.geometry.inventory import SensorComponentParameter
from obspy.core.utcdatetime import UTCDateTime

class InventoryXmlParser:
    ''' A parser for a geometry inventory XML file in psysmon format.

    The parser reads an inventory XML file into an
    :class:`mss_dataserver.geometry.inventory.Inventory` (see :meth:`parse`)
    and writes an inventory back to an XML file (see :meth:`export_xml`).

    Attributes
    ----------
    logger: :class:`logging.Logger`
        The logger of the parser instance.

    required_attributes: :obj:`dict`
        The XML attributes which have to be present for each node type.

    required_tags: :obj:`dict`
        The child tags which have to be present for each node type.
    '''

    def __init__(self):
        ''' Initialize the logger and the tables of required fields.
        '''
        # The logger instance.
        logger_name = __name__ + "." + self.__class__.__name__
        self.logger = logging.getLogger(logger_name)

        # The required attributes which have to be present in the tags.
        self.required_attributes = {
            'inventory': ('name', ),
            'sensor': ('serial', ),
            'component': ('name', ),
            'component_parameter': (),
            'response_paz': (),
            'recorder': ('serial', ),
            'stream': ('name', ),
            'stream_parameter': (),
            'assigned_component': (),
            'network': ('name', ),
            'station': ('name', ),
            'location': ('name', ),
            'channel': ('name', ),
            'assigned_stream': (),
            'array': ('name', ),
            'array_station': (),
        }

        # The required tags which have to be present in the inventory.
        # The complex_zero and complex_pole tags of a response_paz node are
        # not checked; a transfer function may have none of them.
        self.required_tags = {
            'sensor': ('model', 'producer'),
            'component': ('description', 'input_unit', 'output_unit',
                          'deliver_unit', 'component_parameter', ),
            'component_parameter': ('start_time', 'end_time', 'sensitivity'),
            'response_paz': ('type', 'units', 'A0_normalization_factor',
                             'normalization_frequency'),
            'recorder': ('model', 'producer', 'description'),
            'stream': ('label', ),
            'stream_parameter': ('start_time', 'end_time', 'gain',
                                 'bitweight'),
            'assigned_component': ('sensor_serial', 'sensor_model',
                                   'sensor_producer', 'component_name',
                                   'start_time', 'end_time'),
            'network': ('description', 'type'),
            'station': ('location', ),
            'location': ('x', 'y', 'z', 'coord_system', 'description'),
            'channel': ('description', ),
            'assigned_stream': ('recorder_serial', 'recorder_model',
                                'recorder_producer', 'stream_name',
                                'start_time', 'end_time'),
            'array': (),
            'array_station': ('network', 'name', 'location', 'start_time',
                              'end_time'),
        }

    def parse(self, filename, inventory_name = 'new xml inventory'):
        ''' Parse a XML inventory file.

        Parameters
        ----------
        filename: str
            The path to the inventory XML file.

        inventory_name: str
            The name used to create the inventory. It is replaced by the
            name attribute of the inventory root element.

        Returns
        -------
        :class:`mss_dataserver.geometry.inventory.Inventory`
            The parsed inventory. None, if the root element of the file
            is not an inventory element.

        Raises
        ------
        RuntimeError
            If a node of the file lacks required attributes or tags.
        '''
        self.logger.debug("parsing file...\n")

        inventory = Inventory(inventory_name, type = 'xml')

        # Parse the xml file passed as argument. XML comments are dropped
        # so that they don't show up as child nodes.
        parser = etree.XMLParser(remove_comments = True)
        tree = etree.parse(filename, parser)
        inventory_root = tree.getroot()

        # Check if the root element is of type inventory.
        if inventory_root.tag != 'inventory':
            self.logger.error("The root tag of file %s is '%s', expected "
                              "'inventory'.", filename, inventory_root.tag)
            return None
        self.logger.debug("found inventory root tag\n")

        # Set the name of the inventory.
        inventory.name = inventory_root.attrib['name']

        # Process the sensors first.
        for cur_sensor_list in tree.findall('sensor_list'):
            self.process_sensors(inventory,
                                 cur_sensor_list.findall('sensor'))

        # Next process the recorders. These might depend on sensors.
        for cur_recorder_list in tree.findall('recorder_list'):
            self.process_recorders(inventory,
                                   cur_recorder_list.findall('recorder'))

        # Now process the networks which might depend on recorders.
        self.process_networks(inventory, tree.findall('network'))

        # Finally process the arrays which require all elements already
        # added to the inventory.
        self.process_arrays(inventory, tree.findall('array'))

        self.logger.debug("Success reading the XML file.")
        return inventory

    def instance_to_xml(self, instance, root, name, attributes, tags,
                        attr_map, converter, element_handler = None):
        ''' Translate an inventory object into a xml element.

        Parameters
        ----------
        instance: object
            An instance of a :class:`mss_dataserver.geometry.inventory`
            class.

        root: :class:`etree.Element` or :class:`etree.Subelement`
            The XML root.

        name: str
            The name of the XML element.

        attributes: :obj:`list` of str
            The attributes of the XML element.

        tags: :obj:`list` of str
            The tags of the XML element.

        attr_map: :obj:`dict`
            The mapping of the XML attributes to instance attributes.

        converter: :obj:`dict`
            A dictionary with functions used to convert attributes
            before adding them to the XML element.

        element_handler: :obj:`dict`
            A dictionary with functions used to handle attributes
            before adding them to the XML element. A handler can be
            used if a simple conversion using the converter is not
            sufficient. None is treated as an empty dictionary.

        Returns
        -------
        :class:`etree.SubElement`
            The created XML element.
        '''
        if element_handler is None:
            element_handler = {}

        # XML attribute values have to be strings. None is written as an
        # empty string, the same way as it is done for the tag text below.
        attrib = {}
        for cur_key in attributes:
            value = getattr(instance, attr_map[cur_key])
            attrib[cur_key] = '' if value is None else str(value)

        # Pass the attributes as a dictionary to avoid collisions of
        # attribute names with keyword arguments of SubElement.
        element = etree.SubElement(root, name, attrib)

        for cur_key in tags:
            value = getattr(instance, attr_map[cur_key])

            if cur_key in element_handler:
                # The handler adds its elements in place.
                element_handler[cur_key](name = cur_key,
                                         value = value,
                                         root = element)
                continue

            tag = etree.SubElement(element, cur_key)
            if cur_key in converter:
                tag.text = converter[cur_key](value)
            elif value is not None:
                tag.text = str(value)
            else:
                tag.text = ''

        return element

    def clean_time_string(self, value):
        ''' Remove running and big bang string from time string.

        Parameters
        ----------
        value: str
            The time string to clean.

        Returns
        -------
        str
            An empty string if value is 'big bang' or 'running',
            otherwise the unchanged value.
        '''
        if value in ('big bang', 'running'):
            return ''
        return value

    def handle_element_pz(self, name, value, root):
        ''' Convert a list of complex values to a list of xml tree elements.

        The new XML elements are added in place to the root element.

        Parameters
        ----------
        name: str
            The name of the XML element.

        value: :obj:`list` of complex
            The complex values to convert.

        root: :class:`etree.SubElement`
            The root of the XML element.
        '''
        for cur_pz in value:
            element = etree.SubElement(root, name)
            # str(complex) is enclosed in parentheses, e.g. '(1+2j)'.
            element.text = str(cur_pz).replace('(', '').replace(')', '')

    def export_xml(self, inventory, filename):
        ''' Export an inventory to xml file.

        Parameters
        ----------
        inventory: :class:`mss_dataserver.geometry.inventory.Inventory`
            The inventory to convert.

        filename: str
            The path to the file to which to save the XML file.
        '''
        def spec(name, attributes, tags, renames = None, converter = None,
                 element_handler = None):
            # Build the keyword arguments of instance_to_xml for one node
            # type. XML names map to equally named instance attributes
            # unless listed in renames.
            attr_map = {key: key for key in attributes + tags}
            attr_map.update(renames or {})
            return {'name': name,
                    'attributes': attributes,
                    'tags': tags,
                    'attr_map': attr_map,
                    'converter': converter or {},
                    'element_handler': element_handler}

        time_renames = {'start_time': 'start_time_string',
                        'end_time': 'end_time_string'}
        time_converter = {'start_time': self.clean_time_string,
                          'end_time': self.clean_time_string}

        sensor_spec = spec('sensor', ['serial'],
                           ['model', 'producer', 'description'])
        component_spec = spec('component', ['name'],
                              ['description', 'input_unit', 'output_unit',
                               'deliver_unit'])
        component_parameter_spec = spec('component_parameter', [],
                                        ['start_time', 'end_time',
                                         'sensitivity'],
                                        renames = time_renames,
                                        converter = time_converter)
        # NOTE(review): the 'units' tag is required when parsing a
        # response_paz node but it is not exported here. Confirm the name
        # of the related instance attribute before adding it.
        paz_spec = spec('response_paz', [],
                        ['type', 'A0_normalization_factor',
                         'normalization_frequency', 'complex_zero',
                         'complex_pole'],
                        renames = {'type': 'tf_type',
                                   'A0_normalization_factor': 'tf_normalization_factor',
                                   'normalization_frequency': 'tf_normalization_frequency',
                                   'complex_zero': 'tf_zeros',
                                   'complex_pole': 'tf_poles'},
                        element_handler = {'complex_zero': self.handle_element_pz,
                                           'complex_pole': self.handle_element_pz})
        rec_spec = spec('recorder', ['serial'],
                        ['model', 'producer', 'description'])
        stream_spec = spec('stream', ['name'], ['label'])
        stream_param_spec = spec('stream_parameter', [],
                                 ['start_time', 'end_time', 'gain',
                                  'bitweight'],
                                 renames = time_renames,
                                 converter = time_converter)
        stream_comp_spec = spec('assigned_component', [],
                                ['sensor_serial', 'sensor_model',
                                 'sensor_producer', 'component_name',
                                 'start_time', 'end_time'],
                                renames = dict(time_renames,
                                               sensor_serial = 'serial',
                                               sensor_model = 'model',
                                               sensor_producer = 'producer',
                                               component_name = 'name'),
                                converter = time_converter)
        net_spec = spec('network', ['name'], ['description', 'type'])
        array_spec = spec('array', ['name'], ['description'])
        array_stat_spec = spec('station', [],
                               ['network', 'name', 'location',
                                'start_time', 'end_time'])
        loc_spec = spec('location', ['name'],
                        ['x', 'y', 'z', 'coord_system', 'description'],
                        renames = {'name': 'location'})
        chan_spec = spec('channel', ['name'], ['description'])
        chan_stream_spec = spec('assigned_stream', [],
                                ['recorder_serial', 'recorder_model',
                                 'recorder_producer', 'stream_name',
                                 'start_time', 'end_time'],
                                renames = dict(time_renames,
                                               recorder_serial = 'serial',
                                               recorder_model = 'model',
                                               recorder_producer = 'producer',
                                               stream_name = 'name'),
                                converter = time_converter)

        root = etree.Element('inventory', name = inventory.name)

        # Export the sensors.
        sensor_list = etree.SubElement(root, 'sensor_list')
        for cur_sensor in inventory.sensors:
            sensor_element = self.instance_to_xml(instance = cur_sensor,
                                                  root = sensor_list,
                                                  **sensor_spec)
            for cur_component in cur_sensor.components:
                comp_element = self.instance_to_xml(instance = cur_component,
                                                    root = sensor_element,
                                                    **component_spec)
                for cur_parameter in cur_component.parameters:
                    param_element = self.instance_to_xml(instance = cur_parameter,
                                                         root = comp_element,
                                                         **component_parameter_spec)
                    # The transfer function is stored in the parameter
                    # instance and exported as a child of the parameter.
                    self.instance_to_xml(instance = cur_parameter,
                                         root = param_element,
                                         **paz_spec)

        # Export the recorders.
        recorder_list = etree.SubElement(root, 'recorder_list')
        for cur_recorder in inventory.recorders:
            rec_element = self.instance_to_xml(instance = cur_recorder,
                                               root = recorder_list,
                                               **rec_spec)
            for cur_stream in cur_recorder.streams:
                stream_element = self.instance_to_xml(instance = cur_stream,
                                                      root = rec_element,
                                                      **stream_spec)
                for cur_param in cur_stream.parameters:
                    self.instance_to_xml(instance = cur_param,
                                         root = stream_element,
                                         **stream_param_spec)
                for cur_comp_tb in cur_stream.components:
                    self.instance_to_xml(instance = cur_comp_tb,
                                         root = stream_element,
                                         **stream_comp_spec)

        # Export the networks.
        for cur_network in inventory.networks:
            net_element = self.instance_to_xml(instance = cur_network,
                                               root = root,
                                               **net_spec)

            # Get the unique station names. The order of the first
            # occurrence is kept to get a reproducible output file.
            stat_names = list(dict.fromkeys(x.name
                                            for x in cur_network.stations))

            for cur_stat_name in stat_names:
                cur_station_list = cur_network.get_station(name = cur_stat_name)

                # Create the station element.
                stat_element = etree.SubElement(net_element, 'station',
                                                name = cur_stat_name)

                for cur_station in cur_station_list:
                    # Add the location elements.
                    loc_element = self.instance_to_xml(instance = cur_station,
                                                       root = stat_element,
                                                       **loc_spec)
                    for cur_channel in cur_station.channels:
                        chan_element = self.instance_to_xml(instance = cur_channel,
                                                            root = loc_element,
                                                            **chan_spec)
                        for cur_stream_tb in cur_channel.streams:
                            self.instance_to_xml(instance = cur_stream_tb,
                                                 root = chan_element,
                                                 **chan_stream_spec)

        # Export the arrays.
        for cur_array in inventory.arrays:
            array_element = self.instance_to_xml(instance = cur_array,
                                                 root = root,
                                                 **array_spec)
            for cur_station_tb in cur_array.stations:
                # Create the station element.
                self.instance_to_xml(instance = cur_station_tb,
                                     root = array_element,
                                     **array_stat_spec)

        # Write the xml string to a file.
        et = etree.ElementTree(root)
        et.write(filename,
                 pretty_print = True,
                 xml_declaration = True,
                 encoding = 'UTF-8')

    def process_sensors(self, inventory, sensors):
        ''' Process the extracted sensor tags.

        The sensors are added in place to the passed inventory.

        Parameters
        ----------
        inventory: :class:`mss_dataserver.geometry.inventory.Inventory`
            The inventory to which to add the parsed sensors.

        sensors: :obj:`list` of :class:`etree.SubElement`
            The xml sensor nodes parsed using the findall method.
        '''
        self.logger.debug("Processing the sensors.")
        for cur_sensor in sensors:
            sensor_content = self.parse_node(cur_sensor)
            if not self.check_completeness(cur_sensor, sensor_content,
                                           'sensor'):
                continue

            # The child nodes are processed separately.
            sensor_content.pop('component', None)
            sensor_to_add = Sensor(serial = cur_sensor.attrib['serial'],
                                   **sensor_content)
            inventory.add_sensor(sensor_to_add)

            components = cur_sensor.findall('component')
            self.process_components(sensor_to_add, components)

    def process_components(self, sensor, components):
        ''' Process the component nodes of a sensor.

        Parameters
        ----------
        sensor: :class:`mss_dataserver.geometry.inventory.Sensor`
            The sensor to which to add the components.

        components: :obj:`list` of :class:`etree.SubElement`
            The xml component nodes parsed using the findall method.
        '''
        for cur_component in components:
            component_content = self.parse_node(cur_component)
            if not self.check_completeness(cur_component, component_content,
                                           'component'):
                continue

            # The child nodes are processed separately.
            component_content.pop('component_parameter', None)
            component_to_add = SensorComponent(name = cur_component.attrib['name'],
                                               **component_content)
            sensor.add_component(component_to_add)

            parameters = cur_component.findall('component_parameter')
            self.process_component_parameters(component_to_add, parameters)

    def process_component_parameters(self, component, parameters):
        ''' Process the component_parameter nodes of a component.

        Parameters
        ----------
        component: :class:`mss_dataserver.geometry.inventory.SensorComponent`
            The sensor component to which to add the parameters.

        parameters: :obj:`list` of :class:`etree.SubElement`
            The xml sensor component parameter nodes parsed using the
            findall method.
        '''
        for cur_parameter in parameters:
            content = self.parse_node(cur_parameter)
            if not self.check_completeness(cur_parameter, content,
                                           'component_parameter'):
                continue

            # The child nodes are processed separately.
            content.pop('response_paz', None)
            parameter_to_add = SensorComponentParameter(**content)
            component.add_parameter(parameter_to_add)

            response_paz = cur_parameter.findall('response_paz')
            self.process_response_paz(parameter_to_add, response_paz)

    def process_response_paz(self, parameter, response_paz):
        ''' Process the response_paz nodes of a component_parameter.

        Parameters
        ----------
        parameter: :class:`mss_dataserver.geometry.inventory.SensorComponentParameter`
            The sensor component parameter to which to add the response
            information.

        response_paz: :obj:`list` of :class:`etree.SubElement`
            The xml transfer function response nodes parsed using the
            findall method.
        '''
        for cur_paz in response_paz:
            content = self.parse_node(cur_paz)
            if not self.check_completeness(cur_paz, content, 'response_paz'):
                continue

            self.logger.debug("Adding the tf to the parameter %s", parameter)
            parameter.set_transfer_function(tf_type = content['type'],
                                            tf_units = content['units'],
                                            tf_normalization_factor = float(content['A0_normalization_factor']),
                                            tf_normalization_frequency = float(content['normalization_frequency']))

            zeros = cur_paz.findall('complex_zero')
            self.process_complex_zero(parameter, zeros)

            poles = cur_paz.findall('complex_pole')
            self.process_complex_pole(parameter, poles)

    def _parse_complex(self, node):
        ''' Convert the text of a XML node to a complex number.

        Parameters
        ----------
        node: :class:`etree.SubElement`
            The XML node holding the complex number as text.

        Returns
        -------
        complex
            The parsed number. None, if the node has no text.
        '''
        # complex() doesn't accept whitespace inside the number string.
        text = ''.join((node.text or '').split())
        if not text:
            self.logger.warning("Ignoring the empty %s element.", node.tag)
            return None
        return complex(text)

    def process_complex_zero(self, parameter, zeros):
        ''' Process the complex_zero nodes in a response_paz.

        Nodes without text are ignored.

        Parameters
        ----------
        parameter: :class:`mss_dataserver.geometry.inventory.SensorComponentParameter`
            The sensor component parameter to which to add the complex
            zeros.

        zeros: :obj:`list` of :class:`etree.SubElement`
            The xml transfer function complex zeros nodes parsed using
            the findall method.
        '''
        for cur_zero in zeros:
            zero = self._parse_complex(cur_zero)
            if zero is None:
                continue
            self.logger.debug('Adding zero to the parameter %s', parameter)
            parameter.tf_add_complex_zero(zero)

    def process_complex_pole(self, parameter, poles):
        ''' Process the complex_pole nodes in a response_paz.

        Nodes without text are ignored.

        Parameters
        ----------
        parameter: :class:`mss_dataserver.geometry.inventory.SensorComponentParameter`
            The sensor component parameter to which to add the complex
            poles.

        poles: :obj:`list` of :class:`etree.SubElement`
            The xml transfer function complex poles nodes parsed using
            the findall method.
        '''
        for cur_pole in poles:
            pole = self._parse_complex(cur_pole)
            if pole is None:
                continue
            parameter.tf_add_complex_pole(pole)

    def process_recorders(self, inventory, recorders):
        ''' Process the extracted recorder nodes.

        Parameters
        ----------
        inventory: :class:`mss_dataserver.geometry.inventory.Inventory`
            The inventory to which to add the parsed recorders.

        recorders: :obj:`list` of :class:`etree.SubElement`
            The xml recorder nodes parsed using the findall method.
        '''
        self.logger.debug("Processing the recorders.")
        for cur_recorder in recorders:
            content = self.parse_node(cur_recorder)
            if not self.check_completeness(cur_recorder, content, 'recorder'):
                continue

            # The child nodes are processed separately.
            content.pop('stream', None)

            # Create the Recorder instance.
            rec_to_add = Recorder(serial = cur_recorder.attrib['serial'],
                                  **content)
            inventory.add_recorder(rec_to_add)

            # Process the streams of the recorder.
            streams = cur_recorder.findall('stream')
            self.process_recorder_streams(rec_to_add, streams)

    def process_recorder_streams(self, recorder, streams):
        ''' Process the stream nodes of a recorder.

        Parameters
        ----------
        recorder: :class:`mss_dataserver.geometry.inventory.Recorder`
            The recorder to which to add the streams.

        streams: :obj:`list` of :class:`etree.SubElement`
            The xml stream nodes parsed using the findall method.
        '''
        for cur_stream in streams:
            content = self.parse_node(cur_stream)
            if not self.check_completeness(cur_stream, content, 'stream'):
                continue

            # The child nodes are processed separately.
            content.pop('stream_parameter', None)
            content.pop('assigned_component', None)

            # Create the stream instance.
            stream_to_add = RecorderStream(name = cur_stream.attrib['name'],
                                           **content)
            recorder.add_stream(stream_to_add)

            stream_parameters = cur_stream.findall('stream_parameter')
            self.process_stream_parameters(stream_to_add, stream_parameters)

            assigned_components = cur_stream.findall('assigned_component')
            self.process_assigned_components(stream_to_add,
                                             assigned_components)

    def process_stream_parameters(self, stream, parameters):
        ''' Process the stream_parameter nodes of a recorder stream.

        Parameters
        ----------
        stream: :class:`mss_dataserver.geometry.inventory.RecorderStream`
            The recorder stream to which to add the parameters.

        parameters: :obj:`list` of :class:`etree.SubElement`
            The xml recorder stream parameter nodes parsed using the
            findall method.
        '''
        for cur_parameter in parameters:
            content = self.parse_node(cur_parameter)
            if not self.check_completeness(cur_parameter, content,
                                           'stream_parameter'):
                continue

            parameter_to_add = RecorderStreamParameter(**content)
            stream.add_parameter(parameter_to_add)

    def process_assigned_components(self, stream, components):
        ''' Process the components assigned to a recorder stream.

        Parameters
        ----------
        stream: :class:`mss_dataserver.geometry.inventory.RecorderStream`
            The recorder stream to which to assign the components.

        components: :obj:`list` of :class:`etree.SubElement`
            The xml assigned_component nodes parsed using the findall
            method.
        '''
        for cur_component in components:
            content = self.parse_node(cur_component)
            if not self.check_completeness(cur_component, content,
                                           'assigned_component'):
                continue

            stream.add_component(serial = content['sensor_serial'],
                                 model = content['sensor_model'],
                                 producer = content['sensor_producer'],
                                 name = content['component_name'],
                                 start_time = content['start_time'],
                                 end_time = content['end_time'])

    def process_networks(self, inventory, networks):
        ''' Process the extracted network nodes.

        Parameters
        ----------
        inventory: :class:`mss_dataserver.geometry.inventory.Inventory`
            The inventory to which to add the parsed networks.

        networks: :obj:`list` of :class:`etree.SubElement`
            The xml network nodes parsed using the findall method.
        '''
        self.logger.debug("Processing the networks.")
        for cur_network in networks:
            content = self.parse_node(cur_network)
            if not self.check_completeness(cur_network, content, 'network'):
                continue

            # The child nodes are processed separately.
            content.pop('station', None)

            # Create the Network instance and add it to the inventory.
            net_to_add = Network(name = cur_network.attrib['name'],
                                 **content)
            inventory.add_network(net_to_add)

            stations = cur_network.findall('station')
            self.process_stations(net_to_add, stations)

    def process_arrays(self, inventory, arrays):
        ''' Process the extracted array nodes.

        The stations of an array are looked up in the inventory. A station
        is added to the array only if the lookup returns exactly one
        station.

        Parameters
        ----------
        inventory: :class:`mss_dataserver.geometry.inventory.Inventory`
            The inventory to which to add the parsed arrays.

        arrays: :obj:`list` of :class:`etree.SubElement`
            The xml array nodes parsed using the findall method.
        '''
        self.logger.debug("Processing the arrays.")
        for cur_array in arrays:
            content = self.parse_node(cur_array)
            self.check_completeness(cur_array, content, 'array')

            # The child nodes are processed separately.
            content.pop('station', None)

            # Create the Array instance and add it to the inventory.
            array_to_add = Array(name = cur_array.attrib['name'],
                                 **content)
            inventory.add_array(array_to_add)

            # Process the stations to be added to the array.
            for cur_station in cur_array.findall('station'):
                stat_content = self.parse_node(cur_station)
                self.check_completeness(cur_station, stat_content,
                                        'array_station')
                found_stations = inventory.get_station(network = stat_content['network'],
                                                       name = stat_content['name'],
                                                       location = stat_content['location'])
                if len(found_stations) != 1:
                    # Don't pass an empty or ambiguous search result to
                    # the array.
                    self.logger.error("Found %d stations for %s:%s:%s in the "
                                      "inventory, expected exactly one. The "
                                      "station is not added to the array %s.",
                                      len(found_stations),
                                      stat_content['network'],
                                      stat_content['name'],
                                      stat_content['location'],
                                      cur_array.attrib['name'])
                    continue

                array_to_add.add_station(found_stations[0],
                                         start_time = stat_content['start_time'],
                                         end_time = stat_content['end_time'])

    def process_stations(self, network, stations):
        ''' Process the station nodes of a network.

        For each location node of a station node, one Station instance is
        added to the network.

        Parameters
        ----------
        network: :class:`mss_dataserver.geometry.inventory.Network`
            The network to which to add the stations.

        stations: :obj:`list` of :class:`etree.SubElement`
            The xml station nodes parsed using the findall method.
        '''
        for cur_station in stations:
            content = self.parse_node(cur_station)
            if not self.check_completeness(cur_station, content, 'station'):
                continue

            # NOTE(review): the location nodes are not checked against the
            # required tags and attributes defined for 'location'.
            for cur_location in cur_station.findall('location'):
                loc_content = self.parse_node(cur_location)

                # The child nodes are processed separately.
                loc_content.pop('channel', None)

                station_to_add = Station(name = cur_station.attrib['name'],
                                         location = cur_location.attrib['name'],
                                         **loc_content)
                network.add_station(station_to_add)

                channels = cur_location.findall('channel')
                self.process_channels(station_to_add, channels)

    def process_channels(self, station, channels):
        ''' Process the channel nodes of a station.

        Parameters
        ----------
        station: :class:`mss_dataserver.geometry.inventory.Station`
            The station to which to add the channels.

        channels: :obj:`list` of :class:`etree.SubElement`
            The xml channel nodes parsed using the findall method.
        '''
        for cur_channel in channels:
            content = self.parse_node(cur_channel)
            if not self.check_completeness(cur_channel, content, 'channel'):
                continue

            # The child nodes are processed separately.
            content.pop('assigned_stream', None)
            channel_to_add = Channel(name = cur_channel.attrib['name'],
                                     **content)
            station.add_channel(channel_to_add)

            assigned_streams = cur_channel.findall('assigned_stream')
            self.process_assigned_streams(channel_to_add, assigned_streams)

    def process_assigned_streams(self, channel, streams):
        ''' Process the assigned streams of a channel.

        Parameters
        ----------
        channel: :class:`mss_dataserver.geometry.inventory.Channel`
            The channel to which to assign the streams.

        streams: :obj:`list` of :class:`etree.SubElement`
            The xml assigned_stream nodes parsed using the findall
            method.
        '''
        for cur_stream in streams:
            content = self.parse_node(cur_stream)
            if not self.check_completeness(cur_stream, content,
                                           'assigned_stream'):
                continue

            channel.add_stream(serial = content['recorder_serial'],
                               model = content['recorder_model'],
                               producer = content['recorder_producer'],
                               name = content['stream_name'],
                               start_time = content['start_time'],
                               end_time = content['end_time'])

    def get_node_text(self, xml_element, tag):
        ''' Get the text of a XML node.

        Parameters
        ----------
        xml_element: :class:`etree.SubElement`
            The XML element to search.

        tag: str
            The tag to search in the XML element.

        Returns
        -------
        str
            The node text. None, if the tag was not found.
        '''
        node = xml_element.find(tag)
        if node is None:
            return None
        return node.text

    def parse_node(self, xml_element):
        ''' Parse a XML node.

        Parameters
        ----------
        xml_element: :class:`etree.SubElement`
            The XML element to parse.

        Returns
        -------
        :obj:`dict` of str
            The stripped text of the direct child nodes of the element
            using the child tags as the keys. The value of a child
            without text is None. If a tag occurs more than once, the
            last occurrence is kept.
        '''
        node_content = {}
        for cur_node in xml_element:
            text = cur_node.text
            node_content[cur_node.tag] = None if text is None else text.strip()
        return node_content

    def keys_complete(self, node_content, required_keys):
        ''' Check if a node contains all required keys.

        Parameters
        ----------
        node_content: :obj:`dict` of str
            The content of a node as a dictionary.

        required_keys: :obj:`tuple` of str
            The keys which are mandatory for the node.

        Returns
        -------
        :obj:`list` of str
            The missing keys in the order of required_keys.
        '''
        return [key for key in required_keys if key not in node_content]

    def check_completeness(self, node, content, node_type):
        ''' Check the completeness of a XML node.

        Parameters
        ----------
        node: :class:`etree.SubElement`
            The XML node to check.

        content: :obj:`dict` of str
            The content of a node as a dictionary.

        node_type: str
            The type of the node.

        Returns
        -------
        boolean
            True, if the node has passed all checks. An incomplete node
            is not reported by the return value but by an exception.

        Raises
        ------
        RuntimeError
            If a required attribute or tag is missing.
        '''
        missing_attrib = self.keys_complete(node.attrib,
                                            self.required_attributes[node_type])
        missing_keys = self.keys_complete(content,
                                          self.required_tags[node_type])

        if not missing_keys and not missing_attrib:
            self.logger.debug(node_type + " xml content:")
            self.logger.debug("%s", content)
            return True

        self.logger.error("Not all required fields present!\nMissing Keys:\n")
        self.logger.error("%s", missing_keys)
        self.logger.error("%s", missing_attrib)
        raise RuntimeError("Not all required fields for node %s present." % node_type)