Harden exec tool with safety guard
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from nanobot.agent.tools.base import Tool
|
||||
@@ -10,9 +12,35 @@ from nanobot.agent.tools.base import Tool
|
||||
class ExecTool(Tool):
|
||||
"""Tool to execute shell commands."""
|
||||
|
||||
def __init__(self, timeout: int = 60, working_dir: str | None = None):
|
||||
def __init__(
|
||||
self,
|
||||
timeout: int = 60,
|
||||
working_dir: str | None = None,
|
||||
deny_patterns: list[str] | None = None,
|
||||
allow_patterns: list[str] | None = None,
|
||||
restrict_to_working_dir: bool = False,
|
||||
):
|
||||
self.timeout = timeout
|
||||
self.working_dir = working_dir
|
||||
self.deny_patterns = deny_patterns or [
|
||||
r"\brm\s+-rf\b",
|
||||
r"\brm\s+-fr\b",
|
||||
r"\brm\s+-r\b",
|
||||
r"\bdel\s+/f\b",
|
||||
r"\bdel\s+/q\b",
|
||||
r"\brmdir\s+/s\b",
|
||||
r"\bformat\b",
|
||||
r"\bmkfs\b",
|
||||
r"\bdd\s+if=",
|
||||
r">\s*/dev/sd",
|
||||
r"\bdiskpart\b",
|
||||
r"\bshutdown\b",
|
||||
r"\breboot\b",
|
||||
r"\bpoweroff\b",
|
||||
r":\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\};\s*:",
|
||||
]
|
||||
self.allow_patterns = allow_patterns or []
|
||||
self.restrict_to_working_dir = restrict_to_working_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@@ -41,6 +69,9 @@ class ExecTool(Tool):
|
||||
|
||||
async def execute(self, command: str, working_dir: str | None = None, **kwargs: Any) -> str:
|
||||
cwd = working_dir or self.working_dir or os.getcwd()
|
||||
guard_error = self._guard_command(command, cwd)
|
||||
if guard_error:
|
||||
return guard_error
|
||||
|
||||
try:
|
||||
process = await asyncio.create_subprocess_shell(
|
||||
@@ -83,3 +114,35 @@ class ExecTool(Tool):
|
||||
|
||||
except Exception as e:
|
||||
return f"Error executing command: {str(e)}"
|
||||
|
||||
def _guard_command(self, command: str, cwd: str) -> str | None:
|
||||
"""Best-effort safety guard for potentially destructive commands."""
|
||||
cmd = command.strip()
|
||||
lower = cmd.lower()
|
||||
|
||||
for pattern in self.deny_patterns:
|
||||
if re.search(pattern, lower):
|
||||
return "Error: Command blocked by safety guard (dangerous pattern detected)"
|
||||
|
||||
if self.allow_patterns:
|
||||
if not any(re.search(p, lower) for p in self.allow_patterns):
|
||||
return "Error: Command blocked by safety guard (not in allowlist)"
|
||||
|
||||
if self.restrict_to_working_dir:
|
||||
if "..\\" in cmd or "../" in cmd:
|
||||
return "Error: Command blocked by safety guard (path traversal detected)"
|
||||
|
||||
cwd_path = Path(cwd).resolve()
|
||||
|
||||
win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd)
|
||||
posix_paths = re.findall(r"/[^\\s\"']+", cmd)
|
||||
|
||||
for raw in win_paths + posix_paths:
|
||||
try:
|
||||
p = Path(raw).resolve()
|
||||
except Exception:
|
||||
continue
|
||||
if cwd_path not in p.parents and p != cwd_path:
|
||||
return "Error: Command blocked by safety guard (path outside working dir)"
|
||||
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user