Add source code files
This commit is contained in:
143
src/codeguard/core/scanner.py
Normal file
143
src/codeguard/core/scanner.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Main scanner module for CodeGuard."""
|
||||
|
||||
import logging
import os
from pathlib import Path
from typing import Optional
|
||||
|
||||
from codeguard.analyzers.language_detector import LanguageDetector
|
||||
from codeguard.analyzers.security_analyzer import SecurityAnalyzer
|
||||
from codeguard.core.models import Config, Finding, Language, Severity
|
||||
from codeguard.llm.client import OllamaClient
|
||||
from codeguard.utils.ignore import IgnoreParser
|
||||
|
||||
|
||||
class CodeScanner:
    """Find source files and run the security analyzer over each of them.

    A scanner is bound to one ``OllamaClient`` / model pair and one
    ``Config``. ``scan`` walks a file or directory and filters the results
    by ``config.severity_threshold``; ``check_files`` analyzes an explicit
    list of paths and returns every finding unfiltered.
    """

    # Maps a lower-case file suffix to the language handed to the analyzer.
    SUPPORTED_EXTENSIONS = {
        ".py": Language.PYTHON,
        ".js": Language.JAVASCRIPT,
        ".ts": Language.TYPESCRIPT,
        ".go": Language.GO,
        ".rs": Language.RUST,
    }

    # Severity names from least to most severe; the index is the rank.
    _SEVERITY_ORDER = ("low", "medium", "high", "critical")

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        ollama_url: str,
        model: str,
        timeout: int,
        config: Optional[Config] = None,
    ):
        """Create the client, analyzers and helpers used by the scanner.

        Args:
            ollama_url: Passed to ``OllamaClient`` as its first argument.
            model: Model name; stored and forwarded to ``SecurityAnalyzer``.
            timeout: Passed to ``OllamaClient`` as ``timeout``.
            config: Scanner settings; a default ``Config()`` is used when
                omitted (or falsy).
        """
        self.ollama_client = OllamaClient(ollama_url, timeout=timeout)
        self.model = model
        self.config = config or Config()
        self.language_detector = LanguageDetector()
        self.security_analyzer = SecurityAnalyzer(self.ollama_client, model)
        # NOTE(review): not used by any method in this class; file discovery
        # builds a fresh parser per scan. Kept because it is a public attribute.
        self.ignore_parser = IgnoreParser()

    def scan(
        self,
        path: str,
        include: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
    ) -> list[Finding]:
        """Scan a file or directory and return findings at or above threshold.

        A file target is analyzed directly; ``include``, ``exclude`` and the
        ignore file only apply when ``path`` is a directory. The ignore file
        name comes from the ``CODEGUARD_IGNORE`` environment variable and
        defaults to ``.codeguardignore``.

        Args:
            path: File or directory to scan.
            include: Glob patterns, relative to ``path``, selecting candidate
                files. When empty, every supported extension is searched
                recursively.
            exclude: Substrings; a file whose path relative to ``path``
                contains any of them is skipped.

        Returns:
            Findings whose severity passes ``config.severity_threshold``.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
        """
        target = Path(path)
        if not target.exists():
            raise FileNotFoundError(f"Path not found: {path}")

        if target.is_file():
            files = [target]
        else:
            ignore_path = os.environ.get("CODEGUARD_IGNORE", ".codeguardignore")
            files = self._discover_files(target, ignore_path, include, exclude)

        findings = self._analyze_paths(files)
        return [f for f in findings if self._passes_threshold(f.severity)]

    def check_files(self, paths: list[str]) -> list[Finding]:
        """Analyze an explicit list of files.

        Paths with an unsupported extension are skipped. Unlike ``scan``,
        the result is not filtered by the severity threshold.

        Args:
            paths: File paths to analyze.

        Returns:
            All findings reported for the supported files, in input order.
        """
        return self._analyze_paths(paths)

    def _analyze_paths(self, paths: list) -> list[Finding]:
        """Analyze each supported path and concatenate the findings.

        Best-effort: an unexpected error on one path is logged and that path
        is skipped so the remaining paths are still analyzed.
        """
        findings: list[Finding] = []
        for raw_path in paths:
            try:
                file_path = Path(raw_path)
                lang = self._get_language(file_path)
                if lang is None:
                    continue
                findings.extend(self._analyze_file(file_path, lang))
            except Exception:
                self._log.exception(
                    "Skipping %s: unexpected error during scan", raw_path
                )
        return findings

    def _discover_files(
        self,
        root: Path,
        ignore_path: str,
        include: Optional[list[str]],
        exclude: Optional[list[str]],
    ) -> list[Path]:
        """Collect the supported source files under ``root``.

        Args:
            root: Directory to search.
            ignore_path: Ignore-file location, resolved against ``root``.
            include: Glob patterns relative to ``root``; when empty, every
                supported extension is globbed recursively.
            exclude: Substrings matched against each root-relative path.

        Returns:
            Unique regular files with a supported extension that are neither
            ignored nor excluded, in discovery order.
        """
        ignore_parser = IgnoreParser()
        ignore_file = root / ignore_path
        # is_file() rather than exists(): a directory with the ignore file's
        # name must not be handed to the parser.
        if ignore_file.is_file():
            ignore_parser.load_from_file(ignore_file)

        if include:
            # Each include pattern is globbed once; the extension filter
            # reuses _get_language so discovery and scanning agree.
            candidates = (
                match
                for pattern in include
                for match in root.glob(pattern)
                if self._get_language(match) is not None
            )
        else:
            candidates = (
                match
                for ext in self.SUPPORTED_EXTENSIONS
                for match in root.glob(f"**/*{ext}")
            )

        selected: list[Path] = []
        seen: set[Path] = set()
        for candidate in candidates:
            # Overlapping include patterns can yield the same path twice, and
            # a glob also matches directories named like source files.
            if candidate in seen or not candidate.is_file():
                continue
            seen.add(candidate)

            rel_path = str(candidate.relative_to(root))
            if ignore_parser.should_ignore(rel_path):
                continue
            if exclude and any(fragment in rel_path for fragment in exclude):
                continue
            selected.append(candidate)

        return selected

    def _get_language(self, file_path: Path) -> Optional[Language]:
        """Return the language for ``file_path``'s suffix, or ``None``.

        The suffix is compared case-insensitively against
        ``SUPPORTED_EXTENSIONS``.
        """
        ext = file_path.suffix.lower()
        return self.SUPPORTED_EXTENSIONS.get(ext)

    def _analyze_file(self, file_path: Path, language: Language) -> list[Finding]:
        """Read one file and run the security analyzer on its text.

        Never raises: a file that cannot be read as UTF-8, or an analyzer
        failure, is logged and yields an empty list.

        Args:
            file_path: File to read.
            language: Language passed through to the analyzer.

        Returns:
            The analyzer's findings, or ``[]`` on any failure.
        """
        try:
            content = file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            self._log.warning(
                "Skipping %s: cannot read as UTF-8 text (%s)", file_path, exc
            )
            return []

        try:
            # NOTE(review): the limit is compared in characters, and oversized
            # content is cut to chunk_size (not max_file_size) -- confirm that
            # analyzing only the leading chunk is intended.
            if len(content) > self.config.max_file_size:
                content = content[: self.config.chunk_size]
            return self.security_analyzer.analyze(content, language, str(file_path))
        except Exception:
            self._log.exception("Analysis failed for %s", file_path)
            return []

    def _passes_threshold(self, severity: Severity) -> bool:
        """Return True when ``severity`` ranks at or above the threshold.

        Both the finding's severity and ``config.severity_threshold`` may be
        an enum member or a plain string, in any letter case. An unrecognized
        value on either side yields ``False``.
        """
        threshold = getattr(self.config, "severity_threshold", None)
        rank = self._severity_rank(severity)
        floor = self._severity_rank(threshold)
        if rank is None or floor is None:
            return False
        return rank >= floor

    @classmethod
    def _severity_rank(cls, level) -> Optional[int]:
        """Return the position of ``level`` in the severity order, or None."""
        # Enum members carry their name in ``.value``; strings pass through.
        name = getattr(level, "value", level)
        if not isinstance(name, str):
            return None
        try:
            return cls._SEVERITY_ORDER.index(name.lower())
        except ValueError:
            return None
|
||||
Reference in New Issue
Block a user