# pylint: disable = C0322
"""Processing view callables."""
from os import sep
from os.path import join, normpath
from colander import Mapping, SchemaNode, Boolean
from colander import String, Length, OneOf
from webhelpers2.html import literal
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from pyramid.response import Response
from .selection import Selection
from ..lib.i18n import _
from ..lib.utils import MIME_TYPES, has_permission, normalize_spaces
from ..lib.utils import shift_files
from ..lib.xml import export_configuration, local_text
from ..lib.form import Form
from ..lib.viewutils import get_action, current_project, current_storage
from ..lib.viewutils import current_processing, current_processing_output
from ..lib.viewutils import file_details, file_upload, variable_schema
from ..lib.viewutils import variable_input, variable_description
from ..lib.viewutils import get_selection, selection2container
from ..lib.tabset import TabSet
from ..views.pack import FILE_TYPE_LABELS
from ..models import LABEL_LEN, DESCRIPTION_LEN, PATH_LEN
from ..models import DBSession, close_dbsession
from ..models.processors import Processor
from ..models.processings import ADD2PACK_TARGETS
from ..models.processings import Processing, ProcessingFile, ProcessingVariable
PROCESSING_SETTINGS_TABS = (
_('Description'), _('Variables'), _('Files'), _('Output'))
# =============================================================================
[docs]class ProcessingView(object):
"""Class to manage processings."""
# -------------------------------------------------------------------------
def __init__(self, request):
"""Constructor method."""
request.add_finished_callback(close_dbsession)
self._request = request
# -------------------------------------------------------------------------
[docs] @view_config(
route_name='processing_view', renderer='../Templates/prc_view.pt')
@view_config(route_name='processing_view', renderer='json', xhr=True)
def view(self):
"""Display processing settings."""
# Permission
project = current_project(self._request)
i_editor = project['perm'] == 'leader' \
or has_permission(self._request, 'prj_editor')
# Environment
processing, processor, output = current_processing(self._request)
variable_defaults = self._variable_defaults(
processor, processing.variables)
tab_set = TabSet(self._request, PROCESSING_SETTINGS_TABS)
# Action
action, files, description = self._action(
processing, processor, variable_defaults, i_editor)[0:3]
if self._request.is_xhr:
return Response(description, content_type='text/html')
if action == 'exp!':
if not processing.processor.startswith('Parallel'):
return export_configuration((processing.xml(processor),))
self._request.session.flash(_(
'You cannot export this processing.'), 'alert')
elif action[0:4] == 'sel!':
Selection(self._request).add((action[4:],))
self._request.session.flash(_('Selection updated.'))
# Breadcrumbs
self._request.breadcrumbs.add(
_('Processing settings'), replace=self._request.route_path(
'processing_edit', project_id=project['project_id'],
processing_id=processing.processing_id))
return {
'sep': sep, 'tab_set': tab_set, 'project': project,
'processing': processing, 'processor': processor,
'variables': variable_defaults, 'files': files,
'file_type_labels': FILE_TYPE_LABELS, 'MIME_TYPES': MIME_TYPES,
'output': output, 'ADD2PACK_TARGETS': ADD2PACK_TARGETS,
'description': description, 'i_editor': i_editor}
# -------------------------------------------------------------------------
[docs] @view_config(
route_name='processing_create', renderer='../Templates/prc_edit.pt')
def create(self):
"""Create a processing."""
# Permission
project = current_project(self._request)
if project['perm'] != 'leader' \
and not has_permission(self._request, 'prj_editor'):
raise HTTPForbidden()
# Environment
processor_id = self._request.params.get('processor_id')
processor_labels = Processor.labels(self._request)
schema = SchemaNode(Mapping())
schema.add(SchemaNode(
String(), name='processor_id',
validator=OneOf(processor_labels.keys())))
schema.add(SchemaNode(
String(), name='label', validator=Length(min=2, max=LABEL_LEN)))
schema.add(SchemaNode(
String(), name='description', missing='',
validator=Length(max=DESCRIPTION_LEN)))
schema.add(SchemaNode(Boolean(), name='indirect', missing=False))
form = Form(self._request, schema=schema)
tab_set = TabSet(self._request, PROCESSING_SETTINGS_TABS)
# Action
action = get_action(self._request)[0]
description = None
if action == 'des!' and processor_id:
description = Processor.description(self._request, processor_id)
description = literal(
'<b>%s (%s)</b><br/>%s' % (
processor_labels.get(processor_id) or '', processor_id,
description or ''))
if self._request.is_xhr:
return Response(description, content_type='text/html')
if action == 'sav!' and form.validate():
processing = self._create(project['project_id'], form.values)
if processing is not None:
if 'project' in self._request.session:
del self._request.session['project']
self._request.breadcrumbs.pop()
return HTTPFound(self._request.route_path(
'processing_edit', project_id=processing.project_id,
processing_id=processing.processing_id))
self._request.breadcrumbs.add(_('Processing creation'))
return {
'form': form, 'tab_set': tab_set, 'project': project,
'processing': None, 'processor_labels': processor_labels,
'description': description}
# -------------------------------------------------------------------------
[docs] @view_config(
route_name='processing_edit', renderer='../Templates/prc_edit.pt')
@view_config(route_name='processing_edit', renderer='json', xhr=True)
def edit(self):
"""Edit a processing."""
# Permission
project = current_project(self._request)
if project['perm'] != 'leader' and not has_permission(
self._request, 'prj_editor'):
raise HTTPForbidden()
# Environment
processing, processor, output = current_processing(self._request)
variable_defaults = self._variable_defaults(
processor, processing.variables)
# Action
action, files, description, storage = self._action(
processing, processor, variable_defaults, True)
if self._request.is_xhr and action[0:4] == 'des!':
return Response(description, content_type='text/html')
form, tab_set = self._settings_form(
processing, processor, variable_defaults)
if action == 'sav!' and form.validate() \
and self._save(processing, processor, form.values):
action = 'view'
if form.has_error():
self._request.session.flash(_('Correct errors.'), 'alert')
if action == 'view':
return HTTPFound(self._request.route_path(
'processing_view', project_id=processing.project_id,
processing_id=processing.processing_id))
elif action[0:4] == 'get!' and action[4:] != 'output':
selection2container(
self._request, 'prc', processing, action[4:],
get_selection(self._request))
files = file_details(self._request, processing, False)
elif action[0:4] == 'get!':
selection2container(
self._request, 'prc', processing, 'output',
get_selection(self._request))
output = current_processing_output(self._request, processing)
form.values['output'] = output['path']
form.static('output')
elif action == 'add!':
self._request.session.flash(
_('Add files to your selection and return to the processing.'))
return HTTPFound(self._request.route_path(
'storage_index', _query={'crumbs': 10}))
elif action[0:4] == 'rst!':
name = action[4:]
var = processor.xpath(
'processor/variables/group/var[@name="%s"]' % name)
if len(var) == 1 and var[0].findtext('default') is not None:
value = var[0].findtext('default').strip()
form.values[name] = value == 'true' \
if var[0].get('type') == 'boolean' else value
form.values['%s_def' % name] = form.values[name]
form.static(name)
form.static('%s_def' % name)
elif action[0:3] in ('mup', 'dwn'):
shift_files(action[0:3], action[4:], processing, files, form)
DBSession.commit()
if self._request.is_xhr:
return None
# Breadcrumbs
self._request.breadcrumbs.add(
_('Processing settings'), replace=self._request.route_path(
'processing_view', project_id=processing.project_id,
processing_id=processing.processing_id))
return {
'form': form, 'sep': sep, 'tab_set': tab_set, 'action': action,
'file_type_labels': FILE_TYPE_LABELS, 'MIME_TYPES': MIME_TYPES,
'project': project, 'processing': processing,
'processor': processor, 'variables': variable_defaults,
'files': files, 'storage': storage, 'output': output,
'ADD2PACK_TARGETS': ADD2PACK_TARGETS, 'description': description,
'variable_input': variable_input}
# -------------------------------------------------------------------------
def _settings_form(self, processing, processor, variable_defaults):
"""Return a processing settings form.
:param processing: (:class:`~..models.processings.Processing` instance)
Current processing object.
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
:param variable_defaults: (dictionary)
Defaultvalues affected to variables of current processing.
:return: (tuple)
A tuple such as ``(form, tab_set)``
"""
schema, defaults = variable_schema(processor, variable_defaults)
schema.add(SchemaNode(
String(), name='label', validator=Length(min=2, max=LABEL_LEN)))
schema.add(SchemaNode(
String(), name='description',
validator=Length(max=DESCRIPTION_LEN), missing=''))
schema.add(SchemaNode(Boolean(), name='indirect', missing=False))
defaults['label'] = processing.label
defaults['description'] = processing.description
defaults['indirect'] = processing.indirect
if not processing.output \
and processor.findtext('processor/output') is not None:
processing.output = processor.findtext('processor/output').strip()
processing.add2pack = \
processor.find('processor/output').get('add2pack')
DBSession.commit()
if processing.output:
schema.add(SchemaNode(
String(), name='output',
validator=Length(max=PATH_LEN), missing=''))
schema.add(SchemaNode(
String(), name='add2pack',
validator=OneOf(ADD2PACK_TARGETS.keys()), missing=None))
defaults['output'] = processing.output.partition(sep)[2]
defaults['add2pack'] = processing.add2pack
for item in processing.files:
if item.file_type == 'template':
schema.add(SchemaNode(
String(), name='template_%s' % item.path,
validator=Length(max=PATH_LEN)))
defaults['template_%s' % item.path] = item.target
return (Form(self._request, schema=schema, defaults=defaults),
TabSet(self._request, PROCESSING_SETTINGS_TABS))
# -------------------------------------------------------------------------
def _action(self, processing, processor, variable_values,
show_processor_defaults=False):
"""Return current action and help message.
:param processing:
(:class:`~..models.processings.Processing` instance)
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
:param variable_values: (dictionary)
Values affected to the variables of current processing.
:param files: (dictionary)
See :func:`~.lib.viewutils.file_details` function.
:param show_processor_defaults: (boolean, default=False)
Show the processor default value if any.
:return: (tuple)
A tuple such as ``(action, files, description, storage)``.
"""
action = get_action(self._request)[0]
files = None
description = None
store = None
if action == 'des!':
description = Processor.description(self._request, xml=processor)
description = literal('<b>%s</b> (%s)<br/>%s' % (
local_text(processor, 'processor/label', self._request),
processing.processor, description or ''))
elif action[0:4] == 'des!':
description = variable_description(
self._request, processor.find('processor'),
action[4:], variable_values, show_processor_defaults)
elif action[0:4] == 'rmv!':
DBSession.query(ProcessingFile).filter_by(
project_id=processing.project_id,
processing_id=processing.processing_id,
file_type=action[4:].partition('_')[0],
path=action[4:].partition('_')[2]).delete()
DBSession.commit()
files = file_details(self._request, processing, False)
elif action[0:4] == 'upl?':
store = current_storage(
self._request, action.partition('_')[2].split(sep)[0])[0]
elif action[0:4] == 'upl!':
store = current_storage(
self._request, action[4:].split(sep)[0])[0]
file_upload(
self._request, action[4:], self._request.params.get('message'))
action = ''
if files is None:
files = file_details(self._request, processing, False)
return action, files, description, store
# -------------------------------------------------------------------------
def _create(self, project_id, values):
"""Create a record in ``projects_processings`` table.
:param project_id: (string)
Project ID.
:param values: (dictionary)
Values to record.
:return:
(:class:`~..models.processings.Processing` instance)
"""
# Check unicity and create processing
label = normalize_spaces(values['label'], LABEL_LEN)
processing = DBSession.query(Processing).filter_by(
project_id=project_id, label=label).first()
if processing is not None:
self._request.session.flash(
_('This processing already exists.'), 'alert')
return None
processing = Processing(
project_id, label, values['description'], values['processor_id'],
indirect=values['indirect'])
# Create variables
processor = self._request.registry['fbuild'].processor(
self._request, values['processor_id'])
for var in processor.findall('processor/variables/group/var'):
if var.findtext('default') is not None:
value = var.findtext('default').strip()
processing.variables.append(
ProcessingVariable(var.get('name'), value, value))
# Create output
if processor.findtext('processor/output') is not None:
processing.output = processor.findtext('processor/output').strip()
processing.add2pack = \
processor.find('processor/output').get('add2pack')
DBSession.add(processing)
DBSession.commit()
return processing
# -------------------------------------------------------------------------
def _save(self, processing, processor, values):
"""Save a processing settings.
:param processing: (:class:`~..models.processings.Processing` instance)
Processing to update.
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
:param values: (dictionary)
Form values.
:return: (boolean)
``True`` if succeeds.
"""
processing.label = normalize_spaces(values['label'], LABEL_LEN)
processing.description = normalize_spaces(
values['description'], DESCRIPTION_LEN)
processing.indirect = values['indirect']
DBSession.query(ProcessingVariable).filter_by(
project_id=processing.project_id,
processing_id=processing.processing_id).delete()
for var in processor.findall('processor/variables/group/var'):
name = var.get('name')
default = var.findtext('default', '').strip()
if var.get('type') == 'integer':
default = int(default or '0')
elif var.get('type') == 'boolean':
default = (default == 'true')
if values.get(name) != default \
or values.get('%s_see' % name) != bool(var.get('visible')):
processing.variables.append(ProcessingVariable(
name,
values.get(name) if values.get(name) != default else None,
values.get('%s_see' % name)
if values.get('%s_see' % name) != bool(var.get('visible'))
else None))
for item in processing.files:
if item.file_type == 'template':
item.target = values['template_%s' % item.path]
if processing.output:
processing.output = normpath(join(
processing.output.partition(sep)[0],
values['output'].replace('..', '')))
processing.add2pack = values['add2pack']
DBSession.commit()
del self._request.session['project']
if 'build' in self._request.session:
del self._request.session['build']
return True
# -------------------------------------------------------------------------
@classmethod
def _variable_defaults(cls, processor, variables):
"""Return a dictionary of variable default values.
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
:param variables: (:class:`~.models.processings.ProcessingVariable`)
Variables of current processing.
:return: (dictionay)
A dictionary such as {<name>: (<default>, <visible>, <label>),...}.
"""
variables = dict([(k.name, k) for k in variables])
defaults = {}
for var in processor.findall('processor/variables/group/var'):
name = var.get('name')
default = variables[name].default if name in variables \
and variables[name].default is not None \
else var.findtext('default')
visible = variables[name].visible if name in variables \
and variables[name].visible is not None \
else bool(var.get('visible'))
label = None
if default is not None and var.get('type') == 'select':
label = dict([
(k.get('value', k.text), k.text)
for k in var.findall('option')]).get(default, default)
defaults[name] = (default, visible, label)
return defaults