use axum::{ extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, Path, State, }, response::IntoResponse, }; use futures::{sink::SinkExt, stream::StreamExt}; use std::process::Stdio; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command as TokioCommand; use crate::logger::get_logger; use dashmap::DashMap; pub struct WebSocketManager { active_connections: Arc>, max_connections_per_app: usize, } impl WebSocketManager { pub fn new() -> Self { WebSocketManager { active_connections: Arc::new(DashMap::new()), max_connections_per_app: 5, } } fn can_connect(&self, app_name: &str) -> bool { let count = self.active_connections .get(app_name) .map(|v| *v) .unwrap_or(0); count < self.max_connections_per_app } fn increment_connection(&self, app_name: &str) { self.active_connections .entry(app_name.to_string()) .and_modify(|c| *c += 1) .or_insert(1); } fn decrement_connection(&self, app_name: &str) { self.active_connections .entry(app_name.to_string()) .and_modify(|c| { if *c > 0 { *c -= 1; } }); } } impl Default for WebSocketManager { fn default() -> Self { Self::new() } } pub async fn logs_websocket_handler( ws: WebSocketUpgrade, Path(app_name): Path, State(ws_manager): State>, ) -> impl IntoResponse { let logger = get_logger(); // Verificar límite de conexiones if !ws_manager.can_connect(&app_name) { logger.warning( "WebSocket", "Límite de conexiones excedido", Some(&app_name) ); return ws.on_upgrade(move |socket| async move { let mut socket = socket; let _ = socket.send(Message::Text( "Error: Límite de conexiones simultáneas excedido".to_string() )).await; let _ = socket.close().await; }); } logger.info("WebSocket", &format!("Nueva conexión para logs de: {}", app_name)); ws_manager.increment_connection(&app_name); ws.on_upgrade(move |socket| handle_logs_socket(socket, app_name, ws_manager)) } async fn handle_logs_socket( socket: WebSocket, app_name: String, ws_manager: Arc, ) { let logger = get_logger(); let service_name = format!("{}.service", app_name); // Iniciar journalctl let mut child = match TokioCommand::new("journalctl") .arg("-u") .arg(&service_name) .arg("-f") .arg("--output=json") .arg("-n") .arg("50") // Últimas 50 líneas .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn() { Ok(child) => child, Err(e) => { logger.error("WebSocket", "Error iniciando journalctl", Some(&e.to_string())); ws_manager.decrement_connection(&app_name); return; } }; let stdout = child.stdout.take().unwrap(); let reader = BufReader::new(stdout); let mut lines = reader.lines(); let (mut sender, mut receiver) = socket.split(); // Enviar mensaje de bienvenida let welcome = format!("📡 Conectado a logs de: {}", app_name); let _ = sender.send(Message::Text(welcome)).await; // Task para enviar logs let send_task = tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { // Parsear JSON de journalctl if let Ok(json) = serde_json::from_str::(&line) { let message = json.get("MESSAGE") .and_then(|m| m.as_str()) .unwrap_or(&line); let timestamp = json.get("__REALTIME_TIMESTAMP") .and_then(|t| t.as_str()) .unwrap_or(""); let formatted = if !timestamp.is_empty() { format!("[{}] {}", timestamp, message) } else { message.to_string() }; if sender.send(Message::Text(formatted)).await.is_err() { break; } } else { // Si no es JSON, enviar la línea tal cual if sender.send(Message::Text(line)).await.is_err() { break; } } } }); // Task para recibir mensajes del cliente (para detectar desconexión) let receive_task = tokio::spawn(async move { while let Some(msg) = receiver.next().await { if let Ok(msg) = msg { match msg { Message::Close(_) => break, Message::Ping(_) => { // Los pings se manejan automáticamente } _ => {} } } else { break; } } }); // Esperar a que termine alguna de las dos tasks tokio::select! { _ = send_task => {}, _ = receive_task => {}, } // Cleanup let _ = child.kill().await; ws_manager.decrement_connection(&app_name); logger.info("WebSocket", &format!("Conexión cerrada para: {}", app_name)); }