use prometheus sdk
This commit is contained in:
@@ -10,6 +10,7 @@ readme = "README.md"
|
||||
requires-python = ">=3.9"
|
||||
dependencies = [
|
||||
"requests>=2.32.0,<3.0",
|
||||
"prometheus-client>=0.20.0,<1.0",
|
||||
"protobuf>=5.0,<7.0",
|
||||
"python-snappy>=0.7.0,<1.0",
|
||||
"opentelemetry-api==1.41.1",
|
||||
|
||||
@@ -9,6 +9,7 @@ from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
|
||||
import requests
|
||||
from prometheus_client import CollectorRegistry, Counter, Histogram
|
||||
|
||||
from .version import __version__
|
||||
|
||||
@@ -47,36 +48,22 @@ class _NoopHistogram:
|
||||
return None
|
||||
|
||||
|
||||
class _RemoteWriteCounter:
|
||||
def __init__(self, pusher: "_PrometheusRemoteWritePusher", metric_name: str, static_labels: dict[str, str]) -> None:
|
||||
self._pusher = pusher
|
||||
self._metric_name = metric_name
|
||||
class _PrometheusCounter:
|
||||
def __init__(self, metric: Counter, static_labels: dict[str, str]) -> None:
|
||||
self._metric = metric
|
||||
self._static_labels = static_labels
|
||||
|
||||
def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
|
||||
self._pusher.add_counter(self._metric_name, amount, _prometheus_labels(self._static_labels, attributes))
|
||||
self._metric.labels(**_prometheus_labels(self._static_labels, attributes)).inc(amount)
|
||||
|
||||
|
||||
class _RemoteWriteHistogram:
|
||||
def __init__(
|
||||
self,
|
||||
pusher: "_PrometheusRemoteWritePusher",
|
||||
metric_name: str,
|
||||
static_labels: dict[str, str],
|
||||
buckets: tuple[float, ...],
|
||||
) -> None:
|
||||
self._pusher = pusher
|
||||
self._metric_name = metric_name
|
||||
class _PrometheusHistogram:
|
||||
def __init__(self, metric: Histogram, static_labels: dict[str, str]) -> None:
|
||||
self._metric = metric
|
||||
self._static_labels = static_labels
|
||||
self._buckets = buckets
|
||||
|
||||
def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None:
|
||||
self._pusher.observe_histogram(
|
||||
self._metric_name,
|
||||
float(amount),
|
||||
_prometheus_labels(self._static_labels, attributes),
|
||||
self._buckets,
|
||||
)
|
||||
self._metric.labels(**_prometheus_labels(self._static_labels, attributes)).observe(amount)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -131,14 +118,6 @@ def _prometheus_labels(static_labels: dict[str, str], attributes: Optional[dict[
|
||||
return labels
|
||||
|
||||
|
||||
def _series_key(metric_name: str, labels: dict[str, str]) -> tuple[str, tuple[tuple[str, str], ...]]:
|
||||
return metric_name, tuple(sorted(labels.items()))
|
||||
|
||||
|
||||
def _format_bucket(bucket: float) -> str:
|
||||
return f"{bucket:g}"
|
||||
|
||||
|
||||
def _write_request_class() -> Any:
|
||||
global _WRITE_REQUEST_CLASS
|
||||
|
||||
@@ -209,33 +188,22 @@ def _write_request_class() -> Any:
|
||||
|
||||
|
||||
class _PrometheusRemoteWritePusher:
|
||||
def __init__(self, *, endpoint: str, interval_seconds: float, timeout_seconds: float) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
registry: CollectorRegistry,
|
||||
endpoint: str,
|
||||
interval_seconds: float,
|
||||
timeout_seconds: float,
|
||||
) -> None:
|
||||
self._registry = registry
|
||||
self._endpoint = endpoint
|
||||
self._interval_seconds = interval_seconds
|
||||
self._timeout_seconds = timeout_seconds
|
||||
self._series: dict[tuple[str, tuple[tuple[str, str], ...]], float] = {}
|
||||
self._series_lock = threading.Lock()
|
||||
self._stop_event = threading.Event()
|
||||
self._thread = threading.Thread(target=self._run, name="starry-sdk-prometheus-remote-write", daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
def add_counter(self, metric_name: str, amount: int | float, labels: dict[str, str]) -> None:
|
||||
self._add_sample(metric_name, labels, float(amount))
|
||||
|
||||
def observe_histogram(
|
||||
self,
|
||||
metric_name: str,
|
||||
amount: float,
|
||||
labels: dict[str, str],
|
||||
buckets: tuple[float, ...],
|
||||
) -> None:
|
||||
for bucket in buckets:
|
||||
if amount <= bucket:
|
||||
self._add_sample(f"{metric_name}_bucket", {**labels, "le": _format_bucket(bucket)}, 1.0)
|
||||
self._add_sample(f"{metric_name}_bucket", {**labels, "le": "+Inf"}, 1.0)
|
||||
self._add_sample(f"{metric_name}_count", labels, 1.0)
|
||||
self._add_sample(f"{metric_name}_sum", labels, amount)
|
||||
|
||||
def force_flush(self) -> None:
|
||||
self._flush()
|
||||
|
||||
@@ -244,20 +212,18 @@ class _PrometheusRemoteWritePusher:
|
||||
self._thread.join(timeout=1)
|
||||
self._flush()
|
||||
|
||||
def _add_sample(self, metric_name: str, labels: dict[str, str], amount: float) -> None:
|
||||
key = _series_key(metric_name, labels)
|
||||
with self._series_lock:
|
||||
self._series[key] = self._series.get(key, 0.0) + amount
|
||||
|
||||
def _run(self) -> None:
|
||||
while not self._stop_event.wait(self._interval_seconds):
|
||||
self._flush()
|
||||
|
||||
def _flush(self) -> None:
|
||||
with self._series_lock:
|
||||
snapshot = dict(self._series)
|
||||
|
||||
if not snapshot:
|
||||
samples = [
|
||||
sample
|
||||
for metric in self._registry.collect()
|
||||
for sample in metric.samples
|
||||
if not sample.name.endswith("_created")
|
||||
]
|
||||
if not samples:
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -267,19 +233,19 @@ class _PrometheusRemoteWritePusher:
|
||||
request = write_request_cls()
|
||||
timestamp_ms = int(time.time() * 1000)
|
||||
|
||||
for (metric_name, labels), value in snapshot.items():
|
||||
for prometheus_sample in samples:
|
||||
time_series = request.timeseries.add()
|
||||
metric_label = time_series.labels.add()
|
||||
metric_label.name = "__name__"
|
||||
metric_label.value = metric_name
|
||||
metric_label.value = prometheus_sample.name
|
||||
|
||||
for label_name, label_value in labels:
|
||||
for label_name, label_value in sorted(prometheus_sample.labels.items()):
|
||||
label = time_series.labels.add()
|
||||
label.name = label_name
|
||||
label.value = label_value
|
||||
|
||||
sample = time_series.samples.add()
|
||||
sample.value = value
|
||||
sample.value = float(prometheus_sample.value)
|
||||
sample.timestamp = timestamp_ms
|
||||
|
||||
payload = snappy.compress(request.SerializeToString())
|
||||
@@ -318,7 +284,9 @@ def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) -
|
||||
_int_env("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", 5000) / 1000.0,
|
||||
)
|
||||
timeout_seconds = _float_env("STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS", 2.0)
|
||||
registry = CollectorRegistry()
|
||||
_METRIC_PUSHER = _PrometheusRemoteWritePusher(
|
||||
registry=registry,
|
||||
endpoint=_remote_write_endpoint(),
|
||||
interval_seconds=interval_seconds,
|
||||
timeout_seconds=timeout_seconds,
|
||||
@@ -329,15 +297,29 @@ def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) -
|
||||
"sdk_name": SDK_NAME,
|
||||
"sdk_version": sdk_version,
|
||||
}
|
||||
return (
|
||||
_RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_requests_total", static_labels),
|
||||
_RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_errors_total", static_labels),
|
||||
_RemoteWriteHistogram(
|
||||
_METRIC_PUSHER,
|
||||
request_counter = Counter(
|
||||
"starry_sdk_client_requests",
|
||||
"Total SDK client calls.",
|
||||
labelnames=_PROMETHEUS_LABEL_NAMES,
|
||||
registry=registry,
|
||||
)
|
||||
error_counter = Counter(
|
||||
"starry_sdk_client_errors",
|
||||
"Total SDK client calls ending in an exception.",
|
||||
labelnames=_PROMETHEUS_LABEL_NAMES,
|
||||
registry=registry,
|
||||
)
|
||||
duration_histogram = Histogram(
|
||||
"starry_sdk_client_request_duration_seconds",
|
||||
static_labels,
|
||||
_DURATION_BUCKETS,
|
||||
),
|
||||
"SDK client call latency in seconds.",
|
||||
labelnames=_PROMETHEUS_LABEL_NAMES,
|
||||
registry=registry,
|
||||
buckets=_DURATION_BUCKETS,
|
||||
)
|
||||
return (
|
||||
_PrometheusCounter(request_counter, static_labels),
|
||||
_PrometheusCounter(error_counter, static_labels),
|
||||
_PrometheusHistogram(duration_histogram, static_labels),
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user