# Source file: fleet-control-console/agent/system_monitor.py
# (Python, listed by the repository browser as 408 lines, 16 KiB)
import paho.mqtt.client as mqtt
import json
import time
import psutil
import platform
import subprocess
import threading
import sys
import os
import shutil
import requests
from pathlib import Path
import configparser
import logging
from logging.handlers import RotatingFileHandler
# ==========================================
# 0. CONFIGURAZIONE LOGGING & HARDWARE
# ==========================================
# Log to a size-rotated file (about 2 MB per file, 3 backups) and to the
# console. If the log file cannot be opened (e.g. /opt not writable for the
# current user) fall back to console-only logging instead of crashing at
# import time.
_log_handlers = [logging.StreamHandler()]
_log_file_error = None
try:
    _log_handlers.insert(
        0, RotatingFileHandler('/opt/node_agent.log', maxBytes=2000000, backupCount=3)
    )
except OSError as _err:
    # The "as" name is unbound when the except block ends, so keep a copy.
    _log_file_error = _err
logging.basicConfig(
    handlers=_log_handlers,
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger("NodeAgent")
if _log_file_error is not None:
    logger.warning(f"Log su file disabilitato ({_log_file_error}): uso solo la console.")

# RPi.GPIO is only needed for the RESET_HAT command. Besides ImportError
# (library not installed), importing it can raise RuntimeError on hardware
# that is not a Raspberry Pi; either way the hardware reset is just disabled.
try:
    import RPi.GPIO as GPIO
    GPIO_AVAILABLE = True
except (ImportError, RuntimeError):
    GPIO_AVAILABLE = False
    logger.warning("Libreria RPi.GPIO non trovata. Reset hardware disabilitato.")
# ==========================================
# 1. CARICAMENTO CONFIGURAZIONE UNIFICATA
# ==========================================
CONFIG_PATH = Path("/opt/node_config.json")


def load_config():
    """Read and return the unified JSON configuration stored at CONFIG_PATH.

    The agent cannot run without it: when the file is missing, unreadable or
    not valid JSON, the reason is logged and the process exits with status 1.
    """
    try:
        if CONFIG_PATH.exists():
            with open(CONFIG_PATH, 'r') as handle:
                return json.load(handle)
        logger.error(f"ERRORE: File {CONFIG_PATH} non trovato!")
        # SystemExit is not an Exception subclass, so the handler below
        # does not intercept it.
        sys.exit(1)
    except Exception as exc:
        logger.error(f"ERRORE CRITICO JSON: {exc}")
        sys.exit(1)
cfg = load_config()

# Node identity and MQTT topics; BASE_TOPIC defaults to "servizi/<client_id>".
CLIENT_ID = cfg.get('client_id', 'iv3jdv').lower()
BASE_TOPIC = cfg.get('mqtt', {}).get('base_topic', f"servizi/{CLIENT_ID}")
TOPIC_CMD, TOPIC_STAT = f"{BASE_TOPIC}/cmnd", f"{BASE_TOPIC}/stat"

# Mutable runtime state, shared by the MQTT callbacks, the startup Timer and
# the auto-publish thread.
boot_recovered = False  # True once a status has been adopted or forced
current_status = "ONLINE - Pronto"
auto_healing_counter = dict()  # process name -> restart attempts so far
# ==========================================
# 2. FUNZIONE NOTIFICA TELEGRAM
# ==========================================
def send_telegram_message(message):
    """Send *message* to the configured Telegram chat (best effort).

    Nothing is sent when Telegram is disabled in cfg['telegram'], during the
    quiet hours (23:00-06:59 local time), or when token/chat_id are missing or
    still set to the "TOKEN ID" placeholder. <b>/</b> tags are stripped since
    the text is sent without a parse mode. The text is prefixed with the
    upper-case CLIENT_ID. Network and HTTP errors are logged, never raised.
    """
    t_cfg = cfg.get('telegram', {})
    if not t_cfg.get('enabled', False):
        return
    ora_attuale = time.localtime().tm_hour
    if ora_attuale >= 23 or ora_attuale < 7:
        logger.info(f"🌙 Notte fonda ({ora_attuale}:00): Notifica Telegram evitata.")
        return
    token = t_cfg.get('token')
    chat_id = t_cfg.get('chat_id')
    if not token or not chat_id or token == "TOKEN ID":
        return
    try:
        clean_msg = message.replace("<b>", "").replace("</b>", "")
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": f"[{CLIENT_ID.upper()}]\n{clean_msg}"}
        response = requests.post(url, json=payload, timeout=10)
        if not response.ok:
            # Telegram reports failures (bad token, unknown chat, ...) through
            # the HTTP status; without this check they went unnoticed.
            logger.error(f"Errore invio Telegram: HTTP {response.status_code} {response.text[:200]}")
    except Exception as e:
        # requests/urllib3 error messages can include the request URL, which
        # contains the bot token: redact it before it reaches the log file.
        logger.error(f"Errore invio Telegram: {str(e).replace(str(token), '***')}")
# ==========================================
# 3. LOGICA CAMBIO PROFILO MULTIPLO
# ==========================================
def get_actual_config_from_disk():
    """Return the status string used when no retained status was recovered.

    NOTE(review): placeholder. Despite the name, nothing is read from disk;
    the same fixed label is always returned.
    """
    fallback_status = "ONLINE - Da memoria"
    return fallback_status
def switch_config(config_type):
    """Activate profile *config_type* as defined in cfg['profiles'].

    Each entry of the profile's 'services' list needs 'name' (systemd unit),
    'source' and 'target' (a config file copied into place). Sequence: check
    that every source exists, stop all units, copy every source over its
    target, start all units again, then send a Telegram notification.

    Returns "ONLINE - <label>" on success, or a string starting with
    "ERRORE:" describing the failure.
    """
    profile = cfg.get('profiles', {}).get(config_type)
    if not profile:
        return f"ERRORE: Profilo {config_type} non trovato in JSON"
    label = profile.get('label', f"Profilo {config_type}")
    services = profile.get('services', [])
    if not services:
        return f"ERRORE: Nessun servizio configurato per {config_type}"
    try:
        # Validate before touching anything. A missing source used to be
        # detected only after every unit had been stopped, leaving the node
        # without its services.
        for s in services:
            if not os.path.exists(s['source']):
                return f"ERRORE: Manca il file sorgente {s['source']}"
        for s in services:
            subprocess.run(["sudo", "systemctl", "stop", s['name']], check=False)
        try:
            for s in services:
                shutil.copy(s['source'], s['target'])
        finally:
            # Start the units again even if a copy failed, so a failed switch
            # never leaves them stopped.
            for s in services:
                subprocess.run(["sudo", "systemctl", "start", s['name']], check=False)
        send_telegram_message(f"✅ Switch multiplo completato: {label}")
        return f"ONLINE - {label}"
    except Exception as e:
        return f"ERRORE: {str(e)}"
def force_online_if_needed(client):
    """Fallback run a few seconds after connecting.

    If no usable retained status was adopted from TOPIC_STAT in the
    meantime, take the locally derived status, publish it retained and mark
    the boot as recovered. Otherwise do nothing.
    """
    global boot_recovered, current_status
    if boot_recovered:
        return
    logger.info("⚠️ Recupero memoria saltato. Imposto stato da disco...")
    current_status = get_actual_config_from_disk()
    client.publish(TOPIC_STAT, current_status, retain=True)
    boot_recovered = True
# ==========================================
# 4. TELEMETRIA E AUTO-HEALING
# ==========================================
def get_cpu_temperature():
temp = 0.0
try:
temps = psutil.sensors_temperatures()
if 'cpu_thermal' in temps: temp = temps['cpu_thermal'][0].current
elif 'coretemp' in temps: temp = temps['coretemp'][0].current
elif platform.system() == "Linux":
res = os.popen('vcgencmd measure_temp').readline()
if res: temp = float(res.replace("temp=","").replace("'C\n",""))
except: pass
return round(temp, 1)
def get_system_status():
    """Collect one telemetry snapshot.

    Returns a dict with CPU/memory/root-disk usage percentages, the CPU
    temperature, an HH:MM:SS timestamp, the labels of profiles A and B and,
    under "processes", "online"/"offline" for every process name listed (one
    per line) in the file cfg['paths']['process_list'].
    """
    profiles = cfg.get('profiles', {})
    status = {
        "cpu_usage_percent": psutil.cpu_percent(interval=0.5),
        "cpu_temp": get_cpu_temperature(),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
        "processes": {},
        "timestamp": time.strftime("%H:%M:%S"),
        "profiles": {
            "A": profiles.get('A', {}).get('label', 'PROFILO A'),
            "B": profiles.get('B', {}).get('label', 'PROFILO B'),
        },
    }
    proc_list = cfg.get('paths', {}).get('process_list', '')
    # An unset path must be skipped explicitly: Path('') is '.', which always
    # exists, so read_text() used to fail (and log an error) on every cycle.
    if not proc_list:
        return status
    proc_path = Path(proc_list)
    if proc_path.exists():
        try:
            target_processes = proc_path.read_text(encoding="utf-8").splitlines()
            # process_iter() stores None for attributes it could not read.
            running_names = {(p.info['name'] or '').lower() for p in psutil.process_iter(['name'])}
            for name in target_processes:
                name = name.strip().lower()
                if name:
                    status["processes"][name] = "online" if name in running_names else "offline"
        except Exception as e:
            logger.error(f"Errore controllo processi: {e}")
    return status
def check_auto_healing(client, status):
    """Restart monitored processes that *status* reports as offline.

    Active only when cfg['settings']['auto_healing'] is true. Each call makes
    at most one `systemctl restart` per offline process, up to three attempts.
    The next call after the third attempt sends one CRITICO alert, after which
    the process is left alone. Seeing the process online resets its counter.
    Alerts go to devices/<id>/logs and to Telegram.
    """
    if not cfg['settings'].get('auto_healing', False):
        return
    log_topic = f"devices/{CLIENT_ID}/logs"
    for proc_name, state in status["processes"].items():
        if state != "offline":
            auto_healing_counter[proc_name] = 0
            continue
        tries = auto_healing_counter.get(proc_name, 0)
        if tries < 3:
            auto_healing_counter[proc_name] = tries + 1
            alert = f"🛠 Auto-healing: {proc_name} offline. Riavvio {tries+1}/3..."
            client.publish(log_topic, alert)
            send_telegram_message(alert)
            subprocess.run(["sudo", "systemctl", "restart", proc_name])
        elif tries == 3:
            alert = f"🚨 CRITICO: {proc_name} fallito!"
            client.publish(log_topic, alert)
            send_telegram_message(alert)
            # 4 marks "alert already sent": no further action until it recovers.
            auto_healing_counter[proc_name] = 4
def publish_all(client):
    """Publish telemetry plus the raw text of every tracked config file.

    The snapshot from get_system_status(), extended with "config_files" and
    "files" (the stems of the paths listed in cfg['paths']['file_list']), is
    published to devices/<id>/services with QoS 1. Each listed file that
    exists is then published, retained, to data/<id>/<stem>/full_config as
    {"raw_text": <content>}.

    Returns the status dict (the caller feeds it to the auto-healing check).
    """
    status = get_system_status()

    config_paths = []
    file_list = cfg.get('paths', {}).get('file_list', '')
    # Skip an unset path explicitly: Path('') means '.', which always exists.
    if file_list and Path(file_list).exists():
        try:
            lines = Path(file_list).read_text(encoding="utf-8").splitlines()
            # Blank lines are dropped. As Path('') they used to hit read_text()
            # on a directory and abort publishing of every following file.
            config_paths = [Path(line.strip()) for line in lines if line.strip()]
        except Exception as e:
            logger.error(f"Errore lettura {file_list}: {e}")

    nomi_estratti = [p.stem for p in config_paths]
    status["config_files"] = nomi_estratti
    status["files"] = nomi_estratti
    client.publish(f"devices/{CLIENT_ID}/services", json.dumps(status), qos=1)

    for p in config_paths:
        if not p.exists():
            continue
        try:
            raw_text = p.read_text(encoding="utf-8")
        except Exception as e:
            # One unreadable file must not prevent publishing the others.
            logger.error(f"Errore lettura {p}: {e}")
            continue
        client.publish(
            f"data/{CLIENT_ID}/{p.stem}/full_config",
            json.dumps({"raw_text": raw_text}),
            qos=1,
            retain=True,
        )
    return status
def publish_all_ini_files(client):
file_list_path = cfg.get('paths', {}).get('file_list', '/opt/file_list.txt')
if not os.path.exists(file_list_path): return
try:
with open(file_list_path, 'r') as f:
files_to_parse = [line.strip() for line in f if line.strip()]
except Exception as e:
logger.error(f"Errore lettura {file_list_path}: {e}")
return
for file_path in files_to_parse:
if not os.path.exists(file_path): continue
try:
base_name = os.path.splitext(os.path.basename(file_path))[0]
ini_data = {}
current_section = None
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
line = line.strip()
if not line or line.startswith(('#', ';')): continue
if line.startswith('[') and line.endswith(']'):
current_section = line[1:-1].strip()
ini_data[current_section] = {}
elif '=' in line and current_section is not None:
k, v = line.split('=', 1)
k, v = k.strip(), v.strip()
if k in ini_data[current_section]:
ini_data[current_section][k] = str(ini_data[current_section][k]) + "," + v
else:
ini_data[current_section][k] = v
for section, payload in ini_data.items():
topic = f"data/{CLIENT_ID}/{base_name}/{section}"
client.publish(topic, json.dumps(payload), retain=True)
except Exception as e:
logger.error(f"Errore parsing INI per {file_path}: {e}")
def write_config_from_json(slug, json_payload):
    """Overwrite the tracked config file whose stem matches *slug*.

    *json_payload* must be a JSON object whose "raw_text" string becomes the
    new file content. The first path in cfg['paths']['file_list'] whose stem
    equals *slug* (case-insensitive) is backed up to "<path>.bak" and
    rewritten. The systemd unit named *slug* is then restarted and a Telegram
    notification is sent. Errors are logged, never raised.
    """
    file_list = cfg.get('paths', {}).get('file_list', '')
    # Path('') would mean '.', which always exists: skip an unset path.
    if not file_list or not Path(file_list).exists():
        return
    try:
        for line in Path(file_list).read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            p = Path(line.strip())
            if p.stem.lower() != slug.lower():
                continue
            nuovi_dati = json.loads(json_payload)
            raw_text = nuovi_dati.get("raw_text") if isinstance(nuovi_dati, dict) else None
            if not isinstance(raw_text, str):
                # Refuse rather than truncate: a missing "raw_text" used to
                # overwrite the config with an empty file.
                logger.error(f"Payload per {slug} senza 'raw_text' valido: file non modificato.")
                return
            shutil.copy(p, str(p) + ".bak")
            with open(p, 'w', encoding="utf-8") as file:
                file.write(raw_text)
            # argv list, no shell: slug comes from the MQTT topic, and the
            # old os.system(f"...{slug}") allowed shell injection.
            subprocess.run(["sudo", "systemctl", "restart", slug], check=False)
            send_telegram_message(f"📝 Config {slug.upper()} aggiornata via Web.")
            logger.info(f"Configurazione {slug} aggiornata con successo.")
            break
    except Exception as e:
        logger.error(f"Errore scrittura config: {e}")
# ==========================================
# 5. CALLBACK MQTT
# ==========================================
def on_connect(client, userdata, flags, reason_code, properties=None):
    """Connect handler for paho-mqtt (callback API v2).

    On success: subscribe to the command and status topics plus the control
    topics, arm a 5 s Timer that sets a status if none was recovered from the
    retained TOPIC_STAT message, and publish a first full snapshot. On
    failure only log the reason code.
    """
    if reason_code != 0:
        logger.error(f"❌ Errore connessione MQTT. Codice: {reason_code}")
        return
    logger.info(f"✅ Connesso al broker MQTT: {CLIENT_ID.upper()}")
    client.subscribe([(TOPIC_CMD, 0), (TOPIC_STAT, 0)])
    control_topics = [
        ("devices/control/request", 0),
        (f"devices/{CLIENT_ID}/control", 0),
        (f"devices/{CLIENT_ID}/config_set/#", 0),
    ]
    client.subscribe(control_topics)
    fallback_timer = threading.Timer(5.0, force_online_if_needed, [client])
    fallback_timer.start()
    publish_all(client)
    publish_all_ini_files(client)
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    """Disconnect handler for paho-mqtt (callback API v2).

    Instead of reconnecting in-process, log the event and terminate at once
    with exit status 1. The intent (per the original comment) is that systemd
    restarts the agent from a clean state.
    """
    logger.warning(f"⚠️ Disconnessione dal broker MQTT! Codice: {reason_code}")
    logger.error("Forzo il riavvio del processo per ripristinare la rete in modo pulito...")
    # os._exit skips cleanup handlers and ends every thread immediately.
    os._exit(1)
def _reset_hat(client, reset_pin=21):
    """Pulse the HAT reset GPIO (BCM numbering) low for 0.5 s, then restart mmdvmhost.

    The outcome is published to fleet/<id>/status, and on success also to
    devices/<id>/logs. Without RPi.GPIO the request is only logged.
    """
    if not GPIO_AVAILABLE:
        logger.warning("RESET_HAT ignorato: libreria RPi.GPIO non disponibile.")
        return
    try:
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(reset_pin, GPIO.OUT)
        GPIO.output(reset_pin, GPIO.LOW)
        time.sleep(0.5)
        GPIO.output(reset_pin, GPIO.HIGH)
        GPIO.cleanup(reset_pin)
        logger.info(f"Impulso di RESET inviato al GPIO {reset_pin}")
        # Presumably gives the HAT time to come back up before MMDVMHost
        # reopens it.
        time.sleep(1.5)
        logger.info("Riavvio di MMDVMHost in corso...")
        subprocess.run(["sudo", "systemctl", "restart", "mmdvmhost"], check=False)
        client.publish(f"fleet/{CLIENT_ID}/status", "HAT RESET + MMDVM RESTART OK")
        client.publish(f"devices/{CLIENT_ID}/logs", "🔌 HAT Reset + MMDVMHost Restarted")
    except Exception as e:
        logger.error(f"Errore durante il reset GPIO/MMDVMHost: {e}")
        client.publish(f"fleet/{CLIENT_ID}/status", f"ERRORE RESET: {e}")


def _dispatch_message(client, msg):
    """Route one incoming MQTT message to the matching action (see on_message)."""
    global boot_recovered, current_status
    # errors="replace": a non-UTF-8 payload must not raise here.
    payload = msg.payload.decode("utf-8", errors="replace").strip()
    topic = msg.topic

    if topic == TOPIC_STAT and not boot_recovered:
        # Adopt the retained status from a previous run, unless it describes
        # an offline/error/reboot state.
        if not any(x in payload.upper() for x in ["OFFLINE", "ERRORE", "RIAVVIO"]):
            current_status = payload
            boot_recovered = True
            client.publish(TOPIC_STAT, current_status, retain=True)
    elif topic == TOPIC_CMD:
        cmd = payload.upper()
        if cmd in ["A", "B"]:
            current_status = switch_config(cmd)
            client.publish(TOPIC_STAT, current_status, retain=True)
            boot_recovered = True
            publish_all(client)
        elif cmd == "REBOOT":
            client.publish(TOPIC_STAT, f"OFFLINE - Riavvio {CLIENT_ID.upper()}...", retain=False)
            logger.info("Comando REBOOT ricevuto. Riavvio sistema...")
            time.sleep(1)
            subprocess.run(["sudo", "reboot"], check=True)
        elif cmd == 'RESET_HAT':
            _reset_hat(client)
        elif cmd in ["TG:OFF", "TG:ON"]:
            nuovo_stato = (cmd == "TG:ON")
            try:
                # setdefault: a config without a "telegram" section used to
                # raise KeyError here.
                cfg.setdefault('telegram', {})['enabled'] = nuovo_stato
                with open(CONFIG_PATH, 'w') as f:
                    json.dump(cfg, f, indent=4)
                client.publish(f"devices/{CLIENT_ID}/logs", f"{'🔔' if nuovo_stato else '🔇'} Notifiche {'ON' if nuovo_stato else 'OFF'}")
                if nuovo_stato:
                    send_telegram_message("Notifiche riattivate!")
            except Exception as e:
                logger.error(f"Errore salvataggio stato Telegram: {e}")
    elif topic == "devices/control/request" and payload.lower() in ["status", "update"]:
        publish_all(client)
        publish_all_ini_files(client)
    elif topic == f"devices/{CLIENT_ID}/control":
        # Expected payload "<restart|stop|start>:<service>". partition() also
        # copes with extra ':' characters, where split(":") raised ValueError.
        action, sep, service = payload.partition(":")
        action = action.strip().lower()
        service = service.strip()
        if sep and service and action in ["restart", "stop", "start"]:
            try:
                subprocess.run(["sudo", "systemctl", action, service.lower()], check=True)
                client.publish(f"devices/{CLIENT_ID}/logs", f"{action.upper()}: {service}")
                logger.info(f"Comando servizio eseguito: {action.upper()} {service}")
                publish_all(client)
            except Exception as e:
                client.publish(f"devices/{CLIENT_ID}/logs", f"❌ ERROR: {str(e)}")
                logger.error(f"Errore esecuzione comando servizio: {e}")
    elif topic.startswith(f"devices/{CLIENT_ID}/config_set/"):
        slug = topic.split("/")[-1]
        write_config_from_json(slug, payload)
        time.sleep(1)
        publish_all(client)
        publish_all_ini_files(client)


def on_message(client, userdata, msg):
    """Message handler for paho-mqtt: dispatch on the topic.

    * TOPIC_STAT, only until boot_recovered: adopt the retained status.
    * TOPIC_CMD (case-insensitive): "A"/"B" switch profile, "REBOOT",
      "RESET_HAT", "TG:ON"/"TG:OFF" (persisted to CONFIG_PATH).
    * devices/control/request with "status"/"update": republish everything.
    * devices/<id>/control with "<restart|stop|start>:<service>": systemctl.
    * devices/<id>/config_set/<slug>: rewrite a tracked config file.

    With paho's default suppress_exceptions=False, an exception escaping a
    callback is re-raised out of the network loop. This boundary therefore
    logs any unexpected error instead of letting it reach paho.
    """
    try:
        _dispatch_message(client, msg)
    except Exception as e:
        logger.exception(f"Errore gestione messaggio su {msg.topic}: {e}")
def auto_publish_task(client):
    """Background loop: publish telemetry and configs, then run auto-healing.

    Repeats every cfg['settings']['update_interval'] seconds (default 30).
    An error in one cycle is logged and the loop continues. Previously any
    exception ended this thread silently, so telemetry stopped for good
    while the process kept running.
    """
    while True:
        try:
            status = publish_all(client)
            publish_all_ini_files(client)
            check_auto_healing(client, status)
        except Exception as e:
            logger.exception(f"Errore ciclo di pubblicazione automatica: {e}")
        time.sleep(cfg.get('settings', {}).get('update_interval', 30))
def start_service():
    """Create the MQTT client, connect, and keep the process alive.

    The client is registered with a non-retained "OFFLINE - <ID>" last will
    on TOPIC_STAT. Connection attempts are retried every 10 s until one
    succeeds. After that, paho's network loop runs in its own thread
    (loop_start), the auto-publish loop runs in a daemon thread, and the main
    thread just sleeps. A later disconnect ends the process via on_disconnect.
    """
    node_name = CLIENT_ID.upper()
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=node_name)
    client.will_set(TOPIC_STAT, payload=f"OFFLINE - {node_name}", qos=1, retain=False)
    mqtt_cfg = cfg['mqtt']
    client.username_pw_set(mqtt_cfg['user'], mqtt_cfg['password'])
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    while True:
        try:
            logger.info("Tentativo di connessione al broker MQTT...")
            client.connect(mqtt_cfg['broker'], mqtt_cfg['port'], 60)
            client.loop_start()
            publisher = threading.Thread(target=auto_publish_task, args=(client,), daemon=True)
            publisher.start()
        except Exception as e:
            logger.error(f"Impossibile connettersi o connessione persa ({e}). Riprovo in 10 secondi...")
            time.sleep(10)
        else:
            break

    # Keep the main thread alive; the work happens in the background threads.
    while True:
        time.sleep(1)
if __name__ == "__main__":
    # Start the agent only when executed as a script, not when imported.
    start_service()