14 changes: 7 additions & 7 deletions lib/std/Io.zig
@@ -990,15 +990,15 @@ pub fn Future(Result: type) type {
/// Idempotent. Not threadsafe.
pub fn cancel(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
- io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
+ io.vtable.cancel(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
f.any_future = null;
return f.result;
}

/// Idempotent. Not threadsafe.
pub fn await(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
- io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
+ io.vtable.await(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
f.any_future = null;
return f.result;
}
@@ -1034,7 +1034,7 @@ pub const Group = struct {
@call(.auto, function, args_casted.*);
}
};
- io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
+ io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}

/// Blocks until all tasks of the group finish. During this time,
@@ -1111,7 +1111,7 @@ pub fn Select(comptime U: type) type {
}
};
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
- s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
+ s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&args), .of(Args), TypeErased.start);
}

/// Blocks until another task of the select finishes.
@@ -1539,9 +1539,9 @@ pub fn async(
var future: Future(Result) = undefined;
future.any_future = io.vtable.async(
io.userdata,
- @ptrCast((&future.result)[0..1]),
+ @ptrCast(&future.result),
.of(Result),
- @ptrCast((&args)[0..1]),
+ @ptrCast(&args),
.of(Args),
TypeErased.start,
);
@@ -1580,7 +1580,7 @@ pub fn concurrent(
io.userdata,
@sizeOf(Result),
.of(Result),
- @ptrCast((&args)[0..1]),
+ @ptrCast(&args),
.of(Args),
TypeErased.start,
);
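Note on the `Io.zig` changes: every call site drops the `(&x)[0..1]` step, which produced a `*[1]T` so that `@ptrCast` could turn it into a byte slice with the length recomputed from the element type. The diff suggests `@ptrCast` now accepts the single-item pointer directly. A minimal standalone sketch of the old-style cast (the test below is illustrative, not the actual vtable API):

```zig
const std = @import("std");

test "erasing a typed pointer to a byte slice" {
    var value: u32 = 0x01020304;
    // `(&value)[0..1]` has type `*[1]u32`; `@ptrCast` converts it to a
    // byte slice whose length is recomputed from the element type.
    const bytes: []u8 = @ptrCast((&value)[0..1]);
    try std.testing.expectEqual(@as(usize, 4), bytes.len);
}
```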
52 changes: 31 additions & 21 deletions lib/std/Io/Threaded.zig
@@ -388,7 +388,8 @@ const AsyncClosure = struct {
reset_event: ResetEvent,
select_condition: ?*ResetEvent,
context_alignment: std.mem.Alignment,
- result_offset: usize,
+ result_alignment: std.mem.Alignment,
+ result_offset_before_padding: usize,

const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

@@ -420,12 +421,12 @@ const AsyncClosure = struct {

fn resultPointer(ac: *AsyncClosure) [*]u8 {
const base: [*]u8 = @ptrCast(ac);
- return base + ac.result_offset;
+ return @ptrFromInt(ac.result_alignment.forward(@intFromPtr(base + ac.result_offset_before_padding)));
}

fn contextPointer(ac: *AsyncClosure) [*]u8 {
const base: [*]u8 = @ptrCast(ac);
- return base + ac.context_alignment.forward(@sizeOf(AsyncClosure));
+ return @ptrFromInt(ac.context_alignment.forward(@intFromPtr(base + @sizeOf(AsyncClosure))));
}

fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
@@ -436,7 +437,9 @@

fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void {
const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
- gpa.free(base[0 .. ac.result_offset + result_len]);
+ const result_offset_with_padding = ac.result_alignment.forward(ac.result_offset_before_padding);
+ const allocated_size = result_offset_with_padding + result_len;
+ gpa.free(base[0..allocated_size]);
}
};

@@ -460,13 +463,16 @@ fn async(
};
};
const gpa = t.allocator;
- const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
- const result_offset = result_alignment.forward(context_offset + context.len);
- const n = result_offset + result.len;
- const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
+ const context_offset_before_padding = @alignOf(AsyncClosure) + @sizeOf(AsyncClosure);
+ const context_offset_with_padding = context_alignment.forward(context_offset_before_padding);
+ const result_offset_before_padding = context_offset_with_padding + context.len;
+ const result_offset_with_padding = result_alignment.forward(result_offset_before_padding);
+ const allocated_size = result_offset_with_padding + result.len;
+ const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), allocated_size) catch {
start(context.ptr, result.ptr);
return null;
- }));
+ };
+ const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes));

ac.* = .{
.closure = .{
@@ -476,7 +482,8 @@
},
.func = start,
.context_alignment = context_alignment,
- .result_offset = result_offset,
+ .result_alignment = result_alignment,
+ .result_offset_before_padding = result_offset_before_padding,
.reset_event = .unset,
.select_condition = null,
};
@@ -531,10 +538,12 @@ fn concurrent(
const t: *Threaded = @ptrCast(@alignCast(userdata));
const cpu_count = t.cpu_count catch 1;
const gpa = t.allocator;
- const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
- const result_offset = result_alignment.forward(context_offset + context.len);
- const n = result_offset + result_len;
- const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch
+ const context_offset_before_padding = @alignOf(AsyncClosure) + @sizeOf(AsyncClosure);
+ const context_offset_with_padding = context_alignment.forward(context_offset_before_padding);
+ const result_offset_before_padding = context_offset_with_padding + context.len;
+ const result_offset_with_padding = result_alignment.forward(result_offset_before_padding);
+ const allocated_size = result_offset_with_padding + result_len;
+ const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), allocated_size) catch
return error.ConcurrencyUnavailable;
const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes));

@@ -546,7 +555,8 @@
},
.func = start,
.context_alignment = context_alignment,
- .result_offset = result_offset,
+ .result_alignment = result_alignment,
+ .result_offset_before_padding = result_offset_before_padding,
.reset_event = .unset,
.select_condition = null,
};
@@ -580,6 +590,8 @@ fn concurrent(
return @ptrCast(ac);
}

+ /// Trailing data:
+ /// 1. context
const GroupClosure = struct {
closure: Closure,
t: *Threaded,
@@ -621,17 +633,15 @@ const GroupClosure = struct {
gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
}

- fn contextOffset(context_alignment: std.mem.Alignment) usize {
-     return context_alignment.forward(@sizeOf(GroupClosure));
- }
-
fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
- return contextOffset(context_alignment) + context_len;
+ const context_offset_before_padding = @alignOf(GroupClosure) + @sizeOf(GroupClosure);
+ const context_offset_with_padding = context_alignment.forward(context_offset_before_padding);
+ return context_offset_with_padding + context_len;
}

fn contextPointer(gc: *GroupClosure) [*]u8 {
const base: [*]u8 = @ptrCast(gc);
- return base + contextOffset(gc.context_alignment);
+ return @ptrFromInt(gc.context_alignment.forward(@intFromPtr(base + @sizeOf(GroupClosure))));
}

const sync_is_waiting: usize = 1 << 0;
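Note on the `Threaded.zig` changes: the closure header is followed in the same allocation by the context bytes and, for `AsyncClosure`, the result bytes, but the allocator only guarantees `@alignOf(AsyncClosure)` for the base pointer. A precomputed offset therefore cannot satisfy a stricter context or result alignment; the runtime address has to be forwarded instead, and the size computation reserves room for the padding. A minimal sketch of the address-forwarding idea, with a hypothetical `Header` standing in for the closure struct:

```zig
const std = @import("std");

// Hypothetical stand-in for AsyncClosure/GroupClosure.
const Header = struct { busy: bool };

// Locate the trailing payload by forwarding the runtime address, not a
// precomputed offset: the base pointer is only aligned to
// @alignOf(Header), so the padding can differ between allocations.
fn payloadPointer(base: [*]u8, payload_alignment: std.mem.Alignment) [*]u8 {
    return @ptrFromInt(payload_alignment.forward(@intFromPtr(base + @sizeOf(Header))));
}

test "trailing payload honors its requested alignment" {
    var storage: [256]u8 align(@alignOf(Header)) = undefined;
    const payload = payloadPointer(&storage, .@"64");
    try std.testing.expect(std.mem.isAligned(@intFromPtr(payload), 64));
}
```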
66 changes: 66 additions & 0 deletions lib/std/Io/Threaded/test.zig
@@ -56,3 +56,69 @@ test "concurrent vs concurrent prevents deadlock via oversubscription" {
getter.await(io);
putter.await(io);
}

fn paramWithExtraAlignment(param: Align64) void {
assert(param.data == 3);
}

fn returnValueWithExtraAlignment() Align64 {
return .{ .data = 5 };
}

const Align64 = struct {
data: u8 align(64),
};

test "async closure where result or context has extra alignment" {
// A fixed buffer allocator is used instead of `std.testing.allocator` so
// that we don't get memory with better alignment than requested.
var buffer: [1024]u8 align(64) = undefined;
var fba: std.heap.FixedBufferAllocator = .init(buffer[1..]);

var threaded: std.Io.Threaded = .init(fba.allocator());
defer threaded.deinit();
const io = threaded.io();

{
var future = io.async(paramWithExtraAlignment, .{.{ .data = 3 }});
future.await(io);
}

{
var future = io.async(returnValueWithExtraAlignment, .{});
const result = future.await(io);
try std.testing.expectEqual(5, result.data);
}
}

test "group closure where context has extra alignment" {
// A fixed buffer allocator is used instead of `std.testing.allocator` so
// that we don't get memory with better alignment than requested.
var buffer: [1024]u8 align(64) = undefined;
var fba: std.heap.FixedBufferAllocator = .init(buffer[1..]);

var threaded: std.Io.Threaded = .init(fba.allocator());
defer threaded.deinit();
const io = threaded.io();

var group: std.Io.Group = .init;
defer group.cancel(io);

group.async(io, paramWithExtraAlignment, .{.{ .data = 3 }});
}

fn returnArray() [32]u8 {
return @splat(5);
}

test "async on function with array parameter or return type" {
var threaded: std.Io.Threaded = .init(std.testing.allocator);
defer threaded.deinit();
const io = threaded.io();

var future = io.async(returnArray, .{});
const result = future.await(io);
for (result) |actual| {
try std.testing.expectEqual(5, actual);
}
}
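A note on the `buffer[1..]` trick in the tests above: slicing one byte off a 64-aligned buffer leaves backing memory whose base address is odd, so the fixed buffer allocator cannot hand out accidentally over-aligned memory and must satisfy exactly the alignment requested, whereas a typical general-purpose allocator returns generously aligned blocks that would mask this kind of bug. A small standalone sketch of the effect (assuming the same std APIs the tests use):

```zig
const std = @import("std");

test "misaligned backing memory cannot mask alignment bugs" {
    var buffer: [128]u8 align(64) = undefined;
    // The base address of buffer[1..] is odd, so no allocation from it
    // can be 64-aligned by accident.
    var fba: std.heap.FixedBufferAllocator = .init(buffer[1..]);
    const slice = try fba.allocator().alignedAlloc(u8, .of(u64), 8);
    // The requested 8-byte alignment is honored...
    try std.testing.expect(std.mem.isAligned(@intFromPtr(slice.ptr), 8));
    // ...but there is no incidental 64-byte alignment.
    try std.testing.expect(!std.mem.isAligned(@intFromPtr(slice.ptr), 64));
}
```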