Files
fleet-control-console/agent/system_monitor.py
T

403 lines
16 KiB
Python

import paho.mqtt.client as mqtt
import json
import time
import psutil
import platform
import subprocess
import threading
import sys
import os
import shutil
import requests
from pathlib import Path
import configparser
# Optional hardware support: GPIO_AVAILABLE is a module-level flag checked by
# the RESET_HAT command handler before touching the GPIO pins.
try:
    import RPi.GPIO as GPIO
except ImportError:
    GPIO_AVAILABLE = False
    print("Warning: RPi.GPIO library not found. Hardware reset disabled.")
except RuntimeError as e:
    # RPi.GPIO is installed but refuses to run on this board (it raises
    # RuntimeError at import on unsupported hardware); keep the agent running.
    GPIO_AVAILABLE = False
    print(f"Warning: RPi.GPIO cannot run on this board ({e}). Hardware reset disabled.")
else:
    GPIO_AVAILABLE = True
# ==========================================
# 0. UNIFIED CONFIGURATION LOADING
# ==========================================
CONFIG_PATH = Path("/opt/node_config.json")


def load_config(path=None):
    """Load the node's JSON configuration and return the parsed document.

    Args:
        path: Optional path to the config file; defaults to CONFIG_PATH.

    A missing file, an I/O error, or invalid JSON/UTF-8 is fatal: the error
    is printed and the process exits with status 1.
    """
    config_path = CONFIG_PATH if path is None else Path(path)
    try:
        if not config_path.exists():
            print(f"❌ ERROR: File {config_path} not found!")
            sys.exit(1)
        # Explicit encoding: don't depend on the service's locale settings.
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ CRITICAL JSON ERROR: {e}")
        sys.exit(1)
# Load the configuration once at import time (load_config exits the process
# if the file is missing or invalid).
cfg = load_config()

# Identifiers and topics: the client id is lower-cased, and the MQTT base
# topic defaults to "servizi/<client_id>" when mqtt.base_topic is unset.
CLIENT_ID = cfg.get('client_id', 'iv3jdv').lower()
BASE_TOPIC = cfg.get('mqtt', {}).get('base_topic', f"servizi/{CLIENT_ID}")
TOPIC_CMD, TOPIC_STAT = (f"{BASE_TOPIC}/{suffix}" for suffix in ("cmnd", "stat"))

# Runtime state shared by the MQTT callbacks and the background threads.
boot_recovered, current_status = False, "ONLINE - Ready"
auto_healing_counter = dict()  # process name -> restart attempts so far
# ==========================================
# 1. TELEGRAM NOTIFICATION FUNCTION
# ==========================================
def send_telegram_message(message):
    """Send *message* to the configured Telegram chat, best-effort.

    Nothing is sent when any of these hold:
    - telegram.enabled is false.
    - The local hour is in the quiet window (23:00-06:59).
    - token or chat_id is missing, or the token is still the "TOKEN ID"
      placeholder.

    Before sending, ``<b>``/``</b>`` tags are stripped and the text is
    prefixed with the upper-cased client id. Network and HTTP errors are
    printed with the bot token redacted, never raised.
    """
    t_cfg = cfg.get('telegram', {})
    if not t_cfg.get('enabled', False):
        return
    current_hour = int(time.strftime("%H"))
    if current_hour >= 23 or current_hour < 7:
        print(f"🌙 Late night ({current_hour}:00): Notification skipped.")
        return
    token = t_cfg.get('token')
    chat_id = t_cfg.get('chat_id')
    if not token or not chat_id or token == "TOKEN ID":
        return
    try:
        clean_msg = message.replace("<b>", "").replace("</b>", "")
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": f"[{CLIENT_ID.upper()}]\n{clean_msg}"}
        response = requests.post(url, json=payload, timeout=10)
        # Rejected requests (bad token, bad chat id, ...) come back as HTTP
        # error statuses; surface them instead of dropping them silently.
        response.raise_for_status()
    except Exception as e:
        # requests' error messages include the request URL, which embeds the
        # bot token: redact it so the credential never reaches the logs.
        print(f"⚠️ Telegram send error: {str(e).replace(str(token), '***')}")
# ==========================================
# 2. MULTIPLE PROFILE SWITCH LOGIC
# ==========================================
def get_actual_config_from_disk():
    """Return the fallback status published when no retained status is recovered.

    Despite its name, this reads nothing from disk: the result is always the
    same fixed string.
    """
    fallback_status = "ONLINE - From memory"
    return fallback_status
def switch_config(config_type):
    """Activate configuration profile *config_type* (e.g. "A" or "B").

    Each entry of ``profiles.<config_type>.services`` is a dict with ``name``
    (systemd unit), ``source`` and ``target`` (file paths). The profile is
    applied in four steps:
    1. Verify that every source file exists.
    2. Stop the units.
    3. Copy each source over its target.
    4. Start the units again. This always happens, even if a copy failed.

    Returns ``"ONLINE - <label>"`` on success, otherwise an ``"ERROR: ..."``
    status string.
    """
    profile = cfg.get('profiles', {}).get(config_type)
    if not profile:
        return f"ERROR: Profile {config_type} not found in JSON"
    label = profile.get('label', f"Profile {config_type}")
    services = profile.get('services', [])
    if not services:
        return f"ERROR: No services configured for {config_type}"
    try:
        # 1. VERIFY before touching any daemon, so a missing file can no
        #    longer leave every service stopped.
        for s in services:
            if not os.path.exists(s['source']):
                return f"ERROR: Missing source file {s['source']}"
        try:
            # 2. STOP: stop all involved daemons first to release the files.
            for s in services:
                subprocess.run(["sudo", "systemctl", "stop", s['name']], check=False)
            # 3. COPY: install the profile's configuration files.
            for s in services:
                shutil.copy(s['source'], s['target'])
        finally:
            # 4. START: always bring the daemons back up, even after a copy error.
            for s in services:
                subprocess.run(["sudo", "systemctl", "start", s['name']], check=False)
        send_telegram_message(f"✅ Multiple switch completed: {label}")
        return f"ONLINE - {label}"
    except Exception as e:
        return f"ERROR: {str(e)}"
def force_online_if_needed(client):
    """Publish the fallback status if no usable status has been recovered yet.

    on_connect schedules this on a 5 s timer. If nothing marked the boot as
    recovered by then, the status from get_actual_config_from_disk() is
    published (retained) on TOPIC_STAT and the boot is marked recovered.
    """
    global boot_recovered, current_status
    if boot_recovered:
        return
    print("⚠️ Memory recovery skipped. Setting status from disk...")
    current_status = get_actual_config_from_disk()
    client.publish(TOPIC_STAT, current_status, retain=True)
    boot_recovered = True
# ==========================================
# 3. TELEMETRY AND AUTO-HEALING
# ==========================================
def get_cpu_temperature():
    """Return the CPU temperature rounded to one decimal, or 0.0 if unavailable.

    Sources are tried in this order:
    1. psutil's ``cpu_thermal`` sensor.
    2. psutil's ``coretemp`` sensor.
    3. ``vcgencmd measure_temp`` (Linux only).

    Any failure is treated as "no reading" so telemetry keeps flowing.
    """
    temp = 0.0
    try:
        temps = psutil.sensors_temperatures()
        for sensor in ('cpu_thermal', 'coretemp'):
            # An empty reading list falls through to the next source instead
            # of raising IndexError.
            if temps.get(sensor):
                temp = temps[sensor][0].current
                break
        else:
            if platform.system() == "Linux":
                # No shell, captured stderr and a timeout: a missing or hung
                # vcgencmd must not spam the console or block telemetry.
                result = subprocess.run(
                    ["vcgencmd", "measure_temp"],
                    capture_output=True,
                    text=True,
                    timeout=5,
                    check=False,
                )
                # Output is parsed as "temp=<value>'C", with or without a newline.
                reading = result.stdout.strip().replace("temp=", "").replace("'C", "")
                if reading:
                    temp = float(reading)
    except Exception:
        # Best-effort by design: unsupported sensors API, missing vcgencmd,
        # timeouts or unparsable output all yield 0.0.
        pass
    return round(temp, 1)
def get_system_status():
    """Collect a telemetry snapshot for the dashboard and return it as a dict.

    Includes the following keys:
    - CPU, memory and root-disk usage percentages.
    - ``cpu_temp``: the CPU temperature.
    - ``timestamp``: the local time as ``HH:MM:SS``.
    - ``profiles``: the labels of profiles A and B.
    - ``processes``: maps each name listed in ``paths.process_list`` (one per
      line, lower-cased) to "online" or "offline". A name is online when it
      appears among the running process names.
    """
    profiles = cfg.get('profiles', {})
    status = {
        "cpu_usage_percent": psutil.cpu_percent(interval=0.5),
        "cpu_temp": get_cpu_temperature(),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
        "processes": {},
        "timestamp": time.strftime("%H:%M:%S"),
        "profiles": {
            "A": profiles.get('A', {}).get('label', 'PROFILE A'),
            "B": profiles.get('B', {}).get('label', 'PROFILE B'),
        },
    }
    process_list = cfg.get('paths', {}).get('process_list', '')
    # Path('') is the current directory, which always exists; an unset
    # process_list must not be read as a file.
    if not process_list or not Path(process_list).is_file():
        return status
    try:
        target_processes = Path(process_list).read_text(encoding="utf-8").splitlines()
        # psutil reports info['name'] as None when it cannot read a process's
        # name; map that to '' instead of crashing the whole check.
        running_names = {(p.info['name'] or '').lower() for p in psutil.process_iter(['name'])}
        for raw_name in target_processes:
            name = raw_name.strip().lower()
            if name:
                status["processes"][name] = "online" if name in running_names else "offline"
    except Exception as e:
        print(f"Process error: {e}")
    return status
def check_auto_healing(client, status):
    """Restart offline monitored processes, with a bounded retry budget.

    Does nothing unless ``settings.auto_healing`` is true. For each entry of
    ``status["processes"]``:

    * Any state other than "offline" resets the process's attempt counter to 0.
    * While offline, each call makes one ``systemctl restart <name>`` attempt,
      up to three in total. Each attempt is announced on
      ``devices/<id>/logs`` and via Telegram.
    * On the first call after the third attempt, a single CRITICAL alert is
      sent. No further restarts happen until the process is seen online again.
    """
    if not cfg['settings'].get('auto_healing', False):
        return
    for proc_name, state in status["processes"].items():
        if state != "offline":
            auto_healing_counter[proc_name] = 0
            continue
        attempts = auto_healing_counter.get(proc_name, 0)
        if attempts < 3:
            attempt_no = attempts + 1
            auto_healing_counter[proc_name] = attempt_no
            alert = f"🛠 Auto-healing: {proc_name} offline. Restarting {attempt_no}/3..."
            client.publish(f"devices/{CLIENT_ID}/logs", alert)
            send_telegram_message(alert)
            subprocess.run(["sudo", "systemctl", "restart", proc_name])
        elif attempts == 3:
            alert = f"🚨 CRITICAL: {proc_name} failed!"
            client.publish(f"devices/{CLIENT_ID}/logs", alert)
            send_telegram_message(alert)
            # 4 = "alert already sent": silences this process until it recovers.
            auto_healing_counter[proc_name] = 4
def publish_all(client):
    """Publish the telemetry snapshot and each listed config file's raw text.

    ``paths.file_list`` names a file that lists one config file path per line.
    The stems of those paths are added to the snapshot as ``config_files`` and
    ``files`` (the dashboard menu), and the snapshot is sent to
    ``devices/<id>/services``. Then every listed file that exists is published,
    retained, as ``{"raw_text": ...}`` on ``data/<id>/<stem>/full_config``.

    Returns the snapshot dict.
    """
    status = get_system_status()
    config_paths = []
    list_setting = cfg.get('paths', {}).get('file_list', '')
    # Path('') is the current directory (which always exists): an unset
    # file_list means "no files", not "read a directory".
    if list_setting and Path(list_setting).is_file():
        try:
            # Read the list once; it feeds both the menu and the loop below.
            lines = Path(list_setting).read_text(encoding="utf-8").splitlines()
            config_paths = [Path(line.strip()) for line in lines if line.strip()]
        except (OSError, UnicodeDecodeError) as e:
            print(f"File list read error: {e}")
    names = [p.stem for p in config_paths]
    status["config_files"] = names
    status["files"] = list(names)
    client.publish(f"devices/{CLIENT_ID}/services", json.dumps(status), qos=1)
    for p in config_paths:
        if not p.exists():
            continue
        # Per-file handling: one unreadable file must not stop the others.
        try:
            raw_text = p.read_text(encoding="utf-8")
            client.publish(
                f"data/{CLIENT_ID}/{p.stem}/full_config",
                json.dumps({"raw_text": raw_text}),
                qos=1,
                retain=True,
            )
        except Exception as e:
            print(f"Config publish error for {p}: {e}")
    return status
def _parse_ini_keep_duplicates(file_path):
    """Parse an INI-style file into ``{section: {key: value}}`` without losing repeats.

    Parsing rules:
    - Repeated keys within a section have their values joined with commas.
    - A repeated ``[Section]`` header extends the earlier section rather than
      discarding it.
    - Blank lines, lines starting with '#' or ';', and key=value lines before
      the first header are skipped.
    """
    ini_data = {}
    section = None
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith(('#', ';')):
                continue
            if line.startswith('[') and line.endswith(']'):
                section = line[1:-1].strip()
                # setdefault: a second [Section] must not wipe the keys
                # already collected for that section.
                ini_data.setdefault(section, {})
            elif '=' in line and section is not None:
                key, value = (part.strip() for part in line.split('=', 1))
                entries = ini_data[section]
                entries[key] = f"{entries[key]},{value}" if key in entries else value
    return ini_data


def publish_all_ini_files(client):
    """Publish every listed INI file section by section, retained.

    Each existing path listed in ``paths.file_list`` (default
    ``/opt/file_list.txt``) is parsed. Every section is then published as a
    JSON object on ``data/<id>/<file stem>/<section>``. Errors are printed,
    never raised.
    """
    file_list_path = cfg.get('paths', {}).get('file_list', '/opt/file_list.txt')
    if not file_list_path or not os.path.isfile(file_list_path):
        return
    try:
        with open(file_list_path, 'r', encoding='utf-8') as f:
            files_to_parse = [line.strip() for line in f if line.strip()]
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading {file_list_path}: {e}")
        return
    for file_path in files_to_parse:
        if not os.path.exists(file_path):
            continue
        try:
            base_name = os.path.splitext(os.path.basename(file_path))[0]
            for section, payload in _parse_ini_keep_duplicates(file_path).items():
                topic = f"data/{CLIENT_ID}/{base_name}/{section}"
                client.publish(topic, json.dumps(payload), retain=True)
        except Exception as e:
            print(f"INI parsing error for {file_path}: {e}")
def write_config_from_json(slug, json_payload):
    """Overwrite the listed config file whose stem matches *slug*, then restart *slug*.

    Args:
        slug: Config name taken from the MQTT topic. It is matched
            case-insensitively against the stems of the paths in
            ``paths.file_list``.
        json_payload: JSON text of an object whose string ``raw_text`` field
            holds the complete new file contents.

    On a match, the file is backed up to ``<file>.bak`` and rewritten, and
    ``systemctl restart <slug>`` is run. Payloads without a string ``raw_text``
    are rejected. Errors are printed, never raised.
    """
    list_setting = cfg.get('paths', {}).get('file_list', '')
    # Path('') would be the current directory; treat an unset list as "no files".
    if not list_setting or not Path(list_setting).is_file():
        return
    try:
        for entry in Path(list_setting).read_text(encoding="utf-8").splitlines():
            entry = entry.strip()
            if not entry:
                continue
            p = Path(entry)
            if p.stem.lower() != slug.lower():
                continue
            new_data = json.loads(json_payload)
            raw_text = new_data.get("raw_text") if isinstance(new_data, dict) else None
            if not isinstance(raw_text, str):
                # Writing a default "" here would wipe the config file and then
                # restart the service with it.
                print(f"Config write error: payload for {slug} has no 'raw_text' string")
                return
            shutil.copy(p, str(p) + ".bak")
            with open(p, 'w', encoding="utf-8") as file:
                file.write(raw_text)
            # slug originates from the MQTT topic: pass it as one argv element
            # instead of interpolating it into a shell command line.
            subprocess.run(["sudo", "systemctl", "restart", slug], check=False)
            send_telegram_message(f"📝 Config {slug.upper()} updated via Web.")
            break
    except Exception as e:
        print(f"Config write error: {e}")
# ==========================================
# 4. MQTT CALLBACKS
# ==========================================
def on_connect(client, userdata, flags, rc, properties=None):
    """paho-mqtt on_connect callback.

    On success (rc == 0), it does the following:
    - Subscribes to TOPIC_CMD, TOPIC_STAT, devices/control/request and this
      node's control and config_set/# topics.
    - Schedules force_online_if_needed() after 5 s.
    - Publishes the telemetry and INI snapshots.

    On failure, it prints the reason code.
    """
    if rc != 0:
        # Make refused connections visible instead of failing silently.
        print(f"❌ MQTT connection refused (rc={rc})")
        return
    print(f"✅ Connected: {CLIENT_ID.upper()}")
    client.subscribe([(TOPIC_CMD, 0), (TOPIC_STAT, 0)])
    client.subscribe([
        ("devices/control/request", 0),
        (f"devices/{CLIENT_ID}/control", 0),
        (f"devices/{CLIENT_ID}/config_set/#", 0),
    ])
    # If no acceptable status arrives on TOPIC_STAT within 5 s,
    # force_online_if_needed publishes the fallback status.
    threading.Timer(5.0, force_online_if_needed, [client]).start()
    publish_all(client)
    publish_all_ini_files(client)
def _reset_hat(client):
    """Pulse the HAT reset line (BCM pin 21) low for 0.5 s, then restart MMDVMHost.

    Results go to ``fleet/<id>/status`` and ``devices/<id>/logs``. Failures
    are printed and reported, never raised.
    """
    reset_pin = 21  # BCM numbering: hardware reset line of the MMDVM board
    if not GPIO_AVAILABLE:
        client.publish(f"fleet/{CLIENT_ID}/status", "RESET ERROR: RPi.GPIO not available")
        return
    try:
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(reset_pin, GPIO.OUT)
        try:
            # 1. Reset pulse: hold the line LOW for 0.5 seconds.
            GPIO.output(reset_pin, GPIO.LOW)
            time.sleep(0.5)
            GPIO.output(reset_pin, GPIO.HIGH)
        finally:
            # Release the pin even if the pulse was interrupted.
            GPIO.cleanup(reset_pin)
        print(f"[{CLIENT_ID}] RESET pulse sent to GPIO {reset_pin}")
        # 2. Let the microcontroller firmware reboot.
        time.sleep(1.5)
        # 3. Restart MMDVMHost to realign serial communication.
        print(f"[{CLIENT_ID}] Restarting MMDVMHost...")
        subprocess.run(["sudo", "systemctl", "restart", "mmdvmhost"], check=False)
        # 4. Confirm to the dashboard.
        client.publish(f"fleet/{CLIENT_ID}/status", "HAT RESET + MMDVM RESTART OK")
        client.publish(f"devices/{CLIENT_ID}/logs", "🔌 HAT Reset + MMDVMHost Restarted")
    except Exception as e:
        print(f"Error during GPIO/MMDVMHost reset: {e}")
        client.publish(f"fleet/{CLIENT_ID}/status", f"RESET ERROR: {e}")


def _set_telegram_notifications(client, enabled):
    """Set telegram.enabled, persist it to CONFIG_PATH, and report the change.

    The config is written to a sibling temp file, flushed to disk, given the
    original file's mode, and moved into place with os.replace. An
    interrupted write therefore cannot leave the truncated JSON that would
    make load_config() abort the next start.
    """
    cfg.setdefault('telegram', {})['enabled'] = enabled
    tmp_path = CONFIG_PATH.with_name(CONFIG_PATH.name + ".tmp")
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(cfg, f, indent=4)
            f.flush()
            os.fsync(f.fileno())
        # Keep the original permissions: the file holds credentials.
        shutil.copymode(CONFIG_PATH, tmp_path)
        os.replace(tmp_path, CONFIG_PATH)
    except OSError as e:
        print(f"⚠️ Could not save {CONFIG_PATH}: {e}")
        client.publish(f"devices/{CLIENT_ID}/logs", f"❌ ERROR: {str(e)}")
        return
    client.publish(f"devices/{CLIENT_ID}/logs", f"{'🔔' if enabled else '🔇'} Notifications {'ON' if enabled else 'OFF'}")
    if enabled:
        send_telegram_message("Notifications enabled!")


def _handle_message(client, msg):
    """Dispatch one incoming MQTT message to the matching action (see on_message)."""
    global boot_recovered, current_status
    # errors="replace": a non-UTF-8 payload must not raise out of the callback.
    payload = msg.payload.decode("utf-8", errors="replace").strip()
    topic = msg.topic
    if topic == TOPIC_STAT and not boot_recovered:
        # Status seen before recovery completed: adopt it unless it describes
        # an offline/error/reboot state.
        if not any(x in payload.upper() for x in ["OFFLINE", "ERROR", "REBOOT"]):
            current_status = payload
            boot_recovered = True
            client.publish(TOPIC_STAT, current_status, retain=True)
    elif topic == TOPIC_CMD:
        cmd = payload.upper()
        if cmd in ["A", "B"]:
            current_status = switch_config(cmd)
            client.publish(TOPIC_STAT, current_status, retain=True)
            boot_recovered = True
            publish_all(client)
        elif cmd == "REBOOT":
            client.publish(TOPIC_STAT, f"OFFLINE - Rebooting {CLIENT_ID.upper()}...", retain=False)
            time.sleep(1)
            result = subprocess.run(["sudo", "reboot"], check=False)
            if result.returncode != 0:
                client.publish(f"devices/{CLIENT_ID}/logs", f"❌ ERROR: reboot failed (exit {result.returncode})")
        elif cmd == 'RESET_HAT':
            _reset_hat(client)
        elif cmd in ["TG:OFF", "TG:ON"]:
            _set_telegram_notifications(client, cmd == "TG:ON")
    elif topic == "devices/control/request" and payload.lower() in ["status", "update"]:
        publish_all(client)
        publish_all_ini_files(client)
    elif topic == f"devices/{CLIENT_ID}/control":
        if ":" in payload:
            # maxsplit=1: extra colons stay in the service part instead of
            # raising ValueError during unpacking.
            action, service = (part.strip() for part in payload.split(":", 1))
            if action.lower() in ["restart", "stop", "start"]:
                try:
                    subprocess.run(["sudo", "systemctl", action.lower(), service.lower()], check=True)
                    client.publish(f"devices/{CLIENT_ID}/logs", f"{action.upper()}: {service}")
                    publish_all(client)
                except Exception as e:
                    client.publish(f"devices/{CLIENT_ID}/logs", f"❌ ERROR: {str(e)}")
    elif topic.startswith(f"devices/{CLIENT_ID}/config_set/"):
        slug = topic.split("/")[-1]
        write_config_from_json(slug, payload)
        time.sleep(1)
        publish_all(client)
        publish_all_ini_files(client)


def on_message(client, userdata, msg):
    """paho-mqtt on_message callback.

    Handles status recovery, commands, service control, and config updates
    for this node. Any exception is printed here instead of propagated.
    paho-mqtt re-raises callback exceptions by default, which would stop the
    network loop started with loop_start().
    """
    try:
        _handle_message(client, msg)
    except Exception as e:
        print(f"⚠️ Error handling message on {msg.topic}: {e}")
def auto_publish_task(client):
    """Telemetry loop that runs forever (start_service runs it as a daemon thread).

    Each cycle publishes the status and INI data and runs auto-healing, then
    sleeps ``settings.update_interval`` seconds (default 30). An error in one
    cycle is printed and the loop continues, so a transient failure cannot
    silently end telemetry for the rest of the process lifetime.
    """
    while True:
        try:
            status = publish_all(client)
            publish_all_ini_files(client)
            check_auto_healing(client, status)
        except Exception as e:
            print(f"⚠️ Telemetry cycle error: {e}")
        time.sleep(cfg.get('settings', {}).get('update_interval', 30))
def start_service():
    """Create the MQTT client, connect to the broker and run until killed.

    Startup sequence:
    1. Register a non-retained ``OFFLINE - <ID>`` last-will on TOPIC_STAT and
       set the credentials.
    2. Connect, start paho's network loop, and start the telemetry daemon
       thread.
    3. Idle the main thread.

    If setup or the initial connect fails, the error is printed and the
    process exits with status 1. A non-zero exit lets a supervisor (e.g.
    systemd Restart=on-failure) treat it as a failure.
    """
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID.upper())
    client.will_set(TOPIC_STAT, payload=f"OFFLINE - {CLIENT_ID.upper()}", qos=1, retain=False)
    client.username_pw_set(cfg['mqtt']['user'], cfg['mqtt']['password'])
    client.on_connect = on_connect
    client.on_message = on_message
    try:
        client.connect(cfg['mqtt']['broker'], cfg['mqtt']['port'], 60)
        client.loop_start()
        threading.Thread(target=auto_publish_task, args=(client,), daemon=True).start()
        while True:
            time.sleep(1)
    except Exception as e:
        # Exit status 0 would report a clean stop, e.g. when the broker is
        # unreachable at boot; make the failure visible.
        print(f"❌ MQTT service error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    start_service()