author     jaseg <git-bigdata-wsl-arch@jaseg.de>  2020-04-23 12:41:16 +0200
committer  jaseg <git-bigdata-wsl-arch@jaseg.de>  2020-04-23 12:41:16 +0200
commit     a6f8d8be2db532efb196caaef67cabb627fdde2a
tree       cf55e1142a4b25333d4dac7e56ca41d028934051 /controller
parent     96c480ea5447dc7172ad56ca895dbed54070b8b0
ma: Add blurb on aluminium smelters
Diffstat (limited to 'controller')
-rw-r--r--  controller/fw/tools/dsss_demod_test_runner.py | 58
1 file changed, 43 insertions(+), 15 deletions(-)
diff --git a/controller/fw/tools/dsss_demod_test_runner.py b/controller/fw/tools/dsss_demod_test_runner.py
index 51fc09b..27a0c8e 100644
--- a/controller/fw/tools/dsss_demod_test_runner.py
+++ b/controller/fw/tools/dsss_demod_test_runner.py
@@ -84,33 +84,56 @@ def run_test(seed, amplitude_spec, background, nbits, decimation, symbols, thfs,
signal = np.repeat(dsss_modulate(test_data, nbits) * 2.0 - 1, decimation)
# We're re-using the seed here. This is not a problem.
noise = noise_gen(seed, len(signal), *noise_params)
+ amplitudes = amplitude_spec[0] * 10 ** np.linspace(0, amplitude_spec[1], amplitude_spec[2])
# DEBUG
+ my_pid = multiprocessing.current_process().pid
+ wql = len(amplitudes) * len(thfs)
+ print(f'[{my_pid}] starting, got workqueue of length {wql}')
+ i = 0
# Map lsb to sign to match test program
# test_data = (test_data>>1) * (2*(test_data&1) - 1)
+ # END DEBUG
- amplitudes = amplitude_spec[0] * 10 ** np.linspace(0, amplitude_spec[1], amplitude_spec[2])
output = []
for amp in amplitudes:
with tempfile.NamedTemporaryFile(dir=cachedir) as f:
waveform = signal*amp + noise
f.write(waveform.astype('float32').tobytes())
f.flush()
+ # DEBUG
+ fcopy = f'/tmp/test-{path.basename(f.name)}'
+ import shutil
+ shutil.copy(f.name, fcopy)
+ # END DEBUG
for thf in thfs:
- cmdline = [lookup_binary(nbits, thf, decimation, symbols), f.name]
- proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE, text=True)
- stdout, _stderr = proc.communicate()
- if proc.returncode != 0:
- raise SystemError(f'Subprocess signalled error: {proc.returncode}')
-
- lines = stdout.splitlines()
- matched = [ l.partition('[')[2].partition(']')[0]
- for l in lines if l.strip().startswith('data sequence received:') ]
- matched = [ [ int(elem) for elem in l.split(',') ] for l in matched ]
-
- ser = min(sequence_matcher(test_data, match) for match in matched) if matched else None
rpars = ResultParams(nbits, thf, decimation, symbols, seed, amp, background)
- output.append((rpars, ser))
+ cmdline = [lookup_binary(nbits, thf, decimation, symbols), f.name]
+ # DEBUG
+ starttime = time.time()
+ # END DEBUG
+ try:
+ proc = subprocess.run(cmdline, stdout=subprocess.PIPE, encoding='utf-8', check=True, timeout=300)
+
+ lines = proc.stdout.splitlines()
+ matched = [ l.partition('[')[2].partition(']')[0]
+ for l in lines if l.strip().startswith('data sequence received:') ]
+ matched = [ [ int(elem) for elem in l.split(',') ] for l in matched ]
+
+ ser = min(sequence_matcher(test_data, match) for match in matched) if matched else None
+ output.append((rpars, ser))
+ # DEBUG
+ #print(f'[{my_pid}] ran {i}/{wql}: time={time.time() - starttime}\n {ser=}\n {rpars}\n {" ".join(cmdline)}\n {fcopy}', flush=True)
+ i += 1
+ # END DEBUG
+
+ except subprocess.TimeoutExpired:
+ output.append((rpars, None))
+ # DEBUG
+ print(f'[{my_pid}] ran {i}/{wql}: Timeout!\n {rpars}\n {" ".join(cmdline)}\n {fcopy}', flush=True)
+ i += 1
+ # END DEBUG
+ print(f'[{my_pid}] finished.')
return output
def parallel_generator(db, table, columns, builder, param_list, desc, context={}, params_mapper=lambda *args: args,
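A note on the run_test() hunk above: subprocess.Popen() plus communicate() is replaced
by subprocess.run(..., check=True, timeout=300), so a hung demodulator binary now raises
subprocess.TimeoutExpired instead of blocking the worker forever, while a nonzero exit
status still surfaces as subprocess.CalledProcessError (uncaught here, taking over the
role of the old SystemError). The amplitude ladder
amplitude_spec[0] * 10 ** np.linspace(0, amplitude_spec[1], amplitude_spec[2]) is
log-spaced, equivalent to np.logspace(np.log10(a0), np.log10(a0) + decades, n).
A minimal, self-contained sketch of the new control flow; the helper name and default
timeout are illustrative, not from the repository:

    import subprocess

    def run_demod(cmdline, timeout_s=300):
        """Run one demodulator invocation; return its stdout, or None on timeout."""
        try:
            proc = subprocess.run(cmdline, stdout=subprocess.PIPE,
                                  encoding='utf-8', check=True, timeout=timeout_s)
            return proc.stdout
        except subprocess.TimeoutExpired:
            # subprocess.run() kills and reaps the child before re-raising,
            # so the worker is free to move on to the next parameter set.
            return None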
@@ -132,7 +155,10 @@ def parallel_generator(db, table, columns, builder, param_list, desc, context={}
pool.close()
print('Using', len(param_list) - len(jobs), 'cached jobs', flush=True)
with tqdm(total=len(jobs), desc=desc) as tq:
- for params, res in jobs:
+ for i, (params, res) in enumerate(jobs):
+ # DEBUG
+ print('Got result', i, params, res)
+ # END DEBUG
tq.update(1)
result = res.get()
with db as conn:
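The hunk above drains a list of (params, AsyncResult) pairs; judging by the
pool.close() and res.get() calls, these come from multiprocessing.Pool.apply_async().
A self-contained sketch of that shape (worker function and values made up, tqdm
progress bar omitted):

    import multiprocessing

    def square(x):
        return x * x

    if __name__ == '__main__':
        with multiprocessing.Pool() as pool:
            jobs = [(x, pool.apply_async(square, (x,))) for x in range(10)]
            pool.close()
            for i, (params, res) in enumerate(jobs):
                print('Got result', i, params, res.get())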
@@ -195,6 +221,8 @@ if __name__ == '__main__':
data_db.execute('CREATE TABLE IF NOT EXISTS waveforms'
'(seed, amplitude_spec, background, nbits, decimation, symbols, thresholds, result, timestamp)')
+    'SELECT FROM waveforms GROUP BY (amplitude_spec, background, nbits, decimation, symbols, thresholds, result)'
+
dec_param_groups = defaultdict(lambda: [])
for nbits, thf, decimation, symbols in dec_paramses:
dec_param_groups[(nbits, decimation, symbols)].append(thf)
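The loop at the end groups every threshold factor thf under its
(nbits, decimation, symbols) key. A sketch of the defaultdict idiom with made-up
values; defaultdict(list) is equivalent to the defaultdict(lambda: []) used above:

    from collections import defaultdict

    dec_paramses = [(5, 4.0, 10, 16), (5, 5.0, 10, 16), (6, 4.0, 20, 32)]
    dec_param_groups = defaultdict(list)
    for nbits, thf, decimation, symbols in dec_paramses:
        dec_param_groups[(nbits, decimation, symbols)].append(thf)
    # dict(dec_param_groups) == {(5, 10, 16): [4.0, 5.0], (6, 20, 32): [4.0]}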