# Source: starry-sdk-observability-demo/starry-sdk/starry_client_sdk/telemetry.py
# Snapshot: 2026-05-06 14:06:44 +08:00 — 519 lines, 17 KiB, Python
from __future__ import annotations
import atexit
import logging
import os
import threading
import time
from dataclasses import dataclass
from typing import Any, Optional
import requests
from prometheus_client import CollectorRegistry, Counter, Histogram
from .version import __version__
SDK_NAME = "starry-client-sdk"
DEFAULT_SERVICE_NAME = "starry-python-sdk-consumer"
# Defaults target host.docker.internal (the demo collector endpoints); override
# with the OTEL_EXPORTER_OTLP_* / STARRYSDK_* environment variables read below.
DEFAULT_OTLP_ENDPOINT = "http://host.docker.internal:4318"
DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT = "http://host.docker.internal:8080/api/prom/push"
# Fixed label schema applied to every SDK metric; absent values become "".
_PROMETHEUS_LABEL_NAMES = (
    "service_name",
    "sdk_name",
    "sdk_version",
    "sdk_interface",
    "http_method",
    "url_path",
    "outcome",
    "http_status_code",
    "error_type",
)
# Latency histogram buckets, in seconds.
_DURATION_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)
# Module-level singletons: populated under _LOCK by configure_telemetry(),
# torn down by shutdown_telemetry().
_LOCK = threading.Lock()
_HANDLES: Optional["TelemetryHandles"] = None
_LOGGER_PROVIDER: Any = None
_TRACER_PROVIDER: Any = None
_METRIC_PUSHER: Optional["_PrometheusRemoteWritePusher"] = None
_WRITE_REQUEST_CLASS: Any = None  # lazily-built protobuf class, cached by _write_request_class()
class _NoopCounter:
def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
return None
class _NoopHistogram:
def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
return None
class _NoopSpan:
def __enter__(self) -> "_NoopSpan":
return self
def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None:
return None
def set_attribute(self, key: str, value: Any) -> None:
return None
def record_exception(self, exception: Exception) -> None:
return None
def set_status(self, status: Any) -> None:
return None
class _NoopTracer:
    """Tracer stand-in that hands out inert spans."""

    def start_as_current_span(self, name: str, **kwargs: Any) -> _NoopSpan:
        # All keyword arguments are accepted and ignored, mirroring the OTel API.
        return _NoopSpan()
class _PrometheusCounter:
    """Adapter giving a prometheus_client Counter an OTel-style ``add`` API.

    Static service/SDK labels are merged with per-call attributes on every
    increment.
    """

    def __init__(self, metric: Counter, static_labels: dict[str, str]) -> None:
        self._metric = metric
        self._static_labels = static_labels

    def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
        """Increment the counter by *amount* under the merged label set."""
        label_values = _prometheus_labels(self._static_labels, attributes)
        self._metric.labels(**label_values).inc(amount)
class _PrometheusHistogram:
    """Adapter giving a prometheus_client Histogram an OTel-style ``record`` API.

    Static service/SDK labels are merged with per-call attributes on every
    observation.
    """

    def __init__(self, metric: Histogram, static_labels: dict[str, str]) -> None:
        self._metric = metric
        self._static_labels = static_labels

    def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
        """Observe *amount* under the merged label set."""
        label_values = _prometheus_labels(self._static_labels, attributes)
        self._metric.labels(**label_values).observe(amount)
@dataclass(frozen=True)
class TelemetryHandles:
    """Bundle of telemetry instruments handed to SDK call sites.

    Every field holds either a real instrument or its no-op stand-in, so
    callers may use the handles unconditionally without checking ``enabled``.
    """

    request_counter: Any  # counts every SDK client call (real or _NoopCounter)
    error_counter: Any  # counts calls ending in an exception
    duration_histogram: Any  # records call latency in seconds
    tracer: Any  # OTel tracer, or _NoopTracer on setup failure
    logger: logging.Logger  # the "starry_client_sdk" logger
    enabled: bool  # True when at least one of metrics/traces was configured
def _truthy(value: str) -> bool:
return value.strip().lower() not in {"0", "false", "f", "no", "n", "off", "disabled"}
def telemetry_enabled() -> bool:
    """Return True unless STARRYSDK_TELEMETRY_ENABLED is set to an "off" word."""
    flag = os.getenv("STARRYSDK_TELEMETRY_ENABLED", "true")
    return _truthy(flag)
def _base_endpoint() -> str:
    """Base OTLP/HTTP endpoint with any trailing slashes removed."""
    configured = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", DEFAULT_OTLP_ENDPOINT)
    return configured.rstrip("/")
def _logs_endpoint() -> str:
    """OTLP logs endpoint; defaults to ``<base>/v1/logs``."""
    default = f"{_base_endpoint()}/v1/logs"
    return os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", default)
def _traces_endpoint() -> str:
    """OTLP traces endpoint; defaults to ``<base>/v1/traces``."""
    default = f"{_base_endpoint()}/v1/traces"
    return os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", default)
def _remote_write_endpoint() -> str:
    """Prometheus remote_write push endpoint (env-overridable)."""
    return os.getenv(
        "STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT",
        DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT,
    )
def _int_env(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except ValueError:
return default
def _float_env(name: str, default: float) -> float:
try:
return float(os.getenv(name, str(default)))
except ValueError:
return default
def _prometheus_labels(static_labels: dict[str, str], attributes: Optional[dict[str, Any]]) -> dict[str, str]:
    """Merge static and per-call labels into the full fixed label set.

    Every name in _PROMETHEUS_LABEL_NAMES is always present; missing or None
    values are filled with "" and everything else is stringified.
    """
    merged = {**static_labels, **(attributes or {})}
    result: dict[str, str] = {}
    for label_name in _PROMETHEUS_LABEL_NAMES:
        value = merged.get(label_name)
        result[label_name] = "" if value is None else str(value)
    return result
def _write_request_class() -> Any:
    """Build (once) and return a protobuf message class for ``prometheus.WriteRequest``.

    The remote_write schema (Label, Sample, TimeSeries, WriteRequest) is
    declared programmatically so the SDK needs neither a generated ``_pb2``
    module nor a ``.proto`` file on disk. The resulting class is cached in
    ``_WRITE_REQUEST_CLASS``.

    NOTE(review): the cache is not guarded by ``_LOCK``; two racing first
    calls would each build an equivalent class, which is harmless.
    """
    global _WRITE_REQUEST_CLASS
    if _WRITE_REQUEST_CLASS is not None:
        return _WRITE_REQUEST_CLASS
    from google.protobuf import descriptor_pb2, descriptor_pool, message_factory

    def _add_field(
        message: Any,
        name: str,
        number: int,
        field_type: int,
        *,
        repeated: bool = False,
        type_name: str = "",
    ) -> None:
        # Declare one field on *message*; collapses the repetitive descriptor setup.
        field = message.field.add()
        field.name = name
        field.number = number
        field.label = (
            descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED
            if repeated
            else descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
        )
        field.type = field_type
        if type_name:
            field.type_name = type_name

    file_proto = descriptor_pb2.FileDescriptorProto()
    file_proto.name = "prometheus_remote_write.proto"
    file_proto.package = "prometheus"
    file_proto.syntax = "proto3"

    label_msg = file_proto.message_type.add()
    label_msg.name = "Label"
    _add_field(label_msg, "name", 1, descriptor_pb2.FieldDescriptorProto.TYPE_STRING)
    _add_field(label_msg, "value", 2, descriptor_pb2.FieldDescriptorProto.TYPE_STRING)

    sample_msg = file_proto.message_type.add()
    sample_msg.name = "Sample"
    _add_field(sample_msg, "value", 1, descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE)
    _add_field(sample_msg, "timestamp", 2, descriptor_pb2.FieldDescriptorProto.TYPE_INT64)

    time_series_msg = file_proto.message_type.add()
    time_series_msg.name = "TimeSeries"
    _add_field(
        time_series_msg, "labels", 1,
        descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
        repeated=True, type_name=".prometheus.Label",
    )
    _add_field(
        time_series_msg, "samples", 2,
        descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
        repeated=True, type_name=".prometheus.Sample",
    )

    write_request_msg = file_proto.message_type.add()
    write_request_msg.name = "WriteRequest"
    _add_field(
        write_request_msg, "timeseries", 1,
        descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
        repeated=True, type_name=".prometheus.TimeSeries",
    )

    pool = descriptor_pool.DescriptorPool()
    pool.Add(file_proto)
    _WRITE_REQUEST_CLASS = message_factory.GetMessageClass(
        pool.FindMessageTypeByName("prometheus.WriteRequest")
    )
    return _WRITE_REQUEST_CLASS
class _PrometheusRemoteWritePusher:
    """Background pusher that periodically snapshots a CollectorRegistry and
    POSTs the samples to a Prometheus remote_write endpoint.

    Runs in a daemon thread so it never keeps the process alive; every push
    failure is swallowed and logged at DEBUG level so telemetry can never
    break business calls.
    """

    def __init__(
        self,
        *,
        registry: CollectorRegistry,
        endpoint: str,
        interval_seconds: float,
        timeout_seconds: float,
    ) -> None:
        self._registry = registry
        self._endpoint = endpoint
        self._interval_seconds = interval_seconds
        self._timeout_seconds = timeout_seconds
        self._stop_event = threading.Event()
        # Daemon thread: starts pushing immediately on construction.
        self._thread = threading.Thread(target=self._run, name="starry-sdk-prometheus-remote-write", daemon=True)
        self._thread.start()

    def force_flush(self) -> None:
        """Push the current metric snapshot synchronously."""
        self._flush()

    def shutdown(self) -> None:
        """Stop the background loop (waiting up to 1s) and push one final snapshot."""
        self._stop_event.set()
        self._thread.join(timeout=1)
        self._flush()

    def _run(self) -> None:
        # Event.wait doubles as the interval sleep; it returns True (ending the
        # loop) as soon as shutdown() sets the stop event.
        while not self._stop_event.wait(self._interval_seconds):
            self._flush()

    def _flush(self) -> None:
        # Snapshot all current samples, dropping the "*_created" series that
        # prometheus_client emits alongside counters/histograms.
        samples = [
            sample
            for metric in self._registry.collect()
            for sample in metric.samples
            if not sample.name.endswith("_created")
        ]
        if not samples:
            return
        try:
            import snappy
            write_request_cls = _write_request_class()
            request = write_request_cls()
            # One shared millisecond timestamp is stamped on every sample in this push.
            timestamp_ms = int(time.time() * 1000)
            for prometheus_sample in samples:
                time_series = request.timeseries.add()
                # The metric name travels as the reserved "__name__" label.
                metric_label = time_series.labels.add()
                metric_label.name = "__name__"
                metric_label.value = prometheus_sample.name
                # Labels are emitted sorted by name, as the remote_write format expects.
                for label_name, label_value in sorted(prometheus_sample.labels.items()):
                    label = time_series.labels.add()
                    label.name = label_name
                    label.value = label_value
                sample = time_series.samples.add()
                sample.value = float(prometheus_sample.value)
                sample.timestamp = timestamp_ms
            # remote_write payloads are snappy-compressed protobuf.
            payload = snappy.compress(request.SerializeToString())
            requests.post(
                self._endpoint,
                data=payload,
                headers={
                    "Content-Encoding": "snappy",
                    "Content-Type": "application/x-protobuf",
                    "X-Prometheus-Remote-Write-Version": "0.1.0",
                },
                timeout=self._timeout_seconds,
            ).raise_for_status()
        except Exception:
            # Best-effort by design: a failed push must never surface to callers.
            logging.getLogger("starry_client_sdk.telemetry").debug(
                "Prometheus remote_write metrics push failed",
                exc_info=True,
            )
def _noop_handles() -> TelemetryHandles:
    """Handles that swallow every telemetry call; used when telemetry is disabled."""
    sdk_logger = logging.getLogger("starry_client_sdk")
    return TelemetryHandles(
        request_counter=_NoopCounter(),
        error_counter=_NoopCounter(),
        duration_histogram=_NoopHistogram(),
        tracer=_NoopTracer(),
        logger=sdk_logger,
        enabled=False,
    )
def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) -> tuple[Any, Any, Any]:
    """Create the SDK metric instruments and start the background remote_write pusher.

    Returns ``(request_counter, error_counter, duration_histogram)`` wrappers
    that merge the static service/SDK labels into every observation.
    """
    global _METRIC_PUSHER
    # The seconds-based env var wins; the legacy ms-based one supplies its default.
    push_interval = _float_env(
        "STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS",
        _int_env("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", 5000) / 1000.0,
    )
    push_timeout = _float_env("STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS", 2.0)
    # A private registry keeps SDK metrics out of any application-level registry.
    registry = CollectorRegistry()
    _METRIC_PUSHER = _PrometheusRemoteWritePusher(
        registry=registry,
        endpoint=_remote_write_endpoint(),
        interval_seconds=push_interval,
        timeout_seconds=push_timeout,
    )
    base_labels = {
        "service_name": service_name,
        "sdk_name": SDK_NAME,
        "sdk_version": sdk_version,
    }
    requests_total = Counter(
        "starry_sdk_client_requests",
        "Total SDK client calls.",
        labelnames=_PROMETHEUS_LABEL_NAMES,
        registry=registry,
    )
    errors_total = Counter(
        "starry_sdk_client_errors",
        "Total SDK client calls ending in an exception.",
        labelnames=_PROMETHEUS_LABEL_NAMES,
        registry=registry,
    )
    latency_seconds = Histogram(
        "starry_sdk_client_request_duration_seconds",
        "SDK client call latency in seconds.",
        labelnames=_PROMETHEUS_LABEL_NAMES,
        registry=registry,
        buckets=_DURATION_BUCKETS,
    )
    return (
        _PrometheusCounter(requests_total, base_labels),
        _PrometheusCounter(errors_total, base_labels),
        _PrometheusHistogram(latency_seconds, base_labels),
    )
def _configure_otel_logs(*, logger: logging.Logger, service_name: str, sdk_version: str) -> None:
    """Wire the SDK logger to an OTLP/HTTP log exporter via a batch processor."""
    global _LOGGER_PROVIDER
    from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
    from opentelemetry.instrumentation.logging.handler import LoggingHandler
    from opentelemetry.sdk._logs import LoggerProvider
    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
    from opentelemetry.sdk.resources import Resource

    resource_attrs = {
        "service.name": service_name,
        "sdk.name": SDK_NAME,
        "sdk.version": sdk_version,
        "telemetry.source": "client-sdk",
    }
    provider = LoggerProvider(resource=Resource.create(resource_attrs))
    provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter(endpoint=_logs_endpoint())))
    _LOGGER_PROVIDER = provider
    # Attach at most one OTLP handler, and only to the SDK logger (never the
    # root logger); the marker attribute makes reconfiguration idempotent.
    already_attached = any(
        getattr(handler, "_starry_sdk_otel_handler", False) for handler in logger.handlers
    )
    if not already_attached:
        otel_handler = LoggingHandler(level=logging.INFO, logger_provider=provider)
        otel_handler._starry_sdk_otel_handler = True
        logger.addHandler(otel_handler)
def _configure_otel_traces(*, service_name: str, sdk_version: str) -> Any:
    """Set up an OTLP/HTTP span exporter and return a tracer for this SDK.

    Sampling is parent-based with a root ratio read from
    STARRYSDK_TRACE_SAMPLE_RATE and clamped to [0.0, 1.0].
    """
    global _TRACER_PROVIDER
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

    configured_rate = _float_env("STARRYSDK_TRACE_SAMPLE_RATE", 1.0)
    sample_rate = max(0.0, min(1.0, configured_rate))
    resource_attrs = {
        "service.name": service_name,
        "sdk.name": SDK_NAME,
        "sdk.version": sdk_version,
        "telemetry.source": "client-sdk",
    }
    provider = TracerProvider(
        resource=Resource.create(resource_attrs),
        sampler=ParentBased(root=TraceIdRatioBased(sample_rate)),
    )
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=_traces_endpoint())))
    _TRACER_PROVIDER = provider
    return provider.get_tracer(SDK_NAME, sdk_version)
def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str = __version__) -> TelemetryHandles:
    """Configure default-on, non-blocking telemetry for this SDK.

    Metrics are pushed to Fluent Bit with Prometheus remote_write; exception
    logs and traces are exported with OTLP/HTTP. Telemetry failures must never
    break business calls — any backend that fails to set up is replaced with a
    no-op handle. The result is cached, so only the first call configures.
    """
    global _HANDLES
    with _LOCK:
        if _HANDLES is not None:
            # Already configured (or explicitly disabled) — reuse cached handles.
            return _HANDLES
        sdk_logger = logging.getLogger("starry_client_sdk")
        sdk_logger.setLevel(logging.INFO)
        if not telemetry_enabled():
            _HANDLES = _noop_handles()
            return _HANDLES
        resolved_service = service_name or os.getenv("STARRYSDK_SERVICE_NAME", DEFAULT_SERVICE_NAME)
        diag = logging.getLogger("starry_client_sdk.telemetry")
        # Start from no-op handles; each backend below swaps in its real piece on success.
        request_counter: Any = _NoopCounter()
        error_counter: Any = _NoopCounter()
        duration_histogram: Any = _NoopHistogram()
        tracer: Any = _NoopTracer()
        metrics_ok = False
        traces_ok = False
        try:
            request_counter, error_counter, duration_histogram = _configure_prometheus_remote_write(
                service_name=resolved_service,
                sdk_version=sdk_version,
            )
            metrics_ok = True
        except Exception:  # Telemetry must not break SDK business behavior.
            diag.debug("SDK Prometheus remote_write metrics setup failed", exc_info=True)
        try:
            _configure_otel_logs(logger=sdk_logger, service_name=resolved_service, sdk_version=sdk_version)
        except Exception:
            diag.debug("SDK OTLP log setup failed", exc_info=True)
        try:
            tracer = _configure_otel_traces(service_name=resolved_service, sdk_version=sdk_version)
            traces_ok = True
        except Exception:
            diag.debug("SDK OTLP trace setup failed", exc_info=True)
        _HANDLES = TelemetryHandles(
            request_counter=request_counter,
            error_counter=error_counter,
            duration_histogram=duration_histogram,
            tracer=tracer,
            logger=sdk_logger,
            enabled=metrics_ok or traces_ok,
        )
        # Best-effort teardown at interpreter exit.
        atexit.register(shutdown_telemetry)
        return _HANDLES
def force_flush(timeout_millis: int = 5000) -> None:
    """Flush SDK telemetry buffers. Useful in short-lived CLI/demo processes.

    Best-effort: each backend is flushed independently and any failure is
    logged at DEBUG level rather than raised, so flushing can never break the
    caller.

    Args:
        timeout_millis: Flush timeout passed to the OTel logger and tracer
            providers (the metric push has its own HTTP timeout).
    """
    diag = logging.getLogger("starry_client_sdk.telemetry")
    if _METRIC_PUSHER is not None:
        # The pusher collects from the registry outside its own try/except, so
        # guard here as well — telemetry must never break business calls.
        try:
            _METRIC_PUSHER.force_flush()
        except Exception:
            diag.debug("Telemetry metric force_flush failed", exc_info=True)
    if _LOGGER_PROVIDER is not None:
        try:
            _LOGGER_PROVIDER.force_flush(timeout_millis=timeout_millis)
        except Exception:
            diag.debug("Telemetry log force_flush failed", exc_info=True)
    if _TRACER_PROVIDER is not None:
        try:
            _TRACER_PROVIDER.force_flush(timeout_millis=timeout_millis)
        except Exception:
            diag.debug("Telemetry trace force_flush failed", exc_info=True)
def shutdown_telemetry() -> None:
    """Shut down every configured telemetry backend and clear the module state.

    Safe to call repeatedly (it is also registered via atexit): each global is
    reset to None after its shutdown attempt — previously only _METRIC_PUSHER
    was cleared, so a manual call followed by the atexit call would shut the
    OTel providers down twice. Failures are logged at DEBUG and never raised.
    """
    global _METRIC_PUSHER, _LOGGER_PROVIDER, _TRACER_PROVIDER
    diag = logging.getLogger("starry_client_sdk.telemetry")
    if _METRIC_PUSHER is not None:
        try:
            _METRIC_PUSHER.shutdown()
        except Exception:
            diag.debug("Prometheus remote_write shutdown failed", exc_info=True)
        finally:
            _METRIC_PUSHER = None
    if _LOGGER_PROVIDER is not None:
        try:
            _LOGGER_PROVIDER.shutdown()
        except Exception:
            diag.debug("Telemetry log shutdown failed", exc_info=True)
        finally:
            _LOGGER_PROVIDER = None
    if _TRACER_PROVIDER is not None:
        try:
            _TRACER_PROVIDER.shutdown()
        except Exception:
            diag.debug("Telemetry trace shutdown failed", exc_info=True)
        finally:
            _TRACER_PROVIDER = None