# -*- coding: utf-8 -*-
"""SQLAlchemy-powered model definition for storages."""
# pylint: disable = super-on-old-class
from os.path import exists, join
import shutil
from lxml import etree
from sqlalchemy import Column, ForeignKey, types
from sqlalchemy.orm import relationship
from ..lib.i18n import _
from ..lib.utils import NORMALIZE_FILE_MODE, encrypt, normalize_name
from ..lib.utils import normalize_spaces, wrap
from . import ID_LEN, LABEL_LEN, DESCRIPTION_LEN, PATH_LEN, PATTERN_LEN
from . import Base, DBSession
from .users import User
from .groups import Group
STORAGE_ACCESS = {
'open': _('open'), 'restricted': _('restricted'), 'closed': _('closed')}
VCS_ENGINES = {
'none': _(u'none – None'), 'local': _(u'local – Local'),
'hg': u'hg – Mercurial', 'hgsvn': u'hgsvn – Subversion (hg)',
'svn': u'svn – Subversion'}
STORAGE_PERMS = {'writer': _('File editor'), 'reader': _('File reader')}
REFRESH = 3600
# =============================================================================
[docs]class Storage(Base):
"""SQLAlchemy-powered storage model."""
# pylint: disable = too-many-instance-attributes
__tablename__ = 'storage'
__table_args__ = {'mysql_engine': 'InnoDB'}
storage_id = Column(types.String(ID_LEN), primary_key=True)
label = Column(types.String(LABEL_LEN), unique=True, nullable=False)
description = Column(types.String(DESCRIPTION_LEN))
vcs_engine = Column(
types.Enum(*VCS_ENGINES.keys(), name='vcs_engine_enum'),
nullable=False)
vcs_url = Column(types.String(PATH_LEN))
vcs_user = Column(types.String(ID_LEN))
vcs_password = Column(types.String(64))
public_url = Column(types.String(PATH_LEN))
access = Column(
types.Enum(*STORAGE_ACCESS.keys(), name='stg_access_enum'),
nullable=False, default='open')
refresh = Column(types.Integer, default=REFRESH)
indexed_files = Column(types.String(PATTERN_LEN))
normalize_mode = Column(
types.Enum(
*NORMALIZE_FILE_MODE.keys(), name='vcs_normalize_mode_enum'))
openers = relationship('StorageOpener', cascade='all, delete')
users = relationship(
'StorageUser', backref='storage', cascade='all, delete')
groups = relationship('StorageGroup', cascade='all, delete')
# -------------------------------------------------------------------------
def __init__(self, settings, storage_id, label, description=None,
vcs_engine='local', vcs_url=None, vcs_user=None,
vcs_password=None, public_url=None, access=None,
refresh=None, indexed_files=None, normalize_mode=None):
"""Constructor method."""
# pylint: disable = too-many-arguments
super(Storage, self).__init__()
self.storage_id = storage_id.strip()[0:ID_LEN]
self.label = normalize_spaces(label, LABEL_LEN)
self.description = normalize_spaces(description, DESCRIPTION_LEN)
self.access = access
self.vcs_engine = vcs_engine
if vcs_engine not in ('none', 'local'):
self.vcs_url = vcs_url[0:PATH_LEN]
self.vcs_user = vcs_user and vcs_user.strip()[0:ID_LEN]
self.set_vcs_password(settings, vcs_password)
elif public_url is not None:
self.public_url = public_url.strip()[0:PATH_LEN]
self.refresh = refresh
self.indexed_files = indexed_files.strip() \
if indexed_files and indexed_files.strip() else None
self.normalize_mode = normalize_mode
# -------------------------------------------------------------------------
[docs] def set_vcs_password(self, settings, vcs_password):
"""Encrypt and set password.
:param settings: (dictionary)
Pyramid deployment settings.
:param vcs_password: (string)
Clear VCS password.
"""
if vcs_password:
self.vcs_password = encrypt(
vcs_password.strip(), settings.get('encryption', '-'))
# -------------------------------------------------------------------------
[docs] @classmethod
def delete(cls, storage_root, storage_id):
"""Delete a storage.
:param storage_root: (string)
Storage root path.
:param storage_id: (string)
Storage identifier.
"""
# Delete records in database
DBSession.query(cls).filter_by(storage_id=storage_id).delete()
DBSession.commit()
# Remove directory
if exists(join(storage_root, storage_id)):
shutil.rmtree(join(storage_root, storage_id))
# -------------------------------------------------------------------------
[docs] @classmethod
def load(cls, settings, storage_elt, error_if_exists=True):
"""Load a storage from a XML element.
:param settings: (dictionary)
Application settings.
:param storage_elt: (:class:`lxml.etree.Element` instance)
Storage XML element.
:param error_if_exists: (boolean, default=True)
It returns an error if storage already exists.
:return: (:class:`pyramid.i18n.TranslationString` ``None`` or
:class:`Storage` instance)
Error message or ``None`` or the new storage object.
"""
# Reset
storage_id = storage_elt.get('id').strip()[0:ID_LEN]
label = normalize_spaces(storage_elt.findtext('label'), LABEL_LEN)
if storage_elt.find('reset') is not None \
and bool(storage_elt.findtext('reset')):
Storage.delete(settings['storage.root'], storage_id)
# Check if already exists
storage = DBSession.query(cls).filter_by(
storage_id=storage_id).first()
if storage is None:
storage = DBSession.query(cls).filter_by(label=label).first()
if storage is not None:
if error_if_exists:
return _(
'Storage "${i}" already exists.', {'i': storage_id})
return None
# Create storage
vcs_elt = storage_elt.find('vcs')
record = {
'label': label,
'description':
storage_elt.findtext('description'),
'vcs_engine':
vcs_elt.get('engine'),
'vcs_url':
vcs_elt.findtext('url') is not None and
vcs_elt.findtext('url').strip() or None,
'vcs_user':
vcs_elt.findtext('user') is not None and
vcs_elt.findtext('user').strip() or None,
'vcs_password':
vcs_elt.findtext('password') is not None and
vcs_elt.findtext('password').strip() or None,
'public_url':
vcs_elt.findtext('public') is not None and
vcs_elt.findtext('public').strip() or None,
'access':
storage_elt.findtext('access') is not None and
storage_elt.findtext('access').strip() or 'open',
'refresh':
storage_elt.findtext('refresh') is not None and
int(storage_elt.findtext('refresh').strip()) or None,
'indexed_files': storage_elt.findtext('indexed'),
'normalize_mode':
storage_elt.find('normalize') is not None and
storage_elt.find('normalize').get('mode') or None}
storage = Storage(settings, storage_id, **record)
if record['vcs_password'] and vcs_elt.find('password').get('encrypt'):
storage.vcs_password = record['vcs_password']
DBSession.add(storage)
DBSession.commit()
# Load additional element
return cls._load_additional(storage_elt, storage)
# -------------------------------------------------------------------------
@classmethod
def _load_additional(cls, storage_elt, storage):
"""Load additional elements as openers, users and groups.
:param storage_elt: (:class:`lxml.etree.Element` instance)
Current storage DOM element.
:param storage: (:class:`Storage`)
Current storage record
:return: (:class:`Storage` instance)
The new storage object.
"""
# Add openers
done = set()
for item in storage_elt.findall('openers/opener'):
opener_id = item.text[0:ID_LEN]
if opener_id not in done:
storage.openers.append(
StorageOpener(
storage.storage_id, opener_id, len(done) + 1))
done.add(opener_id)
# Add users
done = set()
for item in storage_elt.findall('members/member'):
login = normalize_name(item.text)[0:ID_LEN]
if login not in done:
user = DBSession.query(User).filter_by(login=login).first()
if user is not None:
storage.users.append(StorageUser(
storage.storage_id, user.user_id,
item.get('in-menu'), item.get('permission'),
vcs_user=item.get('vcs-user'),
vcs_password=item.get('vcs-password')))
done.add(login)
# Add groups
done = set()
for item in storage_elt.findall('members/member-group'):
group_id = normalize_name(item.text)[0:ID_LEN]
if group_id not in done:
group = DBSession.query(Group).filter_by(
group_id=group_id).first()
if group is not None:
storage.groups.append(
StorageGroup(
storage.storage_id, group.group_id,
item.get('permission')))
done.add(group_id)
DBSession.commit()
return storage
# -------------------------------------------------------------------------
[docs] def xml(self): # noqa
"""Serialize a storage to a XML representation.
:return: (:class:`lxml.etree.Element`)
"""
# pylint: disable = R0912
storage_elt = etree.Element('storage')
storage_elt.set('id', self.storage_id)
etree.SubElement(storage_elt, 'label').text = self.label
if self.description:
etree.SubElement(storage_elt, 'description').text = \
wrap(self.description, indent=8)
elt = etree.SubElement(storage_elt, 'vcs')
elt.set('engine', self.vcs_engine)
if self.vcs_url:
etree.SubElement(elt, 'url').text = self.vcs_url
if self.vcs_user:
etree.SubElement(elt, 'user').text = self.vcs_user
etree.SubElement(elt, 'password', encrypt='true').text = \
self.vcs_password
if self.public_url:
etree.SubElement(elt, 'public').text = self.public_url
if self.access != 'open':
etree.SubElement(storage_elt, 'access').text = self.access
if self.refresh != REFRESH:
etree.SubElement(storage_elt, 'refresh').text = str(self.refresh)
if self.indexed_files:
etree.SubElement(storage_elt, 'indexed').text = self.indexed_files
if self.normalize_mode:
etree.SubElement(
storage_elt, 'normalize', mode=self.normalize_mode)
# Openers
if self.openers:
openers_elt = etree.SubElement(storage_elt, 'openers')
for opener in sorted(self.openers, key=lambda k: k.sort):
elt = etree.SubElement(openers_elt, 'opener')
elt.text = opener.opener_id
# Members
if self.users or self.groups:
members_elt = etree.SubElement(storage_elt, 'members')
for user in self.users:
elt = etree.SubElement(members_elt, 'member')
elt.text = user.user.login
if user.in_menu:
elt.set('in-menu', 'true')
if user.perm:
elt.set('permission', user.perm)
if user.vcs_user:
elt.set('vcs-user', user.vcs_user)
if user.vcs_password:
elt.set('vcs-password', user.vcs_password)
for group in self.groups:
elt = etree.SubElement(members_elt, 'member-group')
elt.text = group.group_id
if group.perm and group.perm != 'reader':
elt.set('permission', group.perm)
return storage_elt
# =============================================================================
[docs]class StorageOpener(Base):
"""SQLAlchemy-powered association table between ``Storage`` and its
openers."""
# pylint: disable = too-few-public-methods
__tablename__ = 'storage_opener'
__table_args__ = {'mysql_engine': 'InnoDB'}
storage_id = Column(
types.String(ID_LEN),
ForeignKey('storage.storage_id', ondelete='CASCADE'),
primary_key=True)
opener_id = Column(types.String(ID_LEN), primary_key=True)
sort = Column(types.Integer, default=0)
# -------------------------------------------------------------------------
def __init__(self, storage_id, opener_id, sort=0):
"""Constructor method."""
super(StorageOpener, self).__init__()
self.storage_id = storage_id.strip()[0:ID_LEN]
self.opener_id = opener_id.strip()[0:ID_LEN]
self.sort = sort
# =============================================================================
[docs]class StorageUser(Base):
"""SQLAlchemy-powered association table between ``Storage`` and
``User``."""
# pylint: disable = R0913
__tablename__ = 'storage_user'
__table_args__ = {'mysql_engine': 'InnoDB'}
storage_id = Column(
types.String(ID_LEN),
ForeignKey('storage.storage_id', ondelete='CASCADE'),
primary_key=True)
user_id = Column(
types.Integer, ForeignKey('user.user_id', ondelete='CASCADE'),
primary_key=True)
in_menu = Column(types.Boolean, default=False)
perm = Column(types.Enum(*STORAGE_PERMS.keys(), name='stg_perms_enum'))
vcs_user = Column(types.String(ID_LEN))
vcs_password = Column(types.String(40))
user = relationship('User')
# -------------------------------------------------------------------------
def __init__(self, storage_id, user_id, in_menu=False, perm='reader',
vcs_user=None, vcs_password=None, settings=None):
"""Constructor method."""
super(StorageUser, self).__init__()
self.storage_id = storage_id.strip()[0:ID_LEN]
self.user_id = user_id
self.in_menu = bool(in_menu)
if perm != 'none':
self.perm = perm
self.vcs_user = vcs_user
self.vcs_password = vcs_password
if vcs_password and settings is not None:
self.set_vcs_password(settings, vcs_password)
# -------------------------------------------------------------------------
[docs] def set_vcs_password(self, settings, vcs_password):
"""Encrypt and set VCS password.
:param settings: (dictionary)
Pyramid deployment settings.
:param password: (string)
Clear password.
"""
self.vcs_password = encrypt(
vcs_password, settings.get('encryption', '-'))
# =============================================================================
[docs]class StorageGroup(Base):
"""SQLAlchemy-powered association table between ``Storage`` and
``Group``."""
# pylint: disable = R0903
__tablename__ = 'storage_group'
__table_args__ = {'mysql_engine': 'InnoDB'}
storage_id = Column(
types.String(ID_LEN),
ForeignKey('storage.storage_id', ondelete='CASCADE'),
primary_key=True)
group_id = Column(
types.String(ID_LEN),
ForeignKey('group.group_id', ondelete='CASCADE'), primary_key=True)
perm = Column(
types.Enum(*STORAGE_PERMS.keys(), name='stg_perms_enum'),
default='reader')
# -------------------------------------------------------------------------
def __init__(self, storage_id, group_id, perm=None):
"""Constructor method."""
super(StorageGroup, self).__init__()
self.storage_id = storage_id.strip()[0:ID_LEN]
self.group_id = group_id.strip()[0:ID_LEN]
self.perm = perm