kind: Agent
name: sample_stack_trace_agent # Agent name, must be unique by organisation to be published on the store.
version: 0.0.1 # Must respect semantic versioning.
description: Agent description. # Support for Markdown format.
in_selectors: # List of input selectors, this is basically the list of messages the agent should receive.
- v3.capture.stack_trace
out_selectors:
- v3.report.vulnerability
docker_file_path : Dockerfile # Dockerfile path for automated releases.
docker_build_root : . # Docker build dir for automated release build.
"""Sample Stack Trace agent implementation"""
import logging
from rich import logging as rich_logging
from ostorlab.agent import agent
from ostorlab.agent.message import message as m
from ostorlab.agent.mixins import agent_report_vulnerability_mixin as vuln_mixin
from ostorlab.agent.kb import kb
logging.basicConfig(
format="%(message)s",
datefmt="[%X]",
level="INFO",
force=True,
handlers=[rich_logging.RichHandler(rich_tracebacks=True)],
)
logger = logging.getLogger(__name__)
class SampleStackTraceAgent(agent.Agent, vuln_mixin.AgentReportVulnMixin,):
"""Sample agent to process the link message."""
def process(self, message: m.Message) -> None:
"""TODO (author): add your description here.
Args:
message:
Returns:
"""
# TODO (author): implement agent logic here.
frames = message.data.get("frames", [])
self._process_frames(frames)
def _process_frames(self, frames: list[dict[str, str | int | list[dict [str, str | bytes]]]]):
for frame in frames:
if frame["package_name"] == "package_a" and frame["class_name"] == "class_name_b" and frame["function_name"] == "dangerous_method":
for arg in frame["args"]:
if arg["name"] == "mode" and arg["value"] == b"weak_value":
self.report_vulnerability(
entry=kb.Entry(
title="My Vulnerability Title",
risk_rating=vuln_mixin.RiskRating.HIGH.name,
short_description="Vulnerability Short description",
description="Vulnerability description",
recommendation="Vulnerability recommendation",
references={"title": "https://reference.com"},
cvss_v3_vector="",
),
technical_detail=f"The method {frame['package_name']}.{frame['class_name']}.{frame['function_name']} is called using the argument {arg['name']}:{arg['value'].decode()}.",
risk_rating=vuln_mixin.RiskRating.HIGH,
)
if __name__ == "__main__":
logger.info("starting agent ...")
SampleStackTraceAgent.main()