"""Security analysis module."""

import json
import re

from codeguard.core.models import (
    Finding,
    FindingType,
    FixSuggestion,
    Language,
    Location,
    Severity,
)
from codeguard.llm.client import LLMClient


class SecurityAnalyzer:
    """Security analyzer combining regex pattern scanning with LLM-based review."""

    def __init__(self, client: LLMClient, model: str = "codellama"):
        self.client = client
        self.model = model
        self.system_prompts = self._load_system_prompts()
        self.vulnerability_patterns = self._load_vulnerability_patterns()

    def _load_system_prompts(self) -> dict[Language, str]:
        """Return the per-language system prompts used for LLM analysis."""
        return {
            Language.PYTHON: """You are a security expert specializing in Python code analysis.
Analyze the provided Python code for:
1. SQL injection vulnerabilities
2. XSS vulnerabilities
3. Hardcoded secrets (API keys, passwords)
4. Command injection
5. Insecure deserialization
6. Path traversal
7. Authentication bypasses
8. Input validation issues

Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
Return only valid JSON array.""",
            Language.JAVASCRIPT: """You are a security expert specializing in JavaScript/TypeScript code analysis.
Analyze the provided code for:
1. SQL injection
2. XSS vulnerabilities
3. Hardcoded secrets
4. Command injection
5. Insecure dependencies
6. Prototype pollution
7. Authentication issues
8. Input validation problems

Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
Return only valid JSON array.""",
            Language.GO: """You are a security expert specializing in Go code analysis.
Analyze the provided Go code for:
1. SQL injection
2. XSS vulnerabilities
3. Hardcoded secrets
4. Command injection
5. Race conditions
6. Error handling issues
7. Concurrency problems
8. Input validation

Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
Return only valid JSON array.""",
            Language.RUST: """You are a security expert specializing in Rust code analysis.
Analyze the provided Rust code for:
1. Memory safety issues
2. Concurrency problems
3. Error handling issues
4. Unsafe code usage
5. Cryptographic weaknesses
6. Input validation

Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
Return only valid JSON array.""",
        }

    def _load_vulnerability_patterns(self) -> dict[str, list[str]]:
        """Return regex patterns used for fast, heuristic vulnerability scanning."""
        return {
            "hardcoded_secret": [
                r"(?i)(api_key|apikey|secret|password|passwd|pwd)\s*=\s*['\"][^'\"]+['\"]",
                r"(?i)AWS_ACCESS_KEY_ID\s*=\s*['\"][^'\"]+['\"]",
                r"(?i)AWS_SECRET_ACCESS_KEY\s*=\s*['\"][^'\"]+['\"]",
                r"(?i)Bearer\s+[a-zA-Z0-9\-_]+\.[a-zA-Z0-9\-_]+",
            ],
            "sql_injection": [
                r"(?i).*execute\s*\(\s*f?['\"].*['\"]\s*\)",
                r"(?i).*\.query\s*\(\s*f?['\"][^'\"]*%\s*s[^'\"]*['\"]",
            ],
            "command_injection": [
                r"(?i)os\.system\s*\(",
                r"(?i)subprocess\.\w+\s*\(\s*['\"][^'\"]*['\"]",
                r"(?i)eval\s*\(\s*f?['\"]",
            ],
            "path_traversal": [
                r"(?i)open\s*\([^,]+,\s*['\"][rwa]",
                r"(?i)os\.path\.join\s*\([^)]*\.",
            ],
        }

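    # Illustrative examples (assumed, not taken from the original code) of lines the
    # patterns above would flag; identifiers such as `uid`, `cmd`, and `user_path` are
    # hypothetical:
    #   password = "hunter2"                          -> hardcoded_secret
    #   cursor.execute(f"SELECT * WHERE id={uid}")    -> sql_injection
    #   os.system(cmd)                                -> command_injection
    #   open(user_path, "r")                          -> path_traversal
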
    def analyze(
        self, code: str, language: Language, file_path: str
    ) -> list[Finding]:
        """Run pattern-based and LLM-based analysis and return the combined findings."""
        findings: list[Finding] = []

        pattern_findings = self._scan_patterns(code, file_path, language)
        findings.extend(pattern_findings)

        llm_findings = self._analyze_with_llm(code, language, file_path)
        findings.extend(llm_findings)

        return findings

    def _scan_patterns(
        self, code: str, file_path: str, language: Language
    ) -> list[Finding]:
        """Scan each line of the code against the known vulnerability regex patterns."""
        findings = []
        lines = code.split("\n")

        for pattern_name, patterns in self.vulnerability_patterns.items():
            for pattern in patterns:
                try:
                    regex = re.compile(pattern)
                    for i, line in enumerate(lines, 1):
                        if regex.search(line):
                            finding = self._create_finding(
                                pattern_name=pattern_name,
                                line=i,
                                code_snippet=line.strip(),
                                file_path=file_path,
                                language=language,
                            )
                            findings.append(finding)
                except re.error:
                    # Skip any pattern that fails to compile rather than aborting the scan.
                    continue

        return findings

    def _analyze_with_llm(
        self, code: str, language: Language, file_path: str
    ) -> list[Finding]:
        """Ask the LLM to review the code and parse its findings (best-effort)."""
        findings: list[Finding] = []

        if language not in self.system_prompts:
            return findings

        system_prompt = self.system_prompts[language]

        try:
            response = self.client.chat(
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"File: {file_path}\n\n{code}"},
                ],
                model=self.model,
            )

            parsed_findings = self._parse_llm_response(response, file_path, language)
            findings.extend(parsed_findings)

        except Exception:
            # LLM analysis is best-effort; pattern-based findings are still returned.
            pass

        return findings

    def _parse_llm_response(
        self, response: str, file_path: str, language: Language
    ) -> list[Finding]:
        """Extract the JSON array from the LLM response and convert it to findings."""
        findings: list[Finding] = []

        try:
            json_match = re.search(r"\[.*\]", response, re.DOTALL)
            if json_match:
                data = json.loads(json_match.group())
                for item in data:
                    finding = self._create_finding_from_llm(item, file_path, language)
                    findings.append(finding)
        except (json.JSONDecodeError, KeyError, ValueError):
            # ValueError also covers invalid enum values (type/severity) in model output.
            pass

        return findings

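    # For reference, the parser above expects the model to return a JSON array shaped
    # roughly like the following. This is an assumed example matching the fields read by
    # `_create_finding_from_llm`, not actual model output:
    #   [
    #     {
    #       "id": "sql-injection-1",
    #       "type": "vulnerability",
    #       "severity": "high",
    #       "title": "SQL Injection",
    #       "description": "User input is interpolated into a SQL query.",
    #       "location": {"line": 42, "end_line": 42, "column": 0, "code_snippet": "..."},
    #       "cwe_id": "CWE-89",
    #       "fix": {"description": "Use parameterized queries.", "code": null, "explanation": null}
    #     }
    #   ]
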
    def _create_finding(
        self,
        pattern_name: str,
        line: int,
        code_snippet: str,
        file_path: str,
        language: Language,
    ) -> Finding:
        """Build a Finding for a regex pattern match."""
        severity_map = {
            "hardcoded_secret": Severity.HIGH,
            "sql_injection": Severity.CRITICAL,
            "command_injection": Severity.CRITICAL,
            "path_traversal": Severity.HIGH,
        }

        title_map = {
            "hardcoded_secret": "Hardcoded Secret Found",
            "sql_injection": "Potential SQL Injection",
            "command_injection": "Potential Command Injection",
            "path_traversal": "Potential Path Traversal",
        }

        cwe_map = {
            "hardcoded_secret": "CWE-798",
            "sql_injection": "CWE-89",
            "command_injection": "CWE-78",
            "path_traversal": "CWE-22",
        }

        return Finding(
            id=f"pattern-{pattern_name}-{line}",
            type=FindingType.VULNERABILITY,
            severity=severity_map.get(pattern_name, Severity.MEDIUM),
            title=title_map.get(pattern_name, f"Security Issue: {pattern_name}"),
            description=f"Potential {pattern_name} vulnerability detected",
            location=Location(
                file=file_path,
                line=line,
                end_line=line,
                column=0,
                code_snippet=code_snippet,
            ),
            cwe_id=cwe_map.get(pattern_name),
            fix=None,
            language=language,
        )

    def _create_finding_from_llm(
        self, item: dict, file_path: str, language: Language
    ) -> Finding:
        """Build a Finding from a single JSON object returned by the LLM."""
        # Use `or {}` so explicit nulls in the model output do not break attribute access.
        location_data = item.get("location") or {}
        fix_data = item.get("fix") or {}

        fix_suggestion = None
        if fix_data:
            fix_suggestion = FixSuggestion(
                description=fix_data.get("description", ""),
                code=fix_data.get("code"),
                explanation=fix_data.get("explanation"),
            )

        return Finding(
            id=item.get("id", f"llm-{hash(item.get('title', ''))}"),
            type=FindingType(item.get("type", "vulnerability")),
            severity=Severity(item.get("severity", "medium")),
            title=item.get("title", "Security Issue"),
            description=item.get("description", ""),
            location=Location(
                file=file_path,
                line=location_data.get("line", 1),
                end_line=location_data.get("end_line"),
                column=location_data.get("column"),
                code_snippet=location_data.get("code_snippet"),
            ),
            cwe_id=item.get("cwe_id"),
            fix=fix_suggestion,
            language=language,
        )
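

# Example usage (an illustrative sketch, not part of the module): how `LLMClient` is
# constructed is assumed here; only the `chat(messages=..., model=...)` call used above
# is taken from this file.
#
#     client = LLMClient(...)  # configure for your local model server
#     analyzer = SecurityAnalyzer(client, model="codellama")
#     findings = analyzer.analyze(source_code, Language.PYTHON, "app.py")
#     for finding in findings:
#         print(finding.severity, finding.title, finding.location.line)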