@@ -21,6 +21,7 @@ use prometheus_client::{encoding::text::encode, registry::Registry};
2121use serde:: Deserialize ;
2222use std:: {
2323 borrow:: Cow ,
24+ collections:: BTreeMap ,
2425 io:: { self , BufRead as _, BufReader } ,
2526} ;
2627use tokio:: process:: Command ;
@@ -30,11 +31,31 @@ use tower::{
3031} ;
3132use tower_http:: compression:: CompressionLayer ;
3233
33- #[ derive( Debug , Deserialize ) ]
34- pub struct Params {
35- // Only enable jobstats if "jobstats=true"
36- #[ serde( default ) ]
37- jobstats : bool ,
34+ #[ derive( Debug , Deserialize , PartialEq , PartialOrd , Eq , Ord , Hash , Copy , Clone ) ]
35+ #[ serde( rename_all = "snake_case" ) ]
36+ pub enum Dimension {
37+ Jobstats ,
38+ Lnet ,
39+ Lustre ,
40+ LnetStats ,
41+ }
42+
43+ pub type Params = BTreeMap < Dimension , bool > ;
44+
45+ static DEFAULT_PARAMS : [ ( Dimension , bool ) ; 3 ] = [
46+ ( Dimension :: Lnet , true ) ,
47+ ( Dimension :: Lustre , true ) ,
48+ ( Dimension :: LnetStats , true ) ,
49+ ] ;
50+
51+ trait EnableConvenienceExt {
52+ fn enabled ( & self , param : & Dimension ) -> bool ;
53+ }
54+
55+ impl EnableConvenienceExt for Params {
56+ fn enabled ( & self , param : & Dimension ) -> bool {
57+ self . get ( param) . copied ( ) . unwrap_or_default ( )
58+ }
3859}
3960
4061const TIMEOUT_DURATION_SECS : u64 = 120 ;
@@ -149,69 +170,72 @@ pub fn lnet_stats_output() -> Command {
149170pub async fn scrape ( Query ( params) : Query < Params > ) -> Result < Response < Body > , Error > {
150171 let mut registry = Registry :: default ( ) ;
151172
152- if params. jobstats {
153- let child = tokio:: task:: spawn_blocking ( move || jobstats_metrics_cmd ( ) . spawn ( ) ) . await ?;
154-
155- if let Ok ( mut child) =
156- child. inspect_err ( |e| tracing:: debug!( "Error while spawning lctl jobstats: {e}" ) )
157- {
158- let reader = BufReader :: with_capacity (
159- 128 * 1_024 ,
160- child. stdout . take ( ) . ok_or ( io:: Error :: new (
161- io:: ErrorKind :: NotFound ,
162- "stdout missing for lctl jobstats call." ,
163- ) ) ?,
164- ) ;
173+ let mut targets = BTreeMap :: from ( DEFAULT_PARAMS ) ;
174+ for ( param, value) in params {
175+ targets
176+ . entry ( param)
177+ . and_modify ( |v| * v = value)
178+ . or_insert ( value) ;
179+ }
165180
166- let reader_stderr = BufReader :: new ( child. stderr . take ( ) . ok_or ( io:: Error :: new (
181+ if targets. enabled ( & Dimension :: Jobstats )
182+ && let Ok ( mut child) = tokio:: task:: spawn_blocking ( move || jobstats_metrics_cmd ( ) . spawn ( ) )
183+ . await ?
184+ . inspect_err ( |e| tracing:: debug!( "Error while spawning lctl jobstats: {e}" ) )
185+ {
186+ let reader = BufReader :: with_capacity (
187+ 128 * 1_024 ,
188+ child. stdout . take ( ) . ok_or ( io:: Error :: new (
167189 io:: ErrorKind :: NotFound ,
168- "stderr missing for lctl jobstats call." ,
169- ) ) ?) ;
170-
171- tokio:: task:: spawn ( async move {
172- for line in reader_stderr. lines ( ) . map_while ( Result :: ok) {
173- tracing:: debug!( "stderr: {line}" ) ;
174- }
175- } ) ;
176-
177- tokio:: task:: spawn_blocking ( move || {
178- if let Err ( e) = child. wait ( ) {
179- tracing:: debug!( "Unexpected error when waiting for child: {e}" ) ;
180- }
181- } ) ;
182-
183- jobstats_stream ( reader, JobstatMetrics :: default ( ) )
184- . await ?
185- . register_metric ( & mut registry) ;
186- }
187- } else {
188- let mut output = vec ! [ ] ;
190+ "stdout missing for lctl jobstats call." ,
191+ ) ) ?,
192+ ) ;
189193
190- let lctl = lustre_metrics_output ( ) . output ( ) . await ?;
194+ let reader_stderr = BufReader :: new ( child. stderr . take ( ) . ok_or ( io:: Error :: new (
195+ io:: ErrorKind :: NotFound ,
196+ "stderr missing for lctl jobstats call." ,
197+ ) ) ?) ;
191198
192- let mut lctl_output = parse_lctl_output ( & lctl. stdout ) ?;
199+ tokio:: task:: spawn ( async move {
200+ for line in reader_stderr. lines ( ) . map_while ( Result :: ok) {
201+ tracing:: debug!( "stderr: {line}" ) ;
202+ }
203+ } ) ;
193204
194- output. append ( & mut lctl_output) ;
205+ tokio:: task:: spawn_blocking ( move || {
206+ if let Err ( e) = child. wait ( ) {
207+ tracing:: debug!( "Unexpected error when waiting for child: {e}" ) ;
208+ }
209+ } ) ;
195210
196- let lnetctl = net_show_output ( ) . output ( ) . await ?;
211+ jobstats_stream ( reader, JobstatMetrics :: default ( ) )
212+ . await ?
213+ . register_metric ( & mut registry) ;
214+ }
197215
198- let mut lnetctl_output = parse_lnetctl_output ( & lnetctl . stdout ) ? ;
216+ let mut output = vec ! [ ] ;
199217
200- output. append ( & mut lnetctl_output) ;
218+ if targets. enabled ( & Dimension :: Lustre ) {
219+ let lctl = lustre_metrics_output ( ) . output ( ) . await ?;
220+ output. extend ( parse_lctl_output ( & lctl. stdout ) ?) ;
221+ }
201222
223+ if targets. enabled ( & Dimension :: LnetStats ) {
202224 let lnetctl_stats_output = lnet_stats_output ( ) . output ( ) . await ?;
225+ output. extend ( parse_lnetctl_stats ( & lnetctl_stats_output. stdout ) ?) ;
226+ }
203227
204- let mut lnetctl_stats_record = parse_lnetctl_stats ( & lnetctl_stats_output. stdout ) ?;
205-
206- output. append ( & mut lnetctl_stats_record) ;
228+ if targets. enabled ( & Dimension :: Lnet ) {
229+ let lnetctl = net_show_output ( ) . output ( ) . await ?;
230+ output. extend ( parse_lnetctl_output ( & lnetctl. stdout ) ?) ;
231+ }
207232
208- // Build the lustre stats
209- let mut opentelemetry_metrics = Metrics :: default ( ) ;
233+ // Build the lustre stats
234+ let mut opentelemetry_metrics = Metrics :: default ( ) ;
210235
211- // Build and register Lustre metrics
212- metrics:: build_lustre_stats ( & output, & mut opentelemetry_metrics) ;
213- opentelemetry_metrics. register_metric ( & mut registry) ;
214- }
236+ // Build and register Lustre metrics
237+ metrics:: build_lustre_stats ( & output, & mut opentelemetry_metrics) ;
238+ opentelemetry_metrics. register_metric ( & mut registry) ;
215239
216240 let mut buffer = String :: new ( ) ;
217241 encode ( & mut buffer, & registry) ?;
@@ -267,10 +291,10 @@ mod tests {
267291 #[ serial]
268292 async fn test_metrics_endpoint_is_idempotent ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
269293 let ( request, app) = get_app ( ) ;
270- let original = app. oneshot ( request) . await . unwrap ( ) . as_text ( ) . await ;
294+ let original = app. oneshot ( request) . await . unwrap ( ) . try_into_string ( ) . await ;
271295
272296 let ( request, app) = get_app ( ) ;
273- let new = app. oneshot ( request) . await . unwrap ( ) . as_text ( ) . await ;
297+ let new = app. oneshot ( request) . await . unwrap ( ) . try_into_string ( ) . await ;
274298 assert_eq ! ( original, new) ;
275299
276300 insta:: assert_snapshot!( original) ;
@@ -279,11 +303,11 @@ mod tests {
279303 }
280304
281305 trait AsText {
282- fn as_text ( self ) -> impl Future < Output = String > ;
306+ fn try_into_string ( self ) -> impl Future < Output = String > ;
283307 }
284308
285309 impl AsText for Response < Body > {
286- async fn as_text ( self ) -> String {
310+ async fn try_into_string ( self ) -> String {
287311 let mut body = self . into_body ( ) . into_data_stream ( ) ;
288312 let mut out = vec ! [ ] ;
289313
@@ -306,7 +330,7 @@ mod tests {
306330 #[ serial]
307331 async fn test_app_params ( params : Option < & str > ) {
308332 let request = Request :: builder ( )
309- . uri ( & format ! (
333+ . uri ( format ! (
310334 "/metrics{}" ,
311335 params. map( |p| format!( "?{p}" ) ) . unwrap_or_default( )
312336 ) )
@@ -317,7 +341,10 @@ mod tests {
317341 let response = crate :: routes:: app ( ) . oneshot ( request) . await . unwrap ( ) ;
318342
319343 assert ! ( response. status( ) . is_success( ) ) ;
320- insta:: assert_snapshot!( params. unwrap_or( "default" ) , response. as_text( ) . await ) ;
344+ insta:: assert_snapshot!(
345+ params. unwrap_or( "default" ) ,
346+ response. try_into_string( ) . await
347+ ) ;
321348 }
322349
323350 #[ commandeer( Replay , "lctl" , "lnetctl" ) ]
0 commit comments