Source code for mss_dataserver.event.detection

# -*- coding: utf-8 -*-

''' Managing detections.
'''

import logging
import warnings

import obspy.core.utcdatetime as utcdatetime

import mss_dataserver.geometry.db_inventory as db_inventory

class Detection(object):
    ''' A MSS Delaunay detection.

    Parameters
    ----------
    start_time: :class:`obspy.UTCDateTime`
        The start time of the detection.

    end_time: :class:`obspy.UTCDateTime`
        The end time of the detection.

    stations: :obj:`list` of :class:`~mss_dataserver.geometry.inventory.Station`
        3 stations related to the detection. They are the corners of a
        detection triangle.

    max_pgv: dict
        The maximum PGV values of the detection timespan.
        A dictionary of {station.nsl_string: PGV values}.

    db_id: int
        The database id of the detection.

    catalog_id: int
        The database id of the detection catalog which contains
        the detection.

    agency_uri: str
        The uniform resource identifier of the author agency.

    author_uri: str
        The uniform resource identifier of the author.

    creation_time: :class:`obspy.UTCDateTime`
        The creation time of the detection.

    parent: :class:`Catalog`
        The detection catalog holding the detection.

    changed: bool
        Flag indicating a change of the detection.
    '''

    def __init__(self, start_time, end_time, stations, max_pgv,
                 db_id = None, catalog_id = None, agency_uri = None,
                 author_uri = None, creation_time = None,
                 parent = None, changed = True):
        ''' Initialize the instance.

        Raises
        ------
        ValueError
            If start_time or end_time is None or if end_time is
            smaller than start_time.
        '''
        # Check for None values in the detection limits.
        if start_time is None or end_time is None:
            raise ValueError("None values are not allowed for the event time limits.")

        # Check the detection limits. The limits are compared as passed,
        # before they are converted to UTCDateTime instances.
        if end_time < start_time:
            raise ValueError("The end_time %s is smaller than the start_time %s." % (end_time, start_time))

        # The parent object holding this detection. Most likely this is a
        # detection Catalog instance or an event instance.
        self.parent = parent

        # The unique database id.
        self.db_id = db_id

        # The channel related to the detection. It is None until it is
        # assigned (e.g. by set_channel_from_inventory).
        self.channel = None

        # The catalog id to which the detection belongs.
        self.catalog_id = catalog_id

        # The start time of the detection.
        self.start_time = utcdatetime.UTCDateTime(start_time)

        # The end time of the detection.
        self.end_time = utcdatetime.UTCDateTime(end_time)

        # The stations of the Delaunay triangle.
        self.stations = stations

        # The max. PGV values of the detection timespan.
        # A dictionary of {station.nsl_string: PGV values}.
        self.max_pgv = max_pgv

        # The agency_uri of the creator.
        self.agency_uri = agency_uri

        # The author_uri of the creator.
        self.author_uri = author_uri

        # The time of creation of this detection.
        if creation_time is None:
            creation_time = utcdatetime.UTCDateTime()
        self.creation_time = utcdatetime.UTCDateTime(creation_time)

        # Flag to indicate a change of the detection attributes.
        self.changed = changed

    @property
    def rid(self):
        ''' str: The resource ID of the detection.
        '''
        return '/event/' + str(self.db_id)

    @property
    def start_time_string(self):
        ''' str: The string representation of the start time.
        '''
        return self.start_time.isoformat()

    @property
    def end_time_string(self):
        ''' str: The string representation of the end time.
        '''
        return self.end_time.isoformat()

    @property
    def length(self):
        ''' float: The length of the detection in seconds.
        '''
        return self.end_time - self.start_time

    @property
    def nslc(self):
        ''' tuple of str: The NSLC code of the related channel.

        None if no channel is assigned.
        '''
        if self.channel is None:
            return None
        return self.channel.nslc

    @property
    def nsl(self):
        ''' tuple of str: The NSL code (the first three elements of the
        NSLC code) of the related channel.

        None if no channel is assigned.
        '''
        if self.channel is None:
            return None
        channel_nslc = self.channel.nslc
        return (channel_nslc[0], channel_nslc[1], channel_nslc[2])

    @property
    def absolute_max_pgv(self):
        ''' float: The absolute maximum PGV value.
        '''
        return max(self.max_pgv.values())

    def update(self, start_time = None, end_time = None, max_pgv = None):
        ''' Update the attributes of the detection.

        Parameters
        ----------
        start_time: :class:`obspy.UTCDateTime` or :obj:`str`
            The new start time of the detection.

        end_time: :class:`obspy.UTCDateTime` or :obj:`str`
            The new end time of the detection.

        max_pgv: dict
            The new maximum PGV data.
            A dictionary of {station.nsl_string: PGV values}.
            A value is taken over only if it is larger than the value
            already stored for the same key. Every key has to exist
            in the stored max_pgv dictionary.
        '''
        if start_time is not None:
            self.start_time = utcdatetime.UTCDateTime(start_time)

        if end_time is not None:
            self.end_time = utcdatetime.UTCDateTime(end_time)

        if max_pgv is not None:
            for cur_key, cur_value in max_pgv.items():
                if cur_value > self.max_pgv[cur_key]:
                    self.max_pgv[cur_key] = cur_value

    def set_channel_from_inventory(self, inventory):
        ''' Set the channel using the inventory.

        The channel is requested from the inventory for the timespan
        of the detection.

        Parameters
        ----------
        inventory: :class:`~mss_dataserver.geometry.inventory.Inventory`
            The inventory used to match the channels.
        '''
        self.channel = inventory.get_channel_from_stream(start_time = self.start_time,
                                                         end_time = self.end_time)

    def _db_values(self):
        ''' Collect the column values of the detection database table.

        The same values are used for inserting, updating and for the
        creation of a bulk insertion ORM instance.

        Returns
        -------
        dict
            The values of the database columns using the column names
            as the keys.
        '''
        if self.creation_time is not None:
            creation_time = self.creation_time.isoformat()
        else:
            creation_time = None

        # The catalog id is taken from the parent holding the detection.
        if self.parent is not None:
            catalog_id = self.parent.db_id
        else:
            catalog_id = None

        stat1 = self.stations[0]
        stat2 = self.stations[1]
        stat3 = self.stations[2]

        return {'catalog_id': catalog_id,
                'start_time': self.start_time.timestamp,
                'end_time': self.end_time.timestamp,
                'stat1_id': stat1.id,
                'stat2_id': stat2.id,
                'stat3_id': stat3.id,
                'max_pgv1': float(self.max_pgv[stat1.nsl_string]),
                'max_pgv2': float(self.max_pgv[stat2.nsl_string]),
                'max_pgv3': float(self.max_pgv[stat3.nsl_string]),
                'agency_uri': self.agency_uri,
                'author_uri': self.author_uri,
                'creation_time': creation_time}

    def write_to_database(self, project):
        ''' Write the detection to the database.

        If the db_id of the detection is None, a new database entry is
        inserted and the db_id is set to the id of the new entry.
        Otherwise the existing database entry is updated.

        Parameters
        ----------
        project: :class:`~mss_dataserver.core.project.Project`
            The project used to access the database.

        Raises
        ------
        RuntimeError
            If the detection has a db_id, but no matching entry was
            found in the database.
        '''
        db_values = self._db_values()
        db_session = project.get_db_session()
        try:
            db_detection_orm = project.db_tables['detection']
            if self.db_id is None:
                # Insert a new detection.
                db_detection = db_detection_orm(**db_values)
                db_session.add(db_detection)
                db_session.commit()
                self.db_id = db_detection.id
            else:
                # Update the existing detection.
                query = db_session.query(db_detection_orm).filter(db_detection_orm.id == self.db_id)
                db_detection = query.scalar()
                if db_detection is None:
                    raise RuntimeError("The detection with ID=%s was not found in the database." % self.db_id)
                for cur_key, cur_value in db_values.items():
                    setattr(db_detection, cur_key, cur_value)
                db_session.commit()
        finally:
            # Close the session also if an error occurred.
            db_session.close()

    def get_db_orm(self, project):
        ''' Get an orm representation to use it for bulk insertion into
        the database.

        Parameters
        ----------
        project: :class:`~mss_dataserver.core.project.Project`
            The project used to access the database.

        Returns
        -------
        :class:`mss_dataserver.event.databaseFactory.DetectionDb`
            The database mapper class instance of the detection.
        '''
        db_detection_orm = project.db_tables['detection']
        db_detection = db_detection_orm(**self._db_values())
        db_detection.id = self.db_id
        return db_detection

    @classmethod
    def from_orm(cls, detection_orm, inventory):
        ''' Convert a database orm mapper detection to a detection.

        Parameters
        ----------
        detection_orm : :class:`mss_dataserver.event.databaseFactory.DetectionDb`
            The ORM of the detection_orm database table.

        inventory: :class:`~mss_dataserver.geometry.db_inventory.Inventory`
            The geometry inventory used to retrieve the stations
            of the detection.

        Returns
        -------
        :class:`Detection`
            The detection created using the database ORM instance.
        '''
        # The first station returned for each station id is used.
        stat1 = inventory.get_station(id = detection_orm.stat1_id)[0]
        stat2 = inventory.get_station(id = detection_orm.stat2_id)[0]
        stat3 = inventory.get_station(id = detection_orm.stat3_id)[0]
        max_pgv = {stat1.nsl_string: detection_orm.max_pgv1,
                   stat2.nsl_string: detection_orm.max_pgv2,
                   stat3.nsl_string: detection_orm.max_pgv3}
        detection = cls(start_time = detection_orm.start_time,
                        end_time = detection_orm.end_time,
                        db_id = detection_orm.id,
                        catalog_id = detection_orm.catalog_id,
                        stations = [stat1, stat2, stat3],
                        max_pgv = max_pgv,
                        agency_uri = detection_orm.agency_uri,
                        author_uri = detection_orm.author_uri,
                        creation_time = detection_orm.creation_time)
        return detection
class Catalog(object):
    ''' A detection catalog.

    Parameters
    ----------
    name: str
        The name of the catalog.

    db_id: int
        The database id of the catalog.

    description: str
        The description of the catalog.

    agency_uri: str
        The uniform resource identifier of the author agency.

    author_uri: str
        The uniform resource identifier of the author.

    creation_time: :class:`obspy.UTCDateTime`
        The creation time of the catalog.

    detections: :obj:`list` of :class:`Detection`
        The detections of the catalog.
    '''

    def __init__(self, name, db_id = None, description = None,
                 agency_uri = None, author_uri = None,
                 creation_time = None, detections = None):
        ''' Instance initialization.
        '''
        # The logging logger instance.
        logger_name = __name__ + "." + self.__class__.__name__
        self.logger = logging.getLogger(logger_name)

        # The unique database ID.
        self.db_id = db_id

        # The name of the catalog.
        self.name = name

        # The description of the catalog.
        self.description = description

        # The agency_uri of the creator.
        self.agency_uri = agency_uri

        # The author_uri of the creator.
        self.author_uri = author_uri

        # The time of creation of this catalog.
        if creation_time is None:
            self.creation_time = utcdatetime.UTCDateTime()
        else:
            self.creation_time = utcdatetime.UTCDateTime(creation_time)

        # The detections of the catalog.
        if detections is None:
            self.detections = []
        else:
            self.detections = detections

    def add_detections(self, detections):
        ''' Add one or more detections to the catalog.

        Detections having a db_id which is already used by a detection
        of the catalog are ignored. The parent of the added detections
        is set to the catalog.

        Parameters
        ----------
        detections : list of :class:`Detection`
            The detections to add to the catalog.
        '''
        # Check for potential duplicates.
        # TODO: add a compare method for the detection class.
        db_ids = [x.db_id for x in self.detections]
        detections = [x for x in detections
                      if x.db_id is None or x.db_id not in db_ids]
        for cur_detection in detections:
            cur_detection.parent = self

        self.detections.extend(detections)

    def remove_detections(self, detections):
        ''' Remove the detections from the catalog.

        Detections which are not part of the catalog are ignored.

        Parameters
        ----------
        detections : list of :class:`Detection`
            The detections to remove from the catalog.
        '''
        for cur_detection in detections:
            if cur_detection in self.detections:
                self.detections.remove(cur_detection)

    def get_detections(self, start_time = None, end_time = None,
                       start_inside = False, end_inside = False, **kwargs):
        ''' Get detections using search criteria passed as keywords.

        Parameters
        ----------
        start_time : :class:`obspy.UTCDateTime`
            The minimum starttime of the detections.

        end_time : :class:`obspy.UTCDateTime`
            The maximum end_time of the detections.

        start_inside : bool
            If True, select only those detections with a start time
            inside the search window.

        end_inside : bool
            If True, select only those detections with an end time
            inside the search window.

        Keyword Arguments
        -----------------
        nslc: :obj:`tuple` of :obj:`str`
            The network-station-location-channel code
            (e.g ('XX', 'DUBA', '00', 'HNormal')).
            Unknown keywords are ignored and a RuntimeWarning is issued.

        Returns
        -------
        :obj:`list` of :class:`Detection`
            The detections matching the search criteria.
        '''
        ret_detections = self.detections

        valid_keys = ['nslc']

        for cur_key, cur_value in kwargs.items():
            if cur_key in valid_keys:
                ret_detections = [x for x in ret_detections
                                  if getattr(x, cur_key) == cur_value]
            else:
                warnings.warn('Search attribute %s is not existing.' % cur_key,
                              RuntimeWarning)

        if start_time is not None:
            if start_inside:
                ret_detections = [x for x in ret_detections
                                  if (x.end_time is None) or (x.start_time >= start_time)]
            else:
                ret_detections = [x for x in ret_detections
                                  if (x.end_time is None) or (x.end_time > start_time)]

        if end_time is not None:
            if end_inside:
                ret_detections = [x for x in ret_detections
                                  if x.end_time <= end_time]
            else:
                ret_detections = [x for x in ret_detections
                                  if x.start_time < end_time]

        return ret_detections

    def assign_channel(self, inventory):
        ''' Set the channels according to the rec_stream_ids.

        Parameters
        ----------
        inventory: :class:`~mss_dataserver.geometry.inventory.Inventory`
            The inventory used to get the matching channels.
        '''
        # NOTE(review): This method reads the attribute rec_stream_id of
        # the detections. Detection.__init__ in this module doesn't set
        # this attribute - confirm that it is provided elsewhere before
        # using this method.
        # Get the unique stream ids.
        id_list = [x.rec_stream_id for x in self.detections]
        id_list = list(set(id_list))

        # Get the channels for the ids. An id is mapped to None if not
        # exactly one channel was returned for it.
        channels = [inventory.get_channel_from_stream(id = x) for x in id_list]
        channels = [x[0] if len(x) == 1 else None for x in channels]
        channels = dict(list(zip(id_list, channels)))

        for cur_detection in self.detections:
            cur_detection.channel = channels[cur_detection.rec_stream_id]

    #@profile(immediate=True)
    def load_detections(self, project, start_time = None, end_time = None,
                        min_detection_length = None, inventory = None):
        ''' Load detections from the database.

        The query can be limited using the allowed keyword arguments.
        Only the database entries of this catalog having an end time
        larger than the start time are loaded. Detections which can't
        be created from the database values are logged and skipped.

        Parameters
        ----------
        project: :class:`~mss_dataserver.core.project.Project`
            The project used to access the database.

        start_time : :class:`obspy.UTCDateTime`
            The begin of the time-span to load.

        end_time : :class:`obspy.UTCDateTime`
            The end of the time-span to load.

        min_detection_length: float
            The minimum length of the detection.

        inventory: :class:`~mss_dataserver.geometry.db_inventory.Inventory`
            The geometry inventory used to retrieve the stations of the
            detections. Without an inventory no detection can be created,
            an error is logged and nothing is loaded.

        Raises
        ------
        RuntimeError
            If the project is None.
        '''
        if project is None:
            raise RuntimeError("The project is None. Can't query the database without a project.")

        if inventory is None:
            # Detection.from_orm needs the inventory to get the stations.
            self.logger.error("No inventory was passed. Can't create the detections of catalog %s without an inventory.",
                              self.name)
            return

        db_session = project.get_db_session()
        try:
            detection_table = project.db_tables['detection']
            query = db_session.query(detection_table).\
                filter(detection_table.catalog_id == self.db_id).\
                filter(detection_table.end_time > detection_table.start_time)

            if start_time is not None:
                query = query.filter(detection_table.start_time >= start_time.timestamp)

            if end_time is not None:
                query = query.filter(detection_table.start_time <= end_time.timestamp)

            if min_detection_length is not None:
                query = query.filter(detection_table.end_time - detection_table.start_time >= min_detection_length)

            detections_to_add = []
            for cur_orm in query:
                try:
                    cur_detection = Detection.from_orm(cur_orm, inventory)
                    detections_to_add.append(cur_detection)
                except Exception:
                    # Best effort: skip the detection and keep loading.
                    self.logger.exception("Error when creating a detection object from database values for detection id %d. Skipping this detection.", cur_orm.id)
            self.add_detections(detections_to_add)
        finally:
            db_session.close()

    def clear_detections(self):
        ''' Clear the detections list.
        '''
        self.detections = []

    def write_to_database(self, project):
        ''' Write the catalog to the database.

        If the db_id of the catalog is None, a new database entry is
        inserted and the db_id is set to the id of the new entry.
        Otherwise the existing database entry is updated.
        Afterwards all detections of the catalog flagged as changed
        are written to the database.

        Parameters
        ----------
        project: :class:`~mss_dataserver.core.project.Project`
            The project used to access the database.

        Raises
        ------
        RuntimeError
            If the catalog has a db_id, but no matching entry was
            found in the database.
        '''
        if self.creation_time is not None:
            creation_time = self.creation_time.isoformat()
        else:
            creation_time = None

        db_values = {'name': self.name,
                     'description': self.description,
                     'agency_uri': self.agency_uri,
                     'author_uri': self.author_uri,
                     'creation_time': creation_time}

        db_session = project.get_db_session()
        try:
            db_catalog_orm = project.db_tables['detection_catalog']
            if self.db_id is None:
                # Insert a new catalog.
                db_catalog = db_catalog_orm(**db_values)
                db_session.add(db_catalog)
                db_session.commit()
                self.db_id = db_catalog.id
            else:
                # Update the existing catalog.
                query = db_session.query(db_catalog_orm).filter(db_catalog_orm.id == self.db_id)
                db_catalog = query.scalar()
                if db_catalog is None:
                    raise RuntimeError("The detection catalog with ID=%s was not found in the database." % self.db_id)
                for cur_key, cur_value in db_values.items():
                    setattr(db_catalog, cur_key, cur_value)
                db_session.commit()
        finally:
            # Close the session also if an error occurred.
            db_session.close()

        # Write or update all detections of the catalog to the database.
        for cur_detection in [x for x in self.detections if x.changed is True]:
            cur_detection.write_to_database(project)

    @classmethod
    def from_orm(cls, db_catalog, load_detections = False, inventory = None):
        ''' Convert a database orm mapper catalog to a catalog.

        Parameters
        ----------
        db_catalog: :class:`mss_dataserver.event.databaseFactory.DetectionCatalogDb`
            The table mapperclass of the detection catalog database table.

        load_detections: bool
            If True all detections contained in the catalog are loaded
            from the database.

        inventory: :class:`~mss_dataserver.geometry.db_inventory.Inventory`
            The geometry inventory used to retrieve the stations of the
            detections. It is needed if detections are loaded.

        Returns
        -------
        :class:`Catalog`
            The catalog created using the database ORM instance.

        Raises
        ------
        TypeError
            If a detection has to be loaded and no inventory is given.
        '''
        catalog = cls(name = db_catalog.name,
                      db_id = db_catalog.id,
                      description = db_catalog.description,
                      agency_uri = db_catalog.agency_uri,
                      author_uri = db_catalog.author_uri,
                      creation_time = db_catalog.creation_time)

        # Add the detections to the catalog.
        if load_detections is True:
            for cur_detection_orm in db_catalog.detections:
                if inventory is None:
                    raise TypeError("An inventory is needed to load the detections of the catalog.")
                cur_detection = Detection.from_orm(cur_detection_orm, inventory)
                catalog.add_detections([cur_detection, ])
        return catalog
class Library(object):
    ''' Manage detection catalogs.

    Parameters
    ----------
    name: str
        The name of the library.

    Attributes
    ----------
    catalogs: dict
        The catalogs managed by the library. The key is the name of
        the catalog.
    '''

    def __init__(self, name):
        ''' Initialize the instance.
        '''
        # The name of the library.
        self.name = name

        # The catalogs of the library.
        self.catalogs = {}

    def add_catalog(self, catalog):
        ''' Add one or more catalogs to the library.

        A catalog already stored using the same name is replaced.

        Parameters
        ----------
        catalog : :class:`Catalog` or :obj:`list` of :class:`Catalog`
            The catalog(s) to add to the library.
        '''
        if isinstance(catalog, list):
            for cur_catalog in catalog:
                self.add_catalog(cur_catalog)
        else:
            self.catalogs[catalog.name] = catalog

    def remove_catalog(self, name):
        ''' Remove a catalog from the library.

        Parameters
        ----------
        name : str
            The name of the catalog to remove.

        Returns
        -------
        removed_catalog : :class:`Catalog`
            The removed catalog. None if no catalog was removed.
        '''
        return self.catalogs.pop(name, None)

    def clear(self):
        ''' Remove all catalogs.
        '''
        self.catalogs = {}

    def get_catalogs_in_db(self, project):
        ''' Query the available catalogs in the database.

        Parameters
        ----------
        project : :class:`~mss_dataserver.core.project.Project`
            The project managing the database.

        Returns
        -------
        catalog_names : :obj:`list` of :obj:`str`
            The available catalog names in the database ordered
            by name. An empty list if no catalog is available.
        '''
        db_session = project.get_db_session()
        try:
            db_catalog_orm = project.db_tables['detection_catalog']
            query = db_session.query(db_catalog_orm).order_by(db_catalog_orm.name)
            catalog_names = [x.name for x in query]
        finally:
            db_session.close()

        return catalog_names

    def load_catalog_from_db(self, project, name, load_detections = False,
                             inventory = None):
        ''' Load catalogs from the database.

        The loaded catalogs are added to the library.

        Parameters
        ----------
        project : :class:`~mss_dataserver.core.project.Project`
            The project managing the database.

        name : :obj:`str` or :obj:`list` of :obj:`str`
            The name of the catalog to load from the database.

        load_detections: bool
            If True all detections contained in the catalogs are
            loaded from the database.

        inventory: :class:`~mss_dataserver.geometry.db_inventory.Inventory`
            The geometry inventory used to retrieve the stations of the
            detections. It is needed if detections are loaded.

        Raises
        ------
        TypeError
            If a detection has to be loaded and no inventory is given.
        '''
        if isinstance(name, str):
            name = [name, ]

        db_session = project.get_db_session()
        try:
            db_catalog_orm = project.db_tables['detection_catalog']
            query = db_session.query(db_catalog_orm).filter(db_catalog_orm.name.in_(name))
            for cur_db_catalog in query:
                # Create the catalog without detections. The detections
                # are converted below, because the inventory is needed
                # to create them.
                cur_catalog = Catalog.from_orm(cur_db_catalog, False)
                if load_detections is True:
                    for cur_detection_orm in cur_db_catalog.detections:
                        if inventory is None:
                            raise TypeError("An inventory is needed to load the detections of the catalog.")
                        cur_detection = Detection.from_orm(cur_detection_orm,
                                                           inventory)
                        cur_catalog.add_detections([cur_detection, ])
                self.add_catalog(cur_catalog)
        finally:
            db_session.close()