Write an agent
Learn how to write an OXO agent.
The purpose of this tutorial is to walk you through writing an OXO agent, in Python, that can interact with other agents to perform complex security assessment tasks.
Before we start, let's go over a few important concepts that we will need later on:
Agent: An agent is at the core of OXO detection. Agents can perform a variety of tasks, such as security scanning, file
analysis, and subdomain enumeration, and are able to communicate with one another.
Message: Contains the information exchanged between agents, such as an IP address, a file blob, or a subdomain list.
Selector: Each agent defines its own in- and out-selectors. Selectors express the type of messages the
agent expects. Simply put, an agent listens for messages arriving through its in-selectors and can emit messages
through its out-selectors.
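To make the relationship between agents, messages, and selectors concrete, here is a minimal, self-contained sketch. It is not the OXO SDK: `ToyBus`, `ToyAgent`, `FileScanner`, and `ReportCollector` are hypothetical names, and the routing is a simplified exact match on the selector string.

```python
from dataclasses import dataclass


@dataclass
class Message:
    """Toy message: a selector plus a data payload."""
    selector: str
    data: dict


class ToyAgent:
    """Hypothetical base class, only meant to illustrate the concepts."""
    name = 'toy'
    in_selectors = ()
    out_selectors = ()

    def __init__(self):
        self.bus = None  # set by ToyBus.register

    def emit(self, selector, data):
        # An agent may only emit through selectors it has declared.
        if selector not in self.out_selectors:
            raise ValueError(f'{self.name} does not declare out-selector {selector}')
        self.bus.publish(Message(selector, data))

    def process(self, message):
        raise NotImplementedError


class ToyBus:
    """In-memory bus that delivers a message to every agent listening on its selector."""

    def __init__(self):
        self.agents = []

    def register(self, agent):
        agent.bus = self
        self.agents.append(agent)

    def publish(self, message):
        for agent in self.agents:
            if message.selector in agent.in_selectors:
                agent.process(message)


class FileScanner(ToyAgent):
    name = 'file_scanner'
    in_selectors = ('v3.asset.file',)
    out_selectors = ('v3.report.vulnerability',)

    def process(self, message):
        size = len(message.data['content'])
        self.emit('v3.report.vulnerability', {'technical_detail': f'scanned {size} bytes'})


class ReportCollector(ToyAgent):
    name = 'report_collector'
    in_selectors = ('v3.report.vulnerability',)

    def __init__(self):
        super().__init__()
        self.reports = []

    def process(self, message):
        self.reports.append(message.data)
```

Publishing a `v3.asset.file` message on the toy bus triggers `FileScanner.process`, which in turn emits a report that only `ReportCollector` receives.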
Without further ado, let’s start and write an agent that takes a file asset, scans it using the Virustotal API and sends back a technical report of the scan. This is a simple agent that only interacts with a few system agents with the sole purpose of covering the basics.
Main parts
OXO offers a template agent that you can use to bootstrap
your own. The steps to use it are explained in the README.md
of the repo, but we will go over all of them in this tutorial.
oxo.yaml: the agent definition file. It defines a set of attributes specific to the agent, which are used later on to define the agent's behavior.
kind: Agent
name: virustotal
version: 0.0.1
description: Agent responsible for scanning files through the Virus Total public API.
in_selectors:
  - v3.asset.file
out_selectors:
  - v3.report.vulnerability
restart_policy: on-failure
args:
  - name: "api_key"
    type: "string"
    description: "Virus Total public API Key."
    value: "__ADD_API_KEY_HERE__"
docker_file_path: Dockerfile
docker_build_root: .
- kind: should be 'Agent' for this simple scenario, or AgentGroup when we run multiple agents at the same time.
- name: the name of the agent; should be lowercase and contain only alphanumeric characters.
- version: should follow the semantic versioning convention.
- in_selectors & out_selectors: lists of the agent's in- and out-selectors, explained above.
- args: arguments we want to pass to our agent; in this case, the API key for the VirusTotal database. For more information check here.
- docker_file_path: the path to the Dockerfile that will assemble the Docker image.
- docker_build_root: the Docker build directory for automated release builds.
For the full list of agent definition fields, check the following link.
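The constraints listed above can be checked mechanically. The sketch below is only an illustration: `validate_definition` is a hypothetical helper, not part of the OXO SDK, the version check is a simplified MAJOR.MINOR.PATCH pattern rather than the full semantic versioning grammar, and the definition is passed as an already-parsed dictionary.

```python
import re

NAME_PATTERN = re.compile(r'[a-z0-9]+')
# Simplified check: three dot-separated integers (no pre-release or build metadata).
VERSION_PATTERN = re.compile(r'\d+\.\d+\.\d+')


def validate_definition(definition: dict) -> list:
    """Return a list of human-readable problems; an empty list means no problem was found."""
    errors = []
    if definition.get('kind') not in ('Agent', 'AgentGroup'):
        errors.append("kind should be 'Agent' or 'AgentGroup'")
    if NAME_PATTERN.fullmatch(str(definition.get('name', ''))) is None:
        errors.append('name should be lowercase and alphanumeric')
    if VERSION_PATTERN.fullmatch(str(definition.get('version', ''))) is None:
        errors.append('version should look like MAJOR.MINOR.PATCH')
    for key in ('in_selectors', 'out_selectors'):
        if not isinstance(definition.get(key, []), list):
            errors.append(f'{key} should be a list of selectors')
    return errors


definition = {
    'kind': 'Agent',
    'name': 'virustotal',
    'version': '0.0.1',
    'in_selectors': ['v3.asset.file'],
    'out_selectors': ['v3.report.vulnerability'],
}
problems = validate_definition(definition)
```

Running such a check before building the agent catches naming or versioning mistakes early, without needing Docker.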
requirements.txt: At the time of writing this tutorial, the OXO SDK only supports Python. The requirements.txt file is specific to Python and should list all the libraries needed by our agent. The ostorlab package is required.
Dockerfile: Steps to assemble our docker image, and finally the command to run the agent.
FROM python:3.8-alpine as base
FROM base as builder
RUN mkdir /install
WORKDIR /install
COPY requirement.txt /requirement.txt
RUN pip install --prefix=/install -r /requirement.txt
FROM base
COPY --from=builder /install /usr/local
RUN mkdir -p /app/agent
ENV PYTHONPATH=/app
COPY agent /app/agent
COPY oxo.yaml /app/agent/oxo.yaml
WORKDIR /app
CMD ["python3", "/app/agent/virus_total_agent.py"]
Agent Logic
As we can see from the template, all agents should inherit from the base ostorlab.agent.Agent class.
This base class gives us access to several methods. The one we need to implement is the process method: as the name
suggests, it is the method responsible for receiving and processing messages. In our case, it is where the actual scan
through the Virustotal database happens.
def process(self, message: msg.Message) -> None:
    response = virustotal.scan_file_from_message(message, self.api_key)
    try:
        scans = virustotal.get_scans(response)
    except virustotal.VirusTotalApiError:
        logger.error('Virus Total API encountered some problems. Please try again.')
        raise
    technical_detail = process_scans.get_technical_details(scans)
    risk_rating = process_scans.get_risk_rating(scans)
    self.report_vulnerability(entry=kb.KB.VIRUSTOTAL_SCAN,
                              technical_detail=technical_detail,
                              risk_rating=risk_rating)
We scan our file with the help of scan_file_from_message, a function from a module we created called virustotal.
def scan_file_from_message(message: agent_message.Message, api_key: str) -> Dict:
    file = message.data['content']
    file_md5_hash = hashlib.md5(file)
    hash_hexa = file_md5_hash.hexdigest()
    virustotal_client = virus_total_apis.PublicApi(api_key)
    response = virustotal_client.get_file_report(hash_hexa)
    return response
The Virus Total API takes the hexadecimal representation of the file's MD5 hash, checks it against a list of antivirus
engines, and sends back a JSON response.
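As a small standalone illustration of the hashing step, the sketch below computes the hexadecimal MD5 digest with the standard library. `md5_hex` is a hypothetical helper name; the content must be bytes, which is what `hashlib.md5` expects.

```python
import hashlib


def md5_hex(content: bytes) -> str:
    """Return the hexadecimal MD5 digest of the given file content."""
    return hashlib.md5(content).hexdigest()


empty_digest = md5_hex(b'')
abc_digest = md5_hex(b'abc')
```

Because the digest depends only on the bytes, two files with identical content produce the same lookup key regardless of their names.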
We then extract the output of the individual scans and use it to derive the corresponding risk rating and technical report.
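The extraction step could look roughly like the sketch below. It is an assumption-laden illustration, not the actual agent code: the response layout mirrors the sample response used in the test later in this tutorial, the rule "any detection means HIGH" matches that test's expectation, and returning SECURE when nothing is detected, as well as the report format, are choices made only for this example.

```python
class VirusTotalApiError(Exception):
    """Raised when the API response does not contain usable scan results."""


def get_scans(response: dict) -> dict:
    """Return the per-engine scan results, or raise if the response is not usable."""
    if response.get('response_code') != 200 or 'results' not in response:
        raise VirusTotalApiError('unexpected response from the API')
    return response['results'].get('scans', {})


def get_risk_rating(scans: dict) -> str:
    """Assumed rule: HIGH as soon as one engine flags the file, SECURE otherwise."""
    detected = any(scan.get('detected') for scan in scans.values())
    return 'HIGH' if detected else 'SECURE'


def get_technical_details(scans: dict) -> str:
    """Build a one-line-per-engine report, sorted by engine name."""
    lines = []
    for engine in sorted(scans):
        scan = scans[engine]
        verdict = scan.get('result') if scan.get('detected') else 'clean'
        lines.append(f'{engine}: {verdict}')
    return '\n'.join(lines)


sample_response = {
    'results': {
        'scans': {
            'Bkav': {'detected': False, 'result': None},
            'Elastic': {'detected': True, 'result': 'eicar'},
        },
    },
    'response_code': 200,
}
sample_scans = get_scans(sample_response)
```

Keeping the parsing, rating, and reporting in separate functions makes each of them easy to unit test without calling the API.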
If the agent intends to report a vulnerability, it should also inherit from the mixin provided by the
agent_report_vulnerability_mixin module to get access to the report_vulnerability method, which makes it easy to emit
the vulnerability.
The report_vulnerability method takes the following arguments:
- Knowledge-base Entry: References a vulnerability, privacy issue, or informational entry with descriptions,
recommendations, and extra references. In our case, we will be using the VIRUSTOTAL_SCAN entry.
OXO has its own Knowledge Base that you can use; however, you can easily add your own entry by creating an instance of the
kb.Entry class.
from ostorlab.agent.kb import kb
kb_entry = kb.Entry(title='title',
                    short_description='short_description',
                    description='description',
                    recommendation='some recommendation',
                    references={'title': 'link to reference'},
                    security_issue=True,
                    privacy_issue=False,
                    has_public_exploit=True,
                    targeted_by_malware=True,
                    targeted_by_ransomware=True,
                    targeted_by_nation_state=True)
- Risk Rating: One of the following categories, used to evaluate the risk. The enum is also part of
agent_report_vulnerability_mixin.
class RiskRating(enum.Enum):
    HIGH = enum.auto()
    MEDIUM = enum.auto()
    LOW = enum.auto()
    POTENTIALLY = enum.auto()
    HARDENING = enum.auto()
    SECURE = enum.auto()
    IMPORTANT = enum.auto()
    INFO = enum.auto()
For more information about this categorization, you can check the OXO doc.
- Technical detail: A technical report of the scan's result.
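To show how the three arguments fit together, here is a rough sketch of how they could be combined into the data of a `v3.report.vulnerability` message. This is not the SDK implementation: `Entry` is a trimmed-down stand-in for the knowledge-base entry, `build_vulnerability_payload` is a hypothetical helper, and the `technical_detail` key is an assumption. The `title`, `risk_rating`, and `references` keys follow the assertions used in the test section of this tutorial.

```python
import dataclasses
import enum


class RiskRating(enum.Enum):
    HIGH = enum.auto()
    MEDIUM = enum.auto()
    LOW = enum.auto()
    INFO = enum.auto()


@dataclasses.dataclass
class Entry:
    """Trimmed-down, hypothetical stand-in for a knowledge-base entry."""
    title: str
    short_description: str
    references: dict


def build_vulnerability_payload(entry: Entry, technical_detail: str, risk_rating: RiskRating) -> dict:
    """Combine the three arguments into a single message payload."""
    return {
        'title': entry.title,
        'short_description': entry.short_description,
        'technical_detail': technical_detail,
        # The enum member name is sent as a plain string, e.g. 'HIGH'.
        'risk_rating': risk_rating.name,
        'references': [{'title': title, 'url': url} for title, url in entry.references.items()],
    }


entry = Entry(title='Virustotal malware analysis (MD5 based search)',
              short_description='Virustotal scan report.',
              references={'Virustotal': 'https://www.virustotal.com/'})
payload = build_vulnerability_payload(entry, 'Elastic: eicar', RiskRating.HIGH)
```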
Testing
Now that we have bootstrapped our agent, the next step is, of course, unit tests ;). For our example we will use pytest,
as OXO ships with a pytest plugin to ease the testing setup.
Below is our test code, which checks that our agent emits a vulnerability report when Virustotal returns a valid detection report:
def testVirusTotalAgent_whenVirusTotalApiReturnsValidDetectionResponse_emitsVulnerabilityReport(mocker, agent_mock, virustotal_agent, message):
    def virustotal_valid_response(message):
        """Method for mocking the Virus Total public API valid response."""
        response = {
            'results': {
                'scans': {
                    'Bkav': {
                        'detected': False, 'version': '1.3.0.9899', 'result': None, 'update': '20220107'
                    },
                    'Elastic': {
                        'detected': True, 'version': '4.0.32', 'result': 'eicar', 'update': '20211223'
                    }
                },
                'scan_id': 'ID42',
                'sha1': 'some_sha1',
                'resource': 'some_ressource_id',
                'response_code': 1,
            },
            'response_code': 200
        }
        return response
    mocker.patch('virus_total_apis.PublicApi.get_file_report', side_effect=virustotal_valid_response)
    try:
        virustotal_agent.process(message)
    except virustotal.VirusTotalApiError:
        pytest.fail('Unexpected VirusTotalApiError because response is returned with status 200.')
    assert len(agent_mock) == 1
    assert agent_mock[0].selector == 'v3.report.vulnerability'
    assert agent_mock[0].data['risk_rating'] == 'HIGH'
    assert agent_mock[0].data['title'] == 'Virustotal malware analysis (MD5 based search)'
    assert agent_mock[0].data['references'] == [{'title': 'Virustotal', 'url': 'https://www.virustotal.com/'}]
- mocker: a pytest fixture to help us mock the response of the Virustotal API request.
- agent_mock: an OXO fixture, part of the SDK, that you can use to mimic the emission and reception of messages.
- virustotal_agent: a fixture that generates an instance of the agent itself.
For a full source code sample, check the public repo source code. The agent instance provided by the virustotal_agent fixture is built as follows:
definition = agent_definitions.AgentDefinition(
    name='agent_virustotal',
    in_selectors=['v3.asset.file'],
    out_selectors=['v3.report.vulnerability'],
    args=[{'name': 'api_key',
           'type': 'string',
           'value': 'some_api_key',
           'description': 'Api key for the virus total API.'}])
settings = runtime_definitions.AgentSettings(key='agent_virustotal_key',
                                             bus_url='NA',
                                             bus_exchange_topic='NA')
virustotal_agent = virus_total_agent.VirusTotalAgent(definition, settings)
The assertions verify that exactly one message has been sent, that it uses the v3.report.vulnerability selector, that it has a HIGH
risk rating, and that it uses the correct KB entry.
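The mocking pattern used in the test can be tried in isolation with the standard library, since pytest-mock's `mocker.patch` builds on `unittest.mock`. In the sketch below, `PublicApi`, `lookup`, and `fake_report` are hypothetical stand-ins written for this example; they are not the real client library.

```python
from unittest import mock


class PublicApi:
    """Hypothetical stand-in for an API client whose real method would hit the network."""

    def __init__(self, api_key):
        self.api_key = api_key

    def get_file_report(self, resource):
        raise RuntimeError('network access is not expected in unit tests')


def lookup(resource, api_key):
    """Code under test: asks the client for a report."""
    return PublicApi(api_key).get_file_report(resource)


def fake_report(resource):
    """Replacement behavior: receives the same arguments as the mocked call."""
    return {'results': {'resource': resource, 'scans': {}}, 'response_code': 200}


def run_with_mock():
    # While the patch is active, every call to get_file_report is routed to fake_report.
    with mock.patch.object(PublicApi, 'get_file_report', side_effect=fake_report) as patched:
        response = lookup('some_hash', 'some_api_key')
    return response, patched.call_count
```

Once the `with` block exits, the original method is restored, so the patch cannot leak into other tests.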
Building and Running
Now that we have the main parts of our agent ready, we can give it a try: first build the agent, then run a scan against a file:
oxo agent build -f oxo.yaml -o dev
oxo scan run --follow=agent/dev/virustotal --agent agent/dev/virustotal file -f malicious_file
We are passing the --follow flag to stream the logs of the agent and flag any errors.
We can check the status of the scan with:
oxo scan list
Once it's done, we can view the list of vulnerabilities and access the technical report with the following commands:
oxo vulnz list -s <scan_id>
And finally:
oxo vulnz describe -v <vuln_id>
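If you want to script the build and scan steps, a thin Python wrapper is one option. The sketch below only assembles the two commands shown earlier, using the same flags; `build_command` and `scan_command` are hypothetical helper names, and treating the `-o` value as the middle part of the agent key (`agent/dev/virustotal`) is an assumption made for this example.

```python
import shlex


def build_command(definition_path='oxo.yaml', organization='dev'):
    """Argument list for building the agent from its definition file."""
    return ['oxo', 'agent', 'build', '-f', definition_path, '-o', organization]


def scan_command(agent_key, file_path):
    """Argument list for scanning a file with the agent while following its logs."""
    return ['oxo', 'scan', 'run', f'--follow={agent_key}', '--agent', agent_key,
            'file', '-f', file_path]


build_line = shlex.join(build_command())
scan_line = shlex.join(scan_command('agent/dev/virustotal', 'malicious_file'))
```

The returned lists can be passed to `subprocess.run(command, check=True)`, which avoids shell quoting issues with unusual file names.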
Publishing
The final step is to publish the agent on the OXO Store. You can download it later, or share it with others so they can use it.
Head to the OXO platform, Side Menu -> Library -> Agent publish
You should fill in the following values:
- name: the name of your agent; lowercase, accepting only - and _ as special characters, e.g. virus_total
- git repository URL: the link to your repository
- YAML path of the agent definition: in this case, oxo.yaml
Hit Publish and you are good to go.