Source code for publiforge.lib.processor.leprisme.iniscript
"""INI scripts management."""
from os import walk, rename
from os.path import join, exists, dirname, basename, splitext, isfile, isabs
from os.path import normpath
from shlex import split
from ConfigParser import ConfigParser
import codecs
from ...i18n import _
from ...config import config_get, config_get_list
from ...utils import EXCLUDED_FILES, execute
from .. import bin_directory
MEDIA_EXT = {
'image': ('svg', 'png', 'tif', 'tiff', 'jpg', 'jpeg', 'pdf', 'eps', 'gif'),
'audio': ('wav', 'ogg', 'aac', 'ac3', 'mp3'),
'video': ('dv', 'mpg', 'ogv', 'mp4', 'mov', 'avi', 'webm', 'flv')}
# =============================================================================
[docs]class IniScript(object):
"""Class for INI script managment."""
# -------------------------------------------------------------------------
def __init__(self, processor):
"""Constructor method.
:param processor: (:class:`~.lib.processor.leprisme.Processor` object)
Processor object on which it depends.
"""
self._processor = processor
# -------------------------------------------------------------------------
[docs] def convert_media(self, filename, ini_file, done_tag, percent):
"""Convert a media (image, audio or video).
:param filename: (string)
Relative path to the original file to transform.
:param ini_file: (string)
Path to INI file.
:param done_tag: (string)
Tag to mark the once the conversion is done.
:param percent: (integer)
Percentage of progress.
:return: (boolean)
"""
# Read INI file
config = ConfigParser({
'here': dirname(ini_file),
'bin': bin_directory(),
'output': self._processor.output,
'processor': join(self._processor.build.path, 'Processor'),
'donetag': done_tag,
'lang': self._processor.build.context['lang'],
'filepath':
join(self._processor.build.data_path, dirname(filename)),
'stgpath': self._processor.build.data_path,
'id': '{id}', 'ext': '{ext}', 'srcext': '{srcext}',
'source': '{source}', 'sourcepath': '{sourcepath}',
'target': '{target}', 'targetpath': '{targetpath}'})
config.readfp(codecs.open(ini_file, 'r', 'utf8'))
# Compute full path to source
media_id = config_get(config, 'Source', 'id')
media_type = config_get(config, 'Source', 'type')
target = config_get(config, 'Target', 'file')
source = self.find_media(
media_type, media_id, config_get_list(config, 'Source', 'search'),
'{srcext}' not in target and target or None)
if not source or not exists(source):
self._processor.build.stopped(
_('${f}: (${t}) no source for "${i}"',
{'f': basename(ini_file)[:-5], 't': media_type,
'i': media_id}), 'a_error')
self._rename_ini(ini_file, done_tag)
return False
# Read full path to target
if not target:
self._processor.build.stopped(
_('${f}: (${t}) no target for "${i}"',
{'f': basename(ini_file)[:-5], 't': media_type,
'i': media_id}), 'a_error')
self._rename_ini(ini_file, done_tag)
return False
target = target.format(srcext=splitext(source)[1][1:])
# Try to retrieve
if self._get_in_attic(config, media_id, source, target):
self._processor.build.log(
_('${f}: (${t}) retrieving of "${i}"',
{'f': basename(ini_file)[:-5], 't': media_type,
'i': basename(target)}), step='a_build', percent=percent)
self._rename_ini(ini_file, done_tag)
return True
# Transform data
self._processor.build.log(
_('${f}: (${t}) creation of "${i}"',
{'f': basename(ini_file)[:-5], 't': media_type,
'i': basename(target)}), step='a_build', percent=percent)
section = 'Transformation:%s' % splitext(source)[1][1:]
if not config.has_section(section):
section = 'Transformation'
for step in range(100):
cmd = config_get(config, section, 'step.%d' % step)
if cmd:
cmd = cmd.format(
id=media_id, source=source, sourcepath=dirname(source),
target=target, targetpath=dirname(target))
error = execute(split(cmd.encode('utf8')), dirname(ini_file))
if error[1]:
self._processor.build.stopped(error[0], 'a_error')
self._processor.build.stopped(error[1])
self._rename_ini(ini_file, done_tag)
return False
if not exists(target):
self._processor.build.stopped(
_('${f}: (${t}) unable to make "${i}"',
{'f': basename(ini_file)[:-5], 't': media_type,
'i': media_id}), 'a_error')
self._rename_ini(ini_file, done_tag)
return False
self._rename_ini(ini_file, done_tag)
return True
# -------------------------------------------------------------------------
[docs] def post_execution(self, ini_file, target_file, done_tag):
"""Process post INI script.
:param ini_file: (string)
Path to INI file.
:param target_file: (string)
Path to target file.
:param done_tag: (string)
Tag to mark the once the conversion is done.
"""
config = ConfigParser({
'here': dirname(ini_file),
'bin': bin_directory(),
'output': self._processor.output,
'processor': join(self._processor.build.path, 'Processor'),
'donetag': done_tag,
'lang': self._processor.build.context['lang'],
'target': target_file})
config.readfp(codecs.open(ini_file, 'r', 'utf8'))
for step in range(100):
cmd = config_get(config, 'Transformation', 'step.%d' % step)
if cmd:
log = execute(
split(cmd.encode('utf8')), self._processor.output)
if log[1]:
self._processor.build.stopped(log[0], 'a_error')
self._processor.build.stopped(log[1])
self._rename_ini(ini_file, done_tag)
return
if log[0]:
try:
self._processor.build.log(log[0])
except UnicodeDecodeError:
pass
self._rename_ini(ini_file, done_tag)
# -------------------------------------------------------------------------
[docs] def find_media(self, media_type, media_id, patterns, target=None):
"""Look for a media.
:param media_type: (string)
Type of media (image, audio or video).
:param media_id: (string)
Media ID.
:param patterns: (list)
List of patterns to check.
:param target: (string, optional)
Full path to target file.
:return: (string)
Full path to source file or ``None``.
"""
# pylint: disable = too-many-locals
# Initialize
if media_type not in ('image', 'audio', 'video') or not media_id \
or not patterns:
return None
extensions = tuple(self._processor.config_list(
'Input', '%s.ext' % media_type, MEDIA_EXT[media_type]))
ext = target and splitext(target)[1][1:]
extensions = (ext,) + extensions if ext in extensions else extensions
resources = [join(self._processor.build.data_path, k) for k in
self._processor.build.pack['resources'] +
self._processor.build.processing['resources']]
# Look for file in search patterns
for pattern in [k for k in patterns if isabs(k)]:
name = self._find_media_extension(extensions, pattern)
if name is not None:
return name
patterns.remove(pattern)
# Look for the file in resource files
for resource in [k for k in resources if isfile(k)]:
name, ext = splitext(basename(resource))
if name == media_id and ext[1:] in extensions:
return resource
resources.remove(resource)
# Look for the file in resource directories
done = set()
for resource in resources:
for path, dirs, ignored_ in walk(resource):
for name in tuple(dirs):
if name in EXCLUDED_FILES or '~' in name:
dirs.remove(name)
if path in done:
continue
done.add(path)
for pattern in patterns:
name = self._find_media_extension(
extensions, join(path, pattern))
if name is not None:
return name
name = normpath(dirname(join(path, pattern)))
done.add(name)
return None
# -------------------------------------------------------------------------
@classmethod
def _find_media_extension(cls, extensions, absolute_pattern):
"""Look for the right extension for a media.
:param extensions: (list)
List of possible extensions.
:param absolute_pattern: (string)
Absolute pattern to build file name.
:return: (string)
The absolute path or ``None``.
"""
for ext in '{ext}' in absolute_pattern and extensions or '-':
name = absolute_pattern.format(ext=ext)
if isfile(name):
return normpath(name)
return None
# -------------------------------------------------------------------------
def _get_in_attic(self, config, media_id, source, target):
"""Try to retrieve the media from the attic.
:param config: (class:`ConfigParser.ConfigParser` instance)
Current configuration object.
:param media_id: (string)
Media ID.
:param source: (string)
Full path to source file.
:param target: (string)
Full path to target file.
:return: (boolean)
"""
if self._processor.build.processing['variables'].get('force'):
return False
dependencies = config_get(config, 'Target', 'dependencies', '')\
.format(id=media_id, source=source, sourcepath=dirname(source),
target=target, targetpath=dirname(target))
dependencies = [k.strip() for k in dependencies.split(',')] \
if dependencies else []
relations = config_get(config, 'Target', 'relations', '')\
.format(id=media_id, target=target, targetpath=dirname(target))
relations = [k.strip() for k in relations.split(',')] \
if relations else None
return self._processor.build.get_in_attic(
target, dependencies, relations)
# -------------------------------------------------------------------------
@classmethod
def _rename_ini(cls, ini_file, done_tag):
"""Rename INI file after processing."""
rename(
ini_file,
join(dirname(ini_file), '%s_%s' % (done_tag, basename(ini_file))))