145 lines
5.4 KiB
Python
145 lines
5.4 KiB
Python
"""Main scanner orchestrator for AI Code Audit CLI."""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from .config import AuditConfig
|
|
from .models import ScanResult, Issue, IssueCategory, SeverityLevel
|
|
from ..scanners import BanditScanner, RuffScanner, TreeSitterScanner
|
|
from ..utils import FileUtils, LanguageDetector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Scanner:
|
|
"""Main scanner class that orchestrates all scanning components."""
|
|
|
|
def __init__(self, config: AuditConfig):
|
|
"""Initialize the scanner with configuration."""
|
|
self.config = config
|
|
self.file_utils = FileUtils()
|
|
self.language_detector = LanguageDetector()
|
|
self.result = ScanResult(files_scanned=0, target_path=config.target_path)
|
|
self._setup_scanners()
|
|
|
|
def _setup_scanners(self) -> None:
|
|
"""Initialize scanner components."""
|
|
self.bandit_scanner = BanditScanner()
|
|
self.ruff_scanner = RuffScanner()
|
|
self.tree_sitter_scanner = TreeSitterScanner()
|
|
|
|
def scan(self) -> ScanResult:
|
|
"""Execute the full scan operation."""
|
|
self.config.validate()
|
|
|
|
target_path = Path(self.config.target_path)
|
|
self.result.scan_time = datetime.now()
|
|
|
|
if target_path.is_file():
|
|
self._scan_file(target_path)
|
|
else:
|
|
self._scan_directory(target_path)
|
|
|
|
return self.result
|
|
|
|
def _scan_directory(self, directory: Path) -> None:
|
|
"""Scan all files in a directory."""
|
|
if self.config.verbose:
|
|
logger.info(f"Scanning directory: {directory}")
|
|
|
|
for file_path in self.file_utils.find_files(
|
|
directory,
|
|
max_size=self.config.max_file_size,
|
|
excluded_patterns=self.config.excluded_patterns,
|
|
):
|
|
if self.config.language_filter:
|
|
lang = self.language_detector.detect(file_path)
|
|
if lang.value != self.config.language_filter:
|
|
continue
|
|
|
|
self._scan_file(file_path)
|
|
|
|
def _scan_file(self, file_path: Path) -> None:
|
|
"""Scan a single file."""
|
|
if not self.config.should_scan_file(file_path):
|
|
if self.config.verbose:
|
|
logger.info(f"Skipping file (excluded or too large): {file_path}")
|
|
return
|
|
|
|
try:
|
|
file_content = file_path.read_text(encoding="utf-8", errors="replace")
|
|
file_str = str(file_path)
|
|
|
|
self.result.files_scanned += 1
|
|
|
|
language = self.language_detector.detect(file_path)
|
|
file_extension = file_path.suffix.lower()
|
|
|
|
if self.config.verbose:
|
|
logger.info(f"Scanning: {file_path} (language: {language.value})")
|
|
|
|
if language.value == "python":
|
|
self._scan_python_file(file_str, file_content)
|
|
elif language.value in ("javascript", "typescript"):
|
|
self._scan_js_ts_file(file_str, file_content, language.value)
|
|
|
|
except PermissionError:
|
|
self.result.add_warning(f"Permission denied: {file_path}")
|
|
except UnicodeDecodeError:
|
|
self.result.add_warning(f"Could not decode file (encoding issue): {file_path}")
|
|
except Exception as e:
|
|
self.result.add_warning(f"Error scanning {file_path}: {str(e)}")
|
|
if self.config.verbose:
|
|
logger.exception(f"Error scanning file: {file_path}")
|
|
|
|
def _scan_python_file(self, file_path: str, content: str) -> None:
|
|
"""Scan a Python file for issues."""
|
|
bandit_issues = self.bandit_scanner.scan_content(content, file_path)
|
|
ruff_issues = self.ruff_scanner.scan_content(content, file_path, "python")
|
|
tree_sitter_issues = self.tree_sitter_scanner.scan_content(
|
|
content, file_path, "python"
|
|
)
|
|
|
|
for issue in bandit_issues + ruff_issues + tree_sitter_issues:
|
|
if self._should_include_issue(issue):
|
|
self.result.add_issue(issue)
|
|
|
|
def _scan_js_ts_file(self, file_path: str, content: str, language: str) -> None:
|
|
"""Scan a JavaScript or TypeScript file for issues."""
|
|
ruff_issues = self.ruff_scanner.scan_content(content, file_path, language)
|
|
tree_sitter_issues = self.tree_sitter_scanner.scan_content(
|
|
content, file_path, language
|
|
)
|
|
|
|
for issue in ruff_issues + tree_sitter_issues:
|
|
if self._should_include_issue(issue):
|
|
self.result.add_issue(issue)
|
|
|
|
def _should_include_issue(self, issue: Issue) -> bool:
|
|
"""Check if an issue should be included based on filters."""
|
|
if self.config.severity_filter:
|
|
severity_order = {
|
|
SeverityLevel.LOW: 0,
|
|
SeverityLevel.MEDIUM: 1,
|
|
SeverityLevel.HIGH: 2,
|
|
SeverityLevel.CRITICAL: 3,
|
|
}
|
|
if severity_order.get(issue.severity, 0) < severity_order.get(
|
|
self._get_severity_from_string(self.config.severity_filter), -1
|
|
):
|
|
return False
|
|
|
|
return True
|
|
|
|
def _get_severity_from_string(self, severity_str: str) -> Optional[SeverityLevel]:
|
|
"""Convert severity string to enum."""
|
|
mapping = {
|
|
"low": SeverityLevel.LOW,
|
|
"medium": SeverityLevel.MEDIUM,
|
|
"high": SeverityLevel.HIGH,
|
|
"critical": SeverityLevel.CRITICAL,
|
|
}
|
|
return mapping.get(severity_str.lower())
|