Initial upload of ai-code-audit-cli project
Some checks failed
Some checks failed
This commit is contained in:
248
src/scanners/tree_sitter_scanner.py
Normal file
248
src/scanners/tree_sitter_scanner.py
Normal file
@@ -0,0 +1,248 @@
|
|||||||
|
"""Tree-sitter based pattern scanner for multi-language code analysis."""
|
||||||
|
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from ..core.models import Issue, IssueCategory, SeverityLevel
|
||||||
|
|
||||||
|
|
||||||
|
class TreeSitterScanner:
    """Multi-language pattern scanner.

    All checks implemented in this class are line-by-line regular-expression
    scans: each pattern table below holds ``(regex, message)`` pairs that are
    searched against every line of the scanned text.  ``_parsers`` is
    initialised but not used by any method in this class.

    Credential, SQL-injection and command-injection checks run for every
    language; mutable-default, error-handling and function-length checks run
    only when the language is ``"python"``.
    """

    # Functions spanning more than this many lines are reported as too long.
    MAX_FUNCTION_LINES = 50

    # Substrings (lower-case; compared against the lower-cased line) marking a
    # credential-looking line as sample/placeholder data.
    _FALSE_POSITIVE_INDICATORS = (
        'example', 'placeholder', 'test', 'fake', 'demo', 'mock',
        'your_', 'todo', 'fixme', 'xxx',
    )

    CREDENTIAL_PATTERNS = [
        (r"(?:api[_-]?key|apikey|secret|password|passwd|pwd|token|auth)\s*[:=]\s*['\"][a-zA-Z0-9_\-]{20,}['\"]",
         "Potential hardcoded credential detected"),
        (r"(?:aws[_-]?(?:access[_-]?key[_-]?id|secret[_-]?access[_-]?key))\s*[:=]\s*['\"][A-Z0-9]{20,}['\"]",
         "Potential AWS credentials hardcoded"),
        (r"['\"][A-Za-z0-9+\/]{40,}['\"]", "Potential API token or key detected"),
        (r"(?:sk[_-]?live[_-]|[a-zA-Z0-9]{20,}[._]?)(?:secret|key|token)",
         "Potential secret key detected"),
        (r"ghp_[a-zA-Z0-9]{36}", "Potential GitHub PAT detected"),
        (r"gho_[a-zA-Z0-9]{36}", "Potential GitHub OAuth token detected"),
        (r"eyJ[a-zA-Z0-9_-]*\.eyJ[a-zA-Z0-9_-]*", "Potential JWT token detected"),
    ]

    SQL_INJECTION_PATTERNS = [
        (r"['\"].*?(?:SELECT|INSERT|UPDATE|DELETE|DROP|ALTER).*?['\"].*?%", "Potential SQL injection - string formatting with user input"),
        (r"(?:execute|exec)\s*\(\s*f?['\"].*?(?:SELECT|INSERT|UPDATE|DELETE)", "Potential SQL injection - f-string in SQL execution"),
        # The double quotes must be escaped: unescaped, they terminate the
        # string literal and make the whole module a syntax error.
        (r"\".*?\".*\+.*?(?:user|input|param)", "Potential SQL injection - string concatenation"),
        (r"format\s*\(\s*f?['\"].*?(?:SELECT|INSERT|UPDATE|DELETE)", "Potential SQL injection - format() with SQL"),
    ]

    COMMAND_INJECTION_PATTERNS = [
        (r"os\.system\s*\(\s*f?['\"].*?['\"]", "Potential command injection - os.system with f-string"),
        (r"subprocess\.(?:call|run|Popen)\s*\([^)]*shell\s*=\s*True", "Potential command injection - shell=True with user input"),
        (r"eval\s*\(\s*f?['\"].*?['\"]", "Potential code injection - eval with f-string"),
        (r"exec\s*\(\s*f?['\"].*?['\"]", "Potential code injection - exec with f-string"),
    ]

    MUTABLE_DEFAULT_PATTERNS = [
        (r"def\s+\w+\s*\([^)]*=\s*\[", "Mutable default argument - use None and initialize inside"),
        (r"def\s+\w+\s*\([^)]*=\s*\{", "Mutable default argument - use None and initialize inside"),
    ]

    ERROR_HANDLING_PATTERNS = [
        (r"except\s*:\s*$", "Bare except clause - catch specific exceptions"),
        # The colon (and an optional "as name") is part of the clause; without
        # it the pattern could never match a real ``except Exception:`` line.
        (r"except\s+Exception\s*(?:as\s+\w+\s*)?:\s*$", "Catch-all except Exception - catch specific exceptions"),
        # Patterns are applied one line at a time, and ``try:`` never shares a
        # line with ``except``, so only the ``except ...: pass`` part is matched.
        (r"^\s*except\s*:\s*pass\b", "Silent exception handling - at least log the error"),
        (r"^\s*except\s+[^:]+:\s*pass\b", "Silent exception handling - at least log the error"),
    ]

    def __init__(self):
        """Initialize the scanner with an empty parser cache."""
        self._parsers = {}

    def scan_file(self, file_path: str) -> "list[Issue]":
        """Scan a single file on disk.

        Args:
            file_path: Path of the file to read and scan.

        Returns:
            The issues found, or an empty list when the path is not a regular
            file or when reading/scanning raises any exception.
        """
        try:
            path = Path(file_path)
            if not path.is_file():
                return []

            # Undecodable bytes are replaced rather than raising, so binary or
            # mis-encoded files still go through the line scan.
            content = path.read_text(encoding="utf-8", errors="replace")
            language = self._detect_language(file_path)
            return self.scan_content(content, file_path, language)
        except Exception:
            # Any failure is reported as "no findings"; callers never see an
            # exception from this method.
            return []

    def scan_content(self, content: str, file_path: str, language: str) -> "list[Issue]":
        """Scan source text that is already in memory.

        Args:
            content: Full text to scan.
            file_path: Path recorded on every reported issue.
            language: Language name as returned by ``_detect_language``;
                ``"python"`` enables the Python-only checks.

        Returns:
            Issues in check order: credentials, SQL injection, command
            injection, then (Python only) the Python-specific checks.
        """
        lines = content.split('\n')

        issues = []
        issues.extend(self._scan_credential_patterns(content, file_path, lines))
        issues.extend(self._scan_sql_injection_patterns(content, file_path, lines))
        issues.extend(self._scan_command_injection_patterns(content, file_path, lines))

        if language == "python":
            issues.extend(self._scan_python_patterns(content, file_path, lines))

        return issues

    def _matching_lines(self, patterns: list, lines: list[str], flags: int = 0, skip=None) -> list[tuple[int, str]]:
        """Return ``(line_number, message)`` for every pattern hit.

        Args:
            patterns: ``(regex, message)`` pairs such as ``CREDENTIAL_PATTERNS``.
            lines: Source lines; numbering starts at 1.
            flags: ``re`` flags used to compile every pattern.
            skip: Optional predicate; a matching line for which it returns a
                true value is not reported.

        Returns:
            Hits grouped by pattern (in table order), then by line number.
        """
        hits = []
        for pattern, message in patterns:
            # Compile once per pattern instead of once per line.
            regex = re.compile(pattern, flags)
            for lineno, line in enumerate(lines, 1):
                if not regex.search(line):
                    continue
                if skip is not None and skip(line):
                    continue
                hits.append((lineno, message))
        return hits

    def _scan_credential_patterns(self, content: str, file_path: str, lines: list[str]) -> "list[Issue]":
        """Report lines that look like hardcoded credentials.

        Lines flagged by ``_is_false_positive`` are ignored.  ``content`` is
        accepted for signature parity with the other scanners and not used.
        """
        hits = self._matching_lines(
            self.CREDENTIAL_PATTERNS, lines, re.IGNORECASE, self._is_false_positive
        )
        return [
            Issue(
                severity=SeverityLevel.CRITICAL,
                category=IssueCategory.SECURITY,
                file_path=file_path,
                line_number=lineno,
                message=message,
                suggestion="Move credentials to environment variables or config file",
                scanner_name="tree-sitter",
            )
            for lineno, message in hits
        ]

    def _scan_sql_injection_patterns(self, content: str, file_path: str, lines: list[str]) -> "list[Issue]":
        """Report non-comment lines that look like string-built SQL.

        ``content`` is accepted for signature parity and not used.
        """
        hits = self._matching_lines(
            self.SQL_INJECTION_PATTERNS, lines, re.IGNORECASE, self._is_comment
        )
        return [
            Issue(
                severity=SeverityLevel.HIGH,
                category=IssueCategory.SECURITY,
                file_path=file_path,
                line_number=lineno,
                message=message,
                suggestion="Use parameterized queries or ORM",
                scanner_name="tree-sitter",
            )
            for lineno, message in hits
        ]

    def _scan_command_injection_patterns(self, content: str, file_path: str, lines: list[str]) -> "list[Issue]":
        """Report non-comment lines that look like shell/code injection sinks.

        ``content`` is accepted for signature parity and not used.
        """
        hits = self._matching_lines(
            self.COMMAND_INJECTION_PATTERNS, lines, re.IGNORECASE, self._is_comment
        )
        return [
            Issue(
                severity=SeverityLevel.HIGH,
                category=IssueCategory.SECURITY,
                file_path=file_path,
                line_number=lineno,
                message=message,
                suggestion="Sanitize user input or use subprocess without shell=True",
                scanner_name="tree-sitter",
            )
            for lineno, message in hits
        ]

    def _scan_python_patterns(self, content: str, file_path: str, lines: list[str]) -> "list[Issue]":
        """Run the Python-only checks.

        Returns mutable-default findings, then error-handling findings, then
        over-long-function findings.  Comment lines are ignored by all three.
        """
        issues = []

        # Commented-out ``def`` lines are skipped, as in the other scanners.
        for lineno, message in self._matching_lines(
            self.MUTABLE_DEFAULT_PATTERNS, lines, 0, self._is_comment
        ):
            issues.append(Issue(
                severity=SeverityLevel.MEDIUM,
                category=IssueCategory.ANTI_PATTERN,
                file_path=file_path,
                line_number=lineno,
                message=message,
                suggestion="Use None as default and initialize inside the function",
                scanner_name="tree-sitter",
            ))

        for lineno, message in self._matching_lines(
            self.ERROR_HANDLING_PATTERNS, lines, re.IGNORECASE | re.MULTILINE, self._is_comment
        ):
            issues.append(Issue(
                severity=SeverityLevel.LOW,
                category=IssueCategory.ERROR_HANDLING,
                file_path=file_path,
                line_number=lineno,
                message=message,
                suggestion="Catch specific exceptions and handle them appropriately",
                scanner_name="tree-sitter",
            ))

        issues.extend(self._scan_complex_functions(content, file_path, lines))

        return issues

    def _scan_complex_functions(self, content: str, file_path: str, lines: list[str]) -> "list[Issue]":
        """Report functions longer than ``MAX_FUNCTION_LINES`` lines.

        Each issue is anchored at the function's ``def`` line and its message
        carries the measured length.  ``content`` is not used.
        """
        return [
            Issue(
                severity=SeverityLevel.LOW,
                category=IssueCategory.COMPLEXITY,
                file_path=file_path,
                line_number=start,
                message=f"Function '{name}' is very long ({length} lines)",
                suggestion="Consider breaking this function into smaller ones",
                scanner_name="tree-sitter",
            )
            for name, start, length in self._find_long_functions(lines)
        ]

    def _find_long_functions(self, lines: list[str], max_lines: Optional[int] = None) -> list[tuple[str, int, int]]:
        """Locate over-long ``def`` / ``async def`` blocks by indentation.

        A function starts at its ``def`` line and ends at the last code line
        before the next non-blank, non-comment line indented no deeper than
        the ``def`` itself (or at the last code line of the input).  Nested
        functions and methods are measured independently.

        This is an indentation heuristic, not a parse: a multi-line string
        whose content sits at or left of the ``def`` column ends the function
        early.

        Args:
            lines: Source lines; numbering starts at 1.
            max_lines: Length threshold; defaults to ``MAX_FUNCTION_LINES``.

        Returns:
            ``(name, start_line, length)`` tuples ordered by ``start_line``
            for every function whose length exceeds the threshold.
        """
        limit = self.MAX_FUNCTION_LINES if max_lines is None else max_lines
        long_functions = []
        open_defs = []       # stack of (name, start_line, indent), innermost last
        last_code_line = 0   # most recent non-blank, non-comment line seen

        for lineno, raw in enumerate(lines, 1):
            expanded = raw.expandtabs()
            stripped = expanded.strip()
            if not stripped or stripped.startswith('#'):
                # Blank lines and comments neither extend nor end a function.
                continue

            indent = len(expanded) - len(expanded.lstrip())

            # A dedented closing bracket (e.g. the "):" that ends a multi-line
            # signature) belongs to the construct above it, so it must not
            # close the enclosing function.
            if not stripped.startswith((')', ']', '}')):
                while open_defs and indent <= open_defs[-1][2]:
                    name, start, _ = open_defs.pop()
                    length = last_code_line - start + 1
                    if length > limit:
                        long_functions.append((name, start, length))

            last_code_line = lineno

            match = re.match(r'(?:async\s+)?def\s+(\w+)', stripped)
            if match:
                open_defs.append((match.group(1), lineno, indent))

        # End of input closes every function that is still open.
        for name, start, _ in open_defs:
            length = last_code_line - start + 1
            if length > limit:
                long_functions.append((name, start, length))

        long_functions.sort(key=lambda item: item[1])
        return long_functions

    def _is_false_positive(self, line: str) -> bool:
        """Return True when a line contains a placeholder/sample marker.

        The comparison is case-insensitive: the line is lower-cased and
        checked against the lower-case ``_FALSE_POSITIVE_INDICATORS``.
        """
        line_lower = line.lower()
        return any(indicator in line_lower for indicator in self._FALSE_POSITIVE_INDICATORS)

    def _is_comment(self, line: str) -> bool:
        """Return True when the line, ignoring leading whitespace, starts a comment.

        Recognises ``#`` as well as ``//`` and ``/*`` so that comment lines in
        the JavaScript/TypeScript files this scanner accepts are skipped too.
        """
        return line.strip().startswith(('#', '//', '/*'))

    def _detect_language(self, file_path: str) -> str:
        """Map a file extension (case-insensitive) to a language name.

        Returns ``"unknown"`` for extensions that are not in the table.
        """
        ext = Path(file_path).suffix.lower()
        language_map = {
            ".py": "python",
            ".js": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript",
            ".jsx": "javascript",
        }
        return language_map.get(ext, "unknown")

    def get_plugin_info(self) -> dict:
        """Return the scanner's name, version and description."""
        return {
            "name": "tree-sitter",
            "version": "0.25.2",
            "description": "Multi-language pattern scanner for code analysis",
        }
|
||||||
Reference in New Issue
Block a user