Source code for publiforge.models.processors
"""SQLAlchemy-powered model definition for processors."""
# pylint: disable = super-on-old-class
from time import time
from os.path import join, dirname
from cStringIO import StringIO
from lxml import etree
from sqlalchemy import Column, types
from ..lib.xml import local_text, load_xml
from . import Base, DBSession, ID_LEN
# =============================================================================
[docs]class Processor(Base):
"""SQLAlchemy-powered processor model."""
__tablename__ = 'processor'
__table_args__ = {'mysql_engine': 'InnoDB'}
processor_id = Column(types.String(ID_LEN), primary_key=True)
xml = Column(types.Text())
updated = Column(types.Float())
# -------------------------------------------------------------------------
def __init__(self, processor_id):
"""Constructor method."""
super(Processor, self).__init__()
self.processor_id = processor_id.strip()[0:ID_LEN]
self.updated = time()
# -------------------------------------------------------------------------
[docs] @classmethod
def load(cls, processor_id, xml):
"""Load a processor from a XML string.
:param processor_id: (string)
Processor ID.
:param xml: (string)
XML string.
:return: (string, :class:`pyramid.i18n.TranslationString` or
:class:`Processor`)
Error message or processor object.
"""
if isinstance(xml, unicode):
xml = xml.encode('utf8')
tree = load_xml('%s processor.xml' % processor_id, {
'publiforge':
join(dirname(__file__), '..', 'RelaxNG', 'publiforge.rng')}, xml)
if isinstance(tree, basestring):
return tree
# Check if already exists
processor = DBSession.query(cls).filter_by(
processor_id=processor_id).first()
if processor is None:
processor = cls(processor_id)
processor.xml = xml.decode('utf8')
processor.updated = time()
return processor
# -------------------------------------------------------------------------
[docs] @classmethod
def ids(cls, request):
"""ID of all processors.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:return: (list)
"""
request.registry['fbuild'].refresh_agent_list(request)
return [k[0] for k in DBSession.query(cls.processor_id).all()]
# -------------------------------------------------------------------------
[docs] @classmethod
def labels(cls, request):
"""Label of all processors in local language or default language.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:return: (dictionary)
A dictionary such as ``{<processor_id>:
<processor_label>,...}``.
If some labels do not exist in the asked language, this method returns
the label in default language or ``processor_id``.
"""
request.registry['fbuild'].refresh_agent_list(request)
labels = {}
for processor in DBSession.query(cls.processor_id, cls.xml):
tree = etree.parse(StringIO(processor[1].encode('utf8')))
labels[processor[0]] = local_text(
tree.find('processor'), 'label', request, default=processor[0])
return labels
# -------------------------------------------------------------------------
[docs] @classmethod
def description(cls, request, processor_id=None, xml=None):
"""Description of processor ``processor_id`` in local language or
default language.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:param processor_id: (string, optional)
Processor ID.
:param xml: (:class:`lxml.etree.ElementTree` instance, optional)
Processor XML.
:return: (string)
Description.
If some labels do not exist in the asked language, this method returns
the label in default language or ``processor_id``.
"""
if processor_id is not None:
request.registry['fbuild'].refresh_agent_list(request)
xml = DBSession.query(cls.xml).filter_by(
processor_id=processor_id).first()
if xml is not None:
xml = etree.parse(StringIO(xml[0].encode('utf8')))
if xml:
return local_text(xml.find('processor'), 'description', request)
return None