diff --git a/packages/core/src/tracing/ai/utils.ts b/packages/core/src/tracing/ai/utils.ts index d9628e3c75e2..e867e2b078f0 100644 --- a/packages/core/src/tracing/ai/utils.ts +++ b/packages/core/src/tracing/ai/utils.ts @@ -6,6 +6,12 @@ import { getClient } from '../../currentScopes'; import type { Span } from '../../types-hoist/span'; import { isThenable } from '../../utils/is'; import { + GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, + GEN_AI_RESPONSE_ID_ATTRIBUTE, + GEN_AI_RESPONSE_MODEL_ATTRIBUTE, + GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, + GEN_AI_RESPONSE_TEXT_ATTRIBUTE, + GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE, GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, @@ -99,6 +105,64 @@ export function setTokenUsageAttributes( } } +export interface StreamSpanState { + responseId?: string; + responseModel?: string; + finishReasons: string[]; + responseTexts: string[]; + toolCalls: unknown[]; + promptTokens?: number; + completionTokens?: number; + totalTokens?: number; + cacheCreationInputTokens?: number; + cacheReadInputTokens?: number; +} + +/** + * Finalizes a streaming span by setting all accumulated response attributes and ending the span. + * Shared across OpenAI, Anthropic, and Google GenAI streaming implementations. + */ +export function finalizeStreamSpan(span: Span, state: StreamSpanState, recordOutputs: boolean): void { + const attrs: Record = { + [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, + }; + + if (state.responseId) attrs[GEN_AI_RESPONSE_ID_ATTRIBUTE] = state.responseId; + if (state.responseModel) attrs[GEN_AI_RESPONSE_MODEL_ATTRIBUTE] = state.responseModel; + + if (state.promptTokens !== undefined) attrs[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE] = state.promptTokens; + if (state.completionTokens !== undefined) attrs[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE] = state.completionTokens; + + // Use explicit total if provided (OpenAI, Google), otherwise compute from cache tokens (Anthropic) + if (state.totalTokens !== undefined) { + attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = state.totalTokens; + } else if ( + state.promptTokens !== undefined || + state.completionTokens !== undefined || + state.cacheCreationInputTokens !== undefined || + state.cacheReadInputTokens !== undefined + ) { + attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = + (state.promptTokens ?? 0) + + (state.completionTokens ?? 0) + + (state.cacheCreationInputTokens ?? 0) + + (state.cacheReadInputTokens ?? 0); + } + + if (state.finishReasons.length) { + attrs[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE] = JSON.stringify(state.finishReasons); + } + if (recordOutputs && state.responseTexts.length) { + attrs[GEN_AI_RESPONSE_TEXT_ATTRIBUTE] = state.responseTexts.join(''); + } + if (state.toolCalls.length) { + attrs[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE] = JSON.stringify(state.toolCalls); + } + + span.setAttributes(attrs); + span.end(); +} + /** * Get the truncated JSON string for a string or array of strings. * diff --git a/packages/core/src/tracing/anthropic-ai/streaming.ts b/packages/core/src/tracing/anthropic-ai/streaming.ts index 940ec53e8030..6ed96be7c5ea 100644 --- a/packages/core/src/tracing/anthropic-ai/streaming.ts +++ b/packages/core/src/tracing/anthropic-ai/streaming.ts @@ -1,15 +1,7 @@ import { captureException } from '../../exports'; import { SPAN_STATUS_ERROR } from '../../tracing'; import type { Span } from '../../types-hoist/span'; -import { - GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, - GEN_AI_RESPONSE_ID_ATTRIBUTE, - GEN_AI_RESPONSE_MODEL_ATTRIBUTE, - GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, - GEN_AI_RESPONSE_TEXT_ATTRIBUTE, - GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE, -} from '../ai/gen-ai-attributes'; -import { setTokenUsageAttributes } from '../ai/utils'; +import { finalizeStreamSpan } from '../ai/utils'; import type { AnthropicAiStreamingEvent } from './types'; import { mapAnthropicErrorToStatusMessage } from './utils'; @@ -208,60 +200,6 @@ function processEvent( handleContentBlockStop(event, state); } -/** - * Finalizes span attributes when stream processing completes - */ -function finalizeStreamSpan(state: StreamingState, span: Span, recordOutputs: boolean): void { - if (!span.isRecording()) { - return; - } - - // Set common response attributes if available - if (state.responseId) { - span.setAttributes({ - [GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId, - }); - } - if (state.responseModel) { - span.setAttributes({ - [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel, - }); - } - - setTokenUsageAttributes( - span, - state.promptTokens, - state.completionTokens, - state.cacheCreationInputTokens, - state.cacheReadInputTokens, - ); - - span.setAttributes({ - [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, - }); - - if (state.finishReasons.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), - }); - } - - if (recordOutputs && state.responseTexts.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), - }); - } - - // Set tool calls if any were captured - if (recordOutputs && state.toolCalls.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls), - }); - } - - span.end(); -} - /** * Instruments an async iterable stream of Anthropic events, updates the span with * streaming attributes and (optionally) the aggregated output text, and yields @@ -291,50 +229,7 @@ export async function* instrumentAsyncIterableStream( yield event; } } finally { - // Set common response attributes if available - if (state.responseId) { - span.setAttributes({ - [GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId, - }); - } - if (state.responseModel) { - span.setAttributes({ - [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel, - }); - } - - setTokenUsageAttributes( - span, - state.promptTokens, - state.completionTokens, - state.cacheCreationInputTokens, - state.cacheReadInputTokens, - ); - - span.setAttributes({ - [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, - }); - - if (state.finishReasons.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), - }); - } - - if (recordOutputs && state.responseTexts.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), - }); - } - - // Set tool calls if any were captured - if (recordOutputs && state.toolCalls.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls), - }); - } - - span.end(); + finalizeStreamSpan(span, state, recordOutputs); } } @@ -366,7 +261,7 @@ export function instrumentMessageStream // The event fired when a message is done being streamed by the API. Corresponds to the message_stop SSE event. // @see https://github.com/anthropics/anthropic-sdk-typescript/blob/d3be31f5a4e6ebb4c0a2f65dbb8f381ae73a9166/helpers.md?plain=1#L42-L44 stream.on('message', () => { - finalizeStreamSpan(state, span, recordOutputs); + finalizeStreamSpan(span, state, recordOutputs); }); stream.on('error', (error: unknown) => { diff --git a/packages/core/src/tracing/google-genai/streaming.ts b/packages/core/src/tracing/google-genai/streaming.ts index d3f6598b8fd7..a97a3358045c 100644 --- a/packages/core/src/tracing/google-genai/streaming.ts +++ b/packages/core/src/tracing/google-genai/streaming.ts @@ -1,17 +1,7 @@ import { captureException } from '../../exports'; import { SPAN_STATUS_ERROR } from '../../tracing'; -import type { Span, SpanAttributeValue } from '../../types-hoist/span'; -import { - GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, - GEN_AI_RESPONSE_ID_ATTRIBUTE, - GEN_AI_RESPONSE_MODEL_ATTRIBUTE, - GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, - GEN_AI_RESPONSE_TEXT_ATTRIBUTE, - GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE, - GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, - GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, - GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, -} from '../ai/gen-ai-attributes'; +import type { Span } from '../../types-hoist/span'; +import { finalizeStreamSpan } from '../ai/utils'; import type { GoogleGenAIResponse } from './types'; /** @@ -137,27 +127,6 @@ export async function* instrumentStream( yield chunk; } } finally { - const attrs: Record = { - [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, - }; - - if (state.responseId) attrs[GEN_AI_RESPONSE_ID_ATTRIBUTE] = state.responseId; - if (state.responseModel) attrs[GEN_AI_RESPONSE_MODEL_ATTRIBUTE] = state.responseModel; - if (state.promptTokens !== undefined) attrs[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE] = state.promptTokens; - if (state.completionTokens !== undefined) attrs[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE] = state.completionTokens; - if (state.totalTokens !== undefined) attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = state.totalTokens; - - if (state.finishReasons.length) { - attrs[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE] = JSON.stringify(state.finishReasons); - } - if (recordOutputs && state.responseTexts.length) { - attrs[GEN_AI_RESPONSE_TEXT_ATTRIBUTE] = state.responseTexts.join(''); - } - if (recordOutputs && state.toolCalls.length) { - attrs[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE] = JSON.stringify(state.toolCalls); - } - - span.setAttributes(attrs); - span.end(); + finalizeStreamSpan(span, state, recordOutputs); } } diff --git a/packages/core/src/tracing/openai/streaming.ts b/packages/core/src/tracing/openai/streaming.ts index 3a5634df34dd..687ce14ec9fe 100644 --- a/packages/core/src/tracing/openai/streaming.ts +++ b/packages/core/src/tracing/openai/streaming.ts @@ -1,12 +1,7 @@ import { captureException } from '../../exports'; import { SPAN_STATUS_ERROR } from '../../tracing'; import type { Span } from '../../types-hoist/span'; -import { - GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, - GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, - GEN_AI_RESPONSE_TEXT_ATTRIBUTE, - GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE, -} from '../ai/gen-ai-attributes'; +import { finalizeStreamSpan } from '../ai/utils'; import { RESPONSE_EVENT_TYPES } from './constants'; import type { ChatCompletionChunk, @@ -15,12 +10,7 @@ import type { ResponseFunctionCall, ResponseStreamingEvent, } from './types'; -import { - isChatCompletionChunk, - isResponsesApiStreamEvent, - setCommonResponseAttributes, - setTokenUsageAttributes, -} from './utils'; +import { isChatCompletionChunk, isResponsesApiStreamEvent } from './utils'; /** * State object used to accumulate information from a stream of OpenAI events/chunks. @@ -245,35 +235,7 @@ export async function* instrumentStream( yield event; } } finally { - setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp); - setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens); - - span.setAttributes({ - [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, - }); - - if (state.finishReasons.length) { - span.setAttributes({ - [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), - }); - } - - if (recordOutputs && state.responseTexts.length) { - span.setAttributes({ - [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), - }); - } - - // Set tool calls attribute if any were accumulated - const chatCompletionToolCallsArray = Object.values(state.chatCompletionToolCalls); - const allToolCalls = [...chatCompletionToolCallsArray, ...state.responsesApiToolCalls]; - - if (allToolCalls.length > 0) { - span.setAttributes({ - [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(allToolCalls), - }); - } - - span.end(); + const allToolCalls = [...Object.values(state.chatCompletionToolCalls), ...state.responsesApiToolCalls]; + finalizeStreamSpan(span, { ...state, toolCalls: allToolCalls }, recordOutputs); } }