diff --git a/starry-sdk/pyproject.toml b/starry-sdk/pyproject.toml index 84e1900..aadf891 100644 --- a/starry-sdk/pyproject.toml +++ b/starry-sdk/pyproject.toml @@ -10,6 +10,7 @@ readme = "README.md" requires-python = ">=3.9" dependencies = [ "requests>=2.32.0,<3.0", + "prometheus-client>=0.20.0,<1.0", "protobuf>=5.0,<7.0", "python-snappy>=0.7.0,<1.0", "opentelemetry-api==1.41.1", diff --git a/starry-sdk/starry_client_sdk/telemetry.py b/starry-sdk/starry_client_sdk/telemetry.py index b1401a1..2cd38e2 100644 --- a/starry-sdk/starry_client_sdk/telemetry.py +++ b/starry-sdk/starry_client_sdk/telemetry.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from typing import Any, Optional import requests +from prometheus_client import CollectorRegistry, Counter, Histogram from .version import __version__ @@ -47,36 +48,22 @@ class _NoopHistogram: return None -class _RemoteWriteCounter: - def __init__(self, pusher: "_PrometheusRemoteWritePusher", metric_name: str, static_labels: dict[str, str]) -> None: - self._pusher = pusher - self._metric_name = metric_name +class _PrometheusCounter: + def __init__(self, metric: Counter, static_labels: dict[str, str]) -> None: + self._metric = metric self._static_labels = static_labels def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None: - self._pusher.add_counter(self._metric_name, amount, _prometheus_labels(self._static_labels, attributes)) + self._metric.labels(**_prometheus_labels(self._static_labels, attributes)).inc(amount) -class _RemoteWriteHistogram: - def __init__( - self, - pusher: "_PrometheusRemoteWritePusher", - metric_name: str, - static_labels: dict[str, str], - buckets: tuple[float, ...], - ) -> None: - self._pusher = pusher - self._metric_name = metric_name +class _PrometheusHistogram: + def __init__(self, metric: Histogram, static_labels: dict[str, str]) -> None: + self._metric = metric self._static_labels = static_labels - self._buckets = buckets def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None: - self._pusher.observe_histogram( - self._metric_name, - float(amount), - _prometheus_labels(self._static_labels, attributes), - self._buckets, - ) + self._metric.labels(**_prometheus_labels(self._static_labels, attributes)).observe(amount) @dataclass(frozen=True) @@ -131,14 +118,6 @@ def _prometheus_labels(static_labels: dict[str, str], attributes: Optional[dict[ return labels -def _series_key(metric_name: str, labels: dict[str, str]) -> tuple[str, tuple[tuple[str, str], ...]]: - return metric_name, tuple(sorted(labels.items())) - - -def _format_bucket(bucket: float) -> str: - return f"{bucket:g}" - - def _write_request_class() -> Any: global _WRITE_REQUEST_CLASS @@ -209,33 +188,22 @@ def _write_request_class() -> Any: class _PrometheusRemoteWritePusher: - def __init__(self, *, endpoint: str, interval_seconds: float, timeout_seconds: float) -> None: + def __init__( + self, + *, + registry: CollectorRegistry, + endpoint: str, + interval_seconds: float, + timeout_seconds: float, + ) -> None: + self._registry = registry self._endpoint = endpoint self._interval_seconds = interval_seconds self._timeout_seconds = timeout_seconds - self._series: dict[tuple[str, tuple[tuple[str, str], ...]], float] = {} - self._series_lock = threading.Lock() self._stop_event = threading.Event() self._thread = threading.Thread(target=self._run, name="starry-sdk-prometheus-remote-write", daemon=True) self._thread.start() - def add_counter(self, metric_name: str, amount: int | float, labels: dict[str, str]) -> None: - self._add_sample(metric_name, labels, float(amount)) - - def observe_histogram( - self, - metric_name: str, - amount: float, - labels: dict[str, str], - buckets: tuple[float, ...], - ) -> None: - for bucket in buckets: - if amount <= bucket: - self._add_sample(f"{metric_name}_bucket", {**labels, "le": _format_bucket(bucket)}, 1.0) - self._add_sample(f"{metric_name}_bucket", {**labels, "le": "+Inf"}, 1.0) - self._add_sample(f"{metric_name}_count", labels, 1.0) - self._add_sample(f"{metric_name}_sum", labels, amount) - def force_flush(self) -> None: self._flush() @@ -244,20 +212,18 @@ class _PrometheusRemoteWritePusher: self._thread.join(timeout=1) self._flush() - def _add_sample(self, metric_name: str, labels: dict[str, str], amount: float) -> None: - key = _series_key(metric_name, labels) - with self._series_lock: - self._series[key] = self._series.get(key, 0.0) + amount - def _run(self) -> None: while not self._stop_event.wait(self._interval_seconds): self._flush() def _flush(self) -> None: - with self._series_lock: - snapshot = dict(self._series) - - if not snapshot: + samples = [ + sample + for metric in self._registry.collect() + for sample in metric.samples + if not sample.name.endswith("_created") + ] + if not samples: return try: @@ -267,19 +233,19 @@ class _PrometheusRemoteWritePusher: request = write_request_cls() timestamp_ms = int(time.time() * 1000) - for (metric_name, labels), value in snapshot.items(): + for prometheus_sample in samples: time_series = request.timeseries.add() metric_label = time_series.labels.add() metric_label.name = "__name__" - metric_label.value = metric_name + metric_label.value = prometheus_sample.name - for label_name, label_value in labels: + for label_name, label_value in sorted(prometheus_sample.labels.items()): label = time_series.labels.add() label.name = label_name label.value = label_value sample = time_series.samples.add() - sample.value = value + sample.value = float(prometheus_sample.value) sample.timestamp = timestamp_ms payload = snappy.compress(request.SerializeToString()) @@ -318,7 +284,9 @@ def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) - _int_env("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", 5000) / 1000.0, ) timeout_seconds = _float_env("STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS", 2.0) + registry = CollectorRegistry() _METRIC_PUSHER = _PrometheusRemoteWritePusher( + registry=registry, endpoint=_remote_write_endpoint(), interval_seconds=interval_seconds, timeout_seconds=timeout_seconds, @@ -329,15 +297,29 @@ def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) - "sdk_name": SDK_NAME, "sdk_version": sdk_version, } + request_counter = Counter( + "starry_sdk_client_requests", + "Total SDK client calls.", + labelnames=_PROMETHEUS_LABEL_NAMES, + registry=registry, + ) + error_counter = Counter( + "starry_sdk_client_errors", + "Total SDK client calls ending in an exception.", + labelnames=_PROMETHEUS_LABEL_NAMES, + registry=registry, + ) + duration_histogram = Histogram( + "starry_sdk_client_request_duration_seconds", + "SDK client call latency in seconds.", + labelnames=_PROMETHEUS_LABEL_NAMES, + registry=registry, + buckets=_DURATION_BUCKETS, + ) return ( - _RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_requests_total", static_labels), - _RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_errors_total", static_labels), - _RemoteWriteHistogram( - _METRIC_PUSHER, - "starry_sdk_client_request_duration_seconds", - static_labels, - _DURATION_BUCKETS, - ), + _PrometheusCounter(request_counter, static_labels), + _PrometheusCounter(error_counter, static_labels), + _PrometheusHistogram(duration_histogram, static_labels), )