Skip to content

[Question] Writing to a file with a depth of 8 operations with IOSQE_ASYNC causes the creation of a lot of worker threads. #1466

@dickeyf

Description

@dickeyf

Our applications writes to a file (XFS), and uses the IOSQE_ASYNC flag. The application can submit up to 8 operations at any point of time. Once 8 operations are in-flight, the application won't send more, until it has seen CQEs.

My understanding of IOSQE_ASYNC is that our operation would get executed in a worker thread, and once we have got the CQE, that worker thread should be free for more work. Thus, I do not understand why we're seeing hundreds (sometimes thousands) of worker threads.

Example:

ps -o pid,thcount $(pgrep uring_test)
    PID THCNT
2005193   311

I think we shouldn't be using IOSQE_ASYNC, as we'd be better of leaving to uring the option of taking the non-blocking path.

However, I wanted to know whether the behaviour that I observed is expected of uring, when IOSQE_ASYNC is used? I would have expected as many threads as operations I have ongoing, but not more than that. Is my expectation correct?

File system used is XFS, and I these are the kernel versions of the machines were I ran this:

Linux dev3-147 6.14.5-100.fc40.x86_64 #1 SMP PREEMPT_DYNAMIC Fri May  2 14:22:13 UTC 2025 x86_64 GNU/Linux
and
Linux lab-130-19 5.14.0-570.35.1.0.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Aug 19 07:47:05 PDT 2025 x86_64 x86_64 x86_64 GNU/Linux

Here's the program I used:

#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include <linux/falloc.h>
#include <liburing.h>
#include <time.h>

#define FILE_SIZE (512ULL * 1024 * 1024)  // 512MB
#define CHUNK_SIZE 4096 * 8
#define MAX_IN_FLIGHT 8                     // Maximum concurrent operations
#define TOTAL_DATA (10ULL * 1024 * 1024 * 1024)  // 10GB
#define RING_SIZE 16                        // io_uring queue size

struct write_op {
    char *buffer;
    off_t offset;
    int in_use;
};

static int create_and_setup_file(const char *filename) {
    struct stat st;
    int fd;
    int flags = O_RDWR | O_DSYNC | O_DIRECT;

    // Check if file exists
    if (stat(filename, &st) == 0) {
        // File exists, just open it
        fd = open(filename, flags);
        if (fd < 0) {
            perror("open existing file");
            return -1;
        }
        printf("File %s already exists, using existing file\n", filename);
    } else {
        // File doesn't exist, create it
        fd = open(filename, flags | O_CREAT, 0644);
        if (fd < 0) {
            perror("create file");
            return -1;
        }

        printf("Created new file %s, allocating %llu bytes...\n", filename, FILE_SIZE);

        // Allocate space
        if (fallocate(fd, 0, 0, FILE_SIZE) < 0) {
            perror("fallocate");
            close(fd);
            return -1;
        }

        // Sync to disk
        if (fsync(fd) < 0) {
            perror("fsync");
            close(fd);
            return -1;
        }

        printf("File setup complete\n");
    }

    return fd;
}

static char* allocate_aligned_buffer(size_t size) {
    void *buf;
    if (posix_memalign(&buf, 4096, size) != 0) {
        perror("posix_memalign");
        return NULL;
    }
    return (char*)buf;
}

static void fill_data(char *buffer, size_t size) {
    for (size_t i = 0; i < size; i++) {
        buffer[i] = i & 0xFF;
    }
}

static void print_progress(unsigned long long bytes_written) {
    double progress = (double)bytes_written / TOTAL_DATA * 100.0;
    double mb_written = (double)bytes_written / (1024 * 1024);
    double total_mb = (double)TOTAL_DATA / (1024 * 1024);

    printf("\r\033[KProgress: %.1f%% (%.1f MB / %.1f MB written)",
           progress, mb_written, total_mb);
    fflush(stdout);
}

int main() {
    const char *filename = "test.dat";
    int fd;
    struct io_uring ring;
    struct write_op ops[MAX_IN_FLIGHT];
    unsigned long long bytes_written = 0;
    off_t current_offset = 0;
    int in_flight = 0;
    int ret;

    // Seed random number generator
    srand(time(NULL));

    // Create and setup the file
    fd = create_and_setup_file(filename);
    if (fd < 0) {
        return 1;
    }

    // Initialize io_uring
    ret = io_uring_queue_init(RING_SIZE, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init failed: %s\n", strerror(-ret));
        close(fd);
        return 1;
    }

    // Initialize write operations
    for (int i = 0; i < MAX_IN_FLIGHT; i++) {
        ops[i].buffer = allocate_aligned_buffer(CHUNK_SIZE);
        if (!ops[i].buffer) {
            fprintf(stderr, "Failed to allocate buffer %d\n", i);
            goto cleanup;
        }
        ops[i].in_use = 0;
    }

    printf("Starting to write %llu bytes in %d byte chunks...\n", TOTAL_DATA, CHUNK_SIZE);

    while (bytes_written < TOTAL_DATA) {
        // Submit new writes if we have capacity and data to write
        while (in_flight < MAX_IN_FLIGHT && bytes_written < TOTAL_DATA) {
            // Find a free operation slot
            int op_idx = -1;
            for (int i = 0; i < MAX_IN_FLIGHT; i++) {
                if (!ops[i].in_use) {
                    op_idx = i;
                    break;
                }
            }

            if (op_idx == -1) {
                break; // No free slots
            }

            // Setup the write operation
            struct write_op *op = &ops[op_idx];
            op->offset = current_offset;
            op->in_use = 1;

            // Fill buffer with random data
            fill_data(op->buffer, CHUNK_SIZE);

            // Get SQE and prepare write
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            if (!sqe) {
                fprintf(stderr, "Failed to get SQE\n");
                goto cleanup;
            }

            io_uring_prep_write(sqe, fd, op->buffer, CHUNK_SIZE, op->offset);
            sqe->flags |= IOSQE_ASYNC;
            io_uring_sqe_set_data(sqe, op);

            // Submit immediately
            ret = io_uring_submit(&ring);
            if (ret < 0) {
                fprintf(stderr, "io_uring_submit failed: %s\n", strerror(-ret));
                goto cleanup;
            }

            in_flight++;

            // Update offset, wrapping around at file size
            current_offset += CHUNK_SIZE;
            if (current_offset + CHUNK_SIZE > FILE_SIZE) {
                current_offset = 0;
            }
        }

        // Wait for at least one completion with 1000ms timeout
        struct io_uring_cqe *cqe;
        struct __kernel_timespec ts = {
            .tv_sec = 1,
            .tv_nsec = 0
        };

        ret = io_uring_wait_cqe_timeout(&ring, &cqe, &ts);
        if (ret == -ETIME) {
            // Timeout occurred
            printf("\nTimeout waiting for completion\n");
            continue;
        } else if (ret < 0) {
            fprintf(stderr, "io_uring_wait_cqe_timeout failed: %s\n", strerror(-ret));
            goto cleanup;
        }

        // Process all available completions
        struct io_uring_cqe *cqe_ptr;
        unsigned head;
        unsigned count = 0;

        io_uring_for_each_cqe(&ring, head, cqe_ptr) {
            struct write_op *completed_op = (struct write_op*)io_uring_cqe_get_data(cqe_ptr);

            if (cqe_ptr->res < 0) {
                fprintf(stderr, "\nWrite failed: %s\n", strerror(-cqe_ptr->res));
                goto cleanup;
            } else if (cqe_ptr->res != CHUNK_SIZE) {
                fprintf(stderr, "\nPartial write: %d bytes instead of %d\n",
                        cqe_ptr->res, CHUNK_SIZE);
                goto cleanup;
            }

            // Mark operation as completed
            completed_op->in_use = 0;
            in_flight--;
            bytes_written += CHUNK_SIZE;
            count++;

            print_progress(bytes_written);
        }

        io_uring_cq_advance(&ring, count);
    }

    printf("\nWrite completed successfully!\n");

cleanup:
    // Cleanup
    for (int i = 0; i < MAX_IN_FLIGHT; i++) {
        if (ops[i].buffer) {
            free(ops[i].buffer);
        }
    }

    io_uring_queue_exit(&ring);
    close(fd);

    return 0;
}

And to compile and run it:

gcc -O2 -o uring_test uring_test.c -luring

./uring_test

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions