Skip to content

Commit 8dd77ba

Browse files
authored
Async CAR fixes (#4223)
* chunk incoming car & yield to event loop * add ui8reader * 20 -> 25 * changeset * Update packages/repo/src/car.ts * build branch * use node setImmediate * dont build branch
1 parent 1a5d742 commit 8dd77ba

File tree

3 files changed

+75
-29
lines changed

3 files changed

+75
-29
lines changed

.changeset/calm-dots-leave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@atproto/repo": patch
3+
---
4+
5+
Ensure that reading CAR files is actually done asynchronously

.github/workflows/build-and-push-pds-ghcr.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ on:
33
push:
44
branches:
55
- main
6-
- divy/pds-build-dblock-fix
76
env:
87
REGISTRY: ghcr.io
98
USERNAME: ${{ github.actor }}

packages/repo/src/car.ts

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { setImmediate } from 'node:timers/promises'
12
import * as cbor from '@ipld/dag-cbor'
23
import { CID } from 'multiformats/cid'
34
import * as ui8 from 'uint8arrays'
@@ -65,7 +66,7 @@ export const readCar = async (
6566
bytes: Uint8Array,
6667
opts?: ReadCarOptions,
6768
): Promise<{ roots: CID[]; blocks: BlockMap }> => {
68-
const { roots, blocks } = await readCarStream([bytes], opts)
69+
const { roots, blocks } = await readCarReader(new Ui8Reader(bytes), opts)
6970
const blockMap = new BlockMap()
7071
for await (const block of blocks) {
7172
blockMap.set(block.cid, block.bytes)
@@ -98,9 +99,18 @@ export const readCarStream = async (
9899
roots: CID[]
99100
blocks: CarBlockIterable
100101
}> => {
101-
const reader = new BufferedReader(car)
102+
return readCarReader(new BufferedReader(car), opts)
103+
}
104+
105+
export const readCarReader = async (
106+
reader: BytesReader,
107+
opts?: ReadCarOptions,
108+
): Promise<{
109+
roots: CID[]
110+
blocks: CarBlockIterable
111+
}> => {
102112
try {
103-
const headerSize = await reader.readVarint()
113+
const headerSize = await readVarint(reader)
104114
if (headerSize === null) {
105115
throw new Error('Could not parse CAR header')
106116
}
@@ -120,7 +130,7 @@ export const readCarStream = async (
120130
}
121131

122132
const readCarBlocksIter = (
123-
reader: BufferedReader,
133+
reader: BytesReader,
124134
opts?: ReadCarOptions,
125135
): CarBlockIterable => {
126136
let generator = readCarBlocksIterGenerator(reader)
@@ -143,18 +153,26 @@ const readCarBlocksIter = (
143153
}
144154

145155
async function* readCarBlocksIterGenerator(
146-
reader: BufferedReader,
156+
reader: BytesReader,
147157
): AsyncGenerator<CarBlock, void, unknown> {
158+
let blocks = 0
148159
try {
149160
while (!reader.isDone) {
150-
const blockSize = await reader.readVarint()
161+
const blockSize = await readVarint(reader)
151162
if (blockSize === null) {
152163
break
153164
}
154165
const blockBytes = await reader.read(blockSize)
155166
const cid = parseCidFromBytes(blockBytes.subarray(0, 36))
156167
const bytes = blockBytes.subarray(36)
157168
yield { cid, bytes }
169+
170+
// yield to the event loop every 25 blocks
171+
// in the case the incoming CAR is synchronous, this can end up jamming up the thread
172+
blocks++
173+
if (blocks % 25 === 0) {
174+
await setImmediate()
175+
}
158176
}
159177
} finally {
160178
await reader.close()
@@ -170,7 +188,52 @@ export async function* verifyIncomingCarBlocks(
170188
}
171189
}
172190

173-
class BufferedReader {
191+
const readVarint = async (reader: BytesReader): Promise<number | null> => {
192+
let done = false
193+
const bytes: Uint8Array[] = []
194+
while (!done) {
195+
const byte = await reader.read(1)
196+
if (byte.byteLength === 0) {
197+
if (bytes.length > 0) {
198+
throw new Error('could not parse varint')
199+
} else {
200+
return null
201+
}
202+
}
203+
bytes.push(byte)
204+
if (byte[0] < 128) {
205+
done = true
206+
}
207+
}
208+
const concatted = ui8.concat(bytes)
209+
return varint.decode(concatted)
210+
}
211+
212+
interface BytesReader {
213+
isDone: boolean
214+
read(bytesToRead: number): Promise<Uint8Array>
215+
close(): Promise<void>
216+
}
217+
218+
class Ui8Reader implements BytesReader {
219+
idx = 0
220+
isDone = false
221+
222+
constructor(public bytes: Uint8Array) {}
223+
224+
async read(bytesToRead: number): Promise<Uint8Array> {
225+
const value = this.bytes.subarray(this.idx, this.idx + bytesToRead)
226+
this.idx += bytesToRead
227+
if (this.idx >= this.bytes.length) {
228+
this.isDone = true
229+
}
230+
return value
231+
}
232+
233+
async close(): Promise<void> {}
234+
}
235+
236+
class BufferedReader implements BytesReader {
174237
buffer: Uint8Array = new Uint8Array()
175238
iterator: Iterator<Uint8Array> | AsyncIterator<Uint8Array>
176239
isDone = false
@@ -189,27 +252,6 @@ class BufferedReader {
189252
return value
190253
}
191254

192-
async readVarint(): Promise<number | null> {
193-
let done = false
194-
const bytes: Uint8Array[] = []
195-
while (!done) {
196-
const byte = await this.read(1)
197-
if (byte.byteLength === 0) {
198-
if (bytes.length > 0) {
199-
throw new Error('could not parse varint')
200-
} else {
201-
return null
202-
}
203-
}
204-
bytes.push(byte)
205-
if (byte[0] < 128) {
206-
done = true
207-
}
208-
}
209-
const concatted = ui8.concat(bytes)
210-
return varint.decode(concatted)
211-
}
212-
213255
private async readUntilBuffered(bytesToRead: number) {
214256
if (this.isDone) {
215257
return

0 commit comments

Comments
 (0)