Skip links

Detecting Malicious file hashes with Wazuh and ThreatFox

Detecting Malicious file hashes with Wazuh and ThreatFox

16/05/2024

By Pablo Rizzo – SOC Architect, Datasec.

ThreatFox is a free platform from abuse.ch with the goal of sharing indicators of compromise (IOCs) associated with malware with the infosec community, AV vendors and threat intelligence providers. It is part of the same project that created URLHaus and Malware Bazaar. Integrating this API with Wazuh can help organizations improve their ability to detect threats.

In this article, we will use a Python script to integrate Wazuh manager with the Threatfox API. Here is a summary of the work done for this example:

  • Configuring Wazuh integrator component.
  • Use of Python script to communicate with Threatfox API.
  • Creating rules for alerting when a malicious hash is identified.

 

For this integration, we use the following assets:

  • Wazuh 4.7.2 with its FIM module
  • Threatfox API for checking file hashes: https://threatfox-api.abuse.ch/api/v1/
  • Agent Ubuntu 22.04 with Wazuh 4.7.2 agent to download a malware sample (for testing purposes) 

 

Use Case

For this example, we will use the FIM module to detect a file added to a download directory, and then perform on this hash through the Treatfox API whether it is malicious or not. 

We start configuring the FIM alert for the desired directory. The syscheck block of the Wazuh agent configuration file (ossec.conf):

<ossec_config>

  <syscheck>

  <disabled>no</disabled>

  <directories realtime=”yes” check_all=”yes”>/tmp/downloads</directories>

  </syscheck>

</ossec_config>

This will trigger the wazuh syscheck rule 550: Integrity checksum changed.

Check Wazuh-syschek for more information about this configuration.

 

Threatfox integration and configuration

To create a custom integration, the Wazuh manager configuration file ossec.conf has to be modified to add the integration block with the content below:

<integration>

  <name>custom-threatfox.py</name>

  <hook_url>https://threatfox-api.abuse.ch/api/v1/ </hook_url>

  <rule_id>550</rule_id>

  <alert_format>json</alert_format>

</integration>

 

The parameters used in the integration block are as follows:

  • name: The name of the custom script that performs the integration. All custom script names must start with “custom-“.
  • hook_url: This is the API URL provided by Threatfox.
  • rule_id: The ID of the Wazuh rule that will trigger this integration.
  • alert_format: Indicates the format in which the script receives the alerts.

 

Writing the integration script

On the Wazuh server, we proceed to create a file called custom-threatfox.py in the directory /var/ossec/integrations/. It is important to note that:

  • The first line of the integration script must indicate its interpreter or else Wazuh will not know how to read and execute the script.

#!/var/ossec/framework/python/bin/python3

  • The function request_threatfox_info() is the function that requests the hash data from Threatfox.
  • The function check_hash_field() could be used to set more rules with different hash fields.

 

The full script is below:

#!/var/ossec/framework/python/bin/python3

# Copyright (C) 2015-2024, Wazuh Inc.

# Custom script based on urlhaus integration

import json

import sys

import time

import os

from socket import socket, AF_UNIX, SOCK_DGRAM

try:

    import requests

    from requests.auth import HTTPBasicAuth

except Exception as e:

    print(“No module ‘requests’ found. Install: pip install requests”)

    sys.exit(1)

# Global vars

debug_enabled = True

pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

json_alert = {}

now = time.strftime(“%a %b %d %H:%M:%S %Z %Y”)

# Set paths

log_file = ‘{0}/logs/integrations.log’.format(pwd)

socket_addr = ‘{0}/queue/sockets/queue’.format(pwd)

def main(args):

    debug(“# Starting”)

    # Read args

    alert_file_location = args[1]

    debug(“# File location”)

    debug(alert_file_location)

    # Load alert. Parse JSON object.

    with open(alert_file_location) as alert_file:

        json_alert = json.load(alert_file)

    debug(“# Processing alert”)

    debug(json_alert)

    # Checking the rules for url fields in the alert

    hash_alert = check_hash_field(json_alert)

    

    # Request urlhaus info

    msg = request_threatfox_info(json_alert, hash_alert)

    # If positive match, send event to Wazuh Manager

    if msg:

        send_event(msg, json_alert[“agent”])

def debug(msg):

    if debug_enabled:

        msg = “{0}: {1}\n”.format(now, msg)

        print(msg)

        f = open(log_file,”a”)

        f.write(msg)

        f.close()

def collect(data):

  ioc = data[“data”][0][‘ioc’]

  threat_type = data[“data”][0][‘threat_type’]

  threat_type_desc = data[“data”][0][‘threat_type_desc’]

  ioc_type = data[“data”][0][‘ioc_type’]

  ioc_type_desc = data[“data”][0][‘ioc_type_desc’]

  malware = data[“data”][0][‘malware’]

  malware_printable = data[“data”][0][“malware_printable”]

  malware_alias = data[“data”][0][“malware_alias”]

  malware_malpedia = data[“data”][0][“malware_malpedia”]

  confidence_level = data[“data”][0][“confidence_level”]

  first_seen = data[“data”][0][“first_seen”]

  reporter = data[“data”][0][“reporter”]

  tags = data[“data”][0][“tags”]

  return  ioc, threat_type, threat_type_desc, ioc_type, ioc_type_desc, malware, malware_printable, malware_alias, \

        malware_malpedia, confidence_level, first_seen, reporter ,tags 

def in_database(data, url):

  result = data[‘query_status’]

  debug(result)

  if result == “ok”:

    return True

  return False

def query_api(hash):

  params = {‘query’: “search_hash”, “hash”: hash }

  headers = {‘Content-Type’: ‘application/x-www-form-urlencoded’,}

  response = requests.post(‘https://threatfox-api.abuse.ch/api/v1/’, headers=headers, json=params)

  json_response = response.json()

  if json_response[‘query_status’] == ‘ok’:

      data = json_response

      debug(data)

      return data

  else:

      alert_output = {}

      alert_output[“threatfox”] = {}

      alert_output[“integration”] = “custom-threatfox”

      json_response = response.json()

      debug(“# Error: The THREATFOX integration encountered an error”)

      alert_output[“threatfox”][“error”] = response.status_code

      

      try:

        alert_output[“threatfox”][“description”] = json_response[“errors”][0][“detail”]

      

      except KeyError:

          debug(“No results in Hash Checking”)

          sys.exit()

          

  alert_output[“urlhaus”][“error_description”] = json_response[‘query_status’]

  send_event(alert_output)

  exit(0)

def request_threatfox_info(alert, hash_alert):

    alert_output = {}

    

    # Request info using urlhaus API

    data = query_api(hash_alert)

    # Create alert

    alert_output[“threatfox”] = {}

    alert_output[“integration”] = “custom-threatfox”

    alert_output[“threatfox”][“found”] = 0

    alert_output[“threatfox”][“source”] = {}

    alert_output[“threatfox”][“source”][“alert_id”] = alert[“id”]

    alert_output[“threatfox”][“source”][“rule”] = alert[“rule”][“id”]

    alert_output[“threatfox”][“source”][“description”] = alert[“rule”][“description”]

    alert_output[“threatfox”][“source”][“hash”] = hash_alert

    hash = hash_alert

    

    # Check if threatfox has any info about the url

    if in_database(data, hash):

      alert_output[“threatfox”][“found”] = 1

    # Info about the url found in urlhaus

    if alert_output[“threatfox”][“found”] == 1:

        ioc, threat_type, threat_type_desc, ioc_type, ioc_type_desc, malware, malware_printable, malware_alias, \

        malware_malpedia, confidence_level, first_seen, reporter, tags = collect(data)

        # Populate JSON Output object with urlhaus request

        alert_output[“threatfox”][“ioc”] = ioc

        alert_output[“threatfox”][“threat_type”] = threat_type

        alert_output[“threatfox”][“threat_type_desc”] = threat_type_desc

        alert_output[“threatfox”][“ioc_type”] = ioc_type

        alert_output[“threatfox”][“ioc_type_desc”] = ioc_type_desc

        alert_output[“threatfox”][“malware”] = malware

        alert_output[“threatfox”][“malware_printable”] = malware_printable

        alert_output[“threatfox”][“malware_alias”] = malware_alias

        alert_output[“threatfox”][“malware_malpedia”] = malware_malpedia

        alert_output[“threatfox”][“confidence_level”] = confidence_level

        alert_output[“threatfox”][“first_seen”] = first_seen

        alert_output[“threatfox”][“reporter”] = reporter

        alert_output[“threatfox”][“tags”] = tags

    debug(alert_output)

    return(alert_output)

def send_event(msg, agent = None):

    if not agent or agent[“id”] == “000”:

        string = ‘1:threatfox:{0}’.format(json.dumps(msg))

    

    else:

        string = ‘1:[{0}] ({1}) {2}->threatfox:{3}’.format(agent[“id”], agent[“name”], agent[“ip”] if “ip” in agent else “any”, json.dumps(msg))

    debug(string)

    sock = socket(AF_UNIX, SOCK_DGRAM)

    sock.connect(socket_addr)

    sock.send(string.encode())

    sock.close()

def check_hash_field(json_alert):

    rule_id = json_alert[“rule”][“id”]

    

    # Rule File added to the system    

    if rule_id == “550”:

        hash_alert = json_alert[“syscheck”][“sha256_after”]

        return hash_alert

    

        

if __name__ == “__main__”:

    try:

        # Read arguments

        bad_arguments = False

        if len(sys.argv) >= 4:

            msg = ‘{0} {1} {2} {3} {4}’.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else ”)

            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == ‘debug’)

        

        else:

            msg = ‘{0} Wrong arguments’.format(now)

            bad_arguments = True

        # Logging the call

        f = open(log_file, ‘a’)

        f.write(msg +’\n’)

        f.close()

        if bad_arguments:

            debug(“# Exiting: Bad arguments.”)

            sys.exit(1)

        # Main function

        main(sys.argv)

    except Exception as e:

        debug(str(e))

        raise

 

This script reads the alerts JSON file and extracts the file hash. Then, a request is made to the Threatfox API to check if the Hash that triggered the integration script has been flagged for malicious behavior.

Once the script has been created, the file owner and group are changed to root:wazuh, and execution permissions are given.

chmod 750 /var/ossec/integrations/custom-threatfox.py

chown root:wazuh /var/ossec/integrations/custom-threatfox.py

Proceed to restart the Wazuh manager to apply the changes:

 

  • For systemd-based Linux systems

systemctl restart wazuh-manager

  • For SysV init-based Linux systems

service wazuh-manager restart

  • For other Unix-based OS

/var/ossec/bin/wazuh-control restart

 

Creating the rules with Threatfox information

The response obtained from Threatfox can be used to create alerts for various use cases. For example, we can alert about a malicious file that downloads malware, and get more information about the threat type. We can create several custom rules in /var/ossec/etc/rules/local_rules.xml to do this, restarting the manager to make it operational:

<group name=”threatfox”>

<rule id=”100004″ level=”10″>

  <field name=”threatfox.reporter”>abuse_ch</field>

  <description>Threatfox: Malicious Hash Detected</description>

</rule>

</group>

 

Testing the integration

In the agent, copy a malware file sample with a sha-256 hash from malware bazaar and unzip the file with the key.

7z x /tmp/downloads/857fd5543f14e01ea3b08d3aca6ee6763042a48d7b04c9f035a4a37a4d2e0039.zip

 

Conclusion

In this article, we integrated Threatfox API with Wazuh to check file hashes associated with malicious activity. This integration allowed us to retrieve information from Threatfox about browsing activity made on endpoints monitored by Wazuh. The information retrieved was subsequently used with rules to determine malicious activity.