Source code for publiforge.lib.processor.generator
"""This module generate something from nothing."""
from os.path import join, exists, dirname, basename, splitext
from ConfigParser import ConfigParser
from imp import load_source
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from ...i18n import _
from ...config import config_get
from ...utils import decrypt
from ...handler import HandlerManager
from ....models import DBSession
from ....models.storages import Storage
from .. import bin_directory
# =============================================================================
[docs]class Processor(object):
"""Main class for generator processor."""
# -------------------------------------------------------------------------
def __init__(self, build):
"""Constructor method.
:param build: (:class:`~.lib.build.agent.AgentBuild`)
Main Build object.
"""
# Attributes
self.build = build
self._handler_manager = None
self._handlers = {}
# Configuration
name = join(build.path, 'Processor', 'generator.ini')
if not exists(name):
build.stopped(_('File "generator.ini" is missing.'))
return
self._config = ConfigParser({
'here': dirname(name),
'bin': bin_directory(),
'processor': join(build.path, 'Processor'),
'stgpath': build.data_path,
'lang': build.context['lang']})
self._config.read(name)
# -------------------------------------------------------------------------
[docs] def start(self):
"""Start the processor."""
if self.build.stopped():
return
# Load script
filename = self.config('Generation', 'process')
if not filename:
self.build.stopped(_('Option [Generation]/process is missing.'))
return
if not exists(filename):
self.build.stopped(
_('Unknown file "${n}".', {'n': basename(filename)}))
return
module = load_source(splitext(basename(filename))[0], filename)
# Execute script
module.main(self)
# -------------------------------------------------------------------------
[docs] def message(self, text):
"""Add a message in ``build.result['values']``.
:param text: (string or :class:`pyramid.i18n.TranslationString`)
Message text.
"""
if 'values' not in self.build.result:
self.build.result['values'] = []
self.build.result['values'].append(self.build.translate(text))
# -------------------------------------------------------------------------
[docs] def storage_handler(self, storage_id, storage=None):
"""Update environment for storage ``storage_id`` with handler and user
information and return the current storage handler.
:param storage_id: (string)
Storage ID.
:param storage: (:class:`~.models.storages.Storage` instance,
optional).
:return: (tuple or ``None``)
A tuple such as
``(storage_handler, (vcs_user_id, vcs_password, user_name))``.
"""
if not self.build.context['local']:
self.build.stopped(
_('This processor does not work on a pure agent.'))
return None
# Get handler manager
if self._handler_manager is None:
cache_manager = CacheManager(
**parse_cache_config_options(self.build.settings))
self._handler_manager = HandlerManager(
self.build.settings, cache_manager)
# Already used?
if storage_id in self._handlers and \
storage_id in self._handler_manager.currently_managed():
return self._handlers[storage_id]
# Create
if storage is None:
storage = DBSession.query(Storage).filter_by(
storage_id=storage_id).first()
if storage is None:
self.build.stopped(_('Unknown storage "${n}".', {'n': storage_id}))
return None
self._handlers[storage_id] = (
self._handler_manager.get_handler(storage_id, storage),
(storage.vcs_user,
decrypt(storage.vcs_password,
self.build.settings.get('encryption', '-')),
self.build.context['name']))
return self._handlers[storage_id]
# -------------------------------------------------------------------------
[docs] def config(self, section, option, default=None):
"""Retrieve a value from a configuration object.
:param section: (string)
Section name.
:param option: (string)
Option name.
:param default: (string, optional)
Default value
:return: (string)
Read value or default value.
"""
return config_get(self._config, section, option, default)