From db50711ba4a1f41f4082981bae58f213d48d96a1 Mon Sep 17 00:00:00 2001 From: jaseg Date: Tue, 5 May 2020 17:50:09 +0200 Subject: fw: Tie together all parts for an end-to-end demo --- controller/fw/tools/dsss_demod_test_runner.py | 17 +-- .../fw/tools/dsss_demod_test_waveform_gen.py | 21 +++- controller/fw/tools/hum_generator.py | 114 +++++++++++++++++++++ controller/fw/tools/presig_gen.py | 2 +- controller/fw/tools/reed_solomon.py | 14 ++- 5 files changed, 146 insertions(+), 22 deletions(-) create mode 100755 controller/fw/tools/hum_generator.py (limited to 'controller/fw/tools') diff --git a/controller/fw/tools/dsss_demod_test_runner.py b/controller/fw/tools/dsss_demod_test_runner.py index 27a0c8e..d3c3cfc 100644 --- a/controller/fw/tools/dsss_demod_test_runner.py +++ b/controller/fw/tools/dsss_demod_test_runner.py @@ -12,15 +12,13 @@ import multiprocessing import sqlite3 import time from urllib.parse import urlparse -import functools import tempfile import itertools import numpy as np np.set_printoptions(linewidth=240) -from dsss_demod_test_waveform_gen import load_noise_meas_params, load_noise_synth_params,\ - mains_noise_measured, mains_noise_synthetic, modulate as dsss_modulate +from dsss_demod_test_waveform_gen import load_noise_gen, modulate as dsss_modulate def build_test_binary(nbits, thf, decimation, symbols, cachedir): @@ -46,19 +44,6 @@ def build_test_binary(nbits, thf, decimation, symbols, cachedir): return build_id -@functools.lru_cache() -def load_noise_gen(url): - schema, refpath = url.split('://') - if not path.isabs(refpath): - refpath = path.abspath(path.join(path.dirname(__file__), refpath)) - - if schema == 'meas': - return mains_noise_measured, load_noise_meas_params(refpath) - elif schema == 'synth': - return mains_noise_synthetic, load_noise_synth_params(refpath) - else: - raise ValueError('Invalid schema', schema) - def sequence_matcher(test_data, decoded, max_shift=3): match_result = [] for shift in range(-max_shift, max_shift): diff --git a/controller/fw/tools/dsss_demod_test_waveform_gen.py b/controller/fw/tools/dsss_demod_test_waveform_gen.py index 1749bd7..414c553 100644 --- a/controller/fw/tools/dsss_demod_test_waveform_gen.py +++ b/controller/fw/tools/dsss_demod_test_waveform_gen.py @@ -1,4 +1,6 @@ +from os import path +import json import functools import numpy as np @@ -53,9 +55,9 @@ def mains_noise_measured(seed, n, meas_data): def load_noise_synth_params(specfile): with open(specfile) as f: d = json.load(f) - return (np.linspace(*d['x_spec']), # spl_x - d['x_spec'][2], # spl_N - (d['t'], d['c'], d['k'])) # psd_spl + return {'spl_x': np.linspace(*d['x_spec']), + 'spl_N': d['x_spec'][2], + 'psd_spl': (d['t'], d['c'], d['k']) } def mains_noise_synthetic(seed, n, psd_spl, spl_N, spl_x): st = np.random.RandomState(seed) @@ -69,3 +71,16 @@ def mains_noise_synthetic(seed, n, psd_spl, spl_N, spl_x): renoise = scipy.fftpack.ifft(spec) return renoise[10000:][:n] + 50.00 +@functools.lru_cache() +def load_noise_gen(url): + schema, refpath = url.split('://') + if not path.isabs(refpath): + refpath = path.abspath(path.join(path.dirname(__file__), refpath)) + + if schema == 'meas': + return mains_noise_measured, load_noise_meas_params(refpath) + elif schema == 'synth': + return mains_noise_synthetic, load_noise_synth_params(refpath) + else: + raise ValueError('Invalid schema', schema) + diff --git a/controller/fw/tools/hum_generator.py b/controller/fw/tools/hum_generator.py new file mode 100755 index 0000000..a139491 --- /dev/null +++ b/controller/fw/tools/hum_generator.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +# coding: utf-8 + +import binascii +import struct + +import numpy as np +import pydub + +from dsss_demod_test_waveform_gen import load_noise_gen, modulate as dsss_modulate + +np.set_printoptions(linewidth=240) + +def generate_noisy_signal( + test_data=32, + test_nbits=5, + test_decimation=10, + test_signal_amplitude=20e-3, + noise_level=10e-3, + noise_spec='synth://grid_freq_psd_spl_108pt.json', + f_nom=50.0, + seed=0): + + #test_data = np.random.RandomState(seed=0).randint(0, 2 * (2**test_nbits), test_duration) + #test_data = np.array([0, 1, 2, 3] * 50) + if isinstance(test_data, int): + test_data = np.array(range(test_data)) + + + signal = np.repeat(dsss_modulate(test_data, test_nbits) * 2.0 - 1, test_decimation) + + noise_gen, noise_params = load_noise_gen(noise_spec) + noise = noise_gen(seed, len(signal), **noise_params) + return np.absolute(noise + f_nom + signal*test_signal_amplitude) + +def write_raw_frequencies_bin(outfile, **kwargs): + with open(outfile, 'wb') as f: + for x in generate_noisy_signal(**kwargs): + f.write(struct.pack('f', x)) + +def synthesize_sine(freqs, freqs_sampling_rate=10.0, output_sampling_rate=44100): + duration = len(freqs) / freqs_sampling_rate # seconds + afreq_out = np.interp(np.linspace(0, duration, int(duration*output_sampling_rate)), np.linspace(0, duration, len(freqs)), freqs) + return np.sin(np.cumsum(2*np.pi * afreq_out / output_sampling_rate)) + +def write_flac(filename, signal, sampling_rate=44100): + signal -= np.min(signal) + signal /= np.max(signal) + signal -= 0.5 + signal *= 2**16 - 1 + le_bytes = signal.astype(np.int16).tobytes() + seg = pydub.AudioSegment(data=le_bytes, sample_width=2, frame_rate=sampling_rate, channels=1) + seg.export(filename, format='flac') + +def write_synthetic_hum_flac(filename, output_sampling_rate=44100, freqs_sampling_rate=10.0, **kwargs): + signal = generate_noisy_signal(**kwargs) + print(signal) + write_flac(filename, synthesize_sine(signal, freqs_sampling_rate, output_sampling_rate), + sampling_rate=output_sampling_rate) + +def emulate_adc_signal(adc_bits=12, adc_offset=0.4, adc_amplitude=0.25, freq_sampling_rate=10.0, output_sampling_rate=1000, **kwargs): + signal = synthesize_sine(generate_noisy_signal(), freq_sampling_rate, output_sampling_rate) + signal = signal*adc_amplitude + adc_offset + smin, smax = np.min(signal), np.max(signal) + if smin < 0.0 or smax > 1.0: + raise UserWarning('Amplitude or offset too large: Signal out of bounds with min/max [{smin}, {smax}] of ADC range') + signal *= 2**adc_bits -1 + return signal + +def save_adc_signal(fn, signal, dtype=np.uint16): + with open(fn, 'wb') as f: + f.write(signal.astype(dtype).tobytes()) + +def write_emulated_adc_signal_bin(filename, **kwargs): + save_adc_signal(filename, emulate_adc_signal(**kwargs)) + +def hum_cmd(args): + write_synthetic_hum_flac(args.out_flac, + output_sampling_rate=args.audio_sampling_rate, + freqs_sampling_rate=args.frequency_sampling_rate, + test_data = np.array(list(binascii.unhexlify(args.data))), + test_nbits = args.symbol_bits, + test_decimation = args.decimation, + test_signal_amplitude = args.signal_level/1e3, + noise_level = args.noise_level/1e3, + noise_spec=args.noise_spec, + f_nom = args.nominal_frequency, + seed = args.random_seed) + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + cmd_parser = parser.add_subparsers(required=True) + hum_parser = cmd_parser.add_parser('hum', help='Generated artificial modulated mains hum') + # output parameters + hum_parser.add_argument('-a', '--audio-sampling-rate', type=int, default=44100) + + # modulation parameters + hum_parser.add_argument('-f', '--frequency-sampling-rate', type=float, default=10.0*100/128) + hum_parser.add_argument('-b', '--symbol-bits', type=int, default=5, help='bits per symbol (excluding sign bit)') + hum_parser.add_argument('-n', '--noise-level', type=float, default=1.0, help='Scale synthetic noise level') + hum_parser.add_argument('-s', '--signal-level', type=float, default=20.0, help='Synthetic noise level in mHz') + hum_parser.add_argument('-d', '--decimation', type=int, default=10, help='DSSS modulation decimation in frequency measurement cycles') + hum_parser.add_argument('-o', '--nominal-frequency', type=float, default=50.0, help='Nominal mains frequency') + hum_parser.add_argument('-r', '--random-seed', type=int, default=0) + hum_parser.add_argument('--noise-spec', type=str, default='synth://grid_freq_psd_spl_108pt.json') + hum_parser.add_argument('out_flac', metavar='out.flac', help='FLAC output file') + hum_parser.add_argument('data', help='modulation data hex string') + hum_parser.set_defaults(func=hum_cmd) + + args = parser.parse_args() + args.func(args) + diff --git a/controller/fw/tools/presig_gen.py b/controller/fw/tools/presig_gen.py index 3f85522..c5dafe7 100644 --- a/controller/fw/tools/presig_gen.py +++ b/controller/fw/tools/presig_gen.py @@ -9,7 +9,7 @@ import binascii import time from datetime import datetime -LINKING_KEY_SIZE = 16 +LINKING_KEY_SIZE = 15 PRESIG_VERSION = '000.001' DOMAINS = ['all', 'country', 'region', 'vendor', 'series'] diff --git a/controller/fw/tools/reed_solomon.py b/controller/fw/tools/reed_solomon.py index 9eee6be..c4ca6e4 100644 --- a/controller/fw/tools/reed_solomon.py +++ b/controller/fw/tools/reed_solomon.py @@ -64,6 +64,14 @@ def cmdline_func_test(args, print=lambda *args, **kwargs: None, benchmark=False) ).decode().replace('0', '.')) assert test_data == decoded +def cmdline_func_encode(args, **kwargs): + data = np.frombuffer(binascii.unhexlify(args.hex_str), dtype=np.uint8) + # Map 8 bit input to 6 bit symbol string + data = np.packbits(np.pad(np.unpackbits(data).reshape((-1, 6)), ((0,0),(2, 0))).flatten()) + encoded = encode(data.tobytes(), nbits=args.bits) + print('symbol array:', ', '.join(f'0x{x:02x}' for x in encoded)) + print('hex string:', binascii.hexlify(encoded).decode()) + if __name__ == '__main__': parser = argparse.ArgumentParser() cmd_parser = parser.add_subparsers(required=True) @@ -75,7 +83,9 @@ if __name__ == '__main__': test_parser.add_argument('-s', '--seed', type=int, default=0, help='Random seed') test_parser.set_defaults(func=cmdline_func_test) enc_parser = cmd_parser.add_parser('encode', help='RS-Encode given hex string') - enc_parser.add_argument('hex_str') + enc_parser.set_defaults(func=cmdline_func_encode) + enc_parser.add_argument('-b', '--bits', type=int, default=8, help='Symbol bit size') + enc_parser.add_argument('hex_str', type=str, help='Input data as hex string') args = parser.parse_args() - args.func(args, print=print) + args.func(args) -- cgit