# console_json13.py
#
# Provide a web-based console for controlling
# the 3GBP system for GC demo.
#
# Endpoint '/' is the console
# Endpoint '/sp/raw' is the setpoints as a json string
# Endpoint '/pv/update' updates process values from a json string
# Endpoint '/pv/raw' returns process values as json string
# Endpoint '/te/update' stores one hotend temperature sample from a json string

import base64
import json
import os
import sqlite3
from contextlib import closing

from nicegui import ui, app
from fastapi import Response, Query, Request
from matplotlib import pyplot as plt

# SQLite setup
DB_PATH = 'console_data.db'

# Set to True once the '/te/update' endpoint has been called.
te_endpoint_triggered = False

# Number of temperature samples kept in te_store (and shown on the plot).
TE_HISTORY_LIMIT = 20

# Key/value tables that may be addressed by name. Table names cannot be bound
# as SQL parameters, so they are validated against this set before being
# interpolated into a statement.
_KV_TABLES = frozenset({'sp_store', 'pv_store', 'tb_store'})

# Process values that are stored as booleans; all others are floats.
_PV_BOOL_KEYS = frozenset({'ball', 'tune_te_done'})

# String spellings accepted as a boolean "true" in incoming JSON.
_TRUE_STRINGS = frozenset({'true', '1', 'yes', 'on'})


def _check_table(table_name):
    """Raise ValueError unless *table_name* is a known key/value table."""
    if table_name not in _KV_TABLES:
        raise ValueError(f'Unknown table: {table_name!r}')


def _parse_stored_value(value):
    """Convert a stored TEXT value back to bool, float, or the raw string."""
    if not isinstance(value, str):
        # NULL (None) or an already-typed value: nothing to parse.
        return value
    lowered = value.lower()
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    try:
        return float(value)
    except ValueError:
        return value


def _to_bool(value):
    """Interpret a JSON value as a boolean.

    Strings are matched by content, because bool('false') is True.
    """
    if isinstance(value, str):
        return value.strip().lower() in _TRUE_STRINGS
    return bool(value)


def _json_response(payload):
    """Serialize *payload* to a JSON HTTP response."""
    return Response(content=json.dumps(payload), media_type="application/json")


async def _read_json_body(request):
    """Return the decoded JSON body of *request*.

    Raises:
        ValueError: if the body is not valid UTF-8 or not valid JSON
            (json.JSONDecodeError and UnicodeDecodeError are both
            subclasses of ValueError).
    """
    request_body = await request.body()
    return json.loads(request_body.decode('utf-8'))


def init_db():
    """Initialize the SQLite database with required tables."""
    # The inner "conn" context commits (or rolls back); closing() closes.
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        # Create tables if they don't exist
        conn.execute('''
            CREATE TABLE IF NOT EXISTS sp_store (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS pv_store (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS te_store (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                te REAL
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS tb_store (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        ''')


def get_dict_from_db(table_name):
    """Get a dictionary from a key/value database table.

    Stored strings 'true'/'false' (any case) become booleans, numeric
    strings become floats, anything else is returned unchanged.

    Raises:
        ValueError: if *table_name* is not a known key/value table.
    """
    _check_table(table_name)
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(f'SELECT key, value FROM {table_name}').fetchall()
    return {key: _parse_stored_value(value) for key, value in rows}


def save_dict_to_db(table_name, data_dict):
    """Replace the contents of a key/value table with *data_dict*.

    The delete and the inserts run in one transaction, so a failure leaves
    the previous contents in place.

    Raises:
        ValueError: if *table_name* is not a known key/value table.
    """
    _check_table(table_name)
    rows = [(key, str(value)) for key, value in data_dict.items()]
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        # First delete existing data
        conn.execute(f'DELETE FROM {table_name}')
        # Insert new data
        conn.executemany(
            f'INSERT INTO {table_name} (key, value) VALUES (?, ?)', rows
        )


# Initialize default data if not present
def initialize_default_data():
    """Initialize default data in the database if tables are empty."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        sp_count = conn.execute('SELECT COUNT(*) FROM sp_store').fetchone()[0]
        pv_count = conn.execute('SELECT COUNT(*) FROM pv_store').fetchone()[0]

    if sp_count == 0:
        # Set default sp_store values
        sp_store_defaults = {
            'te': 0.0,
            'tb': 0.0,
            'rate': 0.0,
            'vel': 0.0,
            'hv': 0.0,
            'zoff': 0.0,
            'te_p': 0.0,
            'te_i': 0.0,
            'te_d': 0.0,
            'tb_p': 0.0,
            'tb_i': 0.0,
            'tb_d': 0.0,
            'rate_scaling': 96.75,
            'run': False,
            'tune_te': False,
            'tune_tb': False,
        }
        save_dict_to_db('sp_store', sp_store_defaults)

    if pv_count == 0:
        # Set default pv_store values
        pv_store_defaults = {
            'te_p': 0.0,
            'te_i': 0.0,
            'te_d': 0.0,
            'tb_p': 0.0,
            'tb_i': 0.0,
            'tb_d': 0.0,
            'ball': False,
            'tune_te_done': False,
        }
        save_dict_to_db('pv_store', pv_store_defaults)


# Initialize database at startup
init_db()
initialize_default_data()


# Function to refresh PV values
def refresh_pv_values(pv_elements):
    """Reload process values from the database into the given labels.

    Args:
        pv_elements: mapping of process-value key to the UI label showing it.
    """
    updated_pv = get_dict_from_db('pv_store')
    for key, element in pv_elements.items():
        if key in updated_pv:
            if isinstance(updated_pv[key], bool):
                element.text = '✓' if updated_pv[key] else '✗'
            else:
                element.text = f"{updated_pv[key]}"
    ui.notify('Process values refreshed', type='info')
    # Redraw the existing plot in place; calling plot_te() here would add
    # another plot to the page on every click.
    plot_te.refresh()


@ui.page('/')
def main_page():
    """Build the console page: editable setpoints and read-only values."""
    sp_dict = get_dict_from_db('sp_store')
    pv_dict = get_dict_from_db('pv_store')

    ui.label('3GBP Console').classes('text-2xl font-bold mb-4 text-center')

    # Container with flex display to place panels side by side
    with ui.element('div').classes('flex w-full gap-4'):
        # Left panel - Setpoints (editable)
        with ui.card().classes('flex-1'):
            ui.label('Setpoints (Editable)').classes('text-xl font-bold mb-2')

            input_fields = {}
            bool_fields = ['run', 'tune_te', 'tune_tb']
            checkboxes = {}

            # Group fields by type for cleaner UI
            numeric_fields = {
                k: v for k, v in sp_dict.items() if k not in bool_fields
            }

            # Create a grid container for the key-value pairs
            for key, value in numeric_fields.items():
                with ui.element('div').classes('flex items-center mb-2'):
                    ui.label(f"{key}:").classes('w-1/2 font-bold')
                    input_fields[key] = (
                        ui.input(value=str(value))
                        .props('type=number step=0.01')
                        .classes('w-1/2')
                    )

            # Boolean fields (checkboxes)
            ui.separator().classes('my-2')
            ui.label('Controls').classes('font-bold')
            for bool_key in bool_fields:
                with ui.element('div').classes('flex items-center mb-2'):
                    checkboxes[bool_key] = ui.checkbox(
                        bool_key, value=sp_dict.get(bool_key, False)
                    )

            def update_sp_values():
                """Validate the form and store the setpoints."""
                # Collect into a scratch dict first so an invalid field
                # leaves sp_dict (and the database) untouched.
                new_values = {}
                for key, input_field in input_fields.items():
                    try:
                        new_values[key] = float(input_field.value)
                    except (ValueError, TypeError):
                        ui.notify(f'Invalid value for {key}', type='negative')
                        return
                # Update all boolean values
                for key, checkbox in checkboxes.items():
                    new_values[key] = checkbox.value

                sp_dict.update(new_values)
                save_dict_to_db('sp_store', sp_dict)
                ui.notify('Setpoints updated successfully!', type='positive')

            ui.button('Save Changes', on_click=update_sp_values).classes(
                'bg-blue-500 text-white mt-4'
            )

        # Right panel - Process Values (read-only)
        with ui.card().classes('flex-1'):
            ui.label('Process Values (Read-Only)').classes(
                'text-xl font-bold mb-2'
            )

            pv_elements = {}

            # Create UI elements for process values
            for key, value in pv_dict.items():
                with ui.element('div').classes(
                    'flex text-left items-center mb-2'
                ):
                    ui.label(f"{key}:").classes('w-1/2 text-left font-bold')
                    if isinstance(value, bool):
                        pv_elements[key] = ui.label(
                            '✓' if value else '✗'
                        ).classes('text-right w-1/2')
                    else:
                        pv_elements[key] = ui.label(f"{value}").classes(
                            'text-right w-1/2'
                        )

            # Render the temperature plot once; later updates go through
            # plot_te.refresh().
            plot_te()

            # Pass a callable: writing refresh_pv_values(pv_elements) here
            # would run it once at page build and register None as handler.
            ui.button(
                'Refresh Values',
                on_click=lambda: refresh_pv_values(pv_elements),
            ).classes('bg-green-500 text-white mt-4')


@ui.refreshable
def plot_te():
    """Draw the stored hotend temperature history as a line plot."""
    y = read_te_history()
    with ui.pyplot(figsize=(3, 2)):
        x = range(len(y))
        plt.plot(x, y, '-')
        plt.axis((0, TE_HISTORY_LIMIT, 0, 50))
        plt.ylabel('Temp - degC')
        plt.grid(True)
        plt.title('Hotend Temperature')


def read_te_history():
    """Return the stored temperature samples, oldest first.

    Returns:
        A list of te values in insertion order; an empty list if the query
        fails. NULL samples are skipped so the result can be plotted.
    """
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            rows = conn.execute(
                "SELECT te FROM te_store ORDER BY id"
            ).fetchall()
    except sqlite3.Error as e:
        print("select te failed", str(e))
        return []
    return [row[0] for row in rows if row[0] is not None]


# Store Last 20 PV Values in SQLite
def update_te_history(te_value):
    """Append *te_value* to te_store and trim it to TE_HISTORY_LIMIT rows.

    Database errors are reported on stdout and otherwise ignored, so a
    failed write never breaks the caller (best effort).
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        try:
            with conn:
                conn.execute(
                    "INSERT INTO te_store (te) VALUES (?)", (te_value,)
                )
        except sqlite3.Error as e:
            print("insert failed", str(e))
        try:
            # Keep only the newest entries. Ordering by the autoincrement id
            # is unambiguous; CURRENT_TIMESTAMP has one-second resolution, so
            # several samples can share a timestamp.
            with conn:
                conn.execute(
                    "DELETE FROM te_store WHERE id NOT IN "
                    "(SELECT id FROM te_store ORDER BY id DESC LIMIT ?)",
                    (TE_HISTORY_LIMIT,),
                )
        except sqlite3.Error as e:
            print("delete failed", str(e))


def get_pv_history():
    """Return recent process-value history as {'te': [...], 'tb': [...]}.

    'te' holds up to TE_HISTORY_LIMIT samples from te_store, newest first.

    NOTE(review): the previous query read te/tb/timestamp columns from
    pv_store, which only has key/value columns, so it always raised. No
    table in this file records a tb time series, so 'tb' is returned empty;
    confirm whether a tb history table is intended.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            "SELECT te FROM te_store ORDER BY id DESC LIMIT ?",
            (TE_HISTORY_LIMIT,),
        ).fetchall()
    return {'te': [row[0] for row in rows], 'tb': []}


@app.post('/te/update')
async def update_te_json(request: Request):
    """Store one temperature sample posted as JSON, e.g. {"te": 21.5}.

    A missing "te" key is stored as 0.0. Returns a JSON object with either
    "status": "success" or an "error" message.
    """
    global te_endpoint_triggered
    te_endpoint_triggered = True
    print("update te request")

    try:
        update_data = await _read_json_body(request)
    except ValueError as e:
        print("failed request.json", str(e))
        return _json_response({"error": f"Invalid JSON format: {str(e)}"})

    if not isinstance(update_data, dict):
        return _json_response({"error": "Data must be a JSON object"})

    try:
        te_value = float(update_data.get("te", 0.0))
    except (TypeError, ValueError) as e:
        print("error processing request", str(e))
        return _json_response(
            {"error": f"Error processing request: {str(e)}"}
        )

    print("update_data=", te_value)
    update_te_history(te_value)

    try:
        plot_te.refresh()
    except Exception as e:
        # The sample is already stored; a failed redraw of open pages must
        # not turn the device's request into an error.
        print("plot refresh failed", str(e))

    return _json_response({"status": "success", "te": te_value})


def _apply_te_autotune_result(pv_dict):
    """Copy finished te autotune gains from *pv_dict* into the setpoints.

    Only acts when the process values report tune_te_done and the setpoints
    still have tune_te set; it then clears tune_te and stores te_p/te_i/te_d.
    """
    if not pv_dict.get("tune_te_done", False):
        return
    sp_dict = get_dict_from_db('sp_store')
    if not sp_dict.get("tune_te", False):
        return
    sp_dict["tune_te"] = False
    sp_dict["te_p"] = pv_dict.get("te_p", 0.0)
    sp_dict["te_i"] = pv_dict.get("te_i", 0.0)
    sp_dict["te_d"] = pv_dict.get("te_d", 0.0)
    save_dict_to_db('sp_store', sp_dict)


@app.post('/pv/update')
async def update_pv_json(request: Request):
    """Update stored process values from a posted JSON object.

    Keys that are not already present in pv_store are ignored. Returns a
    JSON object with the updated keys and current values, or an "error"
    message.
    """
    try:
        update_data = await _read_json_body(request)
    except ValueError as e:
        print("invalid json format")
        return _json_response({"error": f"Invalid JSON format: {str(e)}"})

    if not isinstance(update_data, dict):
        return _json_response({"error": "Data must be a JSON object"})

    try:
        pv_dict = get_dict_from_db('pv_store')
        updated_keys = []

        for key, value in update_data.items():
            if key not in pv_dict:
                continue
            # Convert appropriately based on key type
            if key in _PV_BOOL_KEYS:
                pv_dict[key] = _to_bool(value)
            else:
                pv_dict[key] = float(value)
            updated_keys.append(key)

        # Update the storage
        save_dict_to_db('pv_store', pv_dict)

        # Handle the autotune completion logic
        _apply_te_autotune_result(pv_dict)
    except Exception as e:
        # Endpoint boundary: report the failure to the client instead of
        # letting it surface as an unhandled server error.
        print("error processing request", str(e))
        return _json_response(
            {"error": f"Error processing request: {str(e)}"}
        )

    return _json_response({
        "status": "success",
        "updated_keys": updated_keys,
        "current_data": pv_dict,
    })


@app.get('/sp/raw')
def get_raw_sp_dict():
    """Return the setpoints store as a JSON string."""
    return _json_response(get_dict_from_db('sp_store'))


@app.get('/pv/raw')
def get_raw_pv_dict():
    """Return the process values store as a JSON string."""
    return _json_response(get_dict_from_db('pv_store'))


# Start the UI application. The guard keeps ui.run() from executing when the
# module is merely imported; '__mp_main__' is the name used by the worker
# process NiceGUI spawns in reload mode.
if __name__ in {"__main__", "__mp_main__"}:
    ui.run(title="3GBP Console")