1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
import contextlib
import inspect
import os
import re
import shutil
import subprocess
import sys
import tempfile
class UserError(Exception):
def __init__(self, message, *args):
super(UserError, self).__init__(message.format(*args))
def main(fn):
"""
Decorator for "main" functions. Decorates a function that should be
called when the containing module is run as a script (e.g. via python -m
<module>).
"""
frame = inspect.currentframe().f_back
def wrapped_fn(*args, **kwargs):
try:
fn(*args, **kwargs)
except UserError as e:
print >> sys.stderr, 'Error:', e
sys.exit(1)
except KeyboardInterrupt:
sys.exit(2)
if frame.f_globals['__name__'] == '__main__':
wrapped_fn(*sys.argv[1:])
# Allow the main function also to be called explicitly
return wrapped_fn
def rename_atomic(source_path, target_path):
"""
Move the file at source_path to target_path.
If both paths reside on the same device, os.rename() is used, otherwise
the file is copied to a temporary name next to target_path and moved from
there using os.rename().
"""
source_dir_stat = os.stat(os.path.dirname(source_path))
target_dir_stat = os.stat(os.path.dirname(target_path))
if source_dir_stat.st_dev == target_dir_stat.st_dev:
os.rename(source_path, target_path)
else:
temp_path = target_path + '~'
shutil.copyfile(source_path, temp_path)
os.rename(temp_path, target_path)
@contextlib.contextmanager
def TemporaryDirectory():
dir = tempfile.mkdtemp()
try:
yield dir
finally:
shutil.rmtree(dir)
@contextlib.contextmanager
def command_context(args, remove_env=[], set_env={}, working_dir=None, use_stderr=False):
env = dict(os.environ)
for i in remove_env:
del env[i]
for k, v in set_env.items():
env[k] = v
if use_stderr:
stderr = subprocess.PIPE
else:
stderr = None
try:
process = subprocess.Popen(args, env=env, cwd=working_dir, stderr=stderr)
except OSError as e:
raise UserError('Error running {}: {}', args[0], e)
try:
yield process
except:
try:
process.kill()
except OSError:
# Ignore exceptions here so we don't mask the
# already-being-thrown exception.
pass
raise
finally:
# Use communicate so that we won't deadlock if the process generates
# some unread output.
process.communicate()
if process.returncode:
raise UserError('Command failed: {}', ' '.join(args))
def command(args, remove_env=[], set_env={}, working_dir=None):
with command_context(args, remove_env, set_env, working_dir):
pass
def bash_escape_string(string):
return "'{}'".format(re.sub("'", "'\"'\"'", string))
def write_file(path, data):
temp_path = path + '~'
with open(temp_path, 'wb') as file:
file.write(data)
os.rename(temp_path, path)
def read_file(path):
with open(path, 'rb') as file:
return file.read()
|