Files
SIAX-MONITOR/src/api/websocket.rs
pablinux 3798f911f1 fix: Corregir formato de service_name en WebSocket de logs
Problema:
- WebSocket de logs usaba formato incorrecto: {app_name}.service
- Debería ser: siax-app-{app_name}.service
- Esto causaba que journalctl no encontrara el servicio
- Los logs de aplicaciones NO funcionaban

Solución:
- Corregir format!() en websocket.rs línea 96
- Ahora: format!("siax-app-{}.service", app_name)
- journalctl ahora busca el servicio correcto

Los logs de aplicaciones ahora funcionan correctamente vía:
journalctl -u siax-app-IDEAS.service -f --output=json -n 50
2026-01-18 04:12:30 -05:00

189 lines
5.4 KiB
Rust

use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
response::IntoResponse,
};
use futures::{sink::SinkExt, stream::StreamExt};
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
use crate::logger::get_logger;
use dashmap::DashMap;
/// Tracks how many log-streaming WebSocket connections are open per
/// application and enforces a per-application ceiling.
///
/// The counters live in a concurrent map keyed by application name, so the
/// manager can be shared behind an `Arc` without an outer lock.
pub struct WebSocketManager {
    /// Open connection count per application name. Names with no open
    /// connection are not kept in the map.
    active_connections: Arc<DashMap<String, usize>>,
    /// Maximum number of simultaneous connections allowed for one application.
    max_connections_per_app: usize,
}

impl WebSocketManager {
    /// Per-application connection limit used by [`WebSocketManager::new`].
    const DEFAULT_MAX_CONNECTIONS_PER_APP: usize = 5;

    /// Creates a manager with no tracked connections and the default limit.
    pub fn new() -> Self {
        WebSocketManager {
            active_connections: Arc::new(DashMap::new()),
            max_connections_per_app: Self::DEFAULT_MAX_CONNECTIONS_PER_APP,
        }
    }

    /// Returns `true` while `app_name` has fewer open connections than the
    /// configured limit. An application that is not tracked counts as zero.
    ///
    /// This only reads the counter; it does not reserve a slot. A caller that
    /// checks and then calls `increment_connection` performs two separate map
    /// operations.
    fn can_connect(&self, app_name: &str) -> bool {
        let open = self
            .active_connections
            .get(app_name)
            .map(|count| *count)
            .unwrap_or(0);
        open < self.max_connections_per_app
    }

    /// Records one more open connection for `app_name`, creating the counter
    /// at 1 when the application is not tracked yet.
    fn increment_connection(&self, app_name: &str) {
        self.active_connections
            .entry(app_name.to_string())
            .and_modify(|count| *count += 1)
            .or_insert(1);
    }

    /// Records that one connection for `app_name` was closed.
    ///
    /// The counter never goes below zero, and an untracked application is left
    /// untouched. Once the counter reaches zero the entry is removed so the
    /// map does not keep one key for every application name ever requested.
    fn decrement_connection(&self, app_name: &str) {
        // The guard returned by `get_mut` holds the shard lock; it is released
        // at the end of this statement, before `remove_if` locks again.
        if let Some(mut count) = self.active_connections.get_mut(app_name) {
            *count = count.saturating_sub(1);
        }
        // The predicate runs under the lock, so an entry that was incremented
        // again in the meantime is kept.
        self.active_connections
            .remove_if(app_name, |_, count| *count == 0);
    }
}

impl Default for WebSocketManager {
    /// Equivalent to [`WebSocketManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// HTTP handler that upgrades the request to a WebSocket streaming the logs
/// of `app_name`.
///
/// The request is always upgraded. Once the socket exists, the per-application
/// connection limit is checked: over the limit, the client receives a single
/// error text message and the socket is closed; otherwise a connection slot is
/// taken and the socket is handed to `handle_logs_socket`, which releases the
/// slot when it returns.
///
/// The slot is taken inside the upgrade callback rather than before the
/// upgrade so that a handshake that never completes (the callback is then
/// never invoked) cannot leave a slot permanently occupied.
pub async fn logs_websocket_handler(
    ws: WebSocketUpgrade,
    Path(app_name): Path<String>,
    State(ws_manager): State<Arc<WebSocketManager>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |mut socket| async move {
        let logger = get_logger();

        // Enforce the connection limit.
        // NOTE(review): the check and the increment below are two separate
        // operations on the manager, so simultaneous upgrades for the same
        // application can briefly exceed the limit.
        if !ws_manager.can_connect(&app_name) {
            logger.warning(
                "WebSocket",
                "Límite de conexiones excedido",
                Some(&app_name)
            );
            // Best effort: the client may already be gone.
            let _ = socket.send(Message::Text(
                "Error: Límite de conexiones simultáneas excedido".to_string()
            )).await;
            let _ = socket.close().await;
            return;
        }

        logger.info("WebSocket", &format!("Nueva conexión para logs de: {}", app_name));
        ws_manager.increment_connection(&app_name);
        handle_logs_socket(socket, app_name, ws_manager).await;
    })
}
/// Turns one line of `journalctl --output=json` output into the text sent to
/// the client.
///
/// When the line parses as JSON, the result is the `MESSAGE` field prefixed
/// with `[<__REALTIME_TIMESTAMP>]` if that field is a non-empty string. If
/// `MESSAGE` is missing or not a string, the raw line is used as the message.
/// A line that is not valid JSON is returned unchanged.
fn format_journal_line(line: String) -> String {
    let entry = match serde_json::from_str::<serde_json::Value>(&line) {
        Ok(entry) => entry,
        Err(_) => return line,
    };

    let message = entry
        .get("MESSAGE")
        .and_then(|m| m.as_str())
        .unwrap_or(&line);
    let timestamp = entry
        .get("__REALTIME_TIMESTAMP")
        .and_then(|t| t.as_str())
        .unwrap_or("");

    if timestamp.is_empty() {
        message.to_string()
    } else {
        format!("[{}] {}", timestamp, message)
    }
}

/// Streams the journal of `siax-app-<app_name>.service` over `socket` until
/// the client disconnects or the stream ends.
///
/// Spawns `journalctl -u <service> -f --output=json -n 50`, forwards every
/// output line to the client as a text message, and watches the incoming side
/// of the socket to notice a disconnect. On every exit path the connection
/// slot held for `app_name` is released through
/// `ws_manager.decrement_connection`, so the caller must have taken exactly
/// one slot beforehand.
async fn handle_logs_socket(
    socket: WebSocket,
    app_name: String,
    ws_manager: Arc<WebSocketManager>,
) {
    let logger = get_logger();
    let service_name = format!("siax-app-{}.service", app_name);

    // Start journalctl: follow mode, JSON output, last 50 lines first.
    let mut child = match TokioCommand::new("journalctl")
        .arg("-u")
        .arg(&service_name)
        .arg("-f")
        .arg("--output=json")
        .arg("-n")
        .arg("50")
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        // Do not leave journalctl running if this future is dropped before
        // the explicit kill at the end.
        .kill_on_drop(true)
        .spawn()
    {
        Ok(child) => child,
        Err(e) => {
            logger.error("WebSocket", "Error iniciando journalctl", Some(&e.to_string()));
            ws_manager.decrement_connection(&app_name);
            return;
        }
    };

    // stdout was requested as piped above; handle its absence without
    // panicking so the connection slot is still released.
    let stdout = match child.stdout.take() {
        Some(stdout) => stdout,
        None => {
            logger.error(
                "WebSocket",
                "Error iniciando journalctl",
                Some(&"stdout no disponible".to_string()),
            );
            let _ = child.kill().await;
            ws_manager.decrement_connection(&app_name);
            return;
        }
    };
    let mut lines = BufReader::new(stdout).lines();

    let (mut sender, mut receiver) = socket.split();

    // Greeting; best effort, a dead socket is detected by the tasks below.
    let welcome = format!("📡 Conectado a logs de: {}", app_name);
    let _ = sender.send(Message::Text(welcome)).await;

    // Task that forwards journal lines to the client. It stops when the
    // journal stream ends or fails, or when a send fails.
    let mut send_task = tokio::spawn(async move {
        while let Ok(Some(line)) = lines.next_line().await {
            if sender
                .send(Message::Text(format_journal_line(line)))
                .await
                .is_err()
            {
                break;
            }
        }
    });

    // Task that drains client messages only to detect disconnection: it stops
    // on a close frame, a receive error, or the end of the stream. Every other
    // message is ignored.
    let mut receive_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            if matches!(msg, Message::Close(_)) {
                break;
            }
        }
    });

    // Wait until either task finishes.
    tokio::select! {
        _ = &mut send_task => {},
        _ = &mut receive_task => {},
    }

    // Dropping a JoinHandle only detaches its task, so stop the remaining one
    // explicitly; otherwise it would keep its half of the socket alive.
    // Aborting the task that already finished has no effect.
    send_task.abort();
    receive_task.abort();

    // Cleanup
    let _ = child.kill().await;
    ws_manager.decrement_connection(&app_name);
    logger.info("WebSocket", &format!("Conexión cerrada para: {}", app_name));
}