{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Grid frequency estimation" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import math\n", "import sqlite3\n", "import struct\n", "import datetime\n", "import scipy.fftpack\n", "from scipy import signal as sig\n", "\n", "import matplotlib\n", "from matplotlib import pyplot as plt\n", "from matplotlib import patches\n", "import numpy as np\n", "from scipy import signal, optimize\n", "from tqdm.notebook import tnrange, tqdm" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%matplotlib widget" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Setup\n", "### Load data series information from sqlite capture file\n", "\n", "One capture file may contain multiple runs/data series. Display a list of runs and their start/end time and sample count, then select the newest one in `last_run` variable." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "db = sqlite3.connect('data/waveform-raspi-ocxo-2.sqlite3')" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Run 000: 2020-04-01 14:00:25 - 2020-04-01 15:09:31 ( 1:09:05.846, 4197664sp)\n", "Run 001: 2020-04-02 11:56:41 - 2020-04-02 11:57:59 ( 0:01:18.544, 79552sp)\n", "Run 002: 2020-04-02 12:03:51 - 2020-04-02 12:12:54 ( 0:09:03.033, 549792sp)\n" ] } ], "source": [ "for run_id, start, end, count in db.execute('SELECT run_id, MIN(rx_ts), MAX(rx_ts), COUNT(*) FROM measurements GROUP BY run_id'):\n", " foo = lambda x: datetime.datetime.fromtimestamp(x/1000)\n", " start, end = foo(start), foo(end)\n", " print(f'Run {run_id:03d}: {start:%Y-%m-%d %H:%M:%S} - {end:%Y-%m-%d %H:%M:%S} ({str(end-start)[:-3]:>13}, {count*32:>9d}sp)')\n", "last_run, n_records = run_id, count" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup analog parameters\n", "\n", "Setup parameters of analog capture hardware here. This is used to scale samples from ADC counts to analog voltages. Also setup sampling rate here. Nominal sampling rate is 1 ksps." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "sampling_rate = 1000.0 * 48.6 / 48\n", "\n", "par = lambda *rs: 1/sum(1/r for r in rs) # resistor parallel calculation\n", "\n", "# Note: These are for the first prototype only!\n", "vmeas_source_impedance = 330e3\n", "vmeas_source_scale = 0.5\n", "\n", "vcc = 15.0\n", "vmeas_div_high = 27e3\n", "vmeas_div_low = par(4.7e3, 10e3)\n", "vmeas_div_voltage = vcc * vmeas_div_low / (vmeas_div_high + vmeas_div_low)\n", "vmeas_div_impedance = par(vmeas_div_high, vmeas_div_low)\n", "\n", "#vmeas_overall_factor = vmeas_div_impedance / (vmeas_source_impedance + vmeas_div_impedance)\n", "v0 = 1.5746\n", "v100 = 2.004\n", "vn100 = 1.1452\n", "\n", "adc_vcc = 3.3 # V\n", "adc_fullscale = 4095\n", "\n", "adc_val_to_voltage_factor = 1/adc_fullscale * adc_vcc\n", "\n", "adc_count_to_vmeas = lambda x: (x*adc_val_to_voltage_factor - v0) / (v100-v0) * 100" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load run data from sqlite3 capture file\n", "\n", "Load measurement data for the selected run and assemble a numpy array containing one continuous trace. " ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "fa875d84971946ada24a959c1a85fe78", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(FloatProgress(value=0.0, max=17181.0), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "RMS voltage: 228.5564548966498\n" ] } ], "source": [ "limit = n_records\n", "record_size = 32\n", "skip_dropped_sections = False\n", "\n", "data = np.zeros(limit*record_size)\n", "data[:] = np.nan\n", "\n", "last_seq = None\n", "write_index = 0\n", "for i, (seq, chunk) in tqdm(enumerate(db.execute(\n", " 'SELECT seq, data FROM measurements WHERE run_id = ? ORDER BY rx_ts LIMIT ? OFFSET ?',\n", " (last_run, limit, n_records-limit))), total=n_records):\n", " \n", " if last_seq is None or seq == (last_seq + 1)%0x10000:\n", " last_seq = seq\n", " idx = write_index if skip_dropped_sections else i\n", " data[idx*record_size:(idx+1)*record_size] = np.frombuffer(chunk, dtype=' last_seq:\n", " last_seq = seq\n", " # nans = np.empty((record_size,))\n", " # nans[:] = np.nan\n", " # data = np.append(data, nans) FIXME\n", " \n", "data = (data * adc_val_to_voltage_factor - v0) / (v100-v0) * 100\n", "\n", "# https://stackoverflow.com/questions/6518811/interpolate-nan-values-in-a-numpy-array\n", "nan_helper = lambda y: (np.isnan(y), lambda z: z.nonzero()[0])\n", "\n", "# data rarely may contain NaNs where the capture script failed to read and acknowledge capture buffers from the sensor board fast enough.\n", "# For RMS calculation and overall FFT fill these NaNs with interpolated values from their neighbors.\n", "data_not_nan = np.copy(data)\n", "nans, x = nan_helper(data)\n", "data_not_nan[nans]= np.interp(x(nans), x(~nans), data[~nans])\n", "\n", "print('RMS voltage:', np.sqrt(np.mean(np.square(data_not_nan))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Show a preview of loaded data" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "2cf087d340f7461d88d2ebaba0c6f95e", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "fig, (top, bottom) = plt.subplots(2, figsize=(9,6))\n", "fig.tight_layout(pad=3, h_pad=1.8)\n", "\n", "range_start, range_len = -300, 60 # [s]\n", "\n", "data_slice = data[ int(range_start * sampling_rate) : int((range_start + range_len) * sampling_rate) ]\n", "\n", "top.grid()\n", "top.plot(np.linspace(0, range_len, int(range_len*sampling_rate)), data_slice, lw=1.0)\n", "top.set_xlim([range_len/2-0.25, range_len/2+0.25])\n", "mean = np.mean(data_not_nan)\n", "rms = np.sqrt(np.mean(np.square(data_not_nan - mean)))\n", "peak = np.max(np.abs(data_not_nan - mean))\n", "top.axhline(mean, color='red')\n", "bbox = {'facecolor': 'black', 'alpha': 0.8, 'pad': 2}\n", "top.text(0.02, 0.5, f'mean: {mean:.3f}', transform=top.transAxes, color='white', bbox=bbox, ha='left', va='center')\n", "top.text(0.98, 0.2, f'V_RMS: {rms:.3f}', transform=top.transAxes, color='white', bbox=bbox, ha='right')\n", "top.text(0.98, 0.1, f'V_Pk: {peak:.3f}', transform=top.transAxes, color='white', bbox=bbox, ha='right')\n", "top.text(0.5, 0.9, f'Run {run_id}', transform=top.transAxes, color='white', bbox=bbox, ha='center', fontweight='bold')\n", "\n", "bottom.grid()\n", "bottom.specgram(data_slice, Fs=sampling_rate)\n", "top.set_ylabel('U [V]')\n", "bottom.set_ylabel('F [Hz]')\n", "bottom.set_xlabel('t [s]')\n", "\n", "top.set_title('Voltage waveform')\n", "bottom.set_title('Voltage frequency spectrum')\n", "None" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Calculate Short-Time Fourier Transform of capture" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Window length: 202 sp, zero-padded to 202 sp\n" ] } ], "source": [ "fs = sampling_rate # Hz\n", "ff = 50 # Hz\n", "\n", "analysis_periods = 10\n", "window_len = fs * analysis_periods/ff\n", "nfft_factor = 1\n", "sigma = window_len/8 # samples\n", "\n", "f, t, Zxx = signal.stft(data,\n", " fs = fs,\n", " window=('gaussian', sigma),\n", " nperseg = window_len,\n", " nfft = window_len * nfft_factor)\n", "print(f'Window length: {window_len:.0f} sp, zero-padded to {window_len * nfft_factor:.0f} sp')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Show a preview of STFT results\n", "\n", "Cut out our approximate frequency range of interest" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "432082c0f3a644d781669c57e8324ceb", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "fig, ax = plt.subplots(figsize=(9, 3))\n", "fig.tight_layout(pad=2, h_pad=0.1)\n", "\n", "ax.pcolormesh(t[-200:-100], f[:250], np.abs(Zxx[:250,-200:-100]))\n", "ax.set_title(f\"Run {last_run}\", pad=-20, color='white')\n", "ax.grid()\n", "ax.set_ylabel('f [Hz]')\n", "ax.set_ylim([30, 75]) # Hz\n", "ax.set_xlabel('simulation time t [s]')\n", "None" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Run Gasior and Gonzalez for precise frequency estimation\n", "\n", "Limit analysis to frequency range of interest. If automatic adaption to totally different frequency ranges\n", "(e.g. 400Hz) would be necessary, we could switch here based on configuration or a lookup of the STFT bin\n", "containing highest overall energy.\n", "\n", "As elaborated in the Gasior and Gonzalez Paper [1] the shape of the template function should match the expected peak shape.\n", "Peak shape is determined by the STFT window function. As Gasior and Gonzalez note, a gaussian is a very good fit for a steep gaussian window." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "fd30a988dcb84bc0a29e74d3134167e6", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(FloatProgress(value=0.0, max=5443.0), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "f_t = t\n", "\n", "n_f, n_t = Zxx.shape\n", "# Frequency ROI\n", "f_min, f_max = 30, 70 # Hz\n", "# Indices of bins within ROI\n", "bounds_f = slice(np.argmax(f > f_min), np.argmin(f < f_max))\n", "\n", "# Initialize output array\n", "f_mean = np.zeros(Zxx.shape[1])\n", "\n", "# Iterate over STFT time slices\n", "for le_t in tnrange(1, Zxx.shape[1] - 1):\n", " # Cut out ROI and compute magnitude of complex fourier coefficients\n", " frame_f = f[bounds_f]\n", " frame_Z = np.abs(Zxx[bounds_f, le_t])\n", "\n", " # Template function. We use a gaussian here. This function needs to fit the window above.\n", " def gauss(x, *p):\n", " A, mu, sigma = p\n", " return A*np.exp(-(x-mu)**2/(2.*sigma**2))\n", "\n", " # Calculate initial values for curve fitting\n", " f_start = frame_f[np.argmax(frame_Z)] # index of strongest bin index\n", " A_start = np.max(frame_Z) # strongest bin value\n", " p0 = [A_start, f_start, 1.]\n", " try:\n", " # Fit template to measurement data STFT ROI \n", " coeff, var = optimize.curve_fit(gauss, frame_f, frame_Z, p0=p0)\n", " _A, f_mean[le_t], _sigma, *_ = coeff # The measured frequency is the mean of the fitted gaussian\n", " \n", " except Exception as e:\n", " # Handle fit errors\n", " f_mean[le_t] = np.nan" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Produce a plot of measurement results\n", "\n", "Include measurements of mean, standard deviation and variance of measurement data" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "1baa6cf9948b4faeb79ad81940e2b4a0", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "fig, ax = plt.subplots(figsize=(9, 5), sharex=True)\n", "fig.tight_layout(pad=2.2, h_pad=0, w_pad=1)\n", "\n", "# Cut off invalid values at fringes\n", "ax.plot(f_t[1:-2], f_mean[1:-2])\n", "ax.set_ylabel('f [Hz]')\n", "ax.grid()\n", "\n", "var = np.var(f_mean[~np.isnan(f_mean)][1:-1])\n", "mean = np.mean(f_mean[~np.isnan(f_mean)][1:-1])\n", "ax.text(0.5, 0.95, f'Run {run_id}', transform=ax.transAxes, ha='center', fontweight='bold', color='white', bbox=bbox)\n", "ax.text(0.05, 0.95, f'μ={mean:.3g} Hz', transform=ax.transAxes, ha='left', color='white', bbox=bbox)\n", "ax.text(0.05, 0.89, f'σ={np.sqrt(var) * 1e3:.3g} mHz', transform=ax.transAxes, ha='left', color='white', bbox=bbox)\n", "ax.text(0.05, 0.83, f'σ²={var * 1e3:.3g} mHz²', transform=ax.transAxes, ha='left', color='white', bbox=bbox)\n", "\n", "# Indicated missing values\n", "for i in np.where(np.isnan(f_mean))[0]:\n", " ax.axvspan(f_t[i], f_t[i+1], color='lightblue')\n", "\n", "formatter = matplotlib.ticker.FuncFormatter(lambda s, x: str(datetime.timedelta(seconds=s)))\n", "ax.xaxis.set_major_formatter(formatter)\n", "ax.set_xlabel('recording time t [hh:mm:ss]')\n", "None" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "708dbcdd2292469398199a0f6054a09d", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …" ] }, "metadata": {}, "output_type": "display_data" }, { "ename": "IndexError", "evalue": "index 0 is out of bounds for axis 0 with size 0", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mIndexError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 56\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 57\u001b[0m \u001b[0;31m# Cut out first 10min of filtered data to give filters time to settle\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 58\u001b[0;31m \u001b[0mrms_slice\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfiltered2\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mnp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwhere\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf_t\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m>\u001b[0m \u001b[0;36m10\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0;36m60\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 59\u001b[0m \u001b[0mrms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mnp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msqrt\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mnp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmean\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mnp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msquare\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrms_slice\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 60\u001b[0m \u001b[0max1\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtext\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0.5\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m0.1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34mf'RMS (band-pass): {rms*1e3:.3f}mHz'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtransform\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0max1\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtransAxes\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcolor\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'white'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mbbox\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mbbox\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mha\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'center'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mIndexError\u001b[0m: index 0 is out of bounds for axis 0 with size 0" ] } ], "source": [ "f_copy = np.copy(f_mean[1:-1])\n", "f_copy[np.isnan(f_copy)] = np.mean(f_copy[~np.isnan(f_copy)])\n", "b, a = signal.cheby2(7, 86, 100, 'low', output='ba', fs=1000)\n", "filtered = signal.lfilter(b, a, f_copy)\n", "\n", "b2, a2 = signal.cheby2(3, 30, 1, 'high', output='ba', fs=1000)\n", "filtered2 = signal.lfilter(b2, a2, filtered)\n", "\n", "fig, (ax2, ax1) = plt.subplots(2, figsize=(9,7))\n", "ax1.plot(f_t[1:-1], f_copy, color='lightgray')\n", "ax1.set_ylim([49.90, 50.10])\n", "ax1.grid()\n", "formatter = matplotlib.ticker.FuncFormatter(lambda s, x: str(datetime.timedelta(seconds=s)))\n", "ax1.xaxis.set_major_formatter(formatter)\n", "zoom_offx = 7000 # s\n", "zoom_len = 300 # s\n", "ax1.set_xlim([zoom_offx, zoom_offx + zoom_len])\n", "\n", "ax1.plot(f_t[1:-1], filtered, color='orange')\n", "ax1r = ax1.twinx()\n", "ax1r.plot(f_t[1:-1], filtered2, color='red')\n", "ax1r.set_ylim([-0.015, 0.015])\n", "ax1.set_title(f'Zoomed trace ({datetime.timedelta(seconds=zoom_len)})', pad=-20)\n", "\n", "\n", "ax2.set_title(f'Run {last_run}')\n", "ax2.plot(f_t[1:-1], f_copy, color='orange')\n", "\n", "ax2r = ax2.twinx()\n", "ax2r.set_ylim([-0.1, 0.1])\n", "ax2r.plot(f_t[1:-1], filtered2, color='red')\n", "#ax2.plot(f_t[1:-1], filtered, color='orange', zorder=1)\n", "ax2.set_ylim([49.90, 50.10])\n", "ax2.set_xlim([0, f_t[-2]])\n", "ax2.grid()\n", "formatter = matplotlib.ticker.FuncFormatter(lambda s, x: str(datetime.timedelta(seconds=s)))\n", "ax2.xaxis.set_major_formatter(formatter)\n", "\n", "ax2.legend(handles=[\n", " patches.Patch(color='lightgray', label='Raw frequency'),\n", " patches.Patch(color='orange', label='low-pass filtered'),\n", " patches.Patch(color='red', label='band-pass filtered')])\n", "\n", "#ax2r.spines['right'].set_color('red')\n", "ax2r.yaxis.label.set_color('red')\n", "#ax2r.tick_params(axis='y', colors='red')\n", "\n", "#ax1r.spines['right'].set_color('red')\n", "ax1r.yaxis.label.set_color('red')\n", "#ax1r.tick_params(axis='y', colors='red')\n", "\n", "ax1.set_ylabel('f [Hz]')\n", "ax1r.set_ylabel('band-pass Δf [Hz]')\n", "ax2.set_ylabel('f [Hz]')\n", "ax2r.set_ylabel('band-pass Δf [Hz]')\n", "\n", "# Cut out first 10min of filtered data to give filters time to settle\n", "rms_slice = filtered2[np.where(f_t[1:] > 10*60)[0][0]:]\n", "rms = np.sqrt(np.mean(np.square(rms_slice)))\n", "ax1.text(0.5, 0.1, f'RMS (band-pass): {rms*1e3:.3f}mHz', transform=ax1.transAxes, color='white', bbox=bbox, ha='center')\n", "None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "chunk_size = 256\n", "#\n", "#with open('filtered_freq.bin', 'wb') as f:\n", "# for chunk in range(0, len(rms_slice), chunk_size):\n", "# out_data = rms_slice[chunk:chunk+chunk_size]\n", "# f.write(struct.pack(f'{len(out_data)}f', *out_data))\n", "# \n", "#with open('raw_freq.bin', 'wb') as f:\n", "# for chunk in range(0, len(f_copy), chunk_size):\n", "# out_data = f_copy[chunk:chunk+chunk_size]\n", "# f.write(struct.pack(f'{len(out_data)}f', *out_data))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Number of samplepoints\n", "N = len(data_not_nan)\n", "# sample spacing\n", "T = 1.0 / sampling_rate\n", "x = np.linspace(0.0, N*T, N)\n", "yf = scipy.fftpack.fft(data_not_nan * sig.blackman(N))\n", "xf = np.linspace(0.0, 1.0/(2.0*T), N//2)\n", "\n", "yf = 2.0/N * np.abs(yf[:N//2])\n", "\n", "average_from = lambda val, start, average_width: np.hstack([val[:start], [ np.mean(val[i:i+average_width]) for i in range(start, len(val), average_width) ]])\n", "\n", "average_width = 6\n", "average_start = 20\n", "yf = average_from(yf, average_start, average_width)\n", "xf = average_from(xf, average_start, average_width)\n", "yf = average_from(yf, 200, average_width)\n", "xf = average_from(xf, 200, average_width)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, ax = plt.subplots(figsize=(6, 3))\n", "fig.tight_layout()\n", "ax.loglog(xf, yf)\n", "#ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _pos: f'{1/x:.1f}'))\n", "ax.set_xlabel('f [Hz]')\n", "ax.set_ylabel('Amplitude V [V]')\n", "ax.grid()\n", "ax.set_xlim([0.001, 500])\n", "fig.subplots_adjust(bottom=0.2)\n", "\n", "for le_f in (50, 150, 250, 350, 450):\n", " ax.axvline(le_f, color=(1, 0.5, 0.5), zorder=-2)\n", "ax.annotate('50 Hz', xy=(20, 1), xycoords='data', bbox=dict(fc='white', alpha=0.8, ec='none'))\n", "font = {'family' : 'normal',\n", " 'weight' : 'normal',\n", " 'size' : 10}\n", "matplotlib.rc('font', **font)\n", "fig.savefig('fig_out/mains_voltage_spectrum.eps')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Number of samplepoints\n", "newcopy = np.copy(f_mean[1:-2])\n", "\n", "nans, x = nan_helper(newcopy)\n", "newcopy[nans]= np.interp(x(nans), x(~nans), newcopy[~nans])\n", "\n", "N = len(newcopy)\n", "# sample spacing\n", "T = 1.0 / 10\n", "x = np.linspace(0.0, N*T, N)\n", "yf = scipy.fftpack.fft(newcopy * sig.blackman(N))\n", "xf = np.linspace(0.0, 10/2, N//2)\n", "\n", "yf = 2.0/N * np.abs(yf[:N//2])\n", "\n", "average_from = lambda val, start, average_width: np.hstack([val[:start], [ np.mean(val[i:i+average_width]) for i in range(start, len(val), average_width) ]])\n", "\n", "average_width1, average_start1 = 3, 40\n", "average_width2, average_start2 = 4, 100\n", "yf = average_from(yf, average_start1, average_width1)\n", "xf = average_from(xf, average_start1, average_width1)\n", "yf = average_from(yf, average_start2, average_width2)\n", "xf = average_from(xf, average_start2, average_width2)\n", "\n", "fig, ax = plt.subplots()\n", "ax.loglog(xf, yf)\n", "ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _pos: f'{1/x:.1f}'))\n", "ax.set_xlabel('T [s]')\n", "ax.set_ylabel('Amplitude Δf [Hz]')\n", "\n", "for i, t in enumerate([60, 300, 450, 1200, 1800]):\n", " ax.axvline(1/t, color='red', alpha=0.5)\n", " ax.annotate(f'{t} s', xy=(1/t, 3e-5), xytext=(-15, 0), xycoords='data', textcoords='offset pixels', rotation=90)\n", "#ax.text(1/60, 10,'60 s', ha='left')\n", "ax.grid()\n", "#ax.set_xlim([1/60000, 0.5])\n", "#ax.set_ylim([5e-7, 2e-2])\n", "#ax.plot(xf[1:], 2e-6/xf[1:])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, ax = plt.subplots()\n", "ax.plot(np.linspace(0, (len(f_mean)-3)/10, len(f_mean)-3) , f_mean[1:-2])\n", "ax.grid()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## References\n", "\n", "1. **Gasior, M. & Gonzalez, J.** Improving FFT frequency measurement resolution by parabolic and gaussian interpolation *CERN-AB-Note-2004-021, CERN-AB-Note-2004-021, 2004*" ] } ], "metadata": { "kernelspec": { "display_name": "labenv", "language": "python", "name": "labenv" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.3" } }, "nbformat": 4, "nbformat_minor": 4 }