diff --git a/CMakeLists.txt b/CMakeLists.txt index 867f2684db..0cf4a4029c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -460,6 +460,7 @@ set(ZIG_STD_FILES "empty.zig" "event.zig" "event/channel.zig" + "event/fs.zig" "event/future.zig" "event/group.zig" "event/lock.zig" diff --git a/std/build.zig b/std/build.zig index 68cf13c1eb..021b8399e3 100644 --- a/std/build.zig +++ b/std/build.zig @@ -603,10 +603,10 @@ pub const Builder = struct { } fn copyFile(self: *Builder, source_path: []const u8, dest_path: []const u8) !void { - return self.copyFileMode(source_path, dest_path, os.default_file_mode); + return self.copyFileMode(source_path, dest_path, os.File.default_mode); } - fn copyFileMode(self: *Builder, source_path: []const u8, dest_path: []const u8, mode: os.FileMode) !void { + fn copyFileMode(self: *Builder, source_path: []const u8, dest_path: []const u8, mode: os.File.Mode) !void { if (self.verbose) { warn("cp {} {}\n", source_path, dest_path); } diff --git a/std/debug/index.zig b/std/debug/index.zig index ab50d79db3..c32c3d352c 100644 --- a/std/debug/index.zig +++ b/std/debug/index.zig @@ -672,14 +672,10 @@ fn parseFormValueRef(allocator: *mem.Allocator, in_stream: var, comptime T: type const ParseFormValueError = error{ EndOfStream, - Io, - BadFd, - Unexpected, InvalidDebugInfo, EndOfFile, - IsDir, OutOfMemory, -}; +} || std.os.File.ReadError; fn parseFormValue(allocator: *mem.Allocator, in_stream: var, form_id: u64, is_64: bool) ParseFormValueError!FormValue { return switch (form_id) { diff --git a/std/event.zig b/std/event.zig index 1e52086286..9b692ecd44 100644 --- a/std/event.zig +++ b/std/event.zig @@ -1,17 +1,19 @@ +pub const Channel = @import("event/channel.zig").Channel; +pub const Future = @import("event/future.zig").Future; +pub const Group = @import("event/group.zig").Group; +pub const Lock = @import("event/lock.zig").Lock; pub const Locked = @import("event/locked.zig").Locked; pub const Loop = @import("event/loop.zig").Loop; -pub const Lock = @import("event/lock.zig").Lock; +pub const fs = @import("event/fs.zig"); pub const tcp = @import("event/tcp.zig"); -pub const Channel = @import("event/channel.zig").Channel; -pub const Group = @import("event/group.zig").Group; -pub const Future = @import("event/future.zig").Future; test "import event tests" { + _ = @import("event/channel.zig"); + _ = @import("event/fs.zig"); + _ = @import("event/future.zig"); + _ = @import("event/group.zig"); + _ = @import("event/lock.zig"); _ = @import("event/locked.zig"); _ = @import("event/loop.zig"); - _ = @import("event/lock.zig"); _ = @import("event/tcp.zig"); - _ = @import("event/channel.zig"); - _ = @import("event/group.zig"); - _ = @import("event/future.zig"); } diff --git a/std/event/fs.zig b/std/event/fs.zig new file mode 100644 index 0000000000..760d5f61e4 --- /dev/null +++ b/std/event/fs.zig @@ -0,0 +1,343 @@ +const std = @import("../index.zig"); +const event = std.event; +const assert = std.debug.assert; +const os = std.os; +const mem = std.mem; + +pub const RequestNode = std.atomic.Queue(Request).Node; + +pub const Request = struct { + msg: Msg, + finish: Finish, + + pub const Finish = union(enum) { + TickNode: event.Loop.NextTickNode, + DeallocCloseOperation: *CloseOperation, + NoAction, + }; + + pub const Msg = union(enum) { + PWriteV: PWriteV, + PReadV: PReadV, + OpenRead: OpenRead, + Close: Close, + WriteFile: WriteFile, + End, // special - means the fs thread should exit + + pub const PWriteV = struct { + fd: os.FileHandle, + data: []const []const u8, + offset: usize, + result: Error!void, + + pub const Error = error{}; + }; + + pub const PReadV = struct { + fd: os.FileHandle, + iov: []os.linux.iovec, + offset: usize, + result: Error!usize, + + pub const Error = os.File.ReadError; + }; + + pub const OpenRead = struct { + /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 + path: []const u8, + result: Error!os.FileHandle, + + pub const Error = os.File.OpenError; + }; + + pub const WriteFile = struct { + /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 + path: []const u8, + contents: []const u8, + mode: os.File.Mode, + result: Error!void, + + pub const Error = os.File.OpenError || os.File.WriteError; + }; + + pub const Close = struct { + fd: os.FileHandle, + }; + }; +}; + +/// data - both the outer and inner references - must live until pwritev promise completes. +pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []const u8) !void { + //const data_dupe = try mem.dupe(loop.allocator, []const u8, data); + //defer loop.allocator.free(data_dupe); + + // workaround for https://github.com/ziglang/zig/issues/1194 + var my_handle: promise = undefined; + suspend |p| { + my_handle = p; + resume p; + } + + var req_node = RequestNode{ + .next = undefined, + .data = Request{ + .msg = Request.Msg{ + .PWriteV = Request.Msg.PWriteV{ + .fd = fd, + .data = data, + .offset = offset, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = event.Loop.NextTickNode{ + .next = undefined, + .data = my_handle, + }, + }, + }, + }; + + suspend |_| { + loop.linuxFsRequest(&req_node); + } + + return req_node.data.msg.PWriteV.result; +} + +/// data - just the inner references - must live until pwritev promise completes. +pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []u8) !usize { + //const data_dupe = try mem.dupe(loop.allocator, []const u8, data); + //defer loop.allocator.free(data_dupe); + + // workaround for https://github.com/ziglang/zig/issues/1194 + var my_handle: promise = undefined; + suspend |p| { + my_handle = p; + resume p; + } + + const iovecs = try loop.allocator.alloc(os.linux.iovec, data.len); + defer loop.allocator.free(iovecs); + + for (data) |buf, i| { + iovecs[i] = os.linux.iovec{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + var req_node = RequestNode{ + .next = undefined, + .data = Request{ + .msg = Request.Msg{ + .PReadV = Request.Msg.PReadV{ + .fd = fd, + .iov = iovecs, + .offset = offset, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = event.Loop.NextTickNode{ + .next = undefined, + .data = my_handle, + }, + }, + }, + }; + + suspend |_| { + loop.linuxFsRequest(&req_node); + } + + return req_node.data.msg.PReadV.result; +} + +pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle { + // workaround for https://github.com/ziglang/zig/issues/1194 + var my_handle: promise = undefined; + suspend |p| { + my_handle = p; + resume p; + } + + var req_node = RequestNode{ + .next = undefined, + .data = Request{ + .msg = Request.Msg{ + .OpenRead = Request.Msg.OpenRead{ + .path = path, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = event.Loop.NextTickNode{ + .next = undefined, + .data = my_handle, + }, + }, + }, + }; + + suspend |_| { + loop.linuxFsRequest(&req_node); + } + + return req_node.data.msg.OpenRead.result; +} + +/// This abstraction helps to close file handles in defer expressions +/// without suspending. Start a CloseOperation before opening a file. +pub const CloseOperation = struct { + loop: *event.Loop, + have_fd: bool, + close_req_node: RequestNode, + + pub fn create(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) { + const self = try loop.allocator.createOne(CloseOperation); + self.* = CloseOperation{ + .loop = loop, + .have_fd = false, + .close_req_node = RequestNode{ + .next = undefined, + .data = Request{ + .msg = Request.Msg{ + .Close = Request.Msg.Close{ .fd = undefined }, + }, + .finish = Request.Finish{ .DeallocCloseOperation = self }, + }, + }, + }; + return self; + } + + /// Defer this after creating. + pub fn deinit(self: *CloseOperation) void { + if (self.have_fd) { + self.loop.linuxFsRequest(&self.close_req_node); + } else { + self.loop.allocator.destroy(self); + } + } + + pub fn setHandle(self: *CloseOperation, handle: os.FileHandle) void { + self.close_req_node.data.msg.Close.fd = handle; + self.have_fd = true; + } +}; + +/// contents must remain alive until writeFile completes. +pub async fn writeFile(loop: *event.Loop, path: []const u8, contents: []const u8) !void { + return await (async writeFileMode(loop, path, contents, os.File.default_mode) catch unreachable); +} + +/// contents must remain alive until writeFile completes. +pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void { + // workaround for https://github.com/ziglang/zig/issues/1194 + var my_handle: promise = undefined; + suspend |p| { + my_handle = p; + resume p; + } + + const path_with_null = try std.cstr.addNullByte(loop.allocator, path); + defer loop.allocator.free(path_with_null); + + var req_node = RequestNode{ + .next = undefined, + .data = Request{ + .msg = Request.Msg{ + .WriteFile = Request.Msg.WriteFile{ + .path = path_with_null[0..path.len], + .contents = contents, + .mode = mode, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = event.Loop.NextTickNode{ + .next = undefined, + .data = my_handle, + }, + }, + }, + }; + + suspend |_| { + loop.linuxFsRequest(&req_node); + } + + return req_node.data.msg.WriteFile.result; +} + +/// The promise resumes when the last data has been confirmed written, but before the file handle +/// is closed. +pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) ![]u8 { + var close_op = try CloseOperation.create(loop); + defer close_op.deinit(); + + const path_with_null = try std.cstr.addNullByte(loop.allocator, file_path); + defer loop.allocator.free(path_with_null); + + const fd = try await (async openRead(loop, path_with_null[0..file_path.len]) catch unreachable); + close_op.setHandle(fd); + + var list = std.ArrayList(u8).init(loop.allocator); + defer list.deinit(); + + while (true) { + try list.ensureCapacity(list.len + os.page_size); + const buf = list.items[list.len..]; + const buf_array = [][]u8{buf}; + const amt = try await (async preadv(loop, fd, list.len, buf_array) catch unreachable); + list.len += amt; + if (amt < buf.len) { + return list.toOwnedSlice(); + } + } +} + +const test_tmp_dir = "std_event_fs_test"; + +test "write a file, watch it, write it again" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + // TODO move this into event loop too + try os.makePath(allocator, test_tmp_dir); + defer os.deleteTree(allocator, test_tmp_dir) catch {}; + + var loop: event.Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var result: error!void = undefined; + const handle = try async testFsWatchCantFail(&loop, &result); + defer cancel handle; + + loop.run(); + return result; +} + +async fn testFsWatchCantFail(loop: *event.Loop, result: *(error!void)) void { + result.* = await async testFsWatch(loop) catch unreachable; +} + +async fn testFsWatch(loop: *event.Loop) !void { + const file_path = try os.path.join(loop.allocator, test_tmp_dir, "file.txt"); + defer loop.allocator.free(file_path); + + const contents = + \\line 1 + \\line 2 + ; + + // first just write then read the file + try await try async writeFile(loop, file_path, contents); + + const read_contents = try await try async readFile(loop, file_path, 1024 * 1024); + assert(mem.eql(u8, read_contents, contents)); +} diff --git a/std/event/loop.zig b/std/event/loop.zig index 4e219653be..416d6cb2c3 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -2,10 +2,12 @@ const std = @import("../index.zig"); const builtin = @import("builtin"); const assert = std.debug.assert; const mem = std.mem; -const posix = std.os.posix; -const windows = std.os.windows; const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; +const fs = std.event.fs; +const os = std.os; +const posix = os.posix; +const windows = os.windows; pub const Loop = struct { allocator: *mem.Allocator, @@ -13,7 +15,7 @@ pub const Loop = struct { os_data: OsData, final_resume_node: ResumeNode, pending_event_count: usize, - extra_threads: []*std.os.Thread, + extra_threads: []*os.Thread, // pre-allocated eventfds. all permanently active. // this is how we send promises to be resumed on other threads. @@ -65,7 +67,7 @@ pub const Loop = struct { /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { - const core_count = try std.os.cpuCount(allocator); + const core_count = try os.cpuCount(allocator); return self.initInternal(allocator, core_count); } @@ -92,7 +94,7 @@ pub const Loop = struct { ); errdefer self.allocator.free(self.eventfd_resume_nodes); - self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count); + self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); errdefer self.allocator.free(self.extra_threads); try self.initOsData(extra_thread_count); @@ -104,17 +106,34 @@ pub const Loop = struct { self.allocator.free(self.extra_threads); } - const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError || - std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError || - std.os.WindowsCreateIoCompletionPortError; + const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || + os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError || + os.WindowsCreateIoCompletionPortError; const wakeup_bytes = []u8{0x1} ** 8; fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { builtin.Os.linux => { + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue_len = 0; + // we need another thread for the file system because Linux does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + self.os_data.fs_thread = try os.spawnThread(self, linuxFsRun); errdefer { - while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + self.linuxFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + errdefer { + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); } for (self.eventfd_resume_nodes) |*eventfd_node| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -123,7 +142,7 @@ pub const Loop = struct { .id = ResumeNode.Id.EventFd, .handle = undefined, }, - .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), + .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), .epoll_op = posix.EPOLL_CTL_ADD, }, .next = undefined, @@ -131,17 +150,17 @@ pub const Loop = struct { self.available_eventfd_resume_nodes.push(eventfd_node); } - self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC); - errdefer std.os.close(self.os_data.epollfd); + self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + errdefer os.close(self.os_data.epollfd); - self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); - errdefer std.os.close(self.os_data.final_eventfd); + self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + errdefer os.close(self.os_data.final_eventfd); self.os_data.final_eventfd_event = posix.epoll_event{ .events = posix.EPOLLIN, .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, }; - try std.os.linuxEpollCtl( + try os.linuxEpollCtl( self.os_data.epollfd, posix.EPOLL_CTL_ADD, self.os_data.final_eventfd, @@ -151,19 +170,19 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail - std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); } }, builtin.Os.macosx => { - self.os_data.kqfd = try std.os.bsdKQueue(); - errdefer std.os.close(self.os_data.kqfd); + self.os_data.kqfd = try os.bsdKQueue(); + errdefer os.close(self.os_data.kqfd); self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); errdefer self.allocator.free(self.os_data.kevents); @@ -191,7 +210,7 @@ pub const Loop = struct { }; self.available_eventfd_resume_nodes.push(eventfd_node); const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); - _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; // this one is for waiting for events @@ -216,30 +235,30 @@ pub const Loop = struct { .udata = @ptrToInt(&self.final_resume_node), }; const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent); - _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); self.os_data.final_kevent.flags = posix.EV_ENABLE; self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; var extra_thread_index: usize = 0; errdefer { - _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable; + _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); } }, builtin.Os.windows => { - self.os_data.io_port = try std.os.windowsCreateIoCompletionPort( + self.os_data.io_port = try os.windowsCreateIoCompletionPort( windows.INVALID_HANDLE_VALUE, null, undefined, undefined, ); - errdefer std.os.close(self.os_data.io_port); + errdefer os.close(self.os_data.io_port); for (self.eventfd_resume_nodes) |*eventfd_node, i| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -262,7 +281,7 @@ pub const Loop = struct { while (i < extra_thread_index) : (i += 1) { while (true) { const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; break; } } @@ -272,7 +291,7 @@ pub const Loop = struct { } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); } }, else => {}, @@ -282,17 +301,17 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { builtin.Os.linux => { - std.os.close(self.os_data.final_eventfd); - while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); - std.os.close(self.os_data.epollfd); + os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + os.close(self.os_data.epollfd); self.allocator.free(self.eventfd_resume_nodes); }, builtin.Os.macosx => { self.allocator.free(self.os_data.kevents); - std.os.close(self.os_data.kqfd); + os.close(self.os_data.kqfd); }, builtin.Os.windows => { - std.os.close(self.os_data.io_port); + os.close(self.os_data.io_port); }, else => {}, } @@ -307,17 +326,17 @@ pub const Loop = struct { try self.modFd( fd, posix.EPOLL_CTL_ADD, - std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, + os.linux.EPOLLIN | os.linux.EPOLLOUT | os.linux.EPOLLET, resume_node, ); } pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void { - var ev = std.os.linux.epoll_event{ + var ev = os.linux.epoll_event{ .events = events, - .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, + .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, }; - try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); + try os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); } pub fn removeFd(self: *Loop, fd: i32) void { @@ -326,7 +345,7 @@ pub const Loop = struct { } fn removeFdNoCounter(self: *Loop, fd: i32) void { - std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; + os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; } pub async fn waitFd(self: *Loop, fd: i32) !void { @@ -353,7 +372,7 @@ pub const Loop = struct { builtin.Os.macosx => { const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); const eventlist = ([*]posix.Kevent)(undefined)[0..0]; - _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { + _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { self.next_tick_queue.unget(next_tick_node); self.available_eventfd_resume_nodes.push(resume_stack_node); return; @@ -361,8 +380,8 @@ pub const Loop = struct { }, builtin.Os.linux => { // the pending count is already accounted for - const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | - std.os.linux.EPOLLET; + const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | + os.linux.EPOLLET; self.modFd( eventfd_node.eventfd, eventfd_node.epoll_op, @@ -379,7 +398,7 @@ pub const Loop = struct { // the consumer code can decide whether to read the completion key. // it has to do this for normal I/O, so we match that behavior here. const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - std.os.windowsPostQueuedCompletionStatus( + os.windowsPostQueuedCompletionStatus( self.os_data.io_port, undefined, eventfd_node.completion_key, @@ -406,6 +425,9 @@ pub const Loop = struct { self.finishOneEvent(); // the reference we start with self.workerRun(); + + self.os_data.fs_thread.wait(); + for (self.extra_threads) |extra_thread| { extra_thread.wait(); } @@ -453,15 +475,16 @@ pub const Loop = struct { // cause all the threads to stop switch (builtin.os) { builtin.Os.linux => { + self.linuxFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail - std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, builtin.Os.macosx => { const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); const eventlist = ([*]posix.Kevent)(undefined)[0..0]; // cannot fail because we already added it and this just enables it - _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; + _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; return; }, builtin.Os.windows => { @@ -469,7 +492,7 @@ pub const Loop = struct { while (i < self.extra_threads.len + 1) : (i += 1) { while (true) { const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; break; } } @@ -492,8 +515,8 @@ pub const Loop = struct { switch (builtin.os) { builtin.Os.linux => { // only process 1 event so we don't steal from other threads - var events: [1]std.os.linux.epoll_event = undefined; - const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + var events: [1]os.linux.epoll_event = undefined; + const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); for (events[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); const handle = resume_node.handle; @@ -516,7 +539,7 @@ pub const Loop = struct { }, builtin.Os.macosx => { var eventlist: [1]posix.Kevent = undefined; - const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; + const count = os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; for (eventlist[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.udata); const handle = resume_node.handle; @@ -541,9 +564,9 @@ pub const Loop = struct { while (true) { var nbytes: windows.DWORD = undefined; var overlapped: ?*windows.OVERLAPPED = undefined; - switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { - std.os.WindowsWaitResult.Aborted => return, - std.os.WindowsWaitResult.Normal => {}, + switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + os.WindowsWaitResult.Aborted => return, + os.WindowsWaitResult.Normal => {}, } if (overlapped != null) break; } @@ -569,11 +592,73 @@ pub const Loop = struct { } } + fn linuxFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + self.os_data.fs_queue.put(request_node); + _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap + const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); + switch (os.linux.getErrno(rc)) { + 0 => {}, + posix.EINVAL => unreachable, + else => unreachable, + } + } + + fn linuxFsRun(self: *Loop) void { + var processed_count: i32 = 0; // we let this wrap + while (true) { + while (self.os_data.fs_queue.get()) |node| { + processed_count +%= 1; + switch (node.data.msg) { + @TagType(fs.Request.Msg).PWriteV => @panic("TODO"), + @TagType(fs.Request.Msg).PReadV => |*msg| { + msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).OpenRead => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDONLY; + msg.result = os.posixOpenC(msg.path.ptr, flags, 0); + }, + @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), + @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { + const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | + posix.O_CLOEXEC | posix.O_TRUNC; + const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { + msg.result = err; + break :blk; + }; + defer os.close(fd); + msg.result = os.posixWrite(fd, msg.contents); + }, + @TagType(fs.Request.Msg).End => return, + } + switch (node.data.finish) { + @TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node), + @TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| { + self.allocator.destroy(close_op); + }, + @TagType(fs.Request.Finish).NoAction => {}, + } + self.finishOneEvent(); + } + const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); + switch (os.linux.getErrno(rc)) { + 0 => continue, + posix.EINTR => continue, + posix.EAGAIN => continue, + else => unreachable, + } + } + } + const OsData = switch (builtin.os) { builtin.Os.linux => struct { epollfd: i32, final_eventfd: i32, - final_eventfd_event: std.os.linux.epoll_event, + final_eventfd_event: os.linux.epoll_event, + fs_thread: *os.Thread, + fs_queue_len: i32, // we let this wrap + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, }, builtin.Os.macosx => MacOsData, builtin.Os.windows => struct { diff --git a/std/io.zig b/std/io.zig index ff73c04f78..49e03a64b2 100644 --- a/std/io.zig +++ b/std/io.zig @@ -415,13 +415,12 @@ pub fn PeekStream(comptime buffer_size: usize, comptime InStreamError: type) typ self.at_end = (read < left); return pos + read; } - }; } pub const SliceInStream = struct { const Self = this; - pub const Error = error { }; + pub const Error = error{}; pub const Stream = InStream(Error); pub stream: Stream, @@ -481,13 +480,12 @@ pub const SliceOutStream = struct { assert(self.pos <= self.slice.len); - const n = - if (self.pos + bytes.len <= self.slice.len) - bytes.len - else - self.slice.len - self.pos; + const n = if (self.pos + bytes.len <= self.slice.len) + bytes.len + else + self.slice.len - self.pos; - std.mem.copy(u8, self.slice[self.pos..self.pos + n], bytes[0..n]); + std.mem.copy(u8, self.slice[self.pos .. self.pos + n], bytes[0..n]); self.pos += n; if (n < bytes.len) { @@ -586,7 +584,7 @@ pub const BufferedAtomicFile = struct { }); errdefer allocator.destroy(self); - self.atomic_file = try os.AtomicFile.init(allocator, dest_path, os.default_file_mode); + self.atomic_file = try os.AtomicFile.init(allocator, dest_path, os.File.default_mode); errdefer self.atomic_file.deinit(); self.file_stream = FileOutStream.init(&self.atomic_file.file); diff --git a/std/os/file.zig b/std/os/file.zig index 6998ba00d1..c402aa0522 100644 --- a/std/os/file.zig +++ b/std/os/file.zig @@ -15,10 +15,21 @@ pub const File = struct { /// The OS-specific file descriptor or file handle. handle: os.FileHandle, + pub const Mode = switch (builtin.os) { + Os.windows => void, + else => u32, + }; + + pub const default_mode = switch (builtin.os) { + Os.windows => {}, + else => 0o666, + }; + pub const OpenError = os.WindowsOpenError || os.PosixOpenError; /// `path` needs to be copied in memory to add a null terminating byte, hence the allocator. /// Call close to clean up. + /// TODO deprecated, just use open pub fn openRead(allocator: *mem.Allocator, path: []const u8) OpenError!File { if (is_posix) { const flags = posix.O_LARGEFILE | posix.O_RDONLY; @@ -39,16 +50,18 @@ pub const File = struct { } } - /// Calls `openWriteMode` with os.default_file_mode for the mode. + /// Calls `openWriteMode` with os.File.default_mode for the mode. + /// TODO deprecated, just use open pub fn openWrite(allocator: *mem.Allocator, path: []const u8) OpenError!File { - return openWriteMode(allocator, path, os.default_file_mode); + return openWriteMode(allocator, path, os.File.default_mode); } /// If the path does not exist it will be created. /// If a file already exists in the destination it will be truncated. /// `path` needs to be copied in memory to add a null terminating byte, hence the allocator. /// Call close to clean up. - pub fn openWriteMode(allocator: *mem.Allocator, path: []const u8, file_mode: os.FileMode) OpenError!File { + /// TODO deprecated, just use open + pub fn openWriteMode(allocator: *mem.Allocator, path: []const u8, file_mode: Mode) OpenError!File { if (is_posix) { const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_TRUNC; const fd = try os.posixOpen(allocator, path, flags, file_mode); @@ -72,7 +85,8 @@ pub const File = struct { /// If a file already exists in the destination this returns OpenError.PathAlreadyExists /// `path` needs to be copied in memory to add a null terminating byte, hence the allocator. /// Call close to clean up. - pub fn openWriteNoClobber(allocator: *mem.Allocator, path: []const u8, file_mode: os.FileMode) OpenError!File { + /// TODO deprecated, just use open + pub fn openWriteNoClobber(allocator: *mem.Allocator, path: []const u8, file_mode: Mode) OpenError!File { if (is_posix) { const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_EXCL; const fd = try os.posixOpen(allocator, path, flags, file_mode); @@ -282,7 +296,7 @@ pub const File = struct { Unexpected, }; - pub fn mode(self: *File) ModeError!os.FileMode { + pub fn mode(self: *File) ModeError!Mode { if (is_posix) { var stat: posix.Stat = undefined; const err = posix.getErrno(posix.fstat(self.handle, &stat)); @@ -296,7 +310,7 @@ pub const File = struct { // TODO: we should be able to cast u16 to ModeError!u32, making this // explicit cast not necessary - return os.FileMode(stat.mode); + return Mode(stat.mode); } else if (is_windows) { return {}; } else { @@ -305,9 +319,11 @@ pub const File = struct { } pub const ReadError = error{ - BadFd, - Io, + FileClosed, + InputOutput, IsDir, + WouldBlock, + SystemResources, Unexpected, }; @@ -323,9 +339,12 @@ pub const File = struct { posix.EINTR => continue, posix.EINVAL => unreachable, posix.EFAULT => unreachable, - posix.EBADF => return error.BadFd, - posix.EIO => return error.Io, + posix.EAGAIN => return error.WouldBlock, + posix.EBADF => return error.FileClosed, + posix.EIO => return error.InputOutput, posix.EISDIR => return error.IsDir, + posix.ENOBUFS => return error.SystemResources, + posix.ENOMEM => return error.SystemResources, else => return os.unexpectedErrorPosix(read_err), } } diff --git a/std/os/index.zig b/std/os/index.zig index 77fd2a78ad..727c71c435 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -38,16 +38,6 @@ pub const path = @import("path.zig"); pub const File = @import("file.zig").File; pub const time = @import("time.zig"); -pub const FileMode = switch (builtin.os) { - Os.windows => void, - else => u32, -}; - -pub const default_file_mode = switch (builtin.os) { - Os.windows => {}, - else => 0o666, -}; - pub const page_size = 4 * 1024; pub const UserInfo = @import("get_user_id.zig").UserInfo; @@ -256,6 +246,26 @@ pub fn posixRead(fd: i32, buf: []u8) !void { } } +pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u64) !usize { + while (true) { + const rc = posix.preadv(fd, iov, count, offset); + const err = posix.getErrno(rc); + switch (err) { + 0 => return rc, + posix.EINTR => continue, + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.EAGAIN => return error.WouldBlock, + posix.EBADF => return error.FileClosed, + posix.EIO => return error.InputOutput, + posix.EISDIR => return error.IsDir, + posix.ENOBUFS => return error.SystemResources, + posix.ENOMEM => return error.SystemResources, + else => return unexpectedErrorPosix(err), + } + } +} + pub const PosixWriteError = error{ WouldBlock, FileClosed, @@ -853,7 +863,7 @@ pub fn copyFile(allocator: *Allocator, source_path: []const u8, dest_path: []con /// Guaranteed to be atomic. However until https://patchwork.kernel.org/patch/9636735/ is /// merged and readily available, /// there is a possibility of power loss or application termination leaving temporary files present -pub fn copyFileMode(allocator: *Allocator, source_path: []const u8, dest_path: []const u8, mode: FileMode) !void { +pub fn copyFileMode(allocator: *Allocator, source_path: []const u8, dest_path: []const u8, mode: File.Mode) !void { var in_file = try os.File.openRead(allocator, source_path); defer in_file.close(); @@ -879,7 +889,7 @@ pub const AtomicFile = struct { /// dest_path must remain valid for the lifetime of AtomicFile /// call finish to atomically replace dest_path with contents - pub fn init(allocator: *Allocator, dest_path: []const u8, mode: FileMode) !AtomicFile { + pub fn init(allocator: *Allocator, dest_path: []const u8, mode: File.Mode) !AtomicFile { const dirname = os.path.dirname(dest_path); var rand_buf: [12]u8 = undefined; diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig index 69bc30bad0..cf68e03ff0 100644 --- a/std/os/linux/index.zig +++ b/std/os/linux/index.zig @@ -692,6 +692,10 @@ pub fn futex_wait(uaddr: usize, futex_op: u32, val: i32, timeout: ?*timespec) us return syscall4(SYS_futex, uaddr, futex_op, @bitCast(u32, val), @ptrToInt(timeout)); } +pub fn futex_wake(uaddr: usize, futex_op: u32, val: i32) usize { + return syscall3(SYS_futex, uaddr, futex_op, @bitCast(u32, val)); +} + pub fn getcwd(buf: [*]u8, size: usize) usize { return syscall2(SYS_getcwd, @ptrToInt(buf), size); } @@ -742,6 +746,10 @@ pub fn read(fd: i32, buf: [*]u8, count: usize) usize { return syscall3(SYS_read, @intCast(usize, fd), @ptrToInt(buf), count); } +pub fn preadv(fd: i32, iov: [*]const iovec, count: usize, offset: u64) usize { + return syscall4(SYS_preadv, @intCast(usize, fd), @ptrToInt(iov), count, offset); +} + // TODO https://github.com/ziglang/zig/issues/265 pub fn rmdir(path: [*]const u8) usize { return syscall1(SYS_rmdir, @ptrToInt(path));