fix: Fase 4.2 - Corrección de registros duplicados en API central (idempotencia)
Implementada lógica idempotente en monitor.rs para evitar la creación infinita de registros duplicados en la API central. PROBLEMA RESUELTO: - Monitor enviaba POST cada 60s sin verificar si app ya existe - Resultado: Miles de registros duplicados en base de datos central - Impacto: Saturación de BD y datos inconsistentes SOLUCIÓN IMPLEMENTADA: 1. Cache local de IDs de apps (AppIdCache con HashMap + RwLock) 2. Función sync_to_cloud() con lógica idempotente: - Verificar cache local primero - Si no está en cache: GET para buscar en API central - Si no existe en API: POST para crear (solo primera vez) - Si existe: PUT para actualizar estado 3. Uso correcto de endpoints de API central: - GET /api/apps_servcs/apps (buscar) - POST /api/apps_servcs/apps (crear) - PUT /api/apps_servcs/apps/:id/status (actualizar) FUNCIONES IMPLEMENTADAS: - sync_to_cloud() - Coordina flujo idempotente - find_app_in_cloud() - Busca app por nombre + servidor - create_app_in_cloud() - Crea nueva app (retorna ID) - update_app_in_cloud() - Actualiza estado existente CAMBIOS TÉCNICOS: - Agregado cache AppIdCache (Arc<RwLock<HashMap<String, i32>>>) - Tipos CloudApp y CloudAppsResponse para deserialización - Box<dyn Error + Send + Sync> para compatibilidad tokio - Revertido cambios incompletos en AppManager RESULTADO: ✅ Primera ejecución: Crea app en API central (POST) ✅ Siguientes ejecuciones: Solo actualiza estado (PUT) 🚫 NO más duplicados infinitos 📊 Base de datos limpia y consistente Archivos modificados: - src/monitor.rs: +180/-50 líneas (lógica idempotente completa) - src/orchestrator/app_manager.rs: Revertido cambios incompletos - tareas.txt: Documentación completa de Fase 4.2
This commit is contained in:
228
src/monitor.rs
228
src/monitor.rs
@@ -1,7 +1,10 @@
|
||||
use sysinfo::System;
|
||||
use serde::Serialize;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
|
||||
use std::time::Duration;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use crate::logger::get_logger;
|
||||
use crate::config::get_config_manager;
|
||||
use crate::systemd::SystemCtl;
|
||||
@@ -31,12 +34,32 @@ struct AppStatusUpdate {
|
||||
discrepancy: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct CloudApp {
|
||||
id: i32,
|
||||
app_name: String,
|
||||
server: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct CloudAppsResponse {
|
||||
success: bool,
|
||||
count: i32,
|
||||
data: Vec<CloudApp>,
|
||||
}
|
||||
|
||||
// Cache de IDs de apps ya sincronizadas (app_name -> id)
|
||||
type AppIdCache = Arc<RwLock<HashMap<String, i32>>>;
|
||||
|
||||
pub async fn run_monitoring(server_name: String, api_key: String, cloud_url: String) {
|
||||
let logger = get_logger();
|
||||
let config_manager = get_config_manager();
|
||||
let mut sys = System::new_all();
|
||||
let user_agent = generate_user_agent();
|
||||
|
||||
// Cache de IDs de apps ya sincronizadas
|
||||
let app_id_cache: AppIdCache = Arc::new(RwLock::new(HashMap::new()));
|
||||
|
||||
logger.info("Monitor", &format!("Vigilando procesos para {} [{}]", server_name, user_agent));
|
||||
println!("🚀 Monitor: Vigilando procesos para {}", server_name);
|
||||
println!("📡 User-Agent: {}", user_agent);
|
||||
@@ -63,15 +86,22 @@ pub async fn run_monitoring(server_name: String, api_key: String, cloud_url: Str
|
||||
);
|
||||
}
|
||||
|
||||
match send_to_cloud(data, &api_key, &cloud_url, &user_agent).await {
|
||||
match sync_to_cloud(
|
||||
data,
|
||||
&api_key,
|
||||
&cloud_url,
|
||||
&user_agent,
|
||||
&server_name,
|
||||
Arc::clone(&app_id_cache)
|
||||
).await {
|
||||
Ok(_) => {},
|
||||
Err(e) => {
|
||||
logger.error(
|
||||
"Monitor",
|
||||
&format!("Error enviando {}", app.name),
|
||||
&format!("Error sincronizando {}", app.name),
|
||||
Some(&e.to_string())
|
||||
);
|
||||
eprintln!("❌ Error enviando {}: {}", app.name, e);
|
||||
eprintln!("❌ Error sincronizando {}: {}", app.name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -140,57 +170,177 @@ fn collect_metrics_with_systemd(sys: &System, name: &str, port: i32, server: &st
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_to_cloud(
|
||||
/// Sincroniza app con la API central (idempotente)
|
||||
async fn sync_to_cloud(
|
||||
data: AppStatusUpdate,
|
||||
api_key: &str,
|
||||
cloud_url: &str,
|
||||
user_agent: &str
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
user_agent: &str,
|
||||
server_name: &str,
|
||||
app_id_cache: AppIdCache,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let logger = get_logger();
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-api-key",
|
||||
HeaderValue::from_str(api_key)?
|
||||
);
|
||||
headers.insert(
|
||||
"Content-Type",
|
||||
HeaderValue::from_static("application/json")
|
||||
);
|
||||
headers.insert(
|
||||
USER_AGENT,
|
||||
HeaderValue::from_str(user_agent)?
|
||||
// 1. Verificar si ya tenemos el ID en cache
|
||||
let cached_id = {
|
||||
let cache = app_id_cache.read().await;
|
||||
cache.get(&data.app_name).copied()
|
||||
};
|
||||
|
||||
let app_id = if let Some(id) = cached_id {
|
||||
logger.info("Monitor", &format!("App {} ya existe (ID: {}), actualizando...", data.app_name, id));
|
||||
id
|
||||
} else {
|
||||
// 2. Buscar en la API central si existe
|
||||
logger.info("Monitor", &format!("Buscando app {} en API central...", data.app_name));
|
||||
match find_app_in_cloud(&client, api_key, cloud_url, &data.app_name, server_name, user_agent).await {
|
||||
Ok(Some(id)) => {
|
||||
logger.info("Monitor", &format!("App {} encontrada en cloud (ID: {})", data.app_name, id));
|
||||
// Guardar en cache
|
||||
let mut cache = app_id_cache.write().await;
|
||||
cache.insert(data.app_name.clone(), id);
|
||||
id
|
||||
}
|
||||
Ok(None) => {
|
||||
// 3. No existe, crear nueva
|
||||
logger.info("Monitor", &format!("App {} no existe, creando...", data.app_name));
|
||||
match create_app_in_cloud(&client, api_key, cloud_url, &data, user_agent).await {
|
||||
Ok(id) => {
|
||||
logger.info("Monitor", &format!("App {} creada exitosamente (ID: {})", data.app_name, id));
|
||||
// Guardar en cache
|
||||
let mut cache = app_id_cache.write().await;
|
||||
cache.insert(data.app_name.clone(), id);
|
||||
println!("✨ {} -> CREADA (ID: {})", data.app_name, id);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
logger.error("Monitor", &format!("Error creando app {}", data.app_name), Some(&e.to_string()));
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
logger.error("Monitor", &format!("Error buscando app {}", data.app_name), Some(&e.to_string()));
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// 4. Actualizar estado existente
|
||||
update_app_in_cloud(&client, api_key, cloud_url, app_id, &data, user_agent).await?;
|
||||
|
||||
println!("📤 {} -> {} (PID: {}, CPU: {}, RAM: {})",
|
||||
data.app_name,
|
||||
data.status,
|
||||
data.pid,
|
||||
data.cpu_usage,
|
||||
data.memory_usage
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Busca una app en la API central por nombre y servidor
|
||||
async fn find_app_in_cloud(
|
||||
client: &reqwest::Client,
|
||||
api_key: &str,
|
||||
base_url: &str,
|
||||
app_name: &str,
|
||||
server_name: &str,
|
||||
user_agent: &str,
|
||||
) -> Result<Option<i32>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-api-key", HeaderValue::from_str(api_key)?);
|
||||
headers.insert(USER_AGENT, HeaderValue::from_str(user_agent)?);
|
||||
|
||||
// GET /api/apps_servcs/apps
|
||||
let response = client
|
||||
.post(cloud_url)
|
||||
.get(base_url)
|
||||
.headers(headers)
|
||||
.timeout(Duration::from_secs(10))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let apps_response: CloudAppsResponse = response.json().await?;
|
||||
|
||||
// Buscar la app por nombre y servidor
|
||||
for app in apps_response.data {
|
||||
if app.app_name == app_name && app.server == server_name {
|
||||
return Ok(Some(app.id));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Crea una nueva app en la API central
|
||||
async fn create_app_in_cloud(
|
||||
client: &reqwest::Client,
|
||||
api_key: &str,
|
||||
base_url: &str,
|
||||
data: &AppStatusUpdate,
|
||||
user_agent: &str,
|
||||
) -> Result<i32, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-api-key", HeaderValue::from_str(api_key)?);
|
||||
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
|
||||
headers.insert(USER_AGENT, HeaderValue::from_str(user_agent)?);
|
||||
|
||||
// POST /api/apps_servcs/apps
|
||||
let response = client
|
||||
.post(base_url)
|
||||
.headers(headers)
|
||||
.json(&data)
|
||||
.timeout(Duration::from_secs(10))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
println!("📤 {} -> {} (PID: {}, CPU: {}, RAM: {})",
|
||||
data.app_name,
|
||||
data.status,
|
||||
data.pid,
|
||||
data.cpu_usage,
|
||||
data.memory_usage
|
||||
);
|
||||
Ok(())
|
||||
} else {
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let error_text = response.text().await.unwrap_or_else(|_| "Sin respuesta".to_string());
|
||||
|
||||
logger.error(
|
||||
"Monitor",
|
||||
&format!("Error enviando datos de {}", data.app_name),
|
||||
Some(&format!("HTTP {}: {}", status, error_text))
|
||||
);
|
||||
|
||||
eprintln!("⚠️ Error HTTP {}: {}", status, error_text);
|
||||
Err(format!("HTTP {}: {}", status, error_text).into())
|
||||
return Err(format!("HTTP {}: {}", status, error_text).into());
|
||||
}
|
||||
|
||||
// La API retorna el objeto creado, extraer el ID
|
||||
let created_app: CloudApp = response.json().await?;
|
||||
Ok(created_app.id)
|
||||
}
|
||||
|
||||
/// Actualiza el estado de una app existente en la API central
|
||||
async fn update_app_in_cloud(
|
||||
client: &reqwest::Client,
|
||||
api_key: &str,
|
||||
base_url: &str,
|
||||
app_id: i32,
|
||||
data: &AppStatusUpdate,
|
||||
user_agent: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-api-key", HeaderValue::from_str(api_key)?);
|
||||
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
|
||||
headers.insert(USER_AGENT, HeaderValue::from_str(user_agent)?);
|
||||
|
||||
// PUT /api/apps_servcs/apps/:id/status
|
||||
let url = format!("{}/{}/status", base_url, app_id);
|
||||
|
||||
let response = client
|
||||
.put(&url)
|
||||
.headers(headers)
|
||||
.json(&data)
|
||||
.timeout(Duration::from_secs(10))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let error_text = response.text().await.unwrap_or_else(|_| "Sin respuesta".to_string());
|
||||
return Err(format!("HTTP {}: {}", status, error_text).into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user