Request agent

This agent processes the HTTP requests and responses captured during mobile dynamic testing.


oxo.yaml

kind: Agent
name: sample_request_agent # Agent name; must be unique within the organisation to be published on the store.
version: 0.0.1 # Must respect semantic versioning.
description: Agent description. # Support for Markdown format.
in_selectors: # List of input selectors, i.e. the messages the agent should receive.
  - v3.capture.http.request
  - v3.capture.http.response
# List of output selectors, i.e. the messages the agent may emit.
out_selectors:
  - v3.report.vulnerability
docker_file_path : Dockerfile # Dockerfile path for automated releases.
docker_build_root : . # Docker build directory for automated release builds.

agent.py

"""Sample Request agent implementation"""

import logging
from rich import logging as rich_logging

from ostorlab.agent import agent
from ostorlab.agent.message import message as m
from ostorlab.agent.mixins import agent_report_vulnerability_mixin as vuln_mixin
from ostorlab.agent.kb import kb

# Configure the root logger at import time. Records are rendered by rich's
# handler (with rich-formatted tracebacks); force=True replaces any handlers
# that were already attached to the root logger.
logging.basicConfig(
    format="%(message)s",
    datefmt="[%X]",
    level="INFO",
    force=True,
    handlers=[rich_logging.RichHandler(rich_tracebacks=True)],
)
logger = logging.getLogger(__name__)

# Header names that trigger a vulnerability report when their value starts
# with "insecure". The entries are placeholders to be replaced by real names.
BLACKLISTED_HEADERS = ["header1", "header2"]
# NOTE(review): not referenced anywhere in the visible code; presumably a
# storage key for captured responses -- confirm or remove.
HTTP_RESPONSES_STORE_KEY = "responses_mapping"


class SampleRequestAgent(agent.Agent, vuln_mixin.AgentReportVulnMixin,):
    """Sample agent inspecting the headers of captured HTTP requests and responses.

    A vulnerability is reported for every header whose name is listed in
    ``BLACKLISTED_HEADERS`` and whose value starts with ``insecure``.
    """

    def process(self, message: m.Message) -> None:
        """Dispatch a received message to the handler matching its selector.

        Args:
            message: Received message. It is handled when its selector starts
                with ``v3.capture.http.request`` or ``v3.capture.http.response``;
                any other selector is ignored.
        """
        # TODO (author): implement agent logic here.
        if message.selector.startswith("v3.capture.http.request"):
            self._process_request(message)
            logger.info("Request processed.")
        elif message.selector.startswith("v3.capture.http.response"):
            # Responses go through their own handler (the request handler was
            # previously called here, leaving _process_response unreachable).
            self._process_response(message)
            logger.info("Response processed.")

    def _process_request(self, message: m.Message) -> None:
        """Check the headers of a captured HTTP request message."""
        headers = message.data.get("headers", [])
        host = message.data.get("host")
        self._process_headers(host, headers)

    def _process_response(self, message: m.Message) -> None:
        """Check the headers of a captured HTTP response message."""
        headers = message.data.get("headers", [])
        host = message.data.get("host")
        self._process_headers(host, headers)

    def _process_headers(self, host: str, headers: list[dict[str, bytes]]) -> None:
        """Report a vulnerability for each blacklisted header with an insecure value.

        Args:
            host: Host taken from the message, used in the report's technical detail.
            headers: Header entries, each with a ``name`` and a ``value`` holding
                bytes.
        """
        for header in headers:
            # Captured bytes are not guaranteed to be valid UTF-8; replacing
            # undecodable bytes keeps one odd header from aborting the message.
            name = header["name"].decode(errors="replace")
            if name not in BLACKLISTED_HEADERS:
                continue
            # The value is only looked up for blacklisted names, as before.
            if not header["value"].decode(errors="replace").startswith("insecure"):
                continue
            self.report_vulnerability(
                entry=kb.Entry(
                    title="My Vulnerability Title",
                    risk_rating=vuln_mixin.RiskRating.HIGH.name,
                    short_description="Vulnerability Short description",
                    description="Vulnerability description",
                    recommendation="Vulnerability recommendation",
                    references={"title": "https://reference.com"},
                    cvss_v3_vector="",
                ),
                # Use the decoded name so the report reads `header1`, not `b'header1'`.
                technical_detail=f"The header {name} found in the query for the host {host}.",
                risk_rating=vuln_mixin.RiskRating.HIGH,
            )


if __name__ == "__main__":
    # Script entry point: log the startup, then hand control to the agent's
    # main() class method.
    logger.info("starting agent ...")
    SampleRequestAgent.main()