"""A file *opener* is a class to display, render and edit some specific
kind of file."""
# pylint: disable = no-name-in-module
from logging import getLogger
from os import listdir, makedirs
from os.path import join, exists, dirname, isfile, normpath
from os.path import sep, splitext
from locale import getdefaultlocale
from shutil import rmtree, copy
from ConfigParser import ConfigParser
from fnmatch import fnmatch
from collections import OrderedDict
from cgi import escape
import re
from pygments import highlight
from pygments.lexers import get_lexer_for_filename
from pygments.lexers import XmlLexer
from pygments.util import ClassNotFound
from pygments.formatters import HtmlFormatter
from webhelpers2.html import literal
from colander import SchemaNode, Mapping, String, Length
from lxml import etree
from pyramid.asset import abspath_from_asset_spec
from ..i18n import _, localizer
from ..config import settings_get_list
from ..utils import copy_content, decrypt, normalize_spaces
from ..viewutils import variable_schema_node
from ..form import Form
from ..xml import load_xml, local_text
from ..viewutils import current_storage
from ...models import ID_LEN, DBSession
from ...models.storages import StorageUser
LOG = getLogger(__name__)
PYGMENT_CSS = '/Static/Css/pygments.css'
PYGMENT_MAX_SIZE = 8388608
CODEMIRROR_MAX_SIZE = 2097152
CODEMIRROR = '/Static/Js/codemirror.js'
CODEMIRROR_LOADER = '/Static/Js/codemirror_loader.js'
CODEMIRROR_CSS = '/Static/Css/codemirror.css'
CODEMIRROR_CLASS = 'editor'
MAIN_ROUTE = '/Static'
OPENER_ROUTE = 'file/opener/%s'
OPENER_DEFAULTS = [
'ImageEdit', 'Image', 'Publiset', 'Xml', 'Ini', 'Plain', 'NoMore']
# =============================================================================
[docs]class OpenerManager(object):
"""This class manages file openers.
:param dict settings:
Setting dictionary.
One instance of :class:`OpenerManager` is created during application
initialization. It is only used in front mode. It is stored in application
registry.
``self._openers`` is a 'sorted by preference' list of available file
openers.
"""
# -------------------------------------------------------------------------
def __init__(self, settings):
"""Constructor method."""
# Empty cache directory
self.settings = settings
if not settings.get('opener.cache'):
exit('*** Must define a cache directory for openers.')
return
if not exists(settings['opener.cache']):
makedirs(settings['opener.cache'])
# Read all available openers
roots = settings_get_list(settings, 'opener.roots')
roots.append(join(dirname(__file__), '..', '..', 'Openers'))
self.opener_roots = {}
for root in roots:
root = normpath(abspath_from_asset_spec(root))
if not exists(root):
continue
for opener_id in listdir(root):
if opener_id not in self.opener_roots and \
exists(join(root, opener_id, 'opener.xml')):
self.opener_roots[opener_id] = join(root, opener_id)
# Create listed openers
activated = settings_get_list(settings, 'opener.list') \
+ OPENER_DEFAULTS
self._openers = OrderedDict()
for pattern in activated:
for opener_id in sorted(self.opener_roots, reverse=True):
if opener_id not in self._openers \
and fnmatch(opener_id, pattern):
self._load_opener(opener_id)
# -------------------------------------------------------------------------
[docs] def ids(self):
"""Return a list of IDs of available openers.
:rtype: list
"""
return self._openers.keys()
# -------------------------------------------------------------------------
[docs] def add_static_views(self, configurator):
"""Possibly, add static view for openers.
:type configurator: :class:`pyramid.config.Configurator`
:param configurator:
Object to configure a Pyramid application registry.
"""
done = set()
for opener_id in self._openers:
route, path = self._openers[opener_id].route()
if route is not None and route not in done:
configurator.add_static_view(route, path, cache_max_age=3600)
done.add(route)
# -------------------------------------------------------------------------
[docs] def get_opener(self, filename, storage):
"""Find the best opener for file ``filename``.
:param str filename:
Full path to file to check.
:param dict storage:
Storage dictionary.
:rtype: tuple
:return:
A tuple such as ``(opener, content)`` where ``opener`` is not
``None`` if an opener matches.
"""
content = None
done = set()
for opener_id in storage['openers'] + self._openers.keys():
if opener_id in done:
continue
if opener_id == 'NoMore':
return None, content
if opener_id in self._openers:
match, content = self._openers[opener_id].match(
filename, content)
if match:
return self._openers[opener_id], content
done.add(opener_id)
return None, content
# -------------------------------------------------------------------------
[docs] def descriptions(self, request, opener_ids=None):
"""Return a dictionary of tuples such as ``(label, description)`` for
each opener of ``opener_ids`` list.
:type request: :class:`pyramid.request.Request`
:param request: (optional)
Current request.
:param opener_ids: (dicrionary, optional)
List of IDs of opener to check. If ``None``, return the
descriptions of all openers.
:return: (list)
"""
infos = OrderedDict()
if opener_ids is None:
opener_ids = self._openers.keys()
for opener_id in opener_ids:
if opener_id in self._openers:
infos[opener_id] = (
self._openers[opener_id].label(request),
self._openers[opener_id].description(request))
return infos
# -------------------------------------------------------------------------
[docs] def has_seeds(self, opener_ids):
"""Return ``True`` if at least one opener among ``openers`` has a seed.
:param opener_ids: (list)
List of IDs of opener to check.
:return: (boolean)
"""
for opener_id in opener_ids:
if opener_id in self._openers \
and self._openers[opener_id].has_seeds():
return True
return False
# -------------------------------------------------------------------------
[docs] def seed_list(self, request, opener_ids):
"""Return a list of couples such as ``(seed_file, seed_label)``.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:param opener_ids: (list)
List of IDs of opener to use.
:return: (list)
"""
seeds = []
for opener_id in opener_ids:
if opener_id in self._openers:
seeds += self._openers[opener_id].seed_list(request)
return tuple(sorted(seeds, key=lambda k: k[1]))
# -------------------------------------------------------------------------
[docs] def seed_fullpath(self, request, path):
"""Install the environment and return the absolute path to the seed
file.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:param path: (string)
Relative path to the seed.
:return: (string or ``None``)
"""
opener_id = path.split(sep)[0]
if opener_id not in self._openers:
request.session.flash(
_('Unknown opener "${o}"', {'o': opener_id}), 'alert')
return None
self._openers[opener_id].install_environment(request)
return join(self.settings['opener.cache'], path)
# -------------------------------------------------------------------------
def _load_opener(self, opener_id):
"""Return the opener which ID is ``opener_id``.
:param opener_id: (string)
ID of the opener to load.
:return: (opener class or ``none``)
"""
# Load XML file
opener_xml = load_xml(
join(self.opener_roots[opener_id], 'opener.xml'),
{'publiforge':
join(dirname(__file__), '..', '..', 'RelaxNG', 'publiforge.rng')})
if isinstance(opener_xml, basestring):
LOG.error(
'Opener "%s": %s', opener_id,
localizer(getdefaultlocale()[0]).translate(opener_xml))
return
# Load module
# pylint: disable = maybe-no-member
module_name = opener_xml.findtext('opener/module').strip()
try:
module = __import__(module_name, fromlist=['Opener'])
except ImportError as error:
LOG.error('Opener "%s": %s', opener_id, error)
return
self._openers[opener_id] = module.Opener(self, opener_id, opener_xml)
# =============================================================================
[docs]class Opener(object):
"""Base class for file opener class.
:type opener_manager: :class:`OpenerManager`
:param opener_manager:
Application :class:`OpenerManager` object.
:param str opener_id:
ID of this opener.
:type opener_xml: :class:`lxml.etree.ElementTree`
:param opener_xml:
ID of this opener.
"""
# pylint: disable = too-many-public-methods
_config_file = None
# -------------------------------------------------------------------------
def __init__(self, opener_manager, opener_id, opener_xml):
"""Constructor method."""
self.opener_id = opener_id
self._manager = opener_manager
self._path = join(opener_manager.settings['opener.cache'], opener_id)
self._opener_xml = opener_xml
self._config = None
self._variables_xml = None
# -------------------------------------------------------------------------
[docs] def label(self, request=None):
"""Label in local or default language.
:type request: :class:`pyramid.request.Request`
:param request: (optional)
Current request.
"""
return local_text(
self._opener_xml, 'opener/label', request,
lang=None if request is not None else 'en', default=self.opener_id)
# -------------------------------------------------------------------------
[docs] def description(self, request=None):
"""Description in local or default language.
:type request: :class:`pyramid.request.Request`
:param request: (optional)
Current request.
"""
return local_text(
self._opener_xml, 'opener/description', request,
lang=None if request is not None else 'en').replace(' --', '<br/>')
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs] def has_seeds(self):
"""Return ``True`` if at least one seed exists.
:rtype: bool
"""
return self._opener_xml.find('opener/seeds') is not None
# -------------------------------------------------------------------------
[docs] def seed_list(self, request):
"""Return a list of couples such as ``(seed_file, seed_label)`` where
``seed_file`` is a relative path to the seed file and ``seed_label`` a
localized label.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:rtype: list
"""
seeds = []
root = self._opener_xml.find('opener/seeds')
if root is None:
return seeds
for elt in root:
if elt.findtext('file'):
seeds.append((
join(self.opener_id, elt.findtext('file')),
local_text(elt, 'label', request)))
return seeds
# -------------------------------------------------------------------------
[docs] def match(self, fullname, content=None):
"""Check whether this opener matches with the file ``fullname``.
:param str fullname:
Full path to file to check.
:param str content: (optional)
Content of the file to check.
:rtype: tuple
:return:
A tuple such as ``(match, content)`` where ``match`` is ``True``
if the opener matches.
"""
# Without configuration file
if self._config_file is None:
if fullname[-4:] in ('.rnc', '.rng'):
return True, content
try:
get_lexer_for_filename(fullname)
except (ClassNotFound, KeyError):
return False, content
return True, content
# With configuration file
self.install_environment()
regex1 = self._config_get('Match', 'file_regex')
if regex1 and not re.search(regex1, fullname):
return False, content
regex2 = self._config_get('Match', 'content_regex')
if not regex2:
return bool(regex1), content
if content is None:
with open(fullname, 'r') as hdl:
content = hdl.read()
return bool(re.search(regex2, content)), content
# -------------------------------------------------------------------------
[docs] def read(self, request, storage, path, content=None):
"""Literal XHTML to display the file content.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the file to read.
:param str content: (optional)
Content of the file to read.
:rtype: str
:return:
A piece of literal XHTML code.
"""
if not self.install_environment(request, can_reset=True):
return ''
content = self._load_content(request, storage, path, content)
fullname = join(
request.registry.settings['storage.root'], storage['storage_id'],
path)
if fullname[-4:] == '.rnc' or len(content) > PYGMENT_MAX_SIZE:
return literal('<div><pre>%s</pre></div>') % content
if fullname[-4:] == '.rng':
return literal(highlight(content, XmlLexer(), HtmlFormatter()))
return literal(highlight(
content, get_lexer_for_filename(fullname), HtmlFormatter()))
# -------------------------------------------------------------------------
[docs] def title(self, request, storage, path, content=None):
"""Return a title for the current file.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the native file.
:param str content: (optional)
Initial content of the file to edit.
:rtype: :class:`str` or ``None``
:return:
A piece of XHTML representing the title of the file.
"""
pass
# -------------------------------------------------------------------------
[docs] def overview(self, request, storage, path, content=None):
"""Return an overview for the current file.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the native file.
:param str content: (optional)
Initial content of the file to edit.
:rtype: :class:`str` or ``None``
:return:
A piece of XHTML representing the file.
"""
if splitext(path)[1] in ('.eps', '.ps'):
return None
if not self.install_environment(request):
return None
content = self._load_content(request, storage, path, content)
if content is None:
return None
html = ''
for count, line in enumerate(content[0:1024].split('\n')):
html += '<div>%s</div>' % escape(line[0:96])
if count > 4:
break
return html
# -------------------------------------------------------------------------
[docs] @classmethod
def can_render(cls):
"""Return ``True`` if it can produce a rendering.
:rtype: bool
"""
return False
# -------------------------------------------------------------------------
[docs] @classmethod
def render(cls, request, storage, path, content=None, native=None):
"""Literal XHTML to display file rendering.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the file to render.
:param str content: (optional)
Content of the file to render.
:rtype: native object
:param native: (optional)
Object to convert.
:rtype: str
:return:
A piece of literal XHTML code.
"""
# pylint: disable = unused-argument
return ''
# -------------------------------------------------------------------------
[docs] @classmethod
def can_write(cls):
"""Return ``True`` if it can simply modify the file.
:rtype: bool
"""
return True
# -------------------------------------------------------------------------
[docs] def write(self, request, storage, path, content=None):
"""XHTML form and content for the file to write.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the file to write.
:param str content: (optional)
Content of the file to write.
:rtype: tuple
:return:
A tuple such as ``(form, literal_html, content)`` where ``content``
is the current state of the content.
"""
schema = self._vcs_schema(storage)
schema.add(SchemaNode(String(), name='code'))
form = Form(request, schema=schema)
if not self.install_environment(request, can_reset=True):
return form, '', content
content = self._get_content(
request, storage, path, content, request.params)[0]
if content is None:
return form, '', content
if not content or len(content) > CODEMIRROR_MAX_SIZE:
request.session.flash(
_("This file can't be edited online."), 'alert')
return form, '', content
try:
content = content.decode('utf8')
except UnicodeEncodeError:
pass
textarea = form.textarea(
'code', content, rows=30, cols=80, class_=CODEMIRROR_CLASS)
if self._config_file is not None and \
self._config_get('Write', 'auto.check'):
textarea = form.textarea(
'code', content, rows=30, cols=80, class_=CODEMIRROR_CLASS,
data_autocheck=self._config_get('Write', 'auto.check'))
else:
textarea = form.textarea(
'code', content, rows=30, cols=80, class_=CODEMIRROR_CLASS)
return form, textarea, content.encode('utf8')
# -------------------------------------------------------------------------
[docs] @classmethod
def can_edit(cls):
"""Return ``True`` if it can produce an editor.
:rtype: bool
"""
return False
# -------------------------------------------------------------------------
[docs] @classmethod
def edit(cls, request, action, storage, path, content=None):
"""XHTML form and content for the file to edit.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param str action: ('sav!', 'wrt!' or 'edt!')
Current action
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the file to edit.
:param str content: (optional)
Content of the file to edit.
:rtype: :class:`~.form.Form` or :class:`str`
:return:
A tuple such as ``(form, literal_html, native)`` where ``native``
is the current state of the file.
"""
# pylint: disable = unused-argument
return Form(request), '', None
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs] @classmethod
def resources_dirs(cls, filename, media_type):
"""Return the list of resource directories.
:param filename: (string)
Full path to file being processed.
:param media_type: ('image', 'audio' or 'video')
Media type.
:return: (list)
"""
# pylint: disable = unused-argument
return []
# -------------------------------------------------------------------------
[docs] @classmethod
def resources_files(cls, path, media_type, media_only):
"""Return the list of media files of ``path`` directory.
:param path: (string)
Full path to the directory to browse.
:param media_type: ('image', 'audio' or 'video')
Media type.
:param media_only: (boolean)
If ``True``, return files following the media extension only.
:return: (list)
"""
# pylint: disable = unused-argument
return []
# -------------------------------------------------------------------------
[docs] def save(self, request, storage, path, content, values):
"""Reconstitute and save the current file.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the file to save.
:param str content:
Initial value of content.
:param dict values:
Form values.
:rtype: bool
"""
# Check validity
if request.session.peek_flash('alert'):
return False
is_valid, content = self.is_valid(
request, storage, path, content, values)
if not is_valid:
return False
content = self._fix_content(request, content)
if self._content2native(request, content) is None:
return False
# Get storage handler
handler, user = self._storage_handler(request, storage)
if handler is None:
return False
# Replace file
handler.replace(
user, path, content,
values.get('message') or
request.localizer.translate(_('On-line editing')))
if handler.progress()[0] == 'error':
request.session.flash(handler.progress()[1], 'alert')
return False
handler.cache.clear()
return True
# -------------------------------------------------------------------------
[docs] def is_valid(self, request, storage, path, content=None, values=None):
"""Check whether the content is valid.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the native file.
:param str content: (optional)
Content of the file.
:param dict values: (optional)
Form values or request POST parameters.
:rtype: tuple
:return:
A tuple such as ``(is_valid, content)``.
"""
if not self.install_environment(request):
return False, None
content, trust = self._get_content(
request, storage, path, content, values)
if trust is True and bool(content):
return True, content
if trust is False or not bool(content):
return False, content
return self._content2native(request, content) is not None, content
# -------------------------------------------------------------------------
[docs] def css(self, mode, request=None):
"""Return a list of CSS files for the mode ``mode``.
:param mode: (``'read'``, ``'render'``, ``'write'`` or ``'edit'``)
Current mode for CSS.
:param request: (:class:`pyramid.request.Request` instance, optional)
Current request.
:return: (tuple)
"""
# Without configuration file
if self._config_file is None:
if mode == 'read':
return (PYGMENT_CSS,)
if mode == 'write':
return (CODEMIRROR_CSS,)
return tuple()
# With configuration file
lang = request.session['lang'] if request else 'en'
routes = self._config_get(mode.capitalize(), 'css')
return[k.strip().format(lang=lang) for k in routes.split(',')] \
if routes else ()
# -------------------------------------------------------------------------
[docs] def javascript(self, mode, request=None):
"""Return list of JavaScript files for the mode ``mode``.
:param mode: (``'read'``, ``'render'``, ``'write'`` or ``'edit'``)
Current mode for CSS.
:param request: (:class:`pyramid.request.Request` instance, optional)
Current request.
:return: (tuple)
"""
# Without configuration file
if self._config_file is None:
if mode == 'write':
return (CODEMIRROR, CODEMIRROR_LOADER)
return tuple()
# With configuration file
lang = request.session['lang'] if request else 'en'
routes = self._config_get(mode.capitalize(), 'javascript')
return [k.strip().format(lang=lang) for k in routes.split(',')] \
if routes else ()
# -------------------------------------------------------------------------
[docs] def position(self, directory, filename):
"""Return the number of files of the same type and the position of the
current file.
:param directory: (string)
Full path to the directory containing the current file.
:param filename: (string)
Name of the current file.
:return: (tuple)
A tuple such as ``(position, count)``.
"""
files = sorted(listdir(directory))
count = 0
position = 0
for name in files:
if filename == name:
position = count
name = join(directory, name)
if isfile(name) and self.match(name)[0]:
count += 1
return position + 1, count
# -------------------------------------------------------------------------
[docs] def navigate(self, directory, filename, direction):
"""Return the next or previous file of the same type.
:param directory: (string)
Full path to the directory containing the current file.
:param filename: (string)
Name of the current file.
:param direction: (``'previous'`` or ``'next'``)
Direction of the navigation.
:return: (string)
Full path to next (or previous) file or ``None``.
"""
files = sorted(listdir(directory))
if direction == 'next':
files = files[files.index(filename) + 1:]
else:
files = files[0:files.index(filename)]
files.reverse()
for name in files:
name = join(directory, name)
if isfile(name) and self.match(name)[0]:
return name
return None
# -------------------------------------------------------------------------
[docs] def route(self):
"""Return a route path and a full path to a directory for static data.
:return: (tuple)
A tuple such as ``(route_path, directory_path)`` or
``(None, None)``.
"""
public = self._opener_xml.findtext('opener/public')
if public is None:
return None, None
return OPENER_ROUTE % self.opener_id, join(self._path, public.strip())
# -------------------------------------------------------------------------
[docs] def install_environment( # noqa
self, request=None, opener_id=None, target=None, can_reset=False):
"""Install the opener directory, possibly with inheritance, in the
opener cache directory.
:param request: (:class:`pyramid.request.Request` instance, optional)
Current request.
:param opener_id: (string, optional)
ID of the opener to install, by default the object itself.
:param target: (string, optional)
Full path to the target directory.
:param can_reset: (boolean, default=False)
If ``can_reset`` is ``True`` and ``opener.develop`` is ``'true'``
in settings, the opener is destroyed and created again.
:return: (boolean)
"""
# Check situation
# pylint: disable = too-many-branches
opener_id = opener_id or self.opener_id
if opener_id not in self._manager.opener_roots:
if request is not None:
request.session.flash(
_('Opener "${o}" is missing.', {'o': opener_id}), 'alert')
return False
if target is None:
target = self._path
if can_reset \
and self._manager.settings.get('opener.develop') == 'true' \
and exists(target):
rmtree(target)
self._config = None
self._variables_xml = None
if exists(target):
return True
try:
makedirs(target)
except OSError:
pass
# Read opener.xml file to check if other openers are needed
tree = etree.parse(
join(self._manager.opener_roots[opener_id], 'opener.xml'))
for elt in tree.findall('opener/ancestors/ancestor'):
if not self.install_environment(request, elt.text.strip(), target):
if exists(target):
rmtree(target)
self._config = None
self._variables_xml = None
return False
# Install opener
for elt in tree.findall('opener/imports/import'):
source = elt.text.strip()
source = abspath_from_asset_spec(elt.text.strip()) \
if ':' in source \
else join(self._manager.opener_roots[opener_id], source)
if not exists(source):
if request is not None:
request.session.flash(_(
'Opener "${o}" calls a missing file: "${s}".',
{'o': opener_id, 's': normpath(source)}), 'alert')
if exists(target):
rmtree(target)
self._config = None
self._variables_xml = None
return False
if isfile(source):
if not exists(dirname(join(target, elt.get('to')))):
makedirs(dirname(join(target, elt.get('to'))))
copy(source, join(target, elt.get('to')))
else:
copy_content(source, join(target, elt.get('to')), force=True)
copy_content(self._manager.opener_roots[opener_id], target, force=True)
return True
# -------------------------------------------------------------------------
def _get_content(self, request, storage, path, content=None, values=None):
"""Get from the request or from the file the content of the current
document as a string.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the native file.
:param str content: (optional)
Content of the file.
:param dict values: (optional)
Form values or request POST parameters.
:rtype: tuple
:return:
A tuple such as ``(content, trust)`` where ``trust`` means:
``True`` valid, ``False`` invalid and ``None`` unchecked.
"""
# From writing
if values is not None and 'code' in values:
return values['code'].replace('\r\n', '\n').encode('utf8'), None
# From editing
if values:
native, trust = self._values2native(
request, storage, path, content, values)
return self._native2content(request, native), trust
# From file
return self._load_content(request, storage, path, content), None
# -------------------------------------------------------------------------
def _get_native(self, request, storage, path, content=None, values=None):
"""Get from file or from request an object depending on the native
content of the file.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:param storage: (dictionary)
Storage dictionary.
:param path: (string)
Relative path to the native file.
:param content: (string, optional)
Content of the file to edit.
:param values: (dictionary)
Form values or request POST parameters.
:return: (native object or ``None``)
"""
# From writing
if values is not None and 'code' in values:
return self._content2native(request, values['code'].encode('utf8'))
# From editing
if values:
return self._values2native(
request, storage, path, content, values)[0]
# From file
return self._load_native(request, storage, path, content)
# -------------------------------------------------------------------------
@classmethod
def _load_content(cls, request, storage, path, content=None):
"""Load the content of the file as a string.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:param storage: (dictionary)
Storage dictionary.
:param path: (string)
Relative path to the native file.
:param content: (string, optional)
Content of the file to edit.
:return: (string or ``None``)
"""
if content is None:
fullname = join(
request.registry.settings['storage.root'],
storage['storage_id'], path)
try:
with open(fullname, 'rb') as hdl:
content = hdl.read()
except IOError:
return ''
try:
content = content.decode('utf8')
except UnicodeDecodeError:
pass
return content
# -------------------------------------------------------------------------
def _load_native(self, request, storage, path, content=None):
"""Load an object depending on the native content of the file.
:param request: (:class:`pyramid.request.Request` instance)
Current request.
:param storage: (dictionary)
Storage dictionary.
:param path: (string)
Relative path to the native file.
:param content: (string, optional)
Initial content of the file to edit.
:return: (native object or ``None``)
"""
return self._load_content(request, storage, path, content)
# -------------------------------------------------------------------------
@classmethod
def _content2native(cls, request, content):
"""Transform a string into a native object and validate it.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param str content:
String to convert.
:rtype: native object or ``None``
"""
# pylint: disable = unused-argument
return content
# -------------------------------------------------------------------------
@classmethod
def _native2content(cls, request, native):
"""Transform a native object into a string.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:type native: native object
:param native:
Object to convert.
:rtype: :class:`str` or ``None``
"""
# pylint: disable = unused-argument
return native or ''
# -------------------------------------------------------------------------
@classmethod
def _values2native(cls, request, storage, path, content, values):
"""Transform a dictionary of form values into a native object and
validate it.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:param str path:
Relative path to the native file.
:param str content:
Initial content.
:param dict values:
Values to use.
:rtype: tuple
:return:
A tuple such as ``(native, is_valid)``
"""
# pylint: disable = unused-argument
return values.get('code', '').replace('\r\n', '\n').encode('utf8'), \
'code' in values
# -------------------------------------------------------------------------
@classmethod
def _fix_content(cls, request, content):
"""Possibly fix some details.
:param str content:
Content to fix.
:rtype: str
Fixed XML content
"""
# pylint: disable = unused-argument
return content
# -------------------------------------------------------------------------
def _config_get(self, section, option, default=None):
"""Retrieve a value from a configuration object.
:param str section:
Section name.
:param str option:
Option name.
:param str default: (optional)
Default value
:rtype: str
:return:
Read value or default value.
"""
# Read configuration file
if self._config_file is None:
return default
if self._config is None:
config_file = join(self._path, self._config_file)
if not exists(config_file):
return default
params = {
'here': self._path,
'main_route': MAIN_ROUTE,
'route': '/' + OPENER_ROUTE % self.opener_id,
'stgpath': self._manager.settings['storage.root'],
'lang': '{lang}', 'id': '{id}', 'ext': '{ext}', 'xml': '{xml}'}
self._config = ConfigParser(params)
self._config.optionxform = str
self._config.read(config_file)
# Read value
if not self._config.has_option(section, option):
return default
value = self._config.get(section, option)
return value.decode('utf8') if isinstance(value, str) else value
# -------------------------------------------------------------------------
def _load_variables_xml(self):
"""Load the variables DOM element if exists.
:rtype: bool
"""
if self._variables_xml is not None:
return bool(self._variables_xml)
self._variables_xml = self._opener_xml.findtext(
'opener/variables/group-file')
if self._variables_xml is None:
self._variables_xml = ''
return False
self._variables_xml = load_xml(
join(self._path, self._variables_xml),
{'publiforge':
join(dirname(__file__), '..', '..', 'RelaxNG', 'publiforge.rng')})
if isinstance(self._variables_xml, basestring):
self._variables_xml = ''
return False
return True
# -------------------------------------------------------------------------
def _edit_form(self, request, storage, variables, decode_entities=False):
"""Create the form object to edit the current file.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:type variables: :class:`lxml.etree.ElementTree`
:param variables:
XML structure of variables with default values.
:param bool decode_entities: (default=False)
If ``True`` convert entities like < into <.
:rtype: :class:`~.lib.form.Form`
"""
defaults = {}
schema = self._vcs_schema(storage)
for var in variables.findall('variables/group/var'):
name = var.get('name')
schema.add(variable_schema_node(name, var.get('type'), var))
if var.find('default') is not None:
defaults[name] = etree.tostring(
var.find('default'), pretty_print=True,
encoding='utf-8').strip()[9:-10]
defaults[name] = defaults[name].strip().decode('utf8')
if decode_entities:
defaults[name] = defaults[name].replace('<', '<')\
.replace('>', '>').replace('&', '&')
if var.get('type') == 'boolean':
defaults[name] = (defaults[name] == 'true')
elif var.get('type') == 'string':
defaults[name] = normalize_spaces(defaults[name])
return Form(
request, schema=schema, defaults=defaults, force_defaults=True)
# -------------------------------------------------------------------------
@classmethod
def _vcs_schema(cls, storage):
"""Base editing schema with only VCS fields.
:param dict storage:
Storage dictionary.
:rtype: class:`colander.SchemaNode`
"""
schema = SchemaNode(Mapping())
if 'none' not in storage['vcs_engine']:
schema.add(SchemaNode(String(), name='message', missing=None))
schema.add(SchemaNode(
String(), name='vcs_user',
validator=Length(max=ID_LEN), missing=None))
schema.add(SchemaNode(String(), name='vcs_password', missing=None))
return schema
# -------------------------------------------------------------------------
@classmethod
def _storage_handler(cls, request, storage):
"""Retrieve storage handler and user handler.
:type request: :class:`pyramid.request.Request`
:param request:
Current request.
:param dict storage:
Storage dictionary.
:rtype: tuple
:return:
A tuple such as ``(handler, user)``
"""
storage = current_storage(request, storage['storage_id'], False)[0]
user = DBSession.query(StorageUser).filter_by(
storage_id=storage.storage_id,
user_id=request.session['user_id']).first()
if storage.vcs_engine not in ('none', 'local') \
and not (user and user.vcs_user):
request.session.flash(
_('ID and password for storage are missing.'), 'alert')
return None, None
user = (
user.vcs_user, decrypt(
user.vcs_password,
request.registry.settings.get('encryption', '-')),
request.session['name']) if user \
else (None, None, request.session['name'])
return request.registry['handler'].get_handler(
storage.storage_id, storage), user