import configparser
import json
import logging
import os
import platform
import shutil
import subprocess
import sys
import threading
import time
from logging.handlers import RotatingFileHandler
from pathlib import Path

import paho.mqtt.client as mqtt
import psutil
import requests

# ==========================================
# 0. LOGGING & HARDWARE CONFIGURATION
# ==========================================
# Log both to a size-rotated file and to the console.
logging.basicConfig(
    handlers=[
        RotatingFileHandler('/opt/node_agent.log', maxBytes=2000000, backupCount=3),
        logging.StreamHandler(),
    ],
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger("NodeAgent")

# RPi.GPIO is optional. Besides being absent (ImportError), the library raises
# RuntimeError at import time when it is installed on non-Raspberry hardware.
try:
    import RPi.GPIO as GPIO
    GPIO_AVAILABLE = True
except (ImportError, RuntimeError):
    GPIO = None  # keep the name defined so later references never NameError
    GPIO_AVAILABLE = False
    logger.warning("Libreria RPi.GPIO non trovata. Reset hardware disabilitato.")

# ==========================================
# 1. UNIFIED CONFIGURATION LOADING
# ==========================================
CONFIG_PATH = Path("/opt/node_config.json")


def load_config():
    """Load the JSON configuration stored at ``CONFIG_PATH``.

    Returns:
        dict: the parsed configuration object.

    Exits the process with status 1 (after logging the reason) when the file
    is missing, cannot be read, is not valid JSON, or its root element is not
    a JSON object.
    """
    if not CONFIG_PATH.exists():
        logger.error(f"ERRORE: File {CONFIG_PATH} non trovato!")
        sys.exit(1)

    try:
        with open(CONFIG_PATH, 'r', encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.error(f"ERRORE CRITICO JSON: {e}")
        sys.exit(1)

    # Everything below calls cfg.get(...), so the root must be an object.
    if not isinstance(data, dict):
        logger.error(f"ERRORE CRITICO JSON: la radice di {CONFIG_PATH} non è un oggetto")
        sys.exit(1)
    return data


cfg = load_config()

# Identifiers and topics
CLIENT_ID = str(cfg.get('client_id', 'iv3jdv')).lower()
BASE_TOPIC = cfg.get('mqtt', {}).get('base_topic', f"servizi/{CLIENT_ID}")
TOPIC_CMD = f"{BASE_TOPIC}/cmnd"
TOPIC_STAT = f"{BASE_TOPIC}/stat"

# Global state variables
boot_recovered = False               # True once the status topic has been (re)established
current_status = "ONLINE - Pronto"   # last status string published on TOPIC_STAT
auto_healing_counter = {}            # process name -> consecutive restart attempts

# ==========================================
# 2. TELEGRAM NOTIFICATIONS
# ==========================================
def send_telegram_message(message):
    """Send *message* to the configured Telegram chat (best effort).

    Nothing is sent when notifications are disabled in the ``telegram``
    section of the configuration, when the local hour is between 23:00 and
    06:59, or when the token / chat id are missing or still the
    ``"TOKEN ID"`` placeholder. Failures are logged and never raised.

    Args:
        message: text to send; it is prefixed with the upper-cased client id.
    """
    t_cfg = cfg.get('telegram', {})
    if not t_cfg.get('enabled', False):
        return

    current_hour = time.localtime().tm_hour
    if current_hour >= 23 or current_hour < 7:
        logger.info(f"🌙 Notte fonda ({current_hour}:00): Notifica Telegram evitata.")
        return

    token = t_cfg.get('token')
    chat_id = t_cfg.get('chat_id')
    if not token or not chat_id or token == "TOKEN ID":
        return

    # NOTE(review): the previous code ran message.replace("", "") twice, which
    # is a no-op. Presumably it once stripped markup tags that were lost from
    # the source; confirm whether any tag stripping is still wanted here.
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_id, "text": f"[{CLIENT_ID.upper()}]\n{message}"}
    try:
        response = requests.post(url, json=payload, timeout=10)
        if not response.ok:
            logger.warning(f"Telegram ha risposto con HTTP {response.status_code}")
    except Exception as e:
        # Notifications must never break the caller, so swallow after logging.
        logger.error(f"Errore invio Telegram: {e}")


# ==========================================
# 3. MULTI-SERVICE PROFILE SWITCHING
# ==========================================
def get_actual_config_from_disk():
    """Return the fallback status string used when no retained status exists.

    Despite its name, this function does not read anything from disk; it
    returns a fixed value.
    """
    return "ONLINE - Da memoria"


def switch_config(config_type):
    """Activate the profile *config_type* from the ``profiles`` config section.

    For every service of the profile: stop it, copy its ``source`` file over
    its ``target`` file, then start it again.

    Args:
        config_type: key of the profile inside ``cfg['profiles']``.

    Returns:
        str: ``"ONLINE - <label>"`` on success, otherwise a string starting
        with ``"ERRORE:"`` describing the problem.
    """
    profile = cfg.get('profiles', {}).get(config_type)
    if not profile:
        return f"ERRORE: Profilo {config_type} non trovato in JSON"

    label = profile.get('label', f"Profilo {config_type}")
    services = profile.get('services', [])
    if not services:
        return f"ERRORE: Nessun servizio configurato per {config_type}"

    try:
        # Validate every source file BEFORE touching any service, so that a
        # missing file cannot leave the node with its services stopped.
        for s in services:
            if not os.path.exists(s['source']):
                return f"ERRORE: Manca il file sorgente {s['source']}"

        for s in services:
            subprocess.run(["sudo", "systemctl", "stop", s['name']], check=False)
        try:
            for s in services:
                shutil.copy(s['source'], s['target'])
        finally:
            # Always bring the services back, even if a copy failed midway.
            for s in services:
                subprocess.run(["sudo", "systemctl", "start", s['name']], check=False)

        send_telegram_message(f"✅ Switch multiplo completato: {label}")
        return f"ONLINE - {label}"
    except Exception as e:
        return f"ERRORE: {str(e)}"


def force_online_if_needed(client):
    """Publish a fallback status if none was recovered from the broker.

    Does nothing when ``boot_recovered`` is already set. Otherwise stores the
    value of ``get_actual_config_from_disk()`` as the current status and
    publishes it (retained) on ``TOPIC_STAT``.
    """
    global boot_recovered, current_status
    if boot_recovered:
        return
    logger.info("⚠️ Recupero memoria saltato. Imposto stato da disco...")
    current_status = get_actual_config_from_disk()
    client.publish(TOPIC_STAT, current_status, retain=True)
    boot_recovered = True


# ==========================================
# 4. TELEMETRY AND AUTO-HEALING
# ==========================================
_MAX_HEALING_ATTEMPTS = 3


def _read_path_list(list_file):
    """Return the non-blank lines of *list_file* as ``Path`` objects.

    Returns an empty list when *list_file* is empty, is not a regular file,
    or cannot be read (the read error is logged).
    """
    # An empty string must be rejected explicitly: Path('') is '.', which
    # "exists" and would then fail on read because it is a directory.
    if not list_file:
        return []
    list_path = Path(list_file)
    if not list_path.is_file():
        return []
    try:
        lines = list_path.read_text(encoding="utf-8").splitlines()
    except (OSError, ValueError) as e:
        logger.error(f"Errore lettura {list_file}: {e}")
        return []
    return [Path(line.strip()) for line in lines if line.strip()]


def get_cpu_temperature():
    """Return the CPU temperature rounded to one decimal, or 0.0 if unknown.

    Reads the ``cpu_thermal`` or ``coretemp`` sensor through psutil; when
    neither is reported and the platform is Linux, falls back to parsing the
    output of ``vcgencmd measure_temp``.
    """
    temp = 0.0
    try:
        temps = psutil.sensors_temperatures()
        for sensor in ('cpu_thermal', 'coretemp'):
            if temps.get(sensor):
                return round(temps[sensor][0].current, 1)

        if platform.system() == "Linux":
            # Output looks like "temp=48.3'C".
            res = subprocess.run(
                ['vcgencmd', 'measure_temp'],
                capture_output=True, text=True, timeout=5, check=False,
            ).stdout.strip()
            if res:
                temp = float(res.replace("temp=", "").replace("'C", ""))
    except Exception as e:
        # Temperature is optional telemetry: report 0.0 rather than fail.
        logger.debug(f"Temperatura CPU non disponibile: {e}")
    return round(temp, 1)


def get_system_status():
    """Collect a telemetry snapshot of the host.

    Returns:
        dict: CPU usage and temperature, memory and root-disk usage, a
        ``processes`` map (name -> ``"online"`` / ``"offline"``) for every
        name listed in the ``paths.process_list`` file, a ``HH:MM:SS``
        timestamp and the labels of profiles ``A`` and ``B``.
    """
    profiles = cfg.get('profiles', {})
    status = {
        "cpu_usage_percent": psutil.cpu_percent(interval=0.5),
        "cpu_temp": get_cpu_temperature(),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
        "processes": {},
        "timestamp": time.strftime("%H:%M:%S"),
        "profiles": {
            "A": profiles.get('A', {}).get('label', 'PROFILO A'),
            "B": profiles.get('B', {}).get('label', 'PROFILO B'),
        },
    }

    proc_list = cfg.get('paths', {}).get('process_list', '')
    if proc_list and Path(proc_list).is_file():
        try:
            target_processes = Path(proc_list).read_text(encoding="utf-8").splitlines()
            # psutil stores None for attributes it was denied access to.
            running_names = {
                (p.info.get('name') or '').lower()
                for p in psutil.process_iter(['name'])
            }
            for name in target_processes:
                name = name.strip().lower()
                if name:
                    status["processes"][name] = "online" if name in running_names else "offline"
        except Exception as e:
            logger.error(f"Errore controllo processi: {e}")
    return status


def check_auto_healing(client, status):
    """Restart monitored processes that are reported offline.

    Active only when ``settings.auto_healing`` is true. Each offline process
    is restarted through ``systemctl`` up to three consecutive times; on the
    next failure a single critical message is emitted and no further restarts
    are attempted until the process is seen online again, which resets its
    counter.

    Args:
        client: MQTT client used to publish log messages.
        status: snapshot returned by ``get_system_status()``.
    """
    if not cfg.get('settings', {}).get('auto_healing', False):
        return

    for proc_name, state in status["processes"].items():
        if state != "offline":
            auto_healing_counter[proc_name] = 0
            continue

        attempts = auto_healing_counter.get(proc_name, 0)
        if attempts < _MAX_HEALING_ATTEMPTS:
            auto_healing_counter[proc_name] = attempts + 1
            msg = (f"🛠 Auto-healing: {proc_name} offline. "
                   f"Riavvio {attempts + 1}/{_MAX_HEALING_ATTEMPTS}...")
            client.publish(f"devices/{CLIENT_ID}/logs", msg)
            send_telegram_message(msg)
            subprocess.run(["sudo", "systemctl", "restart", proc_name])
        elif attempts == _MAX_HEALING_ATTEMPTS:
            msg = f"🚨 CRITICO: {proc_name} fallito!"
            client.publish(f"devices/{CLIENT_ID}/logs", msg)
            send_telegram_message(msg)
            # Move past the threshold so the critical alert is sent only once.
            auto_healing_counter[proc_name] = _MAX_HEALING_ATTEMPTS + 1


def publish_all(client):
    """Publish the telemetry snapshot and the raw text of each config file.

    The snapshot (extended with the stems of the files listed in
    ``paths.file_list``) goes to ``devices/<id>/services``; each listed file
    that exists is published, retained, on ``data/<id>/<stem>/full_config``.

    Returns:
        dict: the published snapshot.
    """
    status = get_system_status()
    config_paths = _read_path_list(cfg.get('paths', {}).get('file_list', ''))

    stems = [p.stem for p in config_paths]
    status["config_files"] = stems
    status["files"] = stems
    client.publish(f"devices/{CLIENT_ID}/services", json.dumps(status), qos=1)

    for p in config_paths:
        if not p.is_file():
            continue
        try:
            raw_text = p.read_text(encoding="utf-8")
        except (OSError, ValueError) as e:
            # One unreadable file must not prevent publishing the others.
            logger.error(f"Errore lettura {p}: {e}")
            continue
        client.publish(
            f"data/{CLIENT_ID}/{p.stem}/full_config",
            json.dumps({"raw_text": raw_text}),
            qos=1,
            retain=True,
        )
    return status


def parse_ini_lines(lines):
    """Parse INI-style *lines* into ``{section: {key: value}}``.

    Blank lines and lines starting with ``#`` or ``;`` are skipped, and
    ``key=value`` lines that appear before any ``[section]`` header are
    ignored. A key repeated inside a section gets its values joined with
    ``","``; a section header repeated later in the input extends the
    existing section instead of discarding the keys already collected.

    Args:
        lines: iterable of text lines (trailing newlines allowed).

    Returns:
        dict: mapping of section name to a dict of string values.
    """
    sections = {}
    current = None
    for raw_line in lines:
        line = raw_line.strip()
        if not line or line.startswith(('#', ';')):
            continue
        if line.startswith('[') and line.endswith(']'):
            current = line[1:-1].strip()
            sections.setdefault(current, {})
        elif '=' in line and current is not None:
            key, value = (part.strip() for part in line.split('=', 1))
            bucket = sections[current]
            bucket[key] = f"{bucket[key]},{value}" if key in bucket else value
    return sections


def publish_all_ini_files(client):
    """Publish every section of each listed INI file as retained JSON.

    Each section of each existing file listed in ``paths.file_list`` is
    published on ``data/<id>/<file stem>/<section>``.
    """
    file_list_path = cfg.get('paths', {}).get('file_list', '/opt/file_list.txt')
    for file_path in _read_path_list(file_list_path):
        if not file_path.exists():
            continue
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                ini_data = parse_ini_lines(f)
            for section, payload in ini_data.items():
                topic = f"data/{CLIENT_ID}/{file_path.stem}/{section}"
                client.publish(topic, json.dumps(payload), retain=True)
        except Exception as e:
            logger.error(f"Errore parsing INI per {file_path}: {e}")


def write_config_from_json(slug, json_payload):
    """Overwrite the listed config file whose stem matches *slug*.

    The current file is first copied to ``<file>.bak``; the new content is
    the ``raw_text`` field of *json_payload*. The systemd unit named *slug*
    is then restarted. Nothing is written when no listed file matches or
    when the payload carries no textual ``raw_text``.

    Args:
        slug: file stem to update (compared case-insensitively).
        json_payload: JSON document of the form ``{"raw_text": "..."}``.
    """
    wanted = slug.lower()
    if not wanted:
        return
    candidates = _read_path_list(cfg.get('paths', {}).get('file_list', ''))
    target = next((p for p in candidates if p.stem.lower() == wanted), None)
    if target is None:
        return

    try:
        new_data = json.loads(json_payload)
        raw_text = new_data.get("raw_text") if isinstance(new_data, dict) else None
        if not isinstance(raw_text, str):
            # Refuse to replace a working config with an empty file.
            logger.error("Errore scrittura config: payload privo di 'raw_text'")
            return

        shutil.copy(target, str(target) + ".bak")
        with open(target, 'w', encoding="utf-8") as fh:
            fh.write(raw_text)

        # slug originates from an MQTT topic: pass it as a single argv
        # element rather than interpolating it into a shell command line.
        subprocess.run(["sudo", "systemctl", "restart", slug], check=False)
        send_telegram_message(f"📝 Config {slug.upper()} aggiornata via Web.")
        logger.info(f"Configurazione {slug} aggiornata con successo.")
    except Exception as e:
        logger.error(f"Errore scrittura config: {e}")


# ==========================================
# 5. MQTT CALLBACKS
# ==========================================
_SERVICE_ACTIONS = ("restart", "stop", "start")


def split_service_command(payload):
    """Parse an ``"<action>:<service>"`` control payload.

    Only the first colon separates the two parts, so service names that
    contain colons are kept intact.

    Args:
        payload: raw control message text.

    Returns:
        tuple | None: ``(action, service)`` with the action lower-cased and
        both parts stripped, or ``None`` when the payload has no colon, the
        action is not one of restart/stop/start, or the service is empty or
        starts with ``-`` (which systemctl would read as an option).
    """
    action, sep, service = payload.partition(":")
    action = action.strip().lower()
    service = service.strip()
    if not sep or action not in _SERVICE_ACTIONS:
        return None
    if not service or service.startswith("-"):
        return None
    return action, service


def _reset_hat(client, reset_pin=21):
    """Pulse *reset_pin* (BCM numbering) low, then restart ``mmdvmhost``."""
    if not GPIO_AVAILABLE:
        logger.warning("RESET_HAT ignorato: RPi.GPIO non disponibile.")
        return
    try:
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(reset_pin, GPIO.OUT)
        GPIO.output(reset_pin, GPIO.LOW)
        time.sleep(0.5)
        GPIO.output(reset_pin, GPIO.HIGH)
        GPIO.cleanup(reset_pin)
        logger.info(f"Impulso di RESET inviato al GPIO {reset_pin}")
        time.sleep(1.5)
        logger.info("Riavvio di MMDVMHost in corso...")
        subprocess.run(["sudo", "systemctl", "restart", "mmdvmhost"], check=False)
        client.publish(f"fleet/{CLIENT_ID}/status", "HAT RESET + MMDVM RESTART OK")
        client.publish(f"devices/{CLIENT_ID}/logs", "🔌 HAT Reset + MMDVMHost Restarted")
    except Exception as e:
        logger.error(f"Errore durante il reset GPIO/MMDVMHost: {e}")
        client.publish(f"fleet/{CLIENT_ID}/status", f"ERRORE RESET: {e}")


def _set_telegram_enabled(client, enabled):
    """Turn Telegram notifications on or off and persist the choice."""
    cfg.setdefault('telegram', {})['enabled'] = enabled
    try:
        # Serialise first: a failure here must not truncate the config file.
        serialized = json.dumps(cfg, indent=4)
        with open(CONFIG_PATH, 'w') as f:
            f.write(serialized)
        client.publish(
            f"devices/{CLIENT_ID}/logs",
            f"{'🔔' if enabled else '🔇'} Notifiche {'ON' if enabled else 'OFF'}",
        )
        if enabled:
            send_telegram_message("Notifiche riattivate!")
    except Exception as e:
        logger.error(f"Errore salvataggio stato Telegram: {e}")


def _handle_command(client, cmd):
    """Execute an upper-cased command received on ``TOPIC_CMD``."""
    global boot_recovered, current_status
    if cmd in ("A", "B"):
        current_status = switch_config(cmd)
        client.publish(TOPIC_STAT, current_status, retain=True)
        boot_recovered = True
        publish_all(client)
    elif cmd == "REBOOT":
        client.publish(TOPIC_STAT, f"OFFLINE - Riavvio {CLIENT_ID.upper()}...", retain=False)
        logger.info("Comando REBOOT ricevuto. Riavvio sistema...")
        time.sleep(1)
        try:
            subprocess.run(["sudo", "reboot"], check=True)
        except (OSError, subprocess.CalledProcessError) as e:
            # A failed reboot must not raise out of the MQTT callback.
            logger.error(f"Errore durante il riavvio: {e}")
    elif cmd == 'RESET_HAT':
        _reset_hat(client)
    elif cmd in ("TG:OFF", "TG:ON"):
        _set_telegram_enabled(client, cmd == "TG:ON")


def _handle_service_control(client, payload):
    """Run a ``restart|stop|start`` systemctl action requested over MQTT."""
    parsed = split_service_command(payload)
    if parsed is None:
        return
    action, service = parsed
    try:
        subprocess.run(["sudo", "systemctl", action, service.lower()], check=True)
        client.publish(f"devices/{CLIENT_ID}/logs", f"✅ {action.upper()}: {service}")
        logger.info(f"Comando servizio eseguito: {action.upper()} {service}")
        publish_all(client)
    except Exception as e:
        client.publish(f"devices/{CLIENT_ID}/logs", f"❌ ERROR: {str(e)}")
        logger.error(f"Errore esecuzione comando servizio: {e}")


def on_connect(client, userdata, flags, reason_code, properties=None):
    """Subscribe to the control topics and publish the initial state.

    On a successful connection it also schedules ``force_online_if_needed``
    five seconds later, leaving time for a retained status to arrive first.
    """
    if reason_code == 0:
        logger.info(f"✅ Connesso al broker MQTT: {CLIENT_ID.upper()}")
        client.subscribe([
            (TOPIC_CMD, 0),
            (TOPIC_STAT, 0),
            ("devices/control/request", 0),
            (f"devices/{CLIENT_ID}/control", 0),
            (f"devices/{CLIENT_ID}/config_set/#", 0),
        ])
        timer = threading.Timer(5.0, force_online_if_needed, [client])
        timer.daemon = True  # never keep the process alive just for this timer
        timer.start()
        publish_all(client)
        publish_all_ini_files(client)
    else:
        logger.error(f"❌ Errore connessione MQTT. Codice: {reason_code}")


def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    """Terminate the process as soon as the broker connection is lost."""
    logger.warning(f"⚠️ Disconnessione dal broker MQTT! Codice: {reason_code}")
    logger.error("Forzo il riavvio del processo per ripristinare la rete in modo pulito...")
    # Kill the script immediately; presumably systemd is configured to
    # restart it (that configuration is not part of this file).
    os._exit(1)


def on_message(client, userdata, msg):
    """Dispatch an incoming MQTT message according to its topic.

    * ``TOPIC_STAT`` (only until the status is recovered): adopt the retained
      status unless it mentions OFFLINE, ERRORE or RIAVVIO.
    * ``TOPIC_CMD``: profile switch (A/B), REBOOT, RESET_HAT, TG:ON / TG:OFF.
    * ``devices/control/request``: republish everything on status/update.
    * ``devices/<id>/control``: ``<action>:<service>`` systemctl request.
    * ``devices/<id>/config_set/<slug>``: rewrite a config file, republish.

    NOTE(review): several branches sleep or run subprocesses inside this
    callback, which delays processing of further messages while they run.
    """
    global boot_recovered, current_status
    # Undecodable bytes are replaced instead of raising inside the callback.
    payload = msg.payload.decode("utf-8", errors="replace").strip()
    topic = msg.topic

    if topic == TOPIC_STAT and not boot_recovered:
        if not any(x in payload.upper() for x in ("OFFLINE", "ERRORE", "RIAVVIO")):
            current_status = payload
            boot_recovered = True
            client.publish(TOPIC_STAT, current_status, retain=True)
    elif topic == TOPIC_CMD:
        _handle_command(client, payload.upper())
    elif topic == "devices/control/request" and payload.lower() in ("status", "update"):
        publish_all(client)
        publish_all_ini_files(client)
    elif topic == f"devices/{CLIENT_ID}/control":
        _handle_service_control(client, payload)
    elif topic.startswith(f"devices/{CLIENT_ID}/config_set/"):
        slug = topic.split("/")[-1]
        write_config_from_json(slug, payload)
        time.sleep(1)
        publish_all(client)
        publish_all_ini_files(client)


# ==========================================
# 6. SERVICE ENTRY POINT
# ==========================================
def auto_publish_task(client):
    """Publish telemetry and run auto-healing forever, once per interval.

    The pause between cycles is ``settings.update_interval`` seconds
    (default 30). Errors inside a cycle are logged and the loop continues.
    """
    while True:
        try:
            status = publish_all(client)
            publish_all_ini_files(client)
            check_auto_healing(client, status)
        except Exception:
            # Keep the publisher thread alive across transient failures.
            logger.exception("Errore nel ciclo di pubblicazione periodica")

        interval = cfg.get('settings', {}).get('update_interval', 30)
        try:
            time.sleep(float(interval))
        except (TypeError, ValueError):
            # Non-numeric or negative interval: fall back to the default.
            time.sleep(30)


def start_service():
    """Connect to the MQTT broker and run the agent until the process exits.

    Connection attempts are retried every 10 seconds until one succeeds;
    afterwards the network loop and the periodic publisher run in background
    threads while this function keeps the main thread alive.
    """
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=CLIENT_ID.upper(),
    )
    client.will_set(TOPIC_STAT, payload=f"OFFLINE - {CLIENT_ID.upper()}", qos=1, retain=False)
    client.username_pw_set(cfg['mqtt']['user'], cfg['mqtt']['password'])
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    while True:
        try:
            logger.info("Tentativo di connessione al broker MQTT...")
            client.connect(cfg['mqtt']['broker'], int(cfg['mqtt']['port']), 60)
            break
        except Exception as e:
            logger.error(
                f"Impossibile connettersi o connessione persa ({e}). Riprovo in 10 secondi..."
            )
            time.sleep(10)

    client.loop_start()
    threading.Thread(target=auto_publish_task, args=(client,), daemon=True).start()

    # Keep the process alive; the real work happens in the background threads.
    while True:
        time.sleep(1)


if __name__ == "__main__":
    start_service()