Files
SIAX-MONITOR/src/monitor.rs
pablinux d18cb7c3dd fix: Fase 4.2 - Corrección de registros duplicados en API central (idempotencia)
Implementada lógica idempotente en monitor.rs para evitar la creación
infinita de registros duplicados en la API central.

PROBLEMA RESUELTO:
- Monitor enviaba POST cada 60s sin verificar si app ya existe
- Resultado: Miles de registros duplicados en base de datos central
- Impacto: Saturación de BD y datos inconsistentes

SOLUCIÓN IMPLEMENTADA:
1. Cache local de IDs de apps (AppIdCache con HashMap + RwLock)
2. Función sync_to_cloud() con lógica idempotente:
   - Verificar cache local primero
   - Si no está en cache: GET para buscar en API central
   - Si no existe en API: POST para crear (solo primera vez)
   - Si existe: PUT para actualizar estado
3. Uso correcto de endpoints de API central:
   - GET /api/apps_servcs/apps (buscar)
   - POST /api/apps_servcs/apps (crear)
   - PUT /api/apps_servcs/apps/:id/status (actualizar)

FUNCIONES IMPLEMENTADAS:
- sync_to_cloud() - Coordina flujo idempotente
- find_app_in_cloud() - Busca app por nombre + servidor
- create_app_in_cloud() - Crea nueva app (retorna ID)
- update_app_in_cloud() - Actualiza estado existente

CAMBIOS TÉCNICOS:
- Agregado cache AppIdCache (Arc<RwLock<HashMap<String, i32>>>)
- Tipos CloudApp y CloudAppsResponse para deserialización
- Box<dyn Error + Send + Sync> para compatibilidad tokio
- Revertido cambios incompletos en AppManager

RESULTADO:
 Primera ejecución: Crea app en API central (POST)
 Siguientes ejecuciones: Solo actualiza estado (PUT)
🚫 NO más duplicados infinitos
📊 Base de datos limpia y consistente

Archivos modificados:
- src/monitor.rs: +180/-50 líneas (lógica idempotente completa)
- src/orchestrator/app_manager.rs: Revertido cambios incompletos
- tareas.txt: Documentación completa de Fase 4.2
2026-01-15 02:56:56 -05:00

347 lines
11 KiB
Rust

use sysinfo::System;
use serde::{Serialize, Deserialize};
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use std::time::Duration;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::logger::get_logger;
use crate::config::get_config_manager;
use crate::systemd::SystemCtl;
use crate::models::{AppStatus, ServiceStatus};
// User-Agent dinámico
fn generate_user_agent() -> String {
let version = env!("CARGO_PKG_VERSION");
let os = std::env::consts::OS;
let arch = std::env::consts::ARCH;
format!("SIAX-Agent/{} ({}/{}; Rust-Monitor)", version, os, arch)
}
#[derive(Serialize, Debug)]
struct AppStatusUpdate {
app_name: String,
server: String,
status: String,
port: i32,
pid: i32,
memory_usage: String,
cpu_usage: String,
last_check: String,
systemd_status: String,
#[serde(skip_serializing_if = "Option::is_none")]
discrepancy: Option<String>,
}
#[derive(Deserialize, Debug)]
struct CloudApp {
id: i32,
app_name: String,
server: String,
}
#[derive(Deserialize, Debug)]
struct CloudAppsResponse {
success: bool,
count: i32,
data: Vec<CloudApp>,
}
// Cache de IDs de apps ya sincronizadas (app_name -> id)
type AppIdCache = Arc<RwLock<HashMap<String, i32>>>;
pub async fn run_monitoring(server_name: String, api_key: String, cloud_url: String) {
let logger = get_logger();
let config_manager = get_config_manager();
let mut sys = System::new_all();
let user_agent = generate_user_agent();
// Cache de IDs de apps ya sincronizadas
let app_id_cache: AppIdCache = Arc::new(RwLock::new(HashMap::new()));
logger.info("Monitor", &format!("Vigilando procesos para {} [{}]", server_name, user_agent));
println!("🚀 Monitor: Vigilando procesos para {}", server_name);
println!("📡 User-Agent: {}", user_agent);
loop {
sys.refresh_all();
// ✨ LEER APPS DESDE CONFIG (dinámico)
let apps_to_monitor = config_manager.get_apps();
if apps_to_monitor.is_empty() {
logger.warning("Monitor", "No hay apps configuradas para monitorear", None);
}
for app in apps_to_monitor {
let data = collect_metrics_with_systemd(&sys, &app.name, app.port, &server_name);
// Reportar discrepancias
if let Some(ref disc) = data.discrepancy {
logger.warning(
"Monitor",
&format!("Discrepancia detectada en {}", app.name),
Some(disc)
);
}
match sync_to_cloud(
data,
&api_key,
&cloud_url,
&user_agent,
&server_name,
Arc::clone(&app_id_cache)
).await {
Ok(_) => {},
Err(e) => {
logger.error(
"Monitor",
&format!("Error sincronizando {}", app.name),
Some(&e.to_string())
);
eprintln!("❌ Error sincronizando {}: {}", app.name, e);
}
}
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
fn collect_metrics_with_systemd(sys: &System, name: &str, port: i32, server: &str) -> AppStatusUpdate {
let mut pid_encontrado = 0;
let mut cpu = 0.0;
let mut mem = 0.0;
let mut process_detected = false;
// 1. Detección por proceso (método original)
for (pid, process) in sys.processes() {
let process_name = process.name();
// Soportar node y python
if process_name.contains("node") || process_name.contains("python") {
if let Some(cwd) = process.cwd() {
let cwd_str = cwd.to_string_lossy();
if cwd_str.contains(name) {
pid_encontrado = pid.as_u32() as i32;
cpu = process.cpu_usage();
mem = process.memory() as f64 / 1024.0 / 1024.0;
process_detected = true;
break;
}
}
}
}
// 2. Consultar systemd
let service_name = format!("{}.service", name);
let systemd_status = SystemCtl::status(&service_name);
// 3. Reconciliar estados
let final_status = AppStatus::reconcile(process_detected, &systemd_status);
// 4. Detectar discrepancias
let discrepancy = match (&final_status, process_detected, &systemd_status) {
(AppStatus::Crashed, false, ServiceStatus::Active) => {
Some(format!("Systemd reporta activo pero proceso no detectado"))
}
(AppStatus::Zombie, true, ServiceStatus::Inactive) => {
Some(format!("Proceso detectado pero systemd reporta inactivo"))
}
_ => None,
};
let now = chrono::Local::now();
let timestamp = now.format("%Y-%m-%d %H:%M:%S").to_string();
AppStatusUpdate {
app_name: name.to_string(),
server: server.to_string(),
status: final_status.as_str().to_string(),
port,
pid: if pid_encontrado > 0 { pid_encontrado } else { 0 },
memory_usage: format!("{:.2}MB", mem),
cpu_usage: format!("{:.2}%", cpu),
last_check: timestamp,
systemd_status: systemd_status.as_str().to_string(),
discrepancy,
}
}
/// Sincroniza app con la API central (idempotente)
async fn sync_to_cloud(
data: AppStatusUpdate,
api_key: &str,
cloud_url: &str,
user_agent: &str,
server_name: &str,
app_id_cache: AppIdCache,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let logger = get_logger();
let client = reqwest::Client::new();
// 1. Verificar si ya tenemos el ID en cache
let cached_id = {
let cache = app_id_cache.read().await;
cache.get(&data.app_name).copied()
};
let app_id = if let Some(id) = cached_id {
logger.info("Monitor", &format!("App {} ya existe (ID: {}), actualizando...", data.app_name, id));
id
} else {
// 2. Buscar en la API central si existe
logger.info("Monitor", &format!("Buscando app {} en API central...", data.app_name));
match find_app_in_cloud(&client, api_key, cloud_url, &data.app_name, server_name, user_agent).await {
Ok(Some(id)) => {
logger.info("Monitor", &format!("App {} encontrada en cloud (ID: {})", data.app_name, id));
// Guardar en cache
let mut cache = app_id_cache.write().await;
cache.insert(data.app_name.clone(), id);
id
}
Ok(None) => {
// 3. No existe, crear nueva
logger.info("Monitor", &format!("App {} no existe, creando...", data.app_name));
match create_app_in_cloud(&client, api_key, cloud_url, &data, user_agent).await {
Ok(id) => {
logger.info("Monitor", &format!("App {} creada exitosamente (ID: {})", data.app_name, id));
// Guardar en cache
let mut cache = app_id_cache.write().await;
cache.insert(data.app_name.clone(), id);
println!("{} -> CREADA (ID: {})", data.app_name, id);
return Ok(());
}
Err(e) => {
logger.error("Monitor", &format!("Error creando app {}", data.app_name), Some(&e.to_string()));
return Err(e);
}
}
}
Err(e) => {
logger.error("Monitor", &format!("Error buscando app {}", data.app_name), Some(&e.to_string()));
return Err(e);
}
}
};
// 4. Actualizar estado existente
update_app_in_cloud(&client, api_key, cloud_url, app_id, &data, user_agent).await?;
println!("📤 {} -> {} (PID: {}, CPU: {}, RAM: {})",
data.app_name,
data.status,
data.pid,
data.cpu_usage,
data.memory_usage
);
Ok(())
}
/// Busca una app en la API central por nombre y servidor
async fn find_app_in_cloud(
client: &reqwest::Client,
api_key: &str,
base_url: &str,
app_name: &str,
server_name: &str,
user_agent: &str,
) -> Result<Option<i32>, Box<dyn std::error::Error + Send + Sync>> {
let mut headers = HeaderMap::new();
headers.insert("x-api-key", HeaderValue::from_str(api_key)?);
headers.insert(USER_AGENT, HeaderValue::from_str(user_agent)?);
// GET /api/apps_servcs/apps
let response = client
.get(base_url)
.headers(headers)
.timeout(Duration::from_secs(10))
.send()
.await?;
if !response.status().is_success() {
return Ok(None);
}
let apps_response: CloudAppsResponse = response.json().await?;
// Buscar la app por nombre y servidor
for app in apps_response.data {
if app.app_name == app_name && app.server == server_name {
return Ok(Some(app.id));
}
}
Ok(None)
}
/// Crea una nueva app en la API central
async fn create_app_in_cloud(
client: &reqwest::Client,
api_key: &str,
base_url: &str,
data: &AppStatusUpdate,
user_agent: &str,
) -> Result<i32, Box<dyn std::error::Error + Send + Sync>> {
let mut headers = HeaderMap::new();
headers.insert("x-api-key", HeaderValue::from_str(api_key)?);
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
headers.insert(USER_AGENT, HeaderValue::from_str(user_agent)?);
// POST /api/apps_servcs/apps
let response = client
.post(base_url)
.headers(headers)
.json(&data)
.timeout(Duration::from_secs(10))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_else(|_| "Sin respuesta".to_string());
return Err(format!("HTTP {}: {}", status, error_text).into());
}
// La API retorna el objeto creado, extraer el ID
let created_app: CloudApp = response.json().await?;
Ok(created_app.id)
}
/// Actualiza el estado de una app existente en la API central
async fn update_app_in_cloud(
client: &reqwest::Client,
api_key: &str,
base_url: &str,
app_id: i32,
data: &AppStatusUpdate,
user_agent: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut headers = HeaderMap::new();
headers.insert("x-api-key", HeaderValue::from_str(api_key)?);
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
headers.insert(USER_AGENT, HeaderValue::from_str(user_agent)?);
// PUT /api/apps_servcs/apps/:id/status
let url = format!("{}/{}/status", base_url, app_id);
let response = client
.put(&url)
.headers(headers)
.json(&data)
.timeout(Duration::from_secs(10))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_else(|_| "Sin respuesta".to_string());
return Err(format!("HTTP {}: {}", status, error_text).into());
}
Ok(())
}