Building the Assemblyline Analyzer for TheHive’s Cortex.

  1. Deploying (and using) TheHive4 [Part 1]
  2. Building TheHive4 (4.0.5) and configuring MISP, Cortex and Webhooks.
  3. Building the Assemblyline Analyzer for TheHive’s Cortex.
  4. TheHive 4.1.0 Deployment and Integration with MISP

Static analysis has become more fun for me since adding Assemblyline to my arsenal, but its lack of integration with the other elements of my FOSS SOC stack was concerning.

In this post I detail not only how to write a Cortex Analyzer, but also how to use that analyzer to integrate with other appliances.

What is Cortex?

Cortex is the analysis backend for TheHive. Tasks for technical analysis and information gathering raised in TheHive are offloaded to Cortex. Cortex then executes the requested analyzer, and the results are returned to TheHive for association with an active case.

What is Assemblyline?

Assemblyline is a static analysis platform for malware inspection and decompiling. Its analysis is carried out by multiple services, which run not only on the submitted file but also on objects extracted from it.

Analysis High-level Plan

The basic requirement for the AssemblyLine analyzer is to take a file or URL submitted to TheHive, pass it through Cortex, and perform static analysis of that artefact with AssemblyLine. The results of that analysis are then brought back into TheHive via Cortex for inclusion within the Security Incident Response Platform.

Requirements Gathering

The use cases I will be catering for are the following scenarios:

  • A suspicious file needs to be analysed, for malicious or social engineering borne material.
  • A payload has been found in a dropper, or a suspicious file is hosted online, and the file needs to be analysed for malicious content.

Limitation: Assemblyline will only provide SHA256 values for files submitted through the platform (at present).

Divide and Conquer

Handling Input from Cortex

The input to a Cortex Analyzer takes the form of a JSON string which contains information pertinent to the IOC being analysed, and some configuration with respect to TLP, PAP, and proxy configuration.

{
    "data":"d41d8cd98f00b204e9800998ecf8427e",
    "dataType":"hash",
    "tlp":0,
    "config":{
        "host":"1234567890abcdef",
        "apikey":"1234567890abcdef",
        "max_tlp":3,
        "check_tlp":true,
        "service":"AnalyseFile"
        [..]
    },
    "proxy":{
        "http":"http://myproxy:8080",
        "https":"https://myproxy:8080"
    }
}
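
To make the input handling concrete, here is a minimal sketch of reading that JSON and pulling values out by dotted path, which is the idea behind the `get_param` calls used later in the analyzer. The helper below is a hand-rolled stand-in written for illustration, not the cortexutils implementation, and the sample values are placeholders.

```python
import json

# Example job input shaped like the JSON above (values are placeholders).
RAW_INPUT = """
{
    "data": "d41d8cd98f00b204e9800998ecf8427e",
    "dataType": "hash",
    "tlp": 0,
    "config": {"max_tlp": 3, "check_tlp": true, "service": "RetrieveAnalysis"}
}
"""


def get_param(job, dotted_name, default=None):
    """Walk a dotted path such as 'config.service' through nested dicts."""
    current = job
    for key in dotted_name.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


job = json.loads(RAW_INPUT)
data_type = get_param(job, "dataType")
service = get_param(job, "config.service")
# The observable may only be processed if its TLP does not exceed max_tlp.
tlp_allowed = get_param(job, "tlp", 2) <= get_param(job, "config.max_tlp", 2)

print(data_type, service, tlp_allowed)
```

The dotted path keeps the analyzer code short: one call covers both the top-level keys and anything nested under `config`.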

The first thing we need to do is create the analysis task within AssemblyLine and reply with a status code, and potentially a related IOC for the file (i.e. MD5, SHA1 or SHA256).

In circumstances where AssemblyLine has seen the submitted file before, you will likely get an analysis report if you follow up with a RetrieveAnalysis request.
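
For that follow-up lookup you need the SHA256 of the file. A minimal sketch of computing it locally with the standard library is below; the function name and chunk size are my own choices for illustration, and the demonstration hashes a throwaway empty file.

```python
import hashlib
import os
import tempfile


def sha256_of_file(path, chunk_size=65536):
    """Hash a file in chunks so large samples are not loaded into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demonstration on a throwaway empty file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    sample_path = tmp.name
try:
    empty_digest = sha256_of_file(sample_path)
finally:
    os.remove(sample_path)

print(empty_digest)
```

The resulting hex string is what would be added to the case as a hash observable and handed to RetrieveAnalysis.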

Expected Output from Analyzer

{
    "success":true,
    "artifacts":[..],
    "summary":{
        "taxonomies":[..]
    },
    "full":{..}
}
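
As a rough illustration of how that envelope is filled in, the sketch below builds the same structure by hand. In a real analyzer the cortexutils `report()` method assembles it for you; the `build_report` helper, the "submitted" value and the example `sid` are assumptions made purely for this example.

```python
import json

ALLOWED_LEVELS = ("info", "safe", "suspicious", "malicious")


def build_taxonomy(level, namespace, predicate, value):
    """Return one taxonomy entry, falling back to 'info' for unknown levels."""
    if level not in ALLOWED_LEVELS:
        level = "info"
    return {
        "level": level,
        "namespace": namespace,
        "predicate": predicate,
        "value": value,
    }


def build_report(full_report, service):
    """Wrap a raw result in the success/artifacts/summary/full envelope."""
    taxonomy = build_taxonomy("info", "Assemblyline", service, "submitted")
    return {
        "success": True,
        "artifacts": [],
        "summary": {"taxonomies": [taxonomy]},
        "full": full_report,
    }


# The 'sid' key and its value are placeholders, not real Assemblyline output.
report = build_report({"sid": "example-submission-id"}, "AnalyseFile")
print(json.dumps(report, indent=4))
```

The taxonomy is what TheHive shows as a short tag on the observable, while `full` carries the complete response for the long report view.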

Proof of Concept

Now we can start piecing together the analyzer from input to output. For this analyzer I will be creating three service request types: AnalyseFile, AnalyseURL and RetrieveAnalysis.

AnalyseFile

This function will take in a file, and then pass that file to AssemblyLine for static analysis.

Since analysis can take a while in Assemblyline, the results may not be available quickly, so the analysis task should wait in a polling loop until the analysis completes. The proof-of-concept function below only submits the file and returns AssemblyLine's response to the submission.

    def AnalyseFile(self):
        al_client = get_client(self.assemblyline_host, apikey=(self.assemblyline_user, self.assemblyline_key), verify=False)
        response = al_client.submit(path=self.filepath, fname=self.filename)
        return response

AnalyseURL

This function will take in a URL, and pass that URL to AssemblyLine for downloading of the binary and subsequent analysis.

Since analysis can take a while in Assemblyline, the results may not be available quickly, so the analysis task should wait in a polling loop until the analysis completes. The proof-of-concept function below only submits the URL and returns AssemblyLine's response to the submission.

    def AnalyseURL(self):
        al_client = get_client(self.assemblyline_host, apikey=(self.assemblyline_user, self.assemblyline_key), verify=False)
        response = al_client.submit(url=self.url)
        return response
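
The waiting described for both submit functions could be sketched as a small polling helper. This is not part of the analyzer as published; the helper name, the timeout and the fake API class are assumptions so the example runs offline. With the real client, the callable passed in would be something like `al_client.submission.is_completed`, which I believe the assemblyline_client library provides, but check that against your installed version.

```python
import time


class FakeSubmissionApi:
    """Offline stand-in that reports completion after a fixed number of polls."""

    def __init__(self, polls_until_done):
        self.remaining = polls_until_done

    def is_completed(self, sid):
        self.remaining -= 1
        return self.remaining <= 0


def wait_for_completion(is_completed, sid, interval=60, timeout=3600, sleep=time.sleep):
    """Poll is_completed(sid) until it returns True or the timeout elapses.

    Returns True when the submission finished, False when we gave up.
    """
    waited = 0
    while not is_completed(sid):
        if waited >= timeout:
            return False
        sleep(interval)
        waited += interval
    return True


api = FakeSubmissionApi(polls_until_done=3)
finished = wait_for_completion(api.is_completed, "example-sid", interval=0.01, timeout=1)
print("finished:", finished)
```

The `interval` argument is where the `polling_interval` configuration item would be used, and the timeout stops a stuck submission from holding a Cortex job open indefinitely.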

RetrieveAnalysis

This function will take in a SHA256 value, and then pass that value to the AssemblyLine API to retrieve associated results and analysis reports.

    def RetrieveAnalysis(self):
        al_client = get_client(self.assemblyline_host, apikey=(self.assemblyline_user, self.assemblyline_key), verify=False)
        report = al_client.file.result(sha256=self.hash)
        return report
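
Because of the SHA256-only limitation noted earlier, it may be worth checking the observable before calling the API, since TheHive's hash data type also accepts MD5 and SHA1 values. The check below is a sketch of that idea and is not in the analyzer as published; the function name is hypothetical.

```python
import re

# A SHA256 digest is exactly 64 hexadecimal characters.
SHA256_PATTERN = re.compile(r"[0-9a-fA-F]{64}")


def is_sha256(value):
    """Return True only for strings that look like a SHA256 hex digest."""
    if not isinstance(value, str):
        return False
    return SHA256_PATTERN.fullmatch(value.strip()) is not None


md5_value = "d41d8cd98f00b204e9800998ecf8427e"
sha256_value = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

print(is_sha256(md5_value), is_sha256(sha256_value))
```

Inside the analyzer, a failed check could be reported through the analyzer's error handling rather than sent on to AssemblyLine.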

Testing

When tested within PyCharm, the assemblyline_client Python library correctly connects to the AssemblyLine service, submits the analysis, and returns the associated job ID. When transposed into the Cortex Analyzer template structure, the execution returns the JSON from the AssemblyLine API and injects it into the result parameter.
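
To run the analyzer outside Cortex, you can hand it a job directory. As far as I know, cortexutils 2.x reads `input/input.json` from the directory given as the first command-line argument (defaulting to `/job`) and falls back to reading JSON from stdin when that file is missing. The sketch below only prepares such a directory; the helper name and every value in the job are placeholders.

```python
import json
import os
import tempfile


def make_job_directory(job_input, root=None):
    """Create <job_dir>/input/input.json plus an empty output directory."""
    job_dir = tempfile.mkdtemp(prefix="cortex-job-", dir=root)
    os.makedirs(os.path.join(job_dir, "input"))
    os.makedirs(os.path.join(job_dir, "output"))
    with open(os.path.join(job_dir, "input", "input.json"), "w") as handle:
        json.dump(job_input, handle, indent=4)
    return job_dir


# Placeholder job: swap in your own host, user, key and hash.
example_job = {
    "data": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
    "dataType": "hash",
    "tlp": 0,
    "config": {
        "service": "RetrieveAnalysis",
        "al_host": "https://assemblyline.example",
        "al_user": "analyst",
        "al_key": "replace-me",
    },
}

job_dir = make_job_directory(example_job)
print("python3 AssemblyLine.py", job_dir)
```

Running the printed command should leave the analyzer's report in the `output` directory, which makes it easy to inspect the JSON before deploying to Cortex.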

Pulling it all together

There are several files required to build out this analyzer:

  • AssemblyLine.py
  • AssemblyLine_AnalyseFile.json
  • AssemblyLine_AnalyseURL.json
  • AssemblyLine_RetrieveAnalysis.json

AssemblyLine.py

#!/usr/bin/env python3
# encoding: utf-8
import os
import sys
import json

from assemblyline_client import get_client
from cortexutils.analyzer import Analyzer


class AssemblyLineAnalyzer(Analyzer):

    def __init__(self):
        Analyzer.__init__(self)
        self.service = self.get_param('config.service', None, 'Service parameter is missing')

        self.assemblyline_host = self.get_param('config.al_host', None, 'Missing Assemblyline Host')
        self.assemblyline_user = self.get_param('config.al_user', None, 'Missing Assemblyline User')
        self.assemblyline_key = self.get_param('config.al_key', None, 'Missing Assemblyline Key')

        self.polling_interval = self.get_param('config.polling_interval', 60)
        self.proxies = self.get_param('config.proxy', None)

    def run(self):
        if self.data_type == 'file':
            try:
                self.filepath = self.get_param('file', None, 'File is missing')
                self.filename = self.get_param('attachment.name', 'noname.ext')
                parsingResult = self.AnalyseFile()
                self.report(parsingResult)
            except Exception as e:
                self.unexpectedError(e)

        elif self.data_type == 'url':
            try:
                self.url = self.get_data()
                parsingResult = self.AnalyseURL()
                self.report(parsingResult)
            except Exception as e:
                self.unexpectedError(e)

        elif self.data_type == 'hash':
            try:
                self.hash = self.get_data()
                parsingResult = self.RetrieveAnalysis()
                self.report(parsingResult)
            except Exception as e:
                self.unexpectedError(e)

        else:
            self.notSupported()

    def summary(self, raw):
        taxonomies = []
        level = "info"
        namespace = "Assemblyline"
        # Each flavour sets config.service, so it doubles as the predicate.
        predicate = self.service
        # No verdict is extracted from the response yet, so the value stays empty.
        value = ''

        taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
        return {"taxonomies": taxonomies}

    def AnalyseFile(self):
        al_client = get_client(self.assemblyline_host, apikey=(self.assemblyline_user, self.assemblyline_key), verify=False)
        response = al_client.submit(path=self.filepath, fname=self.filename)
        return response

    def AnalyseURL(self):
        al_client = get_client(self.assemblyline_host, apikey=(self.assemblyline_user, self.assemblyline_key), verify=False)
        response = al_client.submit(url=self.url)
        return response

    def RetrieveAnalysis(self):
        al_client = get_client(self.assemblyline_host, apikey=(self.assemblyline_user, self.assemblyline_key), verify=False)
        report = al_client.file.result(sha256=self.hash)
        return report

if __name__ == '__main__':
    AssemblyLineAnalyzer().run()

AssemblyLine_AnalyseFile.json

{
  "name": "AssemblyLine_AnalyseFile",
  "version": "1.0",
  "author": "McHughSecurity",
  "url": "https://github.com/TheHive-Project/Cortex-Analyzers",
  "license": "AGPL-V3",
  "description": "Use AssemblyLine to perform static analysis of a file",
  "dataTypeList": ["file"],
  "command": "AssemblyLine/AssemblyLine.py",
  "baseConfig": "AssemblyLine",
  "config": {
    "service": "AnalyseFile"
  },
  "configurationItems": [
    {
      "name": "al_host",
      "description": "Host or IP for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "al_user",
      "description": "User ID for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "al_key",
      "description": "API key for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "polling_interval",
      "description": "Time interval between two request attempts for the report",
      "type": "number",
      "multi": false,
      "required": false,
      "defaultValue": 60
    }
  ]
}

AssemblyLine_AnalyseURL.json

{
  "name": "AssemblyLine_AnalyseURL",
  "version": "1.0",
  "author": "McHughSecurity",
  "url": "https://github.com/TheHive-Project/Cortex-Analyzers",
  "license": "AGPL-V3",
  "description": "Use AssemblyLine to perform static analysis of an Internet hosted file",
  "dataTypeList": ["url"],
  "command": "AssemblyLine/AssemblyLine.py",
  "baseConfig": "AssemblyLine",
  "config": {
    "service": "AnalyseURL"
  },
  "configurationItems": [
    {
      "name": "al_host",
      "description": "Host or IP for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "al_user",
      "description": "User ID for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "al_key",
      "description": "API key for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "polling_interval",
      "description": "Time interval between two request attempts for the report",
      "type": "number",
      "multi": false,
      "required": false,
      "defaultValue": 60
    }
  ]
}

AssemblyLine_RetrieveAnalysis.json

{
  "name": "AssemblyLine_RetrieveAnalysis",
  "version": "1.0",
  "author": "McHughSecurity",
  "url": "https://github.com/TheHive-Project/Cortex-Analyzers",
  "license": "AGPL-V3",
  "description": "Retrieve analysis reports from AssemblyLine for a file or hash",
  "dataTypeList": ["hash"],
  "command": "AssemblyLine/AssemblyLine.py",
  "baseConfig": "AssemblyLine",
  "config": {
    "service": "RetrieveAnalysis"
  },
  "configurationItems": [
    {
      "name": "al_host",
      "description": "Host or IP for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "al_user",
      "description": "User ID for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "al_key",
      "description": "API key for AssemblyLine",
      "type": "string",
      "multi": false,
      "required": true
    },
    {
      "name": "polling_interval",
      "description": "Time interval between two request attempts for the report",
      "type": "number",
      "multi": false,
      "required": false,
      "defaultValue": 60
    }
  ]
}
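
The three flavour files differ only in their name, description, data type and service. One way to keep them in sync is to generate them from a shared base, as in the sketch below. This is not how the files in the pull request were produced; the helper and variable names are hypothetical, and the field values are copied from the definitions above.

```python
import copy
import json

# Fields shared by every flavour.
BASE = {
    "version": "1.0",
    "author": "McHughSecurity",
    "url": "https://github.com/TheHive-Project/Cortex-Analyzers",
    "license": "AGPL-V3",
    "command": "AssemblyLine/AssemblyLine.py",
    "baseConfig": "AssemblyLine",
}

CONFIGURATION_ITEMS = [
    {"name": "al_host", "description": "Host or IP for AssemblyLine",
     "type": "string", "multi": False, "required": True},
    {"name": "al_user", "description": "UserID key for AssemblyLine",
     "type": "string", "multi": False, "required": True},
    {"name": "al_key", "description": "API key for AssemblyLine",
     "type": "string", "multi": False, "required": True},
    {"name": "polling_interval",
     "description": "Define time interval between two requests attempts for the report",
     "type": "number", "multi": False, "required": False, "defaultValue": 60},
]

# (service, accepted data types, description) for each flavour.
FLAVOURS = [
    ("AnalyseFile", ["file"],
     "Use AssemblyLine to perform static analysis of a file"),
    ("AnalyseURL", ["url"],
     "Use AssemblyLine to perform static analysis of an Internet hosted file"),
    ("RetrieveAnalysis", ["hash"],
     "Retrieve analysis reports from AssemblyLine for a file or hash"),
]


def build_flavour(service, data_types, description):
    """Combine the shared base with the fields specific to one service."""
    flavour = {
        "name": "AssemblyLine_" + service,
        "description": description,
        "dataTypeList": list(data_types),
        "config": {"service": service},
    }
    flavour.update(copy.deepcopy(BASE))
    flavour["configurationItems"] = copy.deepcopy(CONFIGURATION_ITEMS)
    return flavour


flavours = {
    "AssemblyLine_%s.json" % service: build_flavour(service, types, text)
    for service, types, text in FLAVOURS
}

for filename, body in flavours.items():
    print(filename, len(json.dumps(body, indent=2)), "bytes")
```

Writing each entry out with `json.dump` would reproduce the three files, and a change to a shared field then only needs to be made once.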

The Final Product

The finished analyzer submits a file or URL to AssemblyLine for analysis; the results are then retrieved from the server when RetrieveAnalysis is called.

The analyzer is currently sitting within the TheHive-Project GitHub as a pull request, which you are welcome to explore and test.

https://github.com/TheHive-Project/Cortex-Analyzers/pull/961
