# Source code for publiforge.views.pack

# pylint: disable = C0322
"""Pack view callables."""

from os import sep
from colander import Mapping, SchemaNode, String, Boolean, Integer
from colander import Length, OneOf
from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError

from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound, HTTPNotFound, HTTPForbidden

from .selection import Selection
from ..lib.i18n import _
from ..lib.log import log_activity
from ..lib.utils import MIME_TYPES, normalize_spaces, age, rst2xhtml
from ..lib.utils import shift_files
from ..lib.viewutils import get_action, get_selection, current_project
from ..lib.viewutils import file_details, task_auto_build, current_storage
from ..lib.viewutils import file_upload, selection2container
from ..lib.packutils import pack_download, operator_label, operator_labels
from ..lib.packutils import pack_upload_settings, pack_upload_content
from ..lib.packutils import paging_packs
from ..lib.xml import export_configuration
from ..lib.form import Form
from ..lib.tabset import TabSet
from ..lib.paging import PAGE_SIZES, Paging
from ..models import LABEL_LEN, DESCRIPTION_LEN, PATH_LEN
from ..models import DBSession, close_dbsession
from ..models.tasks import Task
from ..models.jobs import JobPack
from ..models.packs import FILE_TYPE_MARKS, Pack, PackFile, PackEvent


# Tab labels handed to ``TabSet`` by the pack view, creation and edition pages.
PACK_SETTINGS_TABS = (
    _('Description'),
    _('Files'),
    _('Note'),
    _('Task'),
    _('History'),
)

# Translatable label of each pack file type, keyed by the file type name.
FILE_TYPE_LABELS = {
    'file': _('Files to process'),
    'resource': _('Resource files'),
    'template': _('Template files'),
}


# =============================================================================
class PackView(object):
    """Class to manage packs.

    Public methods are Pyramid view callables bound to routes with
    ``view_config``. Private methods hold the form construction, the database
    updates and the action processing shared by those views.
    """

    # -------------------------------------------------------------------------
    def __init__(self, request):
        """Constructor method.

        :param request:
            Current request. ``close_dbsession`` is registered on it as a
            finished callback.
        """
        request.add_finished_callback(close_dbsession)
        self._request = request

    # -------------------------------------------------------------------------
    @view_config(route_name='pack_index', renderer='../Templates/pck_index.pt')
    @view_config(
        route_name='pack_index_pack', renderer='../Templates/pck_index.pt')
    @view_config(route_name='pack_index', renderer='json', xhr=True)
    @view_config(route_name='pack_index_pack', renderer='json', xhr=True)
    def index(self):
        """List all packs of a project.

        Actions ``bld!``, ``dnl!`` and ``exp!`` respectively redirect to the
        build launcher, download a pack and export pack configurations. The
        other actions are processed by :meth:`_index_action`.

        :return:
            ``None`` for an XHR request, a response object for the three
            actions above, otherwise the dictionary used by the template.
        :raise HTTPForbidden:
            If the project does not expose packs.
        :raise HTTPNotFound:
            If the single pack to build does not exist in the project.
        """
        # Action
        project = current_project(self._request)
        if project['entries'] not in ('all', 'packs'):
            raise HTTPForbidden()
        action, items = self._index_action(project)
        if self._request.is_xhr:
            return None
        if action[0:4] == 'bld!':
            if len(items) == 1:
                pack = DBSession.query(Pack).filter_by(
                    project_id=project['project_id'],
                    pack_id=int(items[0])).first()
                # The pack may have been deleted since the page was rendered.
                if pack is None:
                    raise HTTPNotFound(comment=_('Unknown pack!'))
                self._request.breadcrumbs.add(
                    _('Packs'), root_chunks=3, anchor='p%d' % pack.pack_id)
                self._request.session['project']['pack_id'] = pack.pack_id
            return HTTPFound(self._request.route_path(
                'build_launch', project_id=project['project_id'],
                processing_id=project['processing_id'],
                pack_ids='_'.join(items)))
        elif action[0:4] == 'dnl!' and items:
            # Without any selected item, fall through to the plain listing.
            return pack_download(
                self._request, project['project_id'], items[0])
        elif action[0:4] == 'exp!':
            elements = []
            exported_id = None
            for pack in DBSession.query(Pack)\
                    .filter_by(project_id=project['project_id'])\
                    .filter(Pack.pack_id.in_(items)).order_by('label'):
                elements.append(pack.xml())
                exported_id = pack.pack_id
            # Remember the pack only when exactly one was requested and it
            # was actually found.
            if len(items) == 1 and exported_id is not None:
                self._request.session['project']['pack_id'] = exported_id
            return export_configuration(
                elements, '%s_packs.pfpck' % project['label'])

        # List of packs and current pack
        paging, defaults = paging_packs(self._request, project['project_id'])
        defaults['processing_id'] = str(project['processing_id'])
        pack = self._current_pack(project['project_id'])
        files = file_details(self._request, pack)

        # Breadcrumb trail
        current_id = self._request.session['project']['pack_id']
        self._request.breadcrumbs.add(
            _('Packs'), root_chunks=3,
            anchor='p%d' % current_id if current_id is not None else None)

        return {
            'form': Form(self._request, defaults=defaults), 'sep': sep,
            'paging': paging, 'action': action, 'project': project,
            'FILE_TYPE_MARKS': FILE_TYPE_MARKS, 'MIME_TYPES': MIME_TYPES,
            'PAGE_SIZES': PAGE_SIZES, 'pack_id': pack and pack.pack_id,
            'files': files,
            'i_packeditor': project['perm'] in (
                'leader', 'packmaker', 'packeditor'),
            'i_packmaker': project['perm'] in ('leader', 'packmaker')}

    # -------------------------------------------------------------------------
    @view_config(route_name='pack_view', renderer='../Templates/pck_view.pt')
    def view(self):
        """Display a pack.

        :return:
            The export response for action ``exp!``, otherwise the dictionary
            used by the template.
        :raise HTTPNotFound:
            If the pack of the route does not exist in the current project.
        """
        # Environment
        project = current_project(self._request)
        pack = self._current_pack(project['project_id'])
        # Check the pack before anything else uses it.
        if pack is None:
            raise HTTPNotFound(comment=_('Unknown pack!'))
        files = file_details(self._request, pack, False)
        tab_set = TabSet(self._request, PACK_SETTINGS_TABS)
        operator = pack.task_id and operator_label(
            self._request, project, pack.operator_type, pack.operator_id)
        self._request.session['project']['pack_id'] = pack.pack_id

        # Action
        action = get_action(self._request)[0]
        if action == 'exp!':
            return export_configuration((pack.xml(),))
        elif action[0:4] == 'sel!':
            Selection(self._request).add((action[4:],))
            self._request.session.flash(_('Selection updated.'))

        # Breadcrumbs
        self._request.breadcrumbs.add(
            _('Pack settings'), replace=self._request.route_path(
                'pack_edit', project_id=project['project_id'],
                pack_id=pack.pack_id))

        return {
            'sep': sep, 'age': age, 'tab_set': tab_set, 'action': 'none',
            'project': project, 'pack': pack, 'files': files,
            'file_type_labels': FILE_TYPE_LABELS, 'MIME_TYPES': MIME_TYPES,
            'PAGE_SIZES': PAGE_SIZES, 'note': rst2xhtml(pack.note),
            'operator': operator, 'events': self._paging_events(pack)[0],
            'i_packeditor': project['perm'] in (
                'leader', 'packmaker', 'packeditor'),
            'i_packmaker': project['perm'] in ('leader', 'packmaker')}

    # -------------------------------------------------------------------------
    @view_config(route_name='pack_create', renderer='../Templates/pck_edit.pt')
    def create(self):
        """Create a pack.

        :return:
            A redirection to the edition page when the pack has been created,
            otherwise the dictionary used by the template.
        :raise HTTPForbidden:
            If the user is neither leader nor pack maker of the project.
        """
        project = current_project(self._request)
        if project['perm'] not in ('leader', 'packmaker'):
            # Raised, as in ``index``, rather than returned.
            raise HTTPForbidden()

        form, tab_set = self._settings_form(project)[0:2]
        if form.validate():
            pack = self._create(project['project_id'], form.values)
            if pack is not None:
                self._request.breadcrumbs.pop()
                log_activity(
                    self._request, 'pack_create',
                    u'"{0}"'.format(project['label']),
                    u'"{0}"'.format(pack.label))
                return HTTPFound(self._request.route_path(
                    'pack_edit', project_id=pack.project_id,
                    pack_id=pack.pack_id, _anchor='tab1'))

        self._request.breadcrumbs.add(_('Pack creation'))

        return {
            'form': form, 'tab_set': tab_set, 'project': project,
            'pack': None}

    # -------------------------------------------------------------------------
    @view_config(route_name='pack_edit', renderer='../Templates/pck_edit.pt')
    @view_config(route_name='pack_edit', renderer='json', xhr=True)
    def edit(self):
        """Edit a pack.

        :return:
            ``None`` for an XHR request, a redirection after actions ``add!``
            and a successful ``sav!``, otherwise the dictionary used by the
            template.
        :raise HTTPForbidden:
            If the user is not leader, pack maker or pack editor.
        :raise HTTPNotFound:
            If the pack of the route does not exist in the current project.
        """
        # Environment
        project = current_project(self._request)
        if project['perm'] not in ('leader', 'packmaker', 'packeditor'):
            # Raised, as in ``index``, rather than returned.
            raise HTTPForbidden()
        pack = self._current_pack(project['project_id'])
        if pack is None:
            raise HTTPNotFound(comment=_('Unknown pack!'))
        self._request.session['project']['pack_id'] = pack.pack_id

        # Action
        action, storage, force = self._edit_action(project, pack)
        if self._request.is_xhr:
            return None
        files = file_details(self._request, pack, False)
        form, tab_set, operators = self._settings_form(project, pack, force)
        if action == 'add!':
            self._request.session.flash(
                _('Add files to your selection and return to the pack.'))
            return HTTPFound(self._request.route_path(
                'storage_index', _query={'crumbs': 10}))
        if action[0:3] in ('mup', 'dwn'):
            shift_files(action[0:3], action[4:], pack, files, form)
            DBSession.commit()
        elif action == 'sav!' and form.validate() \
                and self._save(operators, pack, form.values):
            return HTTPFound(self._request.route_path(
                'pack_view', project_id=project['project_id'],
                pack_id=pack.pack_id))

        # Breadcrumbs
        self._request.breadcrumbs.add(
            _('Pack settings'), replace=self._request.route_path(
                'pack_view', project_id=project['project_id'],
                pack_id=pack.pack_id))

        return {
            'form': form, 'sep': sep, 'age': age, 'tab_set': tab_set,
            'action': action, 'project': project, 'pack': pack,
            'files': files, 'storage': storage,
            'file_type_labels': FILE_TYPE_LABELS, 'MIME_TYPES': MIME_TYPES,
            'PAGE_SIZES': PAGE_SIZES, 'operators': operators,
            'events': self._paging_events(pack)[0]}

    # -------------------------------------------------------------------------
    def _settings_form(self, project, pack=None, force_defaults=False):
        """Return a pack settings form.

        :param project: (dictionary)
            Current project dictionary.
        :param pack: (:class:`~.models.packs.Pack` instance, optional)
            Current pack object.
        :param force_defaults: (boolean)
            ``defaults`` overrides ``POST``.
        :return: (tuple)
            A tuple such as ``(form, tab_set, operators)``. ``operators`` is
            the value returned by ``operator_labels`` (used here as a
            sequence of pairs whose first item is the operator key), or an
            empty dictionary when there is no pack.
        """
        schema = SchemaNode(Mapping())
        schema.add(SchemaNode(
            String(), name='label', validator=Length(min=3, max=LABEL_LEN)))
        schema.add(SchemaNode(
            String(), name='description',
            validator=Length(max=DESCRIPTION_LEN), missing=None))
        schema.add(SchemaNode(Boolean(), name='recursive', missing=False))
        schema.add(SchemaNode(String(), name='note', missing=None))
        defaults = {}
        operators = {}

        if pack is not None:
            # Files: one visibility flag per file, numbered per file type in
            # ``sort`` order; ``_save`` reads them back with the same scheme.
            index = {'file': 0, 'resource': 0, 'template': 0}
            for item in sorted(pack.files, key=lambda k: k.sort):
                if item.file_type == 'template':
                    # NOTE(review): the node name uses the UTF-8 encoded path
                    # while the defaults key uses the raw path; presumably
                    # these only match for ASCII paths -- confirm.
                    schema.add(SchemaNode(
                        String(),
                        name='template_%s' % item.path.encode('utf8'),
                        validator=Length(max=PATH_LEN)))
                    defaults['template_%s' % item.path] = item.target
                name = '%s_%d_see' % (item.file_type, index[item.file_type])
                schema.add(SchemaNode(Boolean(), name=name, missing=False))
                defaults[name] = item.visible
                index[item.file_type] += 1

            # Current task
            operators = operator_labels(project)
            schema.add(SchemaNode(
                Integer(), name='task_id', missing=None,
                validator=OneOf(project['task_labels'].keys())))
            schema.add(SchemaNode(
                String(), name='operator', missing=None,
                validator=OneOf([k[0] for k in operators])))
            defaults['task_id'] = pack.task_id
            defaults['operator'] = '%s%d' % (
                pack.operator_type, pack.operator_id) if pack.operator_id \
                else None

        form = Form(
            self._request, schema=schema, defaults=defaults, obj=pack,
            force_defaults=force_defaults)
        if force_defaults:
            for name in defaults:
                if name.startswith('file_'):
                    form.static(name)

        return (form, TabSet(self._request, PACK_SETTINGS_TABS), operators)

    # -------------------------------------------------------------------------
    def _create(self, project_id, values):
        """Create a record in ``packs`` table.

        :param project_id: (string)
            Project ID.
        :param values: (dictionary)
            Values to record.
        :return: (:class:`~.models.packs.Pack` instance or ``None``)
            The new pack, or ``None`` (with an alert flashed in the session)
            if a pack with the same label already exists in the project.
        """
        # Check unicity and create pack
        label = normalize_spaces(values['label'], LABEL_LEN)
        if DBSession.query(Pack) \
                .filter_by(project_id=project_id, label=label).first():
            self._request.session.flash(
                _('This pack already exists.'), 'alert')
            return None

        # Create record
        pack = Pack(project_id, label, values['description'])
        try:
            DBSession.add(pack)
            DBSession.commit()
        except IntegrityError:
            # A failed commit leaves the session unusable until it is rolled
            # back; the page is still rendered after this method returns.
            DBSession.rollback()
            self._request.session.flash(
                _('This pack already exists.'), 'alert')
            return None

        return pack

    # -------------------------------------------------------------------------
    def _save(self, operators, pack, values):
        """Save pack settings.

        :param operators: (list)
            List of operators, as returned by :meth:`_settings_form`.
        :param pack: (:class:`~..models.packs.Pack` instance)
            Pack to update.
        :param values: (dictionary)
            Form values.
        :return: (boolean)
            ``True``; no failure path returns another value.
        """
        pack.label = normalize_spaces(values['label'], LABEL_LEN)
        pack.description = normalize_spaces(
            values['description'], DESCRIPTION_LEN)
        pack.recursive = values['recursive']
        # An empty or blank note is stored as ``None``.
        note = values['note'].strip() if values['note'] else None
        pack.note = note or None

        # Files: same per-type numbering as in ``_settings_form``
        index = {'file': 0, 'resource': 0, 'template': 0}
        for item in sorted(pack.files, key=lambda k: k.sort):
            item.visible = values[
                '%s_%d_see' % (item.file_type, index[item.file_type])]
            index[item.file_type] += 1
            if item.file_type == 'template':
                item.target = values[
                    'template_%s' % item.path.encode('utf8')]
        if 'task' in self._request.session and \
                self._request.session['task']['pack_id'] == pack.pack_id:
            self._request.session['task']['files'] = file_details(
                self._request, pack)

        # Task
        old_task = pack.task_id
        old_operator = '%s%s' % (pack.operator_type, pack.operator_id)
        if values['task_id']:
            pack.task_id = values['task_id']
            task = DBSession.query(Task).filter_by(
                project_id=pack.project_id, task_id=pack.task_id).first()
            if task.operator_type == 'auto':
                pack.operator_type = 'auto'
                pack.operator_id = None
            elif values['operator']:
                # The operator value is a 4-character type followed by an ID.
                pack.operator_type = values['operator'][0:4]
                pack.operator_id = int(values['operator'][4:])
            else:
                pack.operator_type = task.operator_type
                pack.operator_id = task.operator_id
        else:
            task = None
            pack.task_id = None
            pack.operator_type = None
            pack.operator_id = None
        DBSession.commit()

        # If new task
        new_operator = '%s%s' % (pack.operator_type, pack.operator_id)
        if task is not None and \
                (pack.task_id != old_task or new_operator != old_operator):
            operator = dict(operators).get(new_operator) or _('Automatic')
            operator = self._request.localizer.translate(operator)
            DBSession.add(PackEvent(
                pack.project_id, pack.pack_id, pack.task_id,
                task.label or '', pack.operator_type, pack.operator_id,
                '%s *' % operator))
            DBSession.commit()
            # NOTE(review): nesting inferred from the flattened source; the
            # automatic build is assumed to be triggered only when the task
            # or the operator changes -- confirm.
            if task.operator_type == 'auto':
                task_auto_build(self._request, pack, task)

        return True

    # -------------------------------------------------------------------------
    def _index_action(self, project):
        """Process actions for index of packs.

        Actions ``get!``, ``sel!``, ``ipk!``, ``imp!`` and ``del!`` are
        executed here and replaced by an empty action; any other action is
        returned unchanged for the caller to process.

        :param project: (dictionary)
            Current project dictionary.
        :return: (tuple)
            A tuple such as ``(action, items)``.
        :raise HTTPForbidden:
            If the user's permission does not allow the requested action.
        """
        # Authorization
        action, items = get_action(self._request)
        if project['perm'] not in ('leader', 'packmaker', 'packeditor') and \
                action[0:4] == 'get!':
            raise HTTPForbidden(comment=_("You can't modify pack!"))
        if project['perm'] not in ('leader', 'packmaker') and \
                action[0:4] in ('ipk!', 'imp!', 'del!'):
            raise HTTPForbidden(comment=_("You can't create or delete pack!"))

        # Action
        if action[0:4] == 'get!':
            pack = DBSession.query(Pack).filter_by(
                project_id=project['project_id'], pack_id=int(action[4:]))\
                .first()
            selection2container(
                self._request, 'pck', pack, 'file',
                get_selection(self._request))
            action = ''
        elif action[0:4] == 'sel!':
            Selection(self._request).add((action[4:],))
            action = ''
        elif action[0:4] == 'ipk!':
            pack_upload_content(
                self._request, project['project_id'],
                self._request.params.get('message'))
            action = ''
        elif action[0:4] == 'imp!':
            pack_upload_settings(self._request, project['project_id'])
            action = ''
        elif action[0:4] == 'del!':
            items = [int(k) for k in items]
            # Forget the deleted packs in the session.
            if self._request.session['project']['pack_id'] in items:
                self._request.session['project']['pack_id'] = None
            if 'task' in self._request.session \
                    and self._request.session['task']['pack_id'] in items:
                self._request.session['task']['pack_id'] = None
                self._request.session['task']['files'] = []
            DBSession.query(JobPack)\
                .filter_by(project_id=project['project_id'])\
                .filter(JobPack.pack_id.in_(items)).delete('fetch')
            for dbpack in DBSession.query(Pack)\
                    .filter_by(project_id=project['project_id'])\
                    .filter(Pack.pack_id.in_(items)):
                log_activity(
                    self._request, 'pack_delete',
                    u'"{0}"'.format(project['label']),
                    u'"{0}"'.format(dbpack.label))
                DBSession.delete(dbpack)
            DBSession.commit()
            action = ''

        return action, items

    # -------------------------------------------------------------------------
    def _edit_action(self, project, pack):
        """Process actions for edition.

        :param project: (dictionary)
            Current project dictionary.
        :param pack: (:class:`~.models.packs.Pack` instance)
            Current pack object.
        :return: (tuple)
            A tuple such as ``(action, storage, force)``. ``storage`` is set
            only by the upload actions and ``force`` is ``True`` when the
            file list has been modified.
        """
        action = get_action(self._request)[0]
        storage = None
        force = False

        if action[0:4] == 'get!':
            selection2container(
                self._request, 'pck', pack, action[4:],
                get_selection(self._request))
            force = True
        elif action[0:4] == 'rmv!':
            # The action target is ``<file_type>_<path>``.
            file_type, _sep, path = action[4:].partition('_')
            DBSession.query(PackFile).filter_by(
                project_id=project['project_id'], pack_id=pack.pack_id,
                file_type=file_type, path=path).delete()
            DBSession.commit()
            force = True
            if 'task' in self._request.session and \
                    self._request.session['task']['pack_id'] == pack.pack_id:
                self._request.session['task']['files'] = file_details(
                    self._request, pack)
        elif action[0:4] == 'upl?':
            storage = current_storage(
                self._request, action.partition('_')[2].split('/')[0])[0]
        elif action[0:4] == 'upl!':
            storage = current_storage(
                self._request, action[4:].split('/')[0])[0]
            file_upload(
                self._request, action[4:],
                self._request.params.get('message'))
            action = ''
        elif action == 'del!':
            # Delete all events of the pack.
            DBSession.query(PackEvent).filter_by(
                project_id=project['project_id'], pack_id=pack.pack_id)\
                .delete()
            DBSession.commit()

        return action, storage, force

    # -------------------------------------------------------------------------
    def _current_pack(self, project_id):
        """Return the current pack object.

        :param project_id: (string)
            Current project ID.
        :return: (:class:`~.models.packs.Pack` instance or ``None``)
            The pack named by the ``pack_id`` of the route, or ``None`` if
            the route has no ``pack_id``, if it is not an integer or if the
            pack does not exist in the project.

        ``request.session['project']`` is updated with ``pack_id`` when the
        request has a non-empty ``open`` parameter.
        """
        # Pack
        pack_id = self._request.matchdict.get('pack_id')
        if pack_id is None:
            return None
        try:
            pack_id = int(pack_id)
        except ValueError:
            # A malformed ID in the URL designates no pack.
            return None
        pack = DBSession.query(Pack).filter_by(
            project_id=project_id, pack_id=pack_id).first()
        if pack is None:
            return None

        # Set current pack
        if self._request.params.get('open'):
            self._request.session['project']['pack_id'] = pack.pack_id

        return pack

    # -------------------------------------------------------------------------
    def _paging_events(self, pack):
        """Return a :class:`~..lib.paging.Paging` object filled with the
        events of a pack.

        :param pack: (:class:`~..models.packs.Pack` instance)
            Current pack.
        :return: (tuple)
            A tuple such as ``(paging, params)`` where ``paging`` is a
            :class:`~..lib.paging.Paging` object and ``params`` the paging
            parameters returned by ``Paging.params``.
        """
        # Parameters
        params = Paging.params(self._request, 'events', '-begin')

        # Query
        query = DBSession.query(PackEvent).filter_by(
            project_id=pack.project_id, pack_id=pack.pack_id)

        # Order by: the first character of ``sort`` is the direction sign
        oby = getattr(PackEvent, params['sort'][1:])
        query = query.order_by(desc(oby) if params['sort'][0] == '-' else oby)

        return Paging(self._request, 'events', query), params