# pylint: disable = C0322
"""Group view callables."""
from colander import Mapping, SchemaNode, String
from colander import All, Length, OneOf, Regex
from sqlalchemy import func
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPForbidden, HTTPFound, HTTPNotFound
from ..lib.i18n import _
from ..lib.log import log_activity
from ..lib.utils import has_permission, normalize_spaces, make_id
from ..lib.xml import upload_configuration, export_configuration
from ..lib.viewutils import get_action, paging_users, paging_groups
from ..lib.form import Form
from ..lib.tabset import TabSet
from ..lib.paging import PAGE_SIZES
from ..models import NULL, ID_LEN, LABEL_LEN, DESCRIPTION_LEN
from ..models import DBSession, close_dbsession
from ..models.users import PERM_SCOPES, USER_PERMS, User
from ..models.groups import GROUP_USER, Group, GroupPerm
from ..models.storages import StorageUser
from ..models.projects import ProjectUser
# Translatable labels of the tabs handed to ``TabSet`` by the group views.
GROUP_SETTINGS_TABS = (
    _('Description'),
    _('Permissions'),
    _('Members'),
)
# =============================================================================
class GroupView(object):
    """Class to manage groups.

    Public methods are Pyramid view callables registered with
    ``view_config``; methods whose name starts with an underscore are
    helpers shared by those views.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request):
        """Constructor method.

        :param request: (:class:`pyramid.request.Request` instance)
            Current request. ``close_dbsession`` is registered on it as a
            finished callback.
        """
        request.add_finished_callback(close_dbsession)
        self._request = request

    # -------------------------------------------------------------------------
    @view_config(
        route_name='group_admin', renderer='../Templates/grp_admin.pt',
        permission='grp.update')
    def admin(self):
        """List groups for administration purpose.

        Handled actions: ``imp!`` (import a configuration), ``del!...``
        (delete the selected groups) and ``exp!...`` (export the selected
        groups, which returns the export response directly).

        :return: (dictionary or :class:`pyramid.response.Response`)
            Values for the renderer, or the export response.
        """
        action, items = get_action(self._request)
        if action == 'imp!':
            upload_configuration(self._request, 'grp_manager', 'group')
            log_activity(self._request, 'import_group')
            action = ''
        elif action[0:4] == 'del!':
            self._delete_groups(items)
            action = ''
        elif action[0:4] == 'exp!':
            return self._export_groups(items)

        paging, defaults = paging_groups(self._request, 'groups!')
        form = Form(self._request, defaults=defaults)
        # Map each group ID to its number of members
        group_sizes = dict(
            DBSession.query(
                GROUP_USER.c.group_id, func.count(GROUP_USER.c.user_id))
            .group_by(GROUP_USER.c.group_id).all())

        # The crumb is one level deeper when coming from the site
        # administration page
        depth = 3 if self._request.breadcrumbs.current_path() == \
            self._request.route_path('site_admin') else 2
        self._request.breadcrumbs.add(_('Group administration'), depth)

        return {
            'form': form, 'paging': paging, 'action': action,
            'group_sizes': group_sizes, 'PAGE_SIZES': PAGE_SIZES,
            'i_editor': has_permission(self._request, 'grp_editor'),
            'i_manager': has_permission(self._request, 'grp_manager')}

    # -------------------------------------------------------------------------
    @view_config(
        route_name='group_view', renderer='../Templates/grp_view.pt',
        permission='grp.read')
    def view(self):
        """Show group settings.

        The ``exp!`` action returns the export of the current group.

        :return: (dictionary or :class:`pyramid.response.Response`)
            Values for the renderer, or the export response.
        :raise: :class:`pyramid.httpexceptions.HTTPNotFound` if the group
            does not exist.
        """
        group_id = self._request.matchdict.get('group_id')
        action = get_action(self._request)[0]
        if action == 'exp!':
            return self._export_groups((group_id,))

        group = DBSession.query(Group).filter_by(group_id=group_id).first()
        if group is None:
            raise HTTPNotFound()
        tab_set = TabSet(self._request, GROUP_SETTINGS_TABS)
        members = self._group_members(group)

        self._request.breadcrumbs.add(
            _('Group settings'), replace=self._request.route_path(
                'group_edit', group_id=group.group_id))

        return {
            'tab_set': tab_set, 'PERM_SCOPES': PERM_SCOPES,
            'USER_PERMS': USER_PERMS, 'group': group, 'members': members,
            'i_editor': has_permission(self._request, 'grp_editor')}

    # -------------------------------------------------------------------------
    @view_config(
        route_name='group_create', renderer='../Templates/grp_edit.pt',
        permission='grp.create')
    def create(self):
        """Create a group.

        :return: (dictionary or
            :class:`pyramid.httpexceptions.HTTPFound` instance)
            A redirection to the edit page of the new group when the
            creation succeeds, otherwise values for the renderer.
        """
        form, tab_set = self._settings_form()

        if form.validate():
            dbgroup = self._create(form.values)
            if dbgroup is not None:
                self._request.breadcrumbs.pop()
                log_activity(self._request, 'group_create', dbgroup.group_id)
                return HTTPFound(self._request.route_path(
                    'group_edit', group_id=dbgroup.group_id))
        if form.has_error():
            self._request.session.flash(_('Correct errors.'), 'alert')

        self._request.breadcrumbs.add(_('Group creation'))

        return {
            'form': form, 'tab_set': tab_set,
            'PERM_SCOPES': PERM_SCOPES, 'USER_PERMS': USER_PERMS,
            'PAGE_SIZES': PAGE_SIZES, 'group': None, 'members': None,
            'paging': None, 'groups': None}

    # -------------------------------------------------------------------------
    @view_config(
        route_name='group_edit', renderer='../Templates/grp_edit.pt',
        permission='grp.update')
    def edit(self):
        """Edit group settings.

        Handled actions: ``aur?`` (show the user list to pick new members),
        ``aur!...`` (add the selected users), ``rur!<user_id>`` (remove one
        member) and ``sav!`` (validate and save the form).

        :return: (dictionary or
            :class:`pyramid.httpexceptions.HTTPFound` instance)
            A redirection to the view page after a successful save,
            otherwise values for the renderer.
        :raise: :class:`pyramid.httpexceptions.HTTPNotFound` if the group
            does not exist.
        """
        # Action
        paging = groups = None
        group_id = self._request.matchdict.get('group_id')
        action, items = get_action(self._request)
        if action == 'aur?' \
                or (not action and 'paging_id' in self._request.GET):
            paging = paging_users(self._request)[0]
            groups = DBSession.query(Group.group_id, Group.label).all()
        elif action[0:4] == 'aur!':
            self._add_members(group_id, items)
        elif action[0:4] == 'rur!':
            # The user ID follows the 4-character action prefix
            self._remove_member(group_id, int(action[4:]))

        # Environment
        dbgroup = DBSession.query(Group).filter_by(group_id=group_id).first()
        if dbgroup is None:
            raise HTTPNotFound()
        form, tab_set = self._settings_form(dbgroup)
        members = self._group_members(dbgroup)

        # Save
        view_path = self._request.route_path(
            'group_view', group_id=dbgroup.group_id)
        if action == 'sav!' and form.validate(dbgroup) \
                and self._save(dbgroup, form.values):
            log_activity(self._request, 'group_edit', dbgroup.group_id)
            return HTTPFound(view_path)
        if form.has_error():
            self._request.session.flash(_('Correct errors.'), 'alert')

        # Breadcrumbs
        self._request.breadcrumbs.add(_('Group settings'), replace=view_path)

        return {
            'form': form, 'action': action, 'tab_set': tab_set,
            'PERM_SCOPES': PERM_SCOPES, 'USER_PERMS': USER_PERMS,
            'PAGE_SIZES': PAGE_SIZES, 'group': dbgroup, 'members': members,
            'paging': paging, 'groups': groups}

    # -------------------------------------------------------------------------
    @staticmethod
    def _group_members(group):
        """Return the members of a group sorted by name.

        :param group: (:class:`~..models.groups.Group` instance)
            Group to inspect.
        :return: (list)
            A list of ``(user_id, login, name)`` rows, empty if the group
            has no member.
        """
        user_ids = [k.user_id for k in group.users]
        if not user_ids:
            return user_ids
        return DBSession.query(User.user_id, User.login, User.name)\
            .filter(User.user_id.in_(user_ids)).order_by(User.name).all()

    # -------------------------------------------------------------------------
    def _delete_groups(self, group_ids):
        """Delete groups.

        Nothing is deleted if the selection contains a group the current
        user belongs to.

        :param group_ids: (list)
            List of group IDs to delete.
        :raise: :class:`pyramid.httpexceptions.HTTPForbidden` if the
            current user does not have the ``grp_manager`` permission.
        """
        if not has_permission(self._request, 'grp_manager'):
            raise HTTPForbidden()
        group_ids = set(group_ids)
        if not group_ids:
            # Nothing selected: avoid querying with an empty IN clause
            return

        # Do not delete my own groups
        user_id = self._request.session['user_id']
        own_ids = {
            row[0] for row in DBSession.query(GROUP_USER.c.group_id)
            .filter(User.user_id == user_id)
            .filter(User.user_id == GROUP_USER.c.user_id).all()}
        if group_ids & own_ids:
            self._request.session.flash(
                _("You can't delete your own group."), 'alert')
            return

        # Delete
        deleted = []
        for dbgroup in DBSession.query(Group).filter(
                Group.group_id.in_(group_ids)):
            deleted.append(dbgroup.group_id)
            DBSession.delete(dbgroup)
        if deleted:
            DBSession.commit()
            log_activity(self._request, 'group_delete', u' '.join(deleted))

    # -------------------------------------------------------------------------
    def _export_groups(self, group_ids):
        """Export groups.

        :param group_ids: (list)
            List of group IDs to export.
        :return: (:class:`pyramid.response.Response` instance)
        """
        elements = []
        exported = []
        for dbgroup in DBSession.query(Group)\
                .filter(Group.group_id.in_(group_ids)).order_by(Group.label):
            exported.append(dbgroup.group_id)
            elements.append(dbgroup.xml())

        # The file name is prefixed with the skin name found in settings
        name = '%s_groups.pfgrp' % self._request.registry.settings.get(
            'skin.name', 'publiforge')
        log_activity(
            self._request, 'group_export', u' '.join(exported))
        return export_configuration(elements, name)

    # -------------------------------------------------------------------------
    def _settings_form(self, group=None):
        """Return a group settings form.

        :param group: (:class:`~..models.groups.Group` instance, optional)
            Current group object.
        :return: (tuple)
            A tuple such as ``(form, tab_set)``.
        """
        # Schema
        schema = SchemaNode(Mapping())
        if group is None:
            # The identifier can only be entered at creation time
            schema.add(SchemaNode(
                String(), name='group_id', validator=All(
                    Regex(r'^[a-zA-Z_][\w_.-]+$'), Length(max=ID_LEN))))
        schema.add(SchemaNode(
            String(), name='description',
            validator=Length(max=DESCRIPTION_LEN), missing=''))
        schema.add(SchemaNode(
            String(), name='label', validator=Length(min=2, max=LABEL_LEN)))
        for scope in PERM_SCOPES:
            # '-' stands for "no permission on this scope"
            schema.add(SchemaNode(
                String(), name='perm_%s' % scope,
                validator=OneOf(USER_PERMS.keys()), missing='-'))

        # Defaults
        defaults = {}
        if group is not None:
            for perm in group.perms:
                defaults['perm_%s' % perm.scope] = perm.level
        session = self._request.session
        if 'paging' in session and 'users' in session['paging'][1]:
            defaults.update(session['paging'][1]['users'])

        return (
            Form(self._request, schema=schema, defaults=defaults, obj=group),
            TabSet(self._request, GROUP_SETTINGS_TABS))

    # -------------------------------------------------------------------------
    def _create(self, values):
        """Create a group.

        The creation is refused, with a flash message, if a group with the
        same ID or the same label exists, or if a user login is equal to
        the group ID.

        :param values: (dictionary)
            Form values.
        :return: (:class:`~..models.groups.Group` instance or ``None``)
        """
        # Check existence
        group_id = make_id(values['group_id'], 'token', ID_LEN)
        label = normalize_spaces(values['label'], LABEL_LEN)
        if DBSession.query(Group).filter_by(
                group_id=group_id).first() is not None \
                or DBSession.query(Group).filter_by(
                    label=label).first() is not None:
            self._request.session.flash(
                _('This group already exists.'), 'alert')
            return None
        if DBSession.query(User).filter_by(login=group_id).first() is not None:
            self._request.session.flash(
                _('This identifier already exists.'), 'alert')
            return None

        # Create group
        group = Group(group_id, label, values['description'])
        DBSession.add(group)
        DBSession.commit()

        # Add permissions
        for scope in PERM_SCOPES:
            level = values['perm_%s' % scope]
            if level != '-':
                group.perms.append(
                    GroupPerm(group.group_id, scope, level))
        DBSession.commit()

        return group

    # -------------------------------------------------------------------------
    @classmethod
    def _save(cls, group, values):
        """Save a group settings.

        Only the permissions are synchronized here: a level other than
        ``'-'`` creates or updates the permission of the scope, ``'-'``
        deletes it.

        :param group: (:class:`~..models.groups.Group` instance)
            Group to update.
        :param values: (dictionary)
            Form values.
        :return: (boolean)
            ``True`` if succeeds.
        """
        # Update permissions
        perms = {k.scope: k for k in group.perms}
        for scope in PERM_SCOPES:
            level = values['perm_%s' % scope]
            if level != '-':
                if scope not in perms:
                    group.perms.append(
                        GroupPerm(group.group_id, scope, level))
                else:
                    perms[scope].level = level
            elif scope in perms:
                DBSession.delete(perms[scope])

        DBSession.commit()
        return True

    # -------------------------------------------------------------------------
    def _add_members(self, group_id, items):
        """Add selected users to the group.

        Users who already belong to the group, and IDs repeated in
        ``items``, are skipped.

        :param group_id: (string)
            Group ID.
        :param items: (list)
            List of user IDs.
        :raise: :class:`pyramid.httpexceptions.HTTPNotFound` if the group
            does not exist.
        """
        # pylint: disable = no-value-for-parameter
        group = DBSession.query(Group).filter_by(
            group_id=group_id).first()
        if group is None:
            raise HTTPNotFound()

        member_ids = {k.user_id for k in group.users}
        for user_id in items:
            user_id = int(user_id)
            if user_id not in member_ids:
                DBSession.execute(GROUP_USER.insert().values(
                    group_id=group_id, user_id=user_id))
                member_ids.add(user_id)
        DBSession.commit()
        # Drop the menu stored in session; it may not be there
        self._request.session.pop('menu', None)

    # -------------------------------------------------------------------------
    def _remove_member(self, group_id, user_id):
        """Remove a user from the group.

        The ``StorageUser`` and ``ProjectUser`` rows of this user whose
        ``perm`` differs from ``NULL`` are deleted as well.

        :param group_id: (string)
            Group ID.
        :param user_id: (integer)
            ID of the user to remove.
        """
        # pylint: disable = no-value-for-parameter
        DBSession.execute(
            GROUP_USER.delete()
            .where(GROUP_USER.c.group_id == group_id)
            .where(GROUP_USER.c.user_id == user_id))
        DBSession.query(StorageUser).filter_by(user_id=user_id)\
            .filter(StorageUser.perm != NULL).delete()
        DBSession.query(ProjectUser).filter_by(user_id=user_id)\
            .filter(ProjectUser.perm != NULL).delete()
        DBSession.commit()
        # Drop the menu stored in session; it may not be there
        self._request.session.pop('menu', None)