Source code for publiforge.lib.processor.leprisme.containers

"""Container factory management."""

import re
import zipfile
from os import walk, remove
from os.path import exists, join, relpath, isdir, basename, dirname, splitext
from shutil import rmtree
from lxml import etree

from ...i18n import _
from ...utils import EXCLUDED_FILES, get_mime_type, make_id, execute
from ...xml import load_xml


# Regular expression matching names which end with ``~`` or ``.tmp``,
# optionally followed by an extension of 1 to 4 word characters (for
# instance ``foo~``, ``foo.tmp`` or ``foo.tmp.xml``). The container
# factories of this module compile it to skip such files and directories.
REMOVE_PATTERN = r'(~|\.tmp)(\.\w{1,4})?$'


# =============================================================================
class ZipFactory(object):
    """Class for Zip container."""

    uid = 'Zip'

    # -------------------------------------------------------------------------
    def __init__(self, processor):
        """Constructor method.

        :param processor: (:class:`~.lib.processor.leprisme.Processor` object)
            Processor object on which it depends.
        """
        self._processor = processor

    # -------------------------------------------------------------------------
    def make(self, fid, output):
        """Make a Zip file from the configured root directory.

        Every file found under the root directory is stored in the archive
        with a path relative to this root, except files and directories
        listed in ``EXCLUDED_FILES`` or matching ``REMOVE_PATTERN`` and
        files whose name ends with ``.zip``. Unless the ``keeptmp``
        processing variable is set, each archived file is removed from the
        disk once written.

        :param fid: (string)
            File identifier.
        :param output: (string)
            Full path to output directory.
        :return: (string)
            Full path to zip file or ``None`` if the root directory does
            not exist.
        """
        # Validate the input directory *before* creating the archive so that
        # a failure neither leaks an open ZipFile nor leaves an empty file.
        root = self._processor.config('container:%s' % self.uid, 'root', '')
        if not exists(join(output, root)):
            self._processor.build.stopped(
                _('unknown directory "${d}"', {'d': root}), 'a_error')
            return None
        root = join(output, root)

        # The archive name is the configured output name with a .zip suffix
        filename = join(
            output,
            self._processor.config('Output', 'format').format(fid=fid))
        filename = '%s.zip' % splitext(filename)[0]
        keeptmp = self._processor.build.processing['variables'].get('keeptmp')
        exclude = re.compile(REMOVE_PATTERN)

        # The context manager closes the archive even if a write fails
        with zipfile.ZipFile(filename, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for path, dirs, files in walk(root):
                # Prune in place so that walk() skips excluded directories
                dirs[:] = [
                    name for name in dirs
                    if name not in EXCLUDED_FILES
                    and not exclude.search(name)]
                for name in files:
                    # '.zip' files are skipped: the archive being written
                    # may itself be located under the root directory
                    if name in EXCLUDED_FILES or exclude.search(name) \
                            or name.endswith('.zip'):
                        continue
                    fullname = join(path, name)
                    zip_file.write(fullname, relpath(fullname, root))
                    if not keeptmp:
                        # walk() only lists non-directories in ``files``,
                        # so a plain remove() is always the right call
                        remove(fullname)

        return filename
# =============================================================================
class OcfFactory(object):
    """Class for Open Container Format (OCF) file."""

    uid = 'OCF'

    # -------------------------------------------------------------------------
    def __init__(self, processor):
        """Constructor method.

        :param processor: (:class:`~.lib.processor.leprisme.Processor` object)
            Processor object on which it depends.
        """
        self._processor = processor

    # -------------------------------------------------------------------------
    def make(self, fid, output):
        """Make an Open Container Format file and check its validity.

        The root directory must contain a ``mimetype`` file and a
        ``META-INF/container.xml`` file. If a manifest is configured, it is
        updated before the archive is created.

        :param fid: (string)
            File identifier.
        :param output: (string)
            Full path to output directory.
        :return: (string)
            Full path to ePub file or ``None`` if an error occurred.
        """
        # Input directory
        root = self._processor.config('container:%s' % self.uid, 'root', '')
        if not exists(join(output, root)):
            self._processor.build.stopped(
                _('unknown directory "${d}"', {'d': root}), 'a_error')
            return None
        root = join(output, root)
        if not exists(join(root, 'mimetype')) \
                or not exists(join(root, 'META-INF', 'container.xml')):
            self._processor.build.stopped(
                _('incorrect OCF structure'), 'a_error')
            return None

        # Update manifest
        manifest = self._processor.config(
            'container:%s' % self.uid, 'manifest')
        if manifest and not self._update_manifest(root, manifest):
            return None

        # Create ZIP file
        exclude = re.compile(REMOVE_PATTERN)
        ocf_file = join(
            output,
            self._processor.config('Output', 'format').format(fid=fid))
        # Archive name the output file would get if it lies under ``root``;
        # used to avoid zipping the archive into itself
        ocf_arcname = relpath(ocf_file, root)
        # The context manager closes the archive even if a write fails
        with zipfile.ZipFile(ocf_file, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # 'mimetype' is written first and uncompressed
            zip_file.write(
                join(root, 'mimetype'), 'mimetype', zipfile.ZIP_STORED)
            for path, dirs, files in walk(root):
                # Prune in place so that walk() skips excluded directories
                dirs[:] = [
                    name for name in dirs
                    if name not in EXCLUDED_FILES
                    and not exclude.search(name)]
                for name in files:
                    if name in EXCLUDED_FILES or exclude.search(name):
                        continue
                    fullname = join(path, name)
                    arcname = relpath(fullname, root)
                    # Only the root 'mimetype' (already stored above) is
                    # skipped, not a homonym in a sub-directory
                    if arcname in ('mimetype', ocf_arcname):
                        continue
                    zip_file.write(fullname, arcname)
        if self._processor.build.stopped():
            return None

        # Check validity
        if self._check_validity(output, ocf_file):
            return ocf_file
        return None

    # -------------------------------------------------------------------------
    def _update_manifest(self, root, manifest):
        """Update file list in manifest tag.

        Each file found under the directory of the OPF file which is not
        yet referenced by a child of the manifest element is appended to it
        as a new ``item`` element. The OPF file is rewritten only if at
        least one item has been added.

        :param root: (string)
            Full path to root directory.
        :param manifest: (string)
            A string such as ``<relative_path_to_opf>:<xpath_to_manifest>``.
        :return: (boolean)
            ``True`` if succeeds.
        """
        # Read manifest node
        # pylint: disable = too-many-locals
        opf_relpath, ignored_, xpath = manifest.partition(':')
        xpath = xpath.strip()
        opf_file = join(root, opf_relpath)
        tree = load_xml(opf_file)
        # A string instead of a tree is reported as the error message
        # NOTE(review): ``basestring`` only exists in Python 2
        if isinstance(tree, basestring):
            self._processor.build.stopped(tree, 'a_error')
            return False
        # From now on, paths are relative to the directory of the OPF file
        root = dirname(opf_file)
        # The 'opf' prefix is bound to the namespace of the root element
        namespace = tree.getroot().tag.split('}')[0][1:]
        found = tree.xpath(xpath, namespaces={'opf': namespace})
        if len(found) != 1:
            self._processor.build.stopped(
                _('${x}: incorrect XPATH', {'x': xpath}), 'a_error')
            return False
        manifest_elt = found[0]

        # Browse declared files (a set gives constant time membership tests)
        done = set([basename(opf_file)])
        for elt in manifest_elt.iterchildren(tag=etree.Element):
            done.add(elt.get('href'))

        # Browse real files
        modified = False
        exclude = re.compile(REMOVE_PATTERN)
        for path, ignored_, files in walk(root):
            for name in files:
                fullname = join(path, name)
                relname = relpath(fullname, root)
                if relname in done or name in EXCLUDED_FILES \
                        or exclude.search(name):
                    continue
                stem, extension = splitext(name)
                elt = etree.SubElement(
                    manifest_elt, 'item',
                    id='x_%s_%s' % (make_id(stem, 'token'), extension[1:]),
                    href=relname)
                elt.set('media-type', get_mime_type(fullname)[0])
                elt.tail = '\n '
                modified = True

        # Save modified file
        if modified:
            tree.write(
                opf_file, encoding='utf-8', xml_declaration=True,
                pretty_print=True)
        return True

    # -------------------------------------------------------------------------
    def _check_validity(self, output, ocf_file):
        """Check OCF file validity.

        The check is skipped, and considered successful, if no checker is
        configured or if the ``ocfcheck`` processing variable is not set.
        On failure, the OCF file is removed and the build is stopped.

        :param output: (string)
            Full path to output directory.
        :param ocf_file: (string)
            Full path to OCF file to check.
        :return: (boolean)
            ``True`` if succeeds.
        """
        name = self._processor.config('container:%s' % self.uid, 'checker')
        variables = self._processor.build.processing['variables']
        if not name or not variables.get('ocfcheck'):
            return True

        # Each word of the checker command may use the {ocffile} placeholder
        command = [k.format(ocffile=ocf_file) for k in name.split()]
        # NOTE(review): ``execute`` presumably returns an (output, error)
        # pair -- confirm against ...utils.execute
        result = execute(command, output, 'idml' in name)
        if not result[1]:
            return True

        remove(ocf_file)
        error = result[0] or result[1]
        # Hide the output directory and flatten the message on one line
        error = error.replace(output, '').replace('\n', ' ')
        self._processor.build.stopped(error, 'a_error')
        return False