Skip to content

Commit 01be359

Browse files
bitswap: Reuse inbound substream for subsequent requests (#447)
Fix compatibility with kubo IPFS >= 0.37.0. Close #444.
1 parent 4da3f77 commit 01be359

File tree

2 files changed

+73
-41
lines changed

2 files changed

+73
-41
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ smallvec = "1.15.0"
3737
snow = { version = "0.9.3", features = ["ring-resolver"], default-features = false }
3838
socket2 = { version = "0.5.9", features = ["all"] }
3939
thiserror = "2.0.12"
40-
tokio-stream = "0.1.12"
40+
tokio-stream = "0.1.17"
4141
tokio-util = { version = "0.7.15", features = ["compat", "io", "codec"] }
4242
tokio = { version = "1.45.0", features = ["rt", "net", "io-util", "time", "macros", "sync", "parking_lot"] }
4343
tracing = { version = "0.1.40", features = ["log"] }

src/protocol/libp2p/bitswap/mod.rs

Lines changed: 72 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ use crate::{
2929
};
3030

3131
use cid::Version;
32-
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
3332
use multihash::Code;
3433
use prost::Message;
3534
use tokio::sync::mpsc::{Receiver, Sender};
35+
use tokio_stream::{StreamExt, StreamMap};
3636

3737
use std::{collections::HashMap, time::Duration};
3838

@@ -110,9 +110,8 @@ pub(crate) struct Bitswap {
110110
/// Pending outbound substreams.
111111
pending_outbound: HashMap<SubstreamId, Vec<ResponseType>>,
112112

113-
/// Pending inbound substreams.
114-
pending_inbound:
115-
FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Vec<(Cid, WantType)>)>>>,
113+
/// Inbound substreams.
114+
inbound: StreamMap<PeerId, Substream>,
116115
}
117116

118117
impl Bitswap {
@@ -123,45 +122,61 @@ impl Bitswap {
123122
cmd_rx: config.cmd_rx,
124123
event_tx: config.event_tx,
125124
pending_outbound: HashMap::new(),
126-
pending_inbound: FuturesUnordered::new(),
125+
inbound: StreamMap::new(),
127126
}
128127
}
129128

130129
/// Substream opened to remote peer.
131-
fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
130+
fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
132131
tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
133132

134-
self.pending_inbound.push(Box::pin(async move {
135-
let message = substream.next().await.ok_or(Error::ConnectionClosed)??;
136-
let message = schema::bitswap::Message::decode(message)?;
137-
138-
let Some(wantlist) = message.wantlist else {
139-
tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
140-
return Err(Error::InvalidData);
141-
};
142-
143-
Ok((
144-
peer,
145-
wantlist
146-
.entries
147-
.into_iter()
148-
.filter_map(|entry| {
149-
let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
150-
151-
let want_type = match entry.want_type {
152-
0 => WantType::Block,
153-
1 => WantType::Have,
154-
_ => return None,
155-
};
156-
157-
(cid.version() == cid::Version::V1
158-
&& cid.hash().code() == u64::from(Code::Blake2b256)
159-
&& cid.hash().size() == 32)
160-
.then_some((cid, want_type))
161-
})
162-
.collect::<Vec<_>>(),
163-
))
164-
}));
133+
if self.inbound.insert(peer, substream).is_some() {
134+
// Only one inbound substream per peer is allowed in order to constrain resources.
135+
tracing::debug!(
136+
target: LOG_TARGET,
137+
?peer,
138+
"dropping inbound substream as remote opened a new one",
139+
);
140+
}
141+
}
142+
143+
/// Message received from remote peer.
144+
async fn on_message_received(
145+
&mut self,
146+
peer: PeerId,
147+
message: bytes::BytesMut,
148+
) -> Result<(), Error> {
149+
tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound message");
150+
151+
let message = schema::bitswap::Message::decode(message)?;
152+
153+
let Some(wantlist) = message.wantlist else {
154+
tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
155+
return Err(Error::InvalidData);
156+
};
157+
158+
let cids = wantlist
159+
.entries
160+
.into_iter()
161+
.filter_map(|entry| {
162+
let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
163+
164+
let want_type = match entry.want_type {
165+
0 => WantType::Block,
166+
1 => WantType::Have,
167+
_ => return None,
168+
};
169+
170+
(cid.version() == cid::Version::V1
171+
&& cid.hash().code() == u64::from(Code::Blake2b256)
172+
&& cid.hash().size() == 32)
173+
.then_some((cid, want_type))
174+
})
175+
.collect::<Vec<_>>();
176+
177+
let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
178+
179+
Ok(())
165180
}
166181

167182
/// Send response to bitswap request.
@@ -252,9 +267,26 @@ impl Bitswap {
252267
}
253268
None => return,
254269
},
255-
event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {
256-
if let Some(Ok((peer, cids))) = event {
257-
let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
270+
Some((peer, message)) = self.inbound.next(), if !self.inbound.is_empty() => {
271+
match message {
272+
Ok(message) => if let Err(e) = self.on_message_received(peer, message).await {
273+
tracing::trace!(
274+
target: LOG_TARGET,
275+
?peer,
276+
?e,
277+
"error handling inbound message, dropping substream",
278+
);
279+
self.inbound.remove(&peer);
280+
},
281+
Err(e) => {
282+
tracing::trace!(
283+
target: LOG_TARGET,
284+
?peer,
285+
?e,
286+
"inbound substream closed",
287+
);
288+
self.inbound.remove(&peer);
289+
},
258290
}
259291
}
260292
}

0 commit comments

Comments
 (0)