summaryrefslogtreecommitdiff
path: root/controller/fw/tools
diff options
context:
space:
mode:
authorjaseg <git-bigdata-wsl-arch@jaseg.de>2020-05-05 17:50:09 +0200
committerjaseg <git-bigdata-wsl-arch@jaseg.de>2020-05-05 17:50:09 +0200
commitdb50711ba4a1f41f4082981bae58f213d48d96a1 (patch)
tree6480922fdc46a51bfb2e22e57d02cc0d3aef9d06 /controller/fw/tools
parent9918eb505321183e20357221a4dcf2aa9c1e057c (diff)
downloadmaster-thesis-db50711ba4a1f41f4082981bae58f213d48d96a1.tar.gz
master-thesis-db50711ba4a1f41f4082981bae58f213d48d96a1.tar.bz2
master-thesis-db50711ba4a1f41f4082981bae58f213d48d96a1.zip
fw: Tie together all parts for an end-to-end demo
Diffstat (limited to 'controller/fw/tools')
-rw-r--r--controller/fw/tools/dsss_demod_test_runner.py17
-rw-r--r--controller/fw/tools/dsss_demod_test_waveform_gen.py21
-rwxr-xr-xcontroller/fw/tools/hum_generator.py114
-rw-r--r--controller/fw/tools/presig_gen.py2
-rw-r--r--controller/fw/tools/reed_solomon.py14
5 files changed, 146 insertions, 22 deletions
diff --git a/controller/fw/tools/dsss_demod_test_runner.py b/controller/fw/tools/dsss_demod_test_runner.py
index 27a0c8e..d3c3cfc 100644
--- a/controller/fw/tools/dsss_demod_test_runner.py
+++ b/controller/fw/tools/dsss_demod_test_runner.py
@@ -12,15 +12,13 @@ import multiprocessing
import sqlite3
import time
from urllib.parse import urlparse
-import functools
import tempfile
import itertools
import numpy as np
np.set_printoptions(linewidth=240)
-from dsss_demod_test_waveform_gen import load_noise_meas_params, load_noise_synth_params,\
- mains_noise_measured, mains_noise_synthetic, modulate as dsss_modulate
+from dsss_demod_test_waveform_gen import load_noise_gen, modulate as dsss_modulate
def build_test_binary(nbits, thf, decimation, symbols, cachedir):
@@ -46,19 +44,6 @@ def build_test_binary(nbits, thf, decimation, symbols, cachedir):
return build_id
-@functools.lru_cache()
-def load_noise_gen(url):
- schema, refpath = url.split('://')
- if not path.isabs(refpath):
- refpath = path.abspath(path.join(path.dirname(__file__), refpath))
-
- if schema == 'meas':
- return mains_noise_measured, load_noise_meas_params(refpath)
- elif schema == 'synth':
- return mains_noise_synthetic, load_noise_synth_params(refpath)
- else:
- raise ValueError('Invalid schema', schema)
-
def sequence_matcher(test_data, decoded, max_shift=3):
match_result = []
for shift in range(-max_shift, max_shift):
diff --git a/controller/fw/tools/dsss_demod_test_waveform_gen.py b/controller/fw/tools/dsss_demod_test_waveform_gen.py
index 1749bd7..414c553 100644
--- a/controller/fw/tools/dsss_demod_test_waveform_gen.py
+++ b/controller/fw/tools/dsss_demod_test_waveform_gen.py
@@ -1,4 +1,6 @@
+from os import path
+import json
import functools
import numpy as np
@@ -53,9 +55,9 @@ def mains_noise_measured(seed, n, meas_data):
def load_noise_synth_params(specfile):
with open(specfile) as f:
d = json.load(f)
- return (np.linspace(*d['x_spec']), # spl_x
- d['x_spec'][2], # spl_N
- (d['t'], d['c'], d['k'])) # psd_spl
+ return {'spl_x': np.linspace(*d['x_spec']),
+ 'spl_N': d['x_spec'][2],
+ 'psd_spl': (d['t'], d['c'], d['k']) }
def mains_noise_synthetic(seed, n, psd_spl, spl_N, spl_x):
st = np.random.RandomState(seed)
@@ -69,3 +71,16 @@ def mains_noise_synthetic(seed, n, psd_spl, spl_N, spl_x):
renoise = scipy.fftpack.ifft(spec)
return renoise[10000:][:n] + 50.00
+@functools.lru_cache()
+def load_noise_gen(url):
+ schema, refpath = url.split('://')
+ if not path.isabs(refpath):
+ refpath = path.abspath(path.join(path.dirname(__file__), refpath))
+
+ if schema == 'meas':
+ return mains_noise_measured, load_noise_meas_params(refpath)
+ elif schema == 'synth':
+ return mains_noise_synthetic, load_noise_synth_params(refpath)
+ else:
+ raise ValueError('Invalid schema', schema)
+
diff --git a/controller/fw/tools/hum_generator.py b/controller/fw/tools/hum_generator.py
new file mode 100755
index 0000000..a139491
--- /dev/null
+++ b/controller/fw/tools/hum_generator.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+import binascii
+import struct
+
+import numpy as np
+import pydub
+
+from dsss_demod_test_waveform_gen import load_noise_gen, modulate as dsss_modulate
+
+np.set_printoptions(linewidth=240)
+
+def generate_noisy_signal(
+ test_data=32,
+ test_nbits=5,
+ test_decimation=10,
+ test_signal_amplitude=20e-3,
+ noise_level=10e-3,
+ noise_spec='synth://grid_freq_psd_spl_108pt.json',
+ f_nom=50.0,
+ seed=0):
+
+ #test_data = np.random.RandomState(seed=0).randint(0, 2 * (2**test_nbits), test_duration)
+ #test_data = np.array([0, 1, 2, 3] * 50)
+ if isinstance(test_data, int):
+ test_data = np.array(range(test_data))
+
+
+ signal = np.repeat(dsss_modulate(test_data, test_nbits) * 2.0 - 1, test_decimation)
+
+ noise_gen, noise_params = load_noise_gen(noise_spec)
+ noise = noise_gen(seed, len(signal), **noise_params)
+ return np.absolute(noise + f_nom + signal*test_signal_amplitude)
+
+def write_raw_frequencies_bin(outfile, **kwargs):
+ with open(outfile, 'wb') as f:
+ for x in generate_noisy_signal(**kwargs):
+ f.write(struct.pack('f', x))
+
+def synthesize_sine(freqs, freqs_sampling_rate=10.0, output_sampling_rate=44100):
+ duration = len(freqs) / freqs_sampling_rate # seconds
+ afreq_out = np.interp(np.linspace(0, duration, int(duration*output_sampling_rate)), np.linspace(0, duration, len(freqs)), freqs)
+ return np.sin(np.cumsum(2*np.pi * afreq_out / output_sampling_rate))
+
+def write_flac(filename, signal, sampling_rate=44100):
+ signal -= np.min(signal)
+ signal /= np.max(signal)
+ signal -= 0.5
+ signal *= 2**16 - 1
+ le_bytes = signal.astype(np.int16).tobytes()
+ seg = pydub.AudioSegment(data=le_bytes, sample_width=2, frame_rate=sampling_rate, channels=1)
+ seg.export(filename, format='flac')
+
+def write_synthetic_hum_flac(filename, output_sampling_rate=44100, freqs_sampling_rate=10.0, **kwargs):
+ signal = generate_noisy_signal(**kwargs)
+ print(signal)
+ write_flac(filename, synthesize_sine(signal, freqs_sampling_rate, output_sampling_rate),
+ sampling_rate=output_sampling_rate)
+
+def emulate_adc_signal(adc_bits=12, adc_offset=0.4, adc_amplitude=0.25, freq_sampling_rate=10.0, output_sampling_rate=1000, **kwargs):
+ signal = synthesize_sine(generate_noisy_signal(), freq_sampling_rate, output_sampling_rate)
+ signal = signal*adc_amplitude + adc_offset
+ smin, smax = np.min(signal), np.max(signal)
+ if smin < 0.0 or smax > 1.0:
+ raise UserWarning('Amplitude or offset too large: Signal out of bounds with min/max [{smin}, {smax}] of ADC range')
+ signal *= 2**adc_bits -1
+ return signal
+
+def save_adc_signal(fn, signal, dtype=np.uint16):
+ with open(fn, 'wb') as f:
+ f.write(signal.astype(dtype).tobytes())
+
+def write_emulated_adc_signal_bin(filename, **kwargs):
+ save_adc_signal(filename, emulate_adc_signal(**kwargs))
+
+def hum_cmd(args):
+ write_synthetic_hum_flac(args.out_flac,
+ output_sampling_rate=args.audio_sampling_rate,
+ freqs_sampling_rate=args.frequency_sampling_rate,
+ test_data = np.array(list(binascii.unhexlify(args.data))),
+ test_nbits = args.symbol_bits,
+ test_decimation = args.decimation,
+ test_signal_amplitude = args.signal_level/1e3,
+ noise_level = args.noise_level/1e3,
+ noise_spec=args.noise_spec,
+ f_nom = args.nominal_frequency,
+ seed = args.random_seed)
+
+
+if __name__ == '__main__':
+ import argparse
+ parser = argparse.ArgumentParser()
+ cmd_parser = parser.add_subparsers(required=True)
+ hum_parser = cmd_parser.add_parser('hum', help='Generated artificial modulated mains hum')
+ # output parameters
+ hum_parser.add_argument('-a', '--audio-sampling-rate', type=int, default=44100)
+
+ # modulation parameters
+ hum_parser.add_argument('-f', '--frequency-sampling-rate', type=float, default=10.0*100/128)
+ hum_parser.add_argument('-b', '--symbol-bits', type=int, default=5, help='bits per symbol (excluding sign bit)')
+ hum_parser.add_argument('-n', '--noise-level', type=float, default=1.0, help='Scale synthetic noise level')
+ hum_parser.add_argument('-s', '--signal-level', type=float, default=20.0, help='Synthetic noise level in mHz')
+ hum_parser.add_argument('-d', '--decimation', type=int, default=10, help='DSSS modulation decimation in frequency measurement cycles')
+ hum_parser.add_argument('-o', '--nominal-frequency', type=float, default=50.0, help='Nominal mains frequency')
+ hum_parser.add_argument('-r', '--random-seed', type=int, default=0)
+ hum_parser.add_argument('--noise-spec', type=str, default='synth://grid_freq_psd_spl_108pt.json')
+ hum_parser.add_argument('out_flac', metavar='out.flac', help='FLAC output file')
+ hum_parser.add_argument('data', help='modulation data hex string')
+ hum_parser.set_defaults(func=hum_cmd)
+
+ args = parser.parse_args()
+ args.func(args)
+
diff --git a/controller/fw/tools/presig_gen.py b/controller/fw/tools/presig_gen.py
index 3f85522..c5dafe7 100644
--- a/controller/fw/tools/presig_gen.py
+++ b/controller/fw/tools/presig_gen.py
@@ -9,7 +9,7 @@ import binascii
import time
from datetime import datetime
-LINKING_KEY_SIZE = 16
+LINKING_KEY_SIZE = 15
PRESIG_VERSION = '000.001'
DOMAINS = ['all', 'country', 'region', 'vendor', 'series']
diff --git a/controller/fw/tools/reed_solomon.py b/controller/fw/tools/reed_solomon.py
index 9eee6be..c4ca6e4 100644
--- a/controller/fw/tools/reed_solomon.py
+++ b/controller/fw/tools/reed_solomon.py
@@ -64,6 +64,14 @@ def cmdline_func_test(args, print=lambda *args, **kwargs: None, benchmark=False)
).decode().replace('0', '.'))
assert test_data == decoded
+def cmdline_func_encode(args, **kwargs):
+ data = np.frombuffer(binascii.unhexlify(args.hex_str), dtype=np.uint8)
+ # Map 8 bit input to 6 bit symbol string
+ data = np.packbits(np.pad(np.unpackbits(data).reshape((-1, 6)), ((0,0),(2, 0))).flatten())
+ encoded = encode(data.tobytes(), nbits=args.bits)
+ print('symbol array:', ', '.join(f'0x{x:02x}' for x in encoded))
+ print('hex string:', binascii.hexlify(encoded).decode())
+
if __name__ == '__main__':
parser = argparse.ArgumentParser()
cmd_parser = parser.add_subparsers(required=True)
@@ -75,7 +83,9 @@ if __name__ == '__main__':
test_parser.add_argument('-s', '--seed', type=int, default=0, help='Random seed')
test_parser.set_defaults(func=cmdline_func_test)
enc_parser = cmd_parser.add_parser('encode', help='RS-Encode given hex string')
- enc_parser.add_argument('hex_str')
+ enc_parser.set_defaults(func=cmdline_func_encode)
+ enc_parser.add_argument('-b', '--bits', type=int, default=8, help='Symbol bit size')
+ enc_parser.add_argument('hex_str', type=str, help='Input data as hex string')
args = parser.parse_args()
- args.func(args, print=print)
+ args.func(args)