In [1]:
import struct
import random
import itertools
import datetime
import multiprocessing
from collections import defaultdict
import json

from matplotlib import pyplot as plt
import matplotlib
import numpy as np
from scipy import signal as sig
from scipy import fftpack as fftpack
import ipywidgets

import pydub

from tqdm.notebook import tqdm
import colorednoise

np.set_printoptions(linewidth=240)

In [2]:
%matplotlib widget

In [3]:
# From https://github.com/mubeta06/python/blob/master/signal_processing/sp/gold.py
preferred_pairs = {5:[[2],[1,2,3]], 6:[[5],[1,4,5]], 7:[[4],[4,5,6]],
                        8:[[1,2,3,6,7],[1,2,7]], 9:[[5],[3,5,6]], 
                        10:[[2,5,9],[3,4,6,8,9]], 11:[[9],[3,6,9]]}

def gen_gold(seq1, seq2):
    gold = [seq1, seq2]
    for shift in range(len(seq1)):
        gold.append(seq1 ^ np.roll(seq2, -shift))
    return gold

def gold(n):
    n = int(n)
    if not n in preferred_pairs:
        raise KeyError('preferred pairs for %s bits unknown' % str(n))
    t0, t1 = preferred_pairs[n]
    (seq0, _st0), (seq1, _st1) = sig.max_len_seq(n, taps=t0), sig.max_len_seq(n, taps=t1)
    return gen_gold(seq0, seq1)

In [49]:
def modulate(data, nbits=5, pad=True):
    # 0, 1 -> -1, 1
    mask = np.array(gold(nbits))*2 - 1
    
    sel = mask[data>>1]
    data_lsb_centered = ((data&1)*2 - 1)

    signal = (np.multiply(sel, np.tile(data_lsb_centered, (2**nbits-1, 1)).T).flatten() + 1) // 2
    if pad:
        return np.hstack([ np.zeros(len(mask)), signal, np.zeros(len(mask)) ])
    else:
        return signal

In [53]:
def generate_noisy_signal(
        test_duration=32,
        test_nbits=5,
        test_decimation=10,
        test_signal_amplitude=200e-3,
        noise_level=10e-3):

    #test_data = np.random.RandomState(seed=0).randint(0, 2 * (2**test_nbits), test_duration)
    #test_data = np.array([0, 1, 2, 3] * 50)
    test_data = np.array(range(test_duration))
    signal = np.repeat(modulate(test_data, test_nbits, pad=False) * 2.0 - 1, test_decimation) * test_signal_amplitude
    noise = colorednoise.powerlaw_psd_gaussian(1, len(signal)*2) * noise_level
    noise[-int(1.5*len(signal)):][:len(signal)] += signal

    return noise+50
    #with open(f'mains_sim_signals/dsss_test_noisy_padded.bin', 'wb') as f:
    #    for e in noise:
    #        f.write(struct.pack('<f', e))

In [54]:
fig, ax = plt.subplots()
ax.plot(generate_noisy_signal())

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

[<matplotlib.lines.Line2D at 0x7f2ab8cabca0>]

In [58]:
with open('data/ref_sig_audio_test3.bin', 'wb') as f:
    for x in generate_noisy_signal():
        f.write(struct.pack('f', x))

In [7]:
def synthesize_sine(freqs, freqs_sampling_rate=10.0, output_sampling_rate=44100):
    duration = len(freqs) / freqs_sampling_rate # seconds
    afreq_out = np.interp(np.linspace(0, duration, int(duration*output_sampling_rate)), np.linspace(0, duration, len(freqs)), freqs)
    return np.sin(np.cumsum(2*np.pi * afreq_out / output_sampling_rate))

In [8]:
test_sig = synthesize_sine(generate_noisy_signal())

In [9]:
fig, ax = plt.subplots()
ax.plot(test_sig[:44100])

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

[<matplotlib.lines.Line2D at 0x7f2a90476bb0>]

In [56]:
def save_signal_flac(filename, signal, sampling_rate=44100):
    signal -= np.min(signal)
    signal /= np.max(signal)
    signal -= 0.5
    signal *= 2**16 - 1
    le_bytes = signal.astype(np.int16).tobytes()
    seg = pydub.AudioSegment(data=le_bytes, sample_width=2, frame_rate=sampling_rate, channels=1)
    seg.export(filename, format='flac')

In [57]:
save_signal_flac('synth_sig_test_0123_02.flac', synthesize_sine(generate_noisy_signal(), freqs_sampling_rate=10.0 * 100/128, output_sampling_rate=44100))

In [11]:
def emulate_adc_signal(adc_bits=12, adc_offset=0.4, adc_amplitude=0.25, freq_sampling_rate=10.0, output_sampling_rate=1000, **kwargs):
    signal = synthesize_sine(generate_noisy_signal(), freq_sampling_rate, output_sampling_rate)
    signal = signal*adc_amplitude + adc_offset
    smin, smax = np.min(signal), np.max(signal)
    if smin < 0.0 or smax > 1.0:
        raise UserWarning('Amplitude or offset too large: Signal out of bounds with min/max [{smin}, {smax}] of ADC range')
    signal *= 2**adc_bits -1
    return signal

In [12]:
def save_adc_signal(fn, signal, dtype=np.uint16):
    with open(fn, 'wb') as f:
        f.write(signal.astype(dtype).tobytes())

In [13]:
save_adc_signal('emulated_adc_readings_01.bin', emulate_adc_signal(freq_sampling_rate=10.0 * 100/128))