Commit 36d4a6b

Improve status and health endpoints (#1345)
This PR replaces the concept of aggregated phases with the concept of *conditions*, which aligns better with Kubernetes best practices (see https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties). I kept the state machine at the `PipelineRuntime` status level, but we now also track a set of conditions (`Accepted`, `Ready`), each with the three-state status (`True`, `False`, `Unknown`) commonly used in Kubernetes.

This change simplifies the creation of the following three global endpoints:

* `/status`
* `/livez`
* `/readyz`

This PR fixes issue #1318 (see the `/status` output below). It also adds an alias for the metrics endpoint, which is now accessible from `/metrics` as well.

In a future PR, I will add events in the different exporters to track the actual state of outgoing connections, in order to properly update the system's readiness.

Below is the output of the `/status` endpoint after a shutdown request (fixing #1318):

```json
{
  "generatedAt": "2025-10-25T02:38:05.426649+00:00",
  "pipelines": {
    "default_pipeline_group:default_pipeline": {
      "conditions": [
        {
          "type": "Accepted",
          "status": "True",
          "lastTransitionTime": "2025-10-25T02:35:56.020482+00:00",
          "reason": "ConfigValid",
          "message": "Pipeline configuration validated and resources quota is not exceeded."
        },
        {
          "type": "Ready",
          "status": "False",
          "lastTransitionTime": "2025-10-25T02:37:39.788879+00:00",
          "reason": "QuorumNotMet",
          "message": "Pipeline is not ready; ready quorum all cores ready not met (0 of 1 cores ready)."
        }
      ],
      "totalCores": 1,
      "runningCores": 0,
      "cores": {
        "1": {
          "phase": "stopped",
          "lastHeartbeatTime": "2025-10-25T02:37:39.788879+00:00",
          "conditions": [
            {
              "type": "Accepted",
              "status": "True",
              "lastTransitionTime": "2025-10-25T02:35:56.020482+00:00",
              "reason": "ConfigValid",
              "message": "Pipeline admission successful."
            },
            {
              "type": "Ready",
              "status": "False",
              "lastTransitionTime": "2025-10-25T02:37:39.788879+00:00",
              "reason": "Drained",
              "message": "Pipeline runtime (core) drained; waiting for shutdown or deletion."
            }
          ],
          "deletePending": false,
          "recentEvents": [
            {
              "time": "2025-10-25T02:37:39.788879+00:00",
              "type": { "Success": "Drained" }
            },
            {
              "time": "2025-10-25T02:36:39.137776+00:00",
              "type": { "Request": "ShutdownRequested" },
              "message": "Shutdown requested via the `/pipeline-groups/shutdown` endpoint."
            },
            {
              "time": "2025-10-25T02:36:24.421752+00:00",
              "type": { "Success": "Ready" },
              "message": "Pipeline initialization successful."
            },
            {
              "time": "2025-10-25T02:35:56.020482+00:00",
              "type": { "Success": "Admitted" },
              "message": "Pipeline admission successful."
            }
          ]
        }
      }
    }
  }
}
```

Closes #1318
1 parent 8dbbaea commit 36d4a6b
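The condition model described above follows the Kubernetes convention of per-kind conditions with a three-state status. As a rough, self-contained sketch of the idea (illustrative only; the real types live in `otap_df_state::conditions` and carry more fields, such as reasons and transition times, as shown in the new `health.rs` below):

```rust
// Minimal sketch of a Kubernetes-style condition model (illustrative only;
// not the actual definitions used by this PR).

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConditionKind {
    Accepted, // configuration admitted, resource quota respected
    Ready,    // cores are up and the ready quorum is met
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConditionStatus {
    True,
    False,
    Unknown,
}

#[derive(Debug, Clone)]
struct Condition {
    kind: ConditionKind,
    status: ConditionStatus,
    reason: Option<String>, // e.g. "ConfigValid", "QuorumNotMet"
}

/// A pipeline is reported ready only when its `Ready` condition is `True`.
fn is_ready(conditions: &[Condition]) -> bool {
    conditions
        .iter()
        .find(|c| c.kind == ConditionKind::Ready)
        .map(|c| c.status == ConditionStatus::True)
        .unwrap_or(false)
}

fn main() {
    // Mirrors the `/status` output above: accepted, but not ready after a shutdown.
    let conditions = vec![
        Condition {
            kind: ConditionKind::Accepted,
            status: ConditionStatus::True,
            reason: Some("ConfigValid".to_owned()),
        },
        Condition {
            kind: ConditionKind::Ready,
            status: ConditionStatus::False,
            reason: Some("QuorumNotMet".to_owned()),
        },
    ];
    for c in &conditions {
        println!("{:?}: {:?} ({:?})", c.kind, c.status, c.reason);
    }
    assert!(!is_ready(&conditions));
}
```

The new `/readyz` endpoint applies essentially this rule across all pipelines, while `/livez` additionally tolerates benign `Accepted` states (pending, deleting, no runtime yet); see `acceptance_failure` in `health.rs` below.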

File tree

15 files changed (+1323 / -294 lines)

rust/otap-dataflow/crates/admin/src/health.rs

Lines changed: 215 additions & 0 deletions
@@ -0,0 +1,215 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Global health and status endpoints.
//!
//! - GET `/status` - list all pipelines and their status
//! - GET `/livez` - liveness probe
//! - GET `/readyz` - readiness probe

use crate::AppState;
use axum::extract::State;
use axum::http::StatusCode;
use axum::routing::get;
use axum::{Json, Router};
use chrono::Utc;
use otap_df_state::PipelineKey;
use otap_df_state::conditions::{Condition, ConditionKind, ConditionReason, ConditionStatus};
use otap_df_state::pipeline_status::PipelineStatus;
use serde::Serialize;
use std::collections::HashMap;

/// All the routes for health and status endpoints.
pub(crate) fn routes() -> Router<AppState> {
    Router::new()
        // Returns a summary of all pipelines and their statuses.
        .route("/status", get(show_status))
        // Returns liveness status.
        .route("/livez", get(livez))
        // Returns readiness status.
        .route("/readyz", get(readyz))
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct StatusResponse {
    generated_at: String,
    pipelines: HashMap<PipelineKey, PipelineStatus>,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ProbeResponse {
    probe: &'static str,
    status: &'static str,
    generated_at: String,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    failing: Vec<PipelineConditionFailure>,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PipelineConditionFailure {
    pipeline: PipelineKey,
    condition: Condition,
}

pub async fn show_status(
    State(state): State<AppState>,
) -> Result<Json<StatusResponse>, StatusCode> {
    Ok(Json(StatusResponse {
        generated_at: Utc::now().to_rfc3339(),
        pipelines: state.observed_state_store.snapshot(),
    }))
}

pub(crate) async fn livez(State(state): State<AppState>) -> (StatusCode, Json<ProbeResponse>) {
    let snapshot = state.observed_state_store.snapshot();
    let failing = collect_condition_failures(
        &snapshot,
        ConditionKind::Accepted,
        skip_pipelines_without_runtimes,
        acceptance_failure,
    );

    if failing.is_empty() {
        (StatusCode::OK, Json(ProbeResponse::ok("livez")))
    } else {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ProbeResponse::fail("livez", failing)),
        )
    }
}

pub(crate) async fn readyz(State(state): State<AppState>) -> (StatusCode, Json<ProbeResponse>) {
    let snapshot = state.observed_state_store.snapshot();
    let failing = collect_condition_failures(
        &snapshot,
        ConditionKind::Ready,
        skip_pipelines_without_runtimes,
        |cond| cond.status != ConditionStatus::True,
    );

    if failing.is_empty() {
        (StatusCode::OK, Json(ProbeResponse::ok("readyz")))
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ProbeResponse::fail("readyz", failing)),
        )
    }
}

fn collect_condition_failures<FSkip, FFail>(
    pipelines: &HashMap<PipelineKey, PipelineStatus>,
    condition_kind: ConditionKind,
    skip: FSkip,
    failure_predicate: FFail,
) -> Vec<PipelineConditionFailure>
where
    FSkip: Fn(&PipelineStatus) -> bool,
    FFail: Fn(&Condition) -> bool,
{
    pipelines
        .iter()
        .filter(|(_, status)| !skip(status))
        .filter_map(|(key, status)| {
            let condition = status
                .conditions()
                .into_iter()
                .find(|c| c.kind == condition_kind)?;
            failure_predicate(&condition).then(|| PipelineConditionFailure {
                pipeline: key.clone(),
                condition,
            })
        })
        .collect()
}

fn acceptance_failure(condition: &Condition) -> bool {
    match condition.status {
        ConditionStatus::True => false,
        ConditionStatus::Unknown => {
            !matches!(condition.reason, Some(ConditionReason::NoPipelineRuntime))
        }
        ConditionStatus::False => {
            let benign = matches!(
                condition.reason,
                Some(
                    ConditionReason::Pending
                        | ConditionReason::StartRequested
                        | ConditionReason::Deleting
                        | ConditionReason::ForceDeleting
                        | ConditionReason::Deleted
                        | ConditionReason::NoPipelineRuntime
                )
            );
            !benign
        }
    }
}

fn skip_pipelines_without_runtimes(status: &PipelineStatus) -> bool {
    status.total_cores() == 0
}

impl ProbeResponse {
    fn ok(probe: &'static str) -> Self {
        Self {
            probe,
            status: "ok",
            generated_at: Utc::now().to_rfc3339(),
            failing: Vec::new(),
        }
    }

    fn fail(probe: &'static str, failing: Vec<PipelineConditionFailure>) -> Self {
        Self {
            probe,
            status: "failed",
            generated_at: Utc::now().to_rfc3339(),
            failing,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn cond(status: ConditionStatus, reason: Option<ConditionReason>) -> Condition {
        Condition {
            kind: ConditionKind::Accepted,
            status,
            reason,
            message: None,
            last_transition_time: None,
        }
    }

    #[test]
    fn acceptance_failure_ignores_benign_reasons() {
        assert!(!acceptance_failure(&cond(
            ConditionStatus::False,
            Some(ConditionReason::Pending)
        )));
        assert!(!acceptance_failure(&cond(
            ConditionStatus::False,
            Some(ConditionReason::Deleted)
        )));
        assert!(!acceptance_failure(&cond(
            ConditionStatus::Unknown,
            Some(ConditionReason::NoPipelineRuntime)
        )));
    }

    #[test]
    fn acceptance_failure_flags_errors() {
        assert!(acceptance_failure(&cond(
            ConditionStatus::False,
            Some(ConditionReason::AdmissionError)
        )));
        assert!(acceptance_failure(&cond(ConditionStatus::Unknown, None)));
    }
}

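As a point of reference, a hypothetical client-side check against the new probe endpoints could look like the sketch below. This is not part of the PR: the response shape follows the `ProbeResponse` struct above, but the base URL, port, and crate choices (`reqwest`, `tokio`, `serde`) are assumptions made purely for illustration.

```rust
// Hypothetical external probe client (not part of this PR).
// Assumes: reqwest (with the "json" feature), tokio, and serde with derive.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ProbeResponse {
    probe: String,
    status: String,
    generated_at: String,
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // The address and port are placeholders for wherever the admin server listens.
    let resp = reqwest::get("http://127.0.0.1:8080/readyz").await?;
    let http_status = resp.status();
    let body: ProbeResponse = resp.json().await?;
    // `/readyz` returns 200 when every pipeline's Ready condition is True,
    // and 503 Service Unavailable otherwise (see `readyz` above).
    println!(
        "{} -> {} at {} (HTTP {})",
        body.probe, body.status, body.generated_at, http_status
    );
    Ok(())
}
```

A Kubernetes readiness probe pointed at `/readyz` relies on the same contract: HTTP 200 means ready, HTTP 503 means not ready; `/livez` behaves analogously with 200 vs. 500.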
rust/otap-dataflow/crates/admin/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,7 @@
 //! HTTP server for exposing admin endpoints.

 pub mod error;
+mod health;
 mod pipeline;
 mod pipeline_group;
 mod telemetry;
@@ -49,6 +50,7 @@ pub async fn run(
     };

     let app = Router::new()
+        .merge(health::routes())
         .merge(telemetry::routes())
         .merge(pipeline_group::routes())
         .merge(pipeline::routes())

rust/otap-dataflow/crates/admin/src/pipeline_group.rs

Lines changed: 8 additions & 18 deletions
@@ -53,9 +53,10 @@ struct ShutdownResponse {
 pub async fn show_status(
     State(state): State<AppState>,
 ) -> Result<Json<PipelineGroupsStatusResponse>, StatusCode> {
-    let snapshot = state.observed_state_store.snapshot();
-    let response = build_status_response(snapshot);
-    Ok(Json(response))
+    Ok(Json(PipelineGroupsStatusResponse {
+        generated_at: Utc::now().to_rfc3339(),
+        pipelines: state.observed_state_store.snapshot(),
+    }))
 }

 async fn shutdown_all_pipelines(State(state): State<AppState>) -> impl IntoResponse {
@@ -66,7 +67,10 @@ async fn shutdown_all_pipelines(State(state): State<AppState>) -> impl IntoRespo
         // ToDo configurable shutdown timeout
         let deadline = Instant::now() + Duration::from_secs(10);
         sender
-            .try_send_shutdown(deadline, "admin requested shutdown".to_owned()) // ToDo we probably need to codify reasons in the future
+            .try_send_shutdown(
+                deadline,
+                "Shutdown requested via the `/pipeline-groups/shutdown` endpoint.".to_owned(),
+            ) // ToDo we probably need to codify reasons in the future
             .err()
     })
     .map(|e| e.to_string())
@@ -90,17 +94,3 @@ async fn shutdown_all_pipelines(State(state): State<AppState>) -> impl IntoRespo
         )
     }
 }
-
-fn build_status_response(
-    mut pipelines: HashMap<PipelineKey, PipelineStatus>,
-) -> PipelineGroupsStatusResponse {
-    // Aggregated phase are computed on-demand.
-    for pipeline_status in pipelines.values_mut() {
-        pipeline_status.infer_agg_phase();
-    }
-
-    PipelineGroupsStatusResponse {
-        generated_at: Utc::now().to_rfc3339(),
-        pipelines,
-    }
-}

rust/otap-dataflow/crates/admin/src/telemetry.rs

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ pub(crate) fn routes() -> Router<AppState> {
         .route("/telemetry/live-schema", get(get_live_schema))
         .route("/telemetry/metrics", get(get_metrics))
         .route("/telemetry/metrics/aggregate", get(get_metrics_aggregate))
+        .route("/metrics", get(get_metrics))
 }

 /// All metric sets.

rust/otap-dataflow/crates/controller/src/lib.rs

Lines changed: 8 additions & 49 deletions
@@ -21,7 +21,6 @@ use crate::error::Error;
 use crate::thread_task::spawn_thread_local_task;
 use core_affinity::CoreId;
 use otap_df_config::engine::HttpAdminSettings;
-use otap_df_config::node::NodeKind;
 use otap_df_config::{
     PipelineGroupId, PipelineId,
     pipeline::PipelineConfig,
@@ -32,7 +31,7 @@ use otap_df_engine::context::{ControllerContext, PipelineContext};
 use otap_df_engine::control::{
     PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel,
 };
-use otap_df_engine::error::Error as EngineError;
+use otap_df_engine::error::{Error as EngineError, error_summary_from};
 use otap_df_state::DeployedPipelineKey;
 use otap_df_state::event::{ErrorSummary, ObservedEvent};
 use otap_df_state::reporter::ObservedEventReporter;
@@ -327,7 +326,13 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {

         // Start the pipeline (this will use the current thread's Tokio runtime)
         runtime_pipeline
-            .run_forever(metrics_reporter, pipeline_ctrl_msg_tx, pipeline_ctrl_msg_rx)
+            .run_forever(
+                pipeline_key,
+                obs_evt_reporter,
+                metrics_reporter,
+                pipeline_ctrl_msg_tx,
+                pipeline_ctrl_msg_rx,
+            )
             .map_err(|e| Error::PipelineRuntimeError {
                 source: Box::new(e),
             })
@@ -355,52 +360,6 @@ fn error_summary_from_gen(error: &Error) -> ErrorSummary {
     }
 }

-fn error_summary_from(err: &EngineError) -> ErrorSummary {
-    match err {
-        EngineError::ReceiverError {
-            receiver,
-            kind,
-            error,
-            source_detail,
-        } => ErrorSummary::Node {
-            node: receiver.name.to_string(),
-            node_kind: NodeKind::Receiver,
-            error_kind: kind.to_string(),
-            message: error.clone(),
-            source: (!source_detail.is_empty()).then(|| source_detail.clone()),
-        },
-        EngineError::ProcessorError {
-            processor,
-            kind,
-            error,
-            source_detail,
-        } => ErrorSummary::Node {
-            node: processor.name.to_string(),
-            node_kind: NodeKind::Processor,
-            error_kind: kind.to_string(),
-            message: error.clone(),
-            source: (!source_detail.is_empty()).then(|| source_detail.clone()),
-        },
-        EngineError::ExporterError {
-            exporter,
-            kind,
-            error,
-            source_detail,
-        } => ErrorSummary::Node {
-            node: exporter.name.to_string(),
-            node_kind: NodeKind::Exporter,
-            error_kind: kind.to_string(),
-            message: error.clone(),
-            source: (!source_detail.is_empty()).then(|| source_detail.clone()),
-        },
-        _ => ErrorSummary::Pipeline {
-            error_kind: err.variant_name(),
-            message: err.to_string(),
-            source: None,
-        },
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
