Initial upload of ai-code-audit-cli project
Some checks failed
Some checks failed
This commit is contained in:
184
src/scanners/bandit_scanner.py
Normal file
184
src/scanners/bandit_scanner.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Bandit security scanner for Python code."""
|
||||
|
||||
import io
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import bandit
|
||||
from bandit.core import manager, config
|
||||
from bandit.core.node_visitor import BanditNodeVisitor
|
||||
from bandit.core.test_set import BanditTestSet
|
||||
|
||||
from ..core.models import Issue, IssueCategory, SeverityLevel
|
||||
|
||||
|
||||
class BanditScanner:
|
||||
"""Scanner for security vulnerabilities using Bandit."""
|
||||
|
||||
def __init__(self, config_path: Optional[str] = None):
|
||||
"""Initialize the Bandit scanner."""
|
||||
self.config_path = config_path
|
||||
|
||||
def scan_file(self, file_path: str) -> list[Issue]:
|
||||
"""Scan a single file for security issues."""
|
||||
try:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
return []
|
||||
|
||||
content = path.read_text(encoding="utf-8")
|
||||
return self.scan_content(content, file_path)
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def scan_content(self, content: str, file_path: str) -> list[Issue]:
|
||||
"""Scan code content for security issues."""
|
||||
issues = []
|
||||
|
||||
try:
|
||||
b_config = config.BanditConfig()
|
||||
if self.config_path:
|
||||
b_config = config.BanditConfig(path=self.config_path)
|
||||
|
||||
test_set = BanditTestSet(config=b_config)
|
||||
b_manager = manager.BanditManager(b_config, "file")
|
||||
|
||||
source = io.StringIO(content)
|
||||
b_manager.discover_files([source], [""])
|
||||
|
||||
for node in b_manager.nodes:
|
||||
issue = self._bandit_result_to_issue(node, file_path)
|
||||
if issue:
|
||||
issues.append(issue)
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return issues
|
||||
|
||||
def _bandit_result_to_issue(self, node: BanditNodeVisitor, file_path: str) -> Optional[Issue]:
|
||||
"""Convert a Bandit result to an Issue."""
|
||||
try:
|
||||
if not hasattr(node, "issue") or not node.issue:
|
||||
return None
|
||||
|
||||
issue = node.issue
|
||||
|
||||
severity = self._map_severity(issue.severity)
|
||||
category = IssueCategory.SECURITY
|
||||
|
||||
message = str(issue.text)
|
||||
|
||||
suggestion = None
|
||||
if issue.confidence == "HIGH":
|
||||
suggestion = self._get_suggestion(issue.test_id)
|
||||
|
||||
return Issue(
|
||||
severity=severity,
|
||||
category=category,
|
||||
file_path=file_path,
|
||||
line_number=node.lineno if hasattr(node, "lineno") else 1,
|
||||
message=message,
|
||||
suggestion=suggestion,
|
||||
scanner_name="bandit",
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _map_severity(self, bandit_severity: str) -> SeverityLevel:
|
||||
"""Map Bandit severity to our severity levels."""
|
||||
severity_map = {
|
||||
"LOW": SeverityLevel.LOW,
|
||||
"MEDIUM": SeverityLevel.MEDIUM,
|
||||
"HIGH": SeverityLevel.HIGH,
|
||||
"ERROR": SeverityLevel.CRITICAL,
|
||||
}
|
||||
return severity_map.get(bandit_severity.upper(), SeverityLevel.MEDIUM)
|
||||
|
||||
def _get_suggestion(self, test_id: str) -> str:
|
||||
"""Get suggestion for a security issue."""
|
||||
suggestions = {
|
||||
"B101": "Avoid using assert statements in production code",
|
||||
"B102": "Use exec() is dangerous; consider using eval() with restrictions",
|
||||
"B103": "Check file permissions before using os.setuid()",
|
||||
"B104": "Hardcoded binding to all interfaces (0.0.0.0) may expose services",
|
||||
"B105": "Hardcoded password detected; use environment variables or config",
|
||||
"B106": "Hardcoded password in function call detected",
|
||||
"B107": "Hardcoded password in nested function call detected",
|
||||
"B108": "Hardcoded_tmp_directory - use tempfile module for portability",
|
||||
"B110": "try-except-pass detected; handle exceptions properly",
|
||||
"B112": "try-except-continue detected; may hide errors",
|
||||
"B201": "flask_debug_true - Debug mode is enabled",
|
||||
"B301": "Use of pickle.load() is unsafe; use json instead",
|
||||
"B302": "Use of marshal.load() is unsafe",
|
||||
"B303": "Use of eval() is dangerous; consider safer alternatives",
|
||||
"B304": "Use of cPickle.load() is unsafe",
|
||||
"B305": "Use of xml.etree.ElementTree.fromstring() is unsafe",
|
||||
"B306": "Use of xml.sax.handler() is unsafe",
|
||||
"B307": "Use of xml.expatparser is unsafe",
|
||||
"B308": "Use of markupsafe.Markup() is unsafe",
|
||||
"B309": "Use of httponly cookies is recommended",
|
||||
"B310": "Audit URL for security issues",
|
||||
"B311": "Use of random.randrange() is not cryptographically secure",
|
||||
"B312": "Use of secrets.randbelow() for cryptographic randomness",
|
||||
"B313": "Use of XML libraries with entity expansion is unsafe",
|
||||
"B314": "Use of xml.etree.ElementTree.fromstring() is unsafe",
|
||||
"B315": "Use of xml.expatparser is unsafe",
|
||||
"B316": "Use of xml.sax.handler is unsafe",
|
||||
"B317": "Use of marshal.load is unsafe",
|
||||
"B318": "Use of mktemp() is insecure",
|
||||
"B319": "Use of tvnserver/pytvnamer is insecure",
|
||||
"B320": "Use of ftplib is insecure",
|
||||
"B321": "Use of telnetlib is insecure",
|
||||
"B322": "Use of wmi is insecure",
|
||||
"B323": "Use of open in client mode is insecure",
|
||||
"B324": "Use of hashlib for password hashing; use bcrypt or argon2",
|
||||
"B401": "Import of subprocess; ensure input is sanitized",
|
||||
"B402": "Import of telnetlib; use SSH instead",
|
||||
"B403": "Import of pickle; consider json instead",
|
||||
"B404": "Import of subprocess; ensure shell=False",
|
||||
"B405": "Import of xml.*; use defusedxml instead",
|
||||
"B406": "Import of sacpy; use defusedxml instead",
|
||||
"B407": "Import of cElementTree; use defusedxml instead",
|
||||
"B408": "Import of xml.etree; use defusedxml instead",
|
||||
"B409": "Import of minidom; use defusedxml instead",
|
||||
"B410": "Import of pulldom; use defusedxml instead",
|
||||
"B411": "Import of xmlrpc; use defusedxml instead",
|
||||
"B412": "Import of httplib; use requests with verification",
|
||||
"B413": "Import of Crypto; use pycryptodome instead",
|
||||
"B414": "Use of pycrypto is insecure",
|
||||
"B415": "Use of pycryptodome with insecure modes",
|
||||
"B416": "Use of yara is insecure",
|
||||
"B417": "Use of pyftpdlib is insecure",
|
||||
"B418": "Import of socketio; use secure configuration",
|
||||
"B419": "Use of telnet is insecure",
|
||||
"B420": "Use of pyimaplib is insecure",
|
||||
"B421": "Use of email is recommended",
|
||||
"B422": "Import of django; use secure settings",
|
||||
"B501": "Use of verify=False in SSL connections is insecure",
|
||||
"B502": "Import of ssl; use secure protocols",
|
||||
"B503": "Use of TLS 1.0 or 1.1 is insecure",
|
||||
"B504": "Use of SSL context with check_hostname=False",
|
||||
"B505": "Use of deprecated SSL methods",
|
||||
"B506": "Use of yaml.load with unsafe Loader",
|
||||
"B507": "Use of yaml.load with yaml.FullLoader",
|
||||
"B601": "Use of paramiko; ensure secure configuration",
|
||||
"B602": "Use of subprocess with shell=True is dangerous",
|
||||
"B603": "Use of subprocess without shell=True",
|
||||
"B604": "Use of subprocess with shell=True or cwd",
|
||||
"B605": "Use of os.system is dangerous",
|
||||
"B606": "Use of os.popen is dangerous",
|
||||
"B607": "Use of commands.getoutput is dangerous",
|
||||
"B608": "Use of soft_unicode is deprecated",
|
||||
"B609": "Use of linux capabilities is restricted",
|
||||
}
|
||||
return suggestions.get(test_id, "Review and fix this security issue")
|
||||
|
||||
def get_plugin_info(self) -> dict:
|
||||
"""Get information about available Bandit plugins."""
|
||||
return {
|
||||
"name": "bandit",
|
||||
"version": bandit.__version__ if hasattr(bandit, "__version__") else "unknown",
|
||||
"description": "Security vulnerability scanner for Python",
|
||||
}
|
||||
Reference in New Issue
Block a user