# File: fleet-control-console/agent/system_monitor.py
# (Python source, 361 lines, 14 KiB)

import paho.mqtt.client as mqtt
import json
import time
import psutil
import platform
import subprocess
import threading
import sys
import os
import shutil
import requests
from pathlib import Path
import configparser
# ==========================================
# 0. CARICAMENTO CONFIGURAZIONE UNIFICATA
# ==========================================
CONFIG_PATH = Path("/opt/node_config.json")


def load_config():
    """Load the unified node configuration from ``CONFIG_PATH``.

    Returns:
        The parsed JSON document.

    The process exits with status 1, after printing a diagnostic, when the
    file is missing, cannot be read, or does not contain valid JSON.
    """
    try:
        if not CONFIG_PATH.exists():
            print(f"❌ ERRORE: File {CONFIG_PATH} non trovato!")
            sys.exit(1)
        # Explicit UTF-8 so non-ASCII labels do not depend on the system locale.
        with open(CONFIG_PATH, 'r', encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ ERRORE CRITICO JSON: {e}")
        sys.exit(1)
# Load the single configuration this agent needs
cfg = load_config()
# Identifiers and topics: the command/status topics hang off the base topic,
# which defaults to "servizi/<client_id>" when not configured.
CLIENT_ID = cfg.get('client_id', 'iv3jdv').lower()
BASE_TOPIC = cfg.get('mqtt', {}).get('base_topic', f"servizi/{CLIENT_ID}")
TOPIC_CMD = f"{BASE_TOPIC}/cmnd"
TOPIC_STAT = f"{BASE_TOPIC}/stat"
# Global state variables
# boot_recovered: becomes True once the status has been recovered or forced.
boot_recovered = False
# current_status: last status string published on TOPIC_STAT.
current_status = "ONLINE - Pronto"
# auto_healing_counter: restart attempts per monitored process name.
auto_healing_counter = {}
# ==========================================
# 1. FUNZIONE NOTIFICA TELEGRAM
# ==========================================
def send_telegram_message(message):
    """Send a best-effort Telegram notification for this node.

    Nothing is sent when Telegram is disabled in the configuration, when the
    local hour falls in the quiet window (23:00-06:59), or when the token or
    chat id is missing or the token is still the "TOKEN ID" placeholder.

    Args:
        message: Text to send; ``<b>``/``</b>`` tags are stripped first.

    Returns:
        None. Failures are printed and never raised to the caller.
    """
    t_cfg = cfg.get('telegram', {})
    if not t_cfg.get('enabled', False):
        return

    ora_attuale = int(time.strftime("%H"))
    if ora_attuale >= 23 or ora_attuale < 7:
        print(f"🌙 Notte fonda ({ora_attuale}:00): Notifica evitata.")
        return

    token = t_cfg.get('token')
    chat_id = t_cfg.get('chat_id')
    if not token or not chat_id or token == "TOKEN ID":
        return

    try:
        clean_msg = message.replace("<b>", "").replace("</b>", "")
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": f"[{CLIENT_ID.upper()}]\n{clean_msg}"}
        response = requests.post(url, json=payload, timeout=10)
        # An HTTP error (bad token, unknown chat, rate limit) does not raise
        # by itself; report it. Only the status code is printed so the bot
        # token embedded in the URL is not added to the logs.
        if not response.ok:
            print(f"⚠️ Errore invio Telegram: HTTP {response.status_code}")
    except Exception as e:
        # Notification boundary: a failed send must never break the caller.
        print(f"⚠️ Errore invio Telegram: {e}")
# ==========================================
# 2. LOGICA CAMBIO PROFILO MULTIPLO
# ==========================================
def get_actual_config_from_disk():
    """Return the fallback status used when no retained status was recovered.

    Despite the name, nothing is read from disk: the value is a constant.
    """
    fallback_status = "ONLINE - Da memoria"
    return fallback_status
def switch_config(config_type):
    """Activate a configuration profile by swapping service config files.

    For the profile named ``config_type`` in the ``profiles`` section of the
    configuration, every listed service is stopped, its ``source`` file is
    copied over its ``target``, and the services are started again.

    Args:
        config_type: Profile key to activate (e.g. "A" or "B").

    Returns:
        "ONLINE - <label>" on success, otherwise a string starting with
        "ERRORE:" describing the failure.
    """
    profile = cfg.get('profiles', {}).get(config_type)
    if not profile:
        return f"ERRORE: Profilo {config_type} non trovato in JSON"

    label = profile.get('label', f"Profilo {config_type}")
    services = profile.get('services', [])
    if not services:
        return f"ERRORE: Nessun servizio configurato per {config_type}"

    try:
        # Validate everything BEFORE stopping any daemon: a missing source or
        # a malformed entry must not leave the services down.
        plan = []
        for s in services:
            name, source, target = s['name'], s['source'], s['target']
            if not os.path.exists(source):
                return f"ERRORE: Manca il file sorgente {source}"
            plan.append((name, source, target))

        # 1. STOP: stop every involved daemon first to release the files.
        for name, _, _ in plan:
            subprocess.run(["sudo", "systemctl", "stop", name], check=False)

        try:
            # 2. COPY: install the new configuration files.
            for _, source, target in plan:
                shutil.copy(source, target)
        finally:
            # 3. START: always bring the daemons back, even if a copy failed.
            for name, _, _ in plan:
                subprocess.run(["sudo", "systemctl", "start", name], check=False)

        send_telegram_message(f"✅ Switch multiplo completato: {label}")
        return f"ONLINE - {label}"
    except Exception as e:
        return f"ERRORE: {str(e)}"
def force_online_if_needed(client):
    """Publish a fallback status if no status was recovered yet.

    Does nothing once ``boot_recovered`` is set. Otherwise it takes the
    status from ``get_actual_config_from_disk()``, publishes it retained on
    ``TOPIC_STAT`` and marks recovery as done.

    Args:
        client: MQTT client used to publish the status.
    """
    global boot_recovered, current_status

    if boot_recovered:
        return

    print("⚠️ Recupero memoria saltato. Imposto stato da disco...")
    current_status = get_actual_config_from_disk()
    client.publish(TOPIC_STAT, current_status, retain=True)
    boot_recovered = True
# ==========================================
# 3. TELEMETRIA E AUTO-HEALING
# ==========================================
def get_cpu_temperature():
    """Return the CPU temperature rounded to one decimal, or 0.0 if unknown.

    The psutil sensors 'cpu_thermal' and 'coretemp' are tried in that order.
    If neither reports a reading and the platform is Linux, the output of
    ``vcgencmd measure_temp`` (format ``temp=48.3'C``) is parsed instead.

    Returns:
        The temperature as a float; 0.0 when no source is available or any
        step fails.
    """
    temp = 0.0
    try:
        temps = psutil.sensors_temperatures()
        # `or` also skips a sensor that is present but has no readings.
        readings = temps.get('cpu_thermal') or temps.get('coretemp')
        if readings:
            temp = float(readings[0].current)
        elif platform.system() == "Linux":
            # Argument list, no shell; the timeout keeps telemetry from hanging.
            result = subprocess.run(
                ["vcgencmd", "measure_temp"],
                capture_output=True,
                text=True,
                timeout=2,
                check=False,
            )
            raw = result.stdout.strip()
            if raw:
                # "temp=48.3'C" -> "48.3"
                temp = float(raw.partition("=")[2].split("'")[0])
    except Exception:
        # Best-effort telemetry: a missing sensor or tool reports 0.0 rather
        # than breaking the status payload. Unlike a bare `except`, this does
        # not swallow KeyboardInterrupt/SystemExit.
        pass
    return round(temp, 1)
def get_system_status():
    """Collect a telemetry snapshot of the host.

    Returns:
        A dict with CPU usage and temperature, memory and root-disk usage
        percentages, a "HH:MM:SS" timestamp, the labels of profiles A and B,
        and a ``processes`` map {lower-cased name: "online" | "offline"} for
        every name listed in the file configured at ``paths.process_list``.
        ``processes`` stays empty when that file is not configured, is not a
        regular file, or cannot be read.
    """
    profiles = cfg.get('profiles', {})
    status = {
        "cpu_usage_percent": psutil.cpu_percent(interval=0.5),
        "cpu_temp": get_cpu_temperature(),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
        "processes": {},
        "timestamp": time.strftime("%H:%M:%S"),
        "profiles": {
            "A": profiles.get('A', {}).get('label', 'PROFILO A'),
            "B": profiles.get('B', {}).get('label', 'PROFILO B'),
        },
    }

    # An empty path must be rejected explicitly: Path('') is the current
    # directory, which "exists" and would then fail on read.
    process_list = cfg.get('paths', {}).get('process_list', '')
    if not process_list:
        return status
    proc_path = Path(process_list)
    if not proc_path.is_file():
        return status

    try:
        target_processes = proc_path.read_text(encoding="utf-8").splitlines()
        # The name can be None when psutil is denied access to a process.
        running_names = {
            (p.info.get('name') or '').lower()
            for p in psutil.process_iter(['name'])
        }
        for name in target_processes:
            name = name.strip().lower()
            if name:
                status["processes"][name] = "online" if name in running_names else "offline"
    except Exception as e:
        print(f"Errore processi: {e}")
    return status
def check_auto_healing(client, status):
    """Restart monitored processes that are offline, up to three attempts.

    Runs only when ``settings.auto_healing`` is enabled. For each offline
    process the first three checks trigger a ``systemctl restart`` plus a
    log/Telegram message; the fourth sends a single critical alert; later
    checks stay silent. The counter resets once the process is seen online.

    Args:
        client: MQTT client used to publish log messages.
        status: Snapshot whose ``processes`` entry maps process name to
            "online" or "offline".
    """
    if not cfg.get('settings', {}).get('auto_healing', False):
        return

    log_topic = f"devices/{CLIENT_ID}/logs"
    for proc_name, state in status["processes"].items():
        if state != "offline":
            auto_healing_counter[proc_name] = 0
            continue

        attempts = auto_healing_counter.get(proc_name, 0)
        if attempts < 3:
            auto_healing_counter[proc_name] = attempts + 1
            msg = f"🛠 Auto-healing: {proc_name} offline. Riavvio {attempts+1}/3..."
            client.publish(log_topic, msg)
            send_telegram_message(msg)
            try:
                subprocess.run(["sudo", "systemctl", "restart", proc_name], check=False)
            except OSError as e:
                # e.g. sudo/systemctl not installed: report it instead of
                # letting the exception end the calling thread.
                print(f"Errore auto-healing {proc_name}: {e}")
        elif attempts == 3:
            msg = f"🚨 CRITICO: {proc_name} fallito!"
            client.publish(log_topic, msg)
            send_telegram_message(msg)
            # 4 marks "alert already sent" so the alert is not repeated.
            auto_healing_counter[proc_name] = 4
def publish_all(client):
    """Publish the telemetry snapshot and the raw text of each config file.

    The snapshot from ``get_system_status()`` is extended with the stems of
    the files listed in ``paths.file_list`` (keys ``config_files`` and
    ``files``) and published on ``devices/<id>/services``. Each listed file
    that exists is then published, retained, on
    ``data/<id>/<stem>/full_config`` as ``{"raw_text": ...}``.

    Args:
        client: MQTT client used to publish.

    Returns:
        The status dict that was published.
    """
    status = get_system_status()

    # Read the file list once; it feeds both the dashboard menu and the
    # per-file publication below.
    config_paths = []
    file_list = cfg.get('paths', {}).get('file_list', '')
    if file_list and Path(file_list).is_file():
        try:
            lines = Path(file_list).read_text(encoding="utf-8").splitlines()
            # Blank lines are skipped: Path('') would resolve to the cwd.
            config_paths = [Path(line.strip()) for line in lines if line.strip()]
        except (OSError, ValueError) as e:
            print(f"Errore lettura {file_list}: {e}")

    stems = [p.stem for p in config_paths]
    status["config_files"] = stems
    status["files"] = stems
    client.publish(f"devices/{CLIENT_ID}/services", json.dumps(status), qos=1)

    for p in config_paths:
        if not p.is_file():
            continue
        try:
            body = json.dumps({"raw_text": p.read_text(encoding="utf-8")})
            client.publish(f"data/{CLIENT_ID}/{p.stem}/full_config", body, qos=1, retain=True)
        except (OSError, ValueError) as e:
            # One unreadable file must not prevent publishing the others.
            print(f"Errore lettura {p}: {e}")
    return status
def _parse_ini_file(file_path):
    """Parse an INI-style file into ``{section: {key: value}}``.

    Repeated keys inside a section are joined with a comma, and a repeated
    section header keeps adding to the same section. Blank lines, lines
    starting with '#' or ';', and key/value lines before the first section
    are ignored.
    """
    ini_data = {}
    current_section = None
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith(('#', ';')):
                continue
            if line.startswith('[') and line.endswith(']'):
                current_section = line[1:-1].strip()
                # setdefault: a second "[Name]" header must not wipe the
                # keys already collected for that section.
                ini_data.setdefault(current_section, {})
            elif '=' in line and current_section is not None:
                k, v = line.split('=', 1)
                k, v = k.strip(), v.strip()
                section = ini_data[current_section]
                section[k] = f"{section[k]},{v}" if k in section else v
    return ini_data


def publish_all_ini_files(client):
    """Publish every section of every listed INI file as retained JSON.

    The list of files is read from ``paths.file_list`` (default
    ``/opt/file_list.txt``). Each section is published on
    ``data/<id>/<file stem>/<section>``. A failure on one file is printed
    and the remaining files are still processed.

    Args:
        client: MQTT client used to publish.
    """
    file_list_path = cfg.get('paths', {}).get('file_list', '/opt/file_list.txt')
    if not os.path.exists(file_list_path):
        return

    try:
        # UTF-8 to match how the other functions read this same list file.
        with open(file_list_path, 'r', encoding='utf-8') as f:
            files_to_parse = [line.strip() for line in f if line.strip()]
    except (OSError, ValueError) as e:
        print(f"Errore lettura {file_list_path}: {e}")
        return

    for file_path in files_to_parse:
        if not os.path.exists(file_path):
            continue
        try:
            base_name = os.path.splitext(os.path.basename(file_path))[0]
            for section, payload in _parse_ini_file(file_path).items():
                topic = f"data/{CLIENT_ID}/{base_name}/{section}"
                client.publish(topic, json.dumps(payload), retain=True)
        except Exception as e:
            print(f"Errore parsing INI per {file_path}: {e}")
def write_config_from_json(slug, json_payload):
    """Overwrite a listed config file with text received as JSON.

    The first entry of ``paths.file_list`` whose stem equals ``slug``
    (case-insensitive) is backed up to ``<file>.bak``, rewritten with the
    payload's ``raw_text``, and the service named ``slug`` is restarted.

    Args:
        slug: File stem / service name, taken from the MQTT topic.
        json_payload: JSON text of the form ``{"raw_text": "<content>"}``.

    Returns:
        None. Errors are printed, never raised.
    """
    file_list = cfg.get('paths', {}).get('file_list', '')
    if not file_list:
        return
    file_list_path = Path(file_list)
    if not file_list_path.is_file():
        return

    try:
        files = file_list_path.read_text(encoding="utf-8").splitlines()
        for f in files:
            entry = f.strip()
            if not entry:
                continue
            p = Path(entry)
            if p.stem.lower() != slug.lower():
                continue

            nuovi_dati = json.loads(json_payload)
            raw_text = nuovi_dati.get("raw_text") if isinstance(nuovi_dati, dict) else None
            if not isinstance(raw_text, str):
                # Refuse to write: defaulting to "" here would silently
                # truncate the live configuration file.
                print("Errore scrittura config: payload senza 'raw_text' valido")
                return

            shutil.copy(p, str(p) + ".bak")
            with open(p, 'w', encoding="utf-8") as file:
                file.write(raw_text)
            # Argument list instead of a shell string: slug arrives from an
            # MQTT topic and must never be interpreted by a shell.
            subprocess.run(["sudo", "systemctl", "restart", slug], check=False)
            send_telegram_message(f"📝 Config {slug.upper()} aggiornata via Web.")
            break
    except Exception as e:
        print(f"Errore scrittura config: {e}")
# ==========================================
# 4. CALLBACK MQTT
# ==========================================
def on_connect(client, userdata, flags, rc, properties=None):
    """MQTT connect callback: subscribe and publish the initial state.

    On a successful connection (``rc == 0``) it subscribes to the command,
    status, control and config-set topics, schedules the fallback status
    after five seconds, and publishes telemetry and INI data right away.
    Any other result code is ignored.
    """
    if not (rc == 0):
        return

    print(f"✅ Connesso: {CLIENT_ID.upper()}")

    state_topics = [(TOPIC_CMD, 0), (TOPIC_STAT, 0)]
    client.subscribe(state_topics)

    control_topics = [
        ("devices/control/request", 0),
        (f"devices/{CLIENT_ID}/control", 0),
        (f"devices/{CLIENT_ID}/config_set/#", 0),
    ]
    client.subscribe(control_topics)

    # Leave the broker five seconds to deliver a retained status before
    # falling back to the default one.
    recovery_timer = threading.Timer(5.0, force_online_if_needed, [client])
    recovery_timer.start()

    publish_all(client)
    # Publish the INI files as soon as the connection is up.
    publish_all_ini_files(client)
def on_message(client, userdata, msg):
    """MQTT message callback: dispatch on topic and payload.

    Handled inputs:
      * ``TOPIC_STAT`` (only until recovery is done): adopt the retained
        status unless it mentions OFFLINE / ERRORE / RIAVVIO.
      * ``TOPIC_CMD``: "A"/"B" switch profile, "REBOOT" reboots the host,
        "TG:ON"/"TG:OFF" toggle Telegram notifications and persist them.
      * ``devices/control/request`` with "status"/"update": republish all.
      * ``devices/<id>/control`` with "<action>:<service>" where action is
        restart, stop or start: run ``systemctl`` and log the outcome.
      * ``devices/<id>/config_set/<slug>``: rewrite that config file.

    Errors are reported on the log topic or printed; they are not raised,
    because an exception here would propagate out of the callback.
    """
    global boot_recovered, current_status, cfg
    topic = msg.topic
    try:
        payload = msg.payload.decode().strip()
    except UnicodeDecodeError as e:
        print(f"⚠️ Payload non valido su {topic}: {e}")
        return

    log_topic = f"devices/{CLIENT_ID}/logs"

    if topic == TOPIC_STAT and not boot_recovered:
        if not any(x in payload.upper() for x in ["OFFLINE", "ERRORE", "RIAVVIO"]):
            current_status = payload
            boot_recovered = True
            client.publish(TOPIC_STAT, current_status, retain=True)
    elif topic == TOPIC_CMD:
        cmd = payload.upper()
        if cmd in ["A", "B"]:
            current_status = switch_config(cmd)
            client.publish(TOPIC_STAT, current_status, retain=True)
            boot_recovered = True
            publish_all(client)
        elif cmd == "REBOOT":
            client.publish(TOPIC_STAT, f"OFFLINE - Riavvio {CLIENT_ID.upper()}...", retain=False)
            time.sleep(1)
            try:
                subprocess.run(["sudo", "reboot"], check=True)
            except (OSError, subprocess.CalledProcessError) as e:
                client.publish(log_topic, f"❌ ERROR: {str(e)}")
        elif cmd in ["TG:OFF", "TG:ON"]:
            nuovo_stato = (cmd == "TG:ON")
            # setdefault: the 'telegram' section may be absent from the file.
            cfg.setdefault('telegram', {})['enabled'] = nuovo_stato
            try:
                with open(CONFIG_PATH, 'w', encoding="utf-8") as f:
                    json.dump(cfg, f, indent=4)
                client.publish(log_topic, f"{'🔔' if nuovo_stato else '🔇'} Notifiche {'ON' if nuovo_stato else 'OFF'}")
                if nuovo_stato:
                    send_telegram_message("Notifiche riattivate!")
            except Exception as e:
                # Previously swallowed silently; at least report the failure.
                print(f"⚠️ Errore salvataggio notifiche: {e}")
    elif topic == "devices/control/request" and payload.lower() in ["status", "update"]:
        publish_all(client)
        publish_all_ini_files(client)
    elif topic == f"devices/{CLIENT_ID}/control":
        if ":" in payload:
            # maxsplit=1: extra colons stay in the service name instead of
            # raising "too many values to unpack".
            action, service = payload.split(":", 1)
            if action.lower() in ["restart", "stop", "start"]:
                try:
                    subprocess.run(["sudo", "systemctl", action.lower(), service.lower()], check=True)
                    client.publish(log_topic, f"{action.upper()}: {service}")
                    publish_all(client)
                except Exception as e:
                    client.publish(log_topic, f"❌ ERROR: {str(e)}")
    elif topic.startswith(f"devices/{CLIENT_ID}/config_set/"):
        slug = topic.split("/")[-1]
        write_config_from_json(slug, payload)
        time.sleep(1)
        publish_all(client)
        publish_all_ini_files(client)
def auto_publish_task(client):
    """Publish telemetry and run auto-healing forever, at a fixed interval.

    Each cycle publishes the status and config files, publishes the INI
    sections, runs the auto-healing check, then sleeps for
    ``settings.update_interval`` seconds (default 30). Never returns.

    Args:
        client: MQTT client passed to the publishing functions.
    """
    while True:
        try:
            status = publish_all(client)
            publish_all_ini_files(client)
            check_auto_healing(client, status)
        except Exception as e:
            # A failed cycle must not end the thread: telemetry would stop
            # for good while the process keeps running.
            print(f"Errore ciclo telemetria: {e}")
        time.sleep(cfg.get('settings', {}).get('update_interval', 30))
def start_service():
    """Configure the MQTT client, connect, and run the agent until killed.

    Sets the last-will message and credentials, installs the callbacks,
    starts the network loop and the periodic publishing thread, then blocks
    forever. On a connection or runtime error the reason is printed and the
    process exits with status 1.
    """
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID.upper())
    client.will_set(TOPIC_STAT, payload=f"OFFLINE - {CLIENT_ID.upper()}", qos=1, retain=False)
    client.username_pw_set(cfg['mqtt']['user'], cfg['mqtt']['password'])
    client.on_connect = on_connect
    client.on_message = on_message
    try:
        client.connect(cfg['mqtt']['broker'], cfg['mqtt']['port'], 60)
        client.loop_start()
        threading.Thread(target=auto_publish_task, args=(client,), daemon=True).start()
        while True:
            time.sleep(1)
    except Exception as e:
        # Report the cause and exit non-zero so a supervisor sees a failure
        # rather than a clean stop.
        print(f"❌ ERRORE MQTT: {e}")
        sys.exit(1)


if __name__ == "__main__":
    start_service()