191 lines
6.0 KiB
Python
191 lines
6.0 KiB
Python
"""Unit tests for log parsers."""
|
|
|
|
from loglens.parsers.apache_parser import ApacheParser
|
|
from loglens.parsers.base import LogFormat
|
|
from loglens.parsers.factory import ParserFactory
|
|
from loglens.parsers.json_parser import JSONParser
|
|
from loglens.parsers.syslog_parser import SyslogParser
|
|
|
|
|
|
class TestJSONParser:
    """Exercise JSONParser: single-line parsing, detection, and batches."""

    def test_parse_valid_json(self):
        """A well-formed JSON object yields an entry carrying its fields."""
        json_parser = JSONParser()
        raw = '{"timestamp": "2024-01-15T10:30:00Z", "level": "INFO", "message": "Test"}'

        parsed = json_parser.parse(raw, 1)

        assert parsed is not None
        # The entry keeps the untouched input and the line number it was given.
        assert parsed.raw_line == raw
        assert parsed.line_number == 1
        assert parsed.message == "Test"
        assert parsed.level == "INFO"

    def test_can_parse_json(self):
        """can_parse accepts JSON objects/arrays and rejects everything else."""
        json_parser = JSONParser()

        for accepted in ('{"key": "value"}', "[1, 2, 3]"):
            assert json_parser.can_parse(accepted)

        for rejected in ("not json", ""):
            assert not json_parser.can_parse(rejected)

    def test_parse_invalid_json(self):
        """Malformed JSON still produces an entry, flagged as an error."""
        json_parser = JSONParser()
        malformed = '{"invalid": json}'

        parsed = json_parser.parse(malformed, 1)

        assert parsed is not None
        assert parsed.severity == "error"
        assert "JSON parse error" in parsed.message

    def test_parse_batch(self):
        """parse_batch returns one entry per input line, in input order."""
        json_parser = JSONParser()
        batch = [
            '{"level": "INFO", "message": "First"}',
            '{"level": "ERROR", "message": "Second"}',
            '{"level": "INFO", "message": "Third"}',
        ]

        parsed_entries = json_parser.parse_batch(batch)

        assert len(parsed_entries) == 3
        assert parsed_entries[0].message == "First"
        assert parsed_entries[1].message == "Second"
        assert parsed_entries[2].message == "Third"
|
|
|
|
|
|
class TestSyslogParser:
    """Exercise SyslogParser on BSD-style and priority-prefixed lines."""

    def test_parse_rfc3164(self):
        """An RFC 3164 line is split into host, logger tag, and message."""
        syslog_parser = SyslogParser()
        raw = "Jan 15 10:30:00 server-01 app[1234]: Test message"

        parsed = syslog_parser.parse(raw, 1)

        assert parsed is not None
        assert parsed.host == "server-01"
        # The logger is expected to keep the "[pid]" suffix of the tag.
        assert parsed.logger == "app[1234]"
        assert parsed.message == "Test message"

    def test_can_parse_syslog(self):
        """can_parse accepts both syslog flavours and rejects free text."""
        syslog_parser = SyslogParser()

        recognised = (
            "Jan 15 10:30:00 server-01 app: message",
            "<34>1 2024-01-15T10:30:00Z server-01 app - - - message",
        )
        for sample in recognised:
            assert syslog_parser.can_parse(sample)

        assert not syslog_parser.can_parse("not syslog format")

    def test_parse_priority(self):
        """A leading "<34>" priority is expected to map to the auth facility."""
        syslog_parser = SyslogParser()
        raw = "<34>1 2024-01-15T10:30:00Z server-01 app - - - message"

        parsed = syslog_parser.parse(raw, 1)

        assert parsed is not None
        assert parsed.facility == "auth"
|
|
|
|
|
|
class TestApacheParser:
    """Tests for the Apache/Nginx parser: access logs, error logs, levels."""

    def test_parse_combined_log(self):
        """Test parsing an Apache combined-format access log line."""
        parser = ApacheParser()
        line = '192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /api HTTP/1.1" 200 1234 "-" "Mozilla"'

        entry = parser.parse(line, 1)

        assert entry is not None
        assert entry.host == "192.168.1.1"
        # A 200 response is expected to be reported at "info" level.
        assert entry.level == "info"
        assert "GET /api" in entry.message

    def test_can_parse_apache(self):
        """Test can_parse detection for Apache access log lines."""
        parser = ApacheParser()

        assert parser.can_parse('192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 200 1234')
        assert not parser.can_parse("not apache format")

    def test_error_log(self):
        """Test that an Apache error log line yields its declared level."""
        parser = ApacheParser()
        line = "[Sat Jan 15 10:30:00.123456 2024] [mpm_prefork:notice] [pid 1234] AH00163: Apache configured"

        entry = parser.parse(line, 1)

        assert entry is not None
        assert entry.level == "notice"

    def test_status_code_inference(self):
        """Test that the entry level is inferred from the HTTP status code."""
        parser = ApacheParser()
        cases = (
            ('192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 500 123', "error"),
            ('192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 404 123', "warning"),
        )

        for line, expected_level in cases:
            entry = parser.parse(line, 1)

            # Guard first, as the other parse tests do, so a failed parse is
            # reported as an assertion failure rather than an AttributeError.
            assert entry is not None
            assert entry.level == expected_level
|
|
|
|
|
|
class TestParserFactory:
    """Exercise ParserFactory format detection and line parsing."""

    def test_detect_json_format(self):
        """A JSON object line is detected as LogFormat.JSON."""
        parser_factory = ParserFactory()
        sample = '{"key": "value"}'

        detected = parser_factory.detect_format(sample)

        assert detected == LogFormat.JSON

    def test_detect_syslog_format(self):
        """A BSD-style syslog line is detected as LogFormat.SYSLOG."""
        parser_factory = ParserFactory()
        sample = "Jan 15 10:30:00 server-01 app: message"

        detected = parser_factory.detect_format(sample)

        assert detected == LogFormat.SYSLOG

    def test_detect_apache_format(self):
        """An Apache access log line is detected as LogFormat.APACHE."""
        parser_factory = ParserFactory()
        sample = '192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /" 200 123'

        detected = parser_factory.detect_format(sample)

        assert detected == LogFormat.APACHE

    def test_parse_lines_with_auto_detection(self):
        """parse_lines without an explicit format returns one entry per line."""
        parser_factory = ParserFactory()
        json_lines = [
            '{"level": "INFO", "message": "Test"}',
            '{"level": "ERROR", "message": "Error test"}',
        ]

        parsed_entries = parser_factory.parse_lines(json_lines)

        assert len(parsed_entries) == 2

    def test_unknown_format(self):
        """An unrecognised line still yields an entry, marked as unknown."""
        parser_factory = ParserFactory()
        unrecognised = ["not a known format"]

        parsed_entries = parser_factory.parse_lines(unrecognised)

        assert len(parsed_entries) == 1
        assert parsed_entries[0].message == "Unknown format"
|