Files
starry-sdk-observability-demo/starry-sdk/starry_client_sdk/telemetry.py
2026-04-30 00:48:34 +08:00

179 lines
6.6 KiB
Python

from __future__ import annotations
import atexit
import logging
import os
import threading
from dataclasses import dataclass
from typing import Any, Optional
from .version import __version__
# Name passed to get_meter() and reported as the "sdk.name" resource attribute.
SDK_NAME = "starry-client-sdk"
# Fallback "service.name" when neither the caller nor STARRYSDK_SERVICE_NAME supplies one.
DEFAULT_SERVICE_NAME = "starry-python-sdk-consumer"
# Fallback OTLP base URL when OTEL_EXPORTER_OTLP_ENDPOINT is not set.
DEFAULT_OTLP_ENDPOINT = "http://host.docker.internal:4318"
# Held by configure_telemetry() for the whole of its setup.
_LOCK = threading.Lock()
# Cached result of configure_telemetry(); None until the first call completes.
_HANDLES: Optional["TelemetryHandles"] = None
# Providers created by configure_telemetry() and used by force_flush() / shutdown_telemetry().
# They stay None until setup creates them.
_METER_PROVIDER: Any = None
_LOGGER_PROVIDER: Any = None
class _NoopCounter:
def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
return None
class _NoopHistogram:
def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
return None
@dataclass(frozen=True)
class TelemetryHandles:
    """Immutable bundle of the instruments and logger the SDK reports through.

    Attributes:
        request_counter: Object with an ``add(amount, attributes)`` method.
        error_counter: Object with an ``add(amount, attributes)`` method.
        duration_histogram: Object with a ``record(amount, attributes)`` method.
        logger: Standard-library logger for SDK log records.
        enabled: ``True`` for live instruments, ``False`` for the no-op fallback.
    """

    request_counter: Any
    error_counter: Any
    duration_histogram: Any
    logger: logging.Logger
    enabled: bool
def _truthy(value: str) -> bool:
return value.strip().lower() not in {"0", "false", "f", "no", "n", "off", "disabled"}
def telemetry_enabled() -> bool:
    """Report whether telemetry is switched on via ``STARRYSDK_TELEMETRY_ENABLED``.

    An unset variable is read as ``"true"``.
    """
    raw_setting = os.getenv("STARRYSDK_TELEMETRY_ENABLED", "true")
    return _truthy(raw_setting)
def _base_endpoint() -> str:
    """Return the OTLP base URL with trailing slashes removed.

    Reads ``OTEL_EXPORTER_OTLP_ENDPOINT`` and falls back to
    ``DEFAULT_OTLP_ENDPOINT`` when the variable is unset.
    """
    configured = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", DEFAULT_OTLP_ENDPOINT)
    return configured.rstrip("/")
def _metrics_endpoint() -> str:
    """Return the URL metrics are exported to.

    ``OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`` is returned as-is when set;
    otherwise the result is the base endpoint plus ``/v1/metrics``.
    """
    derived = f"{_base_endpoint()}/v1/metrics"
    return os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", derived)
def _logs_endpoint() -> str:
    """Return the URL log records are exported to.

    ``OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`` is returned as-is when set;
    otherwise the result is the base endpoint plus ``/v1/logs``.
    """
    derived = f"{_base_endpoint()}/v1/logs"
    return os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", derived)
def _noop_handles() -> TelemetryHandles:
    """Build handles whose instruments discard everything and whose ``enabled`` flag is ``False``.

    The logger is still the regular ``starry_client_sdk`` logger.
    """
    sdk_logger = logging.getLogger("starry_client_sdk")
    requests = _NoopCounter()
    errors = _NoopCounter()
    durations = _NoopHistogram()
    return TelemetryHandles(
        request_counter=requests,
        error_counter=errors,
        duration_histogram=durations,
        logger=sdk_logger,
        enabled=False,
    )
def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str = __version__) -> TelemetryHandles:
    """Configure default-on, non-blocking telemetry for this SDK.

    This demo intentionally configures local OpenTelemetry providers owned by the SDK, so the SDK can
    be observable by default without overwriting an application's global OpenTelemetry configuration.
    Export failures must never break business calls; if setup fails, the SDK falls back to no-op meters.

    The result is cached: only the first call does any work, and later calls return the same
    handles regardless of the arguments passed.

    Args:
        service_name: Value for the ``service.name`` resource attribute. When omitted, the
            ``STARRYSDK_SERVICE_NAME`` environment variable is used, then ``DEFAULT_SERVICE_NAME``.
        sdk_version: Reported as the ``sdk.version`` resource attribute and passed to ``get_meter``.

    Returns:
        Live handles when setup succeeds. No-op handles when telemetry is disabled through
        ``STARRYSDK_TELEMETRY_ENABLED`` or when any step of setup raises.
    """
    global _HANDLES, _METER_PROVIDER, _LOGGER_PROVIDER
    with _LOCK:
        if _HANDLES is not None:
            return _HANDLES
        logger = logging.getLogger("starry_client_sdk")
        logger.setLevel(logging.INFO)
        if not telemetry_enabled():
            _HANDLES = _noop_handles()
            return _HANDLES
        # Everything is built in locals and only published to the module globals once the
        # whole setup has succeeded, so a failure half-way through cannot leave live
        # providers behind no-op handles.
        meter_provider: Any = None
        logger_provider: Any = None
        attached_handler: Optional[logging.Handler] = None
        try:
            # Imported lazily so a missing OpenTelemetry install degrades to no-op handles.
            from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
            from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
            from opentelemetry.instrumentation.logging.handler import LoggingHandler
            from opentelemetry.sdk._logs import LoggerProvider
            from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
            from opentelemetry.sdk.metrics import MeterProvider
            from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
            from opentelemetry.sdk.resources import Resource

            service_name = service_name or os.getenv("STARRYSDK_SERVICE_NAME", DEFAULT_SERVICE_NAME)
            export_interval_ms = int(os.getenv("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", "5000"))
            resource = Resource.create(
                {
                    "service.name": service_name,
                    "sdk.name": SDK_NAME,
                    "sdk.version": sdk_version,
                    "telemetry.source": "client-sdk",
                }
            )

            metric_exporter = OTLPMetricExporter(endpoint=_metrics_endpoint())
            metric_reader = PeriodicExportingMetricReader(
                metric_exporter,
                export_interval_millis=export_interval_ms,
            )
            meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
            meter = meter_provider.get_meter(SDK_NAME, sdk_version)

            logger_provider = LoggerProvider(resource=resource)
            log_exporter = OTLPLogExporter(endpoint=_logs_endpoint())
            logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))

            # Attach only one OTLP handler to the SDK logger. Do not attach to root logger.
            if not any(getattr(handler, "_starry_sdk_otel_handler", False) for handler in logger.handlers):
                otel_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)
                setattr(otel_handler, "_starry_sdk_otel_handler", True)
                logger.addHandler(otel_handler)
                # Remembered so the rollback below removes only the handler added by this call.
                attached_handler = otel_handler

            handles = TelemetryHandles(
                request_counter=meter.create_counter(
                    "starry.sdk.client.requests",
                    unit="1",
                    description="Total SDK client calls.",
                ),
                error_counter=meter.create_counter(
                    "starry.sdk.client.errors",
                    unit="1",
                    description="Total SDK client calls ending in an exception.",
                ),
                duration_histogram=meter.create_histogram(
                    "starry.sdk.client.request.duration.ms",
                    unit="ms",
                    description="SDK client call latency in milliseconds.",
                ),
                logger=logger,
                enabled=True,
            )
            atexit.register(shutdown_telemetry)
        except Exception:  # Telemetry must not break SDK business behavior.
            diagnostics = logging.getLogger("starry_client_sdk.telemetry")
            diagnostics.debug("SDK telemetry setup failed", exc_info=True)
            # Roll back partial setup: detach the handler this call added and shut down
            # any provider that was already created, so the no-op fallback really is inert.
            if attached_handler is not None:
                logger.removeHandler(attached_handler)
            for provider in (meter_provider, logger_provider):
                if provider is None:
                    continue
                try:
                    provider.shutdown()
                except Exception:
                    diagnostics.debug("Telemetry shutdown failed", exc_info=True)
            _HANDLES = _noop_handles()
            return _HANDLES
        # Setup succeeded: publish the providers for force_flush() / shutdown_telemetry().
        _METER_PROVIDER = meter_provider
        _LOGGER_PROVIDER = logger_provider
        _HANDLES = handles
        return _HANDLES
def force_flush(timeout_millis: int = 5000) -> None:
    """Flush telemetry buffers. Useful in short-lived CLI/demo processes.

    Each configured provider is flushed in turn, meter provider first, with
    ``timeout_millis`` passed to each one. A failing flush is logged at debug
    level and does not stop the remaining provider from being flushed.
    """
    configured = [p for p in (_METER_PROVIDER, _LOGGER_PROVIDER) if p is not None]
    for provider in configured:
        try:
            provider.force_flush(timeout_millis=timeout_millis)
        except Exception:
            logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry force_flush failed", exc_info=True)
def shutdown_telemetry() -> None:
    """Shut down the SDK-owned providers, meter provider first.

    Providers that were never created are skipped. A failing shutdown is
    logged at debug level and does not stop the remaining provider from
    being shut down.
    """
    configured = [p for p in (_METER_PROVIDER, _LOGGER_PROVIDER) if p is not None]
    for provider in configured:
        try:
            provider.shutdown()
        except Exception:
            logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry shutdown failed", exc_info=True)