"""SQLAlchemy-powered model definition for project processings."""
# pylint: disable = super-on-old-class
from logging import getLogger
from lxml import etree
from sqlalchemy import Column, ForeignKey, types
from sqlalchemy.schema import PrimaryKeyConstraint, UniqueConstraint
from sqlalchemy.schema import ForeignKeyConstraint
from sqlalchemy.orm import relationship
from sqlalchemy.exc import IntegrityError
from ..lib.i18n import _
from ..lib.utils import normalize_spaces, export_file_set, wrap
from . import ID_LEN, LABEL_LEN, DESCRIPTION_LEN, PATH_LEN, VALUE_LEN
from . import Base, DBSession
LOG = getLogger(__name__)
PRC_FILE_TYPES = ('resource', 'template')
ADD2PACK_TARGETS = {
'result2files': _('result in files'),
'result2resources': _('result in resources'),
'result2templates': _('result in templates'),
'output2files': _('full output in files'),
'output2resources': _('full output in resources'),
'output2templates': _('full output in templates'),
'smart': _('smart packing')}
XML_NS = '{http://www.w3.org/XML/1998/namespace}'
# =============================================================================
[docs]class Processing(Base):
"""SQLAlchemy-powered project processing model."""
__tablename__ = 'processing'
__table_args__ = (
PrimaryKeyConstraint('project_id', 'processing_id'),
UniqueConstraint('processing_id'),
UniqueConstraint('project_id', 'label'),
{'mysql_engine': 'InnoDB'})
project_id = Column(
types.Integer, ForeignKey('project.project_id', ondelete='CASCADE'))
processing_id = Column(types.Integer, autoincrement=True)
label = Column(types.String(LABEL_LEN), nullable=False)
description = Column(types.String(DESCRIPTION_LEN))
processor = Column(types.String(ID_LEN), nullable=False)
output = Column(types.String(PATH_LEN))
add2pack = Column(
types.Enum(*ADD2PACK_TARGETS.keys(), name='add2pack_type_enum'))
indirect = Column(types.Boolean, default=False)
variables = relationship('ProcessingVariable', cascade='all, delete')
files = relationship('ProcessingFile', cascade='all, delete')
# -------------------------------------------------------------------------
def __init__(self, project_id, label, description, processor, output=None,
add2pack=None, indirect=False):
"""Constructor method."""
# pylint: disable = R0913
super(Processing, self).__init__()
self.project_id = project_id
self.label = normalize_spaces(label, LABEL_LEN)
self.description = normalize_spaces(description, DESCRIPTION_LEN)
self.processor = processor.strip()[0:ID_LEN]
self.output = output and output.strip()[0:PATH_LEN]
self.add2pack = add2pack
self.indirect = indirect
# -------------------------------------------------------------------------
[docs] @classmethod
def load(cls, project_id, processing_elt, check_if_exists=True):
"""Load a processing from a XML element.
:param processing_elt: (:class:`lxml.etree.Element` instance)
Processing XML element.
:param check_if_exists: (boolean, default=True)
Check if processing already exists before inserting.
:return: (:class:`pyramid.i18n.TranslationString` or integer)
Error message or integer.
"""
if processing_elt is None:
return _('nothing to do!')
label = normalize_spaces(processing_elt.findtext('label'), LABEL_LEN)
if check_if_exists:
processing = DBSession.query(cls).filter_by(
project_id=project_id, label=label).first()
if processing is not None:
return _('Processing "${l}" already exists.', {'l': label})
output = processing_elt.find('output')
processing = cls(
project_id, label,
processing_elt.findtext('description'),
processing_elt.findtext('processor').strip(),
output is not None and output.text or None,
output is not None and output.get('add2pack') or None,
processing_elt.get('indirect', False) == 'true')
# Load variables, resources and templates
for child in processing_elt.iterdescendants(tag=etree.Element):
if child.tag == 'var':
if child.get('visible') is not None or child.text is not None:
processing.variables.append(ProcessingVariable(
child.get('name'),
'' if child.text is not None and not child.text.strip()
else child.text, child.get('visible')))
if child.tag in PRC_FILE_TYPES:
processing.files.append(ProcessingFile(
child.tag, child.text.strip(), child.get('to'),
len(processing.files) + 1))
DBSession.add(processing)
try:
DBSession.commit()
except IntegrityError as error:
DBSession.rollback()
LOG.error(error)
return error
return processing.processing_id
# -------------------------------------------------------------------------
[docs] def xml(self, processor):
"""Serialize a processing to a XML representation.
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
:return: (:class:`lxml.etree.Element`)
"""
proc_elt = etree.Element('processing')
proc_elt.set('%sid' % XML_NS,
'prc%d.%d' % (self.project_id, self.processing_id))
etree.SubElement(proc_elt, 'label').text = self.label
if self.indirect:
proc_elt.set('indirect', 'true')
if self.description:
etree.SubElement(proc_elt, 'description').text = \
wrap(self.description, width=66, indent=12)
etree.SubElement(proc_elt, 'processor').text = self.processor
# Variables
if processor is not None:
self._export_variables(proc_elt, processor)
# Files
export_file_set(proc_elt, self, 'resource')
export_file_set(proc_elt, self, 'template')
if self.output or self.add2pack:
elt = etree.SubElement(proc_elt, 'output')
elt.text = self.output
if self.add2pack:
elt.set('add2pack', self.add2pack)
return proc_elt
# -------------------------------------------------------------------------
[docs] def export_build_variables(self, proc_elt, processor, pack, values):
"""Read variable definitions in processor tree and fill ``variables``
XML structure for a build.
:param proc_elt: (:class:`lxml.etree.Element` instance)
Processing element that binds the result.
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
:param pack: (:class:`~.models.packs.Pack` instance)
Current pack.
:param values: (dictionary)
Variables values.
"""
# Read values
processing_vars = dict([(k.name, k.default) for k in self.variables])
pack_vars = dict([(k.name, k.value) for k in pack.variables
if k.processing_id == self.processing_id])
vars_elt = etree.Element('variables')
for var in processor.findall('processor/variables/group/var'):
name = var.get('name')
value = None
if name in values:
value = values[name]
elif name in pack_vars:
value = pack_vars[name]
elif name in processing_vars:
value = processing_vars[name]
if value is not None and var.get('type') == 'boolean':
value = 'true' if value and value != 'false' else 'false'
if value is not None and value != var.findtext('default'):
etree.SubElement(vars_elt, 'var', name=name).text = \
unicode(value)
if len(vars_elt) + 1 > 1:
proc_elt.append(vars_elt)
# -------------------------------------------------------------------------
[docs] def update_sort(self):
"""Update ``sort`` field of ProcessingFile table."""
sorts = {'resource': 1001, 'template': 2001}
for item in sorted(self.files, key=lambda k: k.sort):
item.sort = sorts[item.file_type]
sorts[item.file_type] += 1
# -------------------------------------------------------------------------
def _export_variables(self, proc_elt, processor):
"""Read variable definitions in processor tree and fill ``variables``
XML structure.
:param proc_elt: (:class:`lxml.etree.Element` instance)
Processing element that binds the result.
:param processor: (:class:`lxml.etree.ElementTree` instance)
Processor of current processing.
"""
defaults = dict([
(k.name, (k.default, k.visible)) for k in self.variables])
if not defaults:
return
# Browse processor variables
vars_elt = etree.Element('variables')
for var in processor.findall('processor/variables/group/var'):
name = var.get('name')
if name in defaults:
elt = etree.Element('var', name=name)
if defaults[name][0] is not None \
and defaults[name][0] != var.findtext('default'):
elt.text = defaults[name][0] or ' '
if defaults[name][1] is not None and \
bool(defaults[name][1]) != bool(var.get('visible')):
elt.set('visible', str(defaults[name][1]).lower())
if elt.text is not None or elt.get('visible') is not None:
vars_elt.append(elt)
if len(vars_elt) + 1 > 1:
proc_elt.append(vars_elt)
# =============================================================================
[docs]class ProcessingVariable(Base):
"""SQLAlchemy-powered project processing variable model."""
# pylint: disable = R0903
__tablename__ = 'processing_variable'
__table_args__ = (
ForeignKeyConstraint(
['project_id', 'processing_id'],
['processing.project_id', 'processing.processing_id'],
ondelete='CASCADE'),
{'mysql_engine': 'InnoDB'})
project_id = Column(types.Integer, primary_key=True)
processing_id = Column(types.Integer, primary_key=True)
name = Column(types.String(ID_LEN), primary_key=True)
default = Column(types.String(VALUE_LEN))
visible = Column(types.Boolean)
# -------------------------------------------------------------------------
def __init__(self, name, default, visible=None):
"""Constructor method."""
super(ProcessingVariable, self).__init__()
self.name = name.strip()[0:ID_LEN]
self.default = default[0:VALUE_LEN] \
if isinstance(default, basestring) else default
if visible is not None:
self.visible = \
visible if isinstance(visible, bool) else (visible == 'true')
# =============================================================================
[docs]class ProcessingFile(Base):
"""SQLAlchemy-powered project processing file model."""
# pylint: disable = R0903
__tablename__ = 'processing_file'
__table_args__ = (
ForeignKeyConstraint(
['project_id', 'processing_id'],
['processing.project_id', 'processing.processing_id'],
ondelete='CASCADE'),
{'mysql_engine': 'InnoDB'})
project_id = Column(types.Integer, primary_key=True)
processing_id = Column(types.Integer, primary_key=True)
file_type = Column(
types.Enum(*PRC_FILE_TYPES, name='prcfil_type_enum'), primary_key=True)
path = Column(types.String(PATH_LEN), primary_key=True)
target = Column(types.String(PATH_LEN))
sort = Column(types.Integer, default=0)
# -------------------------------------------------------------------------
def __init__(self, file_type, path, target=None, sort=None):
"""Constructor method."""
super(ProcessingFile, self).__init__()
self.file_type = file_type
self.path = path.strip()[0:PATH_LEN]
self.target = (target and target.strip()[0:PATH_LEN]) \
or (file_type == 'template' and
self.path.partition('/')[2][0:PATH_LEN]) or None
self.sort = sort