Files
loglens-cli/loglens/cli/commands.py
7000pctAUTO 54aedeb5b2
Some checks failed
CI / test (3.10) (push) Failing after 4m50s
CI / test (3.11) (push) Failing after 4m52s
CI / test (3.12) (push) Failing after 4m48s
CI / test (3.9) (push) Failing after 4m53s
CI / lint (push) Failing after 4m46s
CI / build (push) Has been skipped
fix: resolve CI/CD linting and formatting issues
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035)
- Removed unused imports across all modules
- Fixed unused variables by replacing with _ prefix
- Added missing Optional type imports
- Reorganized imports for proper sorting (I001)
- Applied black formatting to all source files
2026-02-02 08:52:10 +00:00

331 lines
11 KiB
Python

'''Click CLI commands for LogLens.'''
import logging
import sys
import time
from typing import Optional
import click
from colorlog import ColoredFormatter
from loglens.analyzers.analyzer import LogAnalyzer
from loglens.formatters.json_formatter import JSONFormatter
from loglens.formatters.table_formatter import TableFormatter
from loglens.formatters.text_formatter import TextFormatter
from loglens.parsers.base import LogFormat
def setup_logging(verbosity: int = 0) -> None:
'''Setup logging configuration.'''
log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"]
level_idx = min(verbosity, len(log_levels) - 1)
level = log_levels[level_idx]
handler = ColoredFormatter(
"%(log_color)s%(levelname)-8s%(reset)s %(message)s",
log_colors={
"DEBUG": "cyan",
"INFO": "green",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "red,bg_white",
},
)
logger = logging.getLogger("loglens")
logger.setLevel(level)
logger.handlers = [handler]
@click.group()
@click.option("--verbosity", "-v", count=True, help="Increase output verbosity")
@click.option("--config", type=click.Path(exists=True), help="Path to config file")
@click.pass_context
def main(ctx: click.Context, verbosity: int, config: str) -> None:
'''LogLens - Parse, analyze, and summarize log files.'''
setup_logging(verbosity)
ctx.ensure_object(dict)
ctx.obj["config"] = config
@main.command("analyze")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option(
"--format",
type=click.Choice(["json", "syslog", "apache", "auto"]),
default="auto",
help="Log format (auto-detect by default)",
)
@click.option(
"--output", type=click.Choice(["table", "json", "text"]), default="table", help="Output format"
)
@click.option("--follow/--no-follow", default=False, help="Follow file changes")
@click.option("--max-entries", type=int, default=100, help="Maximum entries to display")
@click.option(
"--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)"
)
@click.pass_context
def analyze(
ctx: click.Context,
files: tuple,
format: str,
output: str,
follow: bool,
max_entries: int,
json: bool,
) -> None:
'''Analyze log files and display summary.'''
if json:
output = "json"
if not files and not sys.stdin.isatty():
lines = sys.stdin.readlines()
_analyze_lines(lines, format, output, max_entries)
elif not files:
click.echo("Error: No log files specified. Use FILE or pipe data from stdin.")
click.echo("Example: cat logfile.txt | loglens analyze")
click.echo(" loglens analyze /var/log/syslog")
ctx.exit(1)
else:
for file_path in files:
_analyze_file(file_path, format, output, max_entries, follow)
def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None:
'''Analyze lines from stdin.'''
format_enum = None if format_str == "auto" else LogFormat(format_str)
analyzer = LogAnalyzer()
if format_enum is None:
result = analyzer.analyze(lines)
else:
result = analyzer.analyze(lines, format_enum)
_display_result(result, output, max_entries)
def _analyze_file(
file_path: str, format_str: str, output: str, max_entries: int, follow: bool
) -> None:
'''Analyze a single file.'''
format_enum = None if format_str == "auto" else LogFormat(format_str)
analyzer = LogAnalyzer()
if follow:
_follow_file(file_path, analyzer, format_enum, output, max_entries)
else:
result = analyzer.analyze_file(file_path, format_enum)
_display_result(result, output, max_entries)
def _follow_file(
file_path: str,
analyzer: LogAnalyzer,
format: Optional[LogFormat],
output: str,
max_entries: int,
) -> None:
'''Follow a file and analyze in real-time.'''
with open(file_path) as f:
f.seek(0, 2)
buffer = []
click.echo(f"Following {file_path}... (Ctrl+C to stop)")
try:
while True:
line = f.readline()
if line:
buffer.append(line)
if len(buffer) >= 100:
result = analyzer.analyze(buffer, format)
_display_result(result, output, max_entries)
buffer = []
else:
time.sleep(0.5)
except KeyboardInterrupt:
if buffer:
result = analyzer.analyze(buffer, format)
_display_result(result, output, max_entries)
def _display_result(result, output: str, max_entries: int) -> None:
'''Display analysis result.'''
if output == "json":
formatter = JSONFormatter()
click.echo(formatter.format(result))
elif output == "text":
formatter = TextFormatter()
click.echo(formatter.format(result))
else:
formatter = TableFormatter(max_entries=max_entries)
formatter.format(result)
@main.command("watch")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option(
"--format",
type=click.Choice(["json", "syslog", "apache", "auto"]),
default="auto",
help="Log format",
)
@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds")
@click.option("--max-entries", type=int, default=50, help="Maximum entries per update")
@click.pass_context
def watch(ctx: click.Context, files: tuple, format: str, interval: float, max_entries: int) -> None:
'''Watch log files and display live updates.'''
if not files:
click.echo("Error: No files specified for watching.")
ctx.exit(1)
format_enum = None if format == "auto" else LogFormat(format)
analyzer = LogAnalyzer()
click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.")
try:
while True:
for file_path in files:
result = analyzer.analyze_file(file_path, format_enum)
click.clear()
click.echo(f"=== {file_path} ===")
formatter = TableFormatter(max_entries=max_entries)
formatter.format(result.entries)
time.sleep(interval)
except KeyboardInterrupt:
click.echo("\nStopped watching.")
@main.command("report")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option(
"--format",
type=click.Choice(["json", "syslog", "apache", "auto"]),
default="auto",
help="Log format",
)
@click.option("--output", type=click.Path(), help="Output file path (default: stdout)")
@click.option("--json/--no-json", default=False, help="Output as JSON")
@click.pass_context
def report(
ctx: click.Context, files: tuple, format: str, output: Optional[str], json: bool
) -> None:
'''Generate detailed analysis report.'''
if not files:
click.echo("Error: No log files specified.")
ctx.exit(1)
format_enum = None if format == "auto" else LogFormat(format)
analyzer = LogAnalyzer()
all_results = []
for file_path in files:
result = analyzer.analyze_file(file_path, format_enum)
all_results.append((file_path, result))
if json:
formatter = JSONFormatter()
report_data = {
"files_analyzed": len(files),
"results": [{"file": path, "analysis": result} for path, result in all_results],
}
report_text = formatter.format(report_data)
else:
lines = []
lines.append("=" * 60)
lines.append("LOG ANALYSIS REPORT")
lines.append("=" * 60)
lines.append(f"Files Analyzed: {len(files)}")
lines.append("")
for file_path, result in all_results:
lines.append(f"=== {file_path} ===")
lines.append(f"Total Lines: {result.total_lines}")
lines.append(f"Format: {result.format_detected.value}")
lines.append(
f"Critical: {result.critical_count} | Error: {result.error_count} | "
f"Warning: {result.warning_count} | Info: {result.debug_count}"
)
lines.append("")
if result.suggestions:
lines.append("Suggestions:")
for i, suggestion in enumerate(result.suggestions, 1):
lines.append(f" {i}. {suggestion}")
lines.append("")
report_text = "\n".join(lines)
if output:
with open(output, "w") as f:
f.write(report_text)
click.echo(f"Report written to {output}")
else:
click.echo(report_text)
@main.command("patterns")
@click.option("--group", help="Filter by pattern group")
@click.option(
"--severity",
type=click.Choice(["critical", "error", "warning", "info", "debug"]),
help="Filter by severity",
)
@click.pass_context
def patterns(ctx: click.Context, group: str, severity: str) -> None:
'''List available error detection patterns.'''
analyzer = LogAnalyzer()
patterns_by_group = analyzer.list_patterns_by_group()
if group:
if group in patterns_by_group:
patterns_to_show = {group: patterns_by_group[group]}
else:
click.echo(f"Unknown group: {group}")
ctx.exit(1)
else:
patterns_to_show = patterns_by_group
formatter = TableFormatter()
formatter.console.print("[bold]Available Error Patterns[/]")
for group_name, patterns in patterns_to_show.items():
formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]")
for pattern in patterns:
severity_color = {
"critical": "red",
"error": "red",
"warning": "yellow",
"info": "blue",
"debug": "grey",
}.get(pattern["severity"], "white")
formatter.console.print(
f" [bold]{pattern['name']}[/] " f"[{severity_color}]({pattern['severity']})[/]"
)
if pattern["description"]:
formatter.console.print(f" {pattern['description']}")
@main.command("info")
@click.pass_context
def info(ctx: click.Context) -> None:
'''Display LogLens information.'''
from loglens import __version__
click.echo(f"LogLens CLI v{__version__}")
click.echo("")
click.echo("Supported log formats:")
click.echo(" - JSON (JSON Lines, arrays)")
click.echo(" - Syslog (RFC 3164, RFC 5424)")
click.echo(" - Apache/Nginx (Common, Combined, Custom)")
click.echo("")
click.echo("Commands:")
click.echo(" analyze - Analyze log files")
click.echo(" watch - Watch files in real-time")
click.echo(" report - Generate detailed report")
click.echo(" patterns - List error patterns")
click.echo(" info - Show this information")