
Caso de uso: «Viajero Imposible» en Wazuh
El concepto de «Viajero Imposible» es una técnica de detección de anomalías basada en la ubicación geográfica de las acciones de un usuario. Se denomina así porque se refiere a situaciones en las que un usuario aparentemente accede a un sistema desde dos o más ubicaciones distantes en un período de tiempo muy corto, lo que sería físicamente imposible en el mundo real. Este tipo de comportamiento puede indicar un acceso comprometido o no autorizado a una cuenta. Ejemplo: Un ejemplo típico podría ser si un usuario, «Usuario1», inicia sesión desde Buenos Aires a las 10:00 AM y, unos 20 minutos después, registra un acceso desde Londres. A menos que «Usuario1» pueda viajar instantáneamente entre estos dos puntos, este evento es sospechoso y podría deberse a que un actor malintencionado ha tomado control de las credenciales del usuario. ¿Cuánto se demora una persona en viajar de Buenos Aires a Londres aproximadamente? Por lo que, si nosotros detectamos previamente un acceso por VPN del Usuario1 desde Buenos Aires, es imposible que 20 minutos más tarde pueda acceder por VPN desde Londres, ya que como mínimo, la diferencia entre un evento y el otro debería ser de no menos de 14h 55min (aprox). Entonces, si 20 minutos después detectamos que se conecto por VPN desde Londres levantamos una alerta de viajero imposible. Objetivo de esta solución El objetivo de esta implementación es proporcionar una solución personalizada que permita la detección del «Viajero Imposible» dentro de Wazuh. Wazuh, aunque es una potente plataforma de seguridad y monitoreo, no cuenta con la capacidad nativa para realizar correlaciones basadas en la ubicación geográfica de los usuarios a lo largo del tiempo. Por ello, se propone el uso de un script en Python que extrae información sobre eventos de conexión (como en el caso de accesos VPN), consulta servicios de geolocalización para determinar la ubicación geográfica de la IP, y realiza comparaciones temporales y espaciales para identificar accesos sospechosos. Este enfoque proporciona la flexibilidad necesaria para almacenar y comparar eventos de inicio de sesión de usuarios, permitiendo alertar a los administradores de sistemas cuando se detecte un «Viajero Imposible», es decir, cuando las acciones de un usuario parecen físicamente inviables. En esta publicación, nos centraremos en la detección de «viajeros imposibles» en conexiones VPN. Sin embargo, el enfoque se puede escalar para detectar este tipo de comportamiento en cualquier tipo de inicio de sesión o acción que esté vinculada a una ubicación geográfica. Wazuh es una plataforma de código abierto para la monitorización y detección de amenazas, que ofrece una gran flexibilidad mediante integraciones personalizadas. Aunque su funcionalidad básica no incluye una detección avanzada basada en la ubicación geográfica de los eventos, podemos extender sus capacidades utilizando scripts personalizados y APIs externas. Para implementar el caso de uso del «Viajero Imposible», desarrollaremos un script en Python que se encargará de: Aunque Wazuh tiene muchas funcionalidades, no incluye una capacidad integrada para correlacionar múltiples eventos y almacenar datos de ubicación en la plataforma para su comparación posterior. Por lo tanto, implementamos un script en Python para gestionar esta tarea de manera eficiente. Este enfoque nos permite: La base de datos utilizada es una SQLite3 que almacenará la siguiente información para cada conexión VPN registrada: Diagrama del flujo del script Configuraciones Wazuh Reglas: <group name=»vpn_imposible_traveler»> <rule id=»555555″ level=»0″> <location>Imposible_traveler_VPN</location> <description>Group de reglas viajero imposible</description> </rule> <!– Viajero imposible –> <rule id=»555556″ level=»14″> <if_sid>555555</if_sid> <match>»Event ID»: «1»</match> <description>Viajero Imposible VPN</description> </rule> <!– Conexión previa desde un pais distinto al del nuevo evento –> <rule id=»555557″ level=»3″> <if_sid>555555</if_sid> <match>»Event ID»: «2»</match> <description>El usuario establecio una conexion VPN desde un país distinto al anterior</description> </rule> </group> Configuración en el ossec.conf: <integration> <name>custom-viajero_imposible</name> <rule_id>ID’s-VPN</rule_id> <alert_format>json</alert_format> </integration> Una vez que tenemos esto, debemos pegar el siguiente script en /var/ossec/integrations. SCRIPT: #!/var/ossec/framework/python/bin/python3 import json import sys import time import os from socket import socket, AF_UNIX, SOCK_DGRAM from datetime import datetime from geopy.distance import geodesic import requests import sqlite3 # Global vars SOCKET_ADDR = ‘/var/ossec/queue/sockets/queue’ BD = ‘/var/ossec/var/db/BD_CU_ViajeroImposible.db’ def main(): try: sock = socket(AF_UNIX, SOCK_DGRAM) sock.connect(SOCKET_ADDR) # Leo el alerts.json alert_file = open(sys.argv[1]) alert_json = json.loads(alert_file.read()) alert_file.close() # Extraigo: Timestamp, Usuario, IP_Origen, Cliente alert_timestamp = alert_json[‘timestamp’] #Posibles srcip posibles_srcip = [‘srcip’, ‘src_ip’, ‘remip’] for key in alert_json[‘data’]: if key in posibles_srcip: alert_srcip = alert_json[‘data’][key] break alert_usuario = alert_json[‘data’][‘dstuser’] # HAY QUE NORMALIZAR ESTE CAMPO EN LOS DECODER PARA QUE TODOS SEAN dstuser # Traigo de IP-API los datos de la latitud y longitud para luego calcular la distancia del punto A al punto B info_nuevo_evento = query_api(alert_srcip) coords_evento_nuevo = (info_nuevo_evento[3], info_nuevo_evento[4]) # Leo la base de datos, la cual tiene este formato: timestamp,user,cliente,srcip,lat,long ACTUALIZAR # Connect to SQLite database conn = sqlite3.connect(BD) # Create a cursor object cursor = conn.cursor() cursor.execute(»’ SELECT * FROM logs WHERE usuario = ? »’, (alert_usuario,)) usuario_en_bd = cursor.fetchone() # Si existe el usuario evaluo si es posible viajar del punto A al punto B en el tiempo determinado por tiempo__evento_nuevo – tiempo__evento_viejo if usuario_en_bd: coords_evento_existente = (usuario_en_bd[3], usuario_en_bd[4]) # Calculo si es posible o no viajar del punto A al punto B en el tiempo dado por la diferencia de horas de los eventos if not es_posible_viaje(usuario_en_bd[0], alert_timestamp, coords_evento_existente, coords_evento_nuevo): # Genero la alerta de wazuh msg = { «Evento»: «El usuario establecio una conexion VPN desde un punto A y luego desde un punto B en un tiempo fisicamente imposible», «Usuario»: alert_usuario, «Timestamp anterior conexión VPN»: usuario_en_bd[0], «País anterior conexión VPN»: usuario_en_bd[5], «Ciudad anterior conexión VPN»: usuario_en_bd[6], «Region anterior conexión VPN»: usuario_en_bd[7], «IP de origen anterior conexión VPN»: usuario_en_bd[2], «Latitud aproximada anterior conexión VPN»: usuario_en_bd[3], «Longitud aproximada anterior conexión VPN»: usuario_en_bd[4], «Timestamp nueva conexón VPN»: alert_timestamp, «País nueva conexión VPN»: info_nuevo_evento[0], «Ciudad nueva conexión VPN»: info_nuevo_evento[1], «Region nueva conexión VPN»: info_nuevo_evento[2], «IP de origen nueva conexión VPN»: alert_srcip, «Latitud aproximada nueva conexión VPN»: info_nuevo_evento[3], «Longitud aproximada nueva conexión VPN»: info_nuevo_evento[4], «Timestamp nueva conexón VPN»: alert_timestamp, «País nueva conexión VPN»: info_nuevo_evento[0], «Ciudad nueva conexión VPN»: info_nuevo_evento[1], «Region nueva conexión VPN»: info_nuevo_evento[2], «IP de origen nueva conexión VPN»: alert_srcip, «Latitud aproximada nueva conexión VPN»: info_nuevo_evento[3], «Longitud aproximada nueva conexión VPN»: info_nuevo_evento[4], «Event ID»: «1» } msg = json.dumps(msg, ensure_ascii=False, indent=4) enviar_a_wazuh(sock, msg) # Caso que sea posible el viaje pero se conecto desde otro pais, es interesante alertarlo elif usuario_en_bd[5] != info_nuevo_evento[0]: # Genero la alerta de wazuh msg = { «Evento»: «El usuario establecio una conexion VPN desde un país y luego desde otro país diferente en tiempos razonables, validar con cliente», «Usuario»: alert_usuario, «Timestamp anterior conexión VPN»: usuario_en_bd[0], «País anterior conexión VPN»: usuario_en_bd[5], «Ciudad anterior conexión VPN»: usuario_en_bd[6], «Region anterior conexión VPN»: usuario_en_bd[7], «IP de origen anterior conexión VPN»: usuario_en_bd[2], «Latitud aproximada anterior conexión VPN»: usuario_en_bd[3], «Longitud aproximada anterior conexión VPN»: usuario_en_bd[4], «Timestamp nueva conexón VPN»: alert_timestamp, «País nueva conexión VPN»: info_nuevo_evento[0], «Ciudad nueva conexión VPN»: info_nuevo_evento[1], «Region nueva conexión VPN»: info_nuevo_evento[2], «IP de origen nueva conexión VPN»: alert_srcip, «Latitud aproximada nueva conexión VPN»: info_nuevo_evento[3], «Longitud aproximada nueva conexión VPN»: info_nuevo_evento[4], «Event ID»: «2» } msg = json.dumps(msg, ensure_ascii=False, indent=4) enviar_a_wazuh(sock, msg) # Por último independientemente de si es posible viajar o no actualizo los datos con los del nuevo evento cursor.execute(»’ UPDATE logs SET timestamp = ?, srcip = ?, lat = ?, lon = ?, country = ?, city = ?, regionName = ? WHERE usuario = ? »’, (alert_timestamp, alert_srcip, info_nuevo_evento[3], info_nuevo_evento[4], info_nuevo_evento[0], info_nuevo_evento[1], info_nuevo_evento[2], alert_usuario)) with open(‘/var/ossec/logs/integrations.log’, «a») as file: file.write(f’Usuario {alert_usuario} actualizado’) else: cursor.execute(»’ INSERT INTO logs (timestamp, usuario, srcip, country, city, regionName, lat, lon) VALUES (?, ?, ?, ?, ?, ?, ?, ?) »’, (alert_timestamp, alert_usuario, alert_srcip, info_nuevo_evento[0], info_nuevo_evento[1], info_nuevo_evento[2], info_nuevo_evento[3], info_nuevo_evento[4])) with open(‘/var/ossec/logs/integrations.log’, «a») as file: file.write(f’Usuario {alert_usuario} agregado a la base de datos’) # Guardo los cambios hechos en la base de datos y cierro la conexión conn.commit() conn.close() except Exception as e: with open(‘/var/ossec/logs/integrations.log’, «a») as file: file.write(str(e)) finally: sock.close() def enviar_a_wazuh(sock, msg): string = f’1:Imposible_traveler_VPN:{msg}’ sock.send(string.encode()) with open(‘/var/ossec/logs/integrations.log’, «a») as file: file.write(f’Alerta enviada a Wazuh {string}’) # Función para calcular si es posible viajar entre dos lugares con el tiempo dado def es_posible_viaje(tiempo_a, tiempo_b, coords_a, coords_b): # Calcular la distancia entre los puntos A y B utilizando geopy (geodesic) distancia = geodesic(coords_a, coords_b).kilometers # Calcular la diferencia de tiempo en horas fmt = «%Y-%m-%dT%H:%M:%S.%f%z» # Formato de fecha y hora tiempo_a_dt = datetime.strptime(tiempo_a, fmt) tiempo_b_dt = datetime.strptime(tiempo_b, fmt) delta_tiempo = abs((tiempo_b_dt – tiempo_a_dt).total_seconds() / 3600) # Convertir segundos a horas velocidad = 800 # Velocidad promedio avión en km/h # Calcular el tiempo necesario para recorrer la distancia a la velocidad dada tiempo_necesario = distancia / velocidad # Verificar si es posible el viaje return delta_tiempo >= tiempo_necesario def query_api(ip): url = f’http://ip-api.com/json/{ip}?fields=country,regionName,city,lat,lon,query’ respuesta = requests.get(url) if respuesta.status_code == 200: data = respuesta.json() return data[‘country’], data[‘city’], data[‘regionName’], data[‘lat’], data[‘lon’] else: return None if __name__ == «__main__»: main() Luego asignar los permisos necesarios al archivo: chmod 750 custom-viajero_imposible.py Por último, debemos generar la base de datos SQLite, guardarla en /var/ossec/var/db/ y asignarle permisos. Ejemplo de script para crear base de datos: #/var/ossec/framework/python/bin/python3 import sqlite3 # Connect to SQLite database (or create it if it doesn’t exist) conn = sqlite3.connect(‘/var/ossec/var/db/BD_CU_ViajeroImposible.db’) # Create a cursor object to interact with the database cursor = conn.cursor() # Create table with the specified columns cursor.execute(»’ CREATE TABLE IF NOT EXISTS logs ( timestamp TEXT, usuario TEXT, srcip TEXT, lat REAL, lon REAL, country TEXT, city TEXT, regionName TEXT ) »’) # Commit the changes and close the connection conn.commit() conn.close() print(«Table created successfully!») Inicialmente vamos a establecer una conexión VPN con el usuario: user1 desde la IP: 167.61.103.172. Se agrega el usuario a la base de datos: Luego vamos a establecer una conexión VPN con el mismo usuario pero desde otra IP, simulando otra ubicación y con un timestamp que sabemos que es imposible llegar a ese punto en el tiempo indicado. Resultado: Se actualizan los datos del usuario con los datos del nuevos evento: A continuación, vamos a mostrar el caso donde el usuario se conecte por VPN inicialmente desde un país y posteriormente establece conexión VPN desde otro país. Por ejemplo, viaja de la India a Nepal (101.251.4.0), el tiempo de recorrido entre un punto y el otro es de 1h 30 min aproximadamente. Algunas optimizaciones adicionales que se pueden implementar: El caso de uso del «Viajero Imposible» es una poderosa técnica de detección de anomalías que puede ayudar a prevenir accesos no autorizados en sistemas críticos. La integración de un script personalizado con Wazuh nos permite almacenar, comparar y analizar los inicios de sesión de manera avanzada, generando alertas cuando se detectan accesos sospechosos. Esta solución es flexible y escalable, lo que la hace aplicable a otros tipos de eventos, más allá de las conexiones VPN. Caso de uso: «Viajero Imposible» en Wazuh
¿Qué es el caso de uso «Viajero Imposible»?

Enfoque en la detección de VPN
Implementación en Wazuh
¿Por qué necesitamos un script personalizado?
Funcionalidades del script
Flujo general del script:
Base de datos

chown root:wazuh custom-viajero_imposible.py
Esta base de datos tiene que ser una tabla con los campos especificados anteriormente. Muestra
![]()
![]()

![]()


Posibles mejoras
Conclusión

