@@ -152,6 +152,20 @@ async fn process_batch(
152152 let mut tcp_rtt: u32 = 0 ;
153153 let mut tcp_retrans: u32 = 0 ;
154154 let mut tcp_lost: u32 = 0 ;
155+ let mut protocol = String :: new ( ) ;
156+ let mut http_method = String :: new ( ) ;
157+ let mut tcp_send_wnd: u32 = 0 ;
158+ let mut tcp_recv_wnd: u32 = 0 ;
159+ let mut tcp_send_mss: u32 = 0 ;
160+ let mut tcp_recv_mss: u32 = 0 ;
161+ let mut tcp_bytes_acked: u64 = 0 ;
162+ let mut tcp_segs_in: u32 = 0 ;
163+ let mut tcp_segs_out: u32 = 0 ;
164+ let mut tls_version = String :: new ( ) ;
165+ let mut client_ip = String :: new ( ) ;
166+ let mut client_port: u16 = 0 ;
167+ let mut server_ip = String :: new ( ) ;
168+ let mut server_port: u16 = 0 ;
155169
156170 // Direct field extraction
157171 for field in message_inner. split ( ',' ) {
@@ -178,13 +192,35 @@ async fn process_batch(
178192 "TCP_RTT" if ai_enabled => tcp_rtt = value. parse ( ) . unwrap_or ( 0 ) ,
179193 "TCP_RETRANS" if ai_enabled => tcp_retrans = value. parse ( ) . unwrap_or ( 0 ) ,
180194 "TCP_LOST" if ai_enabled => tcp_lost = value. parse ( ) . unwrap_or ( 0 ) ,
195+ "PROTO" if ai_enabled => protocol = value. to_string ( ) ,
196+ "METHOD" if ai_enabled => http_method = value. to_string ( ) ,
197+ "TCP_SND_WND" if ai_enabled => tcp_send_wnd = value. parse ( ) . unwrap_or ( 0 ) ,
198+ "TCP_RCV_WND" if ai_enabled => tcp_recv_wnd = value. parse ( ) . unwrap_or ( 0 ) ,
199+ "TCP_SND_MSS" if ai_enabled => tcp_send_mss = value. parse ( ) . unwrap_or ( 0 ) ,
200+ "TCP_RCV_MSS" if ai_enabled => tcp_recv_mss = value. parse ( ) . unwrap_or ( 0 ) ,
201+ "TCP_BYTES_ACKED" if ai_enabled => tcp_bytes_acked = value. parse ( ) . unwrap_or ( 0 ) ,
202+ "TCP_SEGS_IN" if ai_enabled => tcp_segs_in = value. parse ( ) . unwrap_or ( 0 ) ,
203+ "TCP_SEGS_OUT" if ai_enabled => tcp_segs_out = value. parse ( ) . unwrap_or ( 0 ) ,
204+ "TLS_VER" if ai_enabled => tls_version = value. to_string ( ) ,
205+ "CLIENT" if ai_enabled => {
206+ if let Some ( ( ip, port) ) = value. rsplit_once ( ':' ) {
207+ client_ip = ip. to_string ( ) ;
208+ client_port = port. parse ( ) . unwrap_or ( 0 ) ;
209+ }
210+ } ,
211+ "SERVER" if ai_enabled => {
212+ if let Some ( ( ip, port) ) = value. rsplit_once ( ':' ) {
213+ server_ip = ip. to_string ( ) ;
214+ server_port = port. parse ( ) . unwrap_or ( 0 ) ;
215+ }
216+ } ,
181217
182- // Skip ML fields when disabled (zero overhead)
183- "DUR" | "PROTO" | "METHOD" |
184- "TCP_RTT" | "TCP_RETRANS" | "TCP_LOST" |
185- "TCP_SND_WND" | "TCP_RCV_WND" | "TCP_SND_MSS" | "TCP_RCV_MSS" |
186- "TCP_BYTES_ACKED" | "TCP_SEGS_IN" | "TCP_SEGS_OUT" |
187- "TLS_VER" | "CLIENT" | "SERVER" => { } ,
218+ // All ML fields are now parsed above when ai_enabled
219+ _ if !ai_enabled && matches ! ( * key , "DUR" | "PROTO" | "METHOD" |
220+ "TCP_RTT" | "TCP_RETRANS" | "TCP_LOST" |
221+ "TCP_SND_WND" | "TCP_RCV_WND" | "TCP_SND_MSS" | "TCP_RCV_MSS" |
222+ "TCP_BYTES_ACKED" | "TCP_SEGS_IN" | "TCP_SEGS_OUT" |
223+ "TLS_VER" | "CLIENT" | "SERVER" ) => { } ,
188224
189225 _ => { } // Ignore unknown fields
190226 }
@@ -228,10 +264,26 @@ async fn process_batch(
228264 let ml_log = MlFeatureLog {
229265 conn_id : conn_id. clone ( ) ,
230266 duration_ms,
267+ http_status : status_code as f32 ,
268+ http_method,
269+ protocol,
270+ size_in : bytes_in as f32 ,
271+ size_out : bytes_out as f32 ,
231272 tcp_rtt : tcp_rtt as f32 ,
232273 tcp_retrans : tcp_retrans as f32 ,
233274 tcp_lost : tcp_lost as f32 ,
234- http_status : status_code as f32 ,
275+ tcp_send_wnd : tcp_send_wnd as f32 ,
276+ tcp_recv_wnd : tcp_recv_wnd as f32 ,
277+ tcp_send_mss : tcp_send_mss as f32 ,
278+ tcp_recv_mss : tcp_recv_mss as f32 ,
279+ tcp_bytes_acked : tcp_bytes_acked as f32 ,
280+ tcp_segs_in : tcp_segs_in as f32 ,
281+ tcp_segs_out : tcp_segs_out as f32 ,
282+ tls_version,
283+ client_ip,
284+ client_port : client_port as f32 ,
285+ server_ip,
286+ server_port : server_port as f32 ,
235287 } ;
236288 ai_security:: send_to_inference ( ml_log) ;
237289 }
0 commit comments