Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kernel): Add dmesg-like command #1732

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pwndbg/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ def load_commands() -> None:
import pwndbg.commands.kchecksec
import pwndbg.commands.kcmdline
import pwndbg.commands.kconfig
import pwndbg.commands.kdmesg
import pwndbg.commands.killthreads
import pwndbg.commands.kversion
import pwndbg.commands.leakfind
Expand Down
96 changes: 96 additions & 0 deletions pwndbg/commands/kdmesg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import argparse
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional

import pwndbg
from pwndbg.commands import CommandCategory
from pwndbg.gdblib.kernel import log

parser = argparse.ArgumentParser(description="Outputs the kernel log buffer")
parser.add_argument("-l", "--level", type=str, help="Filter by log levels")
parser.add_argument("-f", "--facility", type=str, help="Filter by facilities")

LEVEL_MAP = {
0: "EMERGENCY",
1: "ALERT",
2: "CRITICAL",
3: "ERROR",
4: "WARNING",
5: "NOTICE",
6: "INFORMATIONAL",
7: "DEBUG",
}

FACILITY_MAP = {
0: "KERNEL",
1: "USER",
2: "MAIL",
3: "DAEMONS",
4: "AUTH",
5: "SYSLOG",
6: "LPR",
7: "NETWORK NEWS",
}


@pwndbg.commands.ArgparsedCommand(
parser, aliases=["dmesg", "klog"], category=CommandCategory.KERNEL
)
@pwndbg.commands.OnlyWhenQemuKernel
@pwndbg.commands.OnlyWithKernelDebugSyms
def kdmesg(level: Optional[str], facility: Optional[str]) -> None:

facilities = _facilities(facility)
log_levels = _log_lvls(level)

logs = log.KernelLog().get_logs()
filtered_logs = filter_logs(logs, facilities, log_levels)
for _log in filtered_logs:
ts = float(_log["timestamp"]) / 1e9
text = _log["text"]
print(f"[{ts:12.6f}] {text}")


def filter_logs(
logs: Generator[Dict, None, None],
_facilities: Optional[List[int]] = None,
_log_levels: Optional[List[int]] = None,
):
return filter(
lambda log: (not _facilities or log["facility"] in _facilities)
and (not _log_levels or log["log_level"] in _log_levels),
logs,
)


def _get_values(usr_input: Optional[str], map_dict: Dict[int, str], error_msg: str) -> List[int]:
"""Convert the user-provided values to numerical based on a given map_dict"""
_values: List[int] = []

if usr_input is None:
return _values

for value in usr_input.split(","):
if value.isdigit() and int(value) in map_dict.keys():
_values.append(int(value))
continue

_value = value.upper()
for key, name in map_dict.items():
if _value in name:
_values.append(key)
break
else:
raise ValueError(error_msg.format(value=value))

return _values


def _log_lvls(levels: Optional[str]) -> List[int]:
return _get_values(levels, LEVEL_MAP, "Unrecognized log level: '{value}'")


def _facilities(facilities: Optional[str]) -> List[int]:
return _get_values(facilities, FACILITY_MAP, "Unrecognized facility: '{value}'")
68 changes: 68 additions & 0 deletions pwndbg/gdblib/kernel/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Some of the code here was inspired from https://github.com/osandov/drgn/

from typing import Dict
from typing import Generator
from typing import Union

import gdb

import pwndbg


class KernelLog:
def __init__(self) -> None:
self._prb = gdb.lookup_symbol("prb")[0].value()

def get_logs(self) -> Generator[Dict[str, Union[int, str]], None, None]:
# TODO/FIXME: currently only working for linux >= 5.10 (commit 896fbe20b4e2)
descriptor_ring = self._prb["desc_ring"]
descriptors = descriptor_ring["descs"]
infos = descriptor_ring["infos"]

tail_id = int(descriptor_ring["tail_id"]["counter"])
head_id = int(descriptor_ring["head_id"]["counter"])

ring_count_mask = 1 << int(descriptor_ring["count_bits"])
ring_data_size_mask = 1 << int(self._prb["text_data_ring"]["size_bits"])

text_data_start = int(self._prb["text_data_ring"]["data"])

for descriptor_id in range(tail_id, head_id + 1):
descriptor_id %= ring_count_mask
descriptor = descriptors[descriptor_id]

state_var = 3 & (
int(descriptor["state_var"]["counter"]) >> (pwndbg.gdblib.arch.ptrsize * 8 - 2)
)
if state_var not in [1, 2]:
# Skip non-committed record
continue

info = infos[descriptor_id]
text_length = int(info["text_len"])

if text_length == 0:
# Skip data-less record
continue

# TODO: handle wrapping data block

text_start = (
text_data_start + int(descriptor["text_blk_lpos"]["begin"]) % ring_data_size_mask
)
# skip over descriptor id
text_start += pwndbg.gdblib.arch.ptrsize

text_data = pwndbg.gdblib.memory.read(text_start, text_length)

text = text_data.decode(encoding="utf8", errors="replace")
timestamp = int(info["ts_nsec"]) # timestamp in nanoseconds
log_level = int(info["level"]) # syslog level
facility = int(info["facility"])

yield {
"timestamp": timestamp,
"text": text,
"log_level": log_level,
"facility": facility,
}
9 changes: 9 additions & 0 deletions tests/qemu-tests/tests/system/test_commands_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,12 @@ def get_slab_object_address():
if len(matches) > 0:
return (matches[0], cache_name)
raise ValueError("Could not find any slab objects")


def test_command_kdmesg():
res = gdb.execute("kdmesg", to_string=True)

if not pwndbg.gdblib.kernel.has_debug_syms():
assert "may only be run when debugging a Linux kernel with debug" in res
else:
assert "[ 0.000000]" in res