# -*- coding: utf-8 -*-
"""A *handler* manages access to a storage and its indexing."""
import re
from logging import getLogger
from os import listdir, walk, makedirs, remove
from os.path import join, exists, splitext, relpath, isdir, dirname
from os.path import basename, getmtime
from shutil import rmtree
from time import time, sleep
from threading import Thread
from locale import strxfrm
from lxml import etree
from whoosh.index import exists_in, create_in, open_dir, LockError
from whoosh.fields import Schema, ID, STORED, TEXT, NUMERIC, DATETIME, KEYWORD
from whoosh.fields import UnknownFieldError
from whoosh.qparser import QueryParser, MultifieldParser
from .i18n import _
from .config import settings_get_list
from .utils import EXCLUDED_FILES, decrypt, unzip, normalize_spaces
from .utils import normalize_filename, age, size_label, cache_decorator
from .utils import image_metadata
from .xml import load_xml
from .vcs import VcsNone
from .vcs.hg import VcsMercurial, VcsLocal
from .vcs.hgsvn import VcsHgSubversion
from .vcs.svn import VcsSubversion
from ..models import DBSession
from ..models.indexers import Indexer, IndexerExtractor
from ..models.storages import Storage
LOG = getLogger(__name__)
# =============================================================================
class HandlerManager(object):
"""This class manages all handlers.
:param settings: (dictionary)
Setting dictionary.
:param cache_manager: (:class:`beaker.cache.CacheManager` instance)
Global Beaker cache manager.
:param opener_manager: (:class:`~.lib.opener.OpenerManager`, optional)
Global opener manager.
One instance of :class:`HandlerManager` is created during application
initialization. It is only used in front mode. It is stored in application
registry.
``self.cache_manager`` is a :class:`beaker.cache.CacheManager` instance.
``self._indexers`` is a dictionary such as
``{indexer_id: (value_type, extractors),...}``
where ``extractors`` is a list of tuples like
``(indexed_files_regex, type, parameter, max_length)``.
``self._handlers`` is a dictionary of :class:`Handler` objects.
"""
# -------------------------------------------------------------------------
def __init__(self, settings, cache_manager, opener_manager=None):
"""Constructor method."""
# Attributes
self.settings = settings
self.cache_manager = cache_manager
self.opener_manager = opener_manager
self._indexers = None
self._handlers = {}
self._thread = None
# VCS engine classes
vcs_classes = {'none': VcsNone, 'local': VcsLocal, 'hg': VcsMercurial,
'hgsvn': VcsHgSubversion, 'svn': VcsSubversion}
self.vcs_classes = {}
for vcs in settings_get_list(settings, 'storage.vcs'):
if vcs in vcs_classes and vcs not in self.vcs_classes:
self.vcs_classes[vcs] = vcs_classes[vcs]
# -------------------------------------------------------------------------
[docs] def vcs_list(self):
"""Return a list of available Version Control System."""
return self.vcs_classes.keys()
# -------------------------------------------------------------------------
[docs] def currently_managed(self):
"""Return a list of ID of currently managed storages."""
return self._handlers.keys()
# -------------------------------------------------------------------------
    def get_handler(self, storage_id, storage=None):
        """Create or retrieve a Storage Control System for storage
        ``storage_id``.

        :param storage_id: (string)
            Storage ID.
        :param storage: (:class:`~.models.storages.Storage` instance,
            optional)
            Already loaded storage record, to avoid a database query.
        :return: (:class:`Handler` instance or ``None``)
        """
        # Purge expired handlers and launch any pending indexing first
        self._cleanup()
        # Already managed: refresh its expiration deadline and return it
        if storage_id in self._handlers:
            self._handlers[storage_id].expire = \
                time() + self._handlers[storage_id].cache.expiretime
            return self._handlers[storage_id]
        # Load the storage record when the caller did not provide it
        if storage is None:
            storage = DBSession.query(Storage).filter_by(
                storage_id=storage_id).first()
        # Unknown storage, inconsistent ID or disabled VCS engine
        if storage is None or storage_id != storage.storage_id \
                or storage.vcs_engine not in self.vcs_classes:
            return None
        self._handlers[storage_id] = Handler(self, storage)
        return self._handlers[storage_id]
# -------------------------------------------------------------------------
[docs] def remove_handler(self, storage_id):
"""Remove handler from handler list.
:param storage_id: (string)
Storage ID.
"""
if storage_id in self._handlers:
del self._handlers[storage_id]
# -------------------------------------------------------------------------
[docs] def progress(self, storage_ids, pending=False):
"""Return the progress of actions on storages.
:param storage_ids: (list)
Storage ID list.
:param pending: (boolean, default=False)
``True`` if there is a pending work.
:return: (tuple)
Return a tuple such as ``(working, progress_list)``.
``working`` is a boolean indicating whether one of the storage is in
progress. ``progress_list`` is a list of items like ``(status,
message)``. See :class:`Handler` class.
"""
working = False
prgrss = {}
for storage_id in storage_ids:
if storage_id in self._handlers:
handler_prgrss = self._handlers[storage_id].progress()
if handler_prgrss[0] != 'wait':
prgrss[storage_id] = handler_prgrss
working |= handler_prgrss[0] == 'run'
return working | pending, prgrss
# -------------------------------------------------------------------------
    def index(self, storage_id, in_thread=False):
        """Index storage ``storage_id`` directly or in a thread.

        :param storage_id: (string)
            ID of storage to index.
        :param in_thread: (boolean, default=False)
            Launch indexing in a thread or not.
        """
        # Nothing to do: indexing is disabled for this storage
        if not self._handlers[storage_id].indexing[0]:
            return
        # An operation is already running or queued: chain an update
        # after it by appending '>update' to the status string
        if self._handlers[storage_id].indexing[0] != 'ok' \
                and self._handlers[storage_id].indexing[0][0:6] != 'update':
            self._handlers[storage_id].indexing[0] += '>update'
            return
        # Directly...
        if not in_thread:
            self._thread_index(storage_id)
            return
        # ...or in a thread: only one indexing thread at a time
        if self._thread is not None and self._thread.is_alive():
            return
        if self._thread is not None:
            del self._thread
        self._thread = Thread(
            target=self._thread_index, name='index:%s' % storage_id,
            args=[storage_id])
        self._thread.start()
        # Keep the handler alive while its indexing thread runs
        self._handlers[storage_id].expire = \
            time() + self._handlers[storage_id].cache.expiretime
# -------------------------------------------------------------------------
    def delete_index(self, update=True):
        """Delete all indexes.

        :param bool update: (default=True)
            If ``True`` update indexes after deleting.
        """
        # Index directory
        index_dir = self.settings.get('storage.index')
        if index_dir is None or not exists(index_dir):
            return
        # Is something running? If so, queue the deletion after the
        # running operation instead of deleting under its feet.
        running = False
        for storage_id in self._handlers:
            if self._handlers[storage_id].indexing[0][0:3] == 'run':
                self._handlers[storage_id].indexing[0] = 'run>delete'
                running = True
            elif self._handlers[storage_id].indexing[0]:
                self._handlers[storage_id].indexing[0] = 'delete'
        if running:
            return
        # Delete indexes
        LOG.info('Deleting indexes')
        self._remove_index_directory(index_dir)
        self._indexers = None  # force reload of indexer definitions
        # Update indexes: schedule an immediate update for every indexed
        # storage, creating missing handlers on the fly
        if update:
            for storage in DBSession.query(Storage):
                if storage.storage_id not in self._handlers \
                        and storage.indexed_files:
                    self._handlers[storage.storage_id] = Handler(self, storage)
                if storage.storage_id in self._handlers \
                        and self._handlers[storage.storage_id].indexing[0]:
                    self._handlers[storage.storage_id].indexing[0] = 'update'
                    self._handlers[storage.storage_id].indexing[1] = 0
            self._cleanup()
# -------------------------------------------------------------------------
    def search(self, storage_id, fieldnames, query, limit=None):
        """Search in storage ``storage_id``.

        :param str storage_id:
            ID of storage to search.
        :param list fieldnames:
            List of fields to search.
        :param str query:
            A query in the Whoosh default query language.
        :param int limit: (optional)
            Maximum number of results.
        :rtype: list
        :return:
            A list of dictionaries. Each dictionary has the keys ``score``,
            ``storage_id`` and ``path`` plus the keys representing the
            stored fields.
        """
        # Nothing to do: no index exists for this storage
        index_dir = self.settings['storage.index']
        if not exists(index_dir) \
                or not exists_in(index_dir, indexname=storage_id):
            return []
        # Update storage so the index reflects the latest content
        if storage_id not in self._handlers:
            handler = self.get_handler(
                storage_id, DBSession.query(Storage)
                .filter_by(storage_id=storage_id).first())
        else:
            handler = self.get_handler(storage_id)
        handler.synchronize(None)
        # Query
        # pylint: disable = W0703
        result_list = []
        index = open_dir(index_dir, indexname=storage_id)
        with index.searcher() as searcher:
            if len(fieldnames) == 1:
                parser = QueryParser(fieldnames[0], index.schema)
            else:
                parser = MultifieldParser(fieldnames, index.schema)
            try:
                results = searcher.search(parser.parse(query), limit=limit)
            except Exception:
                # A malformed query is not fatal: return no result
                index.close()
                return result_list
            for hit in results:
                # Drop the internal 'time' field from the stored fields
                result = dict([(k, hit[k]) for k in hit if k != 'time'])
                result.update({
                    'score': round(hit.score, 2), 'storage_id': storage_id})
                result_list.append(result)
        index.close()
        return result_list
# -------------------------------------------------------------------------
    def _cleanup(self):
        """Delete expired Handler and launch indexing."""
        # Delete expired and find pending operations
        now = time()
        indexing = False
        waiting = []
        # pylint: disable = consider-iterating-dictionary
        # NOTE: under Python 2, .keys() returns a list snapshot, which
        # makes deleting entries while looping safe.
        for storage_id in self._handlers.keys():
            handler = self._handlers[storage_id]
            if handler.expire < now:
                # Drop the handler only when it is completely idle
                if handler.progress()[0] != 'run' \
                        and handler.indexing[0] in ('', 'ok'):
                    del self._handlers[storage_id]
                    continue
            if handler.indexing[0][0:3] == 'run':
                indexing = True
            elif handler.indexing[0] not in ('', 'ok') \
                    and handler.indexing[1] < now:
                if handler.progress()[0] == 'run':
                    # A VCS operation is in progress: postpone indexing
                    handler.indexing[1] = now + handler.indexing[2]
                else:
                    waiting.append([storage_id] + handler.indexing[0:2])
        # Indexing: launch at most one operation, the oldest scheduled one
        if indexing or not waiting:
            return
        waiting = sorted(waiting, key=lambda item: item[2])
        if waiting[0][1][0:6] == 'delete':
            self.delete_index()
        else:
            self.index(waiting[0][0], True)
# -------------------------------------------------------------------------
    def _thread_index(self, storage_id):
        """Action launched in a thread to index a storage.

        :param str storage_id:
            ID of storage to index.
        """
        try:
            index = self._open_or_create_index(storage_id)
            writer = index.writer()
        except LockError:
            # Another writer owns the index: give up, a later pass will
            # pick the work up again
            return
        # Switch status to 'run', keeping any queued next step
        # (e.g. 'update>delete' becomes 'run>delete')
        self._handlers[storage_id].indexing[0] = 'run%s%s' \
            % self._handlers[storage_id].indexing[0].partition('>')[1:]
        done = set()
        root = join(self.settings['storage.root'], storage_id)
        LOG.info('Indexing %s', storage_id)
        # Loop over the stored documents: drop vanished or out-of-date
        # files, remember up-to-date ones
        for docnum, fields in writer.reader().iter_docs():
            # A stored path may carry a '#fragment' suffix; strip it to
            # get the real file name
            name = join(root, fields['path']).partition('#')[0]
            if not exists(name) or getmtime(name) > fields['time']:
                writer.delete_document(docnum)
            else:
                done.add(fields['path'])
        # Loop over the files on disk and index the new ones
        indexed_files = self._handlers[storage_id].indexing[3]
        for path, dirs, files in walk(root):
            # Prune excluded and backup directories in place
            for name in tuple(dirs):
                if name in EXCLUDED_FILES or '~' in name:
                    dirs.remove(name)
            for name in files:
                if indexed_files.search(name) and name not in EXCLUDED_FILES:
                    name = relpath(join(path, name), root)
                    # Python 2: normalize the path to unicode for Whoosh
                    if not isinstance(name, unicode):
                        name = name.decode('utf8')
                    if name not in done:
                        self._add_document2index(writer, storage_id, name)
        writer.commit()
        index.close()
        DBSession.close()
        # Back to 'ok', or to the queued next step if any
        self._handlers[storage_id].indexing[0] = \
            self._handlers[storage_id].indexing[0].partition('>')[2] or 'ok'
# -------------------------------------------------------------------------
    def _open_or_create_index(self, storage_id):
        """Open an index structure (possibly create it).

        :param str storage_id:
            ID of index to open.
        :rtype: :class:`whoosh.index.Index`
        """
        # Open existing index
        index_dir = self.settings['storage.index']
        if exists(index_dir) and exists_in(index_dir, indexname=storage_id):
            return open_dir(index_dir, indexname=storage_id)
        # Create schema: fixed fields first...
        schema = Schema(
            path=ID(unique=True, stored=True, sortable=True),
            time=STORED, filename=ID(stored=True), filetype=KEYWORD)
        done = set((u'path', u'time', u'filename', u'filetype'))
        # ...then one field per indexer, typed from its value type
        for indexer in DBSession.query(Indexer):
            if indexer.indexer_id in done:
                continue
            if indexer.value_type == 'integer':
                schema.add(
                    indexer.indexer_id,
                    NUMERIC(stored=bool(indexer.result_column)))
            elif indexer.value_type == 'date':
                schema.add(
                    indexer.indexer_id,
                    DATETIME(stored=bool(indexer.result_column)))
            else:
                schema.add(
                    indexer.indexer_id,
                    TEXT(stored=bool(indexer.result_column)))
            done.add(indexer.indexer_id)
        # Create index
        if not exists(index_dir):
            makedirs(index_dir)
        return create_in(index_dir, schema, indexname=storage_id)
# -------------------------------------------------------------------------
    def _add_document2index(self, writer, storage_id, path):
        """Add a document to the index.

        :type  writer: :class:`whoosh.writing.IndexWriter`
        :param writer:
            Writer on current index.
        :param str storage_id:
            ID of storage to index.
        :param str path:
            Relative path to file to add.
        """
        # Fixed fields of the document
        name = basename(path)
        fullname = join(self.settings['storage.root'], storage_id, path)
        fields = {
            'path': path, 'time': time(), 'filename': name,
            'filetype': splitext(name)[1][1:]}
        # Lazily loaded representations of the file
        content = tree = None
        self._load_indexers()
        # One indexed field per indexer, filled by its extractors
        for indexer_id in self._indexers:
            chunks = []
            for extractor in self._indexers[indexer_id][1]:
                # extractor = (files_regex, type, parameter, max_length)
                if not extractor[0].search(name):
                    continue
                if extractor[1] == 'xpath':
                    if tree is None:
                        tree = load_xml(fullname)
                    self._index_xpath(tree, extractor[2], extractor[3], chunks)
                elif extractor[1] == 'iim':
                    self._index_iim(
                        fullname, extractor[2], extractor[3], chunks)
                else:
                    if content is None:
                        with open(fullname, 'r') as hdl:
                            content = hdl.read()
                    self._index_regex(
                        content, extractor[2], extractor[3], chunks)
            chunks = self._format_index_field(
                storage_id, fullname, indexer_id, chunks)
            if chunks:
                fields[indexer_id] = chunks
        try:
            writer.add_document(**fields)
        except (ValueError, UnknownFieldError) as error:
            LOG.error('%s/%s: %s', storage_id, path, error)
# -------------------------------------------------------------------------
    def _load_indexers(self, reset=False):
        """Fill the indexer dictionary ``self._indexers``.

        :param bool reset: (default=False)
            If ``True``, reload even when already loaded.
        """
        # Something to do?
        if not reset and self._indexers is not None:
            return
        # Load from database
        self._indexers = {}
        for extractor in DBSession.query(
                IndexerExtractor.indexer_id, Indexer.value_type,
                IndexerExtractor.indexed_files,
                IndexerExtractor.extractor_type, IndexerExtractor.parameter,
                IndexerExtractor.limit)\
                .join(Indexer).order_by(IndexerExtractor.indexer_id):
            indexer_id = extractor[0]
            value_type = extractor[1]
            try:
                # Pre-compile the file name regex and, for 'regex'
                # extractors, the parameter itself
                extractor = (
                    re.compile(extractor[2]), extractor[3],
                    extractor[3] == 'regex' and re.compile(extractor[4]) or
                    extractor[4], extractor[5])
            except re.error as error:
                # Invalid pattern: skip this extractor entirely
                LOG.error('Indexer "%s": %s', indexer_id, error)
                continue
            if indexer_id in self._indexers:
                self._indexers[indexer_id][1].append(extractor)
            else:
                self._indexers[indexer_id] = (value_type, [extractor])
# -------------------------------------------------------------------------
    @classmethod
    def _index_regex(cls, content, regex, limit, chunks):
        """Extract text from ``content`` according to ``regex``.

        :param content: (string)
            Content to process.
        :param regex: (:class:`re.RegexObject` instance)
            Regular expression to use.
        :param limit: (integer or ``None``)
            Maximum size of the extraction.
        :param chunks: (list)
            List of chunks to complete.
        """
        try:
            # An empty or catch-all pattern means "take everything"
            if not regex.pattern or regex.pattern == '.*':
                chunks.append(content.decode('utf8'))
            else:
                chunks.append(u' '.join(regex.findall(content)))
        except UnicodeDecodeError:
            # Probably binary content: nothing to index
            return
        if limit:
            chunks[-1] = normalize_spaces(chunks[-1])[0:limit] + u'…'
# -------------------------------------------------------------------------
@classmethod
def _index_iim(cls, fullname, fields, limit, chunks):
"""Extract IIM (Information Interchange Model) fields from image
``fullname``.
:param str fullname:
Absolute path to image.
:param list fields:
Fields to extract.
:param int limit:
Maximum size of the extraction or ``None``.
:param list chunks:
List of chunks to complete.
"""
metadata = image_metadata(fullname)
chunk = u''
if metadata.get('DocumentName') and (
'DocumentName' in fields or 'Caption' in fields):
chunk = u'{0}'.format(metadata['DocumentName'])
if metadata.get('ImageDescription') and 'ImageDescription' in fields:
chunk += u' {0}'.format(metadata['ImageDescription'])
if not chunk:
return
chunks.append(normalize_spaces(chunk))
if limit:
chunks[-1] = chunks[-1][0:limit] + u'…'
# -------------------------------------------------------------------------
    @classmethod
    def _index_xpath(cls, tree, xpath, limit, chunks):
        """Extract text from ``tree`` according to ``xpath``.

        :type  tree: :class:`ElementTree`
        :param tree:
            Content to process.
        :param str xpath:
            XPath to use.
        :param int limit:
            Maximum size of the extraction or ``None``.
        :param list chunks:
            List of chunks to complete.
        """
        # pylint: disable = protected-access
        # Not a parsed XML tree (e.g. the XML loader returned an error)
        if not isinstance(tree, etree._ElementTree):
            return
        try:
            elements = tree.xpath(xpath)
        except etree.XPathEvalError as error:
            LOG.error('XPath "%s": %s', xpath, error)
            return
        if not elements:
            return
        if isinstance(elements, list):
            if isinstance(elements[0], etree._Element):
                # Element nodes: join their text contents
                elements = u' '.join([
                    isinstance(k.text, unicode) and k.text or
                    k.text.encode('utf8') for k in elements if k.text])
            else:
                # Strings (e.g. text() or attribute results)
                elements = u' '.join([k for k in elements if k.strip()])
        # Numeric XPath results (e.g. count()) are stringified;
        # string results are dropped when blank
        if isinstance(elements, (float, int)):
            elements = str(int(elements))
        else:
            if not elements.strip():
                return
        if limit:
            elements = normalize_spaces(elements)[0:limit] + u'…'
        chunks.append(elements)
# -------------------------------------------------------------------------
    def _format_index_field(self, storage_id, fullname, indexer_id, chunks):
        """Format index field according to type.

        :param str storage_id:
            ID of the current storage.
        :param str fullname:
            Absolute path to indexed file.
        :param str indexer_id:
            Indexer ID.
        :param list chunks:
            Content of field to format.
        :rtype: str
        """
        # Special 'image' indexer: store the relative path of a matching
        # image file instead of raw text
        if indexer_id == 'image':
            extensions = ('.png', '.jpg', '.jpeg', '.svg', '.gif')
            # Candidate base name: first word of the extraction, or the
            # indexed file's own base name when it is itself an image
            chunks = (chunks and chunks[0].split()[0]) or \
                (splitext(fullname)[1] in extensions and
                 splitext(basename(fullname))[0])
            if chunks:
                regex = re.compile(
                    r'{0}\.(png|jpg|jpeg|svg|gif)'.format(chunks))
                # Look for the image below the indexed file's directory
                for path, name, files in walk(dirname(fullname)):
                    name = [k for k in files if regex.match(k)]
                    if name:
                        return relpath(
                            join(path, name[0]),
                            join(self.settings['storage.root'], storage_id))
            chunks = ''
        if not chunks:
            return ''
        # Convert according to the indexer value type
        if self._indexers[indexer_id][0] == 'boolean':
            chunks = u'TRUE'
        elif self._indexers[indexer_id][0] == 'integer':
            chunks = int(chunks[0]) if chunks[0].isdigit() else 0
        elif self._indexers[indexer_id][0] == 'date':
            # A bare number is zero-padded as a year; otherwise the value
            # is kept as is, provided Whoosh can parse it as a date
            chunks = (u'%04d') % (int(chunks[0])) \
                if chunks[0].isdigit() else chunks[0].strip()
            # pylint: disable = bare-except
            try:
                DATETIME().to_bytes(chunks)
            except:  # noqa
                return ''
        else:
            chunks = u' '.join(chunks)
        return chunks
# -------------------------------------------------------------------------
def _remove_index_directory(self, index_dir):
"""Remove index directory.
:param index_dir: (string)
Absolute path to index directory.
"""
for storage_id in self._handlers:
if self._handlers[storage_id].indexing[0]:
self._handlers[storage_id].indexing[0] = 'run'
for filename in listdir(index_dir):
filename = join(index_dir, filename)
if isdir(filename):
rmtree(filename)
else:
remove(filename)
# =============================================================================
class Handler(object):
"""This class manages access to one storage.
``self.uid`` is the ID of the associated storage.
``self.expire`` is the deadline for this object.
``self.cache`` is a :class:`beaker.cache.Cache` instance.
``self.vcs`` is a :class:`~.lib.vcs.Vcs` instance.
``self.indexing`` is the indexing status of the storage in a tuple such as
``(status, start_time, delay_to_start, indexed_file_regex)``:
* ``''``: no index
* ``'ok'``: no pending indexing operation
* ``'update'``: waiting for indexing
* ``'delete'``: waiting for deleting index
* ``'run'``: in progress
``run`` status can be completed by next step: ``run>delete>index``.
``self._report`` is a tuple such as ``(status, message, expire, ttl)``
where ``expire`` is the report validity date and ``status`` is one of the
following strings:
* ``'wait'``: waiting for VCS operation
* ``'run'``: VCS operation in progress
* ``'error'``: VCS operation ended with error
* ``'end'``: VCS operation ended with success
``self._refresh`` is a tuple such as ``(time_to_refresh, refresh_period)``.
"""
# pylint: disable = too-many-instance-attributes
# -------------------------------------------------------------------------
    def __init__(self, handler_manager, storage):
        """Constructor method.

        :param handler_manager: (:class:`HandlerManager` instance)
            Application :class:`HandlerManager` object.
        :param storage: (:class:`~.models.storages.Storage` instance).
        """
        self.uid = storage.storage_id
        self.opener_manager = handler_manager.opener_manager
        # Per-storage Beaker cache; its expire time also drives this
        # handler's own lifetime (self.expire)
        self.cache = handler_manager.cache_manager.get_cache(
            'stg_%s' % self.uid,
            expire=int(handler_manager.settings.get('storage.cache', 3600)))
        self.expire = time() + self.cache.expiretime
        # VCS engine working on <storage.root>/<storage_id>
        self.vcs = handler_manager.vcs_classes[storage.vcs_engine](
            join(handler_manager.settings['storage.root'], self.uid),
            storage.vcs_url, storage.vcs_user, decrypt(
                storage.vcs_password,
                handler_manager.settings.get('encryption', '-')))
        # Indexing is enabled when a pattern is set and an index
        # directory is configured
        is_indexing = storage.indexed_files \
            and 'storage.index' in handler_manager.settings
        try:
            # [status, start_time, delay_to_start, indexed_file_regex]
            self.indexing = [
                is_indexing and 'ok' or '', 0,
                int(handler_manager.settings.get('refresh.short', 2)),
                is_indexing and re.compile(storage.indexed_files)]
        except re.error as error:
            # Invalid pattern: disable indexing for this storage
            LOG.error('Indexed files for storage "%s": %s', self.uid, error)
            self.indexing = ['', 0, 0, None]
        self.normalize_mode = storage.normalize_mode
        # (status, message, expire, ttl) — see class docstring
        self._report = (
            'wait', None, 0,
            int(handler_manager.settings.get('storage.report_ttl', 120)))
        # [time_to_refresh, refresh_period]
        self._refresh = [0, int(storage.refresh)]
        self._thread = None
# -------------------------------------------------------------------------
[docs] def clone(self, request=None):
"""Launch an action to clone a storage.
:param request: (:class:`pyramid.request.Request` instance, optional)
Current request.
"""
# Action directly...
if request is None:
self.index()
return self.vcs.clone()
# ...or in a thread
if self.launch(request, self.vcs.clone):
self.index()
self._refresh[0] = time() + self._refresh[1]
return None
# -------------------------------------------------------------------------
    def synchronize(self, request, force=False):
        """Launch an action to synchronize storage with its source.

        :param request: (:class:`pyramid.request.Request` instance)
            Current request or ``None`` to synchronize directly.
        :param force: (boolean, default=False)
            Force synchronization even if period is not over.
        :return: (boolean)
            ``True`` if it really launch a synchronization.

        If ``force`` is ``False``, the synchronization is only done if the
        delay of ``synchro_period`` seconds has expired.
        """
        # Something to do? Period not over: just keep the handler alive
        if not force and self._refresh[0] > time():
            self.expire = time() + self.cache.expiretime
            return False
        # Directly...
        if request is None:
            self.report('run')
            error = self.vcs.pull_update(self)
            if error:
                self.report('error', error)
                return False
            self._refresh[0] = time() + self._refresh[1]
            self.report('end')
            self.index()
            return True
        # ...or in a thread
        if self.launch(request, self.vcs.pull_update):
            self._refresh[0] = time() + self._refresh[1]
            self.index()
            return True
        return False
# -------------------------------------------------------------------------
[docs] def report(self, status, message=None):
"""Save a report.
:param status: (string)
Current status. See :class:`Handler` class.
:param message: (string, optional)
Message to write in report.
:return: (string or ``None``)
Message.
"""
self._report = (
status, message, time() + self._report[3], self._report[3])
self.expire = time() + self.cache.expiretime
return message
# -------------------------------------------------------------------------
[docs] def progress(self):
"""Return the progress of action on the storage.
:return: (tuple)
A tuple such as ``(status, message)``. See :class:`Handler` class.
"""
if self._thread is not None and self._thread.is_alive():
return ('run', None)
if self._report[0] != 'wait' and self._report[2] < time():
self.report('wait')
return self._report[0:2]
# -------------------------------------------------------------------------
    @cache_decorator()
    def dir_infos(self, request, path, sort, quick, storage):
        """List all files of a directory with VCS information.

        :param request: (:class:`pyramid.request.Request` instance)
            Current request.
        :param path: (string)
            Relative path of the directory.
        :param sort: (string)
            A sort order among ``+name``, ``-name``, ``+size``, ``-size``,
            ``+date``, ``-date``.
        :param quick: (boolean)
            ``True`` to have a quick information.
        :param storage: (dictionary)
            Dictionary of the current storage.
        :return: (tuple)
            A tuple such as ``(dirs, files)`` where ``dirs`` and ``files`` are
            lists such as ``[name, type, size, date, revision, user,
            message, overview]``. For instance: ``['README', 'plain',
            '45.7 Kio', '2 days', '16:0e0229a916f4', 'user1', 'Bug fixed',
            '<div class="overview">See...</div>']``.
        """
        # Remember the current status to restore it afterwards
        status = self._report[0]
        self.report('run')
        # Get information
        dirs, files = self.vcs.directory_log(path, quick)
        # Sort: by size, by date, or locale-aware by name
        key = {'size': lambda k: k[2], 'date': lambda k: k[3]}.get(
            sort[1:], lambda k: strxfrm(k[0].encode('utf8')))
        dirs = sorted(dirs, key=key, reverse=(sort[0] == '-'))
        files = sorted(files, key=key, reverse=(sort[0] == '-'))
        # Improve labels and add overviews
        for item in dirs:
            item[2] = size_label(item[2], True)
            item[3] = (age(item[3]), item[3].isoformat(' ').partition('.')[0])
            item.append(None)  # directories have no overview
        for item in files:
            item[2] = size_label(item[2], False)
            item[3] = (age(item[3]), item[3].isoformat(' ').partition('.')[0])
            # Overview comes from the file's opener; skipped in quick mode
            opener = not quick and self.opener_manager and \
                self.opener_manager.get_opener(
                    join(self.vcs.path, path, item[0]), storage)[0]
            item.append(opener and opener.overview(
                request, storage, join(path, item[0])) or None)
        if status != 'run':
            self.report('wait')
        return dirs, files
# -------------------------------------------------------------------------
    def upload(self, user, path, upload_files, filename, message):
        """Synchronize, upload files and propagate.

        :param user: (list)
            VCS user like ``(vcs_user_id, vcs_password, user_name)``.
        :param path: (string)
            Relative path to files.
        :param upload_files: (list)
            List of :class:`cgi.FieldStorage` of the files to upload.
        :param filename: (string or ``None``)
            Name of file to upload or ``None``.
        :param message: (string)
            Commit message.
        """
        # Check repository
        if not self._check_repository(user, message):
            return
        fullpath = join(self.vcs.path, path or '.')
        if not exists(fullpath):
            return
        for upload_file in upload_files:
            # Check filename: strip any Windows-style directory part
            upload_filename = upload_file.filename
            if '\\' in upload_filename:
                upload_filename = upload_filename.split('\\')[-1]
            if self.normalize_mode is not None:
                upload_filename = normalize_filename(
                    upload_filename, self.normalize_mode)
            if filename is not None and filename != upload_filename:
                self.report('error', _('File names are different.'))
                return
            # Upload files: plain copy or, when no target name is
            # imposed, unzip ZIP archives in place
            ext = splitext(upload_filename)[1]
            if ext != '.zip' or filename:
                upload_filename = filename or upload_filename
                with open(join(fullpath, upload_filename), 'w') as hdl:
                    hdl.write(upload_file.file.read())
            else:
                unzip(upload_file.file, fullpath, self.normalize_mode)
            upload_file.file.close()
        # Add
        if self.vcs.add(path, self):
            return
        # Propagate
        self._propagate(user, message)
# -------------------------------------------------------------------------
[docs] def create(self, user, seed, path, name, message):
"""Create a new file according to the seed file.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param seed: (string)
Full path to the seed file.
:param path: (string)
Relative path to file to create.
:param name: (string)
Name of the file to create.
:param message: (string)
Commit message.
"""
# Check repository
if not self._check_repository(user, message):
return
# Create file
if self.normalize_mode is not None:
name = normalize_filename(name, self.normalize_mode)
if self.vcs.create(seed, path, name, self):
return
# Propagate
self._propagate(user, message)
# -------------------------------------------------------------------------
[docs] def duplicate(self, user, path, original, name, message):
"""Duplicate the ``original`` file.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param path: (string)
Relative path to file to create.
:param original: (string)
Name of the original file.
:param name: (string)
Name of the file to create.
:param message: (string)
Commit message.
"""
# Check repository
if not self._check_repository(user, message):
return
# Duplicate file
if self.normalize_mode is not None:
name = normalize_filename(name, self.normalize_mode)
if self.vcs.duplicate(path, original, name, self):
return
# Propagate
self._propagate(user, message)
# -------------------------------------------------------------------------
[docs] def mkdir(self, user, path, name, message):
"""Make a directory.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param path: (string)
Relative path to directory to create.
:param name: (string)
Name of directory to create.
:param message: (string)
Commit message.
"""
# Check repository
if self.vcs.engine == 'svn' \
and not self._check_repository(user, message):
return
# Make directory
if self.normalize_mode is not None:
name = normalize_filename(name, self.normalize_mode, True)
if self.vcs.mkdir(path, name, self):
return
# Propagate
if self.vcs.engine == 'svn':
self._propagate(user, message)
else:
self.report('end')
# -------------------------------------------------------------------------
[docs] def add(self, user, path, message):
"""Synchronize, add files and propagate.
:param list user:
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param str path:
Relative path to files.
:param str message:
Commit message.
"""
# Check repository
if not self._check_repository(user, message):
self.vcs.remove_untracked(path, self)
return
# Add
if self.vcs.add(path, self):
return
# Propagate
self._propagate(user, message)
# -------------------------------------------------------------------------
[docs] def rename(self, user, path, filename, new_name, message):
"""Synchronize, rename a file and propagate.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param path: (string)
Relative path to files.
:param filename: (string)
Name of file to move.
:param new_name: (string)
New name.
:param message: (string)
Commit message.
"""
# Check repository
if not self._check_repository(user, message):
return
# Move files
if self.vcs.rename(path, filename, new_name, self):
return
# Propagate
self._propagate(user, message)
# -------------------------------------------------------------------------
[docs] def remove(self, user, path, filenames, message):
"""Synchronize, remove files and propagate.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param path: (string)
Relative path to files.
:param filenames: (list)
Names of files to remove.
:param message: (string)
Commit message.
"""
# Check repository
if not self._check_repository(user, message):
return
# Remove files
for filename in filenames:
if self.vcs.remove(path, filename, self):
return
# Propagate
self._propagate(user, message)
# -------------------------------------------------------------------------
[docs] def replace(self, user, path, content, message):
"""Synchronize, replace one file and propagate.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param path: (string)
Relative path to files.
:param content: (string)
New content of the file to replace.
"""
# Check repository
if not self._check_repository(user, message):
return
# Does the file exists?
filename = join(self.vcs.path, path)
if not exists(filename):
self.report('error', _('This file does not exist anymore.'))
return
# Save file
with open(filename, 'wb') as hdl:
hdl.write(content)
# Propagate
self._propagate(user, message)
# -------------------------------------------------------------------------
    def recover(self):
        """Recover from an interrupted commit or pull."""
        # Delegate to the VCS engine, passing this handler for reporting
        self.vcs.recover(self)
# -------------------------------------------------------------------------
[docs] def index(self):
"""Schedule index."""
if self.indexing[0] == 'ok':
self.indexing[0] = 'update'
self.indexing[1] = time() + self.indexing[2]
# -------------------------------------------------------------------------
    def launch(self, request, action, args=(), kwargs=None):
        """Launch a new action in a thread.

        :param request: (:class:`pyramid.request.Request` instance)
            Current request.
        :param action: (function)
            Action to launch.
        :param args: (tuple, optional)
            Arguments for the action.
        :param kwargs: (dictionary, optional)
            Keyword arguments for the action.
        :return: (boolean)
            ``True`` if action has been launched.

        Only one action per storage at a time is possible.
        """
        # Is this storage undergoing an action?
        if self._thread is not None and self._thread.is_alive():
            request.session.flash(_(
                '${i}: action already in progress.', {'i': self.uid}), 'alert')
            return False
        if self._thread is not None:
            del self._thread
        # Launch action. Actions bound to another object (e.g. VCS
        # methods) receive this handler so they can report progress.
        # NOTE: ``im_self`` is Python 2 only.
        kwargs = kwargs or {}
        if action.im_self is not self:
            kwargs['handler'] = self
        self._thread = Thread(
            target=action, name='vcs:%s' % self.uid, args=args, kwargs=kwargs)
        self._thread.start()
        # Keep the handler alive while its thread runs
        self.expire = time() + self.cache.expiretime
        return True
# -------------------------------------------------------------------------
    def _check_repository(self, user, message):
        """Wait for repository availability, synchronize and check push
        capacity.

        :param user: (list)
            VCS user like ``(vcs_user_id, vcs_password, user_name)``.
        :param message: (string)
            Commit message.
        :return: (boolean)
            ``True`` when the repository is ready for new changes.
        """
        # Wait up to 10 seconds for a running operation to finish
        k = 10
        while self._report[0] == 'run' and k:
            k -= 1
            sleep(1)
        if self._report[0] == 'run':
            self.report('error', _('Storage is busy.'))
            return False
        # Synchronize, then commit and push any pending change;
        # on failure, revert the local changes
        if self.synchronize(None, True) and \
                not self.vcs.commit_push(message, user[0], user[1], user[2], self):
            return True
        self.vcs.revert_all(self)
        return False
# -------------------------------------------------------------------------
def _propagate(self, user, message):
"""Commit and push chages or revert all.
:param user: (list)
VCS user like ``(vcs_user_id, vcs_password, user_name)``.
:param message: (string)
Commit message.
"""
if not self.vcs.commit_push(message, user[0], user[1], user[2], self):
self.report('end')
else:
self.vcs.backout(user[2] or user[0], self)