Source code for markovchain.cli.image

from argparse import FileType
from sys import stderr
from os import replace, remove, path
from shutil import copyfile
from functools import reduce
from itertools import islice
from PIL import Image

from ..storage import JsonStorage, SqliteStorage
from ..image import MarkovImage
from ..util import ObjectWrapper, truncate
from .util import (
    tqdm, load, save, infiles, outfiles as _outfiles,
    check_output_format, JSON, SQLITE,
    BAR_FORMAT, BAR_DESC_SIZE,
    save_image
)
from .util import cmd_settings # pylint:disable=unused-import


def create_arg_parser(parent):
    """Register the image sub-commands on a command parser.

    Adds the ``create``, ``update``, ``settings``, ``generate`` and
    ``filter`` sub-commands. The selected sub-command name is stored in
    the ``command`` attribute of the parsed namespace.

    Parameters
    ----------
    parent : `argparse.ArgumentParser`
        Command parser.
    """
    def add_progress(parser):
        # Flag shared by every sub-command that can show a progress bar.
        parser.add_argument('-P', '--progress', action='store_true',
                            help='show progress bar')

    def add_settings(parser):
        # The settings file is opened for reading by argparse itself.
        parser.add_argument('-s', '--settings', type=FileType('r'),
                            default=None,
                            help='settings json file')

    commands = parent.add_subparsers(dest='command')

    # create [-P] [-s FILE] [-o FILE] [input ...]
    create = commands.add_parser('create')
    add_progress(create)
    add_settings(create)
    create.add_argument('-o', '--output', default=None,
                        help='output file (default: stdout)')
    create.add_argument('input', nargs='*',
                        help='input file')

    # update [-P] [-s FILE] [-o FILE] state [input ...]
    update = commands.add_parser('update')
    add_progress(update)
    add_settings(update)
    update.add_argument('-o', '--output', default=None,
                        help='output file (default: rewrite state file)')
    update.add_argument('state',
                        help='state file')
    update.add_argument('input', nargs='*',
                        help='input file')

    # settings state
    settings = commands.add_parser('settings')
    settings.add_argument('state',
                          help='state file')

    # generate [options] state output
    generate = commands.add_parser('generate')
    add_progress(generate)
    add_settings(generate)
    generate.add_argument('-ss', '--state-size', type=int, nargs='+',
                          default=None,
                          help='generator state sizes')
    generate.add_argument('-S', '--size', metavar=('WIDTH', 'HEIGHT'),
                          type=int, nargs=2, default=None,
                          help='image size (default: <scanner.resize>)')
    generate.add_argument('-l', '--level', type=int, default=None,
                          help='image levels (default: <scanner.levels>)')
    generate.add_argument('-c', '--count', type=int, default=1,
                          help='generated image count (default: %(default)s)')
    generate.add_argument('state',
                          help='state file')
    generate.add_argument('output',
                          help='output file name format string')

    # filter [options] input output
    # Here -S names the state file, unlike `generate` where it is the size.
    filter_ = commands.add_parser('filter')
    add_progress(filter_)
    filter_.add_argument('-t', '--type', choices=('json', 'sqlite'),
                         default='json',
                         help='generator type (default: %(default)s)')
    add_settings(filter_)
    filter_.add_argument('-S', '--state', default=None,
                         help='state file')
    filter_.add_argument('-ss', '--state-size', type=int, nargs='+',
                         default=None,
                         help='generator state sizes')
    filter_.add_argument('-l', '--level', type=int, default=1,
                         help='filter start level (default: %(default)s)')
    filter_.add_argument('-c', '--count', type=int, default=1,
                         help='generated image count (default: %(default)s)')
    filter_.add_argument('input',
                         help='input image')
    filter_.add_argument('output',
                         help='output file name format string')
class TraversalProgressWrapper(ObjectWrapper):  # pylint: disable=too-few-public-methods
    """Traversal object wrapper that displays image traversal progress.

    Each call opens a per-pixel progress bar for one traversal and, when
    the traversal ends, advances the parent bar by one step.

    Attributes
    ----------
    pbar_parent : `tqdm.tqdm`
        Parent progress bar. Its ``total`` and ``n`` are used to work out
        which level and channel the current traversal belongs to.
    channels : sequence
        Channel labels; the label of the current channel is appended to
        the progress bar title.
    """

    def __init__(self, obj, channels, parent=None):
        """Wrap a traversal object.

        Parameters
        ----------
        obj : `object`
            Traversal object to wrap.
        channels : sequence
            Channel labels.
        parent : `tqdm.tqdm`, optional
            Parent progress bar (default: `None`).
        """
        super().__init__(obj)
        self.pbar_parent = parent
        self.channels = channels

    def __call__(self, width, height, ends=True):
        """Traverse an image while updating a progress bar.

        Parameters
        ----------
        width : `int`
            Image width.
        height : `int`
            Image height.
        ends : `bool`, optional
            Passed through to the wrapped traversal (default: `True`).

        Returns
        -------
        `generator`
            Items produced by the wrapped traversal, unchanged.
        """
        parent = self.pbar_parent
        # The parent bar counts one step per (channel, level) pair, so
        # its total divided by the channel count is the level count.
        per_channel = parent.total // len(self.channels)
        channel_index, level_index = divmod(parent.n, per_channel)
        label = 'Level %d%s' % (level_index + 1, self.channels[channel_index])
        bar = tqdm(total=width * height, desc=label,
                   leave=False, unit='px',
                   bar_format=BAR_FORMAT, dynamic_ncols=True)
        try:
            for item in super().__call__(width, height, ends):
                # Only non-None items advance the pixel counter.
                if item is not None:
                    bar.update(1)
                yield item
        finally:
            bar.close()
            # Re-read the attribute: the parent bar is swapped per file.
            self.pbar_parent.update(1)
def read(fnames, markov, progress, leave=True):
    """Read image files and update a generator.

    Each input file is opened with PIL, passed to ``markov.data`` and
    closed again before the next file is processed.

    Parameters
    ----------
    fnames : `list` of `str`
        File paths.
    markov : `markovchain.base.MarkovBase`
        Generator to update.
    progress : `bool`
        Show progress bar.
    leave : `bool`, optional
        Leave progress bars (default: `True`).
    """
    pbar = None
    channels = markov.imgtype.channels
    tr = markov.scanner.traversal
    # Install the progress-reporting wrapper only once per traversal list.
    if progress and not isinstance(tr[0], TraversalProgressWrapper):
        tr[0] = TraversalProgressWrapper(tr[0], channels)
    tr = tr[0]

    try:
        with infiles(fnames, progress, leave) as paths:
            for fname in paths:
                if progress:
                    title = truncate(fname, BAR_DESC_SIZE - 1, False)
                    pbar = tqdm(
                        total=markov.levels * len(channels),
                        desc=title, leave=False, unit='lvl',
                        bar_format=BAR_FORMAT, dynamic_ncols=True
                    )
                    tr.pbar_parent = pbar
                # Image.open keeps the file open until the image is closed;
                # the context manager releases the handle per file.
                # NOTE(review): assumes markov.data finishes reading the
                # image before it returns - confirm.
                with Image.open(fname) as img:
                    markov.data(img, False)
                if progress:
                    pbar.close()
                    # Mark as closed so the finally clause does not close
                    # the same bar twice (mirrors `outfiles`).
                    pbar = None
    finally:
        # Reached with an open bar only when an error interrupted a file.
        if pbar is not None:
            pbar.close()
def outfiles(markov, fmt, nfiles, progress, start=0):
    """Yield output file paths, showing a per-file progress bar.

    Parameters
    ----------
    markov : `markovchain.base.MarkovBase`
        Markov chain generator.
    fmt : `str`
        File path format string.
    nfiles : `int`
        Number of files.
    progress : `bool`
        Show progress bars.
    start : `int`, optional
        Initial image level (default: 0).

    Returns
    -------
    `generator` of `str`
        Output file paths.
    """
    channels = markov.imgtype.channels
    traversal = markov.scanner.traversal
    # Install the progress-reporting wrapper only once per traversal list.
    if progress and not isinstance(traversal[0], TraversalProgressWrapper):
        traversal[0] = TraversalProgressWrapper(traversal[0], channels)
    wrapper = traversal[0]

    bar = None
    try:
        with _outfiles(fmt, nfiles, progress) as paths:
            for out_path in paths:
                if progress:
                    label = truncate(out_path, BAR_DESC_SIZE - 1, False)
                    # One step per (level, channel); levels below `start`
                    # are counted as already done.
                    bar = tqdm(
                        initial=start * len(channels),
                        total=markov.levels * len(channels),
                        desc=label, leave=False, unit='lvl',
                        bar_format=BAR_FORMAT, dynamic_ncols=True
                    )
                    wrapper.pbar_parent = bar
                # The caller produces the file while suspended here.
                yield out_path
                if progress:
                    bar.close()
                    bar = None
    finally:
        # Close a bar left open by an error or an abandoned generator.
        if bar is not None:
            bar.close()
def cmd_create(args):
    """Create a new generator from the input images and save it.

    Parameters
    ----------
    args : `argparse.Namespace`
        Command arguments.
    """
    if args.type != SQLITE:
        storage = JsonStorage(settings=args.settings)
    else:
        # Start from a fresh database: drop any existing output file.
        if path.exists(args.output):
            remove(args.output)
        storage = SqliteStorage(db=args.output, settings=args.settings)

    markov = MarkovImage.from_storage(storage)
    read(args.input, markov, args.progress)
    save(markov, args.output, args)
def cmd_update(args):
    """Update an existing generator with more input images.

    Without an output file the state file is rewritten in place; a JSON
    state is first written to a temporary file that then replaces the
    original.

    Parameters
    ----------
    args : `argparse.Namespace`
        Command arguments.
    """
    if args.type == SQLITE and args.output is not None:
        # Work on a copy so the original database is left untouched.
        copyfile(args.state, args.output)
        args.state = args.output

    markov = load(MarkovImage, args.state, args)
    read(args.input, markov, args.progress)

    if args.output is not None:
        save(markov, args.output, args)
        return

    if args.type == SQLITE:
        save(markov, None, args)
    elif args.type == JSON:
        # Write next to the state file, then swap it in with a rename.
        stem, suffix = path.splitext(args.state)
        tmp_path = stem + '.tmp' + suffix
        save(markov, tmp_path, args)
        replace(tmp_path, args.state)
def cmd_generate(args):
    """Generate images.

    The base image size is taken from ``args.size`` or, failing that,
    from ``markov.scanner.resize``; it is then divided by a scale factor
    before being passed to the generator.

    Parameters
    ----------
    args : `argparse.Namespace`
        Command arguments.

    Raises
    ------
    SystemExit
        With exit status 1 if no size was given and the generator has no
        ``scanner.resize`` setting.
    """
    check_output_format(args.output, args.count)

    markov = load(MarkovImage, args.state, args)

    if args.size is None:
        if markov.scanner.resize is None:
            print('Unknown output image size', file=stderr)
            # The builtin `exit` is injected by the `site` module for
            # interactive use and may be missing (e.g. `python -S`);
            # raising SystemExit directly always works.
            raise SystemExit(1)
        width, height = markov.scanner.resize
    else:
        width, height = args.size

    if args.level is None:
        scale = markov.scanner.min_size
    else:
        # Product of the first `level - 1` entries of level_scale.
        scale = reduce(
            lambda x, y: x * y,
            islice(markov.scanner.level_scale, 0, args.level - 1),
            1
        )

    width, height = width // scale, height // scale

    markov.scanner.traversal[0].show_progress = args.progress

    for fname in outfiles(markov, args.output, args.count, args.progress):
        img = markov(
            width, height,
            state_size=args.state_size,
            levels=args.level
        )
        save_image(img, fname)
def cmd_filter(args):
    """Filter an image.

    Uses the generator loaded from ``args.state`` or, when no state file
    is given, a new generator built from the input image itself.

    Parameters
    ----------
    args : `argparse.Namespace`
        Command arguments. ``args.level`` is overwritten and, when no
        state file was given, ``args.state`` is set to an empty tuple.
    """
    check_output_format(args.output, args.count)

    source = Image.open(args.input)
    width, height = source.size

    if args.state is None:
        # No saved generator: build one from the input image.
        args.state = ()
        if args.type == JSON:
            storage = JsonStorage(settings=args.settings)
        else:
            storage = SqliteStorage(settings=args.settings)
        markov = MarkovImage.from_storage(storage)
        read([args.input], markov, args.progress, False)
    else:
        markov = load(MarkovImage, args.state, args)

    # Clamp the requested level to the generator, then shift down by one.
    args.level = min(args.level, markov.levels - 1) - 1

    if args.level >= 0:
        # Product of the level_scale entries from the start level up to
        # markov.levels.
        scale = 1
        for factor in islice(markov.scanner.level_scale,
                             args.level, markov.levels):
            scale = scale * factor
        width = width // scale
        height = height // scale
        # Start from a downscaled copy of the input image.
        start = source.resize((width, height), markov.scanner.scale)
    else:
        args.level = -1
        scale = markov.scanner.min_size
        width = width // scale
        height = height // scale
        start = None

    paths = outfiles(markov, args.output, args.count,
                     args.progress, args.level + 1)
    for out_path in paths:
        result = markov(
            width, height,
            state_size=args.state_size,
            start_level=args.level,
            start_image=start
        )
        save_image(result, out_path)