
“Impossible Traveler” detection with Wazuh

What is the “Impossible Traveler” use case?

“Impossible Traveler” is an anomaly detection technique based on the geographic location of a user’s actions. The name refers to situations in which a user appears to access a system from two or more distant locations within a period too short for anyone to travel between them. This type of behavior may indicate compromised credentials or unauthorized access to an account.

Example:

A typical example: a user, “User1”, logs in from Buenos Aires at 10:00 AM and, about 20 minutes later, logs in from London. Unless “User1” can travel instantaneously between these two points, the second login is suspicious and may mean that a malicious actor has taken control of the user’s credentials.

Approximately how long does it take a person to travel from Buenos Aires to London?

Therefore, if we have already detected User1’s VPN access from Buenos Aires, User1 cannot legitimately access the VPN from London 20 minutes later: at a minimum, the two events would have to be about 14 hours 55 minutes apart (approx.). So, if 20 minutes later we detect that the same user connected via VPN from London, we raise an impossible traveler alert.
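The arithmetic behind this example can be sketched in a few lines. This is an illustration only: it uses the haversine great-circle formula from the standard library instead of the geopy call used later in this post, the city coordinates are approximate values chosen for the example, and the 800 km/h speed is the same assumption the script makes.

```python
import math

EARTH_RADIUS_KM = 6371.0   # mean Earth radius
CRUISE_SPEED_KMH = 800.0   # airliner speed assumed by the script in this post


def haversine_km(a, b):
    """Great-circle distance in km between two (lat, lon) pairs given in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


# Approximate city-centre coordinates (illustrative values)
BUENOS_AIRES = (-34.6037, -58.3816)
LONDON = (51.5074, -0.1278)

distance_km = haversine_km(BUENOS_AIRES, LONDON)
min_hours = distance_km / CRUISE_SPEED_KMH
print(f"{distance_km:.0f} km, at least {min_hours:.1f} h of flight")
```

Under these assumptions the distance comes out on the order of 11,000 km and the minimum travel time at around 14 hours, so a second login 20 minutes after the first falls far short of it.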

Objective of this solution

The goal of this implementation is to provide a custom solution that enables “Impossible Traveler” detection within Wazuh. Although Wazuh is a powerful security and monitoring platform, it has no native ability to correlate events by the geographic location of users over time. We therefore propose a Python script that extracts information about connection events (such as VPN accesses), queries a geolocation service to determine the location of the source IP, and performs temporal and spatial comparisons to identify suspicious access.

This approach makes it possible to store and compare user login events and to alert system administrators when an “Impossible Traveler” is detected, that is, when a user’s actions appear physically infeasible.

Focus on VPN detection

In this post, we will focus on detecting “impossible travelers” on VPN connections. However, the approach can be extended to detect this type of behavior for any login or action that is tied to a geographic location.

Implementation in Wazuh

Wazuh is an open source platform for threat monitoring and detection, offering great flexibility through custom integrations. Although its basic functionality does not include advanced detection based on the geographical location of events, we can extend its capabilities using custom scripts and external APIs.

To implement the “Impossible Traveler” use case, we will develop a Python script that will:

–  Collect information about connection events (in this case, VPN access).

–  Query a geolocation API to determine the physical location of the IP address associated with each event.

–  Store this information in a local database.

–  Compare this data with previous events from the same user.

–  Determine if the distance and time between two accesses are compatible with a realistic displacement.

–  Generate alerts if a physically impossible trip is detected.

Why do we need a custom script?

Although Wazuh has many features, it has no built-in way to store location data on the platform and correlate it across events for later comparison. We therefore implemented a Python script to handle this task. This approach allows us to:

  1. Store and compare user logins.
  2. Calculate the distance between connection points and determine if it is physically possible for a user to have traveled from one point to another in the available time.
  3. Generate an alert if travel appears impossible, which could indicate a compromised or malicious login.

Script Features

General script flow:

  1. Extract VPN connection event data: When a VPN connection event occurs in Wazuh, the script is triggered and extracts key data such as source IP, user and timestamp.
  2. Query Geolocation API: The script uses the IP-API to obtain the geographic coordinates (latitude and longitude) associated with the IP address.
  3. Database storage: VPN connection information is stored in an SQLite database. This database will contain relevant information, such as the timestamp, user, source IP, latitude and longitude of the location.
  4. Comparison with previous events: For each new connection event, the script checks whether a previous event for the same user and client already exists in the database. If one exists, the script calculates the geographic distance between the two locations and checks whether the user could have traveled between them in the available time.
  5. Impossible trip detection: If the trip seems impossible (the distance between the two locations is greater than what would be possible to travel in the available time), an alert is generated in Wazuh.
  6. Connection detection from a different country than the previous one: If the user establishes a connection from one country and then from another, even within reasonable times, an alert is raised.
  7. Database update: Regardless of whether the trip was possible or not, the database is updated with the information of the new connection event.
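The comparison steps in this flow (4 to 6) can be sketched as a single pure function. This is a simplified illustration, not the script itself: the function name `classify`, the dictionary shape of the events, and the haversine distance (standing in for geopy) are assumptions made for the example.

```python
import math
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S.%f%z"  # timestamp format parsed by the script in this post
MAX_SPEED_KMH = 800.0           # airliner speed assumed by the script


def haversine_km(a, b):
    """Great-circle distance in km between two (lat, lon) pairs given in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(h))


def classify(prev, new):
    """Return 'impossible_travel', 'country_change' or None (hypothetical helper)."""
    if prev is None:
        return None  # first event for this user: nothing to compare against
    t_prev = datetime.strptime(prev["timestamp"], FMT)
    t_new = datetime.strptime(new["timestamp"], FMT)
    hours = abs((t_new - t_prev).total_seconds()) / 3600
    km = haversine_km((prev["lat"], prev["lon"]), (new["lat"], new["lon"]))
    if km / MAX_SPEED_KMH > hours:
        return "impossible_travel"  # step 5
    if prev["country"] != new["country"]:
        return "country_change"     # step 6
    return None


buenos_aires = {"timestamp": "2024-01-01T10:00:00.000+0000",
                "lat": -34.6037, "lon": -58.3816, "country": "Argentina"}
london = {"timestamp": "2024-01-01T10:20:00.000+0000",
          "lat": 51.5074, "lon": -0.1278, "country": "United Kingdom"}
print(classify(buenos_aires, london))
```

Keeping the decision separate from the database and socket code makes it easy to test with fixed inputs before wiring it into Wazuh.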

 

Database

The database is a SQLite3 database that stores the following information for each registered VPN connection:

  • timestamp: Time and date of the event.
  • user: Name of the user making the connection.
  • srcip: Source IP of the event.
  • lat: Latitude of the event location.
  • lon: Longitude of the event location.
  • country: Country where the event originated.
  • city: City where the event originated.
  • regionName: Region where the event originated.
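As a small illustration of this layout, the sketch below builds the table in an in-memory database and reads a row back by column name. The table name `logs` matches the script in this post; the sample row, the documentation-range IP address and the use of `sqlite3.Row` are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")   # in-memory database, for illustration only
conn.row_factory = sqlite3.Row       # allows access to columns by name
conn.execute("""
    CREATE TABLE logs (
        timestamp TEXT, user TEXT, srcip TEXT,
        lat REAL, lon REAL, country TEXT, city TEXT, regionName TEXT
    )
""")

# Hypothetical sample row; 203.0.113.10 is a documentation-range address
conn.execute(
    "INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    ("2024-01-01T10:00:00.000+0000", "user1", "203.0.113.10",
     -34.6037, -58.3816, "Argentina", "Buenos Aires", "Buenos Aires"),
)

row = conn.execute("SELECT * FROM logs WHERE user = ?", ("user1",)).fetchone()
last_seen = dict(row)                # copy the row before closing the connection
conn.close()
print(last_seen["country"], last_seen["lat"], last_seen["lon"])
```

Reading columns by name avoids depending on the column order, which the positional indexes in the script rely on.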

Script flowchart

Wazuh configurations

<group name="vpn_imposible_traveler">

  <rule id="555555" level="0">
    <location>Imposible_traveler_VPN</location>
    <description>Impossible traveler rule group</description>
  </rule>

  <!-- Impossible traveler -->
  <rule id="555556" level="14">
    <if_sid>555555</if_sid>
    <match>"Event ID": "1"</match>
    <description>Impossible Traveler VPN</description>
  </rule>

  <!-- Previous connection from a country different from that of the new event -->
  <rule id="555557" level="3">
    <if_sid>555555</if_sid>
    <match>"Event ID": "2"</match>
    <description>The user established a VPN connection from a country different from the previous one</description>
  </rule>

</group>
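The match patterns in these rules depend on how the script serializes its message. The sketch below, using a shortened hypothetical message, shows that Python's default `json.dumps` separators produce the `"Event ID": "1"` text (with a space after the colon), while a compact serialization would not.

```python
import json

# Shortened, hypothetical version of the message the script sends
payload = {"User": "user1", "Event ID": "1"}

msg = json.dumps(payload, ensure_ascii=False, indent=4)
line = f"1:Imposible_traveler_VPN:{msg}"

pattern = '"Event ID": "1"'
print(pattern in line)

# A compact serialization drops the space after the colon
compact = json.dumps(payload, separators=(",", ":"))
print(pattern in compact)
```

If the serialization in the script is ever changed, the match text in the rules has to change with it.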

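Configuration inside ossec.conf. The <name> value must match the file name of the script in /var/ossec/integrations, and ID’s-VPN stands for the IDs of the rules that fire on VPN logins:

<integration>
  <name>custom-Imposible_traveller.py</name>
  <rule_id>ID’s-VPN</rule_id>
  <alert_format>json</alert_format>
</integration>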

Once this is in place, save the following script in /var/ossec/integrations as custom-Imposible_traveller.py.

SCRIPT:

#!/var/ossec/framework/python/bin/python3

import json
import sys
import sqlite3
from socket import socket, AF_UNIX, SOCK_DGRAM
from datetime import datetime

# Third-party packages: they must be available to the interpreter above
from geopy.distance import geodesic
import requests

# Global vars
SOCKET_ADDR = '/var/ossec/queue/sockets/queue'
BD = '/var/ossec/var/db/DB_Imposible_traveller.db'

def main():
    sock = socket(AF_UNIX, SOCK_DGRAM)
    try:
        sock.connect(SOCKET_ADDR)

        # Read the alert file; its path is received as the first argument
        with open(sys.argv[1]) as alert_file:
            alert_json = json.load(alert_file)

        alert_timestamp = alert_json['timestamp']

        # Possible field names for the source IP
        posibles_srcip = ['srcip', 'src_ip', 'remip']
        alert_srcip = None
        for key in alert_json['data']:
            if key in posibles_srcip:
                alert_srcip = alert_json['data'][key]
                break
        if alert_srcip is None:
            raise ValueError('Alert has no source IP field')

        alert_user = alert_json['data']['dstuser']

        # IP-API info: (country, city, regionName, lat, lon)
        new_event = query_api(alert_srcip)
        if new_event is None:
            raise ValueError(f'No geolocation data for {alert_srcip}')
        coords_new_event = (new_event[3], new_event[4])

        # Connect to the SQLite database and look up the user's previous event
        conn = sqlite3.connect(BD)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT * FROM logs WHERE user = ?
        ''', (alert_user,))

        user_in_db = cursor.fetchone()

        # If the user exists in the db, check whether travel from point A to B is possible
        if user_in_db:
            coords_existent_event = (user_in_db[3], user_in_db[4])

            if not is_possible_travell(user_in_db[0], alert_timestamp, coords_existent_event, coords_new_event):

                # Wazuh alert log
                msg = {
                    "Event": "The user established a VPN connection from point A and then from point B in a physically impossible time",
                    "User": alert_user,
                    "Timestamp first VPN connection": user_in_db[0],
                    "Country first VPN connection": user_in_db[5],
                    "City first VPN connection": user_in_db[6],
                    "Region first VPN connection": user_in_db[7],
                    "IP first VPN connection": user_in_db[2],
                    "Latitude first VPN connection": user_in_db[3],
                    "Longitude first VPN connection": user_in_db[4],
                    "Timestamp new VPN connection": alert_timestamp,
                    "Country new VPN connection": new_event[0],
                    "City new VPN connection": new_event[1],
                    "Region new VPN connection": new_event[2],
                    "IP new VPN connection": alert_srcip,
                    "Latitude new VPN connection": new_event[3],
                    "Longitude new VPN connection": new_event[4],
                    "Event ID": "1"
                }

                msg = json.dumps(msg, ensure_ascii=False, indent=4)

                send_to_wazuh(sock, msg)

            # The user connects from one country and then from another within an
            # acceptable time range: still worth alerting on and analyzing
            elif user_in_db[5] != new_event[0]:

                # Wazuh alert log
                msg = {
                    "Event": "The user established a VPN connection from one country and then from another different country in reasonable times, validate with client",
                    "User": alert_user,
                    "Timestamp first VPN connection": user_in_db[0],
                    "Country first VPN connection": user_in_db[5],
                    "City first VPN connection": user_in_db[6],
                    "Region first VPN connection": user_in_db[7],
                    "IP first VPN connection": user_in_db[2],
                    "Latitude first VPN connection": user_in_db[3],
                    "Longitude first VPN connection": user_in_db[4],
                    "Timestamp new VPN connection": alert_timestamp,
                    "Country new VPN connection": new_event[0],
                    "City new VPN connection": new_event[1],
                    "Region new VPN connection": new_event[2],
                    "IP new VPN connection": alert_srcip,
                    "Latitude new VPN connection": new_event[3],
                    "Longitude new VPN connection": new_event[4],
                    "Event ID": "2"
                }

                msg = json.dumps(msg, ensure_ascii=False, indent=4)

                send_to_wazuh(sock, msg)

            # Finally, regardless of whether travel was possible, store the new event
            cursor.execute('''
                UPDATE logs
                SET timestamp = ?, srcip = ?, lat = ?, lon = ?, country = ?, city = ?, regionName = ?
                WHERE user = ?
            ''', (alert_timestamp, alert_srcip, new_event[3], new_event[4], new_event[0], new_event[1], new_event[2], alert_user))

            with open('/var/ossec/logs/integrations.log', "a") as file:
                file.write(f'User {alert_user} updated\n')

        else:

            cursor.execute('''
                INSERT INTO logs (timestamp, user, srcip, country, city, regionName, lat, lon)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (alert_timestamp, alert_user, alert_srcip, new_event[0], new_event[1], new_event[2], new_event[3], new_event[4]))

            with open('/var/ossec/logs/integrations.log', "a") as file:
                file.write(f'User {alert_user} added to database\n')

        # Save the changes made to the database and close the connection
        conn.commit()
        conn.close()

    except Exception as e:
        with open('/var/ossec/logs/integrations.log', "a") as file:
            file.write(f'{e}\n')
    finally:
        sock.close()

def send_to_wazuh(sock, msg):
    # The location field must match the <location> used in the rules
    string = f'1:Imposible_traveler_VPN:{msg}'
    sock.send(string.encode())

    with open('/var/ossec/logs/integrations.log', "a") as file:
        file.write(f'Alert sent to Wazuh {string}\n')

# Function to calculate if it is possible to travel between two places in the given time
def is_possible_travell(tiempo_a, tiempo_b, coords_a, coords_b):
    # Calculate distance between points A and B using geopy (geodesic)
    distance = geodesic(coords_a, coords_b).kilometers

    # Calculate the time difference in hours
    fmt = "%Y-%m-%dT%H:%M:%S.%f%z"  # Date and time format
    time_a_dt = datetime.strptime(tiempo_a, fmt)
    time_b_dt = datetime.strptime(tiempo_b, fmt)
    delta_time = abs((time_b_dt - time_a_dt).total_seconds() / 3600)  # Convert seconds to hours

    velocity = 800  # Average airplane speed in km/h

    # Calculate the time required to travel the distance at the given speed
    time_necessary = distance / velocity

    # Check if the trip is possible
    return delta_time >= time_necessary

def query_api(ip):
    # "status" is requested so that failed lookups (e.g. private IPs) can be detected
    url = f'http://ip-api.com/json/{ip}?fields=status,country,regionName,city,lat,lon,query'
    response = requests.get(url, timeout=10)

    if response.status_code == 200:
        data = response.json()
        if data.get('status') == 'success':
            return data['country'], data['city'], data['regionName'], data['lat'], data['lon']
    return None

if __name__ == "__main__":
    main()

Then assign the necessary permissions to the file:

chown root:wazuh custom-Imposible_traveller.py

chmod 750 custom-Imposible_traveller.py

Finally, we must create the SQLite database, save it in /var/ossec/var/db/ and assign permissions to it.

The database must contain a table named logs with the fields specified above.

Example script to create database:

#!/var/ossec/framework/python/bin/python3
import sqlite3
# Connect to SQLite database (or create it if it doesn't exist)
conn = sqlite3.connect('/var/ossec/var/db/DB_Imposible_traveller.db')
# Create a cursor object to interact with the database
cursor = conn.cursor()
# Create table with the specified columns
cursor.execute('''
CREATE TABLE IF NOT EXISTS logs (
    timestamp TEXT,
    user TEXT,
    srcip TEXT,
    lat REAL,
    lon REAL,
    country TEXT,
    city TEXT,
    regionName TEXT
)
''')
# Commit the changes and close the connection
conn.commit()
conn.close()
print("Table created successfully!")

Sample

Initially, we will establish a VPN connection with the user: user1 from the IP: 167.61.103.172.

The user will be added to the database:

Then, we will establish a VPN connection with the same user, user1, but from a different IP address, simulating another location, and using a timestamp that makes it impossible to reach that point in the indicated time.

Result:

The user data is updated with the new event data:

Possible Improvements
Some additional optimizations that can be implemented include:

  • Monitoring of Repetitive Locations: If a user consistently connects from the same city and suddenly logs in from a completely different location, even with sufficient time, it would be prudent to raise an alert for possible anomalous behavior.
  • Geographic Margin of Error: Since geolocation APIs are not 100% accurate, it is advisable to establish a margin of error of approximately 300 km for events within the same country, which would help reduce the rate of false positives.
  • Means of Transport: For this example, the average speed of an airplane was used. To enhance accuracy, the speeds of different modes of transport could be calculated to determine whether travel between locations is feasible.
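The margin-of-error and transport-mode ideas could be combined into one feasibility check, sketched below. This is a hypothetical variant, not part of the script above: the function name, the speed table and its values are illustrative assumptions, and the 300 km margin is the figure suggested in the list.

```python
MARGIN_KM = 300.0  # geolocation tolerance suggested above for same-country events

# Hypothetical top speeds per transport mode, in km/h (illustrative values only)
SPEEDS_KMH = {"car": 120.0, "train": 300.0, "plane": 800.0}


def is_travel_possible(distance_km, hours, same_country, mode="plane"):
    """Return True if the distance can be covered in the given time.

    For events in the same country, the first MARGIN_KM of distance is
    ignored to absorb geolocation inaccuracy.
    """
    if same_country:
        distance_km = max(0.0, distance_km - MARGIN_KM)
    return hours >= distance_km / SPEEDS_KMH[mode]


# Two same-country logins 250 km apart, 6 minutes apart: inside the margin
print(is_travel_possible(250, 0.1, same_country=True))
# The same gap across a border is not covered by the margin
print(is_travel_possible(250, 0.1, same_country=False))
```

How the transport mode is chosen for a given pair of events is left open here; one option would be to test against the fastest plausible mode for the distance involved.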

Conclusion
The “Impossible Traveler” use case is a powerful anomaly detection technique that can help prevent unauthorized access to critical systems. The integration of a custom script with Wazuh allows us to store, compare, and analyze logins in an advanced manner, generating alerts when suspicious access is detected. This solution is flexible and scalable, making it applicable to other types of events beyond VPN connections.

Need help?

Datasec is a company dedicated exclusively to providing services, training, and software in the field of information security and cybersecurity. With more than 30 years of uninterrupted history, we have developed a unique combination of knowledge, software and experience.

Website: http://datasec-soft.com/en/

Contact Us: http://datasec-soft.com/en/contact-us/