diff --git a/src/codeguard/analyzers/security_analyzer.py b/src/codeguard/analyzers/security_analyzer.py
new file mode 100644
index 0000000..d164f6c
--- /dev/null
+++ b/src/codeguard/analyzers/security_analyzer.py
@@ -0,0 +1,259 @@
+"""Security analysis module."""
+
+import json
+import re
+from codeguard.core.models import (
+    Finding,
+    FindingType,
+    FixSuggestion,
+    Language,
+    Location,
+    Severity,
+)
+from codeguard.llm.client import LLMClient
+
+
+class SecurityAnalyzer:
+    def __init__(self, client: LLMClient, model: str = "codellama"):
+        self.client = client
+        self.model = model
+        self.system_prompts = self._load_system_prompts()
+        self.vulnerability_patterns = self._load_vulnerability_patterns()
+
+    def _load_system_prompts(self) -> dict[Language, str]:
+        return {
+            Language.PYTHON: """You are a security expert specializing in Python code analysis.
+Analyze the provided Python code for:
+1. SQL injection vulnerabilities
+2. XSS vulnerabilities
+3. Hardcoded secrets (API keys, passwords)
+4. Command injection
+5. Insecure deserialization
+6. Path traversal
+7. Authentication bypasses
+8. Input validation issues
+
+Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
+Return only valid JSON array.""",
+            Language.JAVASCRIPT: """You are a security expert specializing in JavaScript/TypeScript code analysis.
+Analyze the provided code for:
+1. SQL injection
+2. XSS vulnerabilities
+3. Hardcoded secrets
+4. Command injection
+5. Insecure dependencies
+6. Prototype pollution
+7. Authentication issues
+8. Input validation problems
+
+Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
+Return only valid JSON array.""",
+            Language.GO: """You are a security expert specializing in Go code analysis.
+Analyze the provided Go code for:
+1. SQL injection
+2. XSS vulnerabilities
+3. Hardcoded secrets
+4. Command injection
+5. Race conditions
+6. Error handling issues
+7. Concurrency problems
+8. Input validation
+
+Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
+Return only valid JSON array.""",
+            Language.RUST: """You are a security expert specializing in Rust code analysis.
+Analyze the provided Rust code for:
+1. Memory safety issues
+2. Concurrency problems
+3. Error handling issues
+4. Unsafe code usage
+5. Cryptographic weaknesses
+6. Input validation
+
+Provide findings in JSON format with: id, type, severity, title, description, location (file, line), cwe_id, and fix suggestion.
+Return only valid JSON array.""",
+        }
+
+    def _load_vulnerability_patterns(self) -> dict[str, list[str]]:
+        return {
+            "hardcoded_secret": [
+                r"(?i)(api_key|apikey|secret|password|passwd|pwd)\s*=\s*['\"][^'\"]+['\"]",
+                r"(?i)AWS_ACCESS_KEY_ID\s*=\s*['\"][^'\"]+['\"]",
+                r"(?i)AWS_SECRET_ACCESS_KEY\s*=\s*['\"][^'\"]+['\"]",
+                r"(?i)Bearer\s+[a-zA-Z0-9\-_]+\.[a-zA-Z0-9\-_]+",
+            ],
+            "sql_injection": [
+                r"(?i).*execute\s*\(\s*f?['\"].*['\"]\s*\)",
+                r"(?i).*\.query\s*\(\s*f?['\"][^'\"]*%\s*s[^'\"]*['\"]",
+            ],
+            "command_injection": [
+                r"(?i)os\.system\s*\(",
+                r"(?i)subprocess\.\w+\s*\(\s*['\"][^'\"]*['\"]",
+                r"(?i)eval\s*\(\s*f?['\"]",
+            ],
+            "path_traversal": [
+                r"(?i)open\s*\([^,]+,\s*['\"][rwa]",
+                r"(?i)os\.path\.join\s*\([^)]*\.",
+            ],
+        }
+
+    def analyze(
+        self, code: str, language: Language, file_path: str
+    ) -> list[Finding]:
+        findings: list[Finding] = []
+
+        pattern_findings = self._scan_patterns(code, file_path, language)
+        findings.extend(pattern_findings)
+
+        llm_findings = self._analyze_with_llm(code, language, file_path)
+        findings.extend(llm_findings)
+
+        return findings
+
+    def _scan_patterns(
+        self, code: str, file_path: str, language: Language
+    ) -> list[Finding]:
+        findings = []
+        lines = code.split("\n")
+
+        for pattern_name, patterns in self.vulnerability_patterns.items():
+            for pattern in patterns:
+                try:
+                    regex = re.compile(pattern)
+                    for i, line in enumerate(lines, 1):
+                        if regex.search(line):
+                            finding = self._create_finding(
+                                pattern_name=pattern_name,
+                                line=i,
+                                code_snippet=line.strip(),
+                                file_path=file_path,
+                                language=language,
+                            )
+                            findings.append(finding)
+                except re.error:
+                    continue
+
+        return findings
+
+    def _analyze_with_llm(
+        self, code: str, language: Language, file_path: str
+    ) -> list[Finding]:
+        findings: list[Finding] = []
+
+        if language not in self.system_prompts:
+            return findings
+
+        system_prompt = self.system_prompts[language]
+
+        try:
+            response = self.client.chat(
+                messages=[
+                    {"role": "system", "content": system_prompt},
+                    {"role": "user", "content": f"File: {file_path}\n\n{code}"},
+                ],
+                model=self.model,
+            )
+
+            parsed_findings = self._parse_llm_response(response, file_path, language)
+            findings.extend(parsed_findings)
+
+        except Exception:
+            pass
+
+        return findings
+
+    def _parse_llm_response(
+        self, response: str, file_path: str, language: Language
+    ) -> list[Finding]:
+        findings: list[Finding] = []
+
+        try:
+            json_match = re.search(r'\[.*\]', response, re.DOTALL)
+            if json_match:
+                data = json.loads(json_match.group())
+                for item in data:
+                    finding = self._create_finding_from_llm(item, file_path, language)
+                    findings.append(finding)
+        except (json.JSONDecodeError, KeyError):
+            pass
+
+        return findings
+
+    def _create_finding(
+        self,
+        pattern_name: str,
+        line: int,
+        code_snippet: str,
+        file_path: str,
+        language: Language,
+    ) -> Finding:
+        severity_map = {
+            "hardcoded_secret": Severity.HIGH,
+            "sql_injection": Severity.CRITICAL,
+            "command_injection": Severity.CRITICAL,
+            "path_traversal": Severity.HIGH,
+        }
+
+        title_map = {
+            "hardcoded_secret": "Hardcoded Secret Found",
+            "sql_injection": "Potential SQL Injection",
+            "command_injection": "Potential Command Injection",
+            "path_traversal": "Potential Path Traversal",
+        }
+
+        cwe_map = {
+            "hardcoded_secret": "CWE-798",
+            "sql_injection": "CWE-89",
+            "command_injection": "CWE-78",
+            "path_traversal": "CWE-22",
+        }
+
+        return Finding(
+            id=f"pattern-{pattern_name}-{line}",
+            type=FindingType.VULNERABILITY,
+            severity=severity_map.get(pattern_name, Severity.MEDIUM),
+            title=title_map.get(pattern_name, f"Security Issue: {pattern_name}"),
+            description=f"Potential {pattern_name} vulnerability detected",
+            location=Location(
+                file=file_path,
+                line=line,
+                end_line=line,
+                column=0,
+                code_snippet=code_snippet,
+            ),
+            cwe_id=cwe_map.get(pattern_name),
+            fix=None,
+            language=language,
+        )
+
+    def _create_finding_from_llm(
+        self, item: dict, file_path: str, language: Language
+    ) -> Finding:
+        location_data = item.get("location", {})
+        fix_data = item.get("fix", {})
+
+        fix_suggestion = None
+        if fix_data:
+            fix_suggestion = FixSuggestion(
+                description=fix_data.get("description", ""),
+                code=fix_data.get("code"),
+                explanation=fix_data.get("explanation"),
+            )
+
+        return Finding(
+            id=item.get("id", f"llm-{hash(item.get('title', ''))}"),
+            type=FindingType(item.get("type", "vulnerability")),
+            severity=Severity(item.get("severity", "medium")),
+            title=item.get("title", "Security Issue"),
+            description=item.get("description", ""),
+            location=Location(
+                file=file_path,
+                line=location_data.get("line", 1),
+                end_line=location_data.get("end_line"),
+                column=location_data.get("column"),
+                code_snippet=location_data.get("code_snippet"),
+            ),
+            cwe_id=item.get("cwe_id"),
+            fix=fix_suggestion,
+            language=language,
+        )