Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Jaeger Remote Sampling addition to contrib repo #3852

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,8 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
def shutdown(self):
"""Shut down the span processors added to the tracer provider."""
self._active_span_processor.shutdown()
if self.sampler is not None:
self.sampler.close()
if self._atexit_handler is not None:
atexit.unregister(self._atexit_handler)
self._atexit_handler = None
Expand Down
47 changes: 45 additions & 2 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class CustomSamplerFactory:
import os
from logging import getLogger
from types import MappingProxyType
from typing import Optional, Sequence
from typing import Any, Optional, Sequence

# pylint: disable=unused-import
from opentelemetry.context import Context
Expand All @@ -150,6 +150,13 @@ class CustomSamplerFactory:

_logger = getLogger(__name__)

# TODO(sconover): not sure what to do w/ these constants + associated attrs,
# the don't seem like a "fit" in this codebase. However these are central to the workings
# of the ported tests.
SAMPLER_TYPE_TAG_KEY = 'sampler.type'
SAMPLER_PARAM_TAG_KEY = 'sampler.param'

SAMPLER_TYPE_TRACE_ID_RATIO = 'traceidratio'

class Decision(enum.Enum):
# IsRecording() == false, span will not be recorded and all events and attributes will be dropped.
Expand Down Expand Up @@ -212,6 +219,27 @@ def should_sample(
def get_description(self) -> str:
pass

# TODO(sconover) added close to all samplers, because of cleanup needed
# for RemoteControlledSampler, in the contrib repo
# Q: Where should sampler.close() be called? I believe it might be
# TracerProvider#shutdown (but not entirely sure)
def close(self) -> None:
pass

# TODO(sconover): the adaptive sampler in the contrib repo makes use of sampler equality
def __eq__(self, other: Any) -> bool:
return (
isinstance(other, self.__class__) and self.__dict__ == other.__dict__
)

def __ne__(self, other: Any) -> bool:
return not self.__eq__(other)

# TODO(sconover): the jaeger tests (in the contrib repo) assert against the string value of a sampler,
# in order to detect what sampler is "in effect", e.g. in AdaptiveSampler
def __str__(self) -> str:
return self.get_description()


class StaticSampler(Sampler):
"""Sampler that always returns the same decision."""
Expand All @@ -231,6 +259,7 @@ def should_sample(
) -> "SamplingResult":
if self._decision is Decision.DROP:
attributes = None

return SamplingResult(
self._decision,
attributes,
Expand Down Expand Up @@ -263,6 +292,10 @@ def __init__(self, rate: float):
raise ValueError("Probability must be in range [0.0, 1.0].")
self._rate = rate
self._bound = self.get_bound_for_rate(self._rate)
self._attributes = {
SAMPLER_TYPE_TAG_KEY: SAMPLER_TYPE_TRACE_ID_RATIO,
SAMPLER_PARAM_TAG_KEY: rate
}

# For compatibility with 64 bit trace IDs, the sampler checks the 64
# low-order bits of the trace ID to decide whether to sample a given trace.
Expand Down Expand Up @@ -295,6 +328,16 @@ def should_sample(
decision = Decision.RECORD_AND_SAMPLE
if decision is Decision.DROP:
attributes = None

# TODO(sconover): the jaeger tests (in the contrib repo) really really want this probabilistic sampler
# to indicate key elements of internal state via attributes
# I expect this might be a controversial issue in code review
# (perhaps unacceptable aspect of the "port", especially since its existence service to support
# tests in an entirely different repo (opentelementry-python-contrib)
if attributes == None:
attributes = {}
attributes = {**self._attributes, **attributes}

return SamplingResult(
decision,
attributes,
Expand Down Expand Up @@ -413,7 +456,7 @@ class _ParentBasedAlwaysOn(ParentBased):
def __init__(self, _):
super().__init__(ALWAYS_ON)


# TODO(sconover): Something must be done to allow samplers from the contrib repo to be specified
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most immediate issue is what to do about _KNOWN_SAMPLERS - verification of sampler choice based on this dict excludes the possibility of contrib-repo samplers, I would appreciate advice about what to do about this.

_KNOWN_SAMPLERS = {
"always_on": ALWAYS_ON,
"always_off": ALWAYS_OFF,
Expand Down
18 changes: 16 additions & 2 deletions opentelemetry-sdk/tests/trace/test_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def test_probability_sampler(self):
attributes={"sampled.expect": "true"},
)
self.assertTrue(sampled_result.decision.is_sampled())
self.assertEqual(sampled_result.attributes, {"sampled.expect": "true"})
self.assertEqual(sampled_result.attributes, {"sampler.type": "traceidratio", "sampler.param": 0.5, "sampled.expect": "true"})
self.assertIsNone(sampled_result.trace_state)

not_sampled_result = sampler.should_sample(
Expand All @@ -231,7 +231,7 @@ def test_probability_sampler(self):
attributes={"sampled.expect": "false"},
)
self.assertFalse(not_sampled_result.decision.is_sampled())
self.assertEqual(not_sampled_result.attributes, {})
self.assertEqual(not_sampled_result.attributes, {"sampler.type": "traceidratio", "sampler.param": 0.5})
self.assertIsNone(sampled_result.trace_state)

def test_probability_sampler_zero(self):
Expand Down Expand Up @@ -537,3 +537,17 @@ def implicit_parent_context(span: trace.Span):
context_api.detach(token)

self.exec_parent_based(implicit_parent_context)

def test_sampler_equality(self):
const1 = sampling.StaticSampler(True)
const2 = sampling.StaticSampler(True)
const3 = sampling.StaticSampler(False)
self.assertEqual(const1, const2)
self.assertNotEqual(const1, const3)

prob1 = sampling.TraceIdRatioBased(rate=0.01)
prob2 = sampling.TraceIdRatioBased(rate=0.01)
prob3 = sampling.TraceIdRatioBased(rate=0.02)
self.assertEqual(prob1, prob2)
self.assertNotEqual(prob1, prob3)
self.assertNotEqual(const1, prob1)
6 changes: 5 additions & 1 deletion opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
Decision,
ParentBased,
StaticSampler,
Sampler,
)
from opentelemetry.sdk.util import BoundedDict, ns_to_iso_str
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
Expand Down Expand Up @@ -83,7 +84,8 @@ def test_extends_api(self):
self.assertIsInstance(tracer, trace_api.Tracer)

def test_shutdown(self):
tracer_provider = trace.TracerProvider()
mock_sampler = Mock(spec=Sampler)
tracer_provider = trace.TracerProvider(sampler=mock_sampler)

mock_processor1 = mock.Mock(spec=trace.SpanProcessor)
tracer_provider.add_span_processor(mock_processor1)
Expand All @@ -96,6 +98,8 @@ def test_shutdown(self):
self.assertEqual(mock_processor1.shutdown.call_count, 1)
self.assertEqual(mock_processor2.shutdown.call_count, 1)

self.assertEqual(mock_sampler.close.call_count, 1)

shutdown_python_code = """
import atexit
from unittest import mock
Expand Down