"""Agent build management.
An *agent* is a Web service.
"""
from logging import getLogger
from os import listdir, makedirs
from os.path import exists, join, normpath, dirname, relpath
from os.path import getmtime, getatime
from threading import Thread
from time import time
from fnmatch import fnmatch
from shutil import rmtree, copy2
from glob import glob
import urllib
from lxml import etree
from pyramid.asset import abspath_from_asset_spec
from ..i18n import _, localizer
from ..config import settings_get_list
from ..utils import copy_content, camel_case
from ..xml import load_xml
LOG = getLogger(__name__)
# =============================================================================
class AgentBuildManager(object):
"""This class manages agent builds.
One instance of :class:`AgentBuildManager` is created during application
initialization. It is only used in agent mode. It is stored in application
registry.
``self._processors`` is a list such as ``[processor_dictionary, root_list,
available_list]``. ``processor_dictionary`` is a dictionary such as
``{processor_id: processor_path,...}``.
``self._fronts`` is a dictionary such as ``{front_id: password,...}``.
``self._builds`` is a dictionary of :class:`AgentBuild` objects.
``self._results`` is a dictionary of dictionaries such as ``{build_id:
result_dict}``. ``result_dict`` is a dictionary with following keys:
``status``, ``log``, ``expire``. According to build events, it can also
contain ``files``, ``values`` and ``error`` keys.
``self._results[build_id]['status']`` is one of the following strings:
``a_stop``, ``a_fatal`` or ``a_end``.
``self._results[build_id]['log']`` is a list of tuples such as
``(timestamp, step, percent, message)``.
"""
# pylint: disable = R0902
# -------------------------------------------------------------------------
def __init__(self, settings):
"""Constructor method.
:param settings: (dictionary)
Setting dictionary.
"""
# Attributes
self.settings = settings
self.translation_dirs = [
abspath_from_asset_spec(k)
for k in settings_get_list(settings, 'translation_dirs')]
self.build_ttl = int(settings.get('build.ttl', 1800))
self.result_ttl = int(settings.get('build.result_ttl', 604800))
self._concurrent = [
int(settings.get('front.synchronize', 1)),
int(settings.get('build.concurrent', 3))]
self._buildspace_ttl = int(settings.get('buildspace.ttl', 2678400))
self._syncs = {}
self._builds = {}
self._results = {}
# Processor list
self._processors = [
{},
tuple(settings_get_list(settings, 'processor.roots')),
tuple(settings_get_list(settings, 'processor.list'))]
# Authorized front list
self._fronts = {}
for idx in range(100):
if 'front.%d.uid' % idx in settings:
self._fronts[settings['front.%d.uid' % idx]] = \
settings.get('front.%d.password' % idx, '')
# Build directory cleaning
if settings.get('build.reset') == 'true' \
and exists(settings.get('build.root')):
rmtree(settings.get('build.root'))
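# A hypothetical example of the settings consumed above and elsewhere in this
# module (values are only illustrative):
#
#   settings = {
#       'uid': 'agent01',
#       'build.root': '/var/publiforge/builds',
#       'build.reset': 'false',
#       'build.develop': 'false',
#       'build.ttl': '1800',
#       'build.result_ttl': '604800',
#       'build.concurrent': '3',
#       'buildspace.root': '/var/publiforge/buildspaces',
#       'buildspace.ttl': '2678400',
#       'storage.root': '/var/publiforge/storages',
#       'front.synchronize': '1',
#       'front.0.uid': 'front01',
#       'front.0.password': 'secret',
#       'processor.roots': '/path/to/extra/Processors',
#       'processor.list': 'Publidoc2*',
#       'translation_dirs': 'publiforge:Locale'}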
# -------------------------------------------------------------------------
def agent_id(self):
"""Get agent unique ID."""
return self.settings['uid']
# -------------------------------------------------------------------------
def processor_list(self):
"""Refresh information and return a list of available processors and
the number of possible concurrent builds.
:return: (tuple)
A tuple such as ``(processor_list, concurrent)``.
"""
self._processors[0] = {}
self.add_processors(join(dirname(__file__), '..', '..', 'Processors'))
for path in self._processors[1]:
self.add_processors(path)
plist = []
for pid in self._processors[0]:
for pattern in self._processors[2]:
if pid not in plist and fnmatch(pid, pattern):
plist.append(pid)
break
return plist, self._concurrent[1]
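# For instance (hypothetical IDs), with ``processor.list = Publidoc2*`` and
# two processors found on disk, only the matching one is advertised:
#
#   self._processors[0] = {'Publidoc2XHtml': '...', 'LaTeX2Pdf': '...'}
#   self.processor_list()  ==>  (['Publidoc2XHtml'], 3)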
# -------------------------------------------------------------------------
def add_processors(self, path):
"""Add all processors in path ``path``.
:param path: (string)
Where to look for processors.
"""
path = abspath_from_asset_spec(path)
if not exists(path):
return
for pid in listdir(path):
if exists(join(path, pid, 'processor.xml')):
self._processors[0][pid] = normpath(join(path, pid))
# -------------------------------------------------------------------------
def processor_path(self, processor_id):
"""Return processor path if exists.
:param processor_id: (string)
Processor ID.
:return: (string)
"""
return self._processors[0].get(processor_id)
# -------------------------------------------------------------------------
def processor_xml(self, processor_id):
"""Return processor XML if exists.
:param processor_id: (string)
Processor ID.
:return: (string)
"""
# Read main file
if processor_id not in self._processors[0]:
return ''
parser = etree.XMLParser(remove_blank_text=True, remove_comments=True)
relaxngs = {
'publiforge':
join(dirname(__file__), '..', '..', 'RelaxNG', 'publiforge.rng')}
tree = load_xml(
join(self._processors[0][processor_id], 'processor.xml'),
relaxngs=relaxngs, parser=parser)
if isinstance(tree, basestring):
return ''
# Read variable files
# pylint: disable = maybe-no-member
if tree.getroot().find('processor/variables/group-file') is not None:
variables_elt = etree.Element('variables')
for elt in tree.getroot().findall(
'processor/variables/group-file'):
path = elt.text.strip()
path = abspath_from_asset_spec(path) if ':' in path \
else join(self._processors[0][processor_id], path)
var_tree = load_xml(path, relaxngs=relaxngs, parser=parser)
if isinstance(var_tree, basestring):
return ''
for subelt in var_tree.getroot().findall('variables/group'):
variables_elt.append(subelt)
tree.getroot().find('processor').replace(
tree.getroot().find('processor/variables'), variables_elt)
return etree.tostring(tree, encoding='utf8')
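# A rough sketch of the parts of ``processor.xml`` this module relies on; the
# root element name and the module path are assumptions, the authoritative
# schema being ``RelaxNG/publiforge.rng``:
#
#   <publiforge version="1.0">
#     <processor>
#       <module>publiforge.processors.publidoc2xhtml</module>
#       <ancestors>
#         <ancestor>Publidoc2Xml</ancestor>
#       </ancestors>
#       <variables>
#         <group-file>Variables/common.xml</group-file>
#       </variables>
#     </processor>
#   </publiforge>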
# -------------------------------------------------------------------------
def front_list(self):
"""Return a list of authorized fronts."""
return self._fronts.keys()
# -------------------------------------------------------------------------
def authorized_front(self, front_id, password):
"""``True`` if ``front_id`` is authorized to use agent services."""
return front_id in self._fronts and self._fronts[front_id] == password
# -------------------------------------------------------------------------
def activity(self):
"""Return the global activity i.e. the number of active or pending
builds.
:return: (integer)
Number of builds or -1 if the maximum number is reached.
"""
if len(self._syncs) >= self._concurrent[0] \
or len(self._builds) + len(self._syncs) >= self._concurrent[1]:
return -1
return len(self._builds)
# -------------------------------------------------------------------------
def synchronizing(self, build_id, lock=True):
"""Data synchronization for the build ``build_id``.
:param build_id: (string)
Build ID.
:param lock: (boolean, default=True)
If ``True`` try to add build ``build_id`` in the list of
synchronizations.
:return: (boolean)
"""
self._cleanup()
if not lock:
if build_id in self._syncs:
del self._syncs[build_id]
return True
if len(self._syncs) >= self._concurrent[0] \
or len(self._builds) + len(self._syncs) >= self._concurrent[1]:
return False
self._syncs[build_id] = time() + self.build_ttl
return True
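# Typical protocol (a sketch with a hypothetical build ID): reserve a
# synchronization slot before transferring data, then release it:
#
#   if manager.synchronizing('build42'):
#       # ...transfer data...
#       manager.synchronizing('build42', lock=False)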
# -------------------------------------------------------------------------
def start_build(self, build_id, context, processing, pack, end_url=None):
"""Create a build, add it in ``self._builds`` dictionary and try to
start it.
:param build_id: (string)
Build ID.
:param context: (dictionary)
See :class:`~.front.FrontBuildManager`
:meth:`~.front.FrontBuildManager.call` method.
:param processing: (dictionary)
A processing dictionary.
:param pack: (dictionary)
A pack dictionary.
:param end_url: (string, optional)
URL to call to complete the build.
:return: (:class:`~AgentBuild` or ``None``)
"""
self._cleanup()
self._cleanup_directories()
if build_id in self._builds:
return self._builds[build_id]
self.synchronizing(build_id, False)
if build_id in self._results:
del self._results[build_id]
if len(self._syncs) >= self._concurrent[0] \
or len(self._builds) + len(self._syncs) >= self._concurrent[1]:
return None
self._builds[build_id] = AgentBuild(
self, build_id, context, processing, pack, end_url)
self._builds[build_id].start()
return self._builds[build_id]
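# A minimal, hypothetical call showing only the keys this module actually
# reads (real dictionaries carry more entries):
#
#   build = manager.start_build(
#       'build42',
#       {'local': True, 'lang': 'en'},
#       {'processor_id': 'Publidoc2XHtml', 'label': 'XHTML export',
#        'variables': {}},
#       {})   # the pack dictionary is passed through to the processor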
# -------------------------------------------------------------------------
def progress(self, build_id):
"""Return the progress of build.
:param build_id: (string)
Build ID.
:return: (tuple)
A tuple such as ``(<step>, <percent>, <message>)``.
The step ``<step>`` is one of the following:
* ``a_wait``: waiting to start
* ``a_start``: starting
* ``a_env``: importing processor environment
* ``a_build``: building
* ``a_warn``: a warning occurred
* ``a_error``: an error occurred
* ``a_fatal``: a fatal error occurred
* ``a_stop``: stopping
* ``a_end``: successfully completed
* ``none``: unknown or not in progress build
"""
self._cleanup()
if build_id in self._builds and self._builds[build_id].result['log']:
return self._builds[build_id].result['log'][-1][1:]
elif build_id in self._results:
return self._results[build_id]['log'][-1][1:]
return 'none', 0, ''
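# For a running build, this could return, for instance (illustrative values):
#
#   manager.progress('build42')  ==>  ('a_build', 50, 'processing files...')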
# -------------------------------------------------------------------------
def stop(self, build_id):
"""Stop a build.
:param build_id: (string)
Build ID.
"""
self._cleanup()
self.synchronizing(build_id, False)
if build_id in self._builds:
self._builds[build_id].stop()
return ''
# -------------------------------------------------------------------------
def result(self, build_id):
"""Return the result of build.
:param build_id: (string)
Build ID.
:return: (dictionary)
``self._result`` or ``{'status': 'none'}``.
The status ``<status>`` is one of the following:
* ``a_stop``: stopped
* ``a_fatal``: a fatal error occurred
* ``a_end``: successfuly completed
* ``none``: unknown build
"""
self._cleanup()
return self._results[build_id] if build_id in self._results \
else {'status': 'none', 'log': [], 'message': ''}
# -------------------------------------------------------------------------
def _cleanup(self):
"""Delete completed builds and expired results and kill long builds."""
# Build -> result or stop
now = time()
# pylint: disable = consider-iterating-dictionary
for build_id in self._builds.keys():
if self._builds[build_id].stopped():
self._builds[build_id].result['expire'] = now + self.result_ttl
self._results[build_id] = self._builds[build_id].result
del self._builds[build_id]
elif self._builds[build_id].expire < now:
self._builds[build_id].stopped(_('timeout'))
# Remove old results
for build_id in self._results.keys():
if now > self._results[build_id]['expire']:
del self._results[build_id]
# Remove expired synchronizations
for build_id in self._syncs.keys():
if self._syncs[build_id] < now:
del self._syncs[build_id]
# -------------------------------------------------------------------------
def _cleanup_directories(self):
"""Remove old directories."""
# Clean up build directory
now = time()
if exists(self.settings['build.root']):
for name in listdir(self.settings['build.root']):
path = join(self.settings['build.root'], name)
if name not in self._builds and exists(path) \
and getatime(path) + self.result_ttl < now:
rmtree(path)
# Clean up buildspace directory
if self.settings.get('buildspace.root') is not None \
and exists(self.settings['buildspace.root']):
for name in listdir(self.settings['buildspace.root']):
path = join(self.settings['buildspace.root'], name)
if exists(path) \
and getatime(path) + self._buildspace_ttl < now:
rmtree(path)
# =============================================================================
class AgentBuild(object):
"""This class manages one local build.
``self.result`` is a dictionary with the following keys: ``status``,
``message``, ``log``. ``log`` is a list of tuples such as ``(<timestamp>,
<step>, <percent>, <message>)``. Processors can add keys such as ``files``
and ``values``.
"""
# pylint: disable = too-many-instance-attributes
# -------------------------------------------------------------------------
def __init__(self, build_manager, build_id, context, processing, pack,
end_url):
"""Constructor method.
:param build_manager: (:class:`AgentBuildManager` instance)
Application :class:`AgentBuildManager` object.
:param build_id: (string)
Build ID.
:param context: (dictionary)
See :class:`~publiforge.lib.build.front.FrontBuildManager`
:meth:`~publiforge.lib.build.front.FrontBuildManager.call`
method.
:param processing: (dictionary)
A processing dictionary.
:param pack: (dictionary)
A pack dictionary.
:param end_url: (string, optional)
URL to call to complete the build.
"""
# pylint: disable = too-many-arguments
self._manager = build_manager
self.settings = build_manager.settings
self.build_id = build_id
self.context = context
self.path = join(
build_manager.settings['build.root'], camel_case(build_id))
self.data_path = build_manager.settings.get('storage.root') \
if context['local'] else build_manager.settings['buildspace.root']
self.result = {
'status': 'none',
'log': [(time(), 'a_wait', 1, self.translate(_('waiting...')))],
'message': ''}
self.expire = time() + build_manager.build_ttl
self.processing = processing
self.pack = pack
self._end_url = end_url
self._thread = None
if not exists(self.path):
makedirs(self.path)
# -------------------------------------------------------------------------
def start(self):
"""Start the processing."""
# Already started
if self._thread is not None:
self.stopped(_('Already in progress'), 'a_warn')
return
self.result['log'] = [(
time(), 'a_start', 1, self.translate(_('agent build startup')))]
# Create build directory
path = join(self.path, 'Attic')
if not exists(path):
makedirs(path)
path = join(self.path, 'Processor')
if self.settings.get('build.develop') == 'true' and exists(path):
rmtree(path)
if not exists(path):
makedirs(path)
if not self._import_processor(self.processing['processor_id']):
self.stopped(self.result['message'])
return
self.log(_('processor environment installed'), 'a_env', 1)
path = join(self.path, 'Output')
if exists(path):
try:
rmtree(path)
except OSError as error:
self.stopped(error)
return
makedirs(path)
# Build directly...
if self._end_url is None:
self._thread_processor()
return
# ...or in a thread
self._thread = Thread(target=self._thread_processor)
self._thread.start()
# -------------------------------------------------------------------------
def stop(self):
"""Stop building."""
if not self.stopped():
self.result['status'] = 'a_stop'
self.result['message'] = self.translate(_('Stop by user'))
self.log(_('stopped'), 'a_stop', 90)
# -------------------------------------------------------------------------
def stopped(self, error=None, level='a_fatal'):
"""Check if there is a fatal error and if the build is stopped.
:param str error: (optional)
Error message.
:param str level: (default='a_fatal')
Error level: ``a_warn``, ``a_error`` or ``a_fatal``.
:rtype: bool
:return:
``True`` if it is stopped.
"""
if error:
if level == 'a_fatal':
self.result['status'] = level
self.result['message'] = self.translate(error)
self.log(error, level, 90)
return self.result['status'] in ('a_stop', 'a_fatal', 'a_end')
# -------------------------------------------------------------------------
def output2attic(self):
"""Copy the content of ``Output`` directory to ``Attic`` directory."""
if not self.stopped():
copy_content(join(self.path, 'Output'), join(self.path, 'Attic'))
# -------------------------------------------------------------------------
def get_in_attic(self, target, dependencies=None, relations=None):
"""Try to retrieve the last version of a file in the attic and copy it
in the ``Output`` directory.
:param target: (string)
Path to target file.
:param dependencies: (list, optional)
List of files to compare with to known if it is necessary to
process.
:param relations: (list, optional)
List of closely related files to retrieve with the target file.
:return: (boolean)
``True`` if the operation succeeded.
"""
if self.processing['variables'].get('force'):
return False
# Get file in attic
archive = join(
self.path, 'Attic', relpath(target, join(self.path, 'Output')))
if not exists(archive):
return False
# Get the most recent modification time among the dependencies
last_one = 0
if dependencies is not None:
for pattern in dependencies:
if '*' not in pattern and '?' not in pattern \
and not exists(pattern):
return False
for filename in glob(pattern):
file_time = getmtime(filename)
if last_one < file_time:
last_one = file_time
# Archive is out of date or dependencies were not found: cannot reuse it
if dependencies is not None \
and (not last_one or getmtime(archive) < last_one):
return False
# Retrieve related files
relations = relations if relations is not None else []
for pattern in relations:
pattern = join(
self.path, 'Attic',
relpath(pattern, join(self.path, 'Output')))
if '*' not in pattern and '?' not in pattern \
and not exists(pattern):
return False
for filename in glob(pattern):
relation = join(
self.path, 'Output',
relpath(filename, join(self.path, 'Attic')))
copy2(filename, relation)
# Copy archive in output directory
if not exists(dirname(target)):
makedirs(dirname(target))
copy2(archive, target)
return True
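# A sketch of how a processor would typically use this attic cache (file
# names are hypothetical):
#
#   target = join(build.path, 'Output', 'chapter1.html')
#   if not build.get_in_attic(target, dependencies=[source_file]):
#       # ...regenerate ``target`` from ``source_file``...
#       pass
#   # once the output is complete, ``output2attic()`` archives it for reuse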
# -------------------------------------------------------------------------
def message(self, text):
"""Add a message in ``result['values']``.
:param text: (string or :class:`pyramid.i18n.TranslationString`)
Message text.
"""
if 'values' not in self.result:
self.result['values'] = []
self.result['values'].append(self.translate(text))
# -------------------------------------------------------------------------
def log(self, message, step=None, percent=None):
"""Append an entry to ``result['log']``.
:param message: (string)
Message to write in log.
:param step: (string, optional)
If not ``None``, progress is updated.
:param percent: (int, optional)
Percent of progress for step ``<step>``.
"""
if percent is None:
percent = self.result['log'][-1][2] if step is None else 0
if step is None:
step = self.result['log'][-1][1]
self.result['log'].append(
(time(), step, percent, self.translate(message)))
if self.context['local'] and 'request' not in self.context:
{'a_warn': LOG.warning,
'a_error': LOG.error,
'a_fatal': LOG.critical}.get(
step, LOG.info)(self.result['log'][-1][3])
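# For instance, a processor reporting progress on a hypothetical file:
#
#   build.log(_('converting "${f}"', {'f': filename}), 'a_build', 40)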
# -------------------------------------------------------------------------
def translate(self, text):
"""Return ``text`` translated.
:param text: (string)
Text to translate.
"""
return localizer(
self.context['lang'],
self._manager.translation_dirs +
[join(dirname(__file__), '..', '..', 'Locale')]).translate(text)
# -------------------------------------------------------------------------
def _thread_processor(self):
"""Action in a thread to launch processor."""
# Find the processor
tree = etree.parse(join(self.path, 'Processor', 'processor.xml'))
module_name = tree.findtext('processor/module').strip()
try:
module = __import__(module_name, fromlist=['Processor'])
except ImportError as err:
self.stopped('%s: %s' % (module_name, err))
# Launch processor
if not self.stopped():
self.log(
_('start processing "${p}"', {'p': self.processing['label']}),
'a_build', 1)
self.expire = time() + self._manager.build_ttl
module.Processor(self).start()
if not self.stopped():
self.log(_('agent build completed'), 'a_end', 90)
self.result['status'] = 'a_end'
# Announce the end to the front
if not self.context['local']:
self._manager.synchronizing(self.build_id)
try:
response = urllib.urlopen(self._end_url)
except IOError as err:
self._manager.synchronizing(self.build_id, False)
return
response.close()
self._manager.synchronizing(self.build_id, False)
elif 'request' in self.context:
request = self.context['request']
request.registry['fbuild'].complete(
request, self.build_id, self._end_url.split('/')[-1])
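# A minimal sketch of what the imported processor module must expose, deduced
# from the call above (real processors implement much more):
#
#   class Processor(object):
#       def __init__(self, build):
#           self.build = build
#
#       def start(self):
#           self.build.log('working...', 'a_build', 50)
#           # write results into join(self.build.path, 'Output')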
# -------------------------------------------------------------------------
def _import_processor(self, processor_id):
"""Import processor, possibly with inheritance, in build directory.
:param processor_id: (string)
ID of processor to import.
:return: (boolean)
``True`` if it succeeds.
"""
# Find processor
src_path = self._manager.processor_path(processor_id)
if not src_path:
self.stopped(_('unknown processor "${p}"', {'p': processor_id}))
return False
# Read processor.xml file to check if other processors are needed
ancestors = etree.parse(join(src_path, 'processor.xml'))\
.findall('processor/ancestors/ancestor')
for ancestor in ancestors:
if not self._import_processor(ancestor.text.strip()):
return False
# Copy processor
copy_content(
src_path, join(self.path, 'Processor'),
exclude=('Templates', 'Variables', 'Scss'), force=True)
if exists(join(src_path, 'Templates')):
copy_content(
join(src_path, 'Templates'),
join(self.path, 'Processor', 'Templates', processor_id),
force=True)
return True
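# For instance (hypothetical IDs), if ``Publidoc2XHtml`` declares the ancestor
# ``Publidoc2Xml``, the ancestor is copied first and the child is copied over
# it, so the child's files override the ancestor's in ``<build>/Processor``;
# templates are kept apart in ``<build>/Processor/Templates/<processor_id>``.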