import configparser
import json
import os
import platform
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path

import paho.mqtt.client as mqtt
import psutil
import requests

try:
    import RPi.GPIO as GPIO
    # This variable must be GLOBAL, so defined at the top!
    GPIO_AVAILABLE = True
except (ImportError, RuntimeError):
    # RPi.GPIO raises RuntimeError (not ImportError) when it is installed but
    # the host is not a Raspberry Pi; both cases mean "no hardware reset".
    GPIO_AVAILABLE = False
    print("Warning: RPi.GPIO library not found. Hardware reset disabled.")

# ==========================================
# 0. UNIFIED CONFIGURATION LOADING
# ==========================================
CONFIG_PATH = Path("/opt/node_config.json")


def load_config():
    """Load the node configuration from ``CONFIG_PATH``.

    Returns:
        dict: The parsed JSON object.

    Exits the process with status 1 (after printing a message) when the file
    is missing, unreadable, not valid JSON, or its root is not a JSON object.
    """
    if not CONFIG_PATH.exists():
        print(f"❌ ERROR: File {CONFIG_PATH} not found!")
        sys.exit(1)

    try:
        # Explicit UTF-8 so the result does not depend on the service locale.
        with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ CRITICAL JSON ERROR: {e}")
        sys.exit(1)

    if not isinstance(data, dict):
        # The rest of the module calls cfg.get(...), so the root must be an object.
        print(f"❌ CRITICAL JSON ERROR: root of {CONFIG_PATH} must be an object")
        sys.exit(1)
    return data


# Load the single necessary configuration
cfg = load_config()

# Identifiers and Topics
CLIENT_ID = cfg.get('client_id', 'iv3jdv').lower()
BASE_TOPIC = cfg.get('mqtt', {}).get('base_topic', f"servizi/{CLIENT_ID}")
TOPIC_CMD = f"{BASE_TOPIC}/cmnd"
TOPIC_STAT = f"{BASE_TOPIC}/stat"

# Global State Variables
boot_recovered = False             # True once the status has been recovered or forced
current_status = "ONLINE - Ready"  # last status string published on TOPIC_STAT
auto_healing_counter = {}          # process name -> consecutive restart attempts

# ==========================================
# 1. TELEGRAM NOTIFICATION FUNCTION
# ==========================================
def send_telegram_message(message):
    """Send *message* to the configured Telegram chat (best effort).

    Nothing is sent when ``cfg['telegram']['enabled']`` is false, between
    23:00 and 06:59 local time, or when the token / chat id are missing or the
    token is still the ``"TOKEN ID"`` placeholder. Errors are printed, never
    raised.
    """
    t_cfg = cfg.get('telegram', {})
    if not t_cfg.get('enabled', False):
        return

    current_hour = time.localtime().tm_hour
    if current_hour >= 23 or current_hour < 7:
        print(f"🌙 Late night ({current_hour}:00): Notification skipped.")
        return

    token = t_cfg.get('token')
    chat_id = t_cfg.get('chat_id')
    if not token or not chat_id or token == "TOKEN ID":
        return

    try:
        # NOTE(review): the previous code ran message.replace("", "") twice,
        # which is a no-op; presumably it once stripped markup tags whose
        # patterns were lost. The message is sent unchanged - confirm intent.
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": f"[{CLIENT_ID.upper()}]\n{message}"}
        response = requests.post(url, json=payload, timeout=10)
        if not response.ok:
            print(f"⚠️ Telegram send error: HTTP {response.status_code}")
    except Exception as e:
        print(f"⚠️ Telegram send error: {e}")


# ==========================================
# 2. MULTIPLE PROFILE SWITCH LOGIC
# ==========================================
def get_actual_config_from_disk():
    """Return the fallback status string used when no retained status was recovered."""
    return "ONLINE - From memory"


def switch_config(config_type):
    """Activate the profile *config_type* from ``cfg['profiles']``.

    Each service entry of the profile provides ``name`` (systemd unit),
    ``source`` (file to copy) and ``target`` (destination). The daemons are
    stopped, the files copied, and the daemons started again.

    Returns:
        str: ``"ONLINE - <label>"`` on success, otherwise a string starting
        with ``"ERROR: "``.
    """
    profile = cfg.get('profiles', {}).get(config_type)
    if not profile:
        return f"ERROR: Profile {config_type} not found in JSON"

    label = profile.get('label', f"Profile {config_type}")
    services = profile.get('services', [])
    if not services:
        return f"ERROR: No services configured for {config_type}"

    try:
        # 0. VERIFY: check every source file BEFORE touching any daemon, so a
        #    missing file can no longer leave all services stopped.
        for s in services:
            if not os.path.exists(s['source']):
                return f"ERROR: Missing source file {s['source']}"

        # 1. STOP: Stop all involved daemons first to release files
        for s in services:
            subprocess.run(["sudo", "systemctl", "stop", s['name']], check=False)

        # 2. COPY: Copy all configuration files
        for s in services:
            shutil.copy(s['source'], s['target'])

        # 3. START: Restart all daemons with the new files
        for s in services:
            subprocess.run(["sudo", "systemctl", "start", s['name']], check=False)

        send_telegram_message(f"✅ Multiple switch completed: {label}")
        return f"ONLINE - {label}"
    except Exception as e:
        return f"ERROR: {str(e)}"


def force_online_if_needed(client):
    """Publish a fallback status if none has been recovered yet.

    When ``boot_recovered`` is still false, the status is taken from
    ``get_actual_config_from_disk()``, published retained on ``TOPIC_STAT``
    and the flag is set.
    """
    global boot_recovered, current_status
    if not boot_recovered:
        print("⚠️ Memory recovery skipped. Setting status from disk...")
        current_status = get_actual_config_from_disk()
        client.publish(TOPIC_STAT, current_status, retain=True)
        boot_recovered = True


# ==========================================
# 3. TELEMETRY AND AUTO-HEALING
# ==========================================
def read_path_list(list_path):
    """Return the stripped, non-blank lines of the text file *list_path*.

    Returns an empty list when *list_path* is empty or is not a regular file
    (note that ``Path('')`` is the current directory, which *exists*).

    Raises:
        OSError, UnicodeError: if the file cannot be read as UTF-8.
    """
    if not list_path:
        return []
    path = Path(list_path)
    if not path.is_file():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()
    return [line.strip() for line in lines if line.strip()]


def parse_ini_sections(file_path):
    """Parse an INI-style file into ``{section: {key: value}}``.

    Lines that are blank or start with ``#`` / ``;`` are skipped, keys that
    appear before the first section header are ignored, and a key repeated
    inside a section has its values joined with a comma.
    """
    ini_data = {}
    current_section = None
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            line = line.strip()
            # Skip empty lines or comments
            if not line or line.startswith(('#', ';')):
                continue
            # Recognize sections [Section Name]
            if line.startswith('[') and line.endswith(']'):
                current_section = line[1:-1].strip()
                ini_data[current_section] = {}
            # Recognize keys and values
            elif '=' in line and current_section is not None:
                k, _, v = line.partition('=')
                k, v = k.strip(), v.strip()
                section = ini_data[current_section]
                # Duplicate key: merge the values with a comma.
                section[k] = f"{section[k]},{v}" if k in section else v
    return ini_data


def pulse_hat_reset(reset_pin=21):
    """Drive BCM pin *reset_pin* LOW for 0.5 s, then HIGH, then release it."""
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(reset_pin, GPIO.OUT)
    try:
        # Send pulse
        GPIO.output(reset_pin, GPIO.LOW)
        time.sleep(0.5)
        GPIO.output(reset_pin, GPIO.HIGH)
    finally:
        # Release the pin even if driving it failed.
        GPIO.cleanup(reset_pin)


def get_cpu_temperature():
    """Return the CPU temperature rounded to one decimal, or 0.0 if unavailable.

    Tries the psutil sensors ``cpu_thermal`` and ``coretemp`` first and, on
    Linux, falls back to parsing the output of ``vcgencmd measure_temp``.
    """
    temp = 0.0
    try:
        temps = psutil.sensors_temperatures()
        if 'cpu_thermal' in temps:
            temp = temps['cpu_thermal'][0].current
        elif 'coretemp' in temps:
            temp = temps['coretemp'][0].current
        elif platform.system() == "Linux":
            res = os.popen('vcgencmd measure_temp').readline()
            if res:
                temp = float(res.replace("temp=", "").replace("'C\n", ""))
    except Exception:
        # Best effort: this runs on every telemetry cycle, so a missing sensor
        # is reported as 0.0 rather than logged each time.
        pass
    return round(temp, 1)


def get_system_status():
    """Collect the telemetry dictionary published to the dashboard.

    Contains CPU usage and temperature, memory and disk usage, a timestamp,
    the labels of profiles ``A`` and ``B`` and, for every process named in
    the ``paths.process_list`` file, whether it is ``"online"`` or
    ``"offline"`` (compared case-insensitively with the running processes).
    """
    profiles = cfg.get('profiles', {})
    status = {
        "cpu_usage_percent": psutil.cpu_percent(interval=0.5),
        "cpu_temp": get_cpu_temperature(),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
        "processes": {},
        "timestamp": time.strftime("%H:%M:%S"),
        "profiles": {
            "A": profiles.get('A', {}).get('label', 'PROFILE A'),
            "B": profiles.get('B', {}).get('label', 'PROFILE B'),
        },
    }

    try:
        target_processes = read_path_list(cfg.get('paths', {}).get('process_list', ''))
        if target_processes:
            # A process may report no name; treat it as an empty string.
            running_names = {
                (p.info.get('name') or '').lower()
                for p in psutil.process_iter(['name'])
            }
            for name in target_processes:
                name = name.lower()
                status["processes"][name] = "online" if name in running_names else "offline"
    except Exception as e:
        print(f"Process error: {e}")
    return status


def check_auto_healing(client, status):
    """Restart monitored processes that *status* reports as offline.

    Does nothing unless ``settings.auto_healing`` is enabled. Each offline
    process is restarted through systemctl up to three times in a row; the
    fourth consecutive failure sends a single CRITICAL notification. The
    counter is cleared as soon as the process is seen online again.
    """
    if not cfg.get('settings', {}).get('auto_healing', False):
        return

    log_topic = f"devices/{CLIENT_ID}/logs"
    for proc_name, state in status["processes"].items():
        if state != "offline":
            auto_healing_counter[proc_name] = 0
            continue

        attempts = auto_healing_counter.get(proc_name, 0)
        if attempts < 3:
            auto_healing_counter[proc_name] = attempts + 1
            msg = f"🛠 Auto-healing: {proc_name} offline. Restarting {attempts+1}/3..."
            client.publish(log_topic, msg)
            send_telegram_message(msg)

            # --- SPECIAL RULE FOR MMDVMHOST ---
            # If the failed daemon is MMDVMHost, perform a hardware reset of the modem first
            if proc_name.lower() == "mmdvmhost" and GPIO_AVAILABLE:
                try:
                    pulse_hat_reset()
                    msg_hw = "🔌 Auto-healing: Hardware HAT Reset sent"
                    print(f"[{CLIENT_ID}] {msg_hw}")
                    client.publish(log_topic, msg_hw)
                    # Wait for the modem firmware to boot before starting the daemon
                    time.sleep(1.5)
                except Exception as e:
                    print(f"Auto-healing GPIO Error: {e}")
            # ----------------------------------

            # Restart the service (whether MMDVMHost or any other)
            subprocess.run(["sudo", "systemctl", "restart", proc_name], check=False)
        elif attempts == 3:
            msg = f"🚨 CRITICAL: {proc_name} failed!"
            client.publish(log_topic, msg)
            send_telegram_message(msg)
            # 4 means "already reported": no further restarts or alerts.
            auto_healing_counter[proc_name] = 4


def publish_all(client):
    """Publish the telemetry and the raw text of every listed config file.

    The status (extended with the stems of the files named in
    ``paths.file_list`` under ``config_files`` and ``files``) goes to
    ``devices/<id>/services``; each existing file is published retained on
    ``data/<id>/<stem>/full_config`` as ``{"raw_text": ...}``.

    Returns:
        dict: The status dictionary that was published.
    """
    status = get_system_status()

    # Read file list for Dashboard menu
    try:
        listed_files = read_path_list(cfg.get('paths', {}).get('file_list', ''))
    except (OSError, UnicodeError) as e:
        print(f"File list error: {e}")
        listed_files = []

    extracted_names = [Path(f).stem for f in listed_files]
    status["config_files"] = extracted_names
    status["files"] = extracted_names

    client.publish(f"devices/{CLIENT_ID}/services", json.dumps(status), qos=1)

    for f in listed_files:
        p = Path(f)
        if not p.exists():
            continue
        try:
            client.publish(
                f"data/{CLIENT_ID}/{p.stem}/full_config",
                json.dumps({"raw_text": p.read_text(encoding="utf-8")}),
                qos=1,
                retain=True,
            )
        except Exception as e:
            # One unreadable file must not stop the remaining ones.
            print(f"Config publish error for {p}: {e}")
    return status


def publish_all_ini_files(client):
    """Publish every section of the listed INI files as retained JSON.

    Each file named in ``paths.file_list`` (default ``/opt/file_list.txt``)
    is parsed with ``parse_ini_sections`` and every section is published on
    ``data/<id>/<file stem>/<section>``.
    """
    file_list_path = cfg.get('paths', {}).get('file_list', '/opt/file_list.txt')
    try:
        files_to_parse = read_path_list(file_list_path)
    except (OSError, UnicodeError) as e:
        print(f"Error reading {file_list_path}: {e}")
        return

    for file_path in files_to_parse:
        if not os.path.exists(file_path):
            continue
        try:
            base_name = os.path.splitext(os.path.basename(file_path))[0]
            # Publish on MQTT broker
            for section, payload in parse_ini_sections(file_path).items():
                topic = f"data/{CLIENT_ID}/{base_name}/{section}"
                client.publish(topic, json.dumps(payload), retain=True)
        except Exception as e:
            print(f"INI parsing error for {file_path}: {e}")


def write_config_from_json(slug, json_payload):
    """Overwrite the listed config file whose stem matches *slug*.

    *json_payload* must be a JSON object with a string ``raw_text``. The
    current file is first copied to ``<file>.bak``, then rewritten, the
    systemd unit named *slug* is restarted and a Telegram note is sent.
    Errors are printed, never raised.
    """
    try:
        listed_files = read_path_list(cfg.get('paths', {}).get('file_list', ''))
        target = next(
            (Path(f) for f in listed_files if Path(f).stem.lower() == slug.lower()),
            None,
        )
        if target is None:
            return

        new_data = json.loads(json_payload)
        raw_text = new_data.get("raw_text") if isinstance(new_data, dict) else None
        if not isinstance(raw_text, str):
            # Refuse to truncate the file when the payload carries no text.
            print(f"Config write error: payload for {slug} has no raw_text")
            return

        shutil.copy(target, str(target) + ".bak")
        with open(target, 'w', encoding="utf-8") as file:
            file.write(raw_text)
        # Argument list instead of a shell string: slug comes from an MQTT topic.
        subprocess.run(["sudo", "systemctl", "restart", slug], check=False)
        send_telegram_message(f"📝 Config {slug.upper()} updated via Web.")
    except Exception as e:
        print(f"Config write error: {e}")

# ==========================================
# 4. MQTT CALLBACKS
# ==========================================
def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe to the node topics and publish a first full snapshot.

    On a successful connection the command/status/control topics are
    subscribed, a 5-second timer is armed that forces a status if no retained
    one is recovered, and telemetry plus INI data are published. A refused
    connection is only logged.
    """
    if rc == 0:
        print(f"✅ Connected: {CLIENT_ID.upper()}")
        client.subscribe([(TOPIC_CMD, 0), (TOPIC_STAT, 0)])
        client.subscribe([
            ("devices/control/request", 0),
            (f"devices/{CLIENT_ID}/control", 0),
            (f"devices/{CLIENT_ID}/config_set/#", 0),
        ])
        threading.Timer(5.0, force_online_if_needed, [client]).start()
        publish_all(client)
        publish_all_ini_files(client)  # Publish INIs as soon as connected
    else:
        print(f"❌ MQTT connection refused: {rc}")


def parse_control_payload(payload):
    """Split an ``"<action>:<service>"`` control payload.

    Returns:
        tuple | None: ``(action, service)`` with the action lower-cased and
        both parts stripped, or ``None`` when the payload has no colon, the
        action is not ``restart`` / ``stop`` / ``start``, or the service is
        empty or starts with ``-`` (it would be read as a systemctl option).
    """
    action, sep, service = payload.partition(":")
    action, service = action.strip().lower(), service.strip()
    if not sep or action not in ("restart", "stop", "start"):
        return None
    if not service or service.startswith("-"):
        return None
    return action, service


def reset_hat_and_restart_mmdvm(client):
    """Pulse the HAT reset pin, restart MMDVMHost and report the outcome."""
    # Correct GPIO pin for MMDVM board hardware reset
    reset_pin = 21
    status_topic = f"fleet/{CLIENT_ID}/status"
    if not GPIO_AVAILABLE:
        # Previously a silent no-op; tell the dashboard why nothing happened.
        print(f"[{CLIENT_ID}] RESET_HAT ignored: GPIO not available")
        client.publish(status_topic, "RESET ERROR: GPIO not available")
        return

    try:
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(reset_pin, GPIO.OUT)

        # 1. Send reset pulse (LOW for 0.5 seconds)
        GPIO.output(reset_pin, GPIO.LOW)
        time.sleep(0.5)
        GPIO.output(reset_pin, GPIO.HIGH)

        # Release GPIO resources
        GPIO.cleanup(reset_pin)
        print(f"[{CLIENT_ID}] RESET pulse sent to GPIO {reset_pin}")

        # 2. Wait 1.5 seconds to let the microcontroller firmware reboot
        time.sleep(1.5)

        # 3. Restart MMDVMHost service to realign serial communication
        print(f"[{CLIENT_ID}] Restarting MMDVMHost...")
        subprocess.run(["sudo", "systemctl", "restart", "mmdvmhost"], check=False)

        # 4. Send confirmations to dashboard
        client.publish(status_topic, "HAT RESET + MMDVM RESTART OK")
        client.publish(f"devices/{CLIENT_ID}/logs", "🔌 HAT Reset + MMDVMHost Restarted")
    except Exception as e:
        print(f"Error during GPIO/MMDVMHost reset: {e}")
        client.publish(status_topic, f"RESET ERROR: {e}")


def set_telegram_notifications(client, new_state):
    """Enable or disable Telegram notifications and persist the choice."""
    # setdefault: the config may not have a 'telegram' section yet.
    cfg.setdefault('telegram', {})['enabled'] = new_state
    try:
        with open(CONFIG_PATH, 'w') as f:
            json.dump(cfg, f, indent=4)
        client.publish(
            f"devices/{CLIENT_ID}/logs",
            f"{'🔔' if new_state else '🔇'} Notifications {'ON' if new_state else 'OFF'}",
        )
        if new_state:
            send_telegram_message("Notifications enabled!")
    except Exception as e:
        print(f"Notification toggle error: {e}")


def handle_node_command(client, cmd):
    """Execute an upper-cased command received on ``TOPIC_CMD``.

    Handles ``A`` / ``B`` (profile switch), ``REBOOT``, ``RESET_HAT`` and
    ``TG:ON`` / ``TG:OFF``; any other command is ignored.
    """
    global boot_recovered, current_status
    if cmd in ("A", "B"):
        current_status = switch_config(cmd)
        client.publish(TOPIC_STAT, current_status, retain=True)
        boot_recovered = True
        publish_all(client)
    elif cmd == "REBOOT":
        client.publish(TOPIC_STAT, f"OFFLINE - Rebooting {CLIENT_ID.upper()}...", retain=False)
        time.sleep(1)
        subprocess.run(["sudo", "reboot"], check=True)
    elif cmd == 'RESET_HAT':
        reset_hat_and_restart_mmdvm(client)
    elif cmd in ("TG:OFF", "TG:ON"):
        set_telegram_notifications(client, cmd == "TG:ON")


def on_message(client, userdata, msg):
    """Dispatch an incoming MQTT message by topic.

    * ``TOPIC_STAT`` (until recovered): adopt the retained status unless it
      mentions OFFLINE / ERROR / REBOOT.
    * ``TOPIC_CMD``: node commands, see ``handle_node_command``.
    * ``devices/control/request``: ``status`` / ``update`` trigger a republish.
    * ``devices/<id>/control``: ``<action>:<service>`` systemctl requests.
    * ``devices/<id>/config_set/<slug>``: rewrite a config file.
    """
    global boot_recovered, current_status
    # errors="replace": a non-UTF-8 payload must not raise inside the callback.
    payload = msg.payload.decode(errors="replace").strip()
    topic = msg.topic

    if topic == TOPIC_STAT and not boot_recovered:
        if not any(x in payload.upper() for x in ("OFFLINE", "ERROR", "REBOOT")):
            current_status = payload
            boot_recovered = True
            client.publish(TOPIC_STAT, current_status, retain=True)

    elif topic == TOPIC_CMD:
        handle_node_command(client, payload.upper())

    elif topic == "devices/control/request" and payload.lower() in ("status", "update"):
        publish_all(client)
        publish_all_ini_files(client)

    elif topic == f"devices/{CLIENT_ID}/control":
        parsed = parse_control_payload(payload)
        if parsed is not None:
            action, service = parsed
            log_topic = f"devices/{CLIENT_ID}/logs"
            try:
                subprocess.run(["sudo", "systemctl", action, service.lower()], check=True)
                client.publish(log_topic, f"✅ {action.upper()}: {service}")
                publish_all(client)
            except Exception as e:
                client.publish(log_topic, f"❌ ERROR: {str(e)}")

    elif topic.startswith(f"devices/{CLIENT_ID}/config_set/"):
        slug = topic.split("/")[-1]
        write_config_from_json(slug, payload)
        time.sleep(1)
        publish_all(client)
        publish_all_ini_files(client)


def auto_publish_task(client):
    """Publish telemetry and run auto-healing forever, once per interval.

    The interval is ``settings.update_interval`` seconds (default 30). A
    failure in one cycle is printed and the loop continues, so a transient
    error cannot silently end the telemetry thread.
    """
    while True:
        try:
            status = publish_all(client)
            publish_all_ini_files(client)
            check_auto_healing(client, status)
        except Exception as e:
            print(f"Auto-publish error: {e}")
        time.sleep(cfg.get('settings', {}).get('update_interval', 30))


def start_service():
    """Connect to the broker and run the agent until the process is stopped.

    Sets the last-will OFFLINE status, starts the MQTT network loop and the
    background telemetry thread, then idles. Any startup or runtime error is
    printed and the process exits with status 1.
    """
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=CLIENT_ID.upper(),
    )
    client.will_set(TOPIC_STAT, payload=f"OFFLINE - {CLIENT_ID.upper()}", qos=1, retain=False)
    client.username_pw_set(cfg['mqtt']['user'], cfg['mqtt']['password'])
    client.on_connect = on_connect
    client.on_message = on_message

    try:
        client.connect(cfg['mqtt']['broker'], cfg['mqtt']['port'], 60)
        client.loop_start()
        threading.Thread(target=auto_publish_task, args=(client,), daemon=True).start()
        while True:
            time.sleep(1)
    except Exception as e:
        # Report the cause and signal failure instead of exiting silently with 0.
        print(f"❌ MQTT service error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    start_service()