# -*- coding: utf-8 -*-
##############################################################################
# LICENSE
#
# This file is part of mss_dataserver.
#
# If you use mss_dataserver in any program or publication, please inform and
# acknowledge its authors.
#
# mss_dataserver is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# mss_dataserver is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with mss_dataserver. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright 2019 Stefan Mertl
##############################################################################
'''
Utility functions to interact with the database.
:copyright:
Stefan Mertl
:license:
GNU General Public License, Version 3
(http://www.gnu.org/licenses/gpl-3.0.html)
'''
import sqlalchemy as sqa
import logging
import re
logger_name = __name__
logger = logging.getLogger(logger_name)
[docs]def db_table_migration(engine, table, prefix):
''' Check if a database table migration is needed and apply the changes.
Check the table definition of the database with the one from defined
in the current database factories of the modules. If changes are
detected, the database tables are updated to fit the new version.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
prefix: String
The database table name prefix.
The names of the tables contain only the basename (e.g. geom_recorder).
The full table name usually contains the name of the project as
a prefix.
'''
logger.info('Checking if database table %s needs an update.', table.__table__.name)
migrate_success = False
cur_metadata = sqa.MetaData(engine)
cur_metadata.reflect(engine)
if table.__table__.name in iter(cur_metadata.tables.keys()):
# Check for changes between the existing and the new table.
table_updated = update_db_table(engine = engine,
table = table,
metadata = cur_metadata,
prefix = prefix)
if not table_updated:
logger.info('Everything is up-to-date. No update needed.')
migrate_success = True
else:
# The table is missing in the schema, create it.
logger.info('The table %s is not existing, create it.', table.__table__.name)
try:
table.__table__.create()
migrate_success = True
except:
logger.exception('Error creating the table %s.', table.__table__.name)
return migrate_success
[docs]def update_db_table(engine, table, metadata, prefix):
''' Update the table structure to the new schema.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
metadata: sqlalchemy.schema.MetaData
The tables of the current database schema.
prefix: String
The database table name prefix.
The names of the tables contain only the basename (e.g. geom_recorder).
The full table name usually contains the name of the project as
a prefix.
'''
table_updated = False
# Check for added columns.
new_table = table.__table__
exist_table = metadata.tables[new_table.name]
columns_to_add = set(new_table.columns.keys()).difference(set(exist_table.columns.keys()))
if columns_to_add:
if not table_updated:
logger.info('A database table migration is needed.')
for cur_col in columns_to_add:
logger.info('Adding column %s to table %s.', cur_col, table.__table__.name)
add_column(engine = engine,
table = table,
column = new_table.columns[cur_col],
prefix = prefix)
table_updated = True
# Check for columns to remove.
columns_to_remove = set(exist_table.columns.keys()).difference(set(new_table.columns.keys()))
if columns_to_remove:
if not table_updated:
logger.info('A database table migration is needed.')
for cur_col in columns_to_remove:
logger.info('Removing column %s from table %s.', cur_col, table.__table__.name)
remove_column(engine = engine,
table = table,
column = exist_table.columns[cur_col])
table_updated = True
# Check for changed column specifications.
for cur_name, cur_col in list(new_table.columns.items()):
if cur_name not in iter(exist_table.columns.keys()):
# The column is not existing in the database.
# Might have been deleted. Ignore it.
continue
exist_col = exist_table.columns[cur_name]
# Check for the column type.
new_type = cur_col.type.compile(engine.dialect)
exist_type = exist_col.type.compile(engine.dialect)
if not compare_column_type(new_type, exist_type):
if not table_updated:
logger.info('A database table migration is needed.')
logger.info('Changing the type of column %s from %s to %s.', cur_name, exist_type, new_type)
change_column_type(engine = engine,
table = table,
column = cur_col)
table_updated = True
# Drop all existing foreign keys. This has to be done in case that unique
# keys have to be removed which might be needed by foreign keys.
for cur_key in exist_col.foreign_keys:
logger.info("Removing the foreign key %s from table %s.", cur_key.name, table.__table__.name)
remove_foreign_key(engine = engine,
table = table,
fk_symbol = cur_key.name)
# Remove all unique contraints.
insp = sqa.inspect(engine)
unique_const = insp.get_unique_constraints(exist_table.name)
for cur_const in unique_const:
logger.info('Removing the unique constraint %s.', cur_const['name'])
remove_unique_constraint(engine, table, cur_const['name'])
# Add the new constraints to the table.
const_to_add = new_table.constraints.difference(exist_table.constraints)
unique_const = [x for x in const_to_add if isinstance(x, sqa.schema.UniqueConstraint)]
foreign_const = [x for x in const_to_add if isinstance(x, sqa.schema.ForeignKeyConstraint)]
# TODO: Handle changes of the primary key.
primary_const = [x for x in const_to_add if isinstance(x, sqa.schema.PrimaryKeyConstraint)]
for cur_const in unique_const:
logger.info('Adding the unique constraint %s.', cur_const.name)
add_unique_constraint(engine, table, cur_const)
table_updated = True
for cur_const in foreign_const:
#logger.warning("Changing foreign key constraint is not yet implemented (%s).", cur_const)
target_columns = [x.target_fullname for x in cur_const.elements]
logger.info('Adding foreign key (%s) referring (%s).', ','.join(cur_const.column_keys),
','.join(target_columns))
try:
# Get the target table using the ForeignKey elements. Using the
# referred_table was not working for all constraints because the
# referred_table is a dynamic attribute.
target_name = [x.target_fullname.split('.')[0] for x in cur_const.elements]
target_name = list(set(target_name))
if len(target_name) != 1:
logger.error("Too many target tables for the foreign constraint: %s.", target_name)
continue
else:
target_name = prefix + target_name[0]
add_foreign_key(engine = engine,
table = table,
columns = cur_const.column_keys,
target_table = target_name,
target_columns = [x.target_fullname.split('.')[1] for x in cur_const.elements],
on_update = cur_const.onupdate,
on_delete = cur_const.ondelete)
except:
logger.exception("Error creating the foreign key.")
return table_updated
[docs]def add_column(engine, table, column, prefix):
''' Add a column to a database table.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
column: sqlalchemy.schema.Column
The database table column to add.
prefix: String
The database table name prefix.
The names of the tables contain only the basename (e.g. geom_recorder).
The full table name usually contains the name of the project as
a prefix.
'''
table_name = table.__table__.name
column_type = column.type.compile(engine.dialect)
if column.nullable:
engine.execute('ALTER TABLE %s ADD COLUMN %s %s' % (table_name, column.name, column_type))
else:
engine.execute('ALTER TABLE %s ADD COLUMN %s %s NOT NULL' % (table_name, column.name, column_type))
for cur_key in column.foreign_keys:
add_foreign_key(engine = engine,
table = table,
column = column,
target = prefix + cur_key.target_fullname)
[docs]def remove_column(engine, table, column):
''' Remove a column from the database table.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
column: sqlalchemy.schema.Column
The database table column to add.
'''
table_name = table.__table__.name
for cur_key in column.foreign_keys:
remove_foreign_key(engine, table, cur_key.name)
engine.execute('ALTER TABLE %s DROP COLUMN %s' % (table_name, column.name))
[docs]def change_column_type(engine, table, column):
''' Change the type of a database table column.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
column: sqlalchemy.schema.Column
The database table column to add.
'''
table_name = table.__table__.name
column_type = column.type.compile(dialect = engine.dialect)
engine.execute('ALTER TABLE %s MODIFY %s %s' % (table_name, column.name, column_type))
[docs]def add_foreign_key(engine, table, columns, target_table, target_columns, on_update, on_delete):
''' Add a foreign key constraint.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
columns: list of String
The column names.
target_table: String
The name of the table, that the foreign key references.
target_columns: list of String
The names of the columns, that the foreign key references.
on_update: String
The action to perform on update (sql ON UPDATA).
on_delete: String
The action to perform on delete (sql ON DELETE).
'''
table_name = table.__table__.name
if on_update is None:
on_update = 'RESTRICT'
if on_delete is None:
on_delete = 'RESTRICT'
engine.execute('ALTER TABLE %s ADD FOREIGN KEY (%s) REFERENCES %s(%s) ON UPDATE %s ON DELETE %s' % (table_name,
','.join(columns),
target_table,
','.join(target_columns),
on_update,
on_delete))
[docs]def add_foreign_key_old(engine, table, column, target, on_update, on_delete):
''' Add a foreign key to the column.
This is an old function definition. Will be removed.
'''
table_name = table.__table__.name
tmp = target.split('.')
target_table = tmp[0]
target_column = tmp[1]
engine.execute('ALTER TABLE %s ADD FOREIGN KEY (%s) REFERENCES %s(%s) ON_DELETE %s ON_UPDATE %s' % (table_name, column.name, target_table, target_column, on_update, on_delete))
[docs]def remove_foreign_key(engine, table, fk_symbol):
''' Remove a foreign key.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
fk_symbol: String
The name of the foreign key.
'''
table_name = table.__table__.name
engine.execute('ALTER TABLE %s DROP FOREIGN KEY %s' % (table_name, fk_symbol))
[docs]def add_unique_constraint(engine, table, constraint):
''' Add a unique constraint to the table.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
constraint: String
The name of the constraint.
'''
table_name = table.__table__.name
col_names = [x.name for x in constraint.columns]
const_name = constraint.name
if const_name:
sql_cmd = 'ALTER TABLE %s ADD CONSTRAINT %s UNIQUE (%s)' % (table_name, const_name, ','.join(col_names))
else:
sql_cmd = 'ALTER TABLE %s ADD CONSTRAINT UNIQUE (%s)' % (table_name, ','.join(col_names))
engine.execute(sql_cmd)
[docs]def remove_unique_constraint(engine, table, name):
''' Remove a unique constraint from the table using the constraint name.
Parameters
----------
engine: sqlalchemy.engine.Engine
The sqlalchemy database Engine.
table: sqlalchemy.schema.Table
The table to migrate.
name: String
The name of the contraint.
'''
table_name = table.__table__.name
engine.execute('ALTER TABLE %s DROP INDEX %s' % (table_name, name))
[docs]def compare_column_type(col1, col2):
''' Compare the type of two columns.
Parameters
----------
col1: sqlalchemy.schema.Colum
The first column.
col2: sqlalchemy.schema.Colum
The column to compare.
Returns
-------
is_equal: bool
True if the two columns are equal, False if not.
'''
is_equal = False
tmp = re.split('[()]', col1)
if tmp:
if len(tmp) >= 1:
col1_type = tmp[0]
if len(tmp) >= 2:
col1_len = tmp[1]
else:
col1_len = None
tmp = re.split('[()]', col2)
if tmp:
if len(tmp) >= 1:
col2_type = tmp[0]
if len(tmp) >= 2:
col2_len = tmp[1]
else:
col2_len = None
if col1_type == col2_type:
if col1_len is not None:
if col1_len == col2_len:
is_equal = True
else:
is_equal = True
return is_equal