
Detecting Malicious file hashes with Wazuh and ThreatFox
ThreatFox is a free platform from abuse.ch for sharing indicators of compromise (IOCs) associated with malware with the infosec community, AV vendors and threat intelligence providers. It is part of the same project that created URLhaus and MalwareBazaar. Integrating its API with Wazuh can help organizations improve their ability to detect threats. In this article, we will use a Python script to integrate the Wazuh manager with the ThreatFox API.

Here is a summary of the work done for this example:

For this integration, we use the following assets:

Use Case

For this example, we will use the FIM module to detect a file added to a download directory, and then query the ThreatFox API with the file's hash to check whether it is malicious.

We start by configuring the FIM alert for the desired directory. The syscheck block of the Wazuh agent configuration file (ossec.conf):

```xml
<ossec_config>
  <syscheck>
    <disabled>no</disabled>
    <directories realtime="yes" check_all="yes">/tmp/downloads</directories>
  </syscheck>
</ossec_config>
```

This will trigger the Wazuh syscheck rule 550: Integrity checksum changed. Note that a newly created file raises rule 554 (File added to the system) instead; the integration script handles rule IDs 550, 553 and 554. See the Wazuh syscheck documentation for more information about this configuration.

ThreatFox integration and configuration

To create a custom integration, modify the Wazuh manager configuration file ossec.conf to add the integration block below:

```xml
<integration>
  <name>custom-threatfox.py</name>
  <hook_url>https://threatfox-api.abuse.ch/api/v1/</hook_url>
  <api_key>API_KEY</api_key>
  <rule_id>550</rule_id>
  <alert_format>json</alert_format>
</integration>
```

The parameters used in the integration block are as follows:

- name: the name of the custom script that performs the integration. Custom integration names must start with "custom-".
- hook_url: the URL of the ThreatFox API endpoint.
- api_key: the API key used to authenticate against the ThreatFox API.
- rule_id: the ID of the rule whose alerts trigger the integration.
- alert_format: the format in which the alert is passed to the script, JSON in this case.

Writing the integration script

On the Wazuh server, we create a file called custom-threatfox.py in the directory /var/ossec/integrations/.
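Before going through the script, it may help to see the two pieces of data it works with: the FIM alert it receives and the query it sends to ThreatFox. The following is a minimal sketch, not part of the integration itself. The sample alert is hypothetical and heavily trimmed (real alerts carry many more fields), the hash is a placeholder, and the helper names `extract_sha256` and `build_query` are introduced here for illustration only. The field names (`rule.id`, `syscheck.sha256_after`) and the `search_hash` query are the ones used by the integration script.

```python
import json

# Hypothetical, trimmed-down FIM alert. The hash is a placeholder value.
SAMPLE_ALERT = {
    "id": "1700000000.12345",
    "rule": {"id": "550", "description": "Integrity checksum changed."},
    "agent": {"id": "001", "name": "agent-01"},
    "syscheck": {
        "path": "/tmp/downloads/sample.bin",
        "sha256_after": "a" * 64,
    },
}

# FIM rule IDs the integration script looks at.
FIM_RULE_IDS = {"550", "553", "554"}


def extract_sha256(alert):
    """Return the SHA-256 from a FIM alert, or None if the alert has none."""
    if alert.get("rule", {}).get("id") not in FIM_RULE_IDS:
        return None
    return alert.get("syscheck", {}).get("sha256_after")


def build_query(sha256):
    """Build the JSON body for a ThreatFox hash search."""
    return {"query": "search_hash", "hash": sha256}


if __name__ == "__main__":
    sha256 = extract_sha256(SAMPLE_ALERT)
    print(json.dumps(build_query(sha256)))
```

Running the sketch prints the JSON body that would be posted to the API. Keeping the extraction step separate from the network call makes it possible to check the alert parsing offline, without an API key.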
It is important to note that the first line of the script must be the following shebang, so that the script runs with the Python interpreter bundled with Wazuh:

```
#!/var/ossec/framework/python/bin/python3
```

The full script is below:

```python
#!/var/ossec/framework/python/bin/python3
# Custom script based on urlhaus integration, PRC

import json
import os
import sys
import time
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)

# Fields copied from the first ThreatFox result into the Wazuh event
THREATFOX_FIELDS = (
    "ioc", "threat_type", "threat_type_desc", "ioc_type", "ioc_type_desc",
    "malware", "malware_printable", "malware_alias", "malware_malpedia",
    "confidence_level", "first_seen", "reporter", "tags",
)


def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]
    apikey = args[2]

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Checking the rules for hash fields in the alert
    hash_alert = check_hash_field(json_alert)
    if not hash_alert:
        debug("# Exiting: no SHA-256 hash found in the alert")
        sys.exit(0)

    # Request threatfox info
    msg = request_threatfox_info(json_alert, hash_alert, apikey)

    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])


def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        print(msg)
        with open(log_file, "a") as f:
            f.write(msg)


def collect(data):
    # Extract the fields of interest from the first result
    entry = data["data"][0]
    return {field: entry[field] for field in THREATFOX_FIELDS}


def in_database(data, file_hash):
    result = data['query_status']
    debug(result)
    return result == "ok"


def query_api(file_hash, apikey):
    params = {'query': 'search_hash', 'hash': file_hash}
    # Headers kept as in the original script; the body itself is sent as JSON
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'API-KEY': apikey,
    }
    response = requests.post('https://threatfox-api.abuse.ch/api/v1/',
                             headers=headers, json=params)
    json_response = response.json()
    if json_response['query_status'] == 'ok':
        debug(json_response)
        return json_response

    # Any other status: report the error, or exit quietly when there is no result
    alert_output = {}
    alert_output["threatfox"] = {}
    alert_output["integration"] = "custom-threatfox"
    debug("# Error: The THREATFOX integration encountered an error")
    alert_output["threatfox"]["error"] = response.status_code
    try:
        alert_output["threatfox"]["description"] = json_response["errors"][0]["detail"]
    except KeyError:
        debug("No results in Hash Checking")
        sys.exit()
    alert_output["threatfox"]["error_description"] = json_response['query_status']
    send_event(alert_output)
    sys.exit(0)


def request_threatfox_info(alert, hash_alert, apikey):
    alert_output = {}

    # Request info using threatfox API
    data = query_api(hash_alert, apikey)

    # Create alert
    alert_output["threatfox"] = {}
    alert_output["integration"] = "custom-threatfox"
    alert_output["threatfox"]["found"] = 0
    alert_output["threatfox"]["source"] = {}
    alert_output["threatfox"]["source"]["alert_id"] = alert["id"]
    alert_output["threatfox"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["threatfox"]["source"]["description"] = alert["rule"]["description"]
    alert_output["threatfox"]["source"]["hash"] = hash_alert

    # Check if threatfox has any info about the hash
    if in_database(data, hash_alert):
        alert_output["threatfox"]["found"] = 1
        # Populate JSON output object with the threatfox response
        alert_output["threatfox"].update(collect(data))

    debug(alert_output)
    return alert_output


def send_event(msg, agent=None):
    if not agent or agent["id"] == "000":
        string = '1:threatfox:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->threatfox:{3}'.format(
            agent["id"], agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg))

    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()


def check_hash_field(json_alert):
    rule_id = json_alert["rule"]["id"]
    # FIM rules: file added (554), checksum changed (550), file deleted (553)
    if rule_id in ("554", "550", "553"):
        return json_alert["syscheck"].get("sha256_after")
    return None


if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(
                now, sys.argv[1], sys.argv[2], sys.argv[3],
                sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        with open(log_file, 'a') as f:
            f.write(msg + '\n')

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise
```

This script reads the alert JSON file and extracts the file hash. It then sends a request to the ThreatFox API to check whether the hash that triggered the integration has been flagged as malicious.

Once the script has been created, change the file owner and group to root:wazuh and give it execution permissions:

```shell
chmod 750 /var/ossec/integrations/custom-threatfox.py
chown root:wazuh /var/ossec/integrations/custom-threatfox.py
```

Restart the Wazuh manager to apply the changes, using the command that matches your system:

```shell
# Systemd
systemctl restart wazuh-manager

# SysV init
service wazuh-manager restart

# Other systems
/var/ossec/bin/wazuh-control restart
```

Creating the rules with ThreatFox information

The response obtained from ThreatFox can be used to create alerts for various use cases. For example, we can alert on a malicious file that downloads malware, and get more information about the threat type. To do this, we can create custom rules in /var/ossec/etc/rules/local_rules.xml and restart the manager for them to take effect:

```xml
<group name="threatfox">
  <rule id="100004" level="10">
    <field name="threatfox.reporter">abuse_ch</field>
    <description>Threatfox: Malicious Hash Detected</description>
  </rule>
</group>
```

Testing the integration

On the agent, copy a malware sample, identified by its SHA-256 hash, from MalwareBazaar into the monitored directory and extract it with the archive password:

```shell
7z x /tmp/downloads/857fd5543f14e01ea3b08d3aca6ee6763042a48d7b04c9f035a4a37a4d2e0039.zip
```

Conclusion

In this article, we integrated the ThreatFox API with Wazuh to check file hashes associated with malicious activity. This integration allowed us to retrieve information from ThreatFox about files added to endpoints monitored by Wazuh. The information retrieved was subsequently used with rules to flag malicious activity.

By Pablo Rizzo – SOC Architect, Datasec.

