diff --git a/tests/integration/test_attach.py b/tests/integration/test_attach.py index 8fc1c78932..f321087a53 100644 --- a/tests/integration/test_attach.py +++ b/tests/integration/test_attach.py @@ -52,22 +52,6 @@ def baz(): """ -def compare_allocations(allocations1, allocations2): - assert len(allocations1) == len(allocations2) - for i in range(0, len(allocations1)): - assert allocations1[i].allocator == allocations2[i].allocator - assert allocations1[i].n_allocations == allocations2[i].n_allocations - assert allocations1[i].size == allocations2[i].size - assert allocations1[i].stack_id == allocations2[i].stack_id - assert allocations1[i].tid == allocations2[i].tid - assert allocations1[i].native_stack_id == allocations2[i].native_stack_id - assert ( - allocations1[i].native_segment_generation - == allocations2[i].native_segment_generation - ) - assert allocations1[i].thread_name == allocations2[i].thread_name - - def generate_command(method, output, aggregate): cmd = [ sys.executable, @@ -124,12 +108,11 @@ def run_process(cmd): assert tracked_process.returncode == 0 -def get_functions(allocations): - (valloc,) = allocations - return [f[0] for f in valloc.stack_trace()] +def get_call_stack(allocation): + return [f[0] for f in allocation.stack_trace()] -def get_relevant_allocations(records): +def get_relevant_vallocs(records): return [ record for record in filter_relevant_allocations(records) @@ -138,50 +121,42 @@ def get_relevant_allocations(records): @pytest.mark.parametrize("method", ["lldb", "gdb"]) -@pytest.mark.parametrize("aggregate", [True, False]) -def test_basic_attach(tmp_path, method, aggregate): +def test_basic_attach(tmp_path, method): if not debugger_available(method): pytest.skip(f"a supported {method} debugger isn't installed") # GIVEN output = tmp_path / "test.bin" + attach_cmd = generate_command(method, output, aggregate=False) - attach_cmd = generate_command(method, output, aggregate) - + # WHEN run_process(attach_cmd) + # THEN reader = FileReader(output) + (valloc,) = get_relevant_vallocs(reader.get_allocation_records()) + assert get_call_stack(valloc) == ["valloc", "baz", "bar", "foo", ""] + + +@pytest.mark.parametrize("method", ["lldb", "gdb"]) +def test_aggregated_attach(tmp_path, method): + if not debugger_available(method): + pytest.skip(f"a supported {method} debugger isn't installed") + + # GIVEN + output = tmp_path / "test.bin" + attach_cmd = generate_command(method, output, aggregate=True) # WHEN - try: - hwa_allocation_records = list(reader.get_high_watermark_allocation_records()) - assert hwa_allocation_records is not None - allocation_records = list(reader.get_allocation_records()) - except NotImplementedError as exc: - if aggregate: - assert ( - "Can't get all allocations from a pre-aggregated capture file." - in str(exc) - ) - - hwa_relevant_allocations_records = get_relevant_allocations(hwa_allocation_records) - relevant_allocations_records = ( - get_relevant_allocations(allocation_records) if not aggregate else [] - ) + run_process(attach_cmd) - if not aggregate: - assert get_functions(hwa_relevant_allocations_records) == get_functions( - relevant_allocations_records - ) - else: - output_no_aggregate = tmp_path / "test.bin" - attach_cmd = generate_command(method, output_no_aggregate, False) - run_process(attach_cmd) - - reader = FileReader(output_no_aggregate) - allocation_records = list(reader.get_high_watermark_allocation_records()) - relevant_allocations_records = get_relevant_allocations(allocation_records) - - compare_allocations( - relevant_allocations_records, hwa_relevant_allocations_records - ) + # THEN + reader = FileReader(output) + with pytest.raises( + NotImplementedError, + match="Can't get all allocations from a pre-aggregated capture file.", + ): + list(reader.get_allocation_records()) + + (valloc,) = get_relevant_vallocs(reader.get_high_watermark_allocation_records()) + assert get_call_stack(valloc) == ["valloc", "baz", "bar", "foo", ""]