"""SQLAlchemy-powered model definition for users."""
# pylint: disable = super-on-old-class
from datetime import datetime
from lxml import etree
from sqlalchemy import Column, ForeignKey, types
from sqlalchemy.orm import relationship
from ..lib.i18n import _
from ..lib.config import settings_get_list
from ..lib.utils import hash_sha, normalize_name, normalize_spaces
from . import ID_LEN, LABEL_LEN, PATH_LEN, Base, DBSession
USER_STATUS = {'active': _('active'), 'inactive': _('inactive')}
PERM_SCOPES = {
'all': _('All'), 'doc': _('Documentation'), 'usr': _('Users'),
'grp': _('Groups'), 'stg': _('Storages'), 'idx': _('Indexing'),
'prj': _('Projects')}
USER_PERMS = {
'manager': _('Manager'), 'editor': _('Editor'), 'user': _('User')}
HOMES = {
'site': _('Site administration'), 'projects': _('Project list'),
'storages': _('Storage list')}
PAGE_SIZE = 20
# =============================================================================
[docs]class User(Base):
"""SQLAlchemy-powered user model."""
# pylint: disable = too-many-instance-attributes
__tablename__ = 'user'
__table_args__ = {'mysql_engine': 'InnoDB'}
user_id = Column(types.Integer, autoincrement=True, primary_key=True)
login = Column(types.String(ID_LEN), unique=True, nullable=False)
status = Column(
types.Enum(*USER_STATUS.keys(), name='usr_status_enum'),
nullable=False)
password = Column(types.String(64))
name = Column(types.String(LABEL_LEN), nullable=False)
email = Column(types.String(LABEL_LEN), nullable=False)
lang = Column(types.String(5), nullable=False)
expiration = Column(types.Date)
restrict_ip = Column(types.Boolean, default=False)
home = Column(
types.Enum(*HOMES.keys(), name='usr_home_enum'), default='projects',
nullable=False)
page_size = Column(types.Integer, default=PAGE_SIZE)
last_login = Column(types.DateTime)
updated = Column(types.DateTime)
created = Column(types.DateTime, default=datetime.now)
perms = relationship('UserPerm', cascade='all, delete')
ips = relationship('UserIP', cascade='all, delete')
selection = relationship('UserFile', cascade='all, delete')
# -------------------------------------------------------------------------
def __init__(self, settings, login, status, password, name, email,
lang='en', expiration=None, restrict_ip=False, home=None,
page_size=None, created=None, updated=None):
"""Constructor method."""
# pylint: disable = too-many-arguments
super(User, self).__init__()
self.login = normalize_name(login)[0:ID_LEN].lower()
self.status = status
self.set_password(settings, password)
self.name = normalize_spaces(name, LABEL_LEN)
self.email = email[0:LABEL_LEN].lower()
self.lang = lang[0:5]
self.expiration = datetime.strptime(expiration, '%Y-%m-%d') \
if isinstance(expiration, basestring) else expiration
self.restrict_ip = restrict_ip
self.home = home
self.page_size = page_size
self.created = datetime.now() if created is None \
else datetime.strptime(created, '%Y-%m-%dT%H:%M:%S')
if updated is not None:
self.updated = datetime.strptime(updated, '%Y-%m-%dT%H:%M:%S')
# -------------------------------------------------------------------------
[docs] def set_password(self, settings, password):
"""Encrypt and set password.
:param settings: (dictionary)
Pyramid deployment settings.
:param password: (string)
Clear password.
"""
if password:
self.password = hash_sha(
password.strip(), settings.get('encryption', '-'))
# -------------------------------------------------------------------------
[docs] def setup_environment(self, request):
"""Set up user environment (session and cookie).
:param request: (:class:`pyramid.request.Request` instance)
Current request.
It saves in session the following values:
* ``user_id``: user ID
* ``login``: user login
* ``name``: user name
* ``lang``: user language
* ``home``: home page content
* ``paging``: paging environment
* ``perms``: list of permissions
"""
# pylint: disable = E1101
# User information
request.session.clear()
request.session['user_id'] = self.user_id
request.session['login'] = self.login
request.session['name'] = self.name
request.session['home'] = self.home
request.session['paging'] = (self.page_size, {})
# Language
langs = settings_get_list(
request.registry.settings, 'languages', ['en'])
request.session['lang'] = \
(self.lang in langs and self.lang) or \
(self.lang[0:2] in langs and self.lang[0:2]) or \
request.registry.settings.get('pyramid.default_locale_name', 'en')
# Permissions
perms = {}
levels = {'user': 1, 'editor': 2, 'manager': 3, 'admin': 4}
for perm in self.perms:
perms[perm.scope] = perm.level
for group in self.groups:
for perm in group.perms:
if perm.scope not in perms or \
levels[perms[perm.scope]] < levels[perm.level]:
perms[perm.scope] = perm.level
if 'all' in perms and perms['all'] != 'admin':
perm = levels[perms['all']]
# pylint: disable = consider-iterating-dictionary
for scope in PERM_SCOPES.keys()[1:]:
if scope not in perms or levels[perms[scope]] < perm:
perms[scope] = perms['all']
del perms['all']
request.session['perms'] = ('admin', ) \
if perms.get('all') == 'admin' \
else tuple([('%s_%s') % ((k, perms[k])) for k in perms])
# -------------------------------------------------------------------------
[docs] @classmethod
def load(cls, settings, user_elt, error_if_exists=True):
"""Load a user from a XML element.
:param settings: (dictionary)
Application settings.
:param user_elt: (:class:`lxml.etree.Element` instance)
User XML element.
:param error_if_exists: (boolean, default=True)
It returns an error if user already exists.
:return: (:class:`pyramid.i18n.TranslationString` or ``None``)
Error message or ``None``.
"""
# Check if already exists
login = normalize_name(user_elt.get('login'))[0:ID_LEN]
user = DBSession.query(cls).filter_by(login=login).first()
if user is not None:
if error_if_exists:
return _('User "${l}" already exists.', {'l': login})
return None
# Create user
record = {
'status':
user_elt.get('status', 'active'),
'created':
user_elt.get('created'),
'updated':
user_elt.get('updated'),
'password':
user_elt.findtext('password') is not None and
user_elt.findtext('password').strip() or None,
'name':
user_elt.findtext('name'),
'email':
user_elt.findtext('email').strip(),
'lang':
user_elt.findtext('language') is not None and
user_elt.findtext('language') or
settings.get('pyramid.default_locale_name', 'en'),
'expiration':
user_elt.findtext('expiration') is not None and
user_elt.findtext('expiration') or None,
'restrict_ip':
bool(user_elt.find('ips') is not None),
'home':
user_elt.findtext('home') is not None and
user_elt.findtext('home').strip() or None,
'page_size':
user_elt.findtext('page_size') is not None and
int(user_elt.findtext('page_size').strip()) or None}
user = cls(settings, login, **record)
if record['password'] and user_elt.find('password').get('hash'):
user.password = record['password']
DBSession.add(user)
DBSession.commit()
# Add permissions
done = set()
for item in user_elt.findall('permissions/permission'):
if item.get('scope') not in done:
user.perms.append(
UserPerm(user.user_id, item.get('scope'), item.text))
done.add(item.get('scope'))
# Add IP restriction
done = set()
for item in user_elt.findall('ips/ip'):
if item.text not in done:
user.ips.append(UserIP(user.user_id, item.text))
done.add(item.text)
# Add selection
done = set()
for item in user_elt.findall('selection/file'):
if item.text.strip() not in done:
user.selection.append(UserFile(item.text.strip()))
done.add(item.text.strip())
DBSession.commit()
return None
# -------------------------------------------------------------------------
[docs] def xml(self, i_manager=False):
"""Serialize a user to a XML representation.
:param i_manager: (boolean, default=False)
Am I a user manager?
:return: (:class:`lxml.etree.Element`)
"""
if 'admin' in [k.level for k in self.perms]:
return None
user_elt = etree.Element('user')
user_elt.set('login', self.login)
if self.status != 'active':
user_elt.set('status', self.status)
user_elt.set('created', self.created.isoformat().partition('.')[0])
if self.updated and self.created != self.updated:
user_elt.set('updated', self.updated.isoformat().partition('.')[0])
etree.SubElement(user_elt, 'name').text = self.name
if i_manager:
etree.SubElement(
user_elt, 'password', hash='true').text = self.password
etree.SubElement(user_elt, 'email').text = self.email
etree.SubElement(user_elt, 'language').text = self.lang
etree.SubElement(user_elt, 'home').text = self.home
if self.page_size != PAGE_SIZE:
etree.SubElement(user_elt, 'page_size').text = str(self.page_size)
if self.expiration:
etree.SubElement(user_elt, 'expiration').text = \
self.expiration.isoformat()
# Permissions
if self.perms:
elt = etree.SubElement(user_elt, 'permissions')
for perm in self.perms:
etree.SubElement(
elt, 'permission', scope=perm.scope).text = perm.level
# IPs
if self.ips:
elt = etree.SubElement(user_elt, 'ips')
for item in self.ips:
etree.SubElement(elt, 'ip').text = item.ip
# Selection
if self.selection:
elt = etree.SubElement(user_elt, 'selection')
for item in self.selection:
etree.SubElement(elt, 'file').text = item.path
return user_elt
# =============================================================================
[docs]class UserPerm(Base):
"""SQLAlchemy-powered user permission class."""
# pylint: disable = R0903
__tablename__ = 'user_perm'
__table_args__ = {'mysql_engine': 'InnoDB'}
user_id = Column(
types.Integer,
ForeignKey('user.user_id', ondelete='CASCADE'), primary_key=True)
scope = Column(
types.Enum(*PERM_SCOPES.keys(), name='perm_scope_enum'),
primary_key=True)
level = Column(
types.Enum(*(['admin'] + USER_PERMS.keys()), name='usr_perms_enum'))
# -------------------------------------------------------------------------
def __init__(self, user_id, scope, level):
"""Constructor method."""
super(UserPerm, self).__init__()
self.user_id = user_id
self.scope = 'all' if level == 'admin' else scope
self.level = level
# =============================================================================
[docs]class UserIP(Base):
"""Class for restriction by IP."""
# pylint: disable = C0103, R0903
__tablename__ = 'user_ip'
__table_args__ = {'mysql_engine': 'InnoDB'}
user_id = Column(
types.Integer,
ForeignKey('user.user_id', ondelete='CASCADE'), primary_key=True)
ip = Column(types.String(40), primary_key=True)
# -------------------------------------------------------------------------
def __init__(self, user_id, ip):
"""Constructor method."""
super(UserIP, self).__init__()
self.user_id = user_id
self.ip = ip
# =============================================================================
[docs]class UserFile(Base):
"""SQLAlchemy-powered user selection model."""
# pylint: disable = R0903
__tablename__ = 'user_file'
__table_args__ = {'mysql_engine': 'InnoDB'}
user_id = Column(
types.Integer,
ForeignKey('user.user_id', ondelete='CASCADE'), primary_key=True)
path = Column(types.String(PATH_LEN), primary_key=True)
# -------------------------------------------------------------------------
def __init__(self, path, user_id=None):
"""Constructor method."""
super(UserFile, self).__init__()
self.user_id = user_id
self.path = path.strip()[0:PATH_LEN]