diff --git a/README.md b/README.md index a629b3b..5f73829 100644 --- a/README.md +++ b/README.md @@ -3,16 +3,18 @@ ```mermaid flowchart LR - SDK[Python SDK] -->|logs OTLP/HTTP| FB[Fluent Bit] + SDK[Python SDK] -->|logs/traces OTLP/HTTP| FB[Fluent Bit] SDK -->|metrics Prometheus remote_write| FB FB -->|logs| VL[VictoriaLogs] + FB -->|traces| VT[VictoriaTraces] FB -->|metrics remote_write| VM[VictoriaMetrics] VM -->|query| GF[Grafana] VL -->|query| GF + VT -->|Jaeger query| GF ``` @@ -30,9 +32,9 @@ flowchart TD BK[Backend] -->|trace| FB - FB -->|logs| VL[VictoriaLogs cluster] - FB -->|metrics remote_write| VM[VictoriaMetrics] - FB -->|trace| VT[VictoriaTraces] + FB -->|logs| VL["VictoriaLogs (cluster)"] + FB -->|metrics remote_write| VM["VictoriaMetrics (cluster)"] + FB -->|trace| VT["VictoriaTraces (cluster)"] VM -->|query| GF[Grafana] VL -->|query| GF diff --git a/demo-app/main.py b/demo-app/main.py index b570d10..1256837 100644 --- a/demo-app/main.py +++ b/demo-app/main.py @@ -34,7 +34,7 @@ def main() -> None: except Exception as exc: print(f"ERROR path={path!r} type={exc.__class__.__name__} error={exc}") - # Demo app is short-looping, so flush logs and remote_write metrics quickly. + # Demo app is short-looping, so flush logs, traces, and remote_write metrics quickly. force_flush() if not loop: diff --git a/docker-compose.yml b/docker-compose.yml index 68a0ca5..e3d2392 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -64,6 +64,29 @@ services: source: victoriametrics target: /vmsingle volume: {} + victoriatraces: + command: + - -storageDataPath=/vtraces + - -loggerFormat=json + healthcheck: + test: + - CMD + - wget + - -qO- + - http://127.0.0.1:10428/health + timeout: 1s + interval: 1s + retries: 10 + image: docker.io/victoriametrics/victoria-traces:latest + networks: + default: null + ports: + - 10428:10428 + volumes: + - type: volume + source: victoriatraces + target: /vtraces + volume: {} demo-app: build: context: . 
@@ -73,6 +96,7 @@ services: STARRYSDK_SERVICE_NAME: starry-python-sdk-demo STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT: http://fluentbit:8080/api/prom/push STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS: "5" + STARRYSDK_TRACE_SAMPLE_RATE: "1.0" OTEL_EXPORTER_OTLP_ENDPOINT: http://fluentbit:4318 DEMO_LOOP: "true" DEMO_INTERVAL_SECONDS: "5" @@ -92,5 +116,7 @@ volumes: name: fluentbit-oltp_victorialogs-1 victoriametrics: name: fluentbit-oltp_victoriametrics + victoriatraces: + name: fluentbit-oltp_victoriatraces grafana-data: name: fluentbit-oltp_grafana diff --git a/fluent-bit.conf b/fluent-bit.conf index cee6898..d064959 100644 --- a/fluent-bit.conf +++ b/fluent-bit.conf @@ -67,6 +67,18 @@ Compress gzip Workers 2 +# Python SDK 发来的 traces -> VictoriaTraces +[OUTPUT] + Name opentelemetry + Match v1_traces + Host victoriatraces + Port 10428 + + Traces_Uri /insert/opentelemetry/v1/traces + + Compress gzip + Workers 2 + [OUTPUT] Name stdout Match * diff --git a/starry-sdk/README.md b/starry-sdk/README.md index 7f3cc83..64e2e8f 100644 --- a/starry-sdk/README.md +++ b/starry-sdk/README.md @@ -1,6 +1,6 @@ # starry-client-sdk demo -Demo SDK: request `https://blog.starryskymeow.top/{path}`, return response text, and raise `StarryNotFoundError` on HTTP 404. Telemetry is enabled by default. Metrics are pushed to Fluent Bit with Prometheus remote_write; exception logs are exported with OTLP/HTTP. +Demo SDK: request `https://blog.starryskymeow.top/{path}`, return response text, and raise `StarryNotFoundError` on HTTP 404. Telemetry is enabled by default. Metrics are pushed to Fluent Bit with Prometheus remote_write; exception logs and traces are exported with OTLP/HTTP. In the demo compose stack, traces are forwarded from Fluent Bit to VictoriaTraces. Trace sampling defaults to 100%. Environment variables: @@ -9,5 +9,7 @@ Environment variables: - `STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT`: default `http://host.docker.internal:8080/api/prom/push`. 
- `STARRYSDK_METRIC_PUSH_INTERVAL_SECONDS`: default `5`. - `STARRYSDK_METRIC_PUSH_TIMEOUT_SECONDS`: default `2`. -- `OTEL_EXPORTER_OTLP_ENDPOINT`: default `http://host.docker.internal:4318`; SDK appends `/v1/logs` for OTLP/HTTP logs. +- `STARRYSDK_TRACE_SAMPLE_RATE`: default `1.0`; clamped to `0.0`-`1.0`. +- `OTEL_EXPORTER_OTLP_ENDPOINT`: default `http://host.docker.internal:4318`; SDK appends `/v1/logs` and `/v1/traces` for OTLP/HTTP telemetry. - `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`: optional explicit logs endpoint. +- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`: optional explicit traces endpoint. diff --git a/starry-sdk/pyproject.toml b/starry-sdk/pyproject.toml index aadf891..9a91c30 100644 --- a/starry-sdk/pyproject.toml +++ b/starry-sdk/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "starry-client-sdk" version = "0.1.0" -description = "Demo Python SDK with default-on Prometheus metrics and OpenTelemetry exception logs." +description = "Demo Python SDK with default-on Prometheus metrics, OpenTelemetry exception logs, and traces." 
readme = "README.md" requires-python = ">=3.9" dependencies = [ diff --git a/starry-sdk/starry_client_sdk/client.py b/starry-sdk/starry_client_sdk/client.py index 4cb53b5..1caae1c 100644 --- a/starry-sdk/starry_client_sdk/client.py +++ b/starry-sdk/starry_client_sdk/client.py @@ -4,6 +4,7 @@ import time from dataclasses import dataclass from typing import Any, Optional +from opentelemetry.trace import SpanKind, Status, StatusCode import requests from .telemetry import SDK_NAME, configure_telemetry @@ -89,38 +90,59 @@ class StarryClient: "error_type": "none", } - start = time.perf_counter() - try: - response = requests.get(url, timeout=self.timeout_seconds) - attrs["http_status_code"] = response.status_code + with self._telemetry.tracer.start_as_current_span( + "starry_client_sdk.get", + kind=SpanKind.CLIENT, + attributes={ + "sdk.name": SDK_NAME, + "sdk.version": self.sdk_version, + "sdk.interface": context.sdk_interface, + "http.request.method": context.http_method, + "url.path": attrs["url_path"], + }, + record_exception=False, + set_status_on_exception=False, + ) as span: + start = time.perf_counter() + try: + response = requests.get(url, timeout=self.timeout_seconds) + attrs["http_status_code"] = response.status_code + span.set_attribute("http.response.status_code", response.status_code) - if response.status_code == 404: + if response.status_code == 404: + attrs["outcome"] = "error" + attrs["error_type"] = "StarryNotFoundError" + raise StarryNotFoundError(path=normalized_path, url=url) + + response.raise_for_status() + attrs["outcome"] = "success" + span.set_attribute("sdk.outcome", attrs["outcome"]) + span.set_status(Status(StatusCode.OK)) + return response.text + except Exception as exc: attrs["outcome"] = "error" - attrs["error_type"] = "StarryNotFoundError" - raise StarryNotFoundError(path=normalized_path, url=url) - - response.raise_for_status() - attrs["outcome"] = "success" - return response.text - except Exception as exc: - attrs["outcome"] = "error" - 
attrs["error_type"] = exc.__class__.__name__ - self._telemetry.logger.exception( - "Starry SDK request error", - extra={ - "sdk_name": SDK_NAME, - "sdk_version": self.sdk_version, - "sdk_interface": context.sdk_interface, - "http_method": context.http_method, - "http_status_code": attrs.get("http_status_code", 0), - "url_path": attrs["url_path"], - "error_type": attrs["error_type"], - }, - ) - raise - finally: - duration_seconds = time.perf_counter() - start - self._telemetry.request_counter.add(1, attributes=attrs) - self._telemetry.duration_histogram.record(duration_seconds, attributes=attrs) - if attrs.get("outcome") == "error": - self._telemetry.error_counter.add(1, attributes=attrs) + attrs["error_type"] = exc.__class__.__name__ + span.set_attribute("sdk.outcome", attrs["outcome"]) + span.set_attribute("error.type", attrs["error_type"]) + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, attrs["error_type"])) + self._telemetry.logger.exception( + "Starry SDK request error", + extra={ + "sdk_name": SDK_NAME, + "sdk_version": self.sdk_version, + "sdk_interface": context.sdk_interface, + "http_method": context.http_method, + "http_status_code": attrs.get("http_status_code", 0), + "url_path": attrs["url_path"], + "error_type": attrs["error_type"], + }, + ) + raise + finally: + duration_seconds = time.perf_counter() - start + span.set_attribute("sdk.duration_seconds", duration_seconds) + self._telemetry.request_counter.add(1, attributes=attrs) + self._telemetry.duration_histogram.record(duration_seconds, attributes=attrs) + if attrs.get("outcome") == "error": + self._telemetry.error_counter.add(1, attributes=attrs) diff --git a/starry-sdk/starry_client_sdk/telemetry.py b/starry-sdk/starry_client_sdk/telemetry.py index 2cd38e2..a461165 100644 --- a/starry-sdk/starry_client_sdk/telemetry.py +++ b/starry-sdk/starry_client_sdk/telemetry.py @@ -34,6 +34,7 @@ _DURATION_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, _LOCK = 
threading.Lock() _HANDLES: Optional["TelemetryHandles"] = None _LOGGER_PROVIDER: Any = None +_TRACER_PROVIDER: Any = None _METRIC_PUSHER: Optional["_PrometheusRemoteWritePusher"] = None _WRITE_REQUEST_CLASS: Any = None @@ -48,6 +49,28 @@ class _NoopHistogram: return None +class _NoopSpan: + def __enter__(self) -> "_NoopSpan": + return self + + def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: + return None + + def set_attribute(self, key: str, value: Any) -> None: + return None + + def record_exception(self, exception: Exception) -> None: + return None + + def set_status(self, status: Any) -> None: + return None + + +class _NoopTracer: + def start_as_current_span(self, name: str, **kwargs: Any) -> _NoopSpan: + return _NoopSpan() + + class _PrometheusCounter: def __init__(self, metric: Counter, static_labels: dict[str, str]) -> None: self._metric = metric @@ -71,6 +94,7 @@ class TelemetryHandles: request_counter: Any error_counter: Any duration_histogram: Any + tracer: Any logger: logging.Logger enabled: bool @@ -91,6 +115,10 @@ def _logs_endpoint() -> str: return os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", f"{_base_endpoint()}/v1/logs") +def _traces_endpoint() -> str: + return os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", f"{_base_endpoint()}/v1/traces") + + def _remote_write_endpoint() -> str: return os.getenv("STARRYSDK_PROMETHEUS_REMOTE_WRITE_ENDPOINT", DEFAULT_PROMETHEUS_REMOTE_WRITE_ENDPOINT) @@ -271,6 +299,7 @@ def _noop_handles() -> TelemetryHandles: request_counter=_NoopCounter(), error_counter=_NoopCounter(), duration_histogram=_NoopHistogram(), + tracer=_NoopTracer(), logger=logging.getLogger("starry_client_sdk"), enabled=False, ) @@ -352,12 +381,40 @@ def _configure_otel_logs(*, logger: logging.Logger, service_name: str, sdk_versi logger.addHandler(otel_handler) +def _configure_otel_traces(*, service_name: str, sdk_version: str) -> Any: + global _TRACER_PROVIDER + + from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased + + sample_rate = max(0.0, min(1.0, _float_env("STARRYSDK_TRACE_SAMPLE_RATE", 1.0))) + resource = Resource.create( + { + "service.name": service_name, + "sdk.name": SDK_NAME, + "sdk.version": sdk_version, + "telemetry.source": "client-sdk", + } + ) + + _TRACER_PROVIDER = TracerProvider( + resource=resource, + sampler=ParentBased(root=TraceIdRatioBased(sample_rate)), + ) + trace_exporter = OTLPSpanExporter(endpoint=_traces_endpoint()) + _TRACER_PROVIDER.add_span_processor(BatchSpanProcessor(trace_exporter)) + return _TRACER_PROVIDER.get_tracer(SDK_NAME, sdk_version) + + def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str = __version__) -> TelemetryHandles: """Configure default-on, non-blocking telemetry for this SDK. - Metrics are pushed to Fluent Bit with Prometheus remote_write. Exception logs are still exported with - OTLP/HTTP. Telemetry failures must never break business calls; if setup fails, the SDK falls back to - no-op meters. + Metrics are pushed to Fluent Bit with Prometheus remote_write. Exception logs and traces are exported + with OTLP/HTTP. Telemetry failures must never break business calls; if setup fails, the SDK falls back + to no-op handles. 
""" global _HANDLES @@ -377,7 +434,9 @@ def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str request_counter: Any = _NoopCounter() error_counter: Any = _NoopCounter() duration_histogram: Any = _NoopHistogram() + tracer: Any = _NoopTracer() metrics_enabled = False + traces_enabled = False try: request_counter, error_counter, duration_histogram = _configure_prometheus_remote_write( @@ -396,12 +455,19 @@ def configure_telemetry(*, service_name: Optional[str] = None, sdk_version: str except Exception: logging.getLogger("starry_client_sdk.telemetry").debug("SDK OTLP log setup failed", exc_info=True) + try: + tracer = _configure_otel_traces(service_name=service_name, sdk_version=sdk_version) + traces_enabled = True + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("SDK OTLP trace setup failed", exc_info=True) + _HANDLES = TelemetryHandles( request_counter=request_counter, error_counter=error_counter, duration_histogram=duration_histogram, + tracer=tracer, logger=logger, - enabled=metrics_enabled, + enabled=metrics_enabled or traces_enabled, ) atexit.register(shutdown_telemetry) return _HANDLES @@ -414,11 +480,18 @@ def force_flush(timeout_millis: int = 5000) -> None: _METRIC_PUSHER.force_flush() if _LOGGER_PROVIDER is None: - return - try: - _LOGGER_PROVIDER.force_flush(timeout_millis=timeout_millis) - except Exception: - logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry force_flush failed", exc_info=True) + pass + else: + try: + _LOGGER_PROVIDER.force_flush(timeout_millis=timeout_millis) + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry log force_flush failed", exc_info=True) + + if _TRACER_PROVIDER is not None: + try: + _TRACER_PROVIDER.force_flush(timeout_millis=timeout_millis) + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry trace force_flush failed", exc_info=True) def shutdown_telemetry() -> None: @@ -436,4 +509,10 @@ 
def shutdown_telemetry() -> None: try: _LOGGER_PROVIDER.shutdown() except Exception: - logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry shutdown failed", exc_info=True) + logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry log shutdown failed", exc_info=True) + + if _TRACER_PROVIDER is not None: + try: + _TRACER_PROVIDER.shutdown() + except Exception: + logging.getLogger("starry_client_sdk.telemetry").debug("Telemetry trace shutdown failed", exc_info=True)