Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 56379f5

Browse files
authored
feat(loader): eBPF program and map pinning (#441)
mount bpf fs in kind cluster during tests and in dataplane pod fixes #426 Signed-off-by: Harald Gutmann <[email protected]>
1 parent a068fd0 commit 56379f5

File tree

5 files changed

+254
-54
lines changed

5 files changed

+254
-54
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ dependencies = [
13041304
"aya-log",
13051305
"clap",
13061306
"common",
1307+
"thiserror 2.0.14",
13071308
"tokio",
13081309
"tracing",
13091310
"tracing-log",

config/dataplane/dataplane.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,24 @@ spec:
1717
app: blixt
1818
component: dataplane
1919
spec:
20+
volumes:
21+
- name: epbf-fs
22+
hostPath:
23+
path: /sys/fs/bpf/
24+
type: Directory
2025
hostNetwork: true
2126
containers:
2227
- name: dataplane
2328
image: ghcr.io/kubernetes-sigs/blixt-dataplane:latest
2429
securityContext:
2530
privileged: true
26-
args: ["-i", "eth0"]
31+
volumeMounts:
32+
- name: epbf-fs
33+
mountPath: /sys/fs/bpf/
34+
args:
35+
- "-i"
36+
- "eth0"
37+
# - "--load-ebpf"
2738
ports:
2839
- containerPort: 9874
2940
env:

dataplane/loader/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ name = "loader"
1010
path = "src/main.rs"
1111

1212
[dependencies]
13+
api-server = { path = "../api-server" }
1314
aya = { workspace = true, features = ["async_tokio"] }
1415
aya-log = { workspace = true }
1516
anyhow = { workspace = true }
16-
api-server = { path = "../api-server" }
17-
common = { path = "../common", features = ["user"] }
1817
clap = { workspace = true, features = ["derive"] }
18+
common = { path = "../common", features = ["user"] }
19+
thiserror = { workspace = true }
1920
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "net", "signal"] }
2021
tracing = { workspace = true }
21-
tracing-subscriber = { workspace = true }
22-
tracing-log = { workspace = true }
22+
tracing-log = { workspace = true }
23+
tracing-subscriber = { workspace = true }

dataplane/loader/src/main.rs

Lines changed: 210 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@ SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause)
55
*/
66

77
use std::net::Ipv4Addr;
8+
use std::path::Path;
89

9-
use anyhow::Context;
1010
use api_server::config::TLSConfig;
1111
use api_server::start as start_api_server;
12-
use aya::maps::HashMap;
13-
use aya::programs::{SchedClassifier, TcAttachType, tc};
12+
use aya::maps::{HashMap, Map, MapData};
13+
use aya::programs::{ProgramError, SchedClassifier, TcAttachType, tc};
1414
use aya::{Ebpf, include_bytes_aligned};
1515
use aya_log::EbpfLogger;
1616
use clap::Parser;
1717
use common::{BackendKey, BackendList, ClientKey, LoadBalancerMapping};
18-
use tracing::{info, warn};
18+
use thiserror::Error as ThisError;
19+
use tracing::{debug, info, trace};
1920
use tracing_log::LogTracer;
2021
use tracing_subscriber::EnvFilter;
2122

@@ -27,7 +28,7 @@ use tracing_subscriber::EnvFilter;
2728
struct Opt {
2829
/// Name of the network interface to attach the eBPF programs to.
2930
///
30-
/// By default, this is set to `"lo"` (the loopback interface).
31+
/// By default, this is set to `lo` (the loopback interface).
3132
#[clap(short, long, default_value = "lo")]
3233
iface: String,
3334
/// Optional TLS configuration for securing the API server.
@@ -36,8 +37,34 @@ struct Opt {
3637
/// You can specify either `tls` for server-only TLS or `mutual-tls` for mutual TLS.
3738
#[clap(subcommand)]
3839
tls_config: Option<TLSConfig>,
40+
41+
/// Load eBPF programs and maps
42+
///
43+
/// Overrides usage of pinned programs/maps during init.
44+
///
45+
/// WARN: loading resets all the dataplane configuration and interrupts traffic flow
46+
#[clap(long)]
47+
load_ebpf: bool,
48+
}
49+
50+
#[derive(ThisError, Debug)]
51+
enum LoaderError {
52+
#[error("Failed to load ebpf map {0}")]
53+
MapLoad(String),
54+
#[error("Failed to pin ebpf {0} {1} {2}")]
55+
Pin(String, String, String),
56+
#[error("Could not find {0} {1}")]
57+
NotFound(String, String),
58+
#[error("{0}")]
59+
Program(#[from] ProgramError),
60+
#[error("{0}")]
61+
AyaLog(#[from] aya_log::Error),
3962
}
4063

64+
type Result<T, E = LoaderError> = std::result::Result<T, E>;
65+
66+
const EBPF_FS_ROOT: &str = "/sys/fs/bpf";
67+
4168
/// Main function for the application.
4269
///
4370
/// This function sets up and runs eBPF programs on the specified network interface
@@ -51,6 +78,7 @@ struct Opt {
5178
///
5279
/// - `iface`: The network interface to attach the eBPF programs to.
5380
/// - `tls_config`: Optional subcommand to configure TLS for the API server.
81+
/// - `load_ebpf`: load the eBPF programs and maps even in case pinned objects are available
5482
///
5583
/// # Example
5684
///
@@ -72,13 +100,12 @@ async fn main() -> Result<(), anyhow::Error> {
72100
.with_line_number(true)
73101
.init();
74102

75-
// aya-logs uses the log facade, the log tracer enables to log ebpf events through tracing
76103
LogTracer::init()?;
77104

78-
let opt = Opt::parse();
79-
80-
info!("loading ebpf programs");
105+
let opts = Opt::parse();
106+
info!("{:?}", opts);
81107

108+
info!("Loading ebpf programs");
82109
#[cfg(debug_assertions)]
83110
let mut bpf_program = Ebpf::load(include_bytes_aligned!(
84111
"../../target/bpfel-unknown-none/debug/loader"
@@ -87,58 +114,194 @@ async fn main() -> Result<(), anyhow::Error> {
87114
let mut bpf_program = Ebpf::load(include_bytes_aligned!(
88115
"../../target/bpfel-unknown-none/release/loader"
89116
))?;
90-
if let Err(e) = EbpfLogger::init(&mut bpf_program) {
91-
warn!("failed to initialize eBPF logger: {e}");
92-
}
93117

94-
info!("attaching tc_ingress program to {}", &opt.iface);
95-
96-
let _ = tc::qdisc_add_clsact(&opt.iface);
97-
let ingress_program: &mut SchedClassifier =
98-
bpf_program.program_mut("tc_ingress").unwrap().try_into()?;
99-
ingress_program.load()?;
100-
ingress_program
101-
.attach(&opt.iface, TcAttachType::Ingress)
102-
.context("failed to attach the ingress TC program")?;
103-
104-
info!("attaching tc_egress program to {}", &opt.iface);
105-
106-
let egress_program: &mut SchedClassifier =
107-
bpf_program.program_mut("tc_egress").unwrap().try_into()?;
108-
egress_program.load()?;
109-
egress_program
110-
.attach(&opt.iface, TcAttachType::Egress)
111-
.context("failed to attach the egress TC program")?;
112-
113-
info!("starting api server");
114-
info!("Using tls config: {:?}", &opt.tls_config);
115-
let backends: HashMap<_, BackendKey, BackendList> = HashMap::try_from(
116-
bpf_program
117-
.take_map("BACKENDS")
118-
.expect("no maps named BACKENDS"),
119-
)?;
120-
let gateway_indexes: HashMap<_, BackendKey, u16> = HashMap::try_from(
121-
bpf_program
122-
.take_map("GATEWAY_INDEXES")
123-
.expect("no maps named GATEWAY_INDEXES"),
118+
let _ = tc::qdisc_add_clsact(&opts.iface);
119+
let mut ingress_program = get_pinned_program("tc_ingress")?;
120+
program_load_pin(
121+
&mut bpf_program,
122+
&mut ingress_program,
123+
"tc_ingress",
124+
TcAttachType::Ingress,
125+
&opts.iface,
126+
opts.load_ebpf,
124127
)?;
125-
let tcp_conns: HashMap<_, ClientKey, LoadBalancerMapping> = HashMap::try_from(
126-
bpf_program
127-
.take_map("LB_CONNECTIONS")
128-
.expect("no maps named LB_CONNECTIONS"),
128+
129+
let mut egress_program = get_pinned_program("tc_egress")?;
130+
program_load_pin(
131+
&mut bpf_program,
132+
&mut egress_program,
133+
"tc_egress",
134+
TcAttachType::Egress,
135+
&opts.iface,
136+
opts.load_ebpf,
129137
)?;
130138

139+
let backends_map = map_take_pin(&mut bpf_program, "BACKENDS", opts.load_ebpf)?;
140+
let gateway_indexes_map = map_take_pin(&mut bpf_program, "GATEWAY_INDEXES", opts.load_ebpf)?;
141+
let tcp_conns_map = map_take_pin(&mut bpf_program, "LB_CONNECTIONS", opts.load_ebpf)?;
142+
143+
let backends: HashMap<MapData, BackendKey, BackendList> = HashMap::try_from(backends_map)?;
144+
trace!("Existing backends:");
145+
for k in backends.keys() {
146+
let k = k?;
147+
trace!("{:?}", k);
148+
}
149+
150+
let gateway_indexes: HashMap<MapData, BackendKey, u16> =
151+
HashMap::try_from(gateway_indexes_map)?;
152+
trace!("Existing gateway_indexes:");
153+
for k in gateway_indexes.keys() {
154+
let k = k?;
155+
trace!("{:?}", k);
156+
}
157+
158+
let tcp_conns: HashMap<MapData, ClientKey, LoadBalancerMapping> =
159+
HashMap::try_from(tcp_conns_map)?;
160+
trace!("Existing tcp_conns:");
161+
for k in tcp_conns.keys() {
162+
let k = k?;
163+
trace!("{:?}", k);
164+
}
165+
166+
info!("Starting api server");
167+
info!("Using tls config: {:?}", &opts.tls_config);
131168
start_api_server(
132169
Ipv4Addr::new(0, 0, 0, 0),
133170
9874,
134171
backends,
135172
gateway_indexes,
136173
tcp_conns,
137-
opt.tls_config,
174+
opts.tls_config,
138175
)
139176
.await?;
140177

141178
info!("Exiting...");
179+
Ok(())
180+
}
142181

182+
fn program_load_pin(
183+
bpf_program: &mut Ebpf,
184+
pinned_program: &mut Option<SchedClassifier>,
185+
identifier: &str,
186+
tc_attach_type: TcAttachType,
187+
iface: &str,
188+
load_ebpf: bool,
189+
) -> Result<()> {
190+
if pinned_program.is_some() && !load_ebpf {
191+
let program = pinned_program.as_mut().ok_or(LoaderError::NotFound(
192+
"program".to_string(),
193+
identifier.to_string(),
194+
))?;
195+
attach_interface_logs(identifier, iface, tc_attach_type, program)?;
196+
} else {
197+
let program = load_pin_program(bpf_program, identifier, load_ebpf)?;
198+
attach_interface_logs(identifier, iface, tc_attach_type, program)?;
199+
};
143200
Ok(())
144201
}
202+
203+
fn get_pinned_program(identifier: &str) -> Result<Option<SchedClassifier>> {
204+
let path = format!("{EBPF_FS_ROOT}/{identifier}");
205+
let pin_path = Path::new(&path);
206+
207+
if pin_path
208+
.try_exists()
209+
.map_err(|e| LoaderError::Pin("program".to_string(), path.clone(), e.to_string()))?
210+
{
211+
debug!("ebpf program {identifier} is already pinned to {path}");
212+
let program = SchedClassifier::from_pin(pin_path).map_err(LoaderError::Program)?;
213+
info!("Loaded ebpf program {identifier} from pin {path}");
214+
return Ok(Some(program));
215+
}
216+
217+
Ok(None)
218+
}
219+
220+
fn load_pin_program<'a>(
221+
bpf_program: &'a mut Ebpf,
222+
identifier: &str,
223+
load_ebpf: bool,
224+
) -> Result<&'a mut SchedClassifier> {
225+
let program: &mut SchedClassifier = bpf_program
226+
.program_mut(identifier)
227+
.ok_or(LoaderError::NotFound(
228+
"program".to_string(),
229+
identifier.to_string(),
230+
))?
231+
.try_into()?;
232+
info!("Loaded ebpf program {identifier}");
233+
234+
let path = format!("{EBPF_FS_ROOT}/{identifier}");
235+
let pin_path = Path::new(&path);
236+
237+
// loading ebpf requested
238+
// removing pinned program in case existing
239+
if load_ebpf
240+
&& pin_path.try_exists().map_err(|e| {
241+
LoaderError::Pin("program".to_string(), identifier.to_string(), e.to_string())
242+
})?
243+
{
244+
info!("Removing existing pinned program {}", path);
245+
std::fs::remove_file(pin_path).map_err(|e| {
246+
LoaderError::Pin("program".to_string(), identifier.to_string(), e.to_string())
247+
})?;
248+
}
249+
250+
program.load()?;
251+
252+
program
253+
.pin(pin_path)
254+
.map_err(|e| LoaderError::Pin("program".to_string(), path.clone(), e.to_string()))?;
255+
info!("Successfully pinned ebpf program {identifier} to {path}");
256+
257+
Ok(program)
258+
}
259+
260+
fn attach_interface_logs(
261+
identifier: &str,
262+
iface: &str,
263+
tc_attach_type: TcAttachType,
264+
program: &mut SchedClassifier,
265+
) -> Result<()> {
266+
info!("Attaching {identifier} program to {}", iface);
267+
program
268+
.attach(iface, tc_attach_type)
269+
.map_err(LoaderError::Program)?;
270+
info!("Initializing logs for {identifier} program");
271+
let info = program.info()?;
272+
EbpfLogger::init_from_id(info.id())?;
273+
Ok(())
274+
}
275+
276+
fn map_take_pin(bpf_program: &mut Ebpf, identifier: &str, load_ebpf: bool) -> Result<Map> {
277+
let path = format!("{EBPF_FS_ROOT}/{identifier}");
278+
let pin_path = Path::new(&path);
279+
let pin_path_exists = pin_path
280+
.try_exists()
281+
.map_err(|e| LoaderError::Pin("map".to_string(), identifier.to_string(), e.to_string()))?;
282+
283+
if !load_ebpf && pin_path_exists {
284+
debug!("ebpf map {identifier} is already pinned to {path}");
285+
let map_data = MapData::from_pin(pin_path).map_err(|e| {
286+
LoaderError::MapLoad(format!("failed to load map from pin {path}: {e}"))
287+
})?;
288+
info!("Loaded ebpf map {identifier} from pin {path}");
289+
Ok(Map::HashMap(map_data))
290+
} else {
291+
if pin_path_exists {
292+
info!("Removing existing pinned map {}", path);
293+
std::fs::remove_file(pin_path).map_err(|e| {
294+
LoaderError::Pin("map".to_string(), identifier.to_string(), e.to_string())
295+
})?;
296+
}
297+
info!("Loaded ebpf map {identifier}");
298+
let map = bpf_program
299+
.take_map(identifier)
300+
.ok_or(LoaderError::MapLoad(identifier.to_string()))?;
301+
info!("Successfully pinned ebpf map {identifier} to {path}");
302+
map.pin(pin_path).map_err(|e| {
303+
LoaderError::Pin("map".to_string(), identifier.to_string(), e.to_string())
304+
})?;
305+
Ok(map)
306+
}
307+
}

0 commit comments

Comments
 (0)