"""Post-process parsed code chunks: flag boilerplate, score and sort chunks
by priority, and split chunks that exceed the configured maximum size."""

import re
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import List, Optional


@dataclass
class ChunkMetadata:
    """Source-level metadata describing where a chunk came from and its shape."""

    file_path: Path
    file_name: str
    language: str
    start_line: int
    end_line: int
    line_count: int
    docstring: Optional[str] = None
    imports: List[str] = field(default_factory=list)
    decorators: List[str] = field(default_factory=list)
    parameters: List[str] = field(default_factory=list)
    return_type: Optional[str] = None
    complexity_score: int = 1
    original_content: str = ""


@dataclass
class ParsedChunk:
    """A parsed unit of code (function, class, etc.) plus ranking metadata."""

    name: str
    chunk_type: str
    content: str
    metadata: ChunkMetadata
    priority: int = 0
    dependencies: List[str] = field(default_factory=list)
    summary: str = ""
    is_boilerplate: bool = False


class CodeChunker:
    """Flags boilerplate chunks, assigns priority scores, and splits chunks
    that exceed the configured maximum size."""

    def __init__(self, config):
        # `config` is expected to expose at least `max_chunk_size` (in lines).
        self.config = config
        # Regexes matching methods that are usually boilerplate: decorated
        # method stubs (@property, @abstractmethod, etc.) and common dunders.
        self.boilerplate_patterns = [
            (r'@property\s*\n\s*def\s+(\w+)\s*\([^)]*\)\s*:', 'property'),
            (r'@abstractmethod\s*\n\s*def\s+(\w+)\s*\([^)]*\)\s*:', 'abstractmethod'),
            (r'@staticmethod\s*\n\s*def\s+(\w+)\s*\([^)]*\)\s*:', 'staticmethod'),
            (r'@classmethod\s*\n\s*def\s+(\w+)\s*\([^)]*\)\s*:', 'classmethod'),
            (r'def\s+__init__\s*\([^)]*\)\s*:', '__init__'),
            (r'def\s+__str__\s*\([^)]*\)\s*:', '__str__'),
            (r'def\s+__repr__\s*\([^)]*\)\s*:', '__repr__'),
            (r'def\s+__eq__\s*\([^)]*\)\s*:', '__eq__'),
            (r'def\s+__hash__\s*\([^)]*\)\s*:', '__hash__'),
            (r'def\s+__lt__\s*\([^)]*\)\s*:', '__lt__'),
            (r'def\s+__le__\s*\([^)]*\)\s*:', '__le__'),
            (r'def\s+__gt__\s*\([^)]*\)\s*:', '__gt__'),
            (r'def\s+__ge__\s*\([^)]*\)\s*:', '__ge__'),
        ]

    def chunk_all(self, chunks: List[ParsedChunk]) -> List[ParsedChunk]:
        """Process all chunks: mark boilerplate, assign priorities, sort."""
        result = []
        for chunk in chunks:
            chunk = self._remove_boilerplate(chunk)
            chunk = self._calculate_priority(chunk)
            result.append(chunk)
        return self._sort_by_priority(result)

    def _remove_boilerplate(self, chunk: ParsedChunk) -> ParsedChunk:
        """Identify and mark boilerplate chunks."""
        if chunk.chunk_type != "function":
            return chunk

        content = chunk.content
        for pattern, pattern_type in self.boilerplate_patterns:
            if re.search(pattern, content, re.MULTILINE):
                chunk.is_boilerplate = True
                break

        # A long docstring (> 200 chars) or a wide signature (> 10 parameters)
        # could be used to override the boilerplate flag; no override is
        # currently applied.
        return chunk

    def _calculate_priority(self, chunk: ParsedChunk) -> ParsedChunk:
        """Calculate priority score for a chunk."""
        priority = 0

        name_lower = chunk.name.lower()
        high_priority_keywords = ['main', 'run', 'execute', 'process', 'handle', 'start']
        medium_priority_keywords = ['core', 'service', 'controller', 'manager', 'factory']

        for keyword in high_priority_keywords:
            if keyword in name_lower:
                priority += 50
                break

        for keyword in medium_priority_keywords:
            if keyword in name_lower:
                priority += 25
                break

        if chunk.chunk_type == "class":
            priority += 20
        elif chunk.chunk_type == "function":
            priority += 10

        priority += min(chunk.metadata.line_count // 10, 30)

        if chunk.metadata.complexity_score > 5:
            priority += chunk.metadata.complexity_score * 5

        if chunk.metadata.decorators:
            priority += len(chunk.metadata.decorators) * 5

        if not chunk.is_boilerplate:
            priority += 10

        chunk.priority = priority
        return chunk

    def _sort_by_priority(self, chunks: List[ParsedChunk]) -> List[ParsedChunk]:
        """Sort chunks by priority (highest first)."""
        return sorted(chunks, key=lambda c: c.priority, reverse=True)

    def split_large_chunk(self, chunk: ParsedChunk) -> List[ParsedChunk]:
        """Split a large chunk into smaller pieces."""
        if chunk.metadata.line_count <= self.config.max_chunk_size:
            return [chunk]

        lines = chunk.content.split('\n')
        parts = []
        current_part = []
        current_lines = 0

        for i, line in enumerate(lines):
            current_part.append(line)
            current_lines += 1

            if current_lines >= self.config.max_chunk_size:
                part_content = '\n'.join(current_part)
                # Copy the metadata so each part gets its own line range;
                # mutating the shared chunk.metadata object would corrupt
                # the original chunk and every previously emitted part.
                part_metadata = replace(
                    chunk.metadata,
                    start_line=chunk.metadata.start_line + i - current_lines + 1,
                    end_line=chunk.metadata.start_line + i,
                    line_count=current_lines,
                )

                parts.append(ParsedChunk(
                    name=f"{chunk.name}_part_{len(parts) + 1}",
                    chunk_type=chunk.chunk_type,
                    content=part_content,
                    metadata=part_metadata,
                    priority=chunk.priority // 2
                ))
                current_part = []
                current_lines = 0

        if current_part:
            part_content = '\n'.join(current_part)
            part_metadata = replace(
                chunk.metadata,
                start_line=chunk.metadata.start_line + len(lines) - current_lines,
                end_line=chunk.metadata.start_line + len(lines) - 1,
                line_count=current_lines,
            )

            parts.append(ParsedChunk(
                name=f"{chunk.name}_part_{len(parts) + 1}",
                chunk_type=chunk.chunk_type,
                content=part_content,
                metadata=part_metadata,
                priority=chunk.priority // 2
            ))

        return parts


class ChunkPriority:
    CRITICAL = 100
    HIGH = 75
    MEDIUM = 50
    LOW = 25
    MINIMAL = 10
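

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module). The config
# object here is a hypothetical stand-in: this module only requires that it
# expose a `max_chunk_size` attribute, so a SimpleNamespace suffices.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    config = SimpleNamespace(max_chunk_size=120)  # assumed config shape
    chunker = CodeChunker(config)

    metadata = ChunkMetadata(
        file_path=Path("example.py"),
        file_name="example.py",
        language="python",
        start_line=1,
        end_line=3,
        line_count=3,
    )
    chunk = ParsedChunk(
        name="run_pipeline",
        chunk_type="function",
        content="def run_pipeline():\n    pass\n",
        metadata=metadata,
    )

    # 'run' in the name (+50), function type (+10), non-boilerplate (+10) -> 70.
    for processed in chunker.chunk_all([chunk]):
        print(processed.name, processed.priority, processed.is_boilerplate)

    # A 300-line chunk with max_chunk_size=120 splits into parts covering
    # lines 1-120, 121-240, and 241-300; each part carries its own metadata.
    big = ParsedChunk(
        name="big_function",
        chunk_type="function",
        content="\n".join(f"x{i} = {i}" for i in range(300)),
        metadata=replace(metadata, end_line=300, line_count=300),
    )
    for part in chunker.split_large_chunk(big):
        print(part.name, part.metadata.start_line, part.metadata.end_line)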