From 3566dbc9438935d7cc6951a882a9d2ca148fb58d Mon Sep 17 00:00:00 2001 From: Michael Schwarz Date: Wed, 20 Sep 2017 20:33:29 +0200 Subject: PEP8 --- support/lib/util.py | 197 ++++++++++++++++++++++++++++------------------------ 1 file changed, 105 insertions(+), 92 deletions(-) (limited to 'support/lib/util.py') diff --git a/support/lib/util.py b/support/lib/util.py index 8b38bc2..c00a5fe 100644 --- a/support/lib/util.py +++ b/support/lib/util.py @@ -1,117 +1,130 @@ -import sys, contextlib, subprocess, tempfile, shutil, re, os, inspect +import contextlib +import inspect +import os +import re +import shutil +import subprocess +import sys +import tempfile class UserError(Exception): - def __init__(self, message, *args): - super(UserError, self).__init__(message.format(*args)) + def __init__(self, message, *args): + super(UserError, self).__init__(message.format(*args)) def main(fn): - """Decorator for "main" functions. Decorates a function that should be called when the containing module is run as a script (e.g. via python -m ).""" - - frame = inspect.currentframe().f_back - - def wrapped_fn(*args, **kwargs): - try: - fn(*args, **kwargs) - except UserError as e: - print >> sys.stderr, 'Error:', e - sys.exit(1) - except KeyboardInterrupt: - sys.exit(2) - - if frame.f_globals['__name__'] == '__main__': - wrapped_fn(*sys.argv[1:]) - - # Allow the main function also to be called explicitly - return wrapped_fn + """ + Decorator for "main" functions. Decorates a function that should be + called when the containing module is run as a script (e.g. via python -m + ). + """ + frame = inspect.currentframe().f_back + + def wrapped_fn(*args, **kwargs): + try: + fn(*args, **kwargs) + except UserError as e: + print >> sys.stderr, 'Error:', e + sys.exit(1) + except KeyboardInterrupt: + sys.exit(2) + + if frame.f_globals['__name__'] == '__main__': + wrapped_fn(*sys.argv[1:]) + + # Allow the main function also to be called explicitly + return wrapped_fn def rename_atomic(source_path, target_path): - """ - Move the file at source_path to target_path. - - If both paths reside on the same device, os.rename() is used, otherwise the file is copied to a temporary name next to target_path and moved from there using os.rename(). - """ - - source_dir_stat = os.stat(os.path.dirname(source_path)) - target_dir_stat = os.stat(os.path.dirname(target_path)) - - if source_dir_stat.st_dev == target_dir_stat.st_dev: - os.rename(source_path, target_path) - else: - temp_path = target_path + '~' - - shutil.copyfile(source_path, temp_path) - os.rename(temp_path, target_path) + """ + Move the file at source_path to target_path. + + If both paths reside on the same device, os.rename() is used, otherwise + the file is copied to a temporary name next to target_path and moved from + there using os.rename(). + """ + source_dir_stat = os.stat(os.path.dirname(source_path)) + target_dir_stat = os.stat(os.path.dirname(target_path)) + + if source_dir_stat.st_dev == target_dir_stat.st_dev: + os.rename(source_path, target_path) + else: + temp_path = target_path + '~' + + shutil.copyfile(source_path, temp_path) + os.rename(temp_path, target_path) @contextlib.contextmanager def TemporaryDirectory(): - dir = tempfile.mkdtemp() - - try: - yield dir - finally: - shutil.rmtree(dir) + dir = tempfile.mkdtemp() + + try: + yield dir + finally: + shutil.rmtree(dir) @contextlib.contextmanager -def command_context(args, remove_env = [], set_env = { }, working_dir = None, use_stderr = False): - env = dict(os.environ) - - for i in remove_env: - del env[i] - - for k, v in set_env.items(): - env[k] = v - - if use_stderr: - stderr = subprocess.PIPE - else: - stderr = None - - try: - process = subprocess.Popen(args, env = env, cwd = working_dir, stderr = stderr) - except OSError as e: - raise UserError('Error running {}: {}', args[0], e) - - try: - yield process - except: - try: - process.kill() - except OSError: - # Ignore exceptions here so we don't mask the already-being-thrown exception. - pass - - raise - finally: - # Use communicate so that we won't deadlock if the process generates some unread output. - process.communicate() - - if process.returncode: - raise UserError('Command failed: {}', ' '.join(args)) - - -def command(args, remove_env = [], set_env = { }, working_dir = None): - with command_context(args, remove_env, set_env, working_dir): - pass +def command_context(args, remove_env=[], set_env={}, working_dir=None, use_stderr=False): + env = dict(os.environ) + + for i in remove_env: + del env[i] + + for k, v in set_env.items(): + env[k] = v + + if use_stderr: + stderr = subprocess.PIPE + else: + stderr = None + + try: + process = subprocess.Popen(args, env=env, cwd=working_dir, stderr=stderr) + except OSError as e: + raise UserError('Error running {}: {}', args[0], e) + + try: + yield process + except: + try: + process.kill() + except OSError: + # Ignore exceptions here so we don't mask the + # already-being-thrown exception. + pass + + raise + finally: + # Use communicate so that we won't deadlock if the process generates + # some unread output. + process.communicate() + + if process.returncode: + raise UserError('Command failed: {}', ' '.join(args)) + + +def command(args, remove_env=[], set_env={}, working_dir=None): + with command_context(args, remove_env, set_env, working_dir): + pass def bash_escape_string(string): - return "'{}'".format(re.sub("'", "'\"'\"'", string)) + return "'{}'".format(re.sub("'", "'\"'\"'", string)) def write_file(path, data): - temp_path = path + '~' - - with open(temp_path, 'wb') as file: - file.write(data) - - os.rename(temp_path, path) + temp_path = path + '~' + + with open(temp_path, 'wb') as file: + file.write(data) + + os.rename(temp_path, path) def read_file(path): - with open(path, 'rb') as file: - return file.read() + with open(path, 'rb') as file: + return file.read() -- cgit