Python: Fix Python OTel usage detail attributes (#6493)

* fix python otel usage detail attributes

Map cached/read/reasoning usage detail fields to standard OTel GenAI attributes while preserving provider-specific legacy keys.

Add focused coverage for direct response spans, aggregated agent spans, and provider usage parsing.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* address usage detail review feedback

Omit missing OpenAI Responses usage detail counts while preserving zero-valued counts.

Record zero-valued token usage in OTel histograms and add regression coverage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-06-15 09:10:14 +02:00
committed by GitHub
Unverified
parent d7027fc1f9
commit d7e8d2206d
9 changed files with 239 additions and 23 deletions
@@ -1024,8 +1024,10 @@ class RawAnthropicClient(
usage_details["input_token_count"] = usage.input_tokens
if usage.cache_creation_input_tokens is not None:
usage_details["anthropic.cache_creation_input_tokens"] = usage.cache_creation_input_tokens # type: ignore[typeddict-unknown-key]
usage_details["cache_creation_input_token_count"] = usage.cache_creation_input_tokens
if usage.cache_read_input_tokens is not None:
usage_details["anthropic.cache_read_input_tokens"] = usage.cache_read_input_tokens # type: ignore[typeddict-unknown-key]
usage_details["cache_read_input_token_count"] = usage.cache_read_input_tokens
return usage_details
def _parse_contents_from_anthropic(
@@ -2354,6 +2354,27 @@ def test_parse_usage_with_cache_tokens(mock_anthropic_client: MagicMock) -> None
assert result["input_token_count"] == 100
assert result["anthropic.cache_creation_input_tokens"] == 20
assert result["anthropic.cache_read_input_tokens"] == 30
assert result["cache_creation_input_token_count"] == 20
assert result["cache_read_input_token_count"] == 30
def test_parse_usage_preserves_zero_cache_tokens(mock_anthropic_client: MagicMock) -> None:
"""Test parsing usage preserves zero-valued mapped cache tokens."""
client = create_test_anthropic_client(mock_anthropic_client)
mock_usage = MagicMock()
mock_usage.input_tokens = 100
mock_usage.output_tokens = 50
mock_usage.cache_creation_input_tokens = 0
mock_usage.cache_read_input_tokens = 0
result = client._parse_usage_from_anthropic(mock_usage)
assert result is not None
assert result["anthropic.cache_creation_input_tokens"] == 0
assert result["cache_creation_input_token_count"] == 0
assert result["anthropic.cache_read_input_tokens"] == 0
assert result["cache_read_input_token_count"] == 0
# Code Execution Result Tests
@@ -400,12 +400,18 @@ class UsageDetails(TypedDict, total=False, extra_items=int): # type: ignore[cal
input_token_count: The number of input tokens used.
output_token_count: The number of output tokens generated.
total_token_count: The total number of tokens (input + output).
cache_creation_input_token_count: The number of input tokens written to a provider-managed cache.
cache_read_input_token_count: The number of input tokens served from a provider-managed cache.
reasoning_output_token_count: The number of output tokens used for reasoning.
"""
input_token_count: int | None
output_token_count: int | None
total_token_count: int | None
cache_creation_input_token_count: int | None
cache_read_input_token_count: int | None
reasoning_output_token_count: int | None
def add_usage_details(usage1: UsageDetails | None, usage2: UsageDetails | None) -> UsageDetails:
@@ -201,6 +201,9 @@ class OtelAttr(str, Enum):
# Usage attributes
INPUT_TOKENS = "gen_ai.usage.input_tokens"
OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
CACHE_CREATION_INPUT_TOKENS = "gen_ai.usage.cache_creation.input_tokens"
CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens"
REASONING_OUTPUT_TOKENS = "gen_ai.usage.reasoning.output_tokens"
# Tool attributes
TOOL_CALL_ID = "gen_ai.tool.call.id"
TOOL_DESCRIPTION = "gen_ai.tool.description"
@@ -327,6 +330,20 @@ FINISH_REASON_MAP = {
"tool_calls": "tool_call",
"length": "length",
}
USAGE_DETAIL_TO_OTEL_ATTR: Final[tuple[tuple[str, OtelAttr], ...]] = (
("input_token_count", OtelAttr.INPUT_TOKENS),
("output_token_count", OtelAttr.OUTPUT_TOKENS),
("cache_creation_input_token_count", OtelAttr.CACHE_CREATION_INPUT_TOKENS),
("cache_read_input_token_count", OtelAttr.CACHE_READ_INPUT_TOKENS),
("reasoning_output_token_count", OtelAttr.REASONING_OUTPUT_TOKENS),
("anthropic.cache_creation_input_tokens", OtelAttr.CACHE_CREATION_INPUT_TOKENS),
("anthropic.cache_read_input_tokens", OtelAttr.CACHE_READ_INPUT_TOKENS),
("openai.cached_input_tokens", OtelAttr.CACHE_READ_INPUT_TOKENS),
("prompt/cached_tokens", OtelAttr.CACHE_READ_INPUT_TOKENS),
("openai.reasoning_tokens", OtelAttr.REASONING_OUTPUT_TOKENS),
("completion/reasoning_tokens", OtelAttr.REASONING_OUTPUT_TOKENS),
("reasoning_tokens", OtelAttr.REASONING_OUTPUT_TOKENS),
)
# region Telemetry utils
@@ -2350,12 +2367,16 @@ def _apply_accumulated_usage(attributes: dict[str, Any], captured_fields: set[st
accumulated = INNER_ACCUMULATED_USAGE.get()
if not accumulated:
return
input_tokens = accumulated.get("input_token_count")
if input_tokens:
attributes[OtelAttr.INPUT_TOKENS] = input_tokens
output_tokens = accumulated.get("output_token_count")
if output_tokens:
attributes[OtelAttr.OUTPUT_TOKENS] = output_tokens
_apply_usage_attributes(attributes, accumulated)
def _apply_usage_attributes(attributes: dict[str, Any], usage: Mapping[str, Any]) -> None:
"""Apply known usage details as standard OTel GenAI attributes."""
for usage_key, otel_attr in USAGE_DETAIL_TO_OTEL_ATTR:
value = usage.get(usage_key)
if value is None or isinstance(value, bool) or not isinstance(value, int):
continue
attributes.setdefault(otel_attr, value)
def _get_response_attributes(
@@ -2378,12 +2399,7 @@ def _get_response_attributes(
if model := getattr(response, "model", None):
attributes[OtelAttr.RESPONSE_MODEL] = model
if capture_usage and (usage := response.usage_details):
input_tokens = usage.get("input_token_count")
if input_tokens:
attributes[OtelAttr.INPUT_TOKENS] = input_tokens
output_tokens = usage.get("output_token_count")
if output_tokens:
attributes[OtelAttr.OUTPUT_TOKENS] = output_tokens
_apply_usage_attributes(attributes, usage)
return attributes
@@ -2407,9 +2423,9 @@ def _capture_response(
"""Set the response for a given span."""
span.set_attributes(attributes)
attrs: dict[str, Any] = {k: v for k, v in attributes.items() if k in GEN_AI_METRIC_ATTRIBUTES}
if token_usage_histogram and (input_tokens := attributes.get(OtelAttr.INPUT_TOKENS)):
if token_usage_histogram and (input_tokens := attributes.get(OtelAttr.INPUT_TOKENS)) is not None:
token_usage_histogram.record(input_tokens, attributes={**attrs, OtelAttr.T_TYPE: OtelAttr.T_TYPE_INPUT})
if token_usage_histogram and (output_tokens := attributes.get(OtelAttr.OUTPUT_TOKENS)):
if token_usage_histogram and (output_tokens := attributes.get(OtelAttr.OUTPUT_TOKENS)) is not None:
token_usage_histogram.record(output_tokens, {**attrs, OtelAttr.T_TYPE: OtelAttr.T_TYPE_OUTPUT})
if operation_duration_histogram and duration is not None:
if OtelAttr.ERROR_TYPE in attributes:
@@ -2154,6 +2154,58 @@ def test_get_response_attributes_with_usage():
assert result[OtelAttr.OUTPUT_TOKENS] == 50
def test_get_response_attributes_with_additional_usage():
"""Test _get_response_attributes maps additional usage details to OTel attributes."""
from unittest.mock import Mock
from agent_framework.observability import OtelAttr, _get_response_attributes
response = Mock()
response.response_id = None
response.finish_reason = None
response.raw_representation = None
response.usage_details = {
"input_token_count": 0,
"output_token_count": 50,
"cache_creation_input_token_count": 10,
"cache_read_input_token_count": 0,
"reasoning_output_token_count": 30,
}
attrs = {}
result = _get_response_attributes(attrs, response)
assert result[OtelAttr.INPUT_TOKENS] == 0
assert result[OtelAttr.OUTPUT_TOKENS] == 50
assert result[OtelAttr.CACHE_CREATION_INPUT_TOKENS] == 10
assert result[OtelAttr.CACHE_READ_INPUT_TOKENS] == 0
assert result[OtelAttr.REASONING_OUTPUT_TOKENS] == 30
def test_get_response_attributes_maps_legacy_usage_keys():
"""Test _get_response_attributes maps legacy provider usage keys to standard OTel attributes."""
from unittest.mock import Mock
from agent_framework.observability import OtelAttr, _get_response_attributes
response = Mock()
response.response_id = None
response.finish_reason = None
response.raw_representation = None
response.usage_details = {
"anthropic.cache_creation_input_tokens": 12,
"openai.cached_input_tokens": 0,
"completion/reasoning_tokens": 34,
}
attrs = {}
result = _get_response_attributes(attrs, response)
assert result[OtelAttr.CACHE_CREATION_INPUT_TOKENS] == 12
assert result[OtelAttr.CACHE_READ_INPUT_TOKENS] == 0
assert result[OtelAttr.REASONING_OUTPUT_TOKENS] == 34
def test_get_response_attributes_capture_usage_false():
"""Test _get_response_attributes skips usage when capture_usage is False."""
from unittest.mock import Mock
@@ -2164,13 +2216,22 @@ def test_get_response_attributes_capture_usage_false():
response.response_id = None
response.finish_reason = None
response.raw_representation = None
response.usage_details = {"input_token_count": 100, "output_token_count": 50}
response.usage_details = {
"input_token_count": 100,
"output_token_count": 50,
"cache_creation_input_token_count": 10,
"cache_read_input_token_count": 20,
"reasoning_output_token_count": 30,
}
attrs = {}
result = _get_response_attributes(attrs, response, capture_usage=False)
assert OtelAttr.INPUT_TOKENS not in result
assert OtelAttr.OUTPUT_TOKENS not in result
assert OtelAttr.CACHE_CREATION_INPUT_TOKENS not in result
assert OtelAttr.CACHE_READ_INPUT_TOKENS not in result
assert OtelAttr.REASONING_OUTPUT_TOKENS not in result
def test_get_response_attributes_capture_response_id_false():
@@ -2933,6 +2994,23 @@ def test_capture_response(span_exporter: InMemorySpanExporter):
assert spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 50
def test_capture_response_records_zero_token_usage():
"""Test _capture_response records zero-valued token usage."""
from agent_framework.observability import OtelAttr, _capture_response
span = Mock()
token_histogram = Mock()
attrs = {
OtelAttr.INPUT_TOKENS: 0,
OtelAttr.OUTPUT_TOKENS: 0,
}
_capture_response(span=span, attributes=attrs, token_usage_histogram=token_histogram)
span.set_attributes.assert_called_once_with(attrs)
assert token_histogram.record.call_count == 2
async def test_layer_ordering_span_sequence_with_function_calling(span_exporter: InMemorySpanExporter):
"""Test that with correct layer ordering, spans appear in the expected sequence.
@@ -3937,11 +4015,21 @@ async def test_agent_invoke_span_aggregates_usage_across_tool_calls(span_exporte
Content.from_function_call(call_id="call_1", name="get_weather", arguments='{"city": "Seattle"}')
],
),
usage_details=UsageDetails(input_token_count=2239, output_token_count=192),
usage_details=UsageDetails(
input_token_count=2239,
output_token_count=192,
cache_read_input_token_count=100,
reasoning_output_token_count=25,
),
),
ChatResponse(
messages=Message(role="assistant", contents=["The weather in Seattle is sunny."]),
usage_details=UsageDetails(input_token_count=2569, output_token_count=99),
usage_details=UsageDetails(
input_token_count=2569,
output_token_count=99,
cache_read_input_token_count=200,
reasoning_output_token_count=0,
),
),
]
@@ -3965,12 +4053,18 @@ async def test_agent_invoke_span_aggregates_usage_across_tool_calls(span_exporte
# Individual chat spans retain their own usage
assert chat_spans[0].attributes.get(OtelAttr.INPUT_TOKENS) == 2239
assert chat_spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 192
assert chat_spans[0].attributes.get(OtelAttr.CACHE_READ_INPUT_TOKENS) == 100
assert chat_spans[0].attributes.get(OtelAttr.REASONING_OUTPUT_TOKENS) == 25
assert chat_spans[1].attributes.get(OtelAttr.INPUT_TOKENS) == 2569
assert chat_spans[1].attributes.get(OtelAttr.OUTPUT_TOKENS) == 99
assert chat_spans[1].attributes.get(OtelAttr.CACHE_READ_INPUT_TOKENS) == 200
assert chat_spans[1].attributes.get(OtelAttr.REASONING_OUTPUT_TOKENS) == 0
# The invoke_agent span must report the aggregate across all LLM round-trips
assert agent_span.attributes.get(OtelAttr.INPUT_TOKENS) == 2239 + 2569
assert agent_span.attributes.get(OtelAttr.OUTPUT_TOKENS) == 192 + 99
assert agent_span.attributes.get(OtelAttr.CACHE_READ_INPUT_TOKENS) == 100 + 200
assert agent_span.attributes.get(OtelAttr.REASONING_OUTPUT_TOKENS) == 25
@pytest.mark.parametrize("enable_sensitive_data", [False], indirect=True)
@@ -2979,10 +2979,16 @@ class RawOpenAIChatClient( # type: ignore[misc]
output_token_count=usage.output_tokens,
total_token_count=usage.total_tokens,
)
if usage.input_tokens_details and usage.input_tokens_details.cached_tokens:
details["openai.cached_input_tokens"] = usage.input_tokens_details.cached_tokens # type: ignore[typeddict-unknown-key]
if usage.output_tokens_details and usage.output_tokens_details.reasoning_tokens:
details["openai.reasoning_tokens"] = usage.output_tokens_details.reasoning_tokens # type: ignore[typeddict-unknown-key]
if usage.input_tokens_details:
cached_tokens = cast("int | None", getattr(usage.input_tokens_details, "cached_tokens", None))
if cached_tokens is not None:
details["openai.cached_input_tokens"] = cached_tokens # type: ignore[typeddict-unknown-key]
details["cache_read_input_token_count"] = cached_tokens
if usage.output_tokens_details:
reasoning_tokens = cast("int | None", getattr(usage.output_tokens_details, "reasoning_tokens", None))
if reasoning_tokens is not None:
details["openai.reasoning_tokens"] = reasoning_tokens # type: ignore[typeddict-unknown-key]
details["reasoning_output_token_count"] = reasoning_tokens
return details
def _get_metadata_from_response(self, output: Any) -> dict[str, Any]:
@@ -765,15 +765,17 @@ class RawOpenAIChatCompletionClient( # type: ignore[misc]
details["completion/accepted_prediction_tokens"] = tokens # type: ignore[typeddict-unknown-key]
if tokens := usage.completion_tokens_details.audio_tokens:
details["completion/audio_tokens"] = tokens # type: ignore[typeddict-unknown-key]
if tokens := usage.completion_tokens_details.reasoning_tokens:
if (tokens := usage.completion_tokens_details.reasoning_tokens) is not None:
details["completion/reasoning_tokens"] = tokens # type: ignore[typeddict-unknown-key]
details["reasoning_output_token_count"] = tokens
if tokens := usage.completion_tokens_details.rejected_prediction_tokens:
details["completion/rejected_prediction_tokens"] = tokens # type: ignore[typeddict-unknown-key]
if usage.prompt_tokens_details:
if tokens := usage.prompt_tokens_details.audio_tokens:
details["prompt/audio_tokens"] = tokens # type: ignore[typeddict-unknown-key]
if tokens := usage.prompt_tokens_details.cached_tokens:
if (tokens := usage.prompt_tokens_details.cached_tokens) is not None:
details["prompt/cached_tokens"] = tokens # type: ignore[typeddict-unknown-key]
details["cache_read_input_token_count"] = tokens
return details
def _parse_text_from_openai(self, choice: Choice | ChunkChoice) -> Content | None:
@@ -3301,6 +3301,7 @@ def test_usage_details_with_cached_tokens() -> None:
assert details is not None
assert details["input_token_count"] == 200
assert details["openai.cached_input_tokens"] == 25
assert details["cache_read_input_token_count"] == 25
def test_usage_details_with_reasoning_tokens() -> None:
@@ -3319,6 +3320,49 @@ def test_usage_details_with_reasoning_tokens() -> None:
assert details is not None
assert details["output_token_count"] == 80
assert details["openai.reasoning_tokens"] == 30
assert details["reasoning_output_token_count"] == 30
def test_usage_details_with_zero_cached_and_reasoning_tokens() -> None:
"""Test _parse_usage_from_openai preserves zero-valued mapped usage details."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
mock_usage = MagicMock()
mock_usage.input_tokens = 150
mock_usage.output_tokens = 80
mock_usage.total_tokens = 230
mock_usage.input_tokens_details = MagicMock()
mock_usage.input_tokens_details.cached_tokens = 0
mock_usage.output_tokens_details = MagicMock()
mock_usage.output_tokens_details.reasoning_tokens = 0
details = client._parse_usage_from_openai(mock_usage) # type: ignore
assert details is not None
assert details["openai.cached_input_tokens"] == 0
assert details["cache_read_input_token_count"] == 0
assert details["openai.reasoning_tokens"] == 0
assert details["reasoning_output_token_count"] == 0
def test_usage_details_omits_missing_cached_and_reasoning_tokens() -> None:
"""Test _parse_usage_from_openai omits missing mapped usage details."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
mock_usage = MagicMock()
mock_usage.input_tokens = 150
mock_usage.output_tokens = 80
mock_usage.total_tokens = 230
mock_usage.input_tokens_details = MagicMock()
mock_usage.input_tokens_details.cached_tokens = None
mock_usage.output_tokens_details = MagicMock()
mock_usage.output_tokens_details.reasoning_tokens = None
details = client._parse_usage_from_openai(mock_usage) # type: ignore
assert details is not None
assert "openai.cached_input_tokens" not in details
assert "cache_read_input_token_count" not in details
assert "openai.reasoning_tokens" not in details
assert "reasoning_output_token_count" not in details
def test_get_metadata_from_response() -> None:
@@ -1099,6 +1099,31 @@ def test_usage_content_in_streaming_response(
assert usage_content.usage_details["total_token_count"] == 150
def test_parse_usage_includes_standard_and_legacy_mapped_token_details() -> None:
"""Test _parse_usage_from_openai emits standard and legacy mapped token details."""
client = OpenAIChatCompletionClient(model="test-model", api_key="test-key")
mock_usage = MagicMock()
mock_usage.prompt_tokens = 100
mock_usage.completion_tokens = 50
mock_usage.total_tokens = 150
mock_usage.completion_tokens_details = MagicMock()
mock_usage.completion_tokens_details.accepted_prediction_tokens = None
mock_usage.completion_tokens_details.audio_tokens = None
mock_usage.completion_tokens_details.reasoning_tokens = 0
mock_usage.completion_tokens_details.rejected_prediction_tokens = None
mock_usage.prompt_tokens_details = MagicMock()
mock_usage.prompt_tokens_details.audio_tokens = None
mock_usage.prompt_tokens_details.cached_tokens = 0
details = client._parse_usage_from_openai(mock_usage) # type: ignore[arg-type]
assert details["completion/reasoning_tokens"] == 0
assert details["reasoning_output_token_count"] == 0
assert details["prompt/cached_tokens"] == 0
assert details["cache_read_input_token_count"] == 0
def test_streaming_chunk_with_usage_and_text(
openai_unit_test_env: dict[str, str],
) -> None: