#!/usr/bin/env python
"""Populate database and storages."""
from logging import getLogger
from argparse import ArgumentParser
from os import listdir
from os.path import exists, join, abspath, dirname
from ConfigParser import ConfigParser
from locale import getdefaultlocale
from shutil import rmtree
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from sqlalchemy import engine_from_config
from sqlalchemy.exc import OperationalError
from pyramid.paster import get_appsettings
from ..lib.i18n import _, localizer
from ..lib.utils import normalize_name
from ..lib.config import config_get, config_get_namespace
from ..lib.log import setup_logging
from ..lib.xml import import_configuration
from ..lib.handler import HandlerManager
from ..models import ID_LEN, DBSession, Base
from ..models.users import PAGE_SIZE, User, UserPerm, UserIP
from ..models.storages import Storage
LOG = getLogger(__name__)
DEFAULT_SETTINGS_NAME = 'PubliForge'
# =============================================================================
[docs]def main():
"""Main function."""
# Parse arguments
parser = ArgumentParser(description='Populate database and storages.')
parser.add_argument(
'conf_uri', help='URI of configuration (e.g. production.ini#foo)')
parser.add_argument(
'files', nargs='*', help='optional XML configuration files to use')
parser.add_argument(
'--drop-tables', dest='drop_tables', help='drop existing tables',
action='store_true')
parser.add_argument(
'--no-pull', dest='no_pull', help='do not synchronize storages',
action='store_true')
parser.add_argument(
'--reset-index', dest='reset_index', help='delete storage indexes',
action='store_true')
parser.add_argument(
'--no-index', dest='no_index', help='do not index storages',
action='store_true')
parser.add_argument(
'--log-level', dest='log_level', help='log level', default='INFO',
choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'))
args = parser.parse_args()
if not exists(args.conf_uri.partition('#')[0]):
parser.print_usage()
return
setup_logging(args.log_level)
# Populate database and storage
Populate(args, args.conf_uri).all(args.files)
# =============================================================================
[docs]class Populate(object):
"""Class to populate database and storages."""
# -------------------------------------------------------------------------
def __init__(self, args, conf_uri):
"""Constructor method."""
self._args = args
self._settings = get_appsettings(conf_uri)
if len(self._settings) < 3:
try:
self._settings = get_appsettings(
conf_uri, DEFAULT_SETTINGS_NAME)
except LookupError:
self._settings = None
return
conf_file = abspath(args.conf_uri.partition('#')[0])
self._config = ConfigParser({'here': dirname(conf_file)})
self._config.read(conf_file)
if not self._initialize_sql():
self._settings = None
return
# -------------------------------------------------------------------------
[docs] def all(self, files=tuple()):
"""Check settings, initialize database and create storages.
:param files: (list, optional)
List of files on command-line.
:return: (boolean)
"""
if self._settings is None:
return False
# Check general settings
if not self._settings.get('uid'):
LOG.critical(self._translate(_(
'Must define an unique identifier for this instance.')))
return False
# Agent only
if self._settings.get('storage.root') is None:
return True
# Clean up
path = self._settings.get('opener.cache')
if path and exists(path):
rmtree(path)
path = self._settings.get('temporary_dir')
if path and exists(path):
rmtree(path)
# Populate
self._populate_admin()
self._populate_from_xml(files)
if DBSession.query(UserPerm)\
.filter_by(scope='all', level='admin').first() is None:
LOG.error(self._translate(_('You must define an administrator.')))
DBSession.remove()
return False
self._update_storages()
DBSession.remove()
return True
# -------------------------------------------------------------------------
def _initialize_sql(self):
"""Database initialization.
:return: (boolean)
``True`` if it succeeds.
"""
# Initialize SqlAlchemy session
try:
dbengine = engine_from_config(self._settings, 'sqlalchemy.')
except KeyError:
LOG.critical(self._translate(_('Database is not defined.')))
return False
DBSession.configure(bind=dbengine)
Base.metadata.bind = dbengine
# Possibly, drop any existing tables
if self._args.drop_tables or \
self._settings.get('drop_tables') == 'true' or \
config_get(self._config, 'Populate', 'drop_tables') == 'true':
LOG.info(self._translate(_('###### Dropping existing tables')))
try:
Base.metadata.drop_all()
except OperationalError as error:
LOG.error(error.args[0])
return False
# Create the tables if they don't already exist
try:
Base.metadata.create_all(dbengine)
except OperationalError as error:
LOG.error(error.args[0])
return False
return True
# -------------------------------------------------------------------------
def _populate_admin(self):
"""Populate database with an administrator."""
if not self._config.has_option('Populate', 'admin.login'):
return
LOG.info(self._translate(_('###### Adding administrator')))
login = normalize_name(self._get('admin.login'))[0:ID_LEN]
ips = self._get('admin.ips', '')
record = {
'status': 'active',
'password': self._get('admin.password'),
'name': self._get('admin.name'),
'email': self._get('admin.email'),
'lang': self._get(
'admin.language',
self._settings.get('pyramid.default_locale_name', 'en')),
'restrict_ip': bool(ips),
'home': self._get('admin.home', 'site'),
'page_size': int(self._get('admin.page_size', PAGE_SIZE))}
if not record['password'] or not record['name']\
or not record['email']:
exit('*** Incorrect administrator definition.')
user = DBSession.query(User.login).filter_by(login=login).first()
if user is not None:
return
user = User(self._settings, login, **record)
DBSession.add(user)
DBSession.commit()
user.perms.append(UserPerm(user.user_id, 'all', 'admin'))
for my_ip in ips.split():
user.ips.append(UserIP(user.user_id, my_ip))
DBSession.commit()
# -------------------------------------------------------------------------
def _populate_from_xml(self, extra_files):
"""Populate database with XML content.
:param list extra_files:
List of files on command-line.
"""
# List of files to load
files = config_get_namespace(self._config, 'Populate', 'file')
files = [files[k] for k in sorted(files.keys())] + list(extra_files)
if not files:
return
# Load files
LOG.info(self._translate(_('###### Loading configurations')))
done = set()
for filename in files:
filename = abspath(filename)
if filename not in done:
LOG.info(filename)
errors = import_configuration(
self._settings, filename, error_if_exists=False)
for error in errors:
LOG.error(self._translate(error))
done.add(filename)
# -------------------------------------------------------------------------
def _update_storages(self):
"""Update all storages."""
# pylint: disable = logging-format-interpolation
cache_manager = CacheManager(
**parse_cache_config_options(self._settings))
handler_manager = HandlerManager(self._settings, cache_manager)
if self._args.reset_index and 'storage.index' in self._settings \
and exists(self._settings['storage.index']):
LOG.info(self._translate(_('###### Reseting indexes')))
handler_manager.delete_index(False)
LOG.info(self._translate(_('###### Updating storages')))
for storage in DBSession.query(Storage):
storage_dir = join(
self._settings['storage.root'], storage.storage_id)
handler = handler_manager.get_handler(storage.storage_id, storage)
if handler is None:
LOG.error(self._translate(
_('${e}: unkwown engine', {'e': storage.vcs_engine})))
elif not exists(storage_dir) or not listdir(storage_dir):
LOG.info('{0:.<32}'.format(storage.storage_id))
error = handler.clone()
if error:
LOG.error(error)
elif not self._args.no_index:
handler_manager.index(storage.storage_id)
elif not hasattr(self._args, 'no_pull') \
or not self._args.no_pull:
LOG.info('{0:.<32}'.format(storage.storage_id))
handler.synchronize(None, True)
if not self._args.no_index:
handler_manager.index(storage.storage_id)
# -------------------------------------------------------------------------
def _get(self, option, default=None):
"""Retrieve a value from section [Populate].
:param option: (string)
Option name.
:param default: (string, optional)
Default value
:return: (string)
Read value or default value.
"""
return config_get(self._config, 'Populate', option, default)
# -------------------------------------------------------------------------
@classmethod
def _translate(cls, text):
"""Return ``text`` translated.
:param text: (string)
Text to translate.
"""
return localizer(getdefaultlocale()[0]).translate(text)
# =============================================================================
if __name__ == '__main__':
main()