Source code for agentopera.engine.telemetry.propagation
from dataclasses import dataclass
from typing import Dict, Mapping, Optional
from opentelemetry.context import Context
from opentelemetry.propagate import extract
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
[docs]
@dataclass(kw_only=True)
class EnvelopeMetadata:
    """Metadata for an envelope."""
    traceparent: Optional[str] = None
    tracestate: Optional[str] = None
def _get_carrier_for_envelope_metadata(envelope_metadata: EnvelopeMetadata) -> Dict[str, str]:
    carrier: Dict[str, str] = {}
    if envelope_metadata.traceparent is not None:
        carrier["traceparent"] = envelope_metadata.traceparent
    if envelope_metadata.tracestate is not None:
        carrier["tracestate"] = envelope_metadata.tracestate
    return carrier
[docs]
def get_telemetry_envelope_metadata() -> EnvelopeMetadata:
    """
    Retrieves the telemetry envelope metadata.
    Returns:
        EnvelopeMetadata: The envelope metadata containing the traceparent and tracestate.
    """
    carrier: Dict[str, str] = {}
    TraceContextTextMapPropagator().inject(carrier)
    return EnvelopeMetadata(
        traceparent=carrier.get("traceparent"),
        tracestate=carrier.get("tracestate"),
    )
def _get_carrier_for_remote_call_metadata(remote_call_metadata: Mapping[str, str]) -> Dict[str, str]:
    carrier: Dict[str, str] = {}
    traceparent = remote_call_metadata.get("traceparent")
    tracestate = remote_call_metadata.get("tracestate")
    if traceparent:
        carrier["traceparent"] = traceparent
    if tracestate:
        carrier["tracestate"] = tracestate
    return carrier
[docs]
def get_telemetry_grpc_metadata(existingMetadata: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """
    Retrieves the telemetry gRPC metadata.
    Args:
        existingMetadata (Optional[Mapping[str, str]]): The existing metadata to include in the gRPC metadata.
    Returns:
        Mapping[str, str]: The gRPC metadata containing the traceparent and tracestate.
    """
    carrier: Dict[str, str] = {}
    TraceContextTextMapPropagator().inject(carrier)
    traceparent = carrier.get("traceparent")
    tracestate = carrier.get("tracestate")
    metadata: Dict[str, str] = {}
    if existingMetadata is not None:
        for key, value in existingMetadata.items():
            metadata[key] = value
    if traceparent is not None:
        metadata["traceparent"] = traceparent
    if tracestate is not None:
        metadata["tracestate"] = tracestate
    return metadata
TelemetryMetadataContainer = Optional[EnvelopeMetadata] | Mapping[str, str]
def get_telemetry_context(metadata: TelemetryMetadataContainer) -> Context:
    """
    Retrieves the telemetry context from the given metadata.
    Args:
        metadata (Optional[EnvelopeMetadata]): The metadata containing the telemetry context.
    Returns:
        Context: The telemetry context extracted from the metadata, or an empty context if the metadata is None.
    """
    if metadata is None:
        return Context()
    elif isinstance(metadata, EnvelopeMetadata):
        return extract(_get_carrier_for_envelope_metadata(metadata))
    elif hasattr(metadata, "__getitem__"):
        return extract(_get_carrier_for_remote_call_metadata(metadata))
    else:
        raise ValueError(f"Unknown metadata type: {type(metadata)}")