"""Storage with Mercurial Version Control management."""
from logging import getLogger
from os import remove, renames, listdir
from os.path import exists, join, isdir, getmtime, normpath
from shutil import rmtree
from datetime import datetime
from tempfile import mkdtemp
from urllib2 import HTTPError, URLError
from mercurial import ui, hg, commands
from mercurial.error import Abort, RepoError, ParseError
from pyramid.i18n import TranslationString
from ...lib.i18n import _
from ...lib.utils import EXCLUDED_FILES
from ...lib.vcs import Vcs
LOG = getLogger(__name__)
# =============================================================================
[docs]class VcsMercurialUi(ui.ui):
"""Override Mercurial ui class for PubliForge use."""
# pylint: disable = R0904
# -------------------------------------------------------------------------
def __init__(self, src=None):
"""Contructor method."""
super(VcsMercurialUi, self).__init__(src)
self.handler = None
# -------------------------------------------------------------------------
[docs] def write(self, *args, **opts):
"""Write args in log as informations."""
for arg in args:
arg = arg.strip()
if arg:
LOG.info(arg)
# -------------------------------------------------------------------------
[docs] def write_err(self, *args, **opts):
"""Write args in log as errors."""
for arg in args:
arg = arg.strip()
if arg:
LOG.error(arg)
if self.handler is not None:
self.handler.report('error', arg)
# =============================================================================
[docs]class VcsMercurialUiLog(VcsMercurialUi):
"""Ui class to retrieve file information."""
# pylint: disable = too-many-public-methods
# -------------------------------------------------------------------------
def __init__(self, src=None):
"""Constructor method."""
super(VcsMercurialUiLog, self).__init__(src)
self.infos = []
self.logs = []
self._entry = None
self._description = False
# -------------------------------------------------------------------------
[docs] def write(self, *args, **opts):
"""This method collects log information."""
for arg in args:
if arg.startswith('changeset:'):
if self._entry is not None:
self._update_infos()
self._entry = ['', int(arg[10:].partition(':')[0]), '', '']
elif arg.startswith('user:'):
self._entry[2] = arg[5:].strip()
elif arg.startswith('date:'):
self._entry[0] = arg[5:-6].strip()
elif arg.startswith('files:'):
try:
self._entry[3] = ' %s ' % arg[6:].strip().decode('utf8')
except UnicodeDecodeError:
self._entry[3] = ' - '
elif arg.startswith('summary:'):
self._entry[3] = arg[8:].strip()
self.logs.append(self._entry)
self._entry = None
elif arg.startswith('description:'):
self._description = True
elif self._description:
self._update_infos(arg)
self._entry = None
# -------------------------------------------------------------------------
def _update_infos(self, arg=''):
"""Update ``self.infos`` list."""
for name in self.infos:
if name in self._entry[3] \
and self.infos[name][1] < self._entry[1]:
self.infos[name] = (
self._entry[0], self._entry[1],
self._entry[2], arg.strip())
self._description = False
self.logs.append(self._entry)
# =============================================================================
[docs]class VcsMercurial(Vcs):
"""Version control system with Mercurial."""
engine = 'hg'
# -------------------------------------------------------------------------
def __init__(self, path, url, user_id=None, password=None):
"""Constructor method."""
super(VcsMercurial, self).__init__(path, url, user_id, password)
self._ui = VcsMercurialUi()
self._ui.setconfig('ui', 'interactive', 'no')
self._ui.setconfig('ui', 'username', '-')
self._ui.setconfig('web', 'cacerts', '')
# -------------------------------------------------------------------------
[docs] def clone(self, handler=None):
"""Create a copy of an existing repository in a directory.
See abstract function :meth:`~.lib.vcs.Vcs.clone`.
"""
self._ui.handler = handler
try:
commands.clone(self._ui, self._full_url(), self.path)
except (Abort, RepoError, HTTPError, URLError,
AssertionError, OSError) as error:
if handler is not None:
handler.report('error', error)
return error
remove(join(self.path, '.hg', 'hgrc'))
return None
# -------------------------------------------------------------------------
[docs] def pull_update(self, handler=None):
"""Pull changes and update.
See abstract function :meth:`~.lib.vcs.Vcs.pull_update`.
"""
if not exists(self.path):
return None
try:
if self.engine == 'local':
commands.update(self._ui, self._repo(handler))
else:
commands.pull(
self._ui, self._repo(handler), self._full_url(),
update=True)
except (Abort, RepoError, HTTPError, URLError,
OSError, AttributeError) as error:
if handler is not None:
handler.report('error', error)
return error
return None
# -------------------------------------------------------------------------
[docs] def commit_push(self, message, user_id, password, name, handler=None):
"""Commit and push changes.
See abstract method :meth:`~.lib.vcs.Vcs.commit_push`.
"""
if not exists(self.path):
return None
myui, repo = self._named_ui_repo(name or user_id, handler)
message = message or '-'
try:
commands.commit(myui, repo, message=message.encode('utf8'))
if self.engine != 'local':
commands.push(myui, repo, self._full_url(user_id, password))
except (Abort, RepoError, HTTPError, URLError, OSError) as error:
if handler is not None:
handler.report('error', error)
return error
return None
# -------------------------------------------------------------------------
[docs] def remove_untracked(self, path, handler):
"""Remove untracked files.
See :meth:`~.lib.vcs.Vcs.remove_unrtacked`.
"""
path = self.full_path(path)
if isinstance(path, TranslationString):
return
for name in listdir(path):
if name not in EXCLUDED_FILES:
name = join(path, name)
if isdir(name):
rmtree(name)
else:
remove(name)
self.revert_all(handler)
# -------------------------------------------------------------------------
[docs] def revert_all(self, handler):
"""Revert all files of the repository."""
if not exists(self.path):
return
commands.revert(
self._ui, self._repo(handler), all=True, no_backup=True)
commands.update(self._ui, self._repo(handler), clean=True)
# -------------------------------------------------------------------------
[docs] def backout(self, name, handler):
"""Reverse effect of earlier changeset.
See :meth:`~.lib.vcs.Vcs.backout`.
"""
if not exists(self.path):
return None
myui, repo = self._named_ui_repo(name, handler)
try:
commands.backout(
myui, repo, rev='tip', message='Previous changeset canceled')
except (Abort, RepoError, OSError) as error:
return handler.report('error', error)
return None
# -------------------------------------------------------------------------
[docs] def recover(self, handler):
"""Recover from an interrupted commit or pull."""
commands.recover(self._ui, self._repo(handler))
# -------------------------------------------------------------------------
[docs] def last_change(self):
"""Return the last change on the repository.
See :meth:`~.lib.vcs.Vcs.last_change`.
"""
repo = self._repo()
if repo is None:
return (datetime.now(), -1, '-')
ctx = repo['tip']
if ctx.rev() == -1:
return (datetime.fromtimestamp(getmtime(self.path)), -1, '-')
return (datetime.fromtimestamp(ctx.date()[0]), ctx.rev(),
ctx.user().partition('<')[0].decode('utf8'))
# -------------------------------------------------------------------------
[docs] def log(self, path, filename, limit=1):
"""Show revision history of file ``filename``.
See :meth:`~.lib.vcs.Vcs.log`.
"""
filename = self._log_path(path, filename)
if filename is None:
return None
# Collect information
myui = VcsMercurialUiLog()
myui.setconfig('ui', 'verbose', False)
try:
repo = hg.repository(myui, self.path)
except RepoError:
return None
try:
commands.log(
myui, repo, filename, limit=str(limit),
follow=limit > 1 and not isdir(filename),
date=None, rev=None, user=None)
except (Abort, RepoError, HTTPError, URLError,
OSError, TypeError) as error:
return [(datetime.now(), '', '', str(error).decode('utf8'))]
if not myui.logs:
return myui.logs
# Format information
myui.logs = sorted(myui.logs, key=lambda k: k[1], reverse=True)[:limit]
for k, entry in enumerate(myui.logs):
myui.logs[k] = (
datetime.strptime(entry[0], '%a %b %d %H:%M:%S %Y'),
str(entry[1]), entry[2].partition('<')[0].decode('utf8'),
entry[3].decode('utf8'))
return myui.logs
# -------------------------------------------------------------------------
[docs] def directory_log(self, path, quick=False):
"""List all files of a directory with VCS informations.
See :meth:`~.lib.vcs.Vcs.directory_log`.
"""
dirs, files = super(VcsMercurial, self).directory_log(path)
if quick:
return dirs, files
# Collect information
myui = VcsMercurialUiLog()
myui.setconfig('ui', 'verbose', True)
myui.infos = dict(
[(' %s/' % normpath(join(path, k[0])), ('', -1, '', ''))
for k in dirs] +
[(' %s ' % normpath(join(path, k[0])), ('', -1, '', ''))
for k in files])
try:
repo = hg.repository(myui, self.path)
except RepoError:
return dirs, files
try:
commands.log(myui, repo, self._log_path(path))
except (Abort, RepoError, HTTPError, URLError, OSError, TypeError):
return dirs, files
# Format information
for entry in dirs + files:
name = ' %s%s' % (
normpath(join(path, entry[0])),
entry[1] == 'folder' and '/' or ' ')
if name in myui.infos and myui.infos[name][1] > -1:
try:
entry[3] = datetime.strptime(
myui.infos[name][0], '%a %b %d %H:%M:%S %Y')
entry[4] = myui.infos[name][1]
entry[5] = myui.infos[name][2].partition('<')[0]\
.decode('utf8')
entry[6] = myui.infos[name][3].decode('utf8')
except ImportError:
pass
return dirs, files
# -------------------------------------------------------------------------
[docs] def add(self, path, handler=None):
"""Add all new files in ``path``.
See abstract function :meth:`~.lib.vcs.Vcs.add`.
"""
path = self.full_path(path)
if isinstance(path, TranslationString):
if handler is not None:
handler.report('error', path)
return path
if not exists(path):
return None
warn = commands.add(self._ui, self._repo(handler), path)
if warn:
if handler is not None:
handler.report('error', _('Rejected'))
return _('Rejected')
return None
# -------------------------------------------------------------------------
[docs] def rename(self, path, filename, new_name, handler):
"""Rename a file."""
filename = self.full_path(path, filename)
if isinstance(filename, TranslationString):
return handler.report('error', filename)
new_name = self.full_path(path, new_name)
if isinstance(new_name, TranslationString):
return handler.report('error', new_name)
if exists(new_name):
return handler.report('error', _('File already exists!'))
try:
commands.rename(self._ui, self._repo(handler), filename, new_name)
except (HTTPError, URLError, OSError, ValueError) as error:
return handler.report('error', error)
except (Abort, RepoError, IOError):
if exists(filename) and not exists(new_name):
renames(filename, new_name)
return None
# -------------------------------------------------------------------------
[docs] def remove(self, path, filename, handler):
"""Remove a file.
See abstract function :meth:`~.lib.vcs.Vcs.remove`.
"""
filename = self.full_path(path, filename)
if isinstance(filename, TranslationString):
return handler.report('error', filename)
if not exists(filename):
return None
try:
commands.remove(
self._ui, self._repo(handler), filename)
except (Abort, RepoError, HTTPError, URLError, OSError) as error:
return handler.report('error', error)
if exists(filename):
if isdir(filename):
rmtree(filename)
else:
remove(filename)
return None
# -------------------------------------------------------------------------
[docs] def revision(self, fullname, revision):
"""Retrieve a revision.
See :meth:`~.lib.vcs.Vcs.revision`.
"""
tmp_dir = mkdtemp(prefix='publiforge')
tmp_fil = join(tmp_dir, 'output')
err = commands.cat(
self._ui, self._repo(), fullname, rev=revision, output=tmp_fil)
if err:
rmtree(tmp_dir)
return None
with open(tmp_fil, 'rb') as hdl:
content = hdl.read()
rmtree(tmp_dir)
return content
# -------------------------------------------------------------------------
[docs] def diff(self, fullname, revision):
"""Return differences between revision ``revision`` and current
revision.
See :meth:`~.lib.vcs.Vcs.diff`.
"""
class Ui(VcsMercurialUi):
"""Ui class to retrieve file information."""
# pylint: disable = too-many-public-methods
# pylint: disable = bad-super-call
def __init__(self, src=None):
super(Ui, self).__init__(src)
self.diff = ''
def write(self, *args, **opts):
self.diff += ''.join(args)
myui = Ui()
repo = hg.repository(myui, self.path)
try:
commands.diff(myui, repo, fullname, rev=[revision])
except (RuntimeError, ParseError) as error:
return str(error)
try:
return myui.diff.decode('utf8')
except UnicodeDecodeError:
return myui.diff.decode('latin1')
return ''
# -------------------------------------------------------------------------
def _repo(self, handler=None):
"""Get a repository object.
:param handler: (:class:`~.lib.handler.Handler` instance, optional)
Owner of this action.
:return: (:class:`mercurial.repo`)
"""
if handler is not None:
self._ui.handler = handler
try:
return hg.repository(self._ui, self.path)
except RepoError as error:
if handler is not None:
handler.report('error', error)
# -------------------------------------------------------------------------
def _named_ui_repo(self, name, handler=None):
"""Get an UI and a repository for a specific name.
:param name: (string)
Name for VCS access.
:param handler: (:class:`~.lib.handler.Handler` instance, optional)
Owner of this action.
:return: (:class:`VcsMercurialUi`, :class:`mercurial.repo`)
"""
myui = VcsMercurialUi()
myui.handler = handler
myui.setconfig('ui', 'interactive', 'no')
myui.setconfig('ui', 'username', name.encode('utf8'))
myui.setconfig('web', 'cacerts', '')
repo = hg.repository(myui, self.path)
return myui, repo
# =============================================================================
[docs]class VcsLocal(VcsMercurial):
"""Version control system for local files."""
engine = 'local'
# -------------------------------------------------------------------------
[docs] def clone(self, handler=None):
"""Initialize a Mercurial repository and copy source.
See abstract function :meth:`~.lib.vcs.Vcs.clone`.
"""
self._ui.handler = handler
try:
commands.init(self._ui, self.path)
except RepoError as error:
if handler is not None:
handler.report('error', error)
repo = self._repo()
commands.add(self._ui, repo)
commands.commit(self._ui, repo, message='Initial commit')