diff options
Diffstat (limited to 'sim_runner.py')
-rw-r--r-- | sim_runner.py | 282 |
1 files changed, 0 insertions, 282 deletions
diff --git a/sim_runner.py b/sim_runner.py deleted file mode 100644 index d615caa..0000000 --- a/sim_runner.py +++ /dev/null @@ -1,282 +0,0 @@ -#!/usr/bin/env python3 - -import threading -import queue -import itertools -import pathlib -import tempfile -import sys -import sqlite3 -import time -import math -import json -import subprocess - -import tqdm -import click -from tabulate import tabulate - - -def mesh_args(db, coil_id, mesh_type, mesh_file, outfile): - mesh_type = {'split': '--mesh-split-out', 'normal': '--mesh-out', 'mutual': '--mesh-mutual-out'}[mesh_type] - rows = db.execute('SELECT key, value FROM results WHERE coil_id=?', (coil_id,)).fetchall() - args = ['python', '-m', 'twisted_coil_gen_twolayer', mesh_type, mesh_file, '--pcb'] - for k, v in rows: - prefix, _, k = k.partition('.') - if v != 'False' and prefix == 'gen': - args.append('--' + k.replace('_', '-')) - if v != 'True': - args.append(str(v)) - args.append(outfile) - return args - - -def get_mesh_file(db, mesh_dir, run_id, coil_id, mesh_type): - db.execute('CREATE TABLE IF NOT EXISTS meshes(coil_id INTEGER, mesh_type TEXT, error INTEGER, filename TEXT, timestamp TEXT DEFAULT current_timestamp, FOREIGN KEY (coil_id) REFERENCES coils(coil_id))') - - row = db.execute('SELECT * FROM meshes WHERE coil_id=? AND mesh_type=? ORDER BY timestamp DESC LIMIT 1', (coil_id, mesh_type)).fetchone() - if row is not None: - mesh_file = mesh_dir / row['filename'] - if mesh_file.is_file(): - return mesh_file - - timestamp = time.strftime('%Y-%m-%d_%H-%M-%S') - return mesh_dir / f'mesh-{run_id}-{coil_id}-{mesh_type}-{timestamp}.msh' - - -def ensure_mesh(db, mesh_dir, log_dir, run_id, coil_id, mesh_type): - mesh_file = get_mesh_file(db, mesh_dir, run_id, coil_id, mesh_type) - - if mesh_file.is_file(): - return mesh_file - - db.execute('INSERT INTO meshes(coil_id, mesh_type, error, filename) VALUES (?, ?, 0, ?)', (coil_id, mesh_type, mesh_file.name)) - db.commit() - - mesh_file.parent.mkdir(exist_ok=True) - with tempfile.NamedTemporaryFile(suffix='.kicad_pcb') as f: - args = mesh_args(db, coil_id, mesh_type, mesh_file, f.name) - tqdm.tqdm.write(' '.join(map(str, args))) - logfile = log_dir / mesh_file.with_suffix('.log').name - logfile.parent.mkdir(exist_ok=True) - try: - res = subprocess.run(args, check=True, capture_output=True, text=True) - logfile.write_text(res.stdout + res.stderr) - - except subprocess.CalledProcessError as e: - print('Mesh generation failed with exit code {e.returncode}', file=sys.stderr) - logfile.write_text(e.stdout + e.stderr) - print(e.stdout + e.stderr) - raise - - return mesh_file - - -@click.group() -@click.option('-d', '--database', default='coil_parameters.sqlite3') -@click.pass_context -def cli(ctx, database): - ctx.ensure_object(dict) - def connect(): - db = sqlite3.connect(database) - db.row_factory = sqlite3.Row - return db - ctx.obj['db_connect'] = connect - - -@cli.command() -@click.pass_context -def list_runs(ctx): - for row in ctx.obj['db_connect']().execute('SELECT * FROM runs ORDER BY timestamp').fetchall(): - print(row['run_id'], row['timestamp'], row['version']) - - -@cli.command() -@click.pass_context -def list_runs(ctx): - for row in ctx.obj['db_connect']().execute('SELECT * FROM runs ORDER BY timestamp').fetchall(): - print(row['run_id'], row['timestamp'], row['version']) - - -@cli.command() -@click.option('-r', '--run-id') -@click.option('-m', '--mesh-dir', default='meshes') -@click.pass_context -def list_coils(ctx, run_id, mesh_dir): - db = ctx.obj['db_connect']() - if run_id is None: - run_id, = db.execute('SELECT run_id FROM runs ORDER BY timestamp DESC LIMIT 1').fetchone() - timestamp, = db.execute('SELECT timestamp FROM runs WHERE run_id=?', (run_id,)).fetchone() - mesh_dir = pathlib.Path(mesh_dir) - - print(f'Listing meshes for run {run_id} at {timestamp}') - print() - - keys = {'gen.turns': 'N', - 'gen.twists': 'T', - 'gen.single_layer': '1L', - 'gen.inner_diameter': 'ID[mm]', - 'gen.outer_diameter': 'OD[mm]', - 'calculated_fill_factor': 'Fill factor', - 'calculated_approximate_inductance': 'L [µH]', - 'calculated_trace_length': 'track len [mm]', - 'calculated_approximate_resistance': 'R [mΩ]'} - out = [] - for row in db.execute('SELECT *, MAX(meshes.timestamp) FROM coils LEFT JOIN meshes ON coils.coil_id=meshes.coil_id WHERE run_id=? GROUP BY coils.coil_id, mesh_type ORDER BY meshes.timestamp', (run_id,)).fetchall(): - if row['timestamp']: - if row['error']: - state = 'ERROR' - elif not (mesh_dir / row['filename']).is_file(): - state = 'NOT FOUND' - else: - state = 'SUCCESS' - else: - state = 'NOT RUN' - - params = dict(db.execute('SELECT key, value FROM results WHERE coil_id=?', (row['coil_id'],)).fetchall()) - - if 'calculated_approximate_inductance' in params: - params['calculated_approximate_inductance'] = f'{float(params["calculated_approximate_inductance"])*1e6:.02f}' - - if 'calculated_trace_length' in params: - params['calculated_trace_length'] = f'{float(params["calculated_trace_length"])*1e3:.03f}' - - if 'calculated_approximate_resistance' in params: - params['calculated_approximate_resistance'] = f'{float(params["calculated_approximate_resistance"])*1e3:.03f}' - - if 'calculated_fill_factor' in params: - params['calculated_fill_factor'] = f'{float(params["calculated_fill_factor"]):.03f}' - - out.append([row['coil_id'], row['mesh_type'], state, row['timestamp']] + [params.get(key, '-') for key in keys]) - - print(tabulate(out, headers=['coil', 'mesh', 'state', 'time'] + list(keys.values()), disable_numparse=True, stralign='right')) - -@cli.command() -@click.argument('coil_id', type=int) -@click.argument('mesh_type', type=click.Choice(['normal', 'split', 'mutual'])) -@click.option('--mesh-file', default='/tmp/test.msh') -@click.option('--pcb-file', default='/tmp/test.kicad_pcb') -@click.pass_context -def cmdline(ctx, coil_id, mesh_type, mesh_file, pcb_file): - print(' '.join(mesh_args(ctx.obj['db_connect'](), coil_id, mesh_type, mesh_file, pcb_file))) - -@cli.group() -@click.option('-r', '--run-id') -@click.option('-l', '--log-dir', default='logs') -@click.option('-m', '--mesh-dir', default='meshes') -@click.pass_context -def run(ctx, run_id, log_dir, mesh_dir): - if run_id is None: - run_id, = ctx.obj['db_connect']().execute('SELECT run_id FROM runs ORDER BY timestamp DESC LIMIT 1').fetchone() - ctx.obj['run_id'] = run_id - ctx.obj['log_dir'] = pathlib.Path(log_dir) - ctx.obj['mesh_dir'] = pathlib.Path(mesh_dir) - - -@run.command() -@click.option('-j', '--num-jobs', type=int, default=1, help='Number of jobs to run in parallel') -@click.pass_context -def generate_meshes(ctx, num_jobs): - db = ctx.obj['db_connect']() - rows = [row['coil_id'] for row in db.execute('SELECT coil_id FROM coils WHERE run_id=?', (ctx.obj['run_id'],)).fetchall()] - mesh_types = ['split', 'normal', 'mutual'] - - params = list(itertools.product(rows, mesh_types)) - all_files = {get_mesh_file(db, ctx.obj['mesh_dir'], ctx.obj['run_id'], coil_id, mesh_type): (coil_id, mesh_type) for coil_id, mesh_type in params} - todo = [(coil_id, mesh_type) for f, (coil_id, mesh_type) in all_files.items() if not f.is_file()] - - q = queue.Queue() - for elem in todo: - q.put(elem) - - tq = tqdm.tqdm(total=len(todo)) - def queue_worker(): - try: - while True: - coil_id, mesh_type = q.get_nowait() - try: - ensure_mesh(ctx.obj['db_connect'](), ctx.obj['mesh_dir'], ctx.obj['log_dir'], ctx.obj['run_id'], coil_id, mesh_type) - except subprocess.CalledProcessError: - tqdm.tqdm.write(f'Error generating {mesh_type} mesh for {coil_id=}') - tq.update(1) - q.task_done() - except queue.Empty: - pass - - tqdm.tqdm.write(f'Found {len(params)-len(todo)} meshes out of a total of {len(params)}.') - tqdm.tqdm.write(f'Processing the remaining {len(todo)} meshes on {num_jobs} workers in parallel.') - threads = [] - for i in range(num_jobs): - t = threading.Thread(target=queue_worker, daemon=True) - t.start() - threads.append(t) - q.join() - -@run.command() -@click.option('-j', '--num-jobs', type=int, default=1, help='Number of jobs to run in parallel') -@click.pass_context -def self_inductance(ctx, num_jobs): - db = ctx.obj['db_connect']() - - q = queue.Queue() - - def queue_worker(): - try: - while True: - mesh_file, logfile = q.get_nowait() - with tempfile.TemporaryDirectory() as tmpdir: - try: - tqdm.tqdm.write(f'Processing {mesh_file}') - res = subprocess.run(['python', '-m', 'coil_parasitics', 'inductance', '--sim-dir', tmpdir, mesh_file], check=True, capture_output=True) - logfile.write_text(res.stdout+res.stderr) - except subprocess.CalledProcessError as e: - print(f'Error running simulation, rc={e.returncode}') - logfile.write_text(e.stdout+e.stderr) - tq.update(1) - q.task_done() - except queue.Empty: - pass - - num_meshes, num_params, num_completed = 0, 0, 0 - for coil_id, in db.execute('SELECT coil_id FROM coils WHERE run_id=?', (ctx.obj['run_id'],)).fetchall(): - num_params += 1 - mesh_file = get_mesh_file(ctx.obj['db_connect'](), ctx.obj['mesh_dir'], ctx.obj['run_id'], coil_id, 'normal') - if mesh_file.is_file(): - num_meshes += 1 - logfile = ctx.obj['log_dir'] / (mesh_file.stem + '_elmer_self_inductance.log') - if logfile.is_file(): - num_completed += 1 - else: - q.put((mesh_file, logfile)) - - tqdm.tqdm.write(f'Found {num_meshes} meshes out of a total of {num_params} with {num_completed} completed simulations.') - tqdm.tqdm.write(f'Processing the remaining {num_meshes-num_completed} simulations on {num_jobs} workers in parallel.') - - tq = tqdm.tqdm(total=num_meshes-num_completed) - threads = [] - for i in range(num_jobs): - t = threading.Thread(target=queue_worker, daemon=True) - t.start() - threads.append(t) - q.join() - -@run.command() -@click.pass_context -def self_capacitance(ctx): - db = ctx.obj['db_connect']() - for coil_id, in tqdm.tqdm(db.execute('SELECT coil_id FROM coils WHERE run_id=?', (ctx.obj['run_id'],)).fetchall()): - mesh_file = get_mesh_file(ctx.obj['db_connect'](), ctx.obj['mesh_dir'], ctx.obj['run_id'], coil_id, 'normal') - if mesh_file.is_file(): - logfile = ctx.obj['log_dir'] / (mesh_file.stem + '_elmer_self_capacitance.log') - with tempfile.TemporaryDirectory() as tmpdir: - try: - res = subprocess.run(['python', '-m', 'coil_parasitics', 'self-capacitance', '--sim-dir', tmpdir, mesh_file], check=True, capture_output=True) - logfile.write_text(res.stdout+res.stderr) - except subprocess.CalledProcessError as e: - print(f'Error running simulation, rc={e.returncode}') - logfile.write_text(e.stdout+e.stderr) - - -if __name__ == '__main__': - cli() - |