# -*- coding: utf-8 -*-
##############################################################################
# LICENSE
#
# This file is part of mss_dataserver.
#
# If you use mss_dataserver in any program or publication, please inform and
# acknowledge its authors.
#
# mss_dataserver is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# mss_dataserver is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with mss_dataserver. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright 2019 Stefan Mertl
##############################################################################
''' General utility function.
'''
import configparser
import json
import logging
import logging.handlers
import os
[docs]def load_configuration(filename):
''' Load the configuration from a file.
Load the configuration from a .ini file using configparser.
The properties of the configuration file are documented in an
example .ini file in the *example* directory.
Parameters
----------
filename: str
The full path to the configuration file.
Returns
-------
config: dict
A dictionary holding the configuration data.
Notes
-----
Example configuration file: https://github.com/Macroseismic-Sensor-Network/mss_dataserver/blob/main/example/mss_dataserver_config.ini
'''
if not os.path.exists(filename):
raise RuntimeError("The configuration filename {filename} doesn't exist.".format(filename = filename))
parser = configparser.ConfigParser()
parser.read(filename)
config = {}
config['config_filepath'] = filename
config['websocket'] = {}
config['websocket']['host'] = parser.get('websocket', 'host').strip()
config['websocket']['port'] = int(parser.get('websocket', 'port'))
config['seedlink'] = {}
config['seedlink']['host'] = parser.get('seedlink', 'host').strip()
config['seedlink']['port'] = int(parser.get('seedlink', 'port'))
config['output'] = {}
config['output']['data_dir'] = parser.get('output', 'data_dir').strip()
config['output']['event_dir'] = parser.get('output', 'event_dir').strip()
config['log'] = {}
config['log']['log_dir'] = parser.get('log', 'log_dir').strip()
config['log']['loglevel'] = parser.get('log', 'loglevel').strip()
config['log']['max_bytes'] = int(parser.get('log', 'max_bytes'))
config['log']['backup_count'] = int(parser.get('log', 'backup_count'))
config['project'] = {}
config['project']['author_uri'] = parser.get('project', 'author_uri').strip()
config['project']['agency_uri'] = parser.get('project', 'agency_uri').strip()
config['project']['inventory_file'] = parser.get('project', 'inventory_file').strip()
config['database'] = {}
config['database']['host'] = parser.get('database', 'host').strip()
config['database']['username'] = parser.get('database', 'username').strip()
config['database']['password'] = parser.get('database', 'password').strip()
config['database']['dialect'] = parser.get('database', 'dialect').strip()
config['database']['driver'] = parser.get('database', 'driver').strip()
config['database']['database_name'] = parser.get('database', 'database_name').strip()
config['process'] = {}
config['process']['stations'] = json.loads(parser.get('process', 'stations'))
config['process']['interval'] = int(parser.get('process', 'interval'))
config['process']['pgv_sps'] = int(parser.get('process', 'pgv_sps'))
config['process']['trigger_threshold'] = float(parser.get('process', 'trigger_threshold'))
config['process']['warn_threshold'] = float(parser.get('process', 'warn_threshold'))
config['process']['valid_event_threshold'] = float(parser.get('process', 'valid_event_threshold'))
config['process']['pgv_archive_time'] = int(parser.get('process', 'pgv_archive_time'))
config['process']['event_archive_timespan'] = int(parser.get('process', 'event_archive_timespan'))
config['process']['min_event_length'] = int(parser.get('process', 'min_event_length'))
config['process']['min_event_detections'] = int(parser.get('process', 'min_event_detections'))
config['postprocess'] = {}
config['postprocess']['data_dir'] = parser.get('postprocess', 'data_dir').strip()
config['postprocess']['map_dir'] = parser.get('postprocess', 'map_dir').strip()
config['postprocess']['visualization_dir'] = parser.get('postprocess', 'visualization_dir').strip()
config['postprocess']['boundary_filename'] = parser.get('postprocess', 'boundary_filename').strip()
config['postprocess']['basemap_filename'] = parser.get('postprocess', 'basemap_filename').strip()
config['postprocess']['station_amplification_filename'] = parser.get('postprocess', 'station_amplification_filename').strip()
return config
[docs]def get_logger_rotating_file_handler(filename = None,
log_level = 'INFO',
max_bytes = 1000,
backup_count = 3):
''' Create a logging rotating file handler.
Create a logging RotatingFileHandler and ad a Formatter to it.
Parameters
----------
filename: str
The full path of the log file.
log_level: str
The logging log level.
['DEBUG', 'INFO', 'WARNING', 'ERROR']
max_bytes: int
The maximum filesize of the log file [bytes].
If the file grows larger than this value, a new
file is created.
backup_count: int
The number of rotating files to use.
Returns
-------
ch: logging.handlers.RotatingFileHandler
The logging filehandler.
'''
if not filename:
return
ch = logging.handlers.RotatingFileHandler(filename = filename,
maxBytes = max_bytes,
backupCount = backup_count)
ch.setLevel(log_level)
formatter = logging.Formatter("#LOG# - %(asctime)s - %(process)d - %(levelname)s - %(name)s: %(message)s")
ch.setFormatter(formatter)
return ch
[docs]class Version(object):
''' A version String representation.
Parameters
----------
version: str
The version as a point-seperated string (e.g. 0.0.1).
'''
[docs] def __init__(self, version = '0.0.1'):
''' Initialize the instance.
'''
self.version = self.string_to_tuple(version)
def __str__(self):
''' The string representation.
'''
return '.'.join([str(x) for x in self.version])
def __eq__(self, c):
''' Test for equality.
'''
for k, cur_n in enumerate(self.version):
if cur_n != c.version[k]:
return False
return True
def __ne__(self, c):
''' Test for inequality.
'''
return not self.__eq__(c)
def __gt__(self, c):
''' Test for greater than.
'''
for k, cur_n in enumerate(self.version):
if cur_n > c.version[k]:
return True
elif cur_n != c.version[k]:
return False
return False
def __lt__(self, c):
''' Test for less than.
'''
for k, cur_n in enumerate(self.version):
if cur_n < c.version[k]:
return True
elif cur_n != c.version[k]:
return False
return False
def __ge__(self, c):
''' Test for greater or equal.
'''
return self.__eq__(c) or self.__gt__(c)
def __le__(self, c):
''' Test for less or equal.
'''
return self.__eq__(c) or self.__lt__(c)
[docs] def string_to_tuple(self, vs):
''' Convert a version string to a tuple.
Parameters
----------
version: str
The version as a point-seperated string (e.g. 0.0.1).
Returns
-------
version_tuple: tuple
The version string as a tuple.
'''
nn = vs.split('.')
for k,x in enumerate(nn):
if x.isdigit():
nn[k] = int(x)
else:
tmp = re.split('[A-Za-z]', x)
tmp = [x for x in tmp if x.isdigit()]
if len(tmp) > 0:
nn[k] = int(tmp[0])
else:
nn[k] = 0
return tuple(nn)