"""Main scanner module for CodeGuard."""

import logging
import os
from collections.abc import Iterable
from pathlib import Path
from typing import Optional, Union

from codeguard.analyzers.language_detector import LanguageDetector
from codeguard.analyzers.security_analyzer import SecurityAnalyzer
from codeguard.core.models import Config, Finding, Language, Severity
from codeguard.llm.client import OllamaClient
from codeguard.utils.ignore import IgnoreParser

logger = logging.getLogger(__name__)


class CodeScanner:
    """Discover source files and run the security analyzer over each of them.

    The scanner maps file extensions to a ``Language`` via
    ``SUPPORTED_EXTENSIONS``, reads each supported file as UTF-8 text and
    hands the text to ``SecurityAnalyzer.analyze``. Per-file failures never
    abort a run: the file is skipped and the failure is logged.
    """

    # File suffix (lower-case, including the dot) -> language passed to the
    # analyzer. Files with any other suffix are skipped.
    SUPPORTED_EXTENSIONS = {
        ".py": Language.PYTHON,
        ".js": Language.JAVASCRIPT,
        ".ts": Language.TYPESCRIPT,
        ".go": Language.GO,
        ".rs": Language.RUST,
    }

    # Severity names from least to most severe; the position in this tuple
    # is what ``_passes_threshold`` compares.
    _SEVERITY_ORDER = ("low", "medium", "high", "critical")

    def __init__(
        self,
        ollama_url: str,
        model: str,
        timeout: int,
        config: Optional[Config] = None,
    ):
        """Build the scanner and its collaborators.

        Args:
            ollama_url: Passed to ``OllamaClient`` as its first argument.
            model: Model name passed to ``SecurityAnalyzer``.
            timeout: Passed to ``OllamaClient`` as ``timeout``.
            config: Scanner configuration; a default ``Config()`` is used
                when omitted or falsy.
        """
        self.ollama_client = OllamaClient(ollama_url, timeout=timeout)
        self.model = model
        self.config = config or Config()
        self.language_detector = LanguageDetector()
        self.security_analyzer = SecurityAnalyzer(self.ollama_client, model)
        # Kept as a public attribute for callers; file discovery builds a
        # fresh parser per scan so ignore rules do not leak between roots.
        self.ignore_parser = IgnoreParser()

    def scan(
        self,
        path: str,
        include: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
    ) -> list[Finding]:
        """Scan a file or directory tree and return the findings.

        A single-file ``path`` is analyzed directly; ``include``, ``exclude``
        and the ignore file only apply when ``path`` is a directory. Findings
        below ``config.severity_threshold`` are dropped.

        Args:
            path: File or directory to scan.
            include: Optional glob patterns (relative to ``path``) that limit
                which files are considered.
            exclude: Optional substrings; a file whose path relative to
                ``path`` contains any of them is skipped.

        Returns:
            Findings at or above the configured severity threshold.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
        """
        target = Path(path)
        if not target.exists():
            raise FileNotFoundError(f"Path not found: {path}")

        if target.is_file():
            files = [target]
        else:
            # The ignore-file name can be overridden through the environment.
            ignore_path = os.environ.get("CODEGUARD_IGNORE", ".codeguardignore")
            files = self._discover_files(target, ignore_path, include, exclude)

        findings = self._collect_findings(files)
        return [f for f in findings if self._passes_threshold(f.severity)]

    def check_files(self, paths: list[str]) -> list[Finding]:
        """Analyze an explicit list of files.

        Paths with an unsupported extension are skipped, as are files that
        cannot be analyzed.

        NOTE(review): unlike ``scan``, this does not apply the severity
        threshold and does not check that the paths exist first - confirm
        that this difference is intended.

        Args:
            paths: File paths to analyze.

        Returns:
            All findings reported for the given files.
        """
        return self._collect_findings(paths)

    def _collect_findings(
        self, paths: Iterable[Union[str, Path]]
    ) -> list[Finding]:
        """Analyze every supported file in ``paths``, skipping failures."""
        findings: list[Finding] = []
        for raw_path in paths:
            try:
                file_path = Path(raw_path)
                lang = self._get_language(file_path)
                if lang is None:
                    continue
                findings.extend(self._analyze_file(file_path, lang))
            except Exception:
                # Best effort: one bad entry must not abort the whole run,
                # but it should not disappear without a trace either.
                logger.warning("Skipping %s: analysis failed", raw_path, exc_info=True)
        return findings

    def _discover_files(
        self,
        root: Path,
        ignore_path: str,
        include: Optional[list[str]],
        exclude: Optional[list[str]],
    ) -> list[Path]:
        """Return the supported files under ``root`` that should be scanned.

        Args:
            root: Directory to search.
            ignore_path: Ignore-file location, resolved against ``root``;
                loaded only if it exists.
            include: Optional glob patterns relative to ``root``. When given,
                only matches with a supported suffix are kept; otherwise
                every ``**/*<ext>`` match is a candidate.
            exclude: Optional substrings matched against each candidate's
                path relative to ``root``.

        Returns:
            Regular files, without duplicates, ordered by supported
            extension and then by glob order.
        """
        ignore_parser = IgnoreParser()
        ignore_file = root / ignore_path
        if ignore_file.exists():
            ignore_parser.load_from_file(ignore_file)

        if include:
            # Glob each pattern once instead of once per supported extension.
            included = [match for inc in include for match in root.glob(inc)]
            candidates = [
                match
                for ext in self.SUPPORTED_EXTENSIONS
                for match in included
                if match.suffix == ext
            ]
        else:
            candidates = [
                match
                for ext in self.SUPPORTED_EXTENSIONS
                for match in root.glob(f"**/*{ext}")
            ]

        selected: list[Path] = []
        seen: set[Path] = set()
        for candidate in candidates:
            # Overlapping include patterns yield the same path repeatedly,
            # and a glob can also match a directory named like a source file.
            if candidate in seen or not candidate.is_file():
                continue
            seen.add(candidate)

            rel_path = str(candidate.relative_to(root))
            if ignore_parser.should_ignore(rel_path):
                continue
            if exclude and any(exc in rel_path for exc in exclude):
                continue
            selected.append(candidate)

        return selected

    def _get_language(self, file_path: Path) -> Optional[Language]:
        """Return the language for ``file_path``'s suffix, or ``None``.

        The suffix is lower-cased before the lookup, so ``.PY`` maps to the
        same language as ``.py``.
        """
        return self.SUPPORTED_EXTENSIONS.get(file_path.suffix.lower())

    def _analyze_file(self, file_path: Path, language: Language) -> list[Finding]:
        """Read ``file_path`` and run the security analyzer on its text.

        Args:
            file_path: File to read as UTF-8.
            language: Language passed through to the analyzer.

        Returns:
            The analyzer's findings, or an empty list when the file cannot
            be read or the analysis fails (the failure is logged).
        """
        try:
            content = file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            logger.warning("Could not read %s: %s", file_path, exc)
            return []

        try:
            # NOTE(review): text longer than max_file_size is cut down to its
            # first chunk_size characters rather than analyzed in several
            # chunks - confirm this is the intended handling.
            if len(content) > self.config.max_file_size:
                content = content[: self.config.chunk_size]

            return self.security_analyzer.analyze(content, language, str(file_path))
        except Exception:
            # Best effort: an analyzer failure skips this file only.
            logger.warning("Analysis of %s failed", file_path, exc_info=True)
            return []

    def _passes_threshold(self, severity: Severity) -> bool:
        """Return whether ``severity`` is at or above the configured threshold.

        Both the finding's severity and ``config.severity_threshold`` may be
        either an object with a ``value`` attribute or the plain value.
        Returns ``False`` when either one is not a known severity name.
        """
        threshold = self.config.severity_threshold
        order = self._SEVERITY_ORDER
        try:
            sev_level = getattr(severity, "value", severity)
            # Unwrap the threshold the same way, so an enum-valued threshold
            # does not silently reject every finding.
            threshold_level = getattr(threshold, "value", threshold)
            return order.index(sev_level) >= order.index(threshold_level)
        except (ValueError, AttributeError):
            return False