In [1]:
import struct
import random
import itertools
import datetime
import multiprocessing
from collections import defaultdict
import json
import traceback
import glob

from matplotlib import pyplot as plt
import matplotlib
from matplotlib import ticker
import numpy as np
from scipy import signal as sig
from scipy import fftpack as fftpack
import ipywidgets
from IPython.display import set_matplotlib_formats
import scipy

from tqdm.notebook import tqdm
import colorednoise
import accupy

np.set_printoptions(linewidth=240)

In [2]:
%matplotlib widget
#%matplotlib inline
#set_matplotlib_formats('png', 'pdf')
#font = {'family' : 'normal',
# 'weight' : 'normal',
# 'size' : 6}
#matplotlib.rc('font', **font)

In [3]:
sampling_rate = 10 # sp/s

In [4]:
# From https://github.com/mubeta06/python/blob/master/signal_processing/sp/gold.py
preferred_pairs = {5:[[2],[1,2,3]], 6:[[5],[1,4,5]], 7:[[4],[4,5,6]],
 8:[[1,2,3,6,7],[1,2,7]], 9:[[5],[3,5,6]], 
 10:[[2,5,9],[3,4,6,8,9]], 11:[[9],[3,6,9]]}

def gen_gold(seq1, seq2):
 gold = [seq1, seq2]
 for shift in range(len(seq1)):
 gold.append(seq1 ^ np.roll(seq2, -shift))
 return gold

def gold(n):
 n = int(n)
 
 min_key, max_key = min(preferred_pairs.keys()), max(preferred_pairs.keys())
 
 if n < min_key:
 raise KeyError('preferred pairs for %s bits unknown' % str(n))
 
 if n > max_key:
 t0, t1 = preferred_pairs[max_key]
 (seq0, _st0), (seq1, _st1) = sig.max_len_seq(max_key, taps=t0), sig.max_len_seq(max_key, taps=t1)
 raw_codes = gen_gold(seq0, seq1)
 # Hackety hack: This is super-experimental test code and guaranteed sub-optimal!
 
 raw_codes = np.array(raw_codes)
 out = raw_codes[:-1]
 for i in range(n - max_key):
 out = np.hstack([out[:out.shape[0]//2], out[out.shape[0]//2:]])
 return out
 
 else:
 t0, t1 = preferred_pairs[n]
 (seq0, _st0), (seq1, _st1) = sig.max_len_seq(n, taps=t0), sig.max_len_seq(n, taps=t1)
 return np.array(gen_gold(seq0, seq1))

In [5]:
def modulate(data, nbits=5, pad=True):
 # 0, 1 -> -1, 1
 mask = np.array(gold(nbits))*2 - 1
 
 sel = mask[data>>1]
 data_lsb_centered = ((data&1)*2 - 1)

 signal = (np.multiply(sel, np.tile(data_lsb_centered, (sel.shape[1], 1)).T).flatten() + 1) // 2
 if pad:
 print('mod: padding by', mask.shape[1])
 return np.hstack([ np.zeros(mask.shape[1]), signal, np.zeros(mask.shape[1]) ])
 else:
 return signal

In [6]:
def correlate(sequence, nbits=5, decimation=1, mask_filter=lambda x: x, mask_max=None):
 codes = gold(nbits)
 mask = np.tile(np.array(codes)[:,:,np.newaxis]*2 - 1, (1, 1, decimation)).reshape((codes.shape[0], codes.shape[1] * decimation))
 mask = mask.astype(np.float64)

 # Our input signal has large DC bias. Remove DC bias to reduce numerical errors during correlation.
 sequence -= np.mean(sequence)
 
 #return np.array([np.correlate(sequence, row, mode='valid') for row in mask[:mask_max]])
 return np.array([sig.correlate(sequence, row, mode='valid') for row in mask[:mask_max]]) / len(sequence)

In [7]:
with open('data/fmeas_export_ocxo_2day.bin', 'rb') as f:
 meas_data = np.copy(np.frombuffer(f.read(), dtype='float32'))
 print('mean:', np.mean(meas_data), 'len:', len(meas_data))
 meas_data -= np.mean(meas_data)
 meas_data = np.hstack([meas_data, meas_data[::-1]] * 50)
 def mains_noise_meas(n):
 last_valid = len(meas_data) - n
 start = random.randint(0, last_valid)
 return meas_data[start:start+n]

mean: 50.00341 len: 1946174


In [8]:
def load_noise_meas_params(capture_file):
 with open(capture_file, 'rb') as f:
 meas_data = np.copy(np.frombuffer(f.read(), dtype='float32'))
 print('mean:', np.mean(meas_data), 'len:', len(meas_data))
 meas_data -= np.mean(meas_data)
 return {'meas_data': meas_data}

def mains_noise_measured(n, meas_data):
 last_valid = len(meas_data) - n
 start = np.random.randint(last_valid)
 return meas_data[start:start+n]

def load_noise_synth_params(specfile):
 with open(specfile) as f:
 d = json.load(f)
 return {'spl_x': np.linspace(*d['x_spec']),
 'spl_N': d['x_spec'][2],
 'psd_spl': (d['t'], d['c'], d['k']) }
 
def mains_noise_synthetic(n, psd_spl, spl_N, spl_x):
 noise = np.random.normal(size=spl_N) * 2
 spec = scipy.fftpack.fft(noise) **2
 spec *= np.exp(scipy.interpolate.splev(spl_x, psd_spl))

 spec **= 1/2
 
 renoise = scipy.fftpack.ifft(spec)
 return np.hstack([renoise[10000:], renoise[10000:][::-1]])[:n]

noise_params = load_noise_synth_params('data/grid_freq_psd_spl_108pt.json')
mains_noise = lambda n: mains_noise_synthetic(n, **noise_params)

In [9]:
def generate_test_signal(duration, nbits=6, signal_amplitude=2.0e-3, decimation=10, seed=0, data=None):
 test_data = np.random.RandomState(seed=seed).randint(0, 2 * (2**nbits), duration) if data is None else data
 
 signal = np.repeat(modulate(test_data, nbits, pad=True) * 2.0 - 1, decimation) * signal_amplitude
 noise = mains_noise_meas(len(signal))
 
 return test_data, signal.astype(np.longdouble) + noise.astype(np.longdouble)

In [10]:
gold(12)[:2].mean(axis=1)

array([0.50806058, 0.49242794])

In [20]:
nbits = 10
decimation = 8
# working params: decimation=15 nbits=21 power=200kW
# working params: decimation=8 nbits=10 power=10MW
grid_frequency_characteristic = 25e9 # watt per hertz
modulation_power = 10000e3 # watt
data = np.array([0, 1, 2, 3])
amplitude = modulation_power / grid_frequency_characteristic
print(f'amplitude @P_mod={modulation_power/1e3}kW: Δf={amplitude*1e6}µHz')

# Generate test data for this simulation run
test_data, signal = generate_test_signal(duration=2, nbits=nbits, signal_amplitude=amplitude, decimation=decimation, seed=0, data=data)
print(test_data, signal.shape)

def correlate_test(sequence, nbits, decimation, mask_max):
 codes = gold(nbits)
 mask = np.tile(np.array(codes)[:,:,np.newaxis]*2 - 1, (1, 1, decimation)).reshape((codes.shape[0], codes.shape[1] * decimation))
 mask = mask.astype(np.float64)

 # Our input signal has large DC bias. Remove DC bias to reduce numerical errors during correlation.
 sequence -= np.mean(sequence)
 
 #return np.array([np.correlate(sequence, row, mode='valid') for row in mask[:mask_max]])
 return np.array([accupy.fsum(np.multiply(sequence, row)) for row in mask[:mask_max]]) / len(sequence)

signal = signal.astype(np.float64)
fig, axs = plt.subplots(len(data), figsize=(12, 5))
for i, ax in enumerate(axs.flatten()):
 gen_per = len(data) + 2 # one period for each symbol, plus one period of padding at the beginning and end
 symbol_length = signal.shape[0]//gen_per
 l, r = int(max(0, (i + 1) * symbol_length - 1000)), int((i + 2) * symbol_length + 1000)
 signal_slice = signal[l:r]
 cor_an = correlate(signal_slice, nbits=nbits, decimation=decimation, mask_max=2)
 
 l, r = int((i + 1) * symbol_length), int((i + 2) * symbol_length)
 signal_slice = signal[l:r]
 #print('on peak [numpy]:', correlate(signal_slice, nbits=nbits, decimation=decimation, mask_max=2).flatten() / amplitude)
 #print('on peak [accupy]:', correlate_test(signal_slice, nbits=nbits, decimation=decimation, mask_max=2).flatten() / amplitude)
 
 l, r = int((i + 1 - 0.2) * symbol_length), int((i + 2 - 0.2) * symbol_length)
 signal_slice = signal[l:r]
 #print('off peak [numpy]:', correlate(signal_slice, nbits=nbits, decimation=decimation, mask_max=2).flatten() / amplitude)
 #print('off peak [accupy]:', correlate_test(signal_slice, nbits=nbits, decimation=decimation, mask_max=2).flatten() / amplitude)
 #ax.plot(cor_an.T)

 cwt_res = np.array([ sig.cwt(row, sig.ricker, [0.73 * decimation]).flatten() for row in cor_an ])
 print(f'signal strength (off/on peak): {np.sqrt(np.mean(cwt_res[0,cwt_res.shape[1]//6:cwt_res.shape[1]//3]**2)):+.3e} | {cwt_res[0,cwt_res.shape[1]//2]:+.3e}')
 ax.plot(cwt_res.T / amplitude)

amplitude @P_mod=10000.0kW: Δf=400.0µHz
mod: padding by 1023
[0 1 2 3] (49104,)


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

signal strength (off/on peak): +4.900e-05 | -6.406e-04
signal strength (off/on peak): +7.699e-05 | +5.627e-04
signal strength (off/on peak): +8.441e-05 | +4.206e-05
signal strength (off/on peak): +8.178e-05 | -8.413e-05


In [127]:
decimation = 10
grid_frequency_characteristic = 25e9 # watt per hertz
modulation_power = 10e3 # watt
amplitude = modulation_power / grid_frequency_characteristic
print(f'amplitude @P_mod={modulation_power/1e3}kW: Δf={amplitude*1e6}µHz')

ps_off = []
xs = np.array(list(range(5, 16)))
for nbits in xs:
 data = np.array([1])

 def correlate_test(sequence, nbits, decimation, mask_max):
 codes = gold(nbits)
 mask = np.tile(np.array(codes)[:,:,np.newaxis]*2 - 1, (1, 1, decimation)).reshape((codes.shape[0], codes.shape[1] * decimation))
 mask = mask.astype(np.float64)

 # Our input signal has large DC bias. Remove DC bias to reduce numerical errors during correlation.
 sequence -= np.mean(sequence)

 return np.array([sig.correlate(sequence, row, mode='valid') for row in mask[:mask_max]])
 #return np.array([accupy.fsum(np.multiply(sequence, row)) for row in mask[:mask_max]]) / len(sequence)

 vals = []
 for i in range(20):
 test_data, signal = generate_test_signal(duration=2, nbits=nbits, signal_amplitude=amplitude, decimation=decimation, seed=random.randint(0, 0xffffffff), data=data)
 cor_an = correlate(signal, nbits=nbits, decimation=decimation, mask_max=2)
 vals.append(np.sqrt(np.mean(cor_an**2)))
 ps_off.append((np.mean(vals), np.std(vals)))
 print(vals)

ps_off = np.array(ps_off)

amplitude @P_mod=10.0kW: Δf=0.39999999999999997µHz
mod: padding by 31


 return test_data, signal.astype(np.longdouble) + noise.astype(np.longdouble)


mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
mod: padding by 31
[0.00031921745429844274216, 0.0003466253347083378983, 0.00044946868526334726654, 0.00043601785453840703518, 0.00027364766834844621423, 0.00034429714350871021226, 0.00029257480354291570764, 0.00043761373963434835483, 0.00034653346037204395042, 0.00041369145870726819648, 0.00031663491690404936546, 0.0003630671579533060333, 0.0002934121262870590337, 0.00036264881740159871053, 0.00033973548163424448722, 0.00045000162460528714934, 0.00036345615545028226934, 0.00034203880459996953548, 0.0004340491996267533479, 0.00027402160564997715637]
mod: padding by 63
mod: padding by 63
mod: padding by 63
mod: padding by 63
mod: padding by 63
mod: pa

In [129]:
fig, ax = plt.subplots(figsize=(12, 5))
ax.errorbar(x=xs, y=ps_off[:,0], yerr=ps_off[:,1])

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

