212 lines
6.8 KiB
Python
212 lines
6.8 KiB
Python
"""Unit tests for log analyzer."""
|
|
|
|
import pytest
|
|
from loglens.analyzers.analyzer import LogAnalyzer, AnalysisResult
|
|
from loglens.analyzers.patterns import PatternLibrary, ErrorPattern
|
|
from loglens.analyzers.severity import SeverityClassifier, SeverityLevel
|
|
from loglens.parsers.base import LogFormat
|
|
|
|
|
|
class TestPatternLibrary:
|
|
"""Tests for PatternLibrary."""
|
|
|
|
def test_detect_python_exception(self):
|
|
"""Test detection of Python exceptions."""
|
|
library = PatternLibrary()
|
|
text = "Traceback (most recent call last):"
|
|
|
|
match = library.find_match(text)
|
|
|
|
assert match is not None
|
|
assert match[0].name == "Python Exception"
|
|
|
|
def test_detect_connection_error(self):
|
|
"""Test detection of connection errors."""
|
|
library = PatternLibrary()
|
|
text = "Connection refused: ECONNREFUSED"
|
|
|
|
match = library.find_match(text)
|
|
|
|
assert match is not None
|
|
assert match[0].name == "Connection Refused"
|
|
|
|
def test_detect_http_500(self):
|
|
"""Test detection of HTTP 500 errors."""
|
|
library = PatternLibrary()
|
|
text = "Response Status: 503"
|
|
|
|
match = library.find_match(text)
|
|
|
|
assert match is not None
|
|
assert match[0].name == "HTTP 5xx Error"
|
|
|
|
def test_detect_multiple_patterns(self):
|
|
"""Test detection of multiple patterns in text."""
|
|
library = PatternLibrary()
|
|
text = "Traceback (most recent call last): Connection refused"
|
|
|
|
matches = library.detect(text)
|
|
|
|
assert len(matches) >= 2
|
|
|
|
def test_disable_pattern(self):
|
|
"""Test disabling a pattern."""
|
|
library = PatternLibrary()
|
|
library.disable_pattern("Python Exception")
|
|
|
|
text = "This has no Python exception"
|
|
match = library.find_match(text)
|
|
|
|
assert match is None
|
|
|
|
def test_add_custom_pattern(self):
|
|
"""Test adding custom pattern."""
|
|
library = PatternLibrary()
|
|
custom = ErrorPattern(
|
|
name="Custom Error",
|
|
pattern="UNIQUE_CUSTOM_PATTERN_12345",
|
|
severity="error"
|
|
)
|
|
library.add_pattern(custom)
|
|
|
|
match = library.find_match("Something UNIQUE_CUSTOM_PATTERN_12345 happened")
|
|
|
|
assert match is not None
|
|
assert match[0].name == "Custom Error"
|
|
|
|
def test_list_patterns_by_group(self):
|
|
"""Test listing patterns by group."""
|
|
library = PatternLibrary()
|
|
groups = library.list_groups()
|
|
|
|
assert "exceptions" in groups
|
|
assert "network" in groups
|
|
assert "database" in groups
|
|
|
|
|
|
class TestSeverityClassifier:
|
|
"""Tests for SeverityClassifier."""
|
|
|
|
def test_classify_critical(self):
|
|
"""Test classification of critical severity."""
|
|
classifier = SeverityClassifier()
|
|
|
|
severity = classifier.classify(None, "System panic detected")
|
|
|
|
assert severity == SeverityLevel.CRITICAL
|
|
|
|
def test_classify_error(self):
|
|
"""Test classification of error severity."""
|
|
classifier = SeverityClassifier()
|
|
|
|
severity = classifier.classify("ERROR", "Something failed")
|
|
|
|
assert severity == SeverityLevel.ERROR
|
|
|
|
def test_classify_warning(self):
|
|
"""Test classification of warning severity."""
|
|
classifier = SeverityClassifier()
|
|
|
|
severity = classifier.classify("WARN", "This is deprecated")
|
|
|
|
assert severity == SeverityLevel.WARNING
|
|
|
|
def test_classify_from_log_level(self):
|
|
"""Test classification from log level."""
|
|
classifier = SeverityClassifier()
|
|
|
|
assert classifier.classify("FATAL", "") == SeverityLevel.CRITICAL
|
|
assert classifier.classify("ERR", "") == SeverityLevel.ERROR
|
|
assert classifier.classify("WARN", "") == SeverityLevel.WARNING
|
|
assert classifier.classify("INFO", "") == SeverityLevel.INFO
|
|
assert classifier.classify("DEBUG", "") == SeverityLevel.DEBUG
|
|
|
|
def test_classify_info_default(self):
|
|
"""Test default classification to info."""
|
|
classifier = SeverityClassifier()
|
|
|
|
severity = classifier.classify(None, "Normal operation")
|
|
|
|
assert severity == SeverityLevel.INFO
|
|
|
|
def test_get_severity_order(self):
|
|
"""Test severity ordering."""
|
|
classifier = SeverityClassifier()
|
|
order = classifier.get_severity_order()
|
|
|
|
assert order[0] == SeverityLevel.CRITICAL
|
|
assert order[-1] == SeverityLevel.UNKNOWN
|
|
|
|
|
|
class TestLogAnalyzer:
|
|
"""Tests for LogAnalyzer."""
|
|
|
|
def test_analyze_json_logs(self, analyzer, sample_json_logs):
|
|
"""Test analyzing JSON logs."""
|
|
result = analyzer.analyze(sample_json_logs)
|
|
|
|
assert result.format_detected == LogFormat.JSON
|
|
assert result.parsed_count == 3
|
|
assert result.error_count == 1
|
|
assert result.warning_count == 1
|
|
|
|
def test_analyze_syslog_logs(self, analyzer, sample_syslog_logs):
|
|
"""Test analyzing syslog."""
|
|
result = analyzer.analyze(sample_syslog_logs)
|
|
|
|
assert result.format_detected == LogFormat.SYSLOG
|
|
assert result.parsed_count == 5
|
|
|
|
def test_analyze_apache_logs(self, analyzer, sample_apache_logs):
|
|
"""Test analyzing Apache logs."""
|
|
result = analyzer.analyze(sample_apache_logs)
|
|
|
|
assert result.format_detected == LogFormat.APACHE
|
|
assert result.parsed_count == 4
|
|
assert result.error_count == 1
|
|
assert result.warning_count == 1
|
|
|
|
def test_error_pattern_detection(self, analyzer, error_logs):
|
|
"""Test error pattern detection."""
|
|
result = analyzer.analyze(error_logs)
|
|
|
|
assert result.error_count > 0
|
|
assert len(result.top_errors) > 0
|
|
|
|
def test_severity_breakdown(self, analyzer, error_logs):
|
|
"""Test severity breakdown calculation."""
|
|
result = analyzer.analyze(error_logs)
|
|
|
|
assert "error" in result.severity_breakdown
|
|
assert result.error_count > 0
|
|
|
|
def test_suggestions_generation(self, analyzer, error_logs):
|
|
"""Test suggestion generation."""
|
|
result = analyzer.analyze(error_logs)
|
|
|
|
assert len(result.suggestions) > 0
|
|
|
|
def test_analyze_file(self, analyzer, tmp_path):
|
|
"""Test analyzing a log file."""
|
|
log_file = tmp_path / "test.log"
|
|
log_file.write_text('{"level": "INFO", "message": "Test"}\n')
|
|
|
|
result = analyzer.analyze_file(str(log_file))
|
|
|
|
assert result.parsed_count == 1
|
|
|
|
def test_get_pattern_info(self, analyzer):
|
|
"""Test getting pattern information."""
|
|
info = analyzer.get_pattern_info("Python Exception")
|
|
|
|
assert info is not None
|
|
assert info["name"] == "Python Exception"
|
|
assert info["severity"] == "error"
|
|
|
|
def test_list_patterns_by_group(self, analyzer):
|
|
"""Test listing patterns by group."""
|
|
groups = analyzer.list_patterns_by_group()
|
|
|
|
assert "exceptions" in groups
|
|
assert len(groups["exceptions"]) > 0
|