add mqtt connection tracking

This commit is contained in:
2026-04-20 21:38:49 +02:00
parent cb57da334e
commit f259e0a774
2 changed files with 145 additions and 138 deletions
+50 -19
View File
@@ -7,6 +7,20 @@ import sqlite3
import urllib.request
import threading
import time
import logging
from logging.handlers import RotatingFileHandler
# --- CONFIGURAZIONE LOGGING ---
logging.basicConfig(
handlers=[
RotatingFileHandler('/opt/web-control/fleet_console.log', maxBytes=2000000, backupCount=3),
logging.StreamHandler()
],
level=logging.INFO,
format='[%(asctime)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("FleetHub")
# --- PERCORSI ---
DB_PATH = '/opt/web-control/monitor.db'
@@ -39,7 +53,7 @@ def init_db():
h = generate_password_hash('admin123')
c.execute("INSERT INTO users (username, password_hash, role, allowed_nodes) VALUES (?,?,?,?)",
('admin', h, 'admin', 'all'))
print(">>> UTENTE DI DEFAULT CREATO - User: admin | Pass: admin123 <<<")
logger.info(">>> UTENTE DI DEFAULT CREATO - User: admin | Pass: admin123 <<<")
conn.commit()
conn.close()
@@ -96,7 +110,7 @@ device_configs = {}
client_telemetry = {}
device_health = {}
last_seen_reflector = {}
network_mapping = {} # Memorizza quale network gestisce TS1 e TS2 per ogni nodo
network_mapping = {}
if os.path.exists(CACHE_FILE):
try:
@@ -106,6 +120,28 @@ if os.path.exists(CACHE_FILE):
active_calls = {}
with open(CONFIG_PATH) as f: config = json.load(f)
# --- CALLBACKS MQTT ---
def on_connect(client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
logger.info("✅ Connesso al Broker MQTT con successo! Sottoscrizione ai topic in corso...")
client.subscribe([
("servizi/+/stat", 0),
("dmr-gateway/+/json", 0),
("devices/+/services", 0),
("nxdn-gateway/+/json", 0),
("ysf-gateway/+/json", 0),
("p25-gateway/+/json", 0),
("dstar-gateway/+/json", 0),
("mmdvm/+/json", 0),
("devices/#", 0),
("data/#", 0)
])
else:
logger.error(f"❌ Errore di connessione MQTT. Codice motivo: {reason_code}")
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
logger.warning(f"⚠️ Disconnessione MQTT rilevata! Codice motivo: {reason_code}. Tentativo di riconnessione automatico in corso...")
def on_message(client, userdata, msg):
try:
topic = msg.topic
@@ -122,9 +158,9 @@ def on_message(client, userdata, msg):
device_configs[cid_conf] = {}
try:
device_configs[cid_conf][svc_name] = json.loads(payload)
print(f"DEBUG: Configurazione salvata per {cid_conf} -> {svc_name}")
logger.info(f"Configurazione salvata per {cid_conf} -> {svc_name}")
except Exception as e:
print(f"Errore parsing config JSON: {e}")
logger.error(f"Errore parsing config JSON: {e}")
elif parts[0] == 'servizi':
client_states[cid] = payload
@@ -146,7 +182,7 @@ def on_message(client, userdata, msg):
"files": data.get("files", data.get("config_files", [])),
"profiles": data.get("profiles", {"A": "PROFILO A", "B": "PROFILO B"})
}
except Exception as e: print(f"Errore parsing health: {e}")
except Exception as e: logger.error(f"Errore parsing health: {e}")
# NUOVO BLOCCO: Intercettazione configurazione DMRGateway
elif len(parts) >= 4 and parts[0] == 'data' and parts[2].lower() == 'dmrgateway' and (parts[3].upper().startswith('NETWORK') or parts[3].upper().startswith('DMR NETWORK')):
@@ -154,16 +190,11 @@ def on_message(client, userdata, msg):
cid = parts[1].lower()
data = json.loads(payload)
# Inizializza il dizionario per questo nodo se non esiste
if cid not in network_mapping:
network_mapping[cid] = {"ts1": "", "ts2": ""}
# Se la rete è abilitata, cerchiamo di capire su che TimeSlot lavora
if str(data.get("Enabled")) == "1":
net_name = data.get("Name", "Net").upper()
# In DMRGateway, il primo numero di QUALSIASI regola indica il TimeSlot (1 o 2).
# Analizziamo tutte le regole di routing possibili.
is_ts1 = False
is_ts2 = False
@@ -173,12 +204,11 @@ def on_message(client, userdata, msg):
if val.startswith("1"): is_ts1 = True
if val.startswith("2"): is_ts2 = True
# Assegniamo il nome trovato allo Slot corrispondente
if is_ts1: network_mapping[cid]["ts1"] = net_name
if is_ts2: network_mapping[cid]["ts2"] = net_name
except Exception as e:
print(f"Errore parsing DMRGateway per {cid}: {e}")
logger.error(f"Errore parsing DMRGateway per {cid}: {e}")
elif parts[0] in ['dmr-gateway', 'nxdn-gateway', 'ysf-gateway', 'p25-gateway', 'dstar-gateway']:
data = json.loads(payload)
@@ -220,7 +250,7 @@ def on_message(client, userdata, msg):
sk = f"ts{d.get('slot', 1)}"
if act in ['start', 'late_entry']:
src = get_call(d.get('source_id'))
dst = str(d.get('destination_id')) # <-- Catturiamo il TG!
dst = str(d.get('destination_id'))
active_calls[cid][sk] = {'src': src, 'dst': dst}
client_telemetry[cid]["alt"] = ""
client_telemetry[cid][sk] = f"🎙️ {src} ➔ TG {dst}"
@@ -261,13 +291,15 @@ def on_message(client, userdata, msg):
client_telemetry[cid]["alt"] = f"{'' if act=='end' else '⚠️'} {name}: {info['src']}"
save_cache(client_telemetry)
if k in active_calls[cid]: del active_calls[cid][k]
except Exception as e: print(f"ERRORE MQTT: {e}")
except Exception as e: logger.error(f"ERRORE MQTT MSG: {e}")
# --- INIZIALIZZAZIONE CLIENT MQTT ---
mqtt_backend = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, "flask_backend")
mqtt_backend.username_pw_set(config['mqtt']['user'], config['mqtt']['password'])
mqtt_backend.on_connect = on_connect
mqtt_backend.on_disconnect = on_disconnect
mqtt_backend.on_message = on_message
mqtt_backend.connect(config['mqtt']['broker'], config['mqtt']['port'])
mqtt_backend.subscribe([("servizi/+/stat",0), ("dmr-gateway/+/json",0), ("devices/+/services",0), ("nxdn-gateway/+/json",0), ("ysf-gateway/+/json",0), ("p25-gateway/+/json",0), ("dstar-gateway/+/json",0), ("mmdvm/+/json",0), ("devices/#", 0), ("data/#", 0)])
mqtt_backend.loop_start()
@app.route('/')
@@ -410,7 +442,6 @@ def delete_user(user_id):
conn.close()
return jsonify({"success": True})
# --- NUOVA API PER LA MODIFICA DEGLI UTENTI ESISTENTI ---
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
if session.get('role') != 'admin':
@@ -491,14 +522,14 @@ def auto_update_ids():
})
now = time.strftime("%H:%M")
if now == target_time:
print(f">>> [AUTO-UPDATE] Orario raggiunto ({now}). Download in corso...")
logger.info(f">>> [AUTO-UPDATE] Orario raggiunto ({now}). Download in corso...")
urllib.request.urlretrieve(urls["dmr"], DMR_IDS_PATH)
urllib.request.urlretrieve(urls["nxdn"], NXDN_IDS_PATH)
load_ids()
print(f">>> [AUTO-UPDATE] Completato con successo.")
logger.info(f">>> [AUTO-UPDATE] Completato con successo.")
time.sleep(65)
except Exception as e:
print(f">>> [AUTO-UPDATE] Errore: {e}")
logger.error(f">>> [AUTO-UPDATE] Errore: {e}")
time.sleep(30)
@app.route('/api/ui_config', methods=['GET'])