# Grid frequency estimation

In [1]:
import math
import sqlite3
import struct
import datetime
import scipy.fftpack
from scipy import signal as sig

import matplotlib
from matplotlib import pyplot as plt
from matplotlib import patches
import numpy as np
from scipy import signal, optimize
from tqdm.notebook import tnrange, tqdm

In [2]:
%matplotlib widget

## Step 1: Setup
### Load data series information from sqlite capture file

One capture file may contain multiple runs/data series. Display a list of runs and their start/end time and sample count, then select the newest one in `last_run` variable.

In [3]:
db = sqlite3.connect('data/waveform-raspi-ocxo-2.sqlite3')

In [4]:
for run_id, start, end, count in db.execute('SELECT run_id, MIN(rx_ts), MAX(rx_ts), COUNT(*) FROM measurements GROUP BY run_id'):
 foo = lambda x: datetime.datetime.fromtimestamp(x/1000)
 start, end = foo(start), foo(end)
 print(f'Run {run_id:03d}: {start:%Y-%m-%d %H:%M:%S} - {end:%Y-%m-%d %H:%M:%S} ({str(end-start)[:-3]:>13}, {count*32:>9d}sp)')
last_run, n_records = run_id, count

Run 000: 2020-04-01 14:00:25 - 2020-04-01 15:09:31 ( 1:09:05.846, 4197664sp)
Run 001: 2020-04-02 11:56:41 - 2020-04-02 11:57:59 ( 0:01:18.544, 79552sp)
Run 002: 2020-04-02 12:03:51 - 2020-04-02 12:12:54 ( 0:09:03.033, 549792sp)


### Setup analog parameters

Setup parameters of analog capture hardware here. This is used to scale samples from ADC counts to analog voltages. Also setup sampling rate here. Nominal sampling rate is 1 ksps.

In [5]:
sampling_rate = 1000.0 * 48.6 / 48

par = lambda *rs: 1/sum(1/r for r in rs) # resistor parallel calculation

# Note: These are for the first prototype only!
vmeas_source_impedance = 330e3
vmeas_source_scale = 0.5

vcc = 15.0
vmeas_div_high = 27e3
vmeas_div_low = par(4.7e3, 10e3)
vmeas_div_voltage = vcc * vmeas_div_low / (vmeas_div_high + vmeas_div_low)
vmeas_div_impedance = par(vmeas_div_high, vmeas_div_low)

#vmeas_overall_factor = vmeas_div_impedance / (vmeas_source_impedance + vmeas_div_impedance)
v0 = 1.5746
v100 = 2.004
vn100 = 1.1452

adc_vcc = 3.3 # V
adc_fullscale = 4095

adc_val_to_voltage_factor = 1/adc_fullscale * adc_vcc

adc_count_to_vmeas = lambda x: (x*adc_val_to_voltage_factor - v0) / (v100-v0) * 100

### Load run data from sqlite3 capture file

Load measurement data for the selected run and assemble a numpy array containing one continuous trace. 

In [6]:
limit = n_records
record_size = 32
skip_dropped_sections = False

data = np.zeros(limit*record_size)
data[:] = np.nan

last_seq = None
write_index = 0
for i, (seq, chunk) in tqdm(enumerate(db.execute(
 'SELECT seq, data FROM measurements WHERE run_id = ? ORDER BY rx_ts LIMIT ? OFFSET ?',
 (last_run, limit, n_records-limit))), total=n_records):
 
 if last_seq is None or seq == (last_seq + 1)%0x10000:
 last_seq = seq
 idx = write_index if skip_dropped_sections else i
 data[idx*record_size:(idx+1)*record_size] = np.frombuffer(chunk, dtype=' last_seq:
 last_seq = seq
 # nans = np.empty((record_size,))
 # nans[:] = np.nan
 # data = np.append(data, nans) FIXME
 
data = (data * adc_val_to_voltage_factor - v0) / (v100-v0) * 100

# https://stackoverflow.com/questions/6518811/interpolate-nan-values-in-a-numpy-array
nan_helper = lambda y: (np.isnan(y), lambda z: z.nonzero()[0])

# data rarely may contain NaNs where the capture script failed to read and acknowledge capture buffers from the sensor board fast enough.
# For RMS calculation and overall FFT fill these NaNs with interpolated values from their neighbors.
data_not_nan = np.copy(data)
nans, x = nan_helper(data)
data_not_nan[nans]= np.interp(x(nans), x(~nans), data[~nans])

print('RMS voltage:', np.sqrt(np.mean(np.square(data_not_nan))))

HBox(children=(FloatProgress(value=0.0, max=17181.0), HTML(value='')))


RMS voltage: 228.5564548966498


### Show a preview of loaded data

In [13]:
fig, (top, bottom) = plt.subplots(2, figsize=(9,6))
fig.tight_layout(pad=3, h_pad=1.8)

range_start, range_len = -300, 60 # [s]

data_slice = data[ int(range_start * sampling_rate) : int((range_start + range_len) * sampling_rate) ]

top.grid()
top.plot(np.linspace(0, range_len, int(range_len*sampling_rate)), data_slice, lw=1.0)
top.set_xlim([range_len/2-0.25, range_len/2+0.25])
mean = np.mean(data_not_nan)
rms = np.sqrt(np.mean(np.square(data_not_nan - mean)))
peak = np.max(np.abs(data_not_nan - mean))
top.axhline(mean, color='red')
bbox = {'facecolor': 'black', 'alpha': 0.8, 'pad': 2}
top.text(0.02, 0.5, f'mean: {mean:.3f}', transform=top.transAxes, color='white', bbox=bbox, ha='left', va='center')
top.text(0.98, 0.2, f'V_RMS: {rms:.3f}', transform=top.transAxes, color='white', bbox=bbox, ha='right')
top.text(0.98, 0.1, f'V_Pk: {peak:.3f}', transform=top.transAxes, color='white', bbox=bbox, ha='right')
top.text(0.5, 0.9, f'Run {run_id}', transform=top.transAxes, color='white', bbox=bbox, ha='center', fontweight='bold')

bottom.grid()
bottom.specgram(data_slice, Fs=sampling_rate)
top.set_ylabel('U [V]')
bottom.set_ylabel('F [Hz]')
bottom.set_xlabel('t [s]')

top.set_title('Voltage waveform')
bottom.set_title('Voltage frequency spectrum')
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## Step 2: Calculate Short-Time Fourier Transform of capture

In [14]:
fs = sampling_rate # Hz
ff = 50 # Hz

analysis_periods = 10
window_len = fs * analysis_periods/ff
nfft_factor = 1
sigma = window_len/8 # samples

f, t, Zxx = signal.stft(data,
 fs = fs,
 window=('gaussian', sigma),
 nperseg = window_len,
 nfft = window_len * nfft_factor)
print(f'Window length: {window_len:.0f} sp, zero-padded to {window_len * nfft_factor:.0f} sp')

Window length: 202 sp, zero-padded to 202 sp


### Show a preview of STFT results

Cut out our approximate frequency range of interest

In [15]:
fig, ax = plt.subplots(figsize=(9, 3))
fig.tight_layout(pad=2, h_pad=0.1)

ax.pcolormesh(t[-200:-100], f[:250], np.abs(Zxx[:250,-200:-100]))
ax.set_title(f"Run {last_run}", pad=-20, color='white')
ax.grid()
ax.set_ylabel('f [Hz]')
ax.set_ylim([30, 75]) # Hz
ax.set_xlabel('simulation time t [s]')
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## Step 3: Run Gasior and Gonzalez for precise frequency estimation

Limit analysis to frequency range of interest. If automatic adaption to totally different frequency ranges
(e.g. 400Hz) would be necessary, we could switch here based on configuration or a lookup of the STFT bin
containing highest overall energy.

As elaborated in the Gasior and Gonzalez Paper [1] the shape of the template function should match the expected peak shape.
Peak shape is determined by the STFT window function. As Gasior and Gonzalez note, a gaussian is a very good fit for a steep gaussian window.

In [16]:
f_t = t

n_f, n_t = Zxx.shape
# Frequency ROI
f_min, f_max = 30, 70 # Hz
# Indices of bins within ROI
bounds_f = slice(np.argmax(f > f_min), np.argmin(f < f_max))

# Initialize output array
f_mean = np.zeros(Zxx.shape[1])

# Iterate over STFT time slices
for le_t in tnrange(1, Zxx.shape[1] - 1):
 # Cut out ROI and compute magnitude of complex fourier coefficients
 frame_f = f[bounds_f]
 frame_Z = np.abs(Zxx[bounds_f, le_t])

 # Template function. We use a gaussian here. This function needs to fit the window above.
 def gauss(x, *p):
 A, mu, sigma = p
 return A*np.exp(-(x-mu)**2/(2.*sigma**2))

 # Calculate initial values for curve fitting
 f_start = frame_f[np.argmax(frame_Z)] # index of strongest bin index
 A_start = np.max(frame_Z) # strongest bin value
 p0 = [A_start, f_start, 1.]
 try:
 # Fit template to measurement data STFT ROI 
 coeff, var = optimize.curve_fit(gauss, frame_f, frame_Z, p0=p0)
 _A, f_mean[le_t], _sigma, *_ = coeff # The measured frequency is the mean of the fitted gaussian
 
 except Exception as e:
 # Handle fit errors
 f_mean[le_t] = np.nan

HBox(children=(FloatProgress(value=0.0, max=5443.0), HTML(value='')))




### Produce a plot of measurement results

Include measurements of mean, standard deviation and variance of measurement data

In [17]:
fig, ax = plt.subplots(figsize=(9, 5), sharex=True)
fig.tight_layout(pad=2.2, h_pad=0, w_pad=1)

# Cut off invalid values at fringes
ax.plot(f_t[1:-2], f_mean[1:-2])
ax.set_ylabel('f [Hz]')
ax.grid()

var = np.var(f_mean[~np.isnan(f_mean)][1:-1])
mean = np.mean(f_mean[~np.isnan(f_mean)][1:-1])
ax.text(0.5, 0.95, f'Run {run_id}', transform=ax.transAxes, ha='center', fontweight='bold', color='white', bbox=bbox)
ax.text(0.05, 0.95, f'μ={mean:.3g} Hz', transform=ax.transAxes, ha='left', color='white', bbox=bbox)
ax.text(0.05, 0.89, f'σ={np.sqrt(var) * 1e3:.3g} mHz', transform=ax.transAxes, ha='left', color='white', bbox=bbox)
ax.text(0.05, 0.83, f'σ²={var * 1e3:.3g} mHz²', transform=ax.transAxes, ha='left', color='white', bbox=bbox)

# Indicated missing values
for i in np.where(np.isnan(f_mean))[0]:
 ax.axvspan(f_t[i], f_t[i+1], color='lightblue')

formatter = matplotlib.ticker.FuncFormatter(lambda s, x: str(datetime.timedelta(seconds=s)))
ax.xaxis.set_major_formatter(formatter)
ax.set_xlabel('recording time t [hh:mm:ss]')
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [18]:
f_copy = np.copy(f_mean[1:-1])
f_copy[np.isnan(f_copy)] = np.mean(f_copy[~np.isnan(f_copy)])
b, a = signal.cheby2(7, 86, 100, 'low', output='ba', fs=1000)
filtered = signal.lfilter(b, a, f_copy)

b2, a2 = signal.cheby2(3, 30, 1, 'high', output='ba', fs=1000)
filtered2 = signal.lfilter(b2, a2, filtered)

fig, (ax2, ax1) = plt.subplots(2, figsize=(9,7))
ax1.plot(f_t[1:-1], f_copy, color='lightgray')
ax1.set_ylim([49.90, 50.10])
ax1.grid()
formatter = matplotlib.ticker.FuncFormatter(lambda s, x: str(datetime.timedelta(seconds=s)))
ax1.xaxis.set_major_formatter(formatter)
zoom_offx = 7000 # s
zoom_len = 300 # s
ax1.set_xlim([zoom_offx, zoom_offx + zoom_len])

ax1.plot(f_t[1:-1], filtered, color='orange')
ax1r = ax1.twinx()
ax1r.plot(f_t[1:-1], filtered2, color='red')
ax1r.set_ylim([-0.015, 0.015])
ax1.set_title(f'Zoomed trace ({datetime.timedelta(seconds=zoom_len)})', pad=-20)


ax2.set_title(f'Run {last_run}')
ax2.plot(f_t[1:-1], f_copy, color='orange')

ax2r = ax2.twinx()
ax2r.set_ylim([-0.1, 0.1])
ax2r.plot(f_t[1:-1], filtered2, color='red')
#ax2.plot(f_t[1:-1], filtered, color='orange', zorder=1)
ax2.set_ylim([49.90, 50.10])
ax2.set_xlim([0, f_t[-2]])
ax2.grid()
formatter = matplotlib.ticker.FuncFormatter(lambda s, x: str(datetime.timedelta(seconds=s)))
ax2.xaxis.set_major_formatter(formatter)

ax2.legend(handles=[
 patches.Patch(color='lightgray', label='Raw frequency'),
 patches.Patch(color='orange', label='low-pass filtered'),
 patches.Patch(color='red', label='band-pass filtered')])

#ax2r.spines['right'].set_color('red')
ax2r.yaxis.label.set_color('red')
#ax2r.tick_params(axis='y', colors='red')

#ax1r.spines['right'].set_color('red')
ax1r.yaxis.label.set_color('red')
#ax1r.tick_params(axis='y', colors='red')

ax1.set_ylabel('f [Hz]')
ax1r.set_ylabel('band-pass Δf [Hz]')
ax2.set_ylabel('f [Hz]')
ax2r.set_ylabel('band-pass Δf [Hz]')

# Cut out first 10min of filtered data to give filters time to settle
rms_slice = filtered2[np.where(f_t[1:] > 10*60)[0][0]:]
rms = np.sqrt(np.mean(np.square(rms_slice)))
ax1.text(0.5, 0.1, f'RMS (band-pass): {rms*1e3:.3f}mHz', transform=ax1.transAxes, color='white', bbox=bbox, ha='center')
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

IndexError: index 0 is out of bounds for axis 0 with size 0

In [None]:
chunk_size = 256
#
#with open('filtered_freq.bin', 'wb') as f:
# for chunk in range(0, len(rms_slice), chunk_size):
# out_data = rms_slice[chunk:chunk+chunk_size]
# f.write(struct.pack(f'{len(out_data)}f', *out_data))
# 
#with open('raw_freq.bin', 'wb') as f:
# for chunk in range(0, len(f_copy), chunk_size):
# out_data = f_copy[chunk:chunk+chunk_size]
# f.write(struct.pack(f'{len(out_data)}f', *out_data))

In [None]:
# Number of samplepoints
N = len(data_not_nan)
# sample spacing
T = 1.0 / sampling_rate
x = np.linspace(0.0, N*T, N)
yf = scipy.fftpack.fft(data_not_nan * sig.blackman(N))
xf = np.linspace(0.0, 1.0/(2.0*T), N//2)

yf = 2.0/N * np.abs(yf[:N//2])

average_from = lambda val, start, average_width: np.hstack([val[:start], [ np.mean(val[i:i+average_width]) for i in range(start, len(val), average_width) ]])

average_width = 6
average_start = 20
yf = average_from(yf, average_start, average_width)
xf = average_from(xf, average_start, average_width)
yf = average_from(yf, 200, average_width)
xf = average_from(xf, 200, average_width)

In [None]:
fig, ax = plt.subplots(figsize=(6, 3))
fig.tight_layout()
ax.loglog(xf, yf)
#ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _pos: f'{1/x:.1f}'))
ax.set_xlabel('f [Hz]')
ax.set_ylabel('Amplitude V [V]')
ax.grid()
ax.set_xlim([0.001, 500])
fig.subplots_adjust(bottom=0.2)

for le_f in (50, 150, 250, 350, 450):
 ax.axvline(le_f, color=(1, 0.5, 0.5), zorder=-2)
ax.annotate('50 Hz', xy=(20, 1), xycoords='data', bbox=dict(fc='white', alpha=0.8, ec='none'))
font = {'family' : 'normal',
 'weight' : 'normal',
 'size' : 10}
matplotlib.rc('font', **font)
fig.savefig('fig_out/mains_voltage_spectrum.eps')

In [None]:
# Number of samplepoints
newcopy = np.copy(f_mean[1:-2])

nans, x = nan_helper(newcopy)
newcopy[nans]= np.interp(x(nans), x(~nans), newcopy[~nans])

N = len(newcopy)
# sample spacing
T = 1.0 / 10
x = np.linspace(0.0, N*T, N)
yf = scipy.fftpack.fft(newcopy * sig.blackman(N))
xf = np.linspace(0.0, 10/2, N//2)

yf = 2.0/N * np.abs(yf[:N//2])

average_from = lambda val, start, average_width: np.hstack([val[:start], [ np.mean(val[i:i+average_width]) for i in range(start, len(val), average_width) ]])

average_width1, average_start1 = 3, 40
average_width2, average_start2 = 4, 100
yf = average_from(yf, average_start1, average_width1)
xf = average_from(xf, average_start1, average_width1)
yf = average_from(yf, average_start2, average_width2)
xf = average_from(xf, average_start2, average_width2)

fig, ax = plt.subplots()
ax.loglog(xf, yf)
ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _pos: f'{1/x:.1f}'))
ax.set_xlabel('T [s]')
ax.set_ylabel('Amplitude Δf [Hz]')

for i, t in enumerate([60, 300, 450, 1200, 1800]):
 ax.axvline(1/t, color='red', alpha=0.5)
 ax.annotate(f'{t} s', xy=(1/t, 3e-5), xytext=(-15, 0), xycoords='data', textcoords='offset pixels', rotation=90)
#ax.text(1/60, 10,'60 s', ha='left')
ax.grid()
#ax.set_xlim([1/60000, 0.5])
#ax.set_ylim([5e-7, 2e-2])
#ax.plot(xf[1:], 2e-6/xf[1:])

In [None]:
fig, ax = plt.subplots()
ax.plot(np.linspace(0, (len(f_mean)-3)/10, len(f_mean)-3) , f_mean[1:-2])
ax.grid()

## References

1. **Gasior, M. & Gonzalez, J.** Improving FFT frequency measurement resolution by parabolic and gaussian interpolation *CERN-AB-Note-2004-021, CERN-AB-Note-2004-021, 2004*