{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import struct\n",
    "import random\n",
    "import itertools\n",
    "import datetime\n",
    "import multiprocessing\n",
    "from collections import defaultdict\n",
    "import json\n",
    "\n",
    "from matplotlib import pyplot as plt\n",
    "import matplotlib\n",
    "import numpy as np\n",
    "from scipy import signal as sig\n",
    "from scipy import fftpack as fftpack\n",
    "import ipywidgets\n",
    "\n",
    "import pydub\n",
    "\n",
    "from tqdm.notebook import tqdm\n",
    "import colorednoise\n",
    "\n",
    "np.set_printoptions(linewidth=240)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib widget"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# From https://github.com/mubeta06/python/blob/master/signal_processing/sp/gold.py\n",
    "# Maps a register width in bits to the two tap lists passed to scipy.signal.max_len_seq.\n",
    "preferred_pairs = {5:[[2],[1,2,3]], 6:[[5],[1,4,5]], 7:[[4],[4,5,6]],\n",
    "                   8:[[1,2,3,6,7],[1,2,7]], 9:[[5],[3,5,6]],\n",
    "                   10:[[2,5,9],[3,4,6,8,9]], 11:[[9],[3,6,9]]}\n",
    "\n",
    "def gen_gold(seq1, seq2):\n",
    "    \"\"\"Return [seq1, seq2] followed by seq1 XOR each cyclic left-shift of seq2.\n",
    "\n",
    "    Both arguments are expected to be equal-length integer numpy arrays; the\n",
    "    returned list holds len(seq1) + 2 arrays.\n",
    "    \"\"\"\n",
    "    shifted_xors = [seq1 ^ np.roll(seq2, -shift) for shift in range(len(seq1))]\n",
    "    return [seq1, seq2, *shifted_xors]\n",
    "\n",
    "def gold(n):\n",
    "    \"\"\"Build the Gold code family for an n-bit register.\n",
    "\n",
    "    The two maximum-length sequences come from the tap lists in preferred_pairs.\n",
    "    Raises KeyError when no tap pair is listed for n.\n",
    "    \"\"\"\n",
    "    n = int(n)\n",
    "    if n not in preferred_pairs:\n",
    "        raise KeyError('preferred pairs for %s bits unknown' % str(n))\n",
    "    t0, t1 = preferred_pairs[n]\n",
    "    # max_len_seq returns (sequence, final register state); only the sequence is needed\n",
    "    seq0, _st0 = sig.max_len_seq(n, taps=t0)\n",
    "    seq1, _st1 = sig.max_len_seq(n, taps=t1)\n",
    "    return gen_gold(seq0, seq1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [],
   "source": [
    "def modulate(data, nbits=5, pad=True):\n",
    "    \"\"\"Spread every symbol in data over one Gold code of 2**nbits - 1 chips.\n",
    "\n",
    "    For each symbol, data >> 1 selects the code and the least significant bit\n",
    "    selects its polarity (0 inverts the code, 1 keeps it). Returns a flat array\n",
    "    of 0/1 chips. With pad=True the chips are framed by len(mask) zeros on each\n",
    "    side, i.e. one zero per code in the family.\n",
    "    \"\"\"\n",
    "    # 0, 1 -> -1, 1\n",
    "    mask = np.array(gold(nbits))*2 - 1\n",
    "\n",
    "    sel = mask[data>>1]\n",
    "    data_lsb_centered = ((data&1)*2 - 1)\n",
    "\n",
    "    # Repeat each symbol's sign across its row of chips, multiply, then map -1, 1 -> 0, 1\n",
    "    chip_signs = np.tile(data_lsb_centered, (2**nbits-1, 1)).T\n",
    "    signal = (np.multiply(sel, chip_signs).flatten() + 1) // 2\n",
    "    if pad:\n",
    "        padding = np.zeros(len(mask))\n",
    "        return np.hstack([ padding, signal, padding ])\n",
    "    else:\n",
    "        return signal"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_noisy_signal(\n",
    "        test_duration=32,\n",
    "        test_nbits=5,\n",
    "        test_decimation=10,\n",
    "        test_signal_amplitude=200e-3,\n",
    "        noise_level=10e-3):\n",
    "    \"\"\"Build a noisy trace with a modulated test pattern embedded in it.\n",
    "\n",
    "    The symbols 0 .. test_duration-1 are modulated, mapped to\n",
    "    +/-test_signal_amplitude and held for test_decimation samples per chip.\n",
    "    The result is added into a noise buffer twice as long as the signal, and\n",
    "    50 is added to every sample. Other cells pass the returned array to\n",
    "    synthesize_sine as its frequency trace.\n",
    "    \"\"\"\n",
    "    #test_data = np.random.RandomState(seed=0).randint(0, 2 * (2**test_nbits), test_duration)\n",
    "    #test_data = np.array([0, 1, 2, 3] * 50)\n",
    "    test_data = np.arange(test_duration)\n",
    "    chips = modulate(test_data, test_nbits, pad=False) * 2.0 - 1\n",
    "    signal = np.repeat(chips, test_decimation) * test_signal_amplitude\n",
    "    noise = colorednoise.powerlaw_psd_gaussian(1, len(signal)*2) * noise_level\n",
    "    # Both slices are views, so this adds the signal in place, leaving noise-only\n",
    "    # samples before and after it\n",
    "    noise[-int(1.5*len(signal)):][:len(signal)] += signal\n",
    "\n",
    "    return noise+50"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [],
   "source": [
    "fig, ax = plt.subplots()\n",
    "ax.plot(generate_noisy_signal())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {},
   "outputs": [],
   "source": [
    "with open('data/ref_sig_audio_test3.bin', 'wb') as f:\n",
    "    for x in generate_noisy_signal():\n",
    "        f.write(struct.pack('f', x))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "def synthesize_sine(freqs, freqs_sampling_rate=10.0, output_sampling_rate=44100):\n",
    "    \"\"\"Render a sine wave whose instantaneous frequency follows freqs.\n",
    "\n",
    "    freqs holds one frequency value per 1/freqs_sampling_rate seconds. It is\n",
    "    linearly interpolated up to output_sampling_rate and accumulated into a\n",
    "    phase with a cumulative sum. Returns the sine of that phase.\n",
    "    \"\"\"\n",
    "    duration = len(freqs) / freqs_sampling_rate # seconds\n",
    "    t_in = np.linspace(0, duration, len(freqs))\n",
    "    t_out = np.linspace(0, duration, int(duration*output_sampling_rate))\n",
    "    afreq_out = np.interp(t_out, t_in, freqs)\n",
    "    return np.sin(np.cumsum(2*np.pi * afreq_out / output_sampling_rate))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_sig = synthesize_sine(generate_noisy_signal())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "5171d9dbbe5048e2b32c3cf7f7d03744",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fig, ax = plt.subplots()\n",
    "ax.plot(test_sig[:44100])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "metadata": {},
   "outputs": [],
   "source": [
    "def save_signal_flac(filename, signal, sampling_rate=44100):\n",
    "    \"\"\"Normalize signal to the full 16-bit range and export it as mono FLAC.\n",
    "\n",
    "    The minimum of signal maps to the most negative sample and the maximum to\n",
    "    the most positive one. The input is copied first, so the caller's array is\n",
    "    left unmodified and integer arrays are accepted. A constant signal is\n",
    "    written as silence.\n",
    "    \"\"\"\n",
    "    samples = np.array(signal, dtype=float)  # private float copy\n",
    "    span = np.ptp(samples)\n",
    "    if span > 0:\n",
    "        samples = (samples - np.min(samples)) / span - 0.5\n",
    "    else:\n",
    "        # Nothing to normalize; avoid dividing by zero\n",
    "        samples = np.zeros_like(samples)\n",
    "    samples *= 2**16 - 1\n",
    "    # Explicit little-endian 16-bit samples, independent of the host byte order\n",
    "    le_bytes = samples.astype('<i2').tobytes()\n",
    "    seg = pydub.AudioSegment(data=le_bytes, sample_width=2, frame_rate=sampling_rate, channels=1)\n",
    "    seg.export(filename, format='flac')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {},
   "outputs": [],
   "source": [
    "save_signal_flac('synth_sig_test_0123_02.flac', synthesize_sine(generate_noisy_signal(), freqs_sampling_rate=10.0 * 100/128, output_sampling_rate=44100))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "def emulate_adc_signal(adc_bits=12, adc_offset=0.4, adc_amplitude=0.25, freq_sampling_rate=10.0, output_sampling_rate=1000, **kwargs):\n",
    "    \"\"\"Synthesize the noisy test tone and scale it to ADC counts.\n",
    "\n",
    "    The sine from synthesize_sine is mapped to adc_offset +/- adc_amplitude\n",
    "    (fractions of full scale) and multiplied by 2**adc_bits - 1. Extra keyword\n",
    "    arguments are accepted but not used. Raises UserWarning when the scaled\n",
    "    signal leaves the range [0, 1].\n",
    "    \"\"\"\n",
    "    signal = synthesize_sine(generate_noisy_signal(), freq_sampling_rate, output_sampling_rate)\n",
    "    signal = signal*adc_amplitude + adc_offset\n",
    "    smin, smax = np.min(signal), np.max(signal)\n",
    "    if smin < 0.0 or smax > 1.0:\n",
    "        raise UserWarning(f'Amplitude or offset too large: Signal out of bounds with min/max [{smin}, {smax}] of ADC range')\n",
    "    signal *= 2**adc_bits -1\n",
    "    return signal"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "def save_adc_signal(fn, signal, dtype=np.uint16):\n",
    "    \"\"\"Write signal to the file fn as raw samples converted to dtype.\"\"\"\n",
    "    with open(fn, 'wb') as f:\n",
    "        f.write(signal.astype(dtype).tobytes())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "save_adc_signal('emulated_adc_readings_01.bin', emulate_adc_signal(freq_sampling_rate=10.0 * 100/128))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "labenv",
   "language": "python",
   "name": "labenv"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}