Add CLI commands and main entry point
Some checks failed
Some checks failed
This commit is contained in:
317
loglens/cli/commands.py
Normal file
317
loglens/cli/commands.py
Normal file
@@ -0,0 +1,317 @@
|
||||
"""Click CLI commands for LogLens."""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import click
|
||||
from colorlog import ColoredFormatter
|
||||
|
||||
from loglens.parsers.base import LogFormat
|
||||
from loglens.analyzers.analyzer import LogAnalyzer
|
||||
from loglens.formatters.table_formatter import TableFormatter
|
||||
from loglens.formatters.json_formatter import JSONFormatter
|
||||
from loglens.formatters.text_formatter import TextFormatter
|
||||
|
||||
|
||||
def setup_logging(verbosity: int = 0) -> None:
|
||||
"""Setup logging configuration."""
|
||||
log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"]
|
||||
level_idx = min(verbosity, len(log_levels) - 1)
|
||||
level = log_levels[level_idx]
|
||||
|
||||
handler = ColoredFormatter(
|
||||
"%(log_color)s%(levelname)-8s%(reset)s %(message)s",
|
||||
log_colors={
|
||||
"DEBUG": "cyan",
|
||||
"INFO": "green",
|
||||
"WARNING": "yellow",
|
||||
"ERROR": "red",
|
||||
"CRITICAL": "red,bg_white",
|
||||
}
|
||||
)
|
||||
|
||||
logger = logging.getLogger("loglens")
|
||||
logger.setLevel(level)
|
||||
logger.handlers = [handler]
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option("--verbosity", "-v", count=True, help="Increase output verbosity")
|
||||
@click.option("--config", type=click.Path(exists=True), help="Path to config file")
|
||||
@click.pass_context
|
||||
def main(ctx: click.Context, verbosity: int, config: str) -> None:
|
||||
"""LogLens - Parse, analyze, and summarize log files."""
|
||||
setup_logging(verbosity)
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["config"] = config
|
||||
|
||||
|
||||
@main.command("analyze")
|
||||
@click.argument("files", type=click.Path(exists=True), nargs=-1)
|
||||
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
|
||||
default="auto", help="Log format (auto-detect by default)")
|
||||
@click.option("--output", type=click.Choice(["table", "json", "text"]), default="table",
|
||||
help="Output format")
|
||||
@click.option("--follow/--no-follow", default=False, help="Follow file changes")
|
||||
@click.option("--max-entries", type=int, default=100, help="Maximum entries to display")
|
||||
@click.option("--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)")
|
||||
@click.pass_context
|
||||
def analyze(
|
||||
ctx: click.Context,
|
||||
files: tuple,
|
||||
format: str,
|
||||
output: str,
|
||||
follow: bool,
|
||||
max_entries: int,
|
||||
json: bool
|
||||
) -> None:
|
||||
"""Analyze log files and display summary."""
|
||||
if json:
|
||||
output = "json"
|
||||
|
||||
if not files and not sys.stdin.isatty():
|
||||
lines = sys.stdin.readlines()
|
||||
_analyze_lines(lines, format, output, max_entries)
|
||||
elif not files:
|
||||
click.echo("Error: No log files specified. Use FILE or pipe data from stdin.")
|
||||
click.echo("Example: cat logfile.txt | loglens analyze")
|
||||
click.echo(" loglens analyze /var/log/syslog")
|
||||
ctx.exit(1)
|
||||
else:
|
||||
for file_path in files:
|
||||
_analyze_file(file_path, format, output, max_entries, follow)
|
||||
|
||||
|
||||
def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None:
|
||||
"""Analyze lines from stdin."""
|
||||
format_enum = None if format_str == "auto" else LogFormat(format_str)
|
||||
analyzer = LogAnalyzer()
|
||||
|
||||
if format_enum is None:
|
||||
result = analyzer.analyze(lines)
|
||||
else:
|
||||
result = analyzer.analyze(lines, format_enum)
|
||||
|
||||
_display_result(result, output, max_entries)
|
||||
|
||||
|
||||
def _analyze_file(file_path: str, format_str: str, output: str, max_entries: int, follow: bool) -> None:
|
||||
"""Analyze a single file."""
|
||||
format_enum = None if format_str == "auto" else LogFormat(format_str)
|
||||
analyzer = LogAnalyzer()
|
||||
|
||||
if follow:
|
||||
_follow_file(file_path, analyzer, format_enum, output, max_entries)
|
||||
else:
|
||||
result = analyzer.analyze_file(file_path, format_enum)
|
||||
_display_result(result, output, max_entries)
|
||||
|
||||
|
||||
def _follow_file(file_path: str, analyzer: LogAnalyzer, format: Optional[LogFormat],
|
||||
output: str, max_entries: int) -> None:
|
||||
"""Follow a file and analyze in real-time."""
|
||||
with open(file_path, "r") as f:
|
||||
f.seek(0, 2)
|
||||
buffer = []
|
||||
|
||||
click.echo(f"Following {file_path}... (Ctrl+C to stop)")
|
||||
|
||||
try:
|
||||
while True:
|
||||
line = f.readline()
|
||||
if line:
|
||||
buffer.append(line)
|
||||
if len(buffer) >= 100:
|
||||
result = analyzer.analyze(buffer, format)
|
||||
_display_result(result, output, max_entries)
|
||||
buffer = []
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
except KeyboardInterrupt:
|
||||
if buffer:
|
||||
result = analyzer.analyze(buffer, format)
|
||||
_display_result(result, output, max_entries)
|
||||
|
||||
|
||||
def _display_result(result, output: str, max_entries: int) -> None:
|
||||
"""Display analysis result."""
|
||||
if output == "json":
|
||||
formatter = JSONFormatter()
|
||||
click.echo(formatter.format(result))
|
||||
elif output == "text":
|
||||
formatter = TextFormatter()
|
||||
click.echo(formatter.format(result))
|
||||
else:
|
||||
formatter = TableFormatter(max_entries=max_entries)
|
||||
formatter.format(result)
|
||||
|
||||
|
||||
@main.command("watch")
|
||||
@click.argument("files", type=click.Path(exists=True), nargs=-1)
|
||||
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
|
||||
default="auto", help="Log format")
|
||||
@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds")
|
||||
@click.option("--max-entries", type=int, default=50, help="Maximum entries per update")
|
||||
@click.pass_context
|
||||
def watch(
|
||||
ctx: click.Context,
|
||||
files: tuple,
|
||||
format: str,
|
||||
interval: float,
|
||||
max_entries: int
|
||||
) -> None:
|
||||
"""Watch log files and display live updates."""
|
||||
if not files:
|
||||
click.echo("Error: No files specified for watching.")
|
||||
ctx.exit(1)
|
||||
|
||||
format_enum = None if format == "auto" else LogFormat(format)
|
||||
analyzer = LogAnalyzer()
|
||||
|
||||
click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.")
|
||||
|
||||
try:
|
||||
while True:
|
||||
for file_path in files:
|
||||
result = analyzer.analyze_file(file_path, format_enum)
|
||||
|
||||
click.clear()
|
||||
click.echo(f"=== {file_path} ===")
|
||||
formatter = TableFormatter(max_entries=max_entries)
|
||||
formatter.format(result.entries)
|
||||
|
||||
time.sleep(interval)
|
||||
except KeyboardInterrupt:
|
||||
click.echo("\nStopped watching.")
|
||||
|
||||
|
||||
@main.command("report")
|
||||
@click.argument("files", type=click.Path(exists=True), nargs=-1)
|
||||
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
|
||||
default="auto", help="Log format")
|
||||
@click.option("--output", type=click.Path(), help="Output file path (default: stdout)")
|
||||
@click.option("--json/--no-json", default=False, help="Output as JSON")
|
||||
@click.pass_context
|
||||
def report(
|
||||
ctx: click.Context,
|
||||
files: tuple,
|
||||
format: str,
|
||||
output: Optional[str],
|
||||
json: bool
|
||||
) -> None:
|
||||
"""Generate detailed analysis report."""
|
||||
if not files:
|
||||
click.echo("Error: No log files specified.")
|
||||
ctx.exit(1)
|
||||
|
||||
format_enum = None if format == "auto" else LogFormat(format)
|
||||
analyzer = LogAnalyzer()
|
||||
|
||||
all_results = []
|
||||
for file_path in files:
|
||||
result = analyzer.analyze_file(file_path, format_enum)
|
||||
all_results.append((file_path, result))
|
||||
|
||||
if json:
|
||||
formatter = JSONFormatter()
|
||||
report_data = {
|
||||
"files_analyzed": len(files),
|
||||
"results": [
|
||||
{"file": path, "analysis": result}
|
||||
for path, result in all_results
|
||||
]
|
||||
}
|
||||
report_text = formatter.format(report_data)
|
||||
else:
|
||||
lines = []
|
||||
lines.append("=" * 60)
|
||||
lines.append("LOG ANALYSIS REPORT")
|
||||
lines.append("=" * 60)
|
||||
lines.append(f"Files Analyzed: {len(files)}")
|
||||
lines.append("")
|
||||
|
||||
for file_path, result in all_results:
|
||||
lines.append(f"=== {file_path} ===")
|
||||
lines.append(f"Total Lines: {result.total_lines}")
|
||||
lines.append(f"Format: {result.format_detected.value}")
|
||||
lines.append(f"Critical: {result.critical_count} | Error: {result.error_count} | "
|
||||
f"Warning: {result.warning_count} | Info: {result.debug_count}")
|
||||
lines.append("")
|
||||
|
||||
if result.suggestions:
|
||||
lines.append("Suggestions:")
|
||||
for i, suggestion in enumerate(result.suggestions, 1):
|
||||
lines.append(f" {i}. {suggestion}")
|
||||
lines.append("")
|
||||
|
||||
report_text = "\n".join(lines)
|
||||
|
||||
if output:
|
||||
with open(output, "w") as f:
|
||||
f.write(report_text)
|
||||
click.echo(f"Report written to {output}")
|
||||
else:
|
||||
click.echo(report_text)
|
||||
|
||||
|
||||
@main.command("patterns")
|
||||
@click.option("--group", help="Filter by pattern group")
|
||||
@click.option("--severity", type=click.Choice(["critical", "error", "warning", "info", "debug"]),
|
||||
help="Filter by severity")
|
||||
@click.pass_context
|
||||
def patterns(ctx: click.Context, group: str, severity: str) -> None:
|
||||
"""List available error detection patterns."""
|
||||
analyzer = LogAnalyzer()
|
||||
patterns_by_group = analyzer.list_patterns_by_group()
|
||||
|
||||
if group:
|
||||
if group in patterns_by_group:
|
||||
patterns_to_show = {group: patterns_by_group[group]}
|
||||
else:
|
||||
click.echo(f"Unknown group: {group}")
|
||||
ctx.exit(1)
|
||||
else:
|
||||
patterns_to_show = patterns_by_group
|
||||
|
||||
formatter = TableFormatter()
|
||||
formatter.console.print("[bold]Available Error Patterns[/]")
|
||||
|
||||
for group_name, patterns in patterns_to_show.items():
|
||||
formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]")
|
||||
for pattern in patterns:
|
||||
severity_color = {
|
||||
"critical": "red",
|
||||
"error": "red",
|
||||
"warning": "yellow",
|
||||
"info": "blue",
|
||||
"debug": "grey"
|
||||
}.get(pattern["severity"], "white")
|
||||
formatter.console.print(
|
||||
f" [bold]{pattern['name']}[/] "
|
||||
f"[{severity_color}]({pattern['severity']})[/]"
|
||||
)
|
||||
if pattern["description"]:
|
||||
formatter.console.print(f" {pattern['description']}")
|
||||
|
||||
|
||||
@main.command("info")
|
||||
@click.pass_context
|
||||
def info(ctx: click.Context) -> None:
|
||||
"""Display LogLens information."""
|
||||
from loglens import __version__
|
||||
|
||||
click.echo(f"LogLens CLI v{__version__}")
|
||||
click.echo("")
|
||||
click.echo("Supported log formats:")
|
||||
click.echo(" - JSON (JSON Lines, arrays)")
|
||||
click.echo(" - Syslog (RFC 3164, RFC 5424)")
|
||||
click.echo(" - Apache/Nginx (Common, Combined, Custom)")
|
||||
click.echo("")
|
||||
click.echo("Commands:")
|
||||
click.echo(" analyze - Analyze log files")
|
||||
click.echo(" watch - Watch files in real-time")
|
||||
click.echo(" report - Generate detailed report")
|
||||
click.echo(" patterns - List error patterns")
|
||||
click.echo(" info - Show this information")
|
||||
Reference in New Issue
Block a user