import configparser
import json
import os
import platform
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path

import paho.mqtt.client as mqtt
import psutil
import requests

# ==========================================
# 0. UNIFIED CONFIGURATION LOADING
# ==========================================
CONFIG_PATH = Path("/opt/node_config.json")


def load_config():
    """Load the node configuration from CONFIG_PATH.

    Returns:
        The parsed JSON object (a dict).

    The process exits with status 1 when the file is missing, cannot be
    read, is not valid JSON, or does not contain a JSON object at the root.
    """
    if not CONFIG_PATH.exists():
        print(f"❌ ERRORE: File {CONFIG_PATH} non trovato!")
        sys.exit(1)
    try:
        # Explicit UTF-8 so the result does not depend on the service locale.
        with open(CONFIG_PATH, 'r', encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        print(f"❌ ERRORE CRITICO JSON: {e}")
        sys.exit(1)
    if not isinstance(data, dict):
        # Every consumer calls cfg.get(...); fail here with a clear message
        # instead of an AttributeError further down.
        print("❌ ERRORE CRITICO JSON: la radice deve essere un oggetto")
        sys.exit(1)
    return data


# Load the single configuration document used by the whole service.
cfg = load_config()

# Identifiers and topics
CLIENT_ID = str(cfg.get('client_id', 'iv3jdv')).lower()
BASE_TOPIC = cfg.get('mqtt', {}).get('base_topic', f"servizi/{CLIENT_ID}")
TOPIC_CMD = f"{BASE_TOPIC}/cmnd"
TOPIC_STAT = f"{BASE_TOPIC}/stat"

# Global state variables
boot_recovered = False
current_status = "ONLINE - Pronto"
auto_healing_counter = {}


# ==========================================
# 1. TELEGRAM NOTIFICATION
# ==========================================
def send_telegram_message(message):
    """Send *message* to the configured Telegram chat, best effort.

    Nothing is sent when the 'telegram' section is disabled, when the local
    hour is 23:00-06:59, or when token/chat_id are missing or the token is
    still the "TOKEN ID" placeholder. Failures are printed, never raised.

    Args:
        message: Text to send; it is prefixed with the upper-cased CLIENT_ID.
    """
    t_cfg = cfg.get('telegram', {})
    if not t_cfg.get('enabled', False):
        return

    ora_attuale = time.localtime().tm_hour
    if ora_attuale >= 23 or ora_attuale < 7:
        print(f"🌙 Notte fonda ({ora_attuale}:00): Notifica evitata.")
        return

    token = t_cfg.get('token')
    chat_id = t_cfg.get('chat_id')
    if not token or not chat_id or token == "TOKEN ID":
        return

    try:
        # NOTE(review): both replace() calls search for the empty string and
        # are therefore no-ops; presumably markup tags were meant to be
        # stripped here - confirm against the intended message format.
        clean_msg = message.replace("", "").replace("", "")
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": f"[{CLIENT_ID.upper()}]\n{clean_msg}"}
        response = requests.post(url, json=payload, timeout=10)
        if not response.ok:
            # requests does not raise on 4xx/5xx by itself; report it.
            print(f"⚠️ Errore invio Telegram: HTTP {response.status_code}")
    except Exception as e:
        # The URL embeds the bot token, so redact it from the error text.
        print(f"⚠️ Errore invio Telegram: {str(e).replace(str(token), '***')}")
# ==========================================
# 2. MULTI-SERVICE PROFILE SWITCHING
# ==========================================
def get_actual_config_from_disk():
    """Return the fixed status string used when no status was recovered."""
    return "ONLINE - Da memoria"


def switch_config(config_type):
    """Activate the profile *config_type* from cfg['profiles'].

    Each entry of the profile's 'services' list provides 'name' (systemd
    unit), 'source' (file to copy) and 'target' (destination path). All
    sources are checked first; then every unit is stopped, the files are
    copied, and every unit is started again.

    Args:
        config_type: Key of the profile inside cfg['profiles'].

    Returns:
        "ONLINE - <label>" on success, otherwise a string starting with
        "ERRORE: " describing the problem. Exceptions are not propagated.
    """
    profile = cfg.get('profiles', {}).get(config_type)
    if not profile:
        return f"ERRORE: Profilo {config_type} non trovato in JSON"

    label = profile.get('label', f"Profilo {config_type}")
    services = profile.get('services', [])
    if not services:
        return f"ERRORE: Nessun servizio configurato per {config_type}"

    try:
        names = [s['name'] for s in services]
        copies = [(s['source'], s['target']) for s in services]

        # Validate BEFORE touching any daemon: a missing source must not
        # leave the services stopped.
        for source, _target in copies:
            if not os.path.exists(source):
                return f"ERRORE: Manca il file sorgente {source}"

        # 1. STOP: stop every involved daemon so the files are released.
        for name in names:
            subprocess.run(["sudo", "systemctl", "stop", name], check=False)

        try:
            # 2. COPY: install every configuration file.
            for source, target in copies:
                shutil.copy(source, target)
        finally:
            # 3. START: bring the daemons back even when a copy failed.
            for name in names:
                subprocess.run(["sudo", "systemctl", "start", name], check=False)

        send_telegram_message(f"✅ Switch multiplo completato: {label}")
        return f"ONLINE - {label}"
    except Exception as e:
        return f"ERRORE: {str(e)}"


def force_online_if_needed(client):
    """Publish a fallback status if none has been recovered yet.

    When boot_recovered is still False, current_status is set from
    get_actual_config_from_disk(), published retained on TOPIC_STAT, and
    boot_recovered is set to True.

    Args:
        client: Object exposing publish(topic, payload, retain=...).
    """
    global boot_recovered, current_status
    if boot_recovered:
        return
    print("⚠️ Recupero memoria saltato. Imposto stato da disco...")
    current_status = get_actual_config_from_disk()
    client.publish(TOPIC_STAT, current_status, retain=True)
    boot_recovered = True
# ==========================================
# 3. TELEMETRY AND AUTO-HEALING
# ==========================================
def _read_listed_lines(path_value):
    """Return the stripped, non-empty lines of the list file *path_value*.

    Returns an empty list when the value is empty, is not a regular file,
    or cannot be read. An empty value is rejected explicitly because
    Path('') is the current directory, which "exists".
    """
    if not path_value:
        return []
    list_path = Path(path_value)
    if not list_path.is_file():
        return []
    try:
        text = list_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        print(f"Errore lettura {list_path}: {e}")
        return []
    return [line.strip() for line in text.splitlines() if line.strip()]


def _parse_ini_sections(file_path):
    """Parse *file_path* into {section: {key: value}}.

    Blank lines and lines starting with '#' or ';' are skipped, and
    key=value lines before the first section header are ignored. A key that
    appears more than once in a section has its values joined with commas.
    A repeated section header starts that section afresh.
    """
    ini_data = {}
    current_section = None
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith(('#', ';')):
                continue
            if line.startswith('[') and line.endswith(']'):
                current_section = line[1:-1].strip()
                ini_data[current_section] = {}
            elif '=' in line and current_section is not None:
                k, v = line.split('=', 1)
                k, v = k.strip(), v.strip()
                section = ini_data[current_section]
                # Duplicate keys are merged instead of overwritten.
                section[k] = f"{section[k]},{v}" if k in section else v
    return ini_data


def get_cpu_temperature():
    """Return the CPU temperature rounded to one decimal, or 0.0.

    Reads the 'cpu_thermal' or 'coretemp' entry of
    psutil.sensors_temperatures(); when neither is present on Linux, the
    output of `vcgencmd measure_temp` is parsed instead. Any failure is
    treated as "no reading".
    """
    temp = 0.0
    try:
        temps = psutil.sensors_temperatures()
        if 'cpu_thermal' in temps:
            temp = temps['cpu_thermal'][0].current
        elif 'coretemp' in temps:
            temp = temps['coretemp'][0].current
        elif platform.system() == "Linux":
            # subprocess.run closes its pipes, unlike the former os.popen.
            result = subprocess.run(
                ["vcgencmd", "measure_temp"],
                capture_output=True, text=True, timeout=5,
            )
            lines = result.stdout.splitlines()
            if lines and lines[0]:
                temp = float(lines[0].replace("temp=", "").replace("'C", ""))
    except Exception:
        # Best effort: a missing sensor or tool must not break telemetry.
        pass
    return round(temp, 1)


def get_system_status():
    """Collect the telemetry snapshot published by publish_all().

    Returns:
        A dict with CPU usage and temperature, memory and root-disk usage,
        a 'processes' map (name -> "online"/"offline") for the names listed
        in cfg['paths']['process_list'], a HH:MM:SS timestamp and the labels
        of profiles A and B.
    """
    profiles = cfg.get('profiles', {})
    status = {
        "cpu_usage_percent": psutil.cpu_percent(interval=0.5),
        "cpu_temp": get_cpu_temperature(),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
        "processes": {},
        "timestamp": time.strftime("%H:%M:%S"),
        "profiles": {
            "A": profiles.get('A', {}).get('label', 'PROFILO A'),
            "B": profiles.get('B', {}).get('label', 'PROFILO B'),
        },
    }

    targets = [
        name.lower()
        for name in _read_listed_lines(cfg.get('paths', {}).get('process_list', ''))
    ]
    if targets:
        try:
            # A process name can be None; treat it as an empty name instead
            # of losing the whole scan.
            running_names = {
                (p.info.get('name') or '').lower()
                for p in psutil.process_iter(['name'])
            }
        except Exception as e:
            print(f"Errore processi: {e}")
        else:
            for name in targets:
                status["processes"][name] = "online" if name in running_names else "offline"
    return status


def check_auto_healing(client, status):
    """Restart offline processes, at most three attempts per process.

    Does nothing unless cfg['settings']['auto_healing'] is truthy. For each
    offline entry of status["processes"] the first three calls log, notify
    and run `sudo systemctl restart <name>`; the fourth logs a critical
    message once. The counter is reset when the process is seen online.

    NOTE(review): the source indentation was lost; the counter reset is
    taken to belong to the "not offline" branch - confirm.

    Args:
        client: Object exposing publish(topic, payload).
        status: Snapshot as returned by get_system_status().
    """
    if not cfg.get('settings', {}).get('auto_healing', False):
        return

    log_topic = f"devices/{CLIENT_ID}/logs"
    for proc_name, state in status["processes"].items():
        if state != "offline":
            auto_healing_counter[proc_name] = 0
            continue

        attempts = auto_healing_counter.get(proc_name, 0)
        if attempts < 3:
            auto_healing_counter[proc_name] = attempts + 1
            msg = f"🛠 Auto-healing: {proc_name} offline. Riavvio {attempts+1}/3..."
            client.publish(log_topic, msg)
            send_telegram_message(msg)
            try:
                subprocess.run(["sudo", "systemctl", "restart", proc_name])
            except OSError as e:
                # e.g. sudo not installed: report, keep the caller alive.
                print(f"Errore riavvio {proc_name}: {e}")
        elif attempts == 3:
            msg = f"🚨 CRITICO: {proc_name} fallito!"
            client.publish(log_topic, msg)
            send_telegram_message(msg)
            # 4 marks "critical alert already sent".
            auto_healing_counter[proc_name] = 4


def publish_all(client):
    """Publish the telemetry snapshot and the raw text of every listed file.

    The snapshot (with 'config_files' and 'files' holding the stems of the
    files named in cfg['paths']['file_list']) goes to
    devices/<CLIENT_ID>/services; each existing file is then published
    retained on data/<CLIENT_ID>/<stem>/full_config as {"raw_text": ...}.
    A file that cannot be read or published is reported and skipped, so it
    does not block the files after it.

    Args:
        client: Object exposing publish(topic, payload, qos=..., retain=...).

    Returns:
        The status dict that was published.
    """
    status = get_system_status()

    # File list that feeds the dashboard menu.
    listed = _read_listed_lines(cfg.get('paths', {}).get('file_list', ''))
    stems = [Path(entry).stem for entry in listed]
    status["config_files"] = stems
    status["files"] = list(stems)

    client.publish(f"devices/{CLIENT_ID}/services", json.dumps(status), qos=1)

    for entry in listed:
        p = Path(entry)
        if not p.is_file():
            continue
        try:
            body = json.dumps({"raw_text": p.read_text(encoding="utf-8")})
            client.publish(f"data/{CLIENT_ID}/{p.stem}/full_config", body, qos=1, retain=True)
        except Exception as e:
            print(f"Errore pubblicazione {p}: {e}")
    return status


def publish_all_ini_files(client):
    """Publish every section of every listed INI file as retained JSON.

    The list file is cfg['paths']['file_list'] (default
    '/opt/file_list.txt'). Each section is published on
    data/<CLIENT_ID>/<file stem>/<section>. Parse or publish errors are
    printed per file and do not stop the remaining files.

    Args:
        client: Object exposing publish(topic, payload, retain=...).
    """
    file_list_path = cfg.get('paths', {}).get('file_list', '/opt/file_list.txt')
    for file_path in _read_listed_lines(file_list_path):
        if not os.path.isfile(file_path):
            continue
        try:
            base_name = os.path.splitext(os.path.basename(file_path))[0]
            for section, payload in _parse_ini_sections(file_path).items():
                topic = f"data/{CLIENT_ID}/{base_name}/{section}"
                client.publish(topic, json.dumps(payload), retain=True)
        except Exception as e:
            print(f"Errore parsing INI per {file_path}: {e}")


def write_config_from_json(slug, json_payload):
    """Replace the listed config file whose stem equals *slug*.

    The payload must be a JSON object with a string 'raw_text'. The current
    file is copied to '<path>.bak', overwritten with raw_text, then
    `sudo systemctl restart <slug>` is run and a notification is sent. A
    payload without a string 'raw_text' is rejected, so a malformed message
    cannot truncate the file. Errors are printed, never raised.

    Args:
        slug: File stem (compared case-insensitively); also used as the
            unit name to restart.
        json_payload: JSON text received from the broker.
    """
    listed = _read_listed_lines(cfg.get('paths', {}).get('file_list', ''))
    wanted = slug.lower()
    target = next((Path(e) for e in listed if Path(e).stem.lower() == wanted), None)
    if target is None:
        return

    try:
        nuovi_dati = json.loads(json_payload)
        raw_text = nuovi_dati.get("raw_text") if isinstance(nuovi_dati, dict) else None
        if not isinstance(raw_text, str):
            print(f"Errore scrittura config: 'raw_text' mancante per {slug}")
            return

        shutil.copy(target, str(target) + ".bak")
        with open(target, 'w', encoding="utf-8") as file:
            file.write(raw_text)
        # Argument list instead of a shell string: slug comes from an MQTT
        # topic and must never be interpreted by a shell.
        subprocess.run(["sudo", "systemctl", "restart", slug], check=False)
        send_telegram_message(f"📝 Config {slug.upper()} aggiornata via Web.")
    except Exception as e:
        print(f"Errore scrittura config: {e}")
# ==========================================
# 4. MQTT CALLBACKS
# ==========================================
def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe and publish the initial state once the broker accepts us.

    On success (rc == 0) the command, status, control and config_set topics
    are subscribed, a 5-second timer is armed that calls
    force_online_if_needed, and the telemetry and INI data are published
    immediately. A refused connection is reported.
    """
    if rc != 0:
        print(f"❌ Connessione MQTT rifiutata: {rc}")
        return

    print(f"✅ Connesso: {CLIENT_ID.upper()}")
    client.subscribe([(TOPIC_CMD, 0), (TOPIC_STAT, 0)])
    client.subscribe([
        ("devices/control/request", 0),
        (f"devices/{CLIENT_ID}/control", 0),
        (f"devices/{CLIENT_ID}/config_set/#", 0),
    ])
    threading.Timer(5.0, force_online_if_needed, [client]).start()
    try:
        publish_all(client)
        publish_all_ini_files(client)  # publish the INI files right after connecting
    except Exception as e:
        # Never let a telemetry error escape into the network loop.
        print(f"Errore pubblicazione iniziale: {e}")


def on_message(client, userdata, msg):
    """Dispatch an incoming MQTT message.

    Handled topics:
      * TOPIC_STAT (only until a status is recovered): adopt and republish
        the payload unless it contains OFFLINE, ERRORE or RIAVVIO.
      * TOPIC_CMD: "A"/"B" switch profile, "REBOOT" reboots the host,
        "TG:ON"/"TG:OFF" toggle Telegram notifications and save the config.
      * devices/control/request with "status"/"update": republish all data.
      * devices/<CLIENT_ID>/control with "<action>:<service>", where action
        is restart, stop or start: run systemctl and log the outcome.
      * devices/<CLIENT_ID>/config_set/<slug>: write the config, republish.

    NOTE(review): the source indentation was lost; in the TOPIC_STAT branch
    the assignment, the flag and the republish are all taken to be guarded
    by the keyword check - confirm.
    """
    global boot_recovered, current_status, cfg
    # Payloads are untrusted: undecodable bytes must not raise in the callback.
    payload = msg.payload.decode("utf-8", errors="replace").strip()
    topic = msg.topic
    log_topic = f"devices/{CLIENT_ID}/logs"

    if topic == TOPIC_STAT and not boot_recovered:
        if not any(x in payload.upper() for x in ["OFFLINE", "ERRORE", "RIAVVIO"]):
            current_status = payload
            boot_recovered = True
            client.publish(TOPIC_STAT, current_status, retain=True)

    elif topic == TOPIC_CMD:
        cmd = payload.upper()
        if cmd in ["A", "B"]:
            current_status = switch_config(cmd)
            client.publish(TOPIC_STAT, current_status, retain=True)
            boot_recovered = True
            publish_all(client)
        elif cmd == "REBOOT":
            client.publish(TOPIC_STAT, f"OFFLINE - Riavvio {CLIENT_ID.upper()}...", retain=False)
            time.sleep(1)
            try:
                subprocess.run(["sudo", "reboot"], check=True)
            except Exception as e:
                client.publish(log_topic, f"❌ ERROR: {str(e)}")
        elif cmd in ["TG:OFF", "TG:ON"]:
            nuovo_stato = (cmd == "TG:ON")
            cfg.setdefault('telegram', {})['enabled'] = nuovo_stato
            try:
                # Serialize first so a failure cannot leave a truncated file.
                serialized = json.dumps(cfg, indent=4)
                with open(CONFIG_PATH, 'w') as f:
                    f.write(serialized)
                client.publish(log_topic, f"{'🔔' if nuovo_stato else '🔇'} Notifiche {'ON' if nuovo_stato else 'OFF'}")
                if nuovo_stato:
                    send_telegram_message("Notifiche riattivate!")
            except Exception as e:
                print(f"Errore salvataggio configurazione: {e}")

    elif topic == "devices/control/request" and payload.lower() in ["status", "update"]:
        publish_all(client)
        publish_all_ini_files(client)

    elif topic == f"devices/{CLIENT_ID}/control":
        if ":" in payload:
            # maxsplit=1: a second ':' must not raise on unpacking.
            action, service = payload.split(":", 1)
            if action.lower() in ["restart", "stop", "start"]:
                try:
                    # "--" stops option parsing, so a service value that
                    # starts with '-' cannot be taken as a systemctl flag.
                    subprocess.run(
                        ["sudo", "systemctl", action.lower(), "--", service.lower()],
                        check=True,
                    )
                    client.publish(log_topic, f"✅ {action.upper()}: {service}")
                    publish_all(client)
                except Exception as e:
                    client.publish(log_topic, f"❌ ERROR: {str(e)}")

    elif topic.startswith(f"devices/{CLIENT_ID}/config_set/"):
        slug = topic.split("/")[-1]
        write_config_from_json(slug, payload)
        time.sleep(1)
        publish_all(client)
        publish_all_ini_files(client)


def auto_publish_task(client):
    """Publish telemetry and run auto-healing forever.

    Sleeps cfg['settings']['update_interval'] seconds (default 30) between
    cycles. An error in one cycle is printed and the loop continues, so the
    background thread does not die silently.
    """
    while True:
        try:
            status = publish_all(client)
            publish_all_ini_files(client)
            check_auto_healing(client, status)
        except Exception as e:
            print(f"Errore ciclo pubblicazione: {e}")

        interval = cfg.get('settings', {}).get('update_interval', 30)
        try:
            interval = float(interval)
        except (TypeError, ValueError):
            interval = 30.0
        # A negative value would make time.sleep raise.
        time.sleep(interval if interval > 0 else 30.0)


def start_service():
    """Connect to the broker and run until the process is terminated.

    Sets a last-will "OFFLINE - <CLIENT_ID>" on TOPIC_STAT, starts the paho
    network loop and the periodic publisher thread, then idles. A failure
    while connecting or starting is printed and the process exits with
    status 1.
    """
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=CLIENT_ID.upper(),
    )
    client.will_set(TOPIC_STAT, payload=f"OFFLINE - {CLIENT_ID.upper()}", qos=1, retain=False)
    client.username_pw_set(cfg['mqtt']['user'], cfg['mqtt']['password'])
    client.on_connect = on_connect
    client.on_message = on_message

    try:
        # int(): tolerate a port stored as a string in the JSON config.
        client.connect(cfg['mqtt']['broker'], int(cfg['mqtt']['port']), 60)
        client.loop_start()
        threading.Thread(target=auto_publish_task, args=(client,), daemon=True).start()
        while True:
            time.sleep(1)
    except Exception as e:
        # Report the cause and exit non-zero so the failure is visible.
        print(f"❌ ERRORE MQTT: {e}")
        sys.exit(1)


if __name__ == "__main__":
    start_service()