# copilot_mp_pid.py
#
# Write a micropython script for rp2040 using asyncio, timer, wiznet5k
# and usocket libraries to send the json string to the /api/update
# endpoint once per second. Incorporate a PID loop that reads a value
# from an ADC channel once per second and generates a PWM output, using
# a setpoint from the nicegui endpoint, and updating with the current
# process value. Provide an autotune function for determining the
# P,I,and D values.
#
# RLS 20250422

import uasyncio as asyncio
from machine import Timer, ADC, Pin, PWM, SPI
import network
import usocket
import ujson
import time
import math

# API server address, shared by the setpoint fetch and the PV update.
API_HOST = '10.10.10.1'
API_HOST_BYTES = API_HOST.encode()
API_PORT = 8080
SOCKET_TIMEOUT_S = 2  # keep a dead server from stalling the control loop

# Initialize network with Wiznet5K Ethernet module
spi = SPI(0, 2_000_000, mosi=Pin(19), miso=Pin(20), sck=Pin(18))
nic = network.WIZNET5K(spi, Pin(12), Pin(25))  # spi, cs, rst
nic.active(True)
nic.ifconfig(('10.10.10.104', '255.255.255.0', '10.10.10.1', '8.8.8.8'))

# Define ADC & PWM.  The classes are used through the names imported
# above; the module name `machine` itself is never imported.
adc = ADC(27)        # ADC channel
heat = PWM(Pin(8))   # PWM output
heat.freq(1000)      # Set PWM frequency
heat.duty_u16(0)

stepper = PWM(Pin(9), freq=25000, duty_u16=0)
dir_pin = Pin(10, Pin.OUT)
dir_pin.value(0)

BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"


def base64_encode(data):
    """Encode bytes-like object to Base64 string (MicroPython-compatible).

    Uses the standard alphabet with '=' padding.

    Args:
        data: bytes-like object to encode.

    Returns:
        The Base64 text as a str; "" for empty input.
    """
    buffer = bytearray(data)
    length = len(buffer)
    pieces = []
    for start in range(0, length, 3):
        remaining = length - start  # real bytes in this group: 1, 2 or 3+
        byte1 = buffer[start]
        byte2 = buffer[start + 1] if remaining > 1 else 0
        byte3 = buffer[start + 2] if remaining > 2 else 0
        pieces.append(BASE64_CHARS[(byte1 >> 2) & 0x3F])
        pieces.append(BASE64_CHARS[((byte1 << 4) | (byte2 >> 4)) & 0x3F])
        # The third symbol carries bits of byte2, the fourth only byte3;
        # pad exactly the symbols whose source byte does not exist.
        if remaining > 1:
            pieces.append(BASE64_CHARS[((byte2 << 2) | (byte3 >> 6)) & 0x3F])
        else:
            pieces.append("=")
        pieces.append(BASE64_CHARS[byte3 & 0x3F] if remaining > 2 else "=")
    return "".join(pieces)


def _url_quote_base64(text):
    """Percent-encode the Base64 symbols that are unsafe in a URL query."""
    # '+' would otherwise be decoded as a space by form-style query parsers.
    return text.replace("+", "%2B").replace("/", "%2F").replace("=", "%3D")


# Define constants
VCC = 3.3               # Supply voltage
PULLUP_RESISTOR = 4700  # 4.7K Ohm pull-up
BETA = 3950             # Beta parameter
R0 = 100000             # Resistance at 25°C
T0 = 298.15             # Reference temp in Kelvin (25°C)


def adc_to_temperature(adc_value, adc_max=65535):
    """Convert ADC reading to temperature in Celsius.

    The divider equation used is R = PULLUP_RESISTOR / (VCC / V - 1),
    followed by the Beta-parameter thermistor equation.

    Args:
        adc_value: raw ADC count.
        adc_max: full-scale ADC count.

    Returns:
        Temperature in degrees Celsius.

    Raises:
        ValueError: if the reading sits at either rail (<= 0 or >= adc_max),
            where the divider equation has no finite solution.
    """
    if adc_value <= 0 or adc_value >= adc_max:
        raise ValueError("ADC reading at rail: %r" % (adc_value,))

    # Convert ADC value to voltage
    voltage = (adc_value / adc_max) * VCC
    print("adc volts=", voltage)

    # Calculate thermistor resistance
    resistance = PULLUP_RESISTOR / ((VCC / voltage) - 1)

    # Apply Beta parameter equation
    temperature_kelvin = 1 / ((1 / T0) + (1 / BETA) * math.log(resistance / R0))
    return temperature_kelvin - 273.15  # Convert to Celsius


# PID Controller
class PID:
    """Discrete PID controller evaluated once per call to compute().

    Integral and derivative terms are per-call sums/differences (no dt
    scaling), so the gains are tied to the calling period.

    Args:
        kp, ki, kd: proportional, integral and derivative gains.
        out_min, out_max: optional output limits.  When left as None the
            output is returned unclamped.
    """

    def __init__(self, kp=1.0, ki=0.0, kd=0.0, out_min=None, out_max=None):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.out_min = out_min
        self.out_max = out_max
        self.last_error = 0
        self.integral = 0

    def reset(self):
        """Clear the accumulated integral and the remembered error."""
        self.last_error = 0
        self.integral = 0

    def compute(self, sp, pv):
        """Return the controller output for setpoint `sp` and process value `pv`."""
        error = sp - pv
        integral = self.integral + error
        derivative = error - self.last_error
        self.last_error = error
        output = self.kp * error + self.ki * integral + self.kd * derivative

        limited = output
        if self.out_max is not None and limited > self.out_max:
            limited = self.out_max
        if self.out_min is not None and limited < self.out_min:
            limited = self.out_min

        # Anti-windup: while the output is saturated, do not accumulate
        # error that would push it further into saturation.
        winding_up = (limited < output and error > 0) or (limited > output and error < 0)
        if not winding_up:
            self.integral = integral
        return limited


# PID Autotune (Basic Implementation)
AUTOTUNE_START_GAINS = (1.0, 0.01, 0.05)  # Start with some defaults
AUTOTUNE_GAIN_STEP = (0.05, 0.005, 0.01)  # Per-attempt increments
AUTOTUNE_TOLERANCE = 0.01
AUTOTUNE_ATTEMPTS = 10


def _autotune_probe(target_value, gains):
    """Evaluate one candidate gain set against the current temperature.

    Returns:
        (temperature, response, converged)
    """
    kp, ki, kd = gains
    adc_reading = adc.read_u16()
    temperature = adc_to_temperature(adc_reading)
    response = PID(kp, ki, kd).compute(target_value, temperature)
    print("at adc=", adc_reading, " temp=", temperature, " resp=", response)
    # NOTE(review): this stop criterion compares the controller *output*
    # with the setpoint, as the original heuristic did.  It is not a
    # recognised tuning method; confirm the intended criterion.
    converged = abs(response - target_value) < AUTOTUNE_TOLERANCE
    return temperature, response, converged


def _next_autotune_gains(gains):
    """Return the next candidate (kp, ki, kd) after a failed probe."""
    kp, ki, kd = gains
    step_p, step_i, step_d = AUTOTUNE_GAIN_STEP
    return kp + step_p, ki + step_i, kd + step_d


def autotune(target_value):
    """Blocking gain search: probe up to ten candidate gain sets, 1 s apart.

    Args:
        target_value: temperature setpoint used for the probes.

    Returns:
        (kp, ki, kd) of the first candidate that meets the stop criterion,
        or the last incremented candidate if none does.
    """
    gains = AUTOTUNE_START_GAINS
    print("autotune")
    for _ in range(AUTOTUNE_ATTEMPTS):  # Test different values
        _, _, converged = _autotune_probe(target_value, gains)
        time.sleep(1)  # Simulate system response
        if converged:  # Stop if response is close
            return gains
        gains = _next_autotune_gains(gains)  # Adjust parameters
    return gains


# Extract JSON from http response
def extract_json(response):
    """Return the body of a raw HTTP response as text, or '{}' if absent."""
    parts = response.split(b'\r\n\r\n', 1)
    if len(parts) > 1:
        return parts[1].decode('utf-8')
    return '{}'


def get_http_response(sock):
    """Read from `sock` until the peer closes; return everything received."""
    chunks = []
    while True:
        chunk = sock.recv(1024)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def _http_get(path):
    """Issue a blocking GET for `path` (bytes) and return the raw response.

    The socket is always closed, including on error.  `Connection: close`
    makes the server end the stream so the read-until-EOF loop terminates.
    """
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
    try:
        sock.settimeout(SOCKET_TIMEOUT_S)
        sock.connect((API_HOST, API_PORT))  # API Server IP & Port
        request = (b"GET " + path + b" HTTP/1.1\r\nHost: " + API_HOST_BYTES
                   + b"\r\nConnection: close\r\n\r\n")
        sent = 0
        while sent < len(request):  # send() may accept only part of the data
            sent += sock.send(request[sent:])
        return get_http_response(sock)
    finally:
        sock.close()


# Fetch setpoint from API
async def fetch_setpoints():
    """Fetch the setpoint dictionary from /sp/raw.

    Returns:
        The decoded JSON object, or {} on any network or decoding error.
    """
    # NOTE: the socket calls are blocking; this coroutine does not yield
    # to the scheduler while waiting on the network.
    try:
        data = ujson.loads(extract_json(_http_get(b"/sp/raw")))
    except Exception as e:  # best-effort: the control loop must keep running
        print("fetch_setpoint error", e)
        return {}
    return data if isinstance(data, dict) else {}


# Update API endpoint with process value
async def send_update(process_value):
    """Send `process_value` to /pv/update as Base64-encoded JSON.

    Errors are reported on the console and otherwise ignored.
    """
    json_data = ujson.dumps(process_value)
    encoded_data = base64_encode(json_data.encode())  # Encode JSON as base64
    print("json=", json_data)
    print("encoded=", encoded_data)
    try:
        # The reply is read (and discarded) so the socket is not closed
        # underneath the server while it is still handling the request.
        _http_get(b"/pv/update?data=" + _url_quote_base64(encoded_data).encode())
    except Exception as e:
        print(f"Error sending data: {e}")


STEPPER_MIN_RATE = 8  # |rate| at or below this stops the stepper


def _outputs_off():
    """Put heater and stepper into their idle state."""
    heat.duty_u16(0)
    stepper.init(freq=10, duty_u16=0)


def _drive_heater(output):
    """Clamp `output` to 0..1 and apply it as the heater duty cycle."""
    dc = min(max(output, 0.0), 1.0)
    heat.duty_u16(int(dc * 65535.0))  # Adjust PWM output
    return dc


def _drive_stepper(rate):
    """Set stepper direction and step frequency from a signed rate."""
    if rate < -STEPPER_MIN_RATE:
        dir_pin.value(1)
        stepper.init(freq=int(-rate), duty_u16=32767)
    elif rate > STEPPER_MIN_RATE:
        dir_pin.value(0)
        stepper.init(freq=int(rate), duty_u16=32767)
    else:
        stepper.init(freq=10, duty_u16=0)


# Main loop with asyncio
async def loop():
    """Once per second: fetch setpoints, run control, report process values.

    When setpoints are missing/invalid or the temperature sensor reads at
    a rail, heater and stepper are switched off for that cycle instead of
    letting the exception end the loop with the outputs still driven.
    """
    # One controller for the whole run so integral/derivative state survives
    # between cycles; limits match the 0..1 duty range.
    pid = PID(out_min=0.0, out_max=1.0)
    tune_gains = None  # candidate gains while an autotune is in progress

    while True:
        setpoints = await fetch_setpoints()  # Get current setpoint from API
        process_value = {}

        try:
            # auto-tune pid
            if setpoints.get("tune_te") == True:  # noqa: E712 - explicit true only
                if tune_gains is None:
                    tune_gains = AUTOTUNE_START_GAINS
                te_pv, response, converged = _autotune_probe(setpoints["te"], tune_gains)
                process_value["te"] = te_pv
                _drive_heater(response)
                if converged:  # Stop if response is close
                    kp, ki, kd = tune_gains
                    process_value["tune_te_done"] = True
                    process_value["te_p"] = kp
                    process_value["te_i"] = ki
                    process_value["te_d"] = kd
                    tune_gains = None
                else:
                    tune_gains = _next_autotune_gains(tune_gains)  # Adjust parameters
            else:
                tune_gains = None
                pid.kp = setpoints["te_p"]
                pid.ki = setpoints["te_i"]
                pid.kd = setpoints["te_d"]
                te_sp = setpoints["te"]
                print("te_sp=", te_sp)
                t = adc.read_u16()
                print("adc=", t)
                te_pv = adc_to_temperature(t)  # convert to degC
                print("te_pv=", te_pv)
                process_value["te"] = te_pv
                dc = _drive_heater(pid.compute(te_sp, te_pv))
                print(" dc=", dc)
                process_value["tune_te_done"] = True

            _drive_stepper(setpoints["rate"] * setpoints["rate_scaling"])
        except (KeyError, TypeError, ValueError, ZeroDivisionError) as e:
            # Missing/invalid setpoints or a sensor fault: fail safe.
            print("control cycle skipped:", repr(e))
            _outputs_off()
            pid.reset()

        try:
            await send_update(process_value)  # Send current process value to API
        except Exception as e:
            print("failed send_update", e)

        await asyncio.sleep(1)  # Run every second


# Start asyncio loop; never leave the heater driven if the loop ends.
try:
    asyncio.run(loop())
finally:
    _outputs_off()