summaryrefslogtreecommitdiff
path: root/ROCOF test data generator.ipynb
diff options
context:
space:
mode:
Diffstat (limited to 'ROCOF test data generator.ipynb')
-rw-r--r--ROCOF test data generator.ipynb235
1 files changed, 235 insertions, 0 deletions
diff --git a/ROCOF test data generator.ipynb b/ROCOF test data generator.ipynb
new file mode 100644
index 0000000..df94a7b
--- /dev/null
+++ b/ROCOF test data generator.ipynb
@@ -0,0 +1,235 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# ROCOF test waveform library\n",
+ "\n",
+ "This is a re-implementation of the ROCOF test waveforms described in https://zenodo.org/record/3559798\n",
+ "\n",
+ "**This file is exported as a python module and loaded from other notebooks here, so please make sure to re-export when changing it.**"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import math\n",
+ "import itertools\n",
+ "\n",
+ "import numpy as np\n",
+ "from scipy import signal\n",
+ "from matplotlib import pyplot as plt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%matplotlib notebook"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def sample_waveform(generator, duration:\"s\"=10, sampling_rate:\"sp/s\"=10000, frequency:\"Hz\"=50):\n",
+ " samples = int(duration*sampling_rate)\n",
+ " phases = np.linspace(0, 2*np.pi, 6, endpoint=False)\n",
+ " omega_t = np.linspace(phases, phases + 2*np.pi*duration*frequency, samples)\n",
+ " fundamental = np.sin(omega_t)\n",
+ " return generator(omega_t, fundamental, sampling_rate=sampling_rate, duration=duration, frequency=frequency).swapaxes(0, 1)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def gen_harmonics(amplitudes, phases=[]):\n",
+ " return lambda omega_t, fundamental, **_: fundamental + np.sum([\n",
+ " a*np.sin((p if p else 0) + i*omega_t)\n",
+ " for i, (a, p) in enumerate(itertools.zip_longest(amplitudes, phases), start=2)\n",
+ " ], axis=0)\n",
+ "\n",
+ "def test_harmonics():\n",
+ " return gen_harmonics([0.02, 0.05, 0.01, 0.06, 0.005, 0.05, 0.005, 0.015, 0.005, 0.035, 0.005, 0.003])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def gen_interharmonic(amplitudes, ih=[], ih_phase=[]):\n",
+ " def gen(omega_t, fundamental, **_):\n",
+ " return fundamental + np.sum([\n",
+ " a*np.sin(omega_t * ih + (p if p else 0))\n",
+ " for a, ih, p in itertools.zip_longest(amplitudes, ih, ih_phase)\n",
+ " ], axis=0)\n",
+ " return gen\n",
+ "\n",
+ "def test_interharmonics():\n",
+ " return gen_interharmonic([0.1], [15.01401], [np.pi])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def gen_noise(amplitude=0.2, fmax:'Hz'=4.9e3, fmin:'Hz'=100, filter_order=6):\n",
+ " def gen(omega_t, fundamental, sampling_rate, **_):\n",
+ " noise = np.random.normal(0, amplitude, fundamental.shape)\n",
+ " b, a = signal.butter(filter_order,\n",
+ " [fmin, min(fmax, sampling_rate//2-1)],\n",
+ " btype='bandpass',\n",
+ " fs=sampling_rate)\n",
+ " return fundamental + signal.lfilter(b, a, noise, axis=0)\n",
+ " return gen\n",
+ "\n",
+ "def test_noise():\n",
+ " return gen_noise()\n",
+ "\n",
+ "def test_noise_loud():\n",
+ " return gen_noise(amplitude=0.5, fmin=10)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 406,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def gen_steps(size_amplitude=0.1, size_phase=0.1*np.pi, steps_per_sec=1):\n",
+ " def gen(omega_t, fundamental, duration, **_):\n",
+ " n = int(steps_per_sec * duration)\n",
+ " indices = np.random.randint(0, len(omega_t), n)\n",
+ " amplitudes = np.random.normal(1, size_amplitude, (n, 6))\n",
+ " phases = np.random.normal(0, size_phase, (n, 6))\n",
+ " amplitude = np.ones(omega_t.shape)\n",
+ " for start, end, a, p in zip(indices, indices[1:], amplitudes, phases):\n",
+ " omega_t[start:end] += p\n",
+ " amplitude[start:end] = a\n",
+ " return amplitude*np.sin(omega_t)\n",
+ " return gen\n",
+ "\n",
+ "def test_amplitude_steps():\n",
+ " return gen_steps(size_amplitude=0.4, size_phase=0)\n",
+ "\n",
+ "def test_phase_steps():\n",
+ " return gen_steps(size_amplitude=0, size_phase=0.1)\n",
+ "\n",
+ "def test_amplitude_and_phase_steps():\n",
+ " return gen_steps(size_amplitude=0.2, size_phase=0.07)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 418,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def step_gen(shape, stdev, duration, steps_per_sec=1.0, mean=0.0):\n",
+ " samples, channels = shape\n",
+ " n = int(steps_per_sec * duration)\n",
+ " indices = np.random.randint(0, samples, n)\n",
+ " phases = np.random.normal(mean, stdev, (n, 6))\n",
+ " amplitude = np.ones((samples, channels))\n",
+ " out = np.zeros(shape)\n",
+ " for start, end, a in zip(indices, indices[1:], amplitude):\n",
+ " out[start:end] = a\n",
+ " return out\n",
+ "\n",
+ "def gen_chirp(fmin, fmax, period, dwell_time=1.0, amplitude=None, phase_steps=None):\n",
+ " def gen(omega_t, fundamental, sampling_rate, duration, **_):\n",
+ " samples = int(duration*sampling_rate)\n",
+ " phases = np.linspace(0, 2*np.pi, 6, endpoint=False)\n",
+ " \n",
+ " c = (fmax-fmin)/period\n",
+ " t = np.linspace(0, duration, samples)\n",
+ " \n",
+ " x = np.repeat(np.reshape(2*np.pi*fmin*t, (-1,1)), 6, axis=1)\n",
+ " data = (phases + x)[:int(sampling_rate*dwell_time)]\n",
+ " current_phase = 2*np.pi*fmin*dwell_time\n",
+ " direction = 'up'\n",
+ " \n",
+ " for idx in range(int(dwell_time*sampling_rate), samples, int(2*period*sampling_rate)):\n",
+ " t1 = np.linspace(0, period, int(period*sampling_rate))\n",
+ " t2 = np.linspace(0, period, int(period*sampling_rate))\n",
+ " chirp_phase = np.hstack((\n",
+ " 2*np.pi*(c/2 * t1**2 + fmin * t1),\n",
+ " 2*np.pi*(-c/2 * t2**2 + fmax * t2 - (c/2 * period**2 + fmin * period))\n",
+ " ))\n",
+ " chirp_phase = np.repeat(np.reshape(chirp_phase, (-1, 1)), 6, axis=1)\n",
+ " new = phases + chirp_phase + current_phase\n",
+ " current_phase = chirp_phase[-1]\n",
+ " data = np.vstack((data, new))\n",
+ " \n",
+ " data = data[:len(fundamental)]\n",
+ " \n",
+ " if phase_steps:\n",
+ " (step_amplitude, steps_per_sec) = phase_steps\n",
+ " steps = step_gen(data.shape, step_amplitude, duration, steps_per_sec)\n",
+ " data += steps\n",
+ " \n",
+ " if amplitude is None:\n",
+ " return np.sin(data)\n",
+ " else:\n",
+ " return fundamental + amplitude*np.sin(data)\n",
+ " return gen\n",
+ "\n",
+ "def test_close_interharmonics_and_flicker():\n",
+ " return gen_chirp(90.0, 150.0, 10, 1, amplitude=0.1)\n",
+ "\n",
+ "def test_off_frequency():\n",
+ "# return gen_chirp(48.0, 52.0, 0.25, 1)\n",
+ " return gen_chirp(48.0, 52.0, 10, 1)\n",
+ "\n",
+ "def test_sweep_phase_steps():\n",
+ " return gen_chirp(48.0, 52.0, 10, 1, phase_steps=(0.1, 1))\n",
+ "# return gen_chirp(48.0, 52.0, 0.25, 1, phase_steps=(0.1, 1))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "all_tests = [test_harmonics, test_interharmonics, test_noise, test_noise_loud, test_amplitude_steps, test_phase_steps, test_amplitude_and_phase_steps, test_close_interharmonics_and_flicker, test_off_frequency, test_sweep_phase_steps]"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.7.5"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}