# -*- coding: utf-8 -*-
##############################################################################
# LICENSE
#
# This file is part of mss_dataserver.
#
# If you use mss_dataserver in any program or publication, please inform and
# acknowledge its authors.
#
# mss_dataserver is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# mss_dataserver is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with mss_dataserver. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright 2019 Stefan Mertl
##############################################################################
import copy
import datetime
import gc
import gzip
import json
import logging
import os
import shutil
import subprocess
import threading
import time
import asyncio
import numpy as np
import obspy
import obspy.core.utcdatetime as utcdatetime
import obspy.clients.seedlink.easyseedlink as easyseedlink
from obspy.clients.seedlink.slpacket import SLPacket
import pyproj
import scipy
import scipy.spatial
import mss_dataserver.core.json_util as json_util
import mss_dataserver.core.validation as validation
import mss_dataserver.event.core as event_core
import mss_dataserver.event.detection as event_detection
import mss_dataserver.event.delaunay_detection as event_ddet
import mss_dataserver.postprocess.util as pp_util
[docs]class EasySeedLinkClientException(Exception):
"""
A base exception for all errors triggered explicitly by EasySeedLinkClient.
"""
# XXX Base on SeedLinkException?
pass
[docs]class MonitorClient(easyseedlink.EasySeedLinkClient):
""" A custom SeedLink client
Parameters
----------
project: :class:`mss_dataserver.core.project.Project`
The mss_dataserver project.
server_url: str
The URL of the server.
stations: :obj:`list` of :obj:`str`
The stations to request from the seedlink server.
monitor_stream: :class:`obspy.Stream`
The stream instance used to save the incoming data.
stream_lock: :class:`threading.Lock`
The lock object used to for thread-save access of the stream data.
data_dir: str
The data directory.
event_dir: str
The event directory.
process_interval: float
The time interval [s] used to process the received data.
stop_event: :class:`threading.Event`
The event used to signal the stopping of the program execution.
asyncio_loop: :class:`asyncio.EventLoop`
The asyncio event loop. Used to stop the loop if an error occurs.
pgv_sps: float
The samples per second of the PGV data stream.
autoconnect: boolean
The :class:`obspy.easyseedlink.EasySeedLinkClient` autoconnect parameter.
pgv_archive_time: float
The length of the archive stream to keep [s].
trigger_thr: float
The event trigger threshold [m/s].
warn_thr: float
The event warning threshold [m/s].
valid_event_thr: float
The threshold to declare an event as a valid event [m/s].
felt_thr: float
The threshold above which an event is considered as a felt event [m/s].
event_archive_timespan: float
The timespan used to load archived events [h].
min_event_length: float
The minimum length of an event [s]. Events smaller than this value are
ignored.
min_event_detections: int
The minimum number of detections for an event to be saved in the archive.
"""
[docs] def __init__(self, project, server_url, stations,
monitor_stream, stream_lock, data_dir, event_dir,
process_interval, stop_event, asyncio_loop,
pgv_sps = 1, autoconnect = False, pgv_archive_time = 1800,
trigger_thr = 0.01e-3, warn_thr = 0.01e-3,
valid_event_thr = 0.1e-3, felt_thr = 0.1e-3,
event_archive_timespan = 48, min_event_length = 2,
min_event_detections = 2):
''' Initialize the instance.
'''
easyseedlink.EasySeedLinkClient.__init__(self,
server_url = server_url,
autoconnect = autoconnect)
logger_name = __name__ + "." + self.__class__.__name__
self.logger = logging.getLogger(logger_name)
# Set the logging level of obspy module.
logging.getLogger('obspy.clients.seedlink').setLevel(logging.WARNING)
# The project instance.
self.project = project
# The URI of the agency.
self.agency_uri = project.agency_uri
# The URI of the author.
self.author_uri = project.author_uri
# The asyncio event loop. Used to stop the loop if an error occures.
self.asyncio_loop = asyncio_loop
# The stations to stream from the seedlink server.
self.stations = stations
# The recent, not yet processed data.
self.monitor_stream = monitor_stream
# The stream used for processing.
self.process_stream = obspy.core.Stream()
# The pgv stream holding the most recent data which has not been sent
# over the websocket.
self.pgv_stream = obspy.core.Stream()
# The pgv archive stream holding the PGV data of the specified archive
# time.
self.pgv_archive_stream = obspy.core.Stream()
# The velocity waveform data archive stream with the same length as the
# PGV archive stream.
self.vel_archive_stream = obspy.core.Stream()
# The length of the archive stream to keep [s].
self.pgv_archive_time = pgv_archive_time
self.vel_archive_time = pgv_archive_time
# The no-data value.
self.nodata_value = -999999
self.stream_lock = stream_lock
self.archive_lock = threading.Lock()
self.project_lock = threading.Lock()
# The common data output directory.
self.data_dir = data_dir
# The event data output directory.
self.supplement_dir = event_dir
# The time interval [s] used to process the received data.
self.process_interval = process_interval
# The samples per second of the PGV data stream.
self.pgv_sps = pgv_sps
self.stop_event = stop_event
# The trigger parameters.
self.trigger_thr = trigger_thr
self.warn_thr = warn_thr
self.valid_event_thr = valid_event_thr
self.felt_thr = felt_thr
# The most recent detected event.
self.event_triggered = False
self.current_event = None
# The last detected events.
self.event_archive = []
# The event warning.
self.event_warning = {}
# The latest event detection result.
self.last_detection_result = {}
# The PGV data state.
self.pgv_data_available = asyncio.Event()
# The event detection result state.
self.event_detection_result_available = asyncio.Event()
# The event warning state.
self.event_warning_available = asyncio.Event()
# The event trigger state.
self.current_event_available = asyncio.Event()
# The event archive state.
self.event_archive_changed = asyncio.Event()
# The keydata event.
self.event_keydata_available = asyncio.Event()
# The psysmon geometry inventory.
self.inventory = self.project.inventory
self.inventory.compute_utm_coordinates()
# The delaunay detector instance.
all_stations = self.inventory.get_station()
self.detector = event_ddet.DelaunayDetector(network_stations = all_stations,
trigger_thr = self.trigger_thr,
window_length = 10,
safety_time = 20,
p_vel = 3500,
min_trigger_window = 3,
max_edge_length = 40000,
author_uri = self.author_uri,
agency_uri = self.agency_uri)
self.recorder_map = self.get_recorder_mappings(station_nsl = self.stations)
self.conn.timeout = 10
# The required limits of an event to be saved in the archive.
self.min_event_length = min_event_length
self.min_event_detections = min_event_detections
# Load the archived data.
# The timespan to load in hours.
self.event_archive_timespan = event_archive_timespan
self.load_archive_catalogs(hours = self.event_archive_timespan)
[docs] def reset(self):
''' Reset the monitorclient to an initial state.
'''
self.monitor_stream.clear()
self.process_stream.clear()
self.pgv_stream.clear()
self.pgv_archive_stream.clear()
self.vel_archive_stream.clear()
self.event_triggered = False
self.current_event = None
self.project.event_library.clear()
self.load_archive_catalogs(hours = self.event_archive_timespan)
self.detector.reset()
[docs] def seedlink_connect(self):
''' Connect to the seedlink server.
'''
self.connect()
for cur_mss in self.recorder_map:
self.select_stream(cur_mss[0],
cur_mss[1],
cur_mss[2] + cur_mss[3])
[docs] def load_archive_catalogs(self, hours = 48):
''' Load the event catalogs of the specified last days.
Parameters
----------
hours: float
The timespan before now to load [h].
'''
self.logger.info("Loading archive catalogs for the last %d hours.", hours)
now = utcdatetime.UTCDateTime()
start_time = now - hours * 3600
start_day = utcdatetime.UTCDateTime(year = start_time.year,
julday = start_time.julday)
n_days = np.ceil((now - start_day) / 86400)
n_days = int(n_days)
for k in range(n_days):
cur_cat_date = now - k * 86400
cur_name = "{0:04d}-{1:02d}-{2:02d}".format(cur_cat_date.year,
cur_cat_date.month,
cur_cat_date.day)
self.logger.info("Requesting catalog %s.", cur_name)
with self.project_lock:
cur_cat = self.project.load_event_catalog(name = cur_name,
load_events = True)
if cur_cat:
self.logger.info("events in catalog: %s", cur_cat.events)
self.logger.info("Catalog keys: %s", self.project.event_library.catalogs.keys())
[docs] def trim_archive_catalogs(self, hours = 48):
''' Trim the catalogs in the library to the given timespan.
Parameters
----------
hours: float
The timespan before now used to trim the catalogs [h].
'''
self.logger.info('Trimming the event catalogs to %d hours from now.',
hours)
self.logger.info("Catalog keys before trim: %s",
self.project.event_library.catalogs.keys())
now = utcdatetime.UTCDateTime()
start_time = now - hours * 3600
start_day = utcdatetime.UTCDateTime(year = start_time.year,
julday = start_time.julday)
n_days = np.ceil((now - start_day) / 86400)
n_days = int(n_days)
catalogs_to_keep = []
for k in range(n_days):
cur_cat_date = now - k * 86400
cur_name = "{0:04d}-{1:02d}-{2:02d}".format(cur_cat_date.year,
cur_cat_date.month,
cur_cat_date.day)
catalogs_to_keep.append(cur_name)
available_catalogs = list(self.project.event_library.catalogs.keys())
catalogs_to_remove = [x for x in available_catalogs if x not in catalogs_to_keep]
self.logger.info("Catalog to remove %s", catalogs_to_remove)
for cur_name in catalogs_to_remove:
self.project.event_library.remove_catalog(name = cur_name)
self.logger.info("Catalog keys after trim: %s",
self.project.event_library.catalogs.keys())
[docs] def load_archive_file(self):
''' Load data from the JSON archive file.
'''
archive_filename = os.path.join(self.data_dir,
'mss_dataserver_archive.json')
archive = {}
if os.path.exists(archive_filename):
try:
with open(archive_filename) as fp:
archive = json.load(fp)
self.logger.info('Loaded the archive from file %s.',
archive_filename)
except Exception as e:
self.logger.exception("Couldn't load the archive file %s.",
archive_filename)
return archive
[docs] def load_from_archive(self):
''' Load data from a saved archive.
'''
archive = self.load_archive_file()
if archive:
if 'current_event' in archive.keys():
self.current_event = archive['current_event']
self.current_event['start_time'] = utcdatetime.UTCDateTime(self.current_event['start_time'])
self.current_event['end_time'] = utcdatetime.UTCDateTime(self.current_event['end_time'])
# TODO: Handle the loading of the pgv data stream.
self.current_event['pgv'] = obspy.core.Stream()
if 'event_archive' in archive.keys():
self.event_archive = archive['event_archive']
for cur_event in self.event_archive:
cur_event['start_time'] = utcdatetime.UTCDateTime(cur_event['start_time'])
cur_event['end_time'] = utcdatetime.UTCDateTime(cur_event['end_time'])
[docs] def save_to_archive(self, key):
''' Save data to the JSON archive.
Parameters
----------
key: str
The data key ('current_event', 'event_archive')
'''
archive_filename = os.path.join(self.data_dir,
'mss_dataserver_archive.json')
archive = self.load_archive_file()
if key == 'current_event':
data = self.get_current_event()
# TODO: Handle the saving of the event PGV data.
if key == 'event_archive':
data = self.get_event_archive()
try:
archive['last_write'] = utcdatetime.UTCDateTime().isoformat()
archive[key] = data
with open(archive_filename, 'w') as fp:
pref = json.dump(archive, fp, indent = 4, sort_keys = True)
self.logger.info('Saved the archive to file %s.',
archive_filename)
except Exception as e:
self.logger.exception("Error saving the data to the archive. data: %s, archive: %s",
data, archive)
[docs] def get_recorder_mappings(self, station_nsl = None):
''' Get the mappings of the requested NSLC.
Parameters
----------
station_nsl: :obj:`list` of :obj:`str`
The station NSL codes to process. If None, all available stations
in the inventory are processed.
Returns
-------
:obj:`dict`
The matching NSLC codes of the MSS units relating their
serial numbers to the actual station locations.
'''
recorder_map = {}
if station_nsl is None:
station_list = self.inventory.get_station()
else:
station_list = []
for cur_nsl in station_nsl:
cur_station = self.inventory.get_station(network = cur_nsl[0],
name = cur_nsl[1],
location = cur_nsl[2])
if len(cur_station) > 1:
raise ValueError('There are more than one stations. This is not yet supported.')
if len(cur_station) == 0:
raise ValueError('No station found for {0:s}. Check the input file.'.format(cur_nsl))
cur_station = cur_station[0]
station_list.append(cur_station)
for station in station_list:
for cur_channel in station.channels:
stream_tb = cur_channel.get_stream(start_time = obspy.UTCDateTime())
cur_loc = stream_tb[0].item.name.split(':')[0]
cur_chan = stream_tb[0].item.name.split(':')[1]
cur_key = ('XX',
stream_tb[0].item.serial,
cur_loc,
cur_chan)
recorder_map[cur_key] = cur_channel.nslc
self.logger.debug(recorder_map)
return recorder_map
[docs] def reconnect(self):
''' Reconnect to the server.'''
self.logger.info('Reconnecting to the server.')
self.close()
self.connect()
for cur_station in self.stations:
self.select_stream(cur_station[0], cur_station[1], cur_station[2])
[docs] def on_data(self, trace):
""" Override the on_data callback function.
"""
#self.logger.debug('Received trace:')
#self.logger.debug(str(trace))
#self.logger.debug("on_data trace data: %s", trace.data)
self.stream_lock.acquire()
cur_nslc = self.recorder_map[tuple(trace.id.split('.'))]
trace.stats.network = cur_nslc[0]
trace.stats.station = cur_nslc[1]
trace.stats.location = cur_nslc[2]
trace.stats.channel = cur_nslc[3]
self.monitor_stream.append(trace)
#self.monitor_stream.merge(method = 1, fill_value = 'interpolate')
#self.logger.debug(self.monitor_stream)
self.stream_lock.release()
#self.logger.debug('Leaving on_data.')
[docs] def run(self):
""" Start streaming data from the SeedLink server.
Streams need to be selected using
:meth:`~.EasySeedLinkClient.select_stream` before this is called.
This method enters an infinite loop, calling the client's callbacks
when events occur.
"""
# Note: This somewhat resembles the run() method in SLClient.
# Check if any streams have been specified (otherwise this will result
# in an infinite reconnect loop in the SeedLinkConnection)
if not len(self.conn.streams):
msg = 'No streams specified. Use select_stream() to select ' + \
'a stream.'
raise EasySeedLinkClientException(msg)
self.__streaming_started = True
# Start the processing timer thread.
#self.process_timer_thread = threading.Thread(name = 'process_timer',
# target = self.task_timer,
# args = (self.process_monitor_stream, ))
#self.process_timer_thread.start()
# Start the collection loop
start = time.time()
end = None
while not self.stop_event.is_set():
self.logger.debug('Waiting to collect data....')
data = self.conn.collect()
self.logger.debug('... received some data.')
if data == SLPacket.SLTERMINATE:
self.on_terminate()
continue
elif data == SLPacket.SLERROR:
self.on_seedlink_error()
continue
# At this point the received data should be a SeedLink packet
# XXX In SLClient there is a check for data == None, but I think
# there is no way that self.conn.collect() can ever return None
assert(isinstance(data, SLPacket))
packet_type = data.get_type()
# Ignore in-stream INFO packets (not supported)
if packet_type not in (SLPacket.TYPE_SLINF, SLPacket.TYPE_SLINFT):
# The packet should be a data packet
trace = data.get_trace()
# Pass the trace to the on_data callback
self.on_data(trace)
end = time.time()
# Start the data export thread if the write_interval has been
# reached.
#if end and (end - start) > self.write_interval:
#process_thread = threading.Thread(target = process_monitor_stream, args = (self.monitor_stream, self.stream_lock, self.data_dir))
#logging.info('Starting the processing thread.')
#process_thread.start()
#start = end
#end = None
[docs] def on_terminate(self):
''' Handle termination of the client.
'''
self.logger.warning('Terminating the client.')
# Clean the state.
self.stream_lock.acquire()
self.monitor_stream.clear()
self.stream_lock.release()
# Reconnect to the server.
try:
self.reconnect()
except Exception:
self.logger.error("Can't reconnect to the seedlink server.")
self.stop_event.set()
# Stop the asyncio loop.
self.logger.error("Stopping the asyncio event loop because there where troubles reconnecting to the seedlink server.")
self.asyncio_loop.stop()
[docs] def on_seedlink_error(self):
''' Handle client errors.
'''
self.logger.error('Got a seedlink error.')
[docs] async def task_timer(self, callback):
''' A timer executing a task at regular intervals.
Parameters
----------
callback: function
The function to be called after the given process_interval.
'''
self.logger.info('Starting the timer.')
interval = int(self.process_interval)
now = obspy.UTCDateTime()
delay_to_next_interval = interval - (now.timestamp % interval)
self.logger.info('Sleeping for %f seconds.', delay_to_next_interval)
time.sleep(delay_to_next_interval)
while not self.stop_event.is_set():
try:
self.logger.info('task_timer: Executing callback.')
await callback()
except Exception as e:
self.logger.exception(e)
self.stop()
now = obspy.UTCDateTime()
delay_to_next_interval = interval - (now.timestamp % interval)
self.logger.info('task_timer: Sleeping for %f seconds.', delay_to_next_interval)
#time.sleep(delay_to_next_interval)
await asyncio.sleep(delay_to_next_interval)
self.logger.debug("Leaving the task_timer method.")
[docs] def detect_event_warning(self):
''' Run the Voronoi detection with the most recent PGV data only.
This is a first crude detection of an event and is used for a
first warning notification of a possible event. The warning has
has to be confirmed by a detected event.
'''
self.logger.info("Running detect_event_warning.")
now = obspy.UTCDateTime()
with self.stream_lock:
working_stream = self.pgv_stream.copy()
# Get the max. PGV and the related stations.
max_pgv = []
stations = []
for cur_trace in working_stream:
max_pgv.append(np.nanmax(cur_trace.data))
stations.append(self.inventory.get_station(name = cur_trace.stats.station)[0])
# Compute the Delaunay triangles.
tri = self.compute_delaunay(stations)
# Convert the lists to numpy arrays.
max_pgv = np.array(max_pgv)
stations = np.array(stations)
# Compute the Delaunay trigger.
trigger_pgv = []
simp_stations = []
if tri:
edge_length = self.compute_edge_length(tri,
stations)
valid_tri = np.argwhere(edge_length < 30000).flatten()
edge_length = edge_length[valid_tri]
for k, cur_simp in enumerate(tri.simplices[valid_tri]):
cur_tri_pgv = max_pgv[cur_simp]
simp_stations.append([x.name for x in stations[cur_simp]])
trigger_pgv.append(np.nanmin(cur_tri_pgv))
trigger_pgv = np.array(trigger_pgv)
simp_stations = np.array(simp_stations)
# Detect the event warning.
mask = trigger_pgv >= self.warn_thr
if mask.any():
self.event_warning['time'] = now
self.event_warning['simp_stations'] = simp_stations[mask]
self.event_warning['trigger_pgv'] = trigger_pgv[mask]
self.event_warning_available.set()
self.logger.info("Event warning issued.")
self.logger.debug("Event warning data: %s.", self.event_warning)
else:
self.event_warning['time'] = now
self.event_warning['simp_stations'] = np.array([])
self.event_warning['trigger_pgv'] = np.array([])
self.event_warning_available.set()
self.logger.info("No event warning issued.")
self.logger.info("Finished the event warning computation.")
[docs] def detect_event(self):
''' Run the Voronoi event detection.
'''
self.logger.info('Running the event detection.')
#detect_win_length = 10
#safety_win = 10
#trigger_thr = self.trigger_thr
#min_trigger_window = 3
now = obspy.UTCDateTime()
with self.archive_lock:
working_stream = self.pgv_archive_stream.copy()
self.logger.debug("event detection working_stream: %s", working_stream)
# Run the Delaunay detection.
self.detector.init_detection_run(stream = working_stream)
if self.detector.detection_run_initialized:
self.detector.compute_trigger_data()
self.detector.evaluate_event_trigger()
# Evaluate the detection result.
if self.detector.new_event_available:
self.current_event = self.detector.get_event()
self.current_event_available.set()
if self.current_event.detection_state == 'closed':
try:
rejected = False
if ((self.current_event.max_pgv >= self.valid_event_thr) and (self.current_event.length >= 1)) or ((self.current_event.length >= self.min_event_length)) and (len(self.current_event.detections) >= self.min_event_detections):
# If only one detection triangle is available, all three stations
# have to be above the 0.1 mm/s threshold.
if len(self.current_event.detections) == 1:
cur_pgv_dict = self.current_event.get_max_pgv_per_station()
if not np.all(np.array(list(cur_pgv_dict.values())) >= 0.1e-3):
self.logger.info("PGV of single detection event too small: %s", cur_pgv_dict)
rejected = True
if not rejected:
# Save the event and its metadata in a thread to
# prevent blocking the data acquisition.
# TODO: Copy the event before exporting it. Deepcopy
# throws an error "TypeError: can't pickle
# _thread.RLock objects".
export_event = self.current_event
export_event_thread = threading.Thread(name = 'export_event',
target = self.export_event,
args = (export_event, ))
self.logger.info("Starting the export_event_thread.")
export_event_thread.start()
# TODO: Add some kind of event signaling to track the
# execution of the export thread.
self.export_event_thread = export_event_thread
self.logger.info("Continue the program execution.")
else:
rejected = True
if rejected:
self.logger.info("Rejected the event because it didn't fit the required parameters.")
self.logger.info("start_time: %s", self.current_event.start_time.isoformat())
self.logger.info("end_time: %s", self.current_event.end_time.isoformat())
self.logger.info("length: %d", self.current_event.length)
self.logger.info("max_pgv: %f", self.current_event.max_pgv)
self.logger.info("n_detections: %f", len(self.current_event.detections))
self.logger.info("valid_event_thr: %f", self.valid_event_thr)
self.logger.info("min_event_length: %d", self.min_event_length)
self.logger.info("min_event_detections: %d", self.min_event_detections)
finally:
# Clear the detector flag.
self.detector.new_event_available = False
else:
self.logger.warning("Failed to initialize the detection run.")
[docs] async def process_monitor_stream(self):
''' Process the data in the monitor stream.
'''
self.logger.info('Processing the monitor_stream.')
self.logger.info('# gc.get_objects: %d', len(gc.get_objects()))
with self.stream_lock:
monitor_stream_length = len(self.monitor_stream)
self.monitor_stream.merge()
self.monitor_stream.sort(keys = ['station'])
self.logger.debug('monitor_stream before selecting process stream: %s', str(self.monitor_stream.__str__(extended = True)))
# The minimum length of a trace to be added to the process stream [s].
process_min_length = 10
if monitor_stream_length > 0:
#now = obspy.UTCDateTime()
#process_end_time = now - now.timestamp % self.process_interval
#logger.debug('Trimming to end time: %s.', process_end_time)
with self.stream_lock:
for cur_trace in self.monitor_stream:
sec_remain = cur_trace.stats.endtime.timestamp % self.process_interval
cur_end_time = obspy.UTCDateTime(round(cur_trace.stats.endtime.timestamp - sec_remain))
cur_end_time = cur_end_time - cur_trace.stats.delta
self.logger.debug("Computed end_time: %s.",
cur_end_time.isoformat())
# Check if the trace length is larger xx seconds.
if (cur_end_time - cur_trace.stats.starttime) < process_min_length - cur_trace.stats.delta:
self.logger.debug("The process trace of %s with length %f would be smaller than %f seconds.",
cur_trace.id,
cur_end_time - cur_trace.stats.starttime,
process_min_length)
continue
self.logger.debug('Extracting process trace for %s.', cur_trace.id)
cur_slice_trace = cur_trace.slice(endtime = cur_end_time)
if len(cur_slice_trace) > 0:
self.process_stream.append(cur_slice_trace)
cur_trace.trim(starttime = cur_end_time + cur_trace.stats.delta)
self.process_stream.sort()
self.logger.debug('process_stream: %s', self.process_stream.__str__(extended = True))
with self.stream_lock:
self.logger.debug('monitor_stream: %s', str(self.monitor_stream.__str__(extended = True)))
# Get a stream containing only equal length traces per station.
el_stream = self.get_equal_length_traces()
self.logger.debug('Got el_stream: %s.', el_stream.__str__(extended = True))
# TODO: Handle the data in the process stream that has not been
# used to create the el_stream.
self.logger.debug('Data not used for el_stream: %s.', self.process_stream.__str__(extended = True))
# Detrend to remove eventual offset.
el_stream = el_stream.split()
el_stream.detrend(type = 'constant')
el_stream.merge()
# Convert the amplitudes from counts to m/s.
self.convert_to_physical_units(el_stream)
# Add the velocity waveform data to an archive stream.
with self.archive_lock:
self.vel_archive_stream = self.vel_archive_stream + el_stream
# Compute the PGV values.
# TODO: Computing the PGV is CPU intensive and could block the
# websocket server. Try to find a solution to move the computation to a
# separate process.
await self.compute_pgv(el_stream)
# Trim the archive stream.
self.trim_archive()
# Start the event alarm detection using the most recent pgv values.
#try:
# self.detect_event_warning()
#except Exception as e:
# self.logger.exception("Error computing the event warning.")
# Signal available PGV data.
self.pgv_data_available.set()
# Start the event detection.
try:
# TODO. Detecting the event is CPU intensive. Try to find a way
# to move the detection to another process.
self.detect_event()
except Exception as e:
self.logger.exception("Error computing the event detection.")
# Set the flag to mark another SOH state available.
self.event_keydata_available.set()
return
# TODO: Check if something could be interesting and remove the code
# below.
if len(set([len(x) for x in export_stream])) != 1:
self.logger.warning('The length of the traces in the stream is not equal. No export.')
self.logger.debug('Add the stream back to the monitor stream.')
stream_lock.acquire()
monitor_stream += export_stream
monitor_stream.merge()
self.logger.debug(monitor_stream)
stream_lock.release()
else:
if len(export_stream) > 0:
self.logger.debug('Exporting the stream.')
for cur_trace in export_stream:
self.logger.debug(cur_trace.get_id().replace('.', '_'))
self.logger.debug(cur_trace.stats.starttime.isoformat())
cur_filename = 'syscom_{0:s}_{1:s}.msd'.format(cur_trace.get_id().replace('.', '_'), cur_trace.stats.starttime.isoformat().replace('-', '').replace(':', '').replace('.', '_').replace('T', '_'))
cur_filename = os.path.join(data_dir, cur_filename)
export_stream.write(cur_filename, format = 'MSEED', reclen = 512, encoding = 11)
self.logger.debug('Done.')
else:
self.logger.debug('No data in stream.')
# Delete old files.
now = utcdatetime.UTCDateTime()
for cur_file in os.listdir(data_dir):
if os.stat(os.path.join(data_dir, cur_file)).st_mtime < now - 60:
os.remove(os.path.join(data_dir, cur_file))
[docs] def get_equal_length_traces(self):
''' Get a stream containing traces with equal length per station.
'''
self.logger.debug('Computing equal length traces.')
unique_stations = [(x.stats.network, x.stats.station, x.stats.location) for x in self.process_stream]
unique_stations = list(set(unique_stations))
unique_stations = sorted(unique_stations)
el_stream = obspy.core.Stream()
for cur_station in unique_stations:
self.logger.debug('Getting stream for %s.', cur_station)
# Get all traces for the station.
cur_stream = self.process_stream.select(network = cur_station[0],
station = cur_station[1],
location = cur_station[2])
self.logger.debug('Selected stream: %s.', cur_stream)
for cur_trace in cur_stream:
self.process_stream.remove(cur_trace)
cur_stream = cur_stream.merge()
# Check if all required channels are present:
missing_data = False
required_nslc = [x for x in self.recorder_map.values()
if x[1] == cur_station[1]]
available_nslc = [(x.stats.network,
x.stats.station,
x.stats.location,
x.stats.channel) for x in cur_stream]
self.logger.debug('available_nslc: %s', available_nslc)
self.logger.debug('required_nslc: %s', required_nslc)
for cur_required_nslc in required_nslc:
if cur_required_nslc not in available_nslc:
self.logger.debug('Data for channel %s is missing.',
cur_required_nslc)
missing_data = True
break
if missing_data:
self.logger.debug('Skipping station because of missing channel data. Re-inserting the data into the process stream.')
self.process_stream.extend(cur_stream)
continue
# Select a timespan with equal end time. Re-insert the remaining
# stream into the process_stream.
min_end = np.min([x.stats.endtime for x in cur_stream])
cur_el_stream = cur_stream.slice(endtime = min_end)
cur_rem_stream = cur_stream.trim(starttime = min_end + cur_el_stream[0].stats.delta)
self.process_stream.extend(cur_rem_stream)
self.logger.debug('Remaining stream added back to the process stream: %s', cur_rem_stream)
# In case of non-equal start times, drop the data before the common
# start time.
max_start = np.max([x.stats.starttime for x in cur_el_stream])
cur_rem_stream = cur_el_stream.slice(endtime = max_start - cur_el_stream[0].stats.delta)
cur_el_stream = cur_el_stream.slice(starttime = max_start)
self.logger.debug('Stream dropped because the start time was not equal: %s', cur_rem_stream)
self.logger.debug('el_stream: %s', cur_el_stream)
el_stream.extend(cur_el_stream)
return el_stream
[docs] async def compute_pgv(self, stream):
''' Compute the PGV values of the stream.
Parameters
----------
stream: :class:`obspy.Stream`
The stream used to compute the PGV data.
'''
self.logger.info('Computing the PGV.')
unique_stations = [(x.stats.network, x.stats.station, x.stats.location) for x in stream]
unique_stations = list(set(unique_stations))
samp_interval = 1 / self.pgv_sps
# Clear the PGV stream in case, it has not been served by the
# websocket server.
with self.stream_lock:
self.pgv_stream.clear()
for cur_station in unique_stations:
self.logger.debug('Getting stream for %s.', cur_station)
# Get all traces for the station.
cur_stream = stream.select(network = cur_station[0],
station = cur_station[1],
location = cur_station[2])
self.logger.debug('Selected stream: %s.', cur_stream)
min_start = np.min([x.stats.starttime for x in cur_stream])
sec_remain = min_start.timestamp % samp_interval
cur_offset = -sec_remain
self.logger.debug('offset: %f.', cur_offset)
cur_delta = cur_stream[0].stats.delta
cur_pgv = []
for win_st in cur_stream.slide(window_length = samp_interval - cur_delta,
step = samp_interval,
offset = cur_offset):
self.logger.debug('Processing stream slice: %s.', win_st)
x_st = win_st.select(channel = 'Hparallel')
y_st = win_st.select(channel = 'Hnormal')
# TODO: It is assumed, that there are no gaps in the data.
# Handle this problem.
if len(x_st.traces) > 1:
self.logger.error("Too many traces in the stream slice.")
for cur_x_trace, cur_y_trace in zip(x_st.traces, y_st.traces):
cur_x = cur_x_trace.data
cur_y = cur_y_trace.data
# Handle masked arrays.
if isinstance(cur_x, np.ma.MaskedArray):
cur_x = cur_x.data
if isinstance(cur_y, np.ma.MaskedArray):
cur_y = cur_y.data
if len(cur_x) != len(cur_y):
self.logger.error("The x and y data lenght dont't match. Can't compute the res. PGV for station %s and windowed stream: %s.", cur_station, win_st)
continue
# Check for nan values.
if np.any(np.isnan(cur_x)) or np.any(np.isnan(cur_y)):
nan_mask = np.isnan(cur_x) | np.isnan(cur_y)
cur_x = cur_x[~nan_mask]
cur_y = cur_y[~nan_mask]
# Check if there are enough data values available.
if len(cur_x) < len(cur_x_trace.data) / 2:
self.logger.error("There are not enough non-nan data values available. Skipping this window. win_st: %s.", win_st)
continue
cur_sec_remain = cur_x_trace.stats.starttime.timestamp % samp_interval
cur_win_start = cur_x_trace.stats.starttime - cur_sec_remain
cur_pgv_time = cur_win_start + samp_interval / 2
cur_pgv_value = np.max(np.sqrt(cur_x**2 + cur_y**2))
cur_pgv.append([cur_pgv_time, cur_pgv_value])
if cur_pgv:
cur_pgv = np.array(cur_pgv)
#self.logger.debug('cur_x: %s', cur_x)
#self.logger.debug('cur_y: %s', cur_y)
self.logger.debug('cur_pgv: %s', cur_pgv)
cur_stats = {'network': cur_x_trace.stats.network,
'station': cur_x_trace.stats.station,
'location': cur_x_trace.stats.location,
'channel': 'pgv',
'sampling_rate': self.pgv_sps,
'starttime': cur_pgv[0, 0]}
self.logger.debug('data type: %s.', cur_pgv[:, 1].dtype)
cur_pgv_trace = obspy.core.Trace(data = cur_pgv[:, 1].astype(np.float32),
header = cur_stats)
# Write the data to the current pgv stream.
self.pgv_stream.append(cur_pgv_trace)
# Merge the current pgv stream.
self.pgv_stream.merge()
self.logger.debug('pgv_stream: %s.', self.pgv_stream.__str__(extended = True))
with self.archive_lock:
self.pgv_archive_stream = self.pgv_archive_stream + self.pgv_stream
# Merge the archive stream.
with self.archive_lock:
self.pgv_archive_stream.merge()
self.logger.debug("pgv_archive_stream: %s", self.pgv_archive_stream.__str__(extended = True))
self.logger.info("Finished compute_pgv.")
[docs] def convert_to_physical_units(self, stream):
''' Convert the counts to physical units.
Parameters
----------
stream: :class:`obspy.Stream`
The stream to convert.
'''
for tr in stream.traces:
station = self.inventory.get_station(network = tr.stats.network,
name = tr.stats.station,
location = tr.stats.location)
if len(station) > 1:
raise ValueError('There are more than one stations. This is not yet supported.')
station = station[0]
channel = station.get_channel(name = tr.stats.channel)
if len(channel) > 1:
raise ValueError('There are more than one channels. This is not yet supported.')
channel = channel[0]
stream_tb = channel.get_stream(start_time = tr.stats.starttime,
end_time = tr.stats.endtime)
if len(stream_tb) > 1:
raise ValueError('There are more than one recorder streams. This is not yet supported.')
rec_stream = stream_tb[0].item
rec_stream_param = rec_stream.get_parameter(start_time = tr.stats.starttime,
end_time = tr.stats.endtime)
if len(rec_stream_param) > 1:
raise ValueError('There are more than one recorder stream parameters. This is not yet supported.')
rec_stream_param = rec_stream_param[0]
components_tb = rec_stream.get_component(start_time = tr.stats.starttime,
end_time = tr.stats.endtime)
if len(components_tb) > 1:
raise ValueError('There are more than one components. This is not yet supported.')
component = components_tb[0].item
comp_param = component.get_parameter(start_time = tr.stats.starttime,
end_time = tr.stats.endtime)
if len(comp_param) > 1:
raise ValueError('There are more than one parameters for this component. This is not yet supported.')
comp_param = comp_param[0]
self.logger.debug("bitweight: %f", rec_stream_param.bitweight)
self.logger.debug("gain: %f", rec_stream_param.gain)
self.logger.debug("sensitivity: %f", comp_param.sensitivity)
tr.data = tr.data * rec_stream_param.bitweight / (rec_stream_param.gain * comp_param.sensitivity)
tr.stats.unit = component.output_unit.strip()
[docs] def compute_pgv_res(st):
''' Compute the resultant of the peak-ground-velocity.
'''
x_st = st.select(channel = 'Hparallel').merge()
y_st = st.select(channel = 'Hnormal').merge()
res_st = obspy.core.Stream()
for cur_x_trace, cur_y_trace in zip(x_st.traces, y_st.traces):
cur_x = cur_x_trace.data
cur_y = cur_y_trace.data
if len(cur_x) != len(cur_y):
self.logger.error("The x and y data lenght dont't match. Can't compute the res. PGV for this trace.")
continue
self.logger.debug("x: %s", cur_x)
self.logger.debug("y: %s", cur_y)
cur_res = np.sqrt(cur_x**2 + cur_y**2)
cur_stats = {'network': cur_x_trace.stats['network'],
'station': cur_x_trace.stats['station'],
'location': cur_x_trace.stats['location'],
'channel': 'res',
'sampling_rate': cur_x_trace.stats['sampling_rate'],
'starttime': cur_x_trace.stats['starttime']}
res_trace = obspy.core.Trace(data = cur_res, header = cur_stats)
res_st.append(res_trace)
res_st.split()
return res_st
[docs] def get_triggered_stations(self):
''' Get the stations which have a PGV exceeding the threshold value.
'''
now = utcdatetime.UTCDateTime()
window = 120
with self.archive_lock:
working_stream = self.pgv_archive_stream.copy()
working_stream.trim(starttime = now - window)
all_stations = {}
triggered_stations = {}
for cur_trace in working_stream:
cur_max_pgv = np.nanmax(cur_trace.data)
if np.isnan(cur_max_pgv):
all_stations[cur_trace.stats.station] = self.nodata_value
else:
all_stations[cur_trace.stats.station] = float(cur_max_pgv)
if cur_max_pgv >= self.felt_thr:
triggered_stations[cur_trace.stats.station] = float(cur_max_pgv)
self.logger.debug("triggered_stations: %s", triggered_stations)
return (all_stations, triggered_stations)
[docs] def get_triggered_event_stations(self):
''' Get the stations which have triggered during the last event.
'''
now = utcdatetime.UTCDateTime()
window = 300
triggered_stations = {}
if self.current_event and self.current_event.start_time >= (now - window):
triggered_stations = self.current_event.get_max_pgv_per_station()
#elif self.event_archive and self.event_archive['start_time'] >= (now - window):
return triggered_stations
[docs] def trim_archive(self):
''' Crop the archive to a specified length.
'''
with self.archive_lock:
if len(self.pgv_archive_stream) > 0:
max_end_time = np.max([x.stats.endtime for x in self.pgv_archive_stream if x])
self.logger.debug("max_end_time: %s.", max_end_time)
crop_start_time = max_end_time - self.pgv_archive_time
self.pgv_archive_stream.trim(starttime = crop_start_time)
self.logger.debug("Trimmed the pgv archive stream start time to %s.",
crop_start_time)
self.logger.debug("pgv_archive_stream: %s", self.pgv_archive_stream)
if len(self.vel_archive_stream) > 0:
max_end_time = np.max([x.stats.endtime for x in self.vel_archive_stream if x])
self.logger.debug("max_end_time: %s.", max_end_time)
crop_start_time = max_end_time - self.vel_archive_time
self.vel_archive_stream.trim(starttime = crop_start_time)
self.logger.debug("Trimmed the velocity archive stream start time to %s.",
crop_start_time)
self.logger.debug("vel_archive_stream: %s", self.vel_archive_stream)
[docs] def export_event(self, export_event):
''' Save the event.
Parameters
----------
export_event: :class:`mss_dataserver.event.core.Event`
The event to export.
'''
# Get or create the event catalog and add the event to it.
cat_name = "{0:04d}-{1:02d}-{2:02d}".format(export_event.start_time.year,
export_event.start_time.month,
export_event.start_time.day)
with self.project_lock:
config_filepath = self.project.config['config_filepath']
cur_cat = self.project.get_event_catalog(cat_name)
cur_cat.add_events([export_event, ])
# Get or create the detection catalog and add the event
# detections to it.
det_catalogs = self.project.get_detection_catalog_names()
if cat_name not in det_catalogs:
cur_det_cat = self.project.create_detection_catalog(name = cat_name)
else:
cur_det_cat = self.project.load_detection_catalog(name = cat_name)
cur_det_cat.add_detections(export_event.detections)
# Write the detections to the database. This has to be done
# separately. Adding the detection to the event and then
# writing the event to the database doesn't write the
# detections to the database.
# TODO: An error occured, because the detection already had
# a database id assigned, and the write_to_database method
# tried to update the existing detection. This part of the
# method is not yet working, thus an error was thrown. This
# should not happen, because only fresh detections with no
# database id should be available at this point!!!!!
for cur_detection in export_event.detections:
cur_detection.write_to_database(self.project)
# Write the event to the database.
self.logger.info("Writing the event to the database.")
export_event.write_to_database(self.project)
# Export the event data to disk files.
self.logger.info("Exporting the event data.")
self.save_event_supplement(export_event)
# Set the event to notify that the archive has changes.
self.event_archive_changed.set()
# Compute the geojson supplement data.
proc_result = subprocess.run(['mssds_postprocess',
config_filepath,
'process-event',
'--public-id',
export_event.public_id,
'--no-pgv-contour-sequence'])
# Trim the event catalogs.
self.trim_archive_catalogs(hours = self.event_archive_timespan)
[docs] def get_event_supplement_dir(self, public_id, category = None):
''' Get the supplement directory of an event.
Parameters
----------
public_id: str
The public ID of the event.
category: str
The supplement data category.
Returns
-------
str
The path to the event supplement data.
'''
event_dir = pp_util.event_dir_from_publicid(public_id)
output_dir = os.path.join(self.supplement_dir,
event_dir)
if category is not None:
sup_map = pp_util.get_supplement_map()
if category in sup_map.keys():
output_dir = os.path.join(output_dir,
category)
else:
logger.error('The category %s was not found in the available supllement data categories.', category)
return output_dir
[docs] def save_event_supplement(self, export_event):
''' Save the supplement data of the event.
Parameters
----------
export_event: :class:`mss_dataserver.event.core.Event`
The event for which to export the supplement data.
'''
# Build the output directory.
output_dir = self.get_event_supplement_dir(public_id = export_event.public_id,
category = 'detectiondata')
self.logger.info("Exporting the event data to folder %s.", output_dir)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# The timespan to add in front or at the back of the event limits
# when exporting waveform data.
pre_win = 20
post_win = 20
pgv_stream = self.save_supplement_pgv(event = export_event,
pre_win = pre_win,
post_win = post_win,
output_dir = output_dir)
self.save_supplement_vel(event = export_event,
pre_win = pre_win,
post_win = post_win,
output_dir = output_dir)
self.save_supplement_inventory(event = export_event,
output_dir = output_dir)
self.save_supplement_metadata(event = export_event,
pgv_stream = pgv_stream,
output_dir = output_dir)
self.save_supplement_detection_data(event = export_event,
output_dir = output_dir)
[docs] def save_supplement_pgv(self, event, pre_win, post_win, output_dir):
''' Save the PGV data supplement.
Parameters
----------
event: :class:`mss_dataserver.event.core.Event`
The event for which to export the PGV data.
pre_win: float
The window to add before the start time of the event [s].
post_win: float
The window to add after the end time of the event [s].
output_dir: str
The path where to save the data.
Returns
-------
:class:`obspy.Stream`
The exported PGV data stream.
'''
start_time = event.start_time - pre_win
end_time = event.end_time + post_win
# Get the PGV data from the archive stream.
with self.archive_lock:
pgv_stream = self.pgv_archive_stream.slice(starttime = start_time,
endtime = end_time)
# Split the pgv stream to ensure a propper export to miniseed file.
split_pgv_stream = pgv_stream.split()
supplement_name = 'pgv'
supplement_category = 'detectiondata'
filename = "{public_id}_{category}_{name}.msd".format(public_id = event.public_id,
category = supplement_category,
name = supplement_name)
filepath = os.path.join(output_dir, filename)
self.logger.info("Writing the PGV data to file %s.", filepath)
split_pgv_stream.write(filepath,
format = 'MSEED',
blocksize = 512)
# The miniseed file is not compressed, because it is using float
# values. Compress it using gzip.
zip_filepath = filepath + '.gz'
with open(filepath, 'rb') as in_file:
with gzip.open(zip_filepath, 'wb') as out_file:
shutil.copyfileobj(in_file, out_file)
# Remove the uncompressed file.
os.remove(filepath)
return pgv_stream
[docs] def save_supplement_vel(self, event, pre_win, post_win, output_dir):
''' Save the velocity data supplement.
Parameters
----------
event: :class:`mss_dataserver.event.core.Event`
The event for which to export the velocity data.
pre_win: float
The window to add before the start time of the event [s].
post_win: float
The window to add after the end time of the event [s].
output_dir: str
The path where to save the data.
'''
start_time = event.start_time - pre_win
end_time = event.end_time + post_win
# Get the velocity data from the archive.
with self.archive_lock:
vel_stream = self.vel_archive_stream.slice(starttime = start_time,
endtime = end_time)
vel_stream.merge()
vel_stream = vel_stream.split()
self.logger.debug("vel_stream: %s", vel_stream.__str__(extended = True))
supplement_name = 'velocity'
supplement_category = 'detectiondata'
filename = "{public_id}_{category}_{name}.msd".format(public_id = event.public_id,
category = supplement_category,
name = supplement_name)
filepath = os.path.join(output_dir, filename)
self.logger.info("Writing the velocity data to file %s.", filepath)
vel_stream.write(filepath,
format = 'MSEED',
blocksize = 512)
# The miniseed file is not compressed, because it is using float
# values. Compress it using gzip.
zip_filepath = filepath + '.gz'
with open(filepath, 'rb') as in_file:
with gzip.open(zip_filepath, 'wb') as out_file:
shutil.copyfileobj(in_file, out_file)
# Remove the uncompressed file.
os.remove(filepath)
[docs] def save_supplement_inventory(self, event, output_dir):
''' Save the supplement inventory data to a json file.
Parameters
----------
event: :class:`mss_dataserver.event.core.Event`
The event for which to export the velocity data.
output_dir: str
The path where to save the data.
'''
# Write the inventory to a json file.
try:
supplement_name = 'geometryinventory'
category = 'detectiondata'
filename = "{public_id}_{category}_{name}.json.gz".format(public_id = event.public_id,
category = category,
name = supplement_name)
filepath = os.path.join(output_dir, filename)
self.logger.info("Writing the geometry inventory data to file %s.",
filepath)
# Prepare the file container for exporting to json file.
container_data = {}
container_data['metadata'] = {'db_id': event.db_id,
'public_id': event.public_id}
container_data['inventory'] = self.inventory.as_dict()
file_container = json_util.FileContainer(data = container_data,
agency_uri = self.agency_uri,
author_uri = self.author_uri)
with gzip.open(filepath, 'wt', encoding = 'UTF-8') as json_file:
pref = json.dump(file_container,
fp = json_file,
cls = json_util.GeneralFileEncoder,
indent = 4,
sort_keys = True)
except Exception as e:
self.logger.exception("Error saving the geometry inventory data to json file.")
[docs] def save_supplement_detection_data(self, event, output_dir):
''' Save the detection data of the event.
Parameters
----------
event: :class:`mss_dataserver.event.core.Event`
The event for which to export the velocity data.
output_dir: str
The path where to save the data.
'''
# Convert the time array UTCDateTime instances to timestamps to reduce
# the size of the json file.
for cur_key, cur_item in event.detection_data.items():
for cur_data in cur_item['trigger_data']:
cur_data['time'] = [x.timestamp for x in cur_data['time']]
# Write the event detection data to a json file.
try:
supplement_name = 'detectiondata'
category = 'detectiondata'
filename = "{public_id}_{category}_{name}.json.gz".format(public_id = event.public_id,
category = category,
name = supplement_name)
filepath = os.path.join(output_dir, filename)
self.logger.info("Writing the detection data to file %s.",
filepath)
# Prepare the file container for exporting to json file.
container_data = {}
container_data['metadata'] = {'db_id': event.db_id,
'public_id': event.public_id}
container_data['detection_data'] = event.detection_data
file_container = json_util.FileContainer(data = container_data,
agency_uri = self.agency_uri,
author_uri = self.author_uri)
with gzip.open(filepath, 'wt', encoding = 'UTF-8') as json_file:
pref = json.dump(file_container,
fp = json_file,
cls = json_util.SupplementDetectionDataEncoder,
indent = 4,
sort_keys = True)
except Exception as e:
self.logger.exception("Error saving the detection data to json file.")
[docs] def get_current_pgv(self):
''' Get the current PGV data.
Returns
-------
:obj:`dict`
The PGV data as a dictionary.
'''
data_dict = {}
pgv_data = {}
with self.archive_lock:
working_stream = self.pgv_archive_stream.copy()
# The time to use for the history pgv [s].
history_period = 60
# Trim the stream to the selected timespan.
now = utcdatetime.UTCDateTime()
now.milliseconds = 0
now.second = 0
start_time = now - history_period
working_stream = working_stream.slice(starttime = start_time)
for cur_trace in working_stream:
try:
if isinstance(cur_trace.data, np.ma.MaskedArray):
cur_data = cur_trace.data.data
else:
cur_data = cur_trace.data
cur_time = cur_trace.times(type = 'utcdatetime')
cur_mask = ~np.isnan(cur_data)
if np.any(cur_mask):
cur_hist_pgv = float(np.nanmax(cur_data))
cur_latest_pgv = float(cur_data[cur_mask][-1])
cur_latest_time = float(cur_time[cur_mask][-1].timestamp)
else:
cur_latest_pgv = None
cur_latest_time = None
cur_hist_pgv = None
tmp = {'pgv_history': cur_hist_pgv,
'latest_pgv': cur_latest_pgv,
'latest_time': cur_latest_time}
cur_nsl = ':'.join([cur_trace.stats.network,
cur_trace.stats.station,
cur_trace.stats.location])
pgv_data[cur_nsl] = tmp
except Exception as e:
self.logger.exception("Error while preparing the pgv archive.")
data_dict['history_period'] = history_period
data_dict['computation_time'] = now.isoformat()
data_dict['pgv_data'] = pgv_data
return data_dict
[docs] def get_pgv_timeseries(self):
''' Get the latest PGV timeseries data.
Returns
-------
:obj:`dict`
The PGV timeseries as a dictonary.
'''
pgv_data = {}
for cur_trace in self.pgv_stream:
cur_start_time = cur_trace.stats.starttime
tmp = {}
try:
# There have been some NaN values in the data. I couldn't
# figure out where they came from. Make sure to replace them
# with the nodata value before creating the pgv data.
cur_trace.data[np.isnan(cur_trace.data)] = self.nodata_value
tmp['time'] = [x.timestamp for (x, y) in zip(cur_trace.times(type = "utcdatetime"), cur_trace.data.tolist()) if y != self.nodata_value]
tmp['data'] = [x for x in cur_trace.data.tolist() if x != self.nodata_value]
except Exception as e:
self.logger.exception("Error getting the PGV data.")
cur_nsl = ':'.join([cur_trace.stats.network,
cur_trace.stats.station,
cur_trace.stats.location])
pgv_data[cur_nsl] = tmp
# Clear the pgv stream.
self.pgv_stream.clear()
return pgv_data
[docs] def get_pgv_timeseries_archive(self, nsl_code = None):
''' Get the archived PGV timeseries data.
Parameters
----------
nsl_code: :obj:`list` of :obj:`str`
A list of NSL codes to process.
Returns
-------
:obj:`dict`
The PGV data as a dictionary.
'''
pgv_data = {}
with self.archive_lock:
working_stream = self.pgv_archive_stream.copy()
# If provided select the required stations.
if nsl_code is not None:
tmp_stream = obspy.Stream()
for cur_nsl in nsl_code:
cur_parts = cur_nsl.split(':')
tmp_stream += working_stream.select(network = cur_parts[0],
station = cur_parts[1],
location = cur_parts[2])
working_stream = tmp_stream
# The time in seconds to send to the server.
display_time = 600
now = utcdatetime.UTCDateTime()
now.milliseconds = 0
now.second = 0
start_time = now - display_time
working_stream.trim(starttime = start_time)
for cur_trace in working_stream:
try:
# There have been some NaN values in the data. I couldn't
# figure out where they came from. Make sure to replace them
# with the nodata value before creating the pgv data.
cur_trace.data[np.isnan(cur_trace.data)] = self.nodata_value
tmp = {}
tmp['time'] = [x.timestamp for (x, y) in zip(cur_trace.times(type = "utcdatetime"), cur_trace.data.tolist()) if y != self.nodata_value]
tmp['data'] = [x for x in cur_trace.data.tolist() if x != self.nodata_value]
cur_nsl = ':'.join([cur_trace.stats.network,
cur_trace.stats.station,
cur_trace.stats.location])
pgv_data[cur_nsl] = tmp
except Exception as e:
self.logger.exception("Error while preparing the pgv archive.")
return pgv_data
[docs] def get_current_event(self):
''' Return the current event in serializable form.
Returns
-------
:obj:`dict`
The current event as a dictionary.
'''
cur_event = {}
if self.current_event:
cur_event = validation.Event(id = self.current_event.db_id,
public_id = self.current_event.public_id,
start_time = self.current_event.start_time.isoformat(),
end_time = self.current_event.end_time.isoformat(),
length = self.current_event.length,
description = self.current_event.description,
comment = self.current_event.comment,
max_pgv = self.current_event.max_pgv,
state = self.current_event.detection_state,
num_detections = len(self.current_event.detections),
triggered_stations = self.current_event.triggered_stations)
cur_event = cur_event.dict()
# TODO: Serve the detailed event data only on request.
#cur_event['trigger_data'] = self.current_event['trigger_data']
#cur_event['overall_trigger_data'] = self.current_event['overall_trigger_data']
#cur_archive_event['max_station_pgv'] = cur_event['max_station_pgv']
return cur_event
[docs] def get_event_warning(self):
''' Return the current event warning in serializable form.
Returns
-------
:obj:`dict`
The current event warning as a dictionary.
'''
cur_warning = {}
if self.event_warning:
cur_warning['time'] = self.event_warning['time'].isoformat()
cur_warning['simp_stations'] = self.event_warning['simp_stations'].tolist()
cur_warning['trigger_pgv'] = self.event_warning['trigger_pgv'].tolist()
return cur_warning
[docs] def get_recent_events(self):
''' Return the recent events in serializable form.
Returns
-------
:obj:`dict`
The recent events as a dictionary.
'''
recent_event_timespan = self.event_archive_timespan
now = utcdatetime.UTCDateTime()
today = utcdatetime.UTCDateTime(now.timestamp // 86400 * 86400)
request_start = today - recent_event_timespan * 3600
with self.project_lock:
events = self.project.get_events(start_time = request_start)
cur_archive = {}
if len(events) > 0:
for cur_event in events:
self.logger.info('public_id: %s', cur_event.public_id)
self.logger.info('triggered_stations: %s', cur_event.triggered_stations)
cur_archive_event = validation.Event(db_id = cur_event.db_id,
public_id = cur_event.public_id,
start_time = cur_event.start_time.isoformat(),
end_time = cur_event.end_time.isoformat(),
length = cur_event.length,
description = cur_event.description,
comment = cur_event.comment,
max_pgv = cur_event.max_pgv,
state = cur_event.detection_state,
num_detections = len(cur_event.detections),
triggered_stations = cur_event.triggered_stations)
cur_archive[cur_event.public_id] = cur_archive_event.dict()
return cur_archive
[docs] def get_event_by_id(self, ev_id = None, public_id = None):
''' Get an event by event id or public id.
Parameters
----------
ev_id: int
The event database id.
public_id: str
The event public id.
Returns
-------
:class:`mss_dataserver.event.core.Event`
The event matching the passed id.
'''
if ev_id is None and public_id is None:
self.logger.error("Either the id or the public_id of the event have to be specified.")
return
with self.project_lock:
event = self.project.load_event_by_id(ev_id = ev_id,
public_id = public_id)
return event
[docs] def get_event_supplement(self, public_id, selection):
''' Get the detailed data of an event.
Parameters
----------
public_id: str
The public id of the event.
selection: :obj:`list` of :obj:`dict`
The supplement data selection (keys: category, name).
Returns
-------
:obj:`dict`
A dictionary containing the requested event supplement data.
'''
event_supplement = {}
event_supplement['public_id'] = public_id
event_supplement['data'] = {}
#event = self.get_event_by_id(ev_id = ev_id,
# public_id = public_id)
supp_dir = self.get_event_supplement_dir(public_id = public_id)
# Check, if the supplement directory exists.
if not os.path.exists(supp_dir):
self.logger.error("The event supplement data directory % doesn't exist.",
supp_dir)
return event_supplement
# TODO: Check the available supplement data.
# Load the supplement data.
for cur_selection in selection:
cur_category = cur_selection['category']
cur_name = cur_selection['name']
try:
cur_data = pp_util.get_supplement_data(public_id = public_id,
category = cur_category,
name = cur_name,
directory = self.supplement_dir)
# Convert the dataframe to json and revert it back to a dictionary to
# ensure, that the data can be serialized when sendig it over the
# websocket.
cur_data = json.loads(cur_data.to_json())
except Exception:
self.logger.exception("Error getting the supplement data %s.",
cur_selection)
cur_data = {}
if cur_category not in event_supplement['data'].keys():
event_supplement['data'][cur_category] = {}
event_supplement['data'][cur_category][cur_name] = cur_data
return event_supplement
[docs] def get_keydata(self):
''' Return the keydata.
Used for the LWZ display.
Returns
-------
:obj:`dict`
The keydata as a dictionary.
'''
all_stations, triggered_stations = self.get_triggered_stations()
keydata = {}
keydata['time'] = utcdatetime.UTCDateTime().isoformat()
keydata['all_stations'] = all_stations
keydata['triggered_stations'] = triggered_stations
keydata['triggered_event_stations'] = self.get_triggered_event_stations()
return keydata