"""XML manipulation."""
from os.path import join, dirname, basename, relpath
from io import BytesIO
from textwrap import wrap
from warnings import warn
from lxml import etree
from pyramid.httpexceptions import HTTPForbidden
from pyramid.response import Response
from .i18n import _
from .utils import has_permission, normalize_name, normalize_spaces
from .utils import camel_case, make_id
from ..models.users import User
from ..models.groups import Group
from ..models.storages import Storage
from ..models.indexers import Indexer
from ..models.projects import Project
# Clark-notation prefix of the XML namespace; this module prepends it to
# ``id`` to read the ``xml:id`` attribute of an element.
XML_NS = '{http://www.w3.org/XML/1998/namespace}'
# Value written in the ``version`` attribute of the ``<publiforge>`` root
# element created by ``export_configuration()``.
PUBLIFORGE_RNG_VERSION = '1.0'
# Namespace URI; presumably the one under which the ``xpath_*`` functions of
# this module are registered -- the registration is not visible here.
PF_NAMESPACE = 'http://publiforge.org/functions'
# File extension used when a single element is exported, keyed by the tag of
# that element (``export_configuration()`` falls back to ``'pf'``).
EXTENSIONS = {
'user': 'pfusr', 'group': 'pfgrp', 'indexer': 'pfidx', 'storage': 'pfstg',
'project': 'pfprj', 'processing': 'pfprc', 'pack': 'pfpck'}
# =============================================================================
def load_xml(filename, relaxngs=None, data=None, noline=False, parser=None):
    """Parse an XML document and, on request, validate it with Relax NG.

    :param str filename:
        Path to the XML file. It is only read when ``data`` is ``None``, but
        its base name is also used in the "unknown file" error message.
    :param dict relaxngs: (optional)
        Relax NG dictionary such as ``{<pattern>: <relax_ng_file>,...}``. If
        it is ``None``, no validation is performed.
    :type data: str, bytes or :class:`lxml.etree.ElementTree`
    :param data: (optional)
        Content of the XML document. If it is not ``None``, it is used in
        place of the content of the file ``filename``. An already parsed
        tree is used as is.
    :param bool noline: (default=False)
        If ``True``, the validation error message does not contain line
        numbers.
    :type parser: :class:`etree.XMLParser`
    :param parser: (optional)
        Specific parser for ``etree.parse`` function.
    :rtype: str, :class:`TranslationString` or :class:`ElementTree`
    :return:
        An error message or an instance of :class:`lxml.etree.ElementTree`
        class.
    """
    # pylint: disable = protected-access
    if isinstance(data, etree._ElementTree):
        # Already parsed: nothing to read
        tree = data
    else:
        if data is None:
            source = filename
        else:
            # etree.parse() needs a file-like object over bytes
            content = data if isinstance(data, bytes) \
                else bytes(data.encode())
            source = BytesIO(content)
        try:
            tree = etree.parse(source, parser=parser)
        except IOError:
            return _('Unknown file "${n}"', {'n': basename(filename)})
        except etree.XMLSyntaxError as error:
            return str(error)
    # pylint: enable = protected-access

    # Validation is optional
    if relaxngs is None:
        return tree
    error = validate_xml(tree, relaxngs, noline)
    return tree if error is None else error
# =============================================================================
def load(filename, relaxngs=None, data=None, noline=False, parser=None):
    """Legacy alias of :func:`load_xml`, kept for backward compatibility.

    It emits a :class:`DeprecationWarning` and forwards every argument,
    unchanged, to :func:`load_xml`.

    :return:
        Whatever :func:`load_xml` returns: an error message or an instance
        of :class:`lxml.etree.ElementTree` class.
    """
    # stacklevel=2 attributes the warning to the code calling load(), which
    # is the code that has to be migrated, instead of to this wrapper.
    warn(
        'Deprecated: use publiforge.lib.xml.load_xml()', DeprecationWarning,
        stacklevel=2)
    return load_xml(filename, relaxngs, data, noline, parser)
# =============================================================================
def validate_xml(tree, relaxngs, noline=False):
    """Validate an already parsed XML document against a Relax NG schema.

    The schema is selected among ``relaxngs`` by :func:`_relaxng` according
    to the root element of ``tree``. A pattern is either a tag name or a tag
    name followed by comma-separated ``<attribute> <value>`` pairs the root
    element must carry.

    :type tree: :class:`lxml.etree.ElementTree`
    :param tree:
        XML document.
    :param dict relaxngs:
        Relax NG dictionary such as ``{<pattern>: <relax_ng_file>,...}``. A
        value which is not a string is used directly as the validator.
    :param bool noline: (default=False)
        If ``True``, the error message does not contain line numbers.
    :rtype: str, :class:`TranslationString` or ``None``
    :return:
        An error message or ``None``.
    """
    # Find the right Relax NG (shared lookup, see _relaxng())
    root = tree.getroot()
    relaxng = _relaxng(relaxngs, root)
    if relaxng is None:
        return _('${tag}: Relax NG not found', {'tag': root.tag})

    # Load Relax NG
    if isinstance(relaxng, str):
        try:
            relaxng = etree.RelaxNG(etree.parse(relaxng))
        except IOError as error:
            return str(error)
        except (etree.XMLSyntaxError, etree.RelaxNGParseError) as error:
            # The assignment failed: ``relaxng`` is still the file path
            return '"{0}": {1}'.format(relaxng, error)

    # Validate
    if not relaxng.validate(tree):
        error = relaxng.error_log.last_error
        return error.message if noline else \
            _('Line ${l}: ${m}', {'l': error.line, 'm': error.message})
    return None
# =============================================================================
def local_text(root_elt, xpath, request=None, lang=None, default=''):
    """Return the localized text of the child of ``root_elt`` selected by
    ``xpath``.

    :param root_elt: (:class:`lxml.etree.Element` instance)
        Root element.
    :param xpath: (string)
        XPath expression.
    :param request: (:class:`pyramid.request.Request` instance, optional)
        Current request. It supplies the session language when ``lang`` is
        not given and the default locale of the application.
    :param lang: (string, optional)
        Preferred language.
    :param default: (string, optional)
        Default label.
    :return: (string)
        The languages are tried in this order: asked language, default
        language of the application, english. If none of them selects
        exactly one non empty text, the text of the first element matching
        ``xpath`` is used, then ``default``. Without ``lang`` nor
        ``request``, ``default`` is returned as is.
    """
    if lang is None and request is None:
        return default

    by_lang = xpath + '[@xml:lang="%s"]'

    def _candidates(code):
        """Select elements for ``code`` or for its part before ``_``."""
        return root_elt.xpath(by_lang % code) \
            or root_elt.xpath(by_lang % code.split('_')[0])

    # Asked language, then default language, then english
    found = _candidates(lang or request.session['lang'])
    if not found and request is not None:
        found = _candidates(request.registry.settings.get(
            'pyramid.default_locale_name', 'en'))
    if not found:
        found = root_elt.xpath(by_lang % 'en')

    # Keep an unambiguous and non empty match only
    if len(found) == 1 and found[0].text:
        return normalize_spaces(found[0].text)

    # Otherwise, first element whatever its language, then default
    first = root_elt.find(xpath)
    if first is not None and first.text:
        return normalize_spaces(first.text)
    return normalize_spaces(default)
# =============================================================================
def upload_configuration(request, perm, only=None):
    """Upload an XML configuration file.

    The file is taken from the ``xml_file`` parameter of the request and
    given to :func:`import_configuration`. Each error it returns is pushed
    in the ``alert`` flash queue of the session. Nothing is done if no file
    has been posted.

    :param request: (:class:`pyramid.request.Request` instance)
        Current request.
    :param perm: (string)
        Level of permission needed.
    :param only: (string, optional)
        ``user``, ``group``, ``storage``, ``indexer`` or ``project``.
    :raise: :class:`pyramid.httpexceptions.HTTPForbidden` if the user does
        not have the permission ``perm``.
    """
    if not has_permission(request, perm):
        raise HTTPForbidden()

    # Without an uploaded file the parameter is missing (None) or a plain
    # string. ``basestring`` does not exist on Python 3, hence this tuple
    # which is equivalent on Python 2 and is ``(str, str)`` on Python 3.
    xml_file = request.params.get('xml_file')
    if xml_file is None or isinstance(xml_file, (str, type(u''))):
        return

    errors = import_configuration(
        request.registry.settings, xml_file.filename, only=only,
        xml=xml_file.file.read(), request=request)
    for error in errors:
        request.session.flash(error, 'alert')
# =============================================================================
def import_configuration(settings, filename, only=None, error_if_exists=True,
                         xml=None, request=None):
    """Import an XML configuration file.

    The document is validated against ``RelaxNG/publiforge.rng`` and its
    users, groups, storages, indexers and projects are loaded in that
    order.

    :param settings: (dictionary)
        Application settings
    :param filename: (string)
        Full path to file to import.
    :param only: (string, optional)
        ``user``, ``group``, ``storage``, ``indexer`` or ``project``. If it
        is ``None``, every kind of item is imported.
    :param error_if_exists: (boolean, default=True)
        It returns an error if an item already exists.
    :param xml: (string, optional)
        Content of the XML document. If given, it is used in place of the
        content of ``filename``.
    :param request: (:class:`pyramid.request.Request` instance, optional)
        Current request. If given, each newly loaded storage is cloned
        through its handler.
    :return: (list)
        A list of error messages.
    """
    # pylint: disable = R0912
    # ``basestring`` does not exist on Python 3: this tuple is equivalent on
    # Python 2 and is ``(str, str)`` on Python 3.
    string_types = (str, type(u''))

    # Load XML
    tree = load_xml(
        filename,
        {'publiforge':
         join(dirname(__file__), '..', 'RelaxNG', 'publiforge.rng')}, xml)
    if isinstance(tree, string_types):
        # load_xml() failed: a list, as documented and as returned below
        return [tree]

    # Load users
    # pylint: disable = E1103
    errors = []
    if only is None or only == 'user':
        for elt in tree.xpath('user|users/user'):
            errors.append(User.load(settings, elt, error_if_exists))

    # Load groups
    if only is None or only == 'group':
        for elt in tree.xpath('group|groups/group'):
            errors.append(Group.load(elt, error_if_exists))

    # Load storages
    if only is None or only == 'storage':
        for elt in tree.xpath('storage|storages/storage'):
            storage = Storage.load(settings, elt, error_if_exists)
            if isinstance(storage, string_types):
                errors.append(storage)
            elif storage is not None and request is not None:
                # The storage has been loaded: clone it right now
                handler = request.registry['handler']\
                    .get_handler(storage.storage_id, storage)
                handler.clone(request)

    # Load indexers
    if only is None or only == 'indexer':
        for elt in tree.xpath('indexer|indexers/indexer'):
            errors.append(Indexer.load(elt, error_if_exists))

    # Load projects
    if only is None or only == 'project':
        for elt in tree.xpath('project|projects/project'):
            errors.append(Project.load(elt, error_if_exists))

    # The loaders return None when there is nothing to report
    return [k for k in errors if k is not None]
# =============================================================================
def export_configuration(elements, filename=None, command=False):
    """Gather elements in a ``<publiforge>`` document and export it.

    A single element is appended directly to the root and the file is named
    after its label and its kind. Several elements are grouped by kind
    (``<users>``, ``<groups>``...), ``None`` entries being skipped.

    :param elements: (list)
        List of :class:`lxml.etree.Element` objects.
    :param filename: (string, optional)
        Name of file to export. Default to ``'publiforge'``. It is only
        taken into account when several elements are exported.
    :param command: (boolean, default=False)
        ``True`` if called by command line. In that case, the document is
        written in the file ``filename`` and ``None`` is returned.
    :return: (:class:`pyramid.response.Response` instance or ``None``)
    :raise: :class:`pyramid.httpexceptions.HTTPForbidden` if ``elements``
        is empty.
    """
    def _label(elt):
        """Get ID, login or label of ``elt``, in this order."""
        name = elt.get('%sid' % XML_NS) or elt.get('login')
        if not name:
            text = elt.findtext('label')
            name = text is not None and text
        return normalize_spaces(name)

    def _banner(parent, rule, title):
        """Append to ``parent`` the comment ``title`` framed by two
        ``rule`` comments."""
        for content in (rule, title, rule):
            parent.append(etree.Comment(content))

    # Nothing to do
    if not elements:
        raise HTTPForbidden()

    # Create XML document
    root = etree.Element('publiforge', version=PUBLIFORGE_RNG_VERSION)

    if len(elements) == 1:
        # Single export
        single = elements[0]
        label = _label(single)
        _banner(root, '=' * 70, u'{0:^70}'.format(label))
        root.append(single)
        filename = '%s.%s' % (label, EXTENSIONS.get(single.tag, 'pf'))
    else:
        # Multiple export: one grouping element per kind
        for kind in ('user', 'group', 'storage', 'indexer',
                     'project', 'processing', 'pack'):
            members = [
                k for k in elements if k is not None and k.tag == kind]
            if not members:
                continue
            _banner(
                root, '=' * 70,
                u'{0:^70}'.format('%ss' % kind.capitalize()))
            grouper = etree.SubElement(root, '%ss' % kind)
            for member in members:
                _banner(
                    grouper, ' ' * 68,
                    u'{0:=^68}'.format(' %s ' % _label(member)))
                grouper.append(member)
        filename = filename or 'publiforge'

    # Command line
    if command:
        etree.ElementTree(root).write(
            filename, pretty_print=True, encoding='utf-8',
            xml_declaration=True)
        return None

    # Response
    body = etree.tostring(
        root, pretty_print=True, encoding='utf-8', xml_declaration=True)
    response = Response(body=body, content_type='application/xml')
    disposition = 'attachment; filename="%s.xml"' % normalize_name(filename)
    response.headerlist.append(('Content-Disposition', disposition))
    return response
# =============================================================================
def xml_wrap(text, depth):
    """Wrap ``text`` and indent it as the content of an XML element.

    The result starts with a new line, each wrapped line is indented one
    level deeper than the parent element (two spaces per level) and the
    last line only holds the indentation of the parent so that its closing
    tag is correctly aligned.

    :param str text:
        Text to wrap and indent.
    :param int depth:
        Depth of the parent element in the entire XML structure.
    :rtype: str
    :return:
        Wrapped text whose lines, indentation included, try to fit in 79
        columns.
    """
    indent = u' ' * 2 * (depth + 1)
    # textwrap.wrap() raises ValueError for a width lower than 1: with a
    # very deep element, fall back to the narrowest width.
    width = max(1, 79 - 2 * (depth + 1))
    lines = [u'{0}{1}'.format(indent, line) for line in wrap(text, width)]
    return u'\n{0}\n{1}'.format(u'\n'.join(lines), u' ' * 2 * depth)
# =============================================================================
def xpath_camel_case(context, text):
    """XPath function: camel_case().

    :param context:
        First argument received by the function (presumably the XPath
        evaluation context); it is not used.
    :param text:
        Text to convert.
    :return:
        Result of ``camel_case()`` for ``text``.
    """
    # pylint: disable = unused-argument
    converted = camel_case(text)
    return converted
# =============================================================================
def xpath_make_id(context, name, mode):
    """XPath function: make_id().

    :param context:
        First argument received by the function (presumably the XPath
        evaluation context); it is not used.
    :param name:
        Name to convert into an ID.
    :param mode:
        Mode given as is to ``make_id()``.
    :return:
        Result of ``make_id()`` for ``name`` and ``mode``.
    """
    # pylint: disable = unused-argument
    identifier = make_id(name, mode)
    return identifier
# =============================================================================
def xpath_relpath(context, path, start):
    """XPath function: relpath().

    :param context:
        First argument received by the function (presumably the XPath
        evaluation context); it is not used.
    :param path:
        Path to make relative.
    :param start:
        Directory the result is relative to.
    :return:
        Result of :func:`os.path.relpath` for ``path`` and ``start``.
    """
    # pylint: disable = unused-argument
    relative = relpath(path, start)
    return relative
# =============================================================================
def xpath_wrap(context, text, depth):
    """XPath function: xml_wrap().

    :param context:
        First argument received by the function (presumably the XPath
        evaluation context); it is not used.
    :param text:
        Text to wrap and indent.
    :param depth:
        Depth of the parent element; it is converted into an integer
        before being given to :func:`xml_wrap`.
    :return:
        Result of :func:`xml_wrap`.
    """
    # pylint: disable = unused-argument
    level = int(depth)
    return xml_wrap(text, level)
# =============================================================================
def _relaxng(relaxngs, root):
    """Select among ``relaxngs`` the Relax NG which suits ``root`` element.

    A key of ``relaxngs`` is a comma-separated pattern: first the expected
    tag of ``root``, then any number of ``<attribute> <value>`` pairs
    ``root`` must carry. The value of the first key which matches is
    returned.

    :param relaxngs: (dictionary)
    :param root: (:class:`lxml.etree.Element` instance)
    :return: (string or :class:`lxml.etree.RelaxNG` or ``None``)
    """
    for pattern, candidate in relaxngs.items():
        fields = pattern.split(',')
        if fields[0].strip() != root.tag:
            continue
        # Each remaining field is an "attribute value" pair
        conditions = (field.split() for field in fields[1:])
        if all(root.get(cond[0]) == cond[1] for cond in conditions):
            return candidate
    return None