summaryrefslogtreecommitdiff
path: root/support/lib/util.py
blob: d88f9f6f4cca47776891ad78144b881cc756b5f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import sys, contextlib, subprocess, tempfile, shutil, re, os, inspect


class UserError(Exception):
	"""
	Error that is meant to be reported to the user.
	
	The message is a str.format() template; any further positional arguments are substituted into it to build the final exception text.
	"""
	
	def __init__(self, message, *args):
		formatted_message = message.format(*args)
		
		Exception.__init__(self, formatted_message)


def main(fn):
	"""
	Decorator for "main" functions. Decorates a function that should be called when the containing module is run as a script (e.g. via python -m <module>).
	
	If the module applying the decorator is being run as __main__, the function is called immediately, at decoration time, with the command line arguments (sys.argv[1:]) as positional arguments.
	
	The returned wrapper can also be called explicitly. It passes all arguments through to fn and returns fn's result. A UserError is reported on stderr as "Error: <message>" and the process exits with status 1; a KeyboardInterrupt makes the process exit with status 2.
	"""
	
	# Imported locally so this function does not depend on the module's import line.
	import functools
	
	# The frame of the code applying the decorator, used to find out whether that module is the script being run.
	frame = inspect.currentframe().f_back
	
	@functools.wraps(fn)
	def wrapped_fn(*args, **kwargs):
		try:
			# Hand the result back so that explicit callers can use it.
			return fn(*args, **kwargs)
		except UserError as e:
			# Written with write() instead of the print statement so this works on both Python 2 and 3.
			sys.stderr.write('Error: {}\n'.format(e))
			sys.exit(1)
		except KeyboardInterrupt:
			sys.exit(2)
	
	if frame.f_globals['__name__'] == '__main__':
		wrapped_fn(*sys.argv[1:])
	
	# Allow the main function also to be called explicitly
	return wrapped_fn


def rename_atomic(source_path, target_path):
	"""
	Move the file at source_path to target_path.
	
	If both paths reside on the same device, os.rename() is used, otherwise the file is copied to a temporary name next to target_path (target_path + '~'), moved from there using os.rename(), and the source file is then removed.
	
	Whether the paths are on the same device is decided by comparing the st_dev of the two containing directories. Errors from the underlying os and shutil calls propagate to the caller.
	"""
	
	# dirname() yields '' for a bare file name, which os.stat() rejects; such a path lives in the current directory.
	source_dir_stat = os.stat(os.path.dirname(source_path) or '.')
	target_dir_stat = os.stat(os.path.dirname(target_path) or '.')
	
	if source_dir_stat.st_dev == target_dir_stat.st_dev:
		os.rename(source_path, target_path)
		return
	
	temp_path = target_path + '~'
	moved = False
	
	try:
		shutil.copyfile(source_path, temp_path)
		os.rename(temp_path, target_path)
		moved = True
	finally:
		if not moved:
			# Don't leave a partial copy next to the target. Errors are ignored so the original failure is what propagates.
			try:
				os.remove(temp_path)
			except OSError:
				pass
	
	# Only drop the source once the target is in place, so that the file is moved like in the same-device case.
	os.remove(source_path)


@contextlib.contextmanager
def TemporaryDirectory():
	"""
	Context manager providing a freshly created temporary directory.
	
	The path of the directory (created with tempfile.mkdtemp()) is bound by the with statement. The directory and everything in it is removed when the with block is left, whether normally or through an exception.
	"""
	
	temp_dir_path = tempfile.mkdtemp()
	
	try:
		yield temp_dir_path
	finally:
		# Runs on normal exit as well as when the with-body raised.
		shutil.rmtree(temp_dir_path)


@contextlib.contextmanager
def command_context(args, remove_env = [], set_env = { }, working_dir = None, use_stderr = False):
	"""
	Context manager that runs a command as a child process and provides its subprocess.Popen object.
	
	args is the argument list passed to subprocess.Popen. The child gets a copy of os.environ from which the names in remove_env are removed (names that are not set are ignored) and in which the entries of set_env are added or overridden. working_dir, if given, is used as the child's working directory. If use_stderr is true, the child's stderr is connected to a pipe (process.stderr), otherwise it is inherited from this process.
	
	The child is waited for before the with-body runs. If the body raises, the child is killed if it is still running and the exception propagates.
	
	Raises UserError if the command cannot be started or, after the body completed normally, if the command exited with a non-zero status.
	"""
	
	env = dict(os.environ)
	
	for name in remove_env:
		# pop() rather than del: asking to remove a variable that is not set is not an error.
		env.pop(name, None)
	
	env.update(set_env)
	
	if use_stderr:
		stderr = subprocess.PIPE
	else:
		stderr = None
	
	try:
		process = subprocess.Popen(args, env = env, cwd = working_dir, stderr = stderr)
		# NOTE(review): because of this wait the child has already exited when the with-body runs. With use_stderr a child that writes more than the pipe can buffer would block here forever (see the subprocess documentation on wait() and PIPE). Kept as-is to preserve the existing behaviour for callers - confirm whether it is intended.
		process.wait()
	except OSError as e:
		raise UserError('Error running {}: {}', args[0], e)
	
	body_completed = False
	
	try:
		yield process
		body_completed = True
	finally:
		# Cleanup is keyed on a flag inside finally, instead of an except handler with a nested try and a bare raise, so that the exception raised by the body is always the one that propagates.
		if not body_completed and process.poll() is None:
			try:
				process.kill()
			except OSError:
				# Ignore exceptions here so we don't mask the already-being-thrown exception.
				pass
		
		process.wait()
	
	if process.returncode:
		raise UserError('Command failed: {}', ' '.join(args))


def command(args, remove_env = [], set_env = { }, working_dir = None):
	"""
	Run a command and wait until it has finished.
	
	The arguments are passed on to command_context(), which starts the process and raises UserError if the command cannot be started or exits with a non-zero status. The child's stderr is not captured.
	"""
	
	context = command_context(args, remove_env, set_env, working_dir)
	
	with context as running_process:
		running_process.wait()


def bash_escape_string(string):
	"""
	Return string wrapped in single quotes for use as one word in a bash command line.
	
	Every single quote inside the string is replaced by '"'"', i.e. the quoted section is closed, a double-quoted single quote is emitted, and the quoted section is opened again.
	"""
	
	quote_replacement = "'\"'\"'"
	escaped = re.sub("'", quote_replacement, string)
	
	return "'{}'".format(escaped)


def write_file(path, data):
	"""
	Write data to the file at path, replacing any existing file.
	
	The data is first written, in binary mode, to a temporary file named path + '~', which is then renamed to path, so that path never contains partially written data. If writing or renaming fails, the temporary file is removed again and the error propagates to the caller.
	"""
	
	temp_path = path + '~'
	
	# If opening fails, no temporary file was created by this call and there is nothing to clean up.
	temp_file = open(temp_path, 'wb')
	completed = False
	
	try:
		with temp_file:
			temp_file.write(data)
		
		os.rename(temp_path, path)
		completed = True
	finally:
		if not completed:
			# Don't leave the stale temporary file behind. Errors are ignored so the original failure is what propagates.
			try:
				os.remove(temp_path)
			except OSError:
				pass


def read_file(path):
	"""
	Return the complete contents of the file at path, read in binary mode.
	"""
	
	handle = open(path, 'rb')
	
	try:
		return handle.read()
	finally:
		handle.close()