"""Unit tests for log parsers.""" import pytest from loglens.parsers.base import LogFormat, ParsedLogEntry from loglens.parsers.json_parser import JSONParser from loglens.parsers.syslog_parser import SyslogParser from loglens.parsers.apache_parser import ApacheParser from loglens.parsers.factory import ParserFactory class TestJSONParser: """Tests for JSON parser.""" def test_parse_valid_json(self): """Test parsing valid JSON log entry.""" parser = JSONParser() line = '{"timestamp": "2024-01-15T10:30:00Z", "level": "INFO", "message": "Test"}' entry = parser.parse(line, 1) assert entry is not None assert entry.raw_line == line assert entry.line_number == 1 assert entry.message == "Test" assert entry.level == "INFO" def test_can_parse_json(self): """Test can_parse detection for JSON.""" parser = JSONParser() assert parser.can_parse('{"key": "value"}') assert parser.can_parse('[1, 2, 3]') assert not parser.can_parse("not json") assert not parser.can_parse("") def test_parse_invalid_json(self): """Test parsing invalid JSON returns error entry.""" parser = JSONParser() line = '{"invalid": json}' entry = parser.parse(line, 1) assert entry is not None assert entry.severity == "error" assert "JSON parse error" in entry.message def test_parse_batch(self): """Test batch parsing.""" parser = JSONParser() lines = [ '{"level": "INFO", "message": "First"}', '{"level": "ERROR", "message": "Second"}', '{"level": "INFO", "message": "Third"}', ] results = parser.parse_batch(lines) assert len(results) == 3 assert results[0].message == "First" assert results[1].message == "Second" assert results[2].message == "Third" class TestSyslogParser: """Tests for Syslog parser.""" def test_parse_rfc3164(self): """Test parsing RFC 3164 syslog format.""" parser = SyslogParser() line = "Jan 15 10:30:00 server-01 app[1234]: Test message" entry = parser.parse(line, 1) assert entry is not None assert entry.host == "server-01" assert entry.logger == "app[1234]" assert entry.message == "Test message" def test_can_parse_syslog(self): """Test can_parse detection for syslog.""" parser = SyslogParser() assert parser.can_parse("Jan 15 10:30:00 server-01 app: message") assert parser.can_parse("<34>1 2024-01-15T10:30:00Z server-01 app - - - message") assert not parser.can_parse("not syslog format") def test_parse_priority(self): """Test parsing priority from syslog.""" parser = SyslogParser() line = "<34>1 2024-01-15T10:30:00Z server-01 app - - - message" entry = parser.parse(line, 1) assert entry is not None assert entry.facility == "auth" class TestApacheParser: """Tests for Apache/Nginx parser.""" def test_parse_combined_log(self): """Test parsing Apache combined log format.""" parser = ApacheParser() line = '192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /api HTTP/1.1" 200 1234 "-" "Mozilla"' entry = parser.parse(line, 1) assert entry is not None assert entry.host == "192.168.1.1" assert entry.level == "info" assert "GET /api" in entry.message def test_can_parse_apache(self): """Test can_parse detection for Apache logs.""" parser = ApacheParser() assert parser.can_parse('192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 200 1234') assert not parser.can_parse("not apache format") def test_error_log(self): """Test parsing Apache error log.""" parser = ApacheParser() line = '[Sat Jan 15 10:30:00.123456 2024] [mpm_prefork:notice] [pid 1234] AH00163: Apache configured' entry = parser.parse(line, 1) assert entry is not None assert entry.level == "notice" def test_status_code_inference(self): """Test severity inference from HTTP status codes.""" parser = ApacheParser() entry_500 = parser.parse('192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 500 123', 1) assert entry_500.level == "error" entry_404 = parser.parse('192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 404 123', 1) assert entry_404.level == "warning" class TestParserFactory: """Tests for ParserFactory.""" def test_detect_json_format(self): """Test format detection for JSON logs.""" factory = ParserFactory() line = '{"key": "value"}' format_detected = factory.detect_format(line) assert format_detected == LogFormat.JSON def test_detect_syslog_format(self): """Test format detection for syslog.""" factory = ParserFactory() line = "Jan 15 10:30:00 server-01 app: message" format_detected = factory.detect_format(line) assert format_detected == LogFormat.SYSLOG def test_detect_apache_format(self): """Test format detection for Apache logs.""" factory = ParserFactory() line = '192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 200 123' format_detected = factory.detect_format(line) assert format_detected == LogFormat.APACHE def test_parse_lines_with_auto_detection(self): """Test parsing lines with auto format detection.""" factory = ParserFactory() lines = [ '{"level": "INFO", "message": "Test"}', '{"level": "ERROR", "message": "Error test"}', ] results = factory.parse_lines(lines) assert len(results) == 2 def test_unknown_format(self): """Test handling of unknown format.""" factory = ParserFactory() lines = ["not a known format"] results = factory.parse_lines(lines) assert len(results) == 1 assert results[0].message == "Unknown format"