Add integration tests and fixtures
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled

This commit is contained in:
2026-02-02 08:08:55 +00:00
parent 17658d673d
commit 09841e258f

View File

@@ -0,0 +1,124 @@
"""Integration tests for LogLens."""
import pytest
import tempfile
from pathlib import Path
from loglens.parsers.base import LogFormat
from loglens.analyzers.analyzer import LogAnalyzer
from loglens.formatters.table_formatter import TableFormatter
from loglens.formatters.json_formatter import JSONFormatter
class TestEndToEnd:
"""End-to-end integration tests."""
def test_json_file_analysis(self, tmp_path):
    """Analyze a JSON-lines file end to end and check every summary counter."""
    # One entry per severity we care about: INFO, ERROR, WARNING.
    entries = [
        '{"timestamp": "2024-01-15T10:30:00Z", "level": "INFO", "message": "App started"}',
        '{"timestamp": "2024-01-15T10:30:01Z", "level": "ERROR", "message": "Connection failed"}',
        '{"timestamp": "2024-01-15T10:30:02Z", "level": "WARNING", "message": "Deprecated API"}',
    ]
    target = tmp_path / "app.log"
    target.write_text("\n".join(entries) + "\n")
    report = LogAnalyzer().analyze_file(str(target))
    assert report.format_detected == LogFormat.JSON
    assert report.total_lines == 3
    assert report.parsed_count == 3
    assert report.error_count == 1
    assert report.warning_count == 1
def test_syslog_file_analysis(self, tmp_path):
    """Analyze a syslog-format file end to end: format detection and parse count."""
    entries = [
        "Jan 15 10:30:00 server-01 systemd[1]: Started Application.",
        "Jan 15 10:30:01 server-01 app[1234]: ERROR: Database connection failed",
        "Jan 15 10:30:02 server-01 app[1234]: WARNING: High memory usage",
    ]
    target = tmp_path / "syslog.log"
    target.write_text("\n".join(entries) + "\n")
    report = LogAnalyzer().analyze_file(str(target))
    assert report.format_detected == LogFormat.SYSLOG
    assert report.parsed_count == 3
def test_apache_file_analysis(self, tmp_path):
    """Analyze an Apache access log: 5xx counts as error, 4xx as warning."""
    entries = [
        '192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /api/users HTTP/1.1" 200 1234',
        '192.168.1.2 - - [15/Jan/2024:10:30:01 +0000] "POST /api/login HTTP/1.1" 401 567',
        '192.168.1.3 - - [15/Jan/2024:10:30:02 +0000] "GET /api/orders HTTP/1.1" 500 4321',
    ]
    target = tmp_path / "apache.log"
    target.write_text("\n".join(entries) + "\n")
    report = LogAnalyzer().analyze_file(str(target))
    assert report.format_detected == LogFormat.APACHE
    assert report.error_count == 1
    assert report.warning_count == 1
def test_json_output_format(self, tmp_path):
    """Render an analysis result through JSONFormatter and sanity-check the output."""
    target = tmp_path / "test.log"
    target.write_text('{"level": "INFO", "message": "Test"}\n')
    report = LogAnalyzer().analyze_file(str(target))
    rendered = JSONFormatter().format(report)
    lowered = rendered.lower()
    # Loose check: the rendered text should mention the format or parse results.
    assert "json" in lowered or "parsed" in lowered
def test_error_detection_in_real_logs(self, tmp_path):
    """Count errors and surface top-error entries for a realistic crash-and-recover log."""
    entries = [
        '{"timestamp": "2024-01-15T10:30:00Z", "level": "INFO", "message": "Application started"}',
        '{"timestamp": "2024-01-15T10:30:01Z", "level": "ERROR", "message": "Traceback (most recent call last):"}',
        '{"timestamp": "2024-01-15T10:30:02Z", "level": "ERROR", "message": "KeyError: \'user_id\'"}',
        '{"timestamp": "2024-01-15T10:30:03Z", "level": "INFO", "message": "Recovery successful"}',
    ]
    target = tmp_path / "errors.log"
    target.write_text("\n".join(entries) + "\n")
    report = LogAnalyzer().analyze_file(str(target))
    assert report.error_count == 2
    assert len(report.top_errors) > 0
def test_mixed_format_detection(self, tmp_path):
    """Auto-detection should still parse every entry of a uniform JSON log."""
    entries = [
        '{"level": "INFO", "message": "JSON log entry"}',
        '{"level": "ERROR", "message": "JSON error entry"}',
    ]
    target = tmp_path / "mixed.log"
    target.write_text("\n".join(entries) + "\n")
    report = LogAnalyzer().analyze_file(str(target))
    assert report.parsed_count == 2
def test_large_file_handling(self, tmp_path):
    """Test handling of larger log files."""
    # Build 1000 JSON-lines entries; every 10th line (i = 0, 10, 20, ...)
    # is an ERROR, giving exactly 100 errors among 1000 lines.
    lines = []
    for i in range(1000):
        if i % 10 == 0:
            lines.append(f'{{"level": "ERROR", "message": "Error at line {i}"}}')
        else:
            lines.append(f'{{"level": "INFO", "message": "Line {i}"}}')
    log_file = tmp_path / "large.log"
    # NOTE: no trailing newline here, unlike the smaller fixtures —
    # total_lines is still expected to be 1000.
    log_file.write_text("\n".join(lines))
    analyzer = LogAnalyzer()
    result = analyzer.analyze_file(str(log_file))
    assert result.total_lines == 1000
    assert result.error_count == 100