Request agent
This agent processes the HTTP requests and responses captured during mobile dynamic testing.
oxo.yaml
kind: Agent
name: sample_request_agent # Agent name, must be unique by organisation to be published on the store.
version: 0.0.1 # Must respect semantic versioning.
description: Agent description. # Support for Markdown format.
in_selectors: # List of input selectors, this is basically the list of messages the agent should receive.
- v3.capture.http.request
- v3.capture.http.response
out_selectors:
- v3.report.vulnerability
docker_file_path : Dockerfile # Dockerfile path for automated releases.
docker_build_root : . # Docker build dir for automated release build.
agent.py
"""Sample Request agent implementation"""
import logging
from rich import logging as rich_logging
from ostorlab.agent import agent
from ostorlab.agent.message import message as m
from ostorlab.agent.mixins import agent_report_vulnerability_mixin as vuln_mixin
from ostorlab.agent.kb import kb
# Header names that `_process_headers` compares decoded header names against.
BLACKLISTED_HEADERS = ["header1", "header2"]
# Storage key for a responses mapping; not referenced elsewhere in the visible code.
HTTP_RESPONSES_STORE_KEY = "responses_mapping"

# Route all log records through rich's handler. `force=True` removes any
# handlers already attached to the root logger before installing this one.
logging.basicConfig(
    handlers=[rich_logging.RichHandler(rich_tracebacks=True)],
    level="INFO",
    force=True,
    datefmt="[%X]",
    format="%(message)s",
)
logger = logging.getLogger(__name__)
class SampleRequestAgent(agent.Agent, vuln_mixin.AgentReportVulnMixin):
    """Sample agent that inspects the headers of captured HTTP traffic.

    The agent receives `v3.capture.http.request` and `v3.capture.http.response`
    messages and reports a vulnerability for every header whose name is listed
    in `BLACKLISTED_HEADERS` and whose value starts with ``insecure``.
    """

    def process(self, message: m.Message) -> None:
        """Dispatch a captured HTTP message to the handler matching its selector.

        Messages whose selector matches neither prefix are ignored.

        Args:
            message: The received message; `message.selector` selects the
                handler and `message.data` carries the captured fields.
        """
        selector = message.selector
        # The two prefixes are mutually exclusive, so at most one branch runs.
        if selector.startswith("v3.capture.http.request"):
            self._process_request(message)
            logger.info("Request processed.")
        elif selector.startswith("v3.capture.http.response"):
            self._process_response(message)
            logger.info("Response processed.")

    def _process_request(self, message: m.Message) -> None:
        """Check the headers of a captured HTTP request.

        Args:
            message: Message whose data may hold `headers` and `host` entries.
        """
        # `or []` also covers a `headers` key that is present but empty/None.
        headers = message.data.get("headers") or []
        host = message.data.get("host")
        self._process_headers(host, headers)

    def _process_response(self, message: m.Message) -> None:
        """Check the headers of a captured HTTP response.

        Args:
            message: Message whose data may hold `headers` and `host` entries.
        """
        headers = message.data.get("headers") or []
        host = message.data.get("host")
        self._process_headers(host, headers)

    def _process_headers(self, host: str, headers: list[dict[str, bytes]]) -> None:
        """Report a vulnerability for each blacklisted header with an insecure value.

        Args:
            host: Host the headers were captured for; only used in the report text.
            headers: Header entries, each with `name` and `value` keys holding
                bytes (both are decoded before being compared).
        """
        for header in headers:
            # Decode leniently: header bytes are not guaranteed to be valid
            # UTF-8, and one malformed header should not abort the whole message.
            name = header["name"].decode(errors="replace")
            if name not in BLACKLISTED_HEADERS:
                continue
            value = header["value"].decode(errors="replace")
            if not value.startswith("insecure"):
                continue
            self.report_vulnerability(
                entry=kb.Entry(
                    title="My Vulnerability Title",
                    risk_rating=vuln_mixin.RiskRating.HIGH.name,
                    short_description="Vulnerability Short description",
                    description="Vulnerability description",
                    recommendation="Vulnerability recommendation",
                    references={"title": "https://reference.com"},
                    cvss_v3_vector="",
                ),
                # Use the decoded name so the report shows `header1`, not `b'header1'`.
                technical_detail=f"The header {name} found in the query for the host {host}.",
                risk_rating=vuln_mixin.RiskRating.HIGH,
            )
# Script entry point: log a start-up line, then call the agent's `main()`.
# `main` is not defined in SampleRequestAgent itself, so it comes from a base class.
if __name__ == "__main__":
    logger.info("starting agent ...")
    SampleRequestAgent.main()