Source code for publiforge.lib.vcs.svn

# pylint: disable = no-name-in-module
"""Storage with Subversion Version Control System management."""

from logging import getLogger
from os import remove, listdir
from os.path import exists, getmtime, join, isdir, relpath
from shutil import rmtree
from datetime import datetime
from cStringIO import StringIO
from urllib import quote
from time import sleep

from subvertpy import SubversionException, ra, client
from subvertpy import AUTH_PARAM_DEFAULT_USERNAME, AUTH_PARAM_DEFAULT_PASSWORD
from subvertpy.ra import BusyException
from subvertpy.properties import PROP_REVISION_LOG, PROP_REVISION_AUTHOR
from subvertpy.properties import PROP_REVISION_DATE, time_from_cstring

from pyramid.i18n import TranslationString

from ...lib.i18n import _
from ...lib.utils import execute
from ...lib.vcs import Vcs


LOG = getLogger(__name__)


# =============================================================================
[docs]class VcsSubversion(Vcs): """Version control system with Subversion.""" engine = 'svn' # ------------------------------------------------------------------------- def __init__(self, path, url, user_id=None, password=None): """Constructor method.""" super(VcsSubversion, self).__init__(path, url, user_id, password) # pylint: disable = no-member providers = ra.get_platform_specific_client_providers() providers += [ ra.get_ssl_client_cert_file_provider(), ra.get_ssl_client_cert_pw_file_provider(), ra.get_ssl_server_trust_prompt_provider(self._get_server_trust), ra.get_username_prompt_provider(self._get_user, 0), ra.get_simple_prompt_provider(self._get_pass, 0)] self._auth = ra.Auth(providers) if self._user_id: self._auth.set_parameter( AUTH_PARAM_DEFAULT_USERNAME, self._user_id) if self._password: self._auth.set_parameter( AUTH_PARAM_DEFAULT_PASSWORD, self._password) # pylint: disable = c-extension-no-member self._client = client.Client(auth=self._auth) self._remote = None self._counter = 0 # -------------------------------------------------------------------------
[docs] def clone(self, handler=None): """Create a copy of an existing repository in a directory. See abstract function :meth:`~.lib.vcs.Vcs.clone`. """ try: self._client.checkout(self.url, self.path, 'HEAD') except (OSError, SubversionException) as error: if handler is not None: handler.report('error', error.args[0].decode('utf8')) return error.args[0].decode('utf8') return None
# -------------------------------------------------------------------------
[docs] def pull_update(self, handler=None): """Pull changes and update. See abstract function :meth:`~.lib.vcs.Vcs.pull_update`. """ # pylint: disable = no-member if not exists(self.path): return None try: self._client.update(self.path) except SubversionException as error: error = error.args[0].replace(self.path, '') if handler is not None: handler.report('error', error.decode('utf8')) return error.decode('utf8') return None
# -------------------------------------------------------------------------
[docs] def commit_push(self, message, user_id, password, name, handler=None): """Commit and push changes. See abstract method :meth:`~.lib.vcs.Vcs.commit_push`. """ if not exists(self.path): return None cmd = ['nice', 'svn', '-q', '--non-interactive', '--no-auth-cache'] if user_id: cmd += ['--username', user_id] if password: cmd += ['--password', password] cmd += ['commit', '-m', message or '-'] err = execute(cmd, self.path) if err[1]: err = err[0] or err[1] err = err.replace(self.path, '') if handler is not None: handler.report('error', err) return err return None
# -------------------------------------------------------------------------
[docs] def remove_untracked(self, path, handler): """Remove untracked files. See :meth:`~.lib.vcs.Vcs.remove_unrtacked`. """ pass
# -------------------------------------------------------------------------
[docs] def revert_all(self, handler): """Revert all files of the repository.""" if not exists(self.path): return cmd = ['nice', 'svn', '-q', '--non-interactive', '--no-auth-cache', 'revert', '--recursive', '.'] execute(cmd, self.path) try: self._client.update(self.path) except SubversionException: pass
# -------------------------------------------------------------------------
[docs] def backout(self, name, handler): """Reverse effect of earlier changeset. See :meth:`~.lib.vcs.Vcs.backout`. """ pass
# -------------------------------------------------------------------------
[docs] def recover(self, handler): """Recover from an interrupted commit or pull.""" if not exists(self.path): return cmd = ['nice', 'svn', '-q', '--non-interactive', '--no-auth-cache', 'cleanup'] execute(cmd, self.path)
# -------------------------------------------------------------------------
[docs] def last_change(self): """Return the last change on the repository. See :meth:`~.lib.vcs.Vcs.last_change`. """ try: self._set_remote() stat = self._remote.stat('.', self._remote.get_latest_revnum()) return ( datetime.fromtimestamp(long(stat['time']) / 1000000), stat['created_rev'], stat['last_author']) except (SubversionException, ValueError, BusyException): return datetime.fromtimestamp(getmtime(self.path)), '-', '-'
# -------------------------------------------------------------------------
[docs] def log(self, path, filename, limit=1): """show revision history of file ``filename``. See :meth:`~.lib.vcs.Vcs.log`. """ _log = [] relname = join(path, filename).encode('utf8') def _log_printer(changed_paths, rev, revprops, has_children=None): """Catch log information.""" # pylint: disable = W0613 info = dict(revprops.items()) _log.append(( datetime.fromtimestamp( (time_from_cstring(info[PROP_REVISION_DATE])) / 1000000), rev, info[PROP_REVISION_AUTHOR].decode('utf8'), info[PROP_REVISION_LOG].decode('utf8'))) try: self._set_remote() self._remote.get_log( _log_printer, paths=relname, start=self._remote.get_latest_revnum(), end=0, limit=limit) except BusyException: if self._counter < 10: sleep(1) self._counter += 1 return self.log(path, filename, limit) self._counter = 0 return ((datetime.fromtimestamp( getmtime(self.full_path(path, filename))), '-', '-', '-'),) except SubversionException: self._counter = 0 return ((datetime.fromtimestamp( getmtime(self.full_path(path, filename))), '-', '-', '-'),) self._counter = 0 return _log
# -------------------------------------------------------------------------
[docs] def add(self, path, handler=None): """Add all new files in ``path``. See abstract function :meth:`~.lib.vcs.Vcs.add`. """ fullpath = self.full_path(path) if isinstance(fullpath, TranslationString): if handler is not None: handler.report('error', fullpath) return fullpath for name in listdir(fullpath): if name != '.svn': try: self._client.add(join(fullpath, name).encode('utf8')) except SubversionException: pass return None
# -------------------------------------------------------------------------
[docs] def mkdir(self, path, name, handler=None): """Make the directroy ``name``. See abstract function :meth:`~.lib.vcs.Vcs.mkdir`. """ fullname = self.full_path(path, name) if isinstance(fullname, TranslationString): if handler is not None: handler.report('error', fullname) return fullname if not exists(fullname): try: self._client.mkdir(fullname) except SubversionException as error: error = error.args[0].replace(self.path, '').decode('utf8') if handler is not None: handler.report('error', error) return error return None
# -------------------------------------------------------------------------
[docs] def rename(self, path, filename, new_name, handler): """Rename a file.""" fullname = self.full_path(path, filename) if isinstance(fullname, TranslationString): return handler.report('error', fullname) new_fullname = self.full_path(path, new_name) if isinstance(new_fullname, TranslationString): return handler.report('error', new_fullname) if exists(new_fullname): return handler.report('error', _('File already exists!')) try: self._client.copy(fullname, new_fullname) self._client.delete(fullname) except SubversionException as error: return handler.report( 'error', error.args[0].replace(self.path, '').decode('utf8')) return None
# -------------------------------------------------------------------------
[docs] def remove(self, path, filename, handler): """Remove a file. See abstract function :meth:`~.lib.vcs.Vcs.remove`. """ fullname = self.full_path(path, filename) if isinstance(fullname, TranslationString): return handler.report('error', fullname) if exists(fullname): try: self._client.delete(fullname) except SubversionException as error: if isdir(fullname): rmtree(fullname) elif exists(fullname): remove(fullname) return handler.report( 'error', error.args[0].replace(self.path, '').decode('utf8')) return None
# -------------------------------------------------------------------------
[docs] def revision(self, fullname, revision): """Retrieve a revision. See :meth:`~.lib.vcs.Vcs.revision`. """ out = StringIO() relname = relpath(fullname, self.path) try: self._set_remote() self._remote.get_file(relname, out, int(revision)) except SubversionException as error: return error.args[0].replace(self.path, '').decode('utf8') content = out.getvalue() out.close() return content
# -------------------------------------------------------------------------
[docs] def diff(self, fullname, revision): """Return differences between revision ``revision`` and current revision. See :meth:`~.lib.vcs.Vcs.diff`. """ relname = relpath(fullname, self.path) try: self._set_remote() stat = self._remote.stat(relname, self._remote.get_latest_revnum()) except SubversionException as error: return error.args[0].replace(self.path, '').decode('utf8') url = self._path2url(relname) try: out, err = self._client.diff( int(revision), int(stat['created_rev']), url, url) except SubversionException as error: return error.args[0].replace(self.path, '').decode('utf8') error = err.read() diff = out.read() err.close() out.close() if error: # pylint: disable = no-member return error.replace(self.path, '').decode('utf8') return diff.decode('utf8')
# ------------------------------------------------------------------------- def _set_remote(self): """Set the Subversion remote access.""" if self._remote is not None: return self._remote = ra.RemoteAccess( url=self.url, client_string_func=self._get_client_string, auth=self._auth) # ------------------------------------------------------------------------- @classmethod def _get_client_string(cls): """Return a client identification string.""" return 'publiforge' # ------------------------------------------------------------------------- def _get_user(self, realm, may_save): """Call back function called to get a user name to access a repository. """ # pylint: disable = W0613 return self._user_id, False # ------------------------------------------------------------------------- def _get_pass(self, realm, username, may_save): """Call back function called to get a user name an password to access a repository. """ # pylint: disable = W0613 return self._user_id or username, self._password, False # ------------------------------------------------------------------------- @classmethod def _get_server_trust(cls, url, retry, certificate, may_save): """Call back function called to trust server.""" # pylint: disable = W0613 return True, False # ------------------------------------------------------------------------- def _path2url(self, path): """Build svn URL for path, URL-escaping path.""" if not path or path == '.': return self.url assert path[0] != '/', path return '/'.join((self.url, quote(path).rstrip('/'),))