-
Notifications
You must be signed in to change notification settings - Fork 485
Description
Our application writes to a file (on XFS) and uses the IOSQE_ASYNC flag. The application keeps at most 8 operations in flight at any point in time: once 8 operations are in flight, it won't submit more until it has seen CQEs.
My understanding of IOSQE_ASYNC is that our operation would get executed in a worker thread, and once we have got the CQE, that worker thread should be free for more work. Thus, I do not understand why we're seeing hundreds (sometimes thousands) of worker threads.
Example:
ps -o pid,thcount $(pgrep uring_test)
PID THCNT
2005193 311
I think we shouldn't be using IOSQE_ASYNC, as we'd be better off leaving io_uring the option of taking the non-blocking path.
However, I wanted to know whether the behaviour that I observed is expected of uring, when IOSQE_ASYNC is used? I would have expected as many threads as operations I have ongoing, but not more than that. Is my expectation correct?
The file system used is XFS, and these are the kernel versions of the machines where I ran this:
Linux dev3-147 6.14.5-100.fc40.x86_64 #1 SMP PREEMPT_DYNAMIC Fri May 2 14:22:13 UTC 2025 x86_64 GNU/Linux
and
Linux lab-130-19 5.14.0-570.35.1.0.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Aug 19 07:47:05 PDT 2025 x86_64 x86_64 x86_64 GNU/Linux
Here's the program I used:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include <linux/falloc.h>
#include <liburing.h>
#include <time.h>
/*
 * Benchmark parameters. Every expression-valued macro is fully parenthesized
 * so it expands safely inside a larger expression such as `x / CHUNK_SIZE`.
 */
#define FILE_SIZE (512ULL * 1024 * 1024) // 512MB
#define CHUNK_SIZE (4096 * 8) // Bytes per write (32768)
#define MAX_IN_FLIGHT 8 // Maximum concurrent operations
#define TOTAL_DATA (10ULL * 1024 * 1024 * 1024) // 10GB
#define RING_SIZE 16 // io_uring queue size
// Bookkeeping for one write slot. main() attaches a pointer to the slot to
// each SQE as user data and reads it back from the matching CQE.
struct write_op {
// Data buffer passed to io_uring_prep_write for this slot
char *buffer;
// File offset targeted by the slot's current write
off_t offset;
// 1 while a write using this slot is outstanding, 0 when the slot is free
int in_use;
};
/*
 * Open the test file with O_RDWR | O_DSYNC | O_DIRECT, creating and
 * preallocating it when needed.
 *
 * Creation uses O_CREAT | O_EXCL so that "does it exist?" and "create it"
 * happen in one atomic step rather than a stat() followed by a separate
 * open(). An existing file is reused as-is only when it already spans
 * FILE_SIZE bytes; a shorter one (for example left over from an interrupted
 * earlier run) is allocated again.
 *
 * Returns an open file descriptor on success, or -1 after printing a
 * diagnostic. A file created by this call is removed again if its setup
 * fails, so a later run does not mistake it for a ready file.
 */
static int create_and_setup_file(const char *filename) {
    const int flags = O_RDWR | O_DSYNC | O_DIRECT;
    int created = 1;

    int fd = open(filename, flags | O_CREAT | O_EXCL, 0644);
    if (fd < 0 && errno == EEXIST) {
        // File exists, just open it
        created = 0;
        fd = open(filename, flags);
        if (fd < 0) {
            perror("open existing file");
            return -1;
        }
    } else if (fd < 0) {
        perror("create file");
        return -1;
    }

    if (!created) {
        struct stat st;

        if (fstat(fd, &st) < 0) {
            perror("fstat");
            close(fd);
            return -1;
        }
        if (st.st_size >= 0 && (unsigned long long)st.st_size >= FILE_SIZE) {
            printf("File %s already exists, using existing file\n", filename);
            return fd;
        }
        printf("File %s exists but is smaller than %llu bytes, allocating...\n",
               filename, FILE_SIZE);
    } else {
        printf("Created new file %s, allocating %llu bytes...\n", filename, FILE_SIZE);
    }

    // Allocate space
    if (fallocate(fd, 0, 0, FILE_SIZE) < 0) {
        perror("fallocate");
        goto fail;
    }

    // Sync to disk
    if (fsync(fd) < 0) {
        perror("fsync");
        goto fail;
    }

    printf("File setup complete\n");
    return fd;

fail:
    close(fd);
    if (created) {
        // Do not leave a half-initialized file behind for the next run.
        unlink(filename);
    }
    return -1;
}
/*
 * Allocate `size` bytes aligned to a 4096-byte boundary.
 *
 * Returns the buffer (release it with free()), or NULL after printing a
 * diagnostic to stderr.
 */
static char *allocate_aligned_buffer(size_t size) {
    void *buf = NULL;

    /*
     * posix_memalign() reports failure through its return value and does not
     * set errno, so the message is built from the returned code rather than
     * with perror().
     */
    int err = posix_memalign(&buf, 4096, size);
    if (err != 0) {
        fprintf(stderr, "posix_memalign: %s\n", strerror(err));
        return NULL;
    }
    return buf;
}
/*
 * Fill the first `size` bytes of `buffer` with a repeating pattern: byte at
 * position p receives the low eight bits of p.
 */
static void fill_data(char *buffer, size_t size) {
    size_t pos = 0;

    while (pos < size) {
        buffer[pos] = pos & 0xFF;
        pos++;
    }
}
/*
 * Redraw the single-line progress indicator on stdout: percentage of
 * TOTAL_DATA completed plus written/total amounts in units of 1024*1024
 * bytes. The line is rewritten in place ("\r" followed by an erase-line
 * escape) and stdout is flushed afterwards.
 */
static void print_progress(unsigned long long bytes_written) {
    const double bytes_per_mb = 1024 * 1024;
    const double done = (double)bytes_written;
    const double total = (double)TOTAL_DATA;

    double percent = done / total * 100.0;
    double done_mb = done / bytes_per_mb;
    double all_mb = total / bytes_per_mb;

    printf("\r\033[KProgress: %.1f%% (%.1f MB / %.1f MB written)",
           percent, done_mb, all_mb);
    fflush(stdout);
}
/*
 * Write TOTAL_DATA bytes to "test.dat" in CHUNK_SIZE pieces through io_uring,
 * keeping at most MAX_IN_FLIGHT writes outstanding. Every SQE is flagged
 * IOSQE_ASYNC. File offsets wrap back to 0 once the next chunk would cross
 * FILE_SIZE.
 *
 * Returns 0 when all data was written, 1 on any failure.
 */
int main(void) {
    const char *filename = "test.dat";
    struct io_uring ring;
    /*
     * Zero-initialized so the cleanup path can free() every slot even when
     * buffer allocation stopped part-way through.
     */
    struct write_op ops[MAX_IN_FLIGHT] = {0};
    unsigned long long bytes_submitted = 0; /* handed to the ring so far */
    unsigned long long bytes_written = 0;   /* confirmed by a CQE */
    off_t current_offset = 0;
    int in_flight = 0;
    int exit_code = 1;
    int ret;

    // Create and setup the file
    int fd = create_and_setup_file(filename);
    if (fd < 0) {
        return 1;
    }

    // Initialize io_uring
    ret = io_uring_queue_init(RING_SIZE, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init failed: %s\n", strerror(-ret));
        close(fd);
        return 1;
    }

    // Initialize write operations
    for (int i = 0; i < MAX_IN_FLIGHT; i++) {
        ops[i].buffer = allocate_aligned_buffer(CHUNK_SIZE);
        if (!ops[i].buffer) {
            fprintf(stderr, "Failed to allocate buffer %d\n", i);
            goto cleanup;
        }
    }

    printf("Starting to write %llu bytes in %d byte chunks...\n", TOTAL_DATA, CHUNK_SIZE);

    while (bytes_written < TOTAL_DATA) {
        /*
         * Submit new writes while there is capacity. The bound is on bytes
         * *submitted*, not bytes completed: otherwise extra writes are queued
         * near the end and can still be in flight when the buffers are freed.
         */
        while (in_flight < MAX_IN_FLIGHT && bytes_submitted < TOTAL_DATA) {
            // Find a free operation slot
            int op_idx = -1;
            for (int i = 0; i < MAX_IN_FLIGHT; i++) {
                if (!ops[i].in_use) {
                    op_idx = i;
                    break;
                }
            }
            if (op_idx == -1) {
                break; // No free slots
            }

            // Get the SQE first so a failure leaves the slot untouched
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            if (!sqe) {
                fprintf(stderr, "Failed to get SQE\n");
                goto cleanup;
            }

            // Setup the write operation
            struct write_op *op = &ops[op_idx];
            op->offset = current_offset;
            op->in_use = 1;

            // Fill buffer with the repeating byte pattern
            fill_data(op->buffer, CHUNK_SIZE);

            io_uring_prep_write(sqe, fd, op->buffer, CHUNK_SIZE, op->offset);
            // Ask for the request to be issued asynchronously from the start
            sqe->flags |= IOSQE_ASYNC;
            io_uring_sqe_set_data(sqe, op);

            // Submit immediately
            ret = io_uring_submit(&ring);
            if (ret < 0) {
                fprintf(stderr, "io_uring_submit failed: %s\n", strerror(-ret));
                goto cleanup;
            }

            in_flight++;
            bytes_submitted += CHUNK_SIZE;

            // Update offset, wrapping around at file size
            current_offset += CHUNK_SIZE;
            if (current_offset + CHUNK_SIZE > FILE_SIZE) {
                current_offset = 0;
            }
        }

        // Wait for at least one completion with 1000ms timeout
        struct io_uring_cqe *cqe;
        struct __kernel_timespec ts = {
            .tv_sec = 1,
            .tv_nsec = 0
        };
        ret = io_uring_wait_cqe_timeout(&ring, &cqe, &ts);
        if (ret == -ETIME) {
            // Timeout occurred
            printf("\nTimeout waiting for completion\n");
            continue;
        } else if (ret < 0) {
            fprintf(stderr, "io_uring_wait_cqe_timeout failed: %s\n", strerror(-ret));
            goto cleanup;
        }

        // Process all available completions
        struct io_uring_cqe *cqe_ptr;
        unsigned head;
        unsigned count = 0;
        io_uring_for_each_cqe(&ring, head, cqe_ptr) {
            struct write_op *completed_op = (struct write_op *)io_uring_cqe_get_data(cqe_ptr);

            if (cqe_ptr->res < 0) {
                fprintf(stderr, "\nWrite failed: %s\n", strerror(-cqe_ptr->res));
                goto cleanup;
            } else if (cqe_ptr->res != CHUNK_SIZE) {
                fprintf(stderr, "\nPartial write: %d bytes instead of %d\n",
                        cqe_ptr->res, CHUNK_SIZE);
                goto cleanup;
            }

            // Mark operation as completed
            completed_op->in_use = 0;
            in_flight--;
            bytes_written += CHUNK_SIZE;
            count++;
            print_progress(bytes_written);
        }
        // Release all CQEs consumed by the loop above in one step
        io_uring_cq_advance(&ring, count);
    }

    printf("\nWrite completed successfully!\n");
    exit_code = 0;

cleanup:
    // Tear the ring down before releasing the buffers its requests point at
    io_uring_queue_exit(&ring);
    for (int i = 0; i < MAX_IN_FLIGHT; i++) {
        free(ops[i].buffer); // free(NULL) is a no-op
    }
    close(fd);
    return exit_code;
}
And to compile and run it:
gcc -O2 -o uring_test uring_test.c -luring
./uring_test