# File: starry-sdk-observability-demo/starry-sdk/starry_client_sdk/telemetry.py
# (458 lines, 16 KiB, Python)
from __future__ import annotations
import atexit
import logging
import os
import threading
import time
from dataclasses import dataclass
from typing import Any, Optional
import requests
from .version import __version__
SDK_NAME = "starry-client-sdk"
DEFAULT_SERVICE_NAME = "starry-python-sdk-consumer"
# Defaults use host.docker.internal so an SDK running inside a container can
# reach collectors exposed on the host machine.
DEFAULT_OTLP_ENDPOINT = "http://host.docker.internal:4318"
DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT = "http://host.docker.internal:8080/api/prom/push"
# Fixed label schema for every exported series; _prometheus_labels() fills
# missing names with "" so each series always carries the same label set.
_PROMETHEUS_LABEL_NAMES = (
"service_name",
"sdk_name",
"sdk_version",
"sdk_interface",
"http_method",
"url_path",
"outcome",
"http_status_code",
"error_type",
)
# Histogram bucket upper bounds (seconds) for request-duration observations.
_DURATION_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)
# Module-level singletons; _LOCK guards one-time configuration in configure_telemetry().
_LOCK = threading.Lock()
_HANDLES: Optional["TelemetryHandles"] = None
_LOGGER_PROVIDER: Any = None
_METRIC_PUSHER: Optional["_PrometheusRemoteWritePusher"] = None
# Lazily built protobuf message class cache (see _write_request_class()).
_WRITE_REQUEST_CLASS: Any = None
class _NoopCounter:
def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
return None
class _NoopHistogram:
def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
return None
class _RemoteWriteCounter:
    """Monotonic counter instrument backed by the Prometheus remote_write pusher."""

    def __init__(self, pusher: "_PrometheusRemoteWritePusher", metric_name: str, static_labels: dict[str, str]) -> None:
        self._pusher = pusher
        self._metric_name = metric_name
        self._static_labels = static_labels

    def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
        """Accumulate *amount* on the series identified by static + per-call labels."""
        labels = _prometheus_labels(self._static_labels, attributes)
        self._pusher.add_counter(self._metric_name, amount, labels)
class _RemoteWriteHistogram:
    """Histogram instrument backed by the Prometheus remote_write pusher."""

    def __init__(
        self,
        pusher: "_PrometheusRemoteWritePusher",
        metric_name: str,
        static_labels: dict[str, str],
        buckets: tuple[float, ...],
    ) -> None:
        self._pusher = pusher
        self._metric_name = metric_name
        self._static_labels = static_labels
        self._buckets = buckets

    def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
        """Record one observation of *amount* against the configured buckets."""
        labels = _prometheus_labels(self._static_labels, attributes)
        self._pusher.observe_histogram(self._metric_name, float(amount), labels, self._buckets)
@dataclass(frozen=True)
class TelemetryHandles:
    """Immutable bundle of instruments and logger returned by configure_telemetry()."""

    # Counter of outbound SDK requests (a _NoopCounter when metrics setup failed).
    request_counter: Any
    # Counter of failed SDK requests.
    error_counter: Any
    # Histogram of request durations in seconds.
    duration_histogram: Any
    # The "starry_client_sdk" logger; may carry an OTLP handler when log export is configured.
    logger: logging.Logger
    # True only when Prometheus remote_write metrics were configured successfully.
    enabled: bool
def _truthy(value: str) -> bool:
return value.strip().lower() not in {"0", "false", "f", "no", "n", "off", "disabled"}
def telemetry_enabled() -> bool:
    """Return True unless STARRYSDK_TELEMETRY_ENABLED is set to an explicit "off" value."""
    flag = os.getenv("STARRYSDK_TELEMETRY_ENABLED", "true")
    return _truthy(flag)
def _base_endpoint() -> str:
    """OTLP base endpoint from the standard OTel env var, without a trailing slash."""
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", DEFAULT_OTLP_ENDPOINT)
    return endpoint.rstrip("/")
def _logs_endpoint() -> str:
    """OTLP logs endpoint; an explicit logs env var overrides the derived default."""
    derived = f"{_base_endpoint()}/v1/logs"
    return os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", derived)
def _remote_write_endpoint() -> str:
    """Prometheus remote_write push URL, overridable via environment."""
    return os.getenv(
        "STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT",
        DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT,
    )
def _int_env(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except ValueError:
return default
def _float_env(name: str, default: float) -> float:
try:
return float(os.getenv(name, str(default)))
except ValueError:
return default
def _prometheus_labels(static_labels: dict[str, str], attributes: Optional[dict[str, Any]]) -> dict[str, str]:
    """Project merged static + per-call attributes onto the fixed label schema.

    Attributes outside _PROMETHEUS_LABEL_NAMES are dropped; missing or None
    values become "" so every series carries an identical set of label names.
    """
    merged = dict(static_labels)
    if attributes:
        merged.update(attributes)
    out: dict[str, str] = {}
    for name in _PROMETHEUS_LABEL_NAMES:
        value = merged.get(name)
        out[name] = "" if value is None else str(value)
    return out
def _series_key(metric_name: str, labels: dict[str, str]) -> tuple[str, tuple[tuple[str, str], ...]]:
return metric_name, tuple(sorted(labels.items()))
def _format_bucket(bucket: float) -> str:
return f"{bucket:g}"
def _write_request_class() -> Any:
    """Build (once) and cache a protobuf message class for the remote_write WriteRequest.

    The descriptor is assembled programmatically so no generated ``*_pb2`` module or
    ``.proto`` file is needed at runtime. Field names and numbers mirror Prometheus'
    ``prompb`` definitions (Label, Sample, TimeSeries, WriteRequest).
    """
    global _WRITE_REQUEST_CLASS
    if _WRITE_REQUEST_CLASS is not None:
        return _WRITE_REQUEST_CLASS
    # Lazy import: protobuf is only required once metrics are actually pushed.
    from google.protobuf import descriptor_pb2, descriptor_pool, message_factory
    file_proto = descriptor_pb2.FileDescriptorProto()
    file_proto.name = "prometheus_remote_write.proto"
    file_proto.package = "prometheus"
    file_proto.syntax = "proto3"
    # message Label { string name = 1; string value = 2; }
    label_msg = file_proto.message_type.add()
    label_msg.name = "Label"
    field = label_msg.field.add()
    field.name = "name"
    field.number = 1
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_STRING
    field = label_msg.field.add()
    field.name = "value"
    field.number = 2
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_STRING
    # message Sample { double value = 1; int64 timestamp = 2; }
    sample_msg = file_proto.message_type.add()
    sample_msg.name = "Sample"
    field = sample_msg.field.add()
    field.name = "value"
    field.number = 1
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE
    field = sample_msg.field.add()
    field.name = "timestamp"
    field.number = 2
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_INT64
    # message TimeSeries { repeated Label labels = 1; repeated Sample samples = 2; }
    time_series_msg = file_proto.message_type.add()
    time_series_msg.name = "TimeSeries"
    field = time_series_msg.field.add()
    field.name = "labels"
    field.number = 1
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE
    field.type_name = ".prometheus.Label"
    field = time_series_msg.field.add()
    field.name = "samples"
    field.number = 2
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE
    field.type_name = ".prometheus.Sample"
    # message WriteRequest { repeated TimeSeries timeseries = 1; }
    write_request_msg = file_proto.message_type.add()
    write_request_msg.name = "WriteRequest"
    field = write_request_msg.field.add()
    field.name = "timeseries"
    field.number = 1
    field.label = descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE
    field.type_name = ".prometheus.TimeSeries"
    # A private pool avoids clashing with any application-registered descriptors.
    pool = descriptor_pool.DescriptorPool()
    pool.Add(file_proto)
    _WRITE_REQUEST_CLASS = message_factory.GetMessageClass(pool.FindMessageTypeByName("prometheus.WriteRequest"))
    return _WRITE_REQUEST_CLASS
class _PrometheusRemoteWritePusher:
    """Aggregates cumulative series in memory and pushes them from a background thread.

    add_counter()/observe_histogram() sum values into a per-series dict under a lock;
    a daemon thread snapshots the full cumulative state every *interval_seconds* and
    POSTs it as a snappy-compressed protobuf WriteRequest. Push failures are logged
    at debug level and never raised to callers.
    """
    def __init__(self, *, endpoint: str, interval_seconds: float, timeout_seconds: float) -> None:
        self._endpoint = endpoint
        self._interval_seconds = interval_seconds
        self._timeout_seconds = timeout_seconds
        # Cumulative value keyed by (metric name, sorted label pairs).
        self._series: dict[tuple[str, tuple[tuple[str, str], ...]], float] = {}
        self._series_lock = threading.Lock()
        self._stop_event = threading.Event()
        # Daemon thread so an unfinished push never blocks interpreter exit.
        self._thread = threading.Thread(target=self._run, name="starry-sdk-prometheus-remote-write", daemon=True)
        self._thread.start()
    def add_counter(self, metric_name: str, amount: int | float, labels: dict[str, str]) -> None:
        """Add *amount* to the counter series identified by *labels*."""
        self._add_sample(metric_name, labels, float(amount))
    def observe_histogram(
        self,
        metric_name: str,
        amount: float,
        labels: dict[str, str],
        buckets: tuple[float, ...],
    ) -> None:
        """Record one observation as Prometheus histogram component series.

        Buckets are cumulative: every bucket whose bound is >= *amount* is
        incremented, plus the implicit "+Inf" bucket, _count and _sum.
        """
        for bucket in buckets:
            if amount <= bucket:
                self._add_sample(f"{metric_name}_bucket", {**labels, "le": _format_bucket(bucket)}, 1.0)
        self._add_sample(f"{metric_name}_bucket", {**labels, "le": "+Inf"}, 1.0)
        self._add_sample(f"{metric_name}_count", labels, 1.0)
        self._add_sample(f"{metric_name}_sum", labels, amount)
    def force_flush(self) -> None:
        """Push the current cumulative state immediately (best effort)."""
        self._flush()
    def shutdown(self) -> None:
        """Stop the background loop and push one final snapshot."""
        self._stop_event.set()
        self._thread.join(timeout=1)
        self._flush()
    def _add_sample(self, metric_name: str, labels: dict[str, str], amount: float) -> None:
        # Accumulate under the lock; series values are cumulative, never reset.
        key = _series_key(metric_name, labels)
        with self._series_lock:
            self._series[key] = self._series.get(key, 0.0) + amount
    def _run(self) -> None:
        # wait() doubles as the interval sleep and the stop signal check.
        while not self._stop_event.wait(self._interval_seconds):
            self._flush()
    def _flush(self) -> None:
        # Snapshot under the lock, then do network I/O outside it.
        with self._series_lock:
            snapshot = dict(self._series)
        if not snapshot:
            return
        try:
            # Lazy import: python-snappy is only needed when a push actually happens.
            import snappy
            write_request_cls = _write_request_class()
            request = write_request_cls()
            # One shared timestamp for all series in this push.
            timestamp_ms = int(time.time() * 1000)
            for (metric_name, labels), value in snapshot.items():
                time_series = request.timeseries.add()
                # "__name__" is how Prometheus encodes the metric name as a label.
                metric_label = time_series.labels.add()
                metric_label.name = "__name__"
                metric_label.value = metric_name
                for label_name, label_value in labels:
                    label = time_series.labels.add()
                    label.name = label_name
                    label.value = label_value
                sample = time_series.samples.add()
                sample.value = value
                sample.timestamp = timestamp_ms
            payload = snappy.compress(request.SerializeToString())
            requests.post(
                self._endpoint,
                data=payload,
                headers={
                    "Content-Encoding": "snappy",
                    "Content-Type": "application/x-protobuf",
                    "X-Prometheus-Remote-Write-Version": "0.1.0",
                },
                timeout=self._timeout_seconds,
            ).raise_for_status()
        except Exception:
            # Telemetry is best-effort: a failed push must never surface to callers.
            logging.getLogger("starry_client_sdk.telemetry").debug(
                "Prometheus remote_write metrics push failed",
                exc_info=True,
            )
def _noop_handles() -> TelemetryHandles:
    """Handles that record nothing, used when telemetry is disabled."""
    sdk_logger = logging.getLogger("starry_client_sdk")
    return TelemetryHandles(
        request_counter=_NoopCounter(),
        error_counter=_NoopCounter(),
        duration_histogram=_NoopHistogram(),
        logger=sdk_logger,
        enabled=False,
    )
def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) -> tuple[Any, Any, Any]:
    """Start the background remote_write pusher and return (requests, errors, duration) instruments."""
    global _METRIC_PUSHER
    # The seconds-based env var wins; the legacy millisecond var only supplies its default.
    legacy_interval_ms = _int_env("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", 5000)
    interval_seconds = _float_env("STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS", legacy_interval_ms / 1000.0)
    timeout_seconds = _float_env("STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS", 2.0)
    _METRIC_PUSHER = _PrometheusRemoteWritePusher(
        endpoint=_remote_write_endpoint(),
        interval_seconds=interval_seconds,
        timeout_seconds=timeout_seconds,
    )
    # Labels common to every series emitted by this process.
    common_labels = {
        "service_name": service_name,
        "sdk_name": SDK_NAME,
        "sdk_version": sdk_version,
    }
    requests_total = _RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_requests_total", common_labels)
    errors_total = _RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_errors_total", common_labels)
    duration_seconds = _RemoteWriteHistogram(
        _METRIC_PUSHER,
        "starry_sdk_client_request_duration_seconds",
        common_labels,
        _DURATION_BUCKETS,
    )
    return (requests_total, errors_total, duration_seconds)
def _configure_otel_logs(*, logger: logging.Logger, service_name: str, sdk_version: str) -> None:
    """Attach an OTLP/HTTP log exporter to *logger* (never to the root logger).

    Raises on any OpenTelemetry import or setup failure; the caller treats that
    as "log export unavailable" and continues without it.
    """
    global _LOGGER_PROVIDER
    from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
    # BUGFIX: LoggingHandler is exported by opentelemetry.sdk._logs. The previous
    # import path (opentelemetry.instrumentation.logging.handler) does not exist,
    # so this function always raised ImportError and log export was silently
    # disabled via the caller's except clause.
    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
    from opentelemetry.sdk.resources import Resource
    resource = Resource.create(
        {
            "service.name": service_name,
            "sdk.name": SDK_NAME,
            "sdk.version": sdk_version,
            "telemetry.source": "client-sdk",
        }
    )
    _LOGGER_PROVIDER = LoggerProvider(resource=resource)
    log_exporter = OTLPLogExporter(endpoint=_logs_endpoint())
    _LOGGER_PROVIDER.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
    # Attach only one OTLP handler to the SDK logger. Do not attach to root logger.
    if not any(getattr(handler, "_starry_sdk_otel_handler", False) for handler in logger.handlers):
        otel_handler = LoggingHandler(level=logging.INFO, logger_provider=_LOGGER_PROVIDER)
        # Marker attribute keeps repeated configuration idempotent.
        setattr(otel_handler, "_starry_sdk_otel_handler", True)
        logger.addHandler(otel_handler)
def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str = __version__) -> TelemetryHandles:
    """Configure default-on, non-blocking telemetry for this SDK.
    Metrics are pushed to Fluent Bit with Prometheus remote_write. Exception logs are still exported with
    OTLP/HTTP. Telemetry failures must never break business calls; if setup fails, the SDK falls back to
    no-op meters.
    """
    global _HANDLES
    with _LOCK:
        # Idempotent: the first call wins for the lifetime of the process.
        if _HANDLES is not None:
            return _HANDLES
        logger = logging.getLogger("starry_client_sdk")
        logger.setLevel(logging.INFO)
        if not telemetry_enabled():
            _HANDLES = _noop_handles()
            return _HANDLES
        service_name = service_name or os.getenv("STARRYSDK_SERVICE_NAME", DEFAULT_SERVICE_NAME)
        # Start from no-op instruments so a metrics setup failure degrades gracefully.
        request_counter: Any = _NoopCounter()
        error_counter: Any = _NoopCounter()
        duration_histogram: Any = _NoopHistogram()
        metrics_enabled = False
        try:
            request_counter, error_counter, duration_histogram = _configure_prometheus_remote_write(
                service_name=service_name,
                sdk_version=sdk_version,
            )
            metrics_enabled = True
        except Exception:  # Telemetry must not break SDK business behavior.
            logging.getLogger("starry_client_sdk.telemetry").debug(
                "SDK Prometheus remote_write metrics setup failed",
                exc_info=True,
            )
        # Log export is configured independently: a failure here leaves metrics working.
        try:
            _configure_otel_logs(logger=logger, service_name=service_name, sdk_version=sdk_version)
        except Exception:
            logging.getLogger("starry_client_sdk.telemetry").debug("SDK OTLP log setup failed", exc_info=True)
        _HANDLES = TelemetryHandles(
            request_counter=request_counter,
            error_counter=error_counter,
            duration_histogram=duration_histogram,
            logger=logger,
            enabled=metrics_enabled,
        )
        # Flush and stop background telemetry when the process exits.
        atexit.register(shutdown_telemetry)
        return _HANDLES
def force_flush(timeout_millis: int = 5000) -> None:
    """Flush SDK telemetry buffers. Useful in short-lived CLI/demo processes."""
    pusher = _METRIC_PUSHER
    if pusher is not None:
        pusher.force_flush()
    provider = _LOGGER_PROVIDER
    if provider is None:
        return
    try:
        provider.force_flush(timeout_millis=timeout_millis)
    except Exception:
        # Best effort only: a flush failure must never surface to callers.
        logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry force_flush failed", exc_info=True)
def shutdown_telemetry() -> None:
    """Stop the metrics pusher and shut down the log provider; safe to call repeatedly.

    Registered via atexit by configure_telemetry(). Failures are logged at debug
    level and never propagated. Both module globals are reset to None so repeated
    calls are no-ops.
    """
    global _METRIC_PUSHER, _LOGGER_PROVIDER
    if _METRIC_PUSHER is not None:
        try:
            _METRIC_PUSHER.shutdown()
        except Exception:
            logging.getLogger("starry_client_sdk.telemetry").debug("Prometheus remote_write shutdown failed", exc_info=True)
        finally:
            _METRIC_PUSHER = None
    if _LOGGER_PROVIDER is not None:
        try:
            _LOGGER_PROVIDER.shutdown()
        except Exception:
            logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry shutdown failed", exc_info=True)
        finally:
            # BUGFIX: previously only _METRIC_PUSHER was cleared, so a later
            # force_flush() would call into an already-shut-down LoggerProvider.
            _LOGGER_PROVIDER = None