diff --git a/README.md b/README.md index 07b394b..b23c6b8 100644 --- a/README.md +++ b/README.md @@ -3,11 +3,12 @@ ```mermaid flowchart LR - SDK[Python SDK] -->|logs / metrics| FB[Fluent Bit] + SDK[Python SDK] -->|logs OTLP/HTTP| FB[Fluent Bit] + SDK -->|metrics Prometheus remote_write| FB FB -->|logs| VL[VictoriaLogs] - FB -->|metrics| VM[VictoriaMetrics] + FB -->|metrics remote_write| VM[VictoriaMetrics] VM -->|query| GF[Grafana] @@ -24,12 +25,12 @@ flowchart TD GW[Gateway
Auth 鉴权
Rate Limit 限流] FB[Fluent Bit] - SDK -->|logs / metrics + telemetry token| GW + SDK -->|logs / Prometheus remote_write metrics + token| GW GW -->|logs / metrics| FB FB -->|logs| VL[VictoriaLogs cluster] - FB -->|metrics| VM[VictoriaMetrics] + FB -->|metrics remote_write| VM[VictoriaMetrics] VM -->|query| GF[Grafana] VL -->|query| GF diff --git a/demo-app/main.py b/demo-app/main.py index 630572f..b570d10 100644 --- a/demo-app/main.py +++ b/demo-app/main.py @@ -34,7 +34,7 @@ def main() -> None: except Exception as exc: print(f"ERROR path={path!r} type={exc.__class__.__name__} error={exc}") - # Demo app is short-looping, so flush to make logs/metrics visible quickly. + # Demo app is short-looping, so flush logs and remote_write metrics quickly. force_flush() if not loop: diff --git a/docker-compose.yml b/docker-compose.yml index 57595d5..68a0ca5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,7 @@ services: default: null ports: - 4318:4318 + - 8080:8080 volumes: - ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf victorialogs-1: @@ -70,12 +71,12 @@ services: environment: STARRYSDK_TELEMETRY_ENABLED: "true" STARRYSDK_SERVICE_NAME: starry-python-sdk-demo - STARRYSDK_METRIC_EXPORT_INTERVAL_MS: "5000" - #OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4318 + STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT: http://fluentbit:8080/api/prom/push + STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS: "5" + OTEL_EXPORTER_OTLP_ENDPOINT: http://fluentbit:4318 DEMO_LOOP: "true" DEMO_INTERVAL_SECONDS: "5" DEMO_PATHS: ",__sdk_demo_not_found__" - DEFAULT_OTLP_ENDPOINT: "http://fluentbit:4318" grafana: image: grafana/grafana-enterprise ports: diff --git a/fluent-bit.conf b/fluent-bit.conf index ee98040..cee6898 100644 --- a/fluent-bit.conf +++ b/fluent-bit.conf @@ -26,16 +26,26 @@ Buffer_Max_Size 10M Threaded On -# Python SDK 发来的 metrics -> VictoriaMetrics remote_write +# Python SDK 主动推送的 Prometheus remote_write metrics -> VictoriaMetrics remote_write +[INPUT] + Name prometheus_remote_write + Listen 0.0.0.0 + Port 8080 + Uri /api/prom/push + Tag sdk_metrics + Tag_From_Uri false + Successful_Response_Code 200 + Threaded On + [OUTPUT] Name prometheus_remote_write - Match v1_metrics + Match sdk_metrics Host victoriametrics Port 8428 Uri /api/v1/write # 可选公共 label,便于区分来源 - Add_Label otel_pipeline fluent-bit + Add_Label metrics_pipeline fluent-bit_prometheus_remote_write Workers 2 # Python SDK 发来的 logs -> vlagent -> VictoriaLogs diff --git a/starry-sdk/README.md b/starry-sdk/README.md index e0dadf4..7f3cc83 100644 --- a/starry-sdk/README.md +++ b/starry-sdk/README.md @@ -1,12 +1,13 @@ # starry-client-sdk demo -Demo SDK: request `https://blog.starryskymeow.top/{path}`, return response text, and raise `StarryNotFoundError` on HTTP 404. Telemetry is enabled by default and exported with OTLP/HTTP. +Demo SDK: request `https://blog.starryskymeow.top/{path}`, return response text, and raise `StarryNotFoundError` on HTTP 404. Telemetry is enabled by default. Metrics are pushed to Fluent Bit with Prometheus remote_write; exception logs are exported with OTLP/HTTP. Environment variables: - `STARRYSDK_TELEMETRY_ENABLED`: default `true`; set `false` to disable SDK telemetry. - `STARRYSDK_SERVICE_NAME`: default `starry-python-sdk-consumer`. -- `STARRYSDK_METRIC_EXPORT_INTERVAL_MS`: default `5000`. -- `OTEL_EXPORTER_OTLP_ENDPOINT`: default `http://localhost:4318`; SDK appends `/v1/metrics` and `/v1/logs` for OTLP/HTTP. -- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`: optional explicit metrics endpoint. +- `STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT`: default `http://host.docker.internal:8080/api/prom/push`. +- `STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS`: default `5`. +- `STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS`: default `2`. +- `OTEL_EXPORTER_OTLP_ENDPOINT`: default `http://host.docker.internal:4318`; SDK appends `/v1/logs` for OTLP/HTTP logs. - `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`: optional explicit logs endpoint. diff --git a/starry-sdk/pyproject.toml b/starry-sdk/pyproject.toml index c37295f..84e1900 100644 --- a/starry-sdk/pyproject.toml +++ b/starry-sdk/pyproject.toml @@ -5,11 +5,13 @@ build-backend = "setuptools.build_meta" [project] name = "starry-client-sdk" version = "0.1.0" -description = "Demo Python SDK with default-on OpenTelemetry metrics and exception logs." +description = "Demo Python SDK with default-on Prometheus metrics and OpenTelemetry exception logs." readme = "README.md" requires-python = ">=3.9" dependencies = [ "requests>=2.32.0,<3.0", + "protobuf>=5.0,<7.0", + "python-snappy>=0.7.0,<1.0", "opentelemetry-api==1.41.1", "opentelemetry-sdk==1.41.1", "opentelemetry-exporter-otlp-proto-http==1.41.1", diff --git a/starry-sdk/starry_client_sdk/client.py b/starry-sdk/starry_client_sdk/client.py index 8e1ec93..4cb53b5 100644 --- a/starry-sdk/starry_client_sdk/client.py +++ b/starry-sdk/starry_client_sdk/client.py @@ -57,7 +57,7 @@ class StarryClient: """Demo client SDK. `get(path)` requests `https://blog.starryskymeow.top/{path}` and returns `str`. - HTTP 404 is converted to `StarryNotFoundError` and logged through OpenTelemetry. + HTTP 404 is converted to `StarryNotFoundError` and logged through SDK telemetry. """ def __init__( @@ -86,6 +86,7 @@ class StarryClient: "url_path": _metric_path(normalized_path), "outcome": "unknown", "http_status_code": 0, + "error_type": "none", } start = time.perf_counter() @@ -103,7 +104,7 @@ class StarryClient: return response.text except Exception as exc: attrs["outcome"] = "error" - attrs.setdefault("error_type", exc.__class__.__name__) + attrs["error_type"] = exc.__class__.__name__ self._telemetry.logger.exception( "Starry SDK request error", extra={ @@ -118,8 +119,8 @@ class StarryClient: ) raise finally: - duration_ms = (time.perf_counter() - start) * 1000.0 + duration_seconds = time.perf_counter() - start self._telemetry.request_counter.add(1, attributes=attrs) - self._telemetry.duration_histogram.record(duration_ms, attributes=attrs) + self._telemetry.duration_histogram.record(duration_seconds, attributes=attrs) if attrs.get("outcome") == "error": self._telemetry.error_counter.add(1, attributes=attrs) diff --git a/starry-sdk/starry_client_sdk/telemetry.py b/starry-sdk/starry_client_sdk/telemetry.py index 5375bfa..b1401a1 100644 --- a/starry-sdk/starry_client_sdk/telemetry.py +++ b/starry-sdk/starry_client_sdk/telemetry.py @@ -4,19 +4,37 @@ import atexit import logging import os import threading +import time from dataclasses import dataclass from typing import Any, Optional +import requests + from .version import __version__ SDK_NAME = "starry-client-sdk" DEFAULT_SERVICE_NAME = "starry-python-sdk-consumer" DEFAULT_OTLP_ENDPOINT = "http://host.docker.internal:4318" +DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT = "http://host.docker.internal:8080/api/prom/push" + +_PROMETHEUS_LABEL_NAMES = ( + "service_name", + "sdk_name", + "sdk_version", + "sdk_interface", + "http_method", + "url_path", + "outcome", + "http_status_code", + "error_type", +) +_DURATION_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0) _LOCK = threading.Lock() _HANDLES: Optional["TelemetryHandles"] = None -_METER_PROVIDER: Any = None _LOGGER_PROVIDER: Any = None +_METRIC_PUSHER: Optional["_PrometheusRemoteWritePusher"] = None +_WRITE_REQUEST_CLASS: Any = None class _NoopCounter: @@ -29,6 +47,38 @@ class _NoopHistogram: return None +class _RemoteWriteCounter: + def __init__(self, pusher: "_PrometheusRemoteWritePusher", metric_name: str, static_labels: dict[str, str]) -> None: + self._pusher = pusher + self._metric_name = metric_name + self._static_labels = static_labels + + def add(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None: + self._pusher.add_counter(self._metric_name, amount, _prometheus_labels(self._static_labels, attributes)) + + +class _RemoteWriteHistogram: + def __init__( + self, + pusher: "_PrometheusRemoteWritePusher", + metric_name: str, + static_labels: dict[str, str], + buckets: tuple[float, ...], + ) -> None: + self._pusher = pusher + self._metric_name = metric_name + self._static_labels = static_labels + self._buckets = buckets + + def record(self, amount: int | float, attributes: Optional[dict[str, Any]] = None) -> None: + self._pusher.observe_histogram( + self._metric_name, + float(amount), + _prometheus_labels(self._static_labels, attributes), + self._buckets, + ) + + @dataclass(frozen=True) class TelemetryHandles: request_counter: Any @@ -50,14 +100,206 @@ def _base_endpoint() -> str: return os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", DEFAULT_OTLP_ENDPOINT).rstrip("/") -def _metrics_endpoint() -> str: - return os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", f"{_base_endpoint()}/v1/metrics") - - def _logs_endpoint() -> str: return os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", f"{_base_endpoint()}/v1/logs") +def _remote_write_endpoint() -> str: + return os.getenv("STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT", DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT) + + +def _int_env(name: str, default: int) -> int: + try: + return int(os.getenv(name, str(default))) + except ValueError: + return default + + +def _float_env(name: str, default: float) -> float: + try: + return float(os.getenv(name, str(default))) + except ValueError: + return default + + +def _prometheus_labels(static_labels: dict[str, str], attributes: Optional[dict[str, Any]]) -> dict[str, str]: + raw = {**static_labels, **(attributes or {})} + labels = {} + for name in _PROMETHEUS_LABEL_NAMES: + value = raw.get(name, "") + labels[name] = str(value if value is not None else "") + return labels + + +def _series_key(metric_name: str, labels: dict[str, str]) -> tuple[str, tuple[tuple[str, str], ...]]: + return metric_name, tuple(sorted(labels.items())) + + +def _format_bucket(bucket: float) -> str: + return f"{bucket:g}" + + +def _write_request_class() -> Any: + global _WRITE_REQUEST_CLASS + + if _WRITE_REQUEST_CLASS is not None: + return _WRITE_REQUEST_CLASS + + from google.protobuf import descriptor_pb2, descriptor_pool, message_factory + + file_proto = descriptor_pb2.FileDescriptorProto() + file_proto.name = "prometheus_remote_write.proto" + file_proto.package = "prometheus" + file_proto.syntax = "proto3" + + label_msg = file_proto.message_type.add() + label_msg.name = "Label" + field = label_msg.field.add() + field.name = "name" + field.number = 1 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_STRING + field = label_msg.field.add() + field.name = "value" + field.number = 2 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_STRING + + sample_msg = file_proto.message_type.add() + sample_msg.name = "Sample" + field = sample_msg.field.add() + field.name = "value" + field.number = 1 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE + field = sample_msg.field.add() + field.name = "timestamp" + field.number = 2 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_INT64 + + time_series_msg = file_proto.message_type.add() + time_series_msg.name = "TimeSeries" + field = time_series_msg.field.add() + field.name = "labels" + field.number = 1 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE + field.type_name = ".prometheus.Label" + field = time_series_msg.field.add() + field.name = "samples" + field.number = 2 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE + field.type_name = ".prometheus.Sample" + + write_request_msg = file_proto.message_type.add() + write_request_msg.name = "WriteRequest" + field = write_request_msg.field.add() + field.name = "timeseries" + field.number = 1 + field.label = descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED + field.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE + field.type_name = ".prometheus.TimeSeries" + + pool = descriptor_pool.DescriptorPool() + pool.Add(file_proto) + _WRITE_REQUEST_CLASS = message_factory.GetMessageClass(pool.FindMessageTypeByName("prometheus.WriteRequest")) + return _WRITE_REQUEST_CLASS + + +class _PrometheusRemoteWritePusher: + def __init__(self, *, endpoint: str, interval_seconds: float, timeout_seconds: float) -> None: + self._endpoint = endpoint + self._interval_seconds = interval_seconds + self._timeout_seconds = timeout_seconds + self._series: dict[tuple[str, tuple[tuple[str, str], ...]], float] = {} + self._series_lock = threading.Lock() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._run, name="starry-sdk-prometheus-remote-write", daemon=True) + self._thread.start() + + def add_counter(self, metric_name: str, amount: int | float, labels: dict[str, str]) -> None: + self._add_sample(metric_name, labels, float(amount)) + + def observe_histogram( + self, + metric_name: str, + amount: float, + labels: dict[str, str], + buckets: tuple[float, ...], + ) -> None: + for bucket in buckets: + if amount <= bucket: + self._add_sample(f"{metric_name}_bucket", {**labels, "le": _format_bucket(bucket)}, 1.0) + self._add_sample(f"{metric_name}_bucket", {**labels, "le": "+Inf"}, 1.0) + self._add_sample(f"{metric_name}_count", labels, 1.0) + self._add_sample(f"{metric_name}_sum", labels, amount) + + def force_flush(self) -> None: + self._flush() + + def shutdown(self) -> None: + self._stop_event.set() + self._thread.join(timeout=1) + self._flush() + + def _add_sample(self, metric_name: str, labels: dict[str, str], amount: float) -> None: + key = _series_key(metric_name, labels) + with self._series_lock: + self._series[key] = self._series.get(key, 0.0) + amount + + def _run(self) -> None: + while not self._stop_event.wait(self._interval_seconds): + self._flush() + + def _flush(self) -> None: + with self._series_lock: + snapshot = dict(self._series) + + if not snapshot: + return + + try: + import snappy + + write_request_cls = _write_request_class() + request = write_request_cls() + timestamp_ms = int(time.time() * 1000) + + for (metric_name, labels), value in snapshot.items(): + time_series = request.timeseries.add() + metric_label = time_series.labels.add() + metric_label.name = "__name__" + metric_label.value = metric_name + + for label_name, label_value in labels: + label = time_series.labels.add() + label.name = label_name + label.value = label_value + + sample = time_series.samples.add() + sample.value = value + sample.timestamp = timestamp_ms + + payload = snappy.compress(request.SerializeToString()) + requests.post( + self._endpoint, + data=payload, + headers={ + "Content-Encoding": "snappy", + "Content-Type": "application/x-protobuf", + "X-Prometheus-Remote-Write-Version": "0.1.0", + }, + timeout=self._timeout_seconds, + ).raise_for_status() + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug( + "Prometheus remote_write metrics push failed", + exc_info=True, + ) + + def _noop_handles() -> TelemetryHandles: return TelemetryHandles( request_counter=_NoopCounter(), @@ -68,15 +310,75 @@ def _noop_handles() -> TelemetryHandles: ) +def _configure_prometheus_remote_write(*, service_name: str, sdk_version: str) -> tuple[Any, Any, Any]: + global _METRIC_PUSHER + + interval_seconds = _float_env( + "STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS", + _int_env("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", 5000) / 1000.0, + ) + timeout_seconds = _float_env("STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS", 2.0) + _METRIC_PUSHER = _PrometheusRemoteWritePusher( + endpoint=_remote_write_endpoint(), + interval_seconds=interval_seconds, + timeout_seconds=timeout_seconds, + ) + + static_labels = { + "service_name": service_name, + "sdk_name": SDK_NAME, + "sdk_version": sdk_version, + } + return ( + _RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_requests_total", static_labels), + _RemoteWriteCounter(_METRIC_PUSHER, "starry_sdk_client_errors_total", static_labels), + _RemoteWriteHistogram( + _METRIC_PUSHER, + "starry_sdk_client_request_duration_seconds", + static_labels, + _DURATION_BUCKETS, + ), + ) + + +def _configure_otel_logs(*, logger: logging.Logger, service_name: str, sdk_version: str) -> None: + global _LOGGER_PROVIDER + + from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter + from opentelemetry.instrumentation.logging.handler import LoggingHandler + from opentelemetry.sdk._logs import LoggerProvider + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.sdk.resources import Resource + + resource = Resource.create( + { + "service.name": service_name, + "sdk.name": SDK_NAME, + "sdk.version": sdk_version, + "telemetry.source": "client-sdk", + } + ) + + _LOGGER_PROVIDER = LoggerProvider(resource=resource) + log_exporter = OTLPLogExporter(endpoint=_logs_endpoint()) + _LOGGER_PROVIDER.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) + + # Attach only one OTLP handler to the SDK logger. Do not attach to root logger. + if not any(getattr(handler, "_starry_sdk_otel_handler", False) for handler in logger.handlers): + otel_handler = LoggingHandler(level=logging.INFO, logger_provider=_LOGGER_PROVIDER) + setattr(otel_handler, "_starry_sdk_otel_handler", True) + logger.addHandler(otel_handler) + + def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str = __version__) -> TelemetryHandles: """Configure default-on, non-blocking telemetry for this SDK. - This demo intentionally configures local OpenTelemetry providers owned by the SDK, so the SDK can - be observable by default without overwriting an application's global OpenTelemetry configuration. - Export failures must never break business calls; if setup fails, the SDK falls back to no-op meters. + Metrics are pushed to Fluent Bit with Prometheus remote_write. Exception logs are still exported with + OTLP/HTTP. Telemetry failures must never break business calls; if setup fails, the SDK falls back to + no-op meters. """ - global _HANDLES, _METER_PROVIDER, _LOGGER_PROVIDER + global _HANDLES with _LOCK: if _HANDLES is not None: @@ -89,90 +391,67 @@ def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str _HANDLES = _noop_handles() return _HANDLES + service_name = service_name or os.getenv("STARRYSDK_SERVICE_NAME", DEFAULT_SERVICE_NAME) + request_counter: Any = _NoopCounter() + error_counter: Any = _NoopCounter() + duration_histogram: Any = _NoopHistogram() + metrics_enabled = False + try: - from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter - from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter - from opentelemetry.instrumentation.logging.handler import LoggingHandler - from opentelemetry.sdk._logs import LoggerProvider - from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - from opentelemetry.sdk.metrics import MeterProvider - from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - from opentelemetry.sdk.resources import Resource - - service_name = service_name or os.getenv("STARRYSDK_SERVICE_NAME", DEFAULT_SERVICE_NAME) - export_interval_ms = int(os.getenv("STARRYSDK_METRIC_EXPORT_INTERVAL_MS", "5000")) - - resource = Resource.create( - { - "service.name": service_name, - "sdk.name": SDK_NAME, - "sdk.version": sdk_version, - "telemetry.source": "client-sdk", - } + request_counter, error_counter, duration_histogram = _configure_prometheus_remote_write( + service_name=service_name, + sdk_version=sdk_version, ) - - metric_exporter = OTLPMetricExporter(endpoint=_metrics_endpoint()) - metric_reader = PeriodicExportingMetricReader( - metric_exporter, - export_interval_millis=export_interval_ms, - ) - _METER_PROVIDER = MeterProvider(resource=resource, metric_readers=[metric_reader]) - meter = _METER_PROVIDER.get_meter(SDK_NAME, sdk_version) - - _LOGGER_PROVIDER = LoggerProvider(resource=resource) - log_exporter = OTLPLogExporter(endpoint=_logs_endpoint()) - _LOGGER_PROVIDER.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) - - # Attach only one OTLP handler to the SDK logger. Do not attach to root logger. - if not any(getattr(handler, "_starry_sdk_otel_handler", False) for handler in logger.handlers): - otel_handler = LoggingHandler(level=logging.INFO, logger_provider=_LOGGER_PROVIDER) - setattr(otel_handler, "_starry_sdk_otel_handler", True) - logger.addHandler(otel_handler) - - _HANDLES = TelemetryHandles( - request_counter=meter.create_counter( - "starry.sdk.client.requests", - unit="1", - description="Total SDK client calls.", - ), - error_counter=meter.create_counter( - "starry.sdk.client.errors", - unit="1", - description="Total SDK client calls ending in an exception.", - ), - duration_histogram=meter.create_histogram( - "starry.sdk.client.request.duration.ms", - unit="ms", - description="SDK client call latency in milliseconds.", - ), - logger=logger, - enabled=True, - ) - atexit.register(shutdown_telemetry) - return _HANDLES + metrics_enabled = True except Exception: # Telemetry must not break SDK business behavior. - logging.getLogger("starry_client_sdk.telemetry").debug("SDK telemetry setup failed", exc_info=True) - _HANDLES = _noop_handles() - return _HANDLES + logging.getLogger("starry_client_sdk.telemetry").debug( + "SDK Prometheus remote_write metrics setup failed", + exc_info=True, + ) + + try: + _configure_otel_logs(logger=logger, service_name=service_name, sdk_version=sdk_version) + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("SDK OTLP log setup failed", exc_info=True) + + _HANDLES = TelemetryHandles( + request_counter=request_counter, + error_counter=error_counter, + duration_histogram=duration_histogram, + logger=logger, + enabled=metrics_enabled, + ) + atexit.register(shutdown_telemetry) + return _HANDLES def force_flush(timeout_millis: int = 5000) -> None: - """Flush telemetry buffers. Useful in short-lived CLI/demo processes.""" + """Flush SDK telemetry buffers. Useful in short-lived CLI/demo processes.""" - for provider in (_METER_PROVIDER, _LOGGER_PROVIDER): - if provider is None: - continue - try: - provider.force_flush(timeout_millis=timeout_millis) - except Exception: - logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry force_flush failed", exc_info=True) + if _METRIC_PUSHER is not None: + _METRIC_PUSHER.force_flush() + + if _LOGGER_PROVIDER is None: + return + try: + _LOGGER_PROVIDER.force_flush(timeout_millis=timeout_millis) + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry force_flush failed", exc_info=True) def shutdown_telemetry() -> None: - for provider in (_METER_PROVIDER, _LOGGER_PROVIDER): - if provider is None: - continue + global _METRIC_PUSHER + + if _METRIC_PUSHER is not None: try: - provider.shutdown() + _METRIC_PUSHER.shutdown() + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("Prometheus remote_write shutdown failed", exc_info=True) + finally: + _METRIC_PUSHER = None + + if _LOGGER_PROVIDER is not None: + try: + _LOGGER_PROVIDER.shutdown() except Exception: logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry shutdown failed", exc_info=True)