In [12]:
import math
import struct

import numpy as np
from scipy import signal, optimize
from matplotlib import pyplot as plt

import rocof_test_data

In [13]:
%matplotlib widget

In [14]:
fs = 1000 # Hz
ff = 50 # Hz
duration = 60 # seconds
# test_data = rocof_test_data.sample_waveform(rocof_test_data.test_close_interharmonics_and_flicker(),
#                                             duration=20,
#                                             sampling_rate=fs,
#                                             frequency=ff)[0]
# test_data = rocof_test_data.sample_waveform(rocof_test_data.gen_noise(fmin=10, amplitude=1),
#                                             duration=20,
#                                             sampling_rate=fs,
#                                             frequency=ff)[0]


#gen = rocof_test_data.gen_noise(fmin=10, amplitude=1)
# gen = rocof_test_data.gen_noise(fmin=60, amplitude=0.2)
# gen = rocof_test_data.test_harmonics()
# gen = rocof_test_data.gen_interharmonic(*rocof_test_data.test_interharmonics)
# gen = rocof_test_data.test_amplitude_steps()
# gen = rocof_test_data.test_amplitude_and_phase_steps()
test_data = []
test_labels = [ fun.__name__.replace('test_', '') for fun in rocof_test_data.all_tests ]
for gen in rocof_test_data.all_tests:
    test_data.append(rocof_test_data.sample_waveform(gen(),
                                            duration=duration,
                                            sampling_rate=fs,
                                            frequency=ff)[0])
# d = 10 # seconds
# test_data = np.sin(2*np.pi * ff * np.linspace(0, d, int(d*fs)))

In [15]:
spr_fmt = f'{fs}Hz' if fs<1000 else f'{fs/1e3:f}'.rstrip('.0') + 'kHz'
for label, data in zip(test_labels, test_data):
    with open(f'rocof_test_data/rocof_test_{label}_{spr_fmt}.bin', 'wb') as f:
        for sample in data:
            f.write(struct.pack('<f', sample))

In [16]:
analysis_periods = 10
window_len = 256 # fs * analysis_periods/ff
nfft_factor = 1
sigma = window_len/8 # samples
quantization_bits = 14

ffts = []
for item in test_data:
    f, t, Zxx = signal.stft((item * (2**(quantization_bits-1) - 1)).round().astype(np.int16).astype(float),
                fs = fs,
                window=('gaussian', sigma),
                nperseg = window_len,
                nfft = window_len * nfft_factor)
                #boundary = 'zeros')
    ffts.append((f, t, Zxx))

In [17]:
Zxx.shape

(129, 470)

In [18]:
1000/256

3.90625

In [19]:
fig, ax = plt.subplots(len(test_data), figsize=(8, 20), sharex=True)
fig.tight_layout(pad=2, h_pad=0.1)

for fft, ax, label in zip(test_data, ax.flatten(), test_labels):
    ax.plot((item * (2**(quantization_bits-1) - 1)).round())
    
    ax.set_title(label, pad=-20, color='white', bbox=dict(boxstyle="square", ec=(0,0,0,0), fc=(0,0,0,0.8)))
    ax.grid()
    ax.set_ylabel('f [Hz]')
ax.set_xlabel('simulation time t [s]')
ax.set_xlim([5000, 5200])
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [20]:
fig, ax = plt.subplots(len(test_data), figsize=(8, 20), sharex=True)
fig.tight_layout(pad=2, h_pad=0.1)

for fft, ax, label in zip(ffts, ax.flatten(), test_labels):
    f, t, Zxx = fft
    ax.pcolormesh(t[1:], f[:250], np.abs(Zxx[:250,1:]))
    ax.set_title(label, pad=-20, color='white')
    ax.grid()
    ax.set_ylabel('f [Hz]')
    ax.set_ylim([30, 75]) # Hz
ax.set_xlabel('simulation time t [s]')
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [21]:
f

array([  0.     ,   3.90625,   7.8125 ,  11.71875,  15.625  ,  19.53125,
        23.4375 ,  27.34375,  31.25   ,  35.15625,  39.0625 ,  42.96875,
        46.875  ,  50.78125,  54.6875 ,  58.59375,  62.5    ,  66.40625,
        70.3125 ,  74.21875,  78.125  ,  82.03125,  85.9375 ,  89.84375,
        93.75   ,  97.65625, 101.5625 , 105.46875, 109.375  , 113.28125,
       117.1875 , 121.09375, 125.     , 128.90625, 132.8125 , 136.71875,
       140.625  , 144.53125, 148.4375 , 152.34375, 156.25   , 160.15625,
       164.0625 , 167.96875, 171.875  , 175.78125, 179.6875 , 183.59375,
       187.5    , 191.40625, 195.3125 , 199.21875, 203.125  , 207.03125,
       210.9375 , 214.84375, 218.75   , 222.65625, 226.5625 , 230.46875,
       234.375  , 238.28125, 242.1875 , 246.09375, 250.     , 253.90625,
       257.8125 , 261.71875, 265.625  , 269.53125, 273.4375 , 277.34375,
       281.25   , 285.15625, 289.0625 , 292.96875, 296.875  , 300.78125,
       304.6875 , 308.59375, 312.5    , 316.40625, 

In [37]:
fig, axs = plt.subplots(len(test_data), figsize=(8, 20), sharex=True)
fig.tight_layout(pad=2.2, h_pad=0, w_pad=1)

for fft, ax, label in zip(ffts, axs.flatten(), test_labels):
    f, f_t, Zxx = fft
    
    n_f, n_t = Zxx.shape
    # start, stop = 180, 220
    # start, stop = 90, 110
    # start, stop = 15, 35
    # bounds_f = slice(start // 4 * nfft_factor, stop // 4 * nfft_factor)
    f_min, f_max = 30, 70 # Hz
    bounds_f = slice(np.argmax(f > f_min), np.argmin(f < f_max))
    

    f_mean = np.zeros(Zxx.shape[1])
    for t in range(1, Zxx.shape[1] - 1):
        frame_f = f[bounds_f]
        frame_step = frame_f[1] - frame_f[0]
        time_step = f_t[1] - f_t[0]
        #if t == 10:
        #    axs[-1].plot(frame_f, frame_Z)
        frame_Z = np.abs(Zxx[bounds_f, t])
        # frame_f = f[180:220]
        # frame_Z = np.abs(Zxx[180:220, 40])
        # frame_f = f[15:35]
        # frame_Z = np.abs(Zxx[15:35, 40])
        # plt.plot(frame_f, frame_Z)

        # peak_f = frame_f[np.argmax(frame)]
        # plt.axvline(peak_f, color='red')

#         def gauss(x, *p):
#             A, mu, sigma, o = p
#             return A*np.exp(-(x-mu)**2/(2.*sigma**2)) + o
        
        def gauss(x, *p):
            A, mu, sigma = p
            return A*np.exp(-(x-mu)**2/(2.*sigma**2))
        
        f_start = frame_f[np.argmax(frame_Z)]
        A_start = np.max(frame_Z)
        p0 = [A_start, f_start, 1.]
        try:
            coeff, var = optimize.curve_fit(gauss, frame_f, frame_Z, p0=p0)
            # plt.plot(frame_f, gauss(frame_f, *coeff))
            #print(coeff)
            A, mu, sigma, *_ = coeff
            f_mean[t] = mu
        except RuntimeError:
            f_mean[t] = np.nan
    ax.plot(f_t[1:-1], f_mean[1:-1])
    
#     b, a = signal.butter(3,
#                          1/5, # Hz
#                          btype='lowpass',
#                          fs=1/time_step)
#     filtered = signal.lfilter(b, a, f_mean[1:-1], axis=0)
#     ax.plot(f_t[1:-1], filtered)
    
    ax.set_title(label, pad=-20)
    ax.set_ylabel('f [Hz]')
    ax.grid()
    if not label in ['off_frequency', 'sweep_phase_steps']:
        ax.set_ylim([49.90, 50.10])
        var = np.var(f_mean[1:-1])
        ax.text(0.5, 0.1, f'σ²={var * 1e3:.3g} mHz²', transform=ax.transAxes, ha='center')
        ax.text(0.5, 0.25, f'σ={np.sqrt(var) * 1e3:.3g} mHz', transform=ax.transAxes, ha='center')
#         ax.text(0.5, 0.2, f'filt. σ²={np.var(filtered) * 1e3:.3g} mHz', transform=ax.transAxes, ha='center')
    else:
        f_min, f_max = min(f_mean[1:-1]), max(f_mean[1:-1])
        delta = f_max - f_min
        ax.set_ylim(f_min - delta * 0.1, f_max + delta * 0.3)
        
ax.set_xlabel('simulation time t [s]')
None

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

slice(12, 15, None)
slice(12, 15, None)
slice(12, 15, None)
slice(12, 15, None)




slice(12, 15, None)
slice(12, 15, None)
slice(12, 15, None)
slice(12, 15, None)
slice(12, 15, None)
slice(12, 15, None)
