# 3gbp_console_sqlite.py
#
# Provide a web-based console for controlling
# the 3GBP system for GC demo.
#
# Endpoint '/' is the console
# Endpoint '/sp/raw' is the setpoints as a json string
# Endpoint '/pv/update' updates process values from a json string
# Endpoint '/pv/raw' returns process values as json string

from contextlib import closing
import base64
import json
import os
import sqlite3

from fastapi import Response, Query
from nicegui import ui, app

# SQLite setup
DB_FILE = 'console_data.db'

# Table names are interpolated into SQL text (SQLite cannot bind identifiers
# as parameters), so every helper validates against this whitelist first.
_VALID_TABLES = ('sp_store', 'pv_store')

# Keys of pv_store that hold booleans; every other key is stored as a float.
_PV_BOOL_KEYS = ('ball', 'tune_te_done')


def _check_table(table_name):
    """Return *table_name* unchanged, or raise ValueError if it is unknown."""
    if table_name not in _VALID_TABLES:
        raise ValueError(f'Unknown table: {table_name!r}')
    return table_name


def _parse_stored_value(value):
    """Convert a stored TEXT value back to bool, float, or the raw string.

    'true'/'false' (any case) become booleans, anything float() accepts
    becomes a float, and everything else is returned unchanged. Non-string
    values (e.g. NULL) are passed through as-is.
    """
    if not isinstance(value, str):
        return value
    lowered = value.lower()
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    try:
        return float(value)
    except ValueError:
        return value


def _table_is_empty(table_name):
    """Return True when the given table contains no rows."""
    table = _check_table(table_name)
    with closing(sqlite3.connect(DB_FILE)) as conn:
        (count,) = conn.execute(f'SELECT COUNT(*) FROM {table}').fetchone()
    return count == 0


def init_db():
    """Initialize the SQLite database with required tables."""
    # closing() guarantees the connection is released even if a statement
    # raises; sqlite3's own context manager only commits or rolls back.
    with closing(sqlite3.connect(DB_FILE)) as conn:
        # Create tables if they don't exist
        for table in _VALID_TABLES:
            conn.execute(
                f'CREATE TABLE IF NOT EXISTS {table} ('
                'key TEXT PRIMARY KEY, value TEXT)'
            )
        conn.commit()


def get_dict_from_db(table_name):
    """Get a dictionary from a database table.

    Values are stored as text; they are converted back to bool, float or
    str by _parse_stored_value.

    Raises:
        ValueError: if *table_name* is not one of the known tables.
    """
    table = _check_table(table_name)
    with closing(sqlite3.connect(DB_FILE)) as conn:
        rows = conn.execute(f'SELECT key, value FROM {table}').fetchall()
    return {key: _parse_stored_value(value) for key, value in rows}


def save_dict_to_db(table_name, data_dict):
    """Save a dictionary to a database table, replacing its previous contents.

    Every value is stored as str(value). The delete and the inserts run in a
    single transaction, so a failure leaves the previous contents in place.

    Raises:
        ValueError: if *table_name* is not one of the known tables.
    """
    table = _check_table(table_name)
    rows = [(key, str(value)) for key, value in data_dict.items()]
    with closing(sqlite3.connect(DB_FILE)) as conn:
        with conn:  # commit on success, roll back on error
            # First delete existing data
            conn.execute(f'DELETE FROM {table}')
            # Insert new data
            conn.executemany(
                f'INSERT INTO {table} (key, value) VALUES (?, ?)', rows
            )


# Initialize default data if not present
def initialize_default_data():
    """Initialize default data in the database if tables are empty."""
    if _table_is_empty('sp_store'):
        # Set default sp_store values
        sp_store_defaults = {
            'te': 0.0,
            'tb': 0.0,
            'rate': 0.0,
            'vel': 0.0,
            'hv': 0.0,
            'zoff': 0.0,
            'te_p': 0.0,
            'te_i': 0.0,
            'te_d': 0.0,
            'rate_scaling': 96.75,
            'run': False,
            'tune_te': False,
            'tune_tb': False,
        }
        save_dict_to_db('sp_store', sp_store_defaults)

    if _table_is_empty('pv_store'):
        # Set default pv_store values
        pv_store_defaults = {
            'te': 0.0,
            'tb': 0.0,
            'te_p': 0.0,
            'te_i': 0.0,
            'te_d': 0.0,
            'tb_p': 0.0,
            'tb_i': 0.0,
            'tb_d': 0.0,
            'ball': False,
            'tune_te_done': False,
        }
        save_dict_to_db('pv_store', pv_store_defaults)


# Initialize database at startup
init_db()
initialize_default_data()


@ui.page('/')
def main_page():
    """Render the console: editable setpoints on the left, process values on the right."""
    sp_dict = get_dict_from_db('sp_store')
    pv_dict = get_dict_from_db('pv_store')

    ui.label('3GBP Console').classes('text-2xl font-bold mb-4 text-center')

    # Container with flex display to place panels side by side
    with ui.element('div').classes('flex w-full gap-4'):
        # Left panel - Setpoints (editable)
        with ui.card().classes('flex-1'):
            ui.label('Setpoints (Editable)').classes('text-xl font-bold mb-2')

            input_fields = {}
            bool_fields = ['run', 'tune_te', 'tune_tb']
            checkboxes = {}

            # Group fields by type for cleaner UI
            numeric_fields = {k: v for k, v in sp_dict.items() if k not in bool_fields}

            # One labelled numeric input per non-boolean setpoint
            for key, value in numeric_fields.items():
                with ui.element('div').classes('flex items-center mb-2'):
                    ui.label(f"{key}:").classes('w-1/2 font-bold')
                    input_fields[key] = ui.input(value=str(value)).props('type=number step=0.01').classes('w-1/2')

            # Boolean fields (checkboxes)
            ui.separator().classes('my-2')
            ui.label('Controls').classes('font-bold')

            for bool_key in bool_fields:
                with ui.element('div').classes('flex items-center mb-2'):
                    checkboxes[bool_key] = ui.checkbox(bool_key, value=sp_dict.get(bool_key, False))

            def update_sp_values():
                """Validate every field, then persist all setpoints at once."""
                # Stage the new values so an invalid field leaves sp_dict
                # untouched instead of half-updated.
                staged = {}
                for key, input_field in input_fields.items():
                    try:
                        staged[key] = float(input_field.value)
                    except (ValueError, TypeError):
                        ui.notify(f'Invalid value for {key}', type='negative')
                        return

                for key, checkbox in checkboxes.items():
                    staged[key] = checkbox.value

                sp_dict.update(staged)
                save_dict_to_db('sp_store', sp_dict)
                ui.notify('Setpoints updated successfully!', type='positive')

            ui.button('Save Changes', on_click=update_sp_values).classes('bg-blue-500 text-white mt-4')

        # Right panel - Process Values (read-only)
        with ui.card().classes('flex-1'):
            ui.label('Process Values (Read-Only)').classes('text-xl font-bold mb-2')

            pv_elements = {}

            # Create UI elements for process values
            for key, value in pv_dict.items():
                with ui.element('div').classes('flex items-center mb-2'):
                    ui.label(f"{key}:").classes('w-1/2 font-bold')
                    if isinstance(value, bool):
                        pv_elements[key] = ui.label('✓' if value else '✗').classes('w-1/2')
                    else:
                        pv_elements[key] = ui.label(f"{value}").classes('w-1/2')

            # Function to refresh PV values
            def refresh_pv_values():
                """Re-read pv_store and update the labels created at page load."""
                updated_pv = get_dict_from_db('pv_store')
                for key, element in pv_elements.items():
                    if key in updated_pv:
                        if isinstance(updated_pv[key], bool):
                            element.text = '✓' if updated_pv[key] else '✗'
                        else:
                            element.text = f"{updated_pv[key]}"
                ui.notify('Process values refreshed', type='info')

            ui.button('Refresh Values', on_click=refresh_pv_values).classes('bg-green-500 text-white mt-4')


# Base64 decode function for handling API requests
BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"


def base64_decode(data):
    """Decode Base64 string back to bytes (MicroPython-compatible).

    Trailing '=' padding is optional. A character outside the standard
    Base64 alphabet makes BASE64_CHARS.index raise ValueError.
    """
    buffer = bytearray()
    data = data.rstrip("=")  # Remove padding
    i = 0
    while i < len(data):
        # Each group of up to four characters carries 6 bits apiece; the
        # pieces below are the halves of the three output bytes.
        byte1 = BASE64_CHARS.index(data[i]) << 2
        byte2 = BASE64_CHARS.index(data[i+1]) >> 4 if i+1 < len(data) else 0
        byte3 = (BASE64_CHARS.index(data[i+1]) << 4) & 0xF0 if i+1 < len(data) else 0
        byte4 = BASE64_CHARS.index(data[i+2]) >> 2 if i+2 < len(data) else 0
        byte5 = (BASE64_CHARS.index(data[i+2]) << 6) & 0xC0 if i+2 < len(data) else 0
        byte6 = BASE64_CHARS.index(data[i+3]) if i+3 < len(data) else 0
        i += 4
        buffer.append(byte1 | byte2)
        # Emit the 2nd/3rd byte only when the 3rd/4th character of this
        # group was actually present (i has already advanced by 4).
        if i-2 < len(data):
            buffer.append(byte3 | byte4)
        if i-1 < len(data):
            buffer.append(byte5 | byte6)
    return bytes(buffer)


def _coerce_bool(value):
    """Interpret a JSON value as a boolean.

    Strings such as "false" or "0" are treated as False; plain bool() would
    report any non-empty string as True.
    """
    if isinstance(value, str):
        return value.strip().lower() not in ('', 'false', '0')
    return bool(value)


@app.get('/pv/update')
def update_dict_json(data: str = Query(None)):
    """Update pv_store from a Base64-encoded JSON object in the 'data' query parameter.

    Only keys that already exist in pv_store are updated. When the update
    reports tune_te_done while the tune_te setpoint is set, tune_te is
    cleared and the tuned te_p/te_i/te_d values are copied into sp_store.
    The reply is always a JSON document: either a status/updated_keys/
    current_data object or an object with a single "error" key.
    """
    if data is None:
        return Response(
            content=json.dumps({"error": "Missing 'data' query parameter"}),
            media_type="application/json"
        )

    try:
        # Query-string decoding turns a literal '+' into a space; restore it
        # so un-escaped Base64 sent by simple clients still decodes.
        json_bytes = base64_decode(data.replace(' ', '+'))
        json_data = json_bytes.decode('utf-8')
        update_data = json.loads(json_data)
        print("json_data=", json_data)
        print("update_data=", update_data)

        if not isinstance(update_data, dict):
            return Response(content=json.dumps({"error": "Data must be a JSON object"}),
                            media_type="application/json")

        pv_dict = get_dict_from_db('pv_store')
        updated_keys = []

        for key, value in update_data.items():
            if key in pv_dict:
                # Convert appropriately based on key type
                if key in _PV_BOOL_KEYS:
                    pv_dict[key] = _coerce_bool(value)
                else:
                    pv_dict[key] = float(value)
                updated_keys.append(key)

        # Update the storage
        save_dict_to_db('pv_store', pv_dict)

        # Handle the autotune completion logic
        if pv_dict.get("tune_te_done", False):
            sp_dict = get_dict_from_db('sp_store')
            if sp_dict.get("tune_te", False):
                sp_dict["tune_te"] = False
                sp_dict["te_p"] = pv_dict.get("te_p", 0.0)
                sp_dict["te_i"] = pv_dict.get("te_i", 0.0)
                sp_dict["te_d"] = pv_dict.get("te_d", 0.0)
                save_dict_to_db('sp_store', sp_dict)

        return Response(
            content=json.dumps({
                "status": "success",
                "updated_keys": updated_keys,
                "current_data": pv_dict
            }),
            media_type="application/json"
        )
    except json.JSONDecodeError as e:
        return Response(
            content=json.dumps({"error": f"Invalid JSON format: {str(e)}"}),
            media_type="application/json"
        )
    except Exception as e:
        # Endpoint boundary: report any other failure (bad Base64, bad
        # number, database error) to the caller instead of a bare HTTP 500.
        return Response(
            content=json.dumps({"error": f"Error processing request: {str(e)}"}),
            media_type="application/json"
        )


@app.get('/sp/raw')
def get_raw_sp_dict():
    """Return the setpoints store as a JSON string."""
    sp_dict = get_dict_from_db('sp_store')
    return Response(content=json.dumps(sp_dict), media_type="application/json")


@app.get('/pv/raw')
def get_raw_pv_dict():
    """Return the process values store as a JSON string."""
    pv_dict = get_dict_from_db('pv_store')
    return Response(content=json.dumps(pv_dict), media_type="application/json")


# Start the UI application
ui.run(title="3GBP Console")