mirror of https://github.com/ziglang/zig.git
add std.event.RwLock and a few more std changes
* add std.event.RwLock and std.event.RwLocked
* std.debug.warn does its printing locked
* add std.Mutex, however it's currently implemented as a spinlock
* rename std.event.Group.cancelAll to std.event.Group.deinit and change the docs and assumptions
* add std.HashMap.clone
parent 3c8d4e04ea
commit e3ae2cfb52
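Aside (not part of the commit diff below): a minimal usage sketch of the new std.Mutex, following the same acquire/release pattern the commit adds to std.debug.warn. The counter/increment names and the test are made up for illustration, and the commit message's caveat applies: the current implementation is a spinlock.

const std = @import("std");
const assert = std.debug.assert;

// Shared state guarded by the new spinlock-based std.Mutex.
var counter_mutex = std.Mutex.init();
var counter: usize = 0;

fn increment() void {
    // acquire() spins until the lock is free and returns a Held token.
    const held = counter_mutex.acquire();
    defer held.release();
    counter += 1;
}

test "std.Mutex usage sketch" {
    // Single-threaded here; this only demonstrates the API shape.
    increment();
    increment();
    assert(counter == 2);
}

acquire() returns a Held value, so releasing is a single defer on the return value, mirroring the stderr_mutex usage added to std.debug.warn below.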
CMakeLists.txt
@@ -466,6 +466,8 @@ set(ZIG_STD_FILES
     "event/lock.zig"
     "event/locked.zig"
     "event/loop.zig"
+    "event/rwlock.zig"
+    "event/rwlocked.zig"
     "event/tcp.zig"
     "fmt/errol/enum3.zig"
     "fmt/errol/index.zig"
@@ -554,6 +556,7 @@ set(ZIG_STD_FILES
     "math/tanh.zig"
     "math/trunc.zig"
     "mem.zig"
+    "mutex.zig"
     "net.zig"
     "os/child_process.zig"
     "os/darwin.zig"
std/debug/index.zig
@@ -23,7 +23,10 @@ pub const runtime_safety = switch (builtin.mode) {
 var stderr_file: os.File = undefined;
 var stderr_file_out_stream: io.FileOutStream = undefined;
 var stderr_stream: ?*io.OutStream(io.FileOutStream.Error) = null;
+var stderr_mutex = std.Mutex.init();
 pub fn warn(comptime fmt: []const u8, args: ...) void {
+    const held = stderr_mutex.acquire();
+    defer held.release();
     const stderr = getStderrStream() catch return;
     stderr.print(fmt, args) catch return;
 }
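Aside (not part of the diff): callers of std.debug.warn are unchanged by the hunk above; the mutex is taken and released inside warn itself, so a whole formatted message can no longer interleave with output from another thread. A minimal sketch with made-up values, using the T(x) cast syntax of this Zig era:

const std = @import("std");

test "std.debug.warn prints under the new stderr_mutex" {
    // The lock is acquired and released inside warn; the caller just prints.
    std.debug.warn("parsed {} files in {} ms\n", usize(3), u64(17));
}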
std/event.zig
@@ -3,6 +3,8 @@ pub const Future = @import("event/future.zig").Future;
 pub const Group = @import("event/group.zig").Group;
 pub const Lock = @import("event/lock.zig").Lock;
 pub const Locked = @import("event/locked.zig").Locked;
+pub const RwLock = @import("event/rwlock.zig").Lock;
+pub const RwLocked = @import("event/rwlocked.zig").RwLocked;
 pub const Loop = @import("event/loop.zig").Loop;
 pub const fs = @import("event/fs.zig");
 pub const tcp = @import("event/tcp.zig");
@@ -14,6 +16,8 @@ test "import event tests" {
     _ = @import("event/group.zig");
     _ = @import("event/lock.zig");
     _ = @import("event/locked.zig");
+    _ = @import("event/rwlock.zig");
+    _ = @import("event/rwlocked.zig");
     _ = @import("event/loop.zig");
     _ = @import("event/tcp.zig");
 }
std/event/channel.zig
@@ -116,6 +116,10 @@ pub fn Channel(comptime T: type) type {
             return result;
         }

+        fn getOrNull(self: *SelfChannel) ?T {
+            TODO();
+        }
+
         fn dispatch(self: *SelfChannel) void {
             // set the "need dispatch" flag
             _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
std/event/fs.zig
@@ -253,7 +253,9 @@ pub async fn openReadWrite(
 }

 /// This abstraction helps to close file handles in defer expressions
-/// without suspending. Start a CloseOperation before opening a file.
+/// without the possibility of failure and without the use of suspend points.
+/// Start a `CloseOperation` before opening a file, so that you can defer
+/// `CloseOperation.deinit`.
 pub const CloseOperation = struct {
     loop: *event.Loop,
     have_fd: bool,
std/event/group.zig
@@ -29,6 +29,17 @@ pub fn Group(comptime ReturnType: type) type {
             };
         }

+        /// Cancel all the outstanding promises. Can be called even if wait was already called.
+        pub fn deinit(self: *Self) void {
+            while (self.coro_stack.pop()) |node| {
+                cancel node.data;
+            }
+            while (self.alloc_stack.pop()) |node| {
+                cancel node.data;
+                self.lock.loop.allocator.destroy(node);
+            }
+        }
+
         /// Add a promise to the group. Thread-safe.
         pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) {
             const node = try self.lock.loop.allocator.create(Stack.Node{
@@ -88,7 +99,7 @@ pub fn Group(comptime ReturnType: type) type {
                     await node.data;
                 } else {
                     (await node.data) catch |err| {
-                        self.cancelAll();
+                        self.deinit();
                         return err;
                     };
                 }
@@ -100,25 +111,12 @@ pub fn Group(comptime ReturnType: type) type {
                     await handle;
                 } else {
                     (await handle) catch |err| {
-                        self.cancelAll();
+                        self.deinit();
                         return err;
                     };
                 }
             }
         }
-
-        /// Cancel all the outstanding promises. May only be called if wait was never called.
-        /// TODO These should be `cancelasync` not `cancel`.
-        /// See https://github.com/ziglang/zig/issues/1261
-        pub fn cancelAll(self: *Self) void {
-            while (self.coro_stack.pop()) |node| {
-                cancel node.data;
-            }
-            while (self.alloc_stack.pop()) |node| {
-                cancel node.data;
-                self.lock.loop.allocator.destroy(node);
-            }
-        }
     };
 }
std/event/lock.zig
@@ -9,6 +9,7 @@ const Loop = std.event.Loop;
 /// Thread-safe async/await lock.
 /// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
 /// are resumed when the lock is released, in order.
+/// Allows only one actor to hold the lock.
 pub const Lock = struct {
     loop: *Loop,
     shared_bit: u8, // TODO make this a bool
std/event/rwlock.zig (new file, 292 lines)
@@ -0,0 +1,292 @@
const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;

/// Thread-safe async/await lock.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
/// Many readers can hold the lock at the same time; however locking for writing is exclusive.
pub const RwLock = struct {
    loop: *Loop,
    shared_state: u8, // TODO make this an enum
    writer_queue: Queue,
    reader_queue: Queue,
    writer_queue_empty_bit: u8, // TODO make this a bool
    reader_queue_empty_bit: u8, // TODO make this a bool
    reader_lock_count: usize,

    const State = struct {
        const Unlocked = 0;
        const WriteLock = 1;
        const ReadLock = 2;
    };

    const Queue = std.atomic.Queue(promise);

    pub const HeldRead = struct {
        lock: *RwLock,

        pub fn release(self: HeldRead) void {
            // If other readers still hold the lock, we're done.
            if (@atomicRmw(usize, &self.lock.reader_lock_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) != 1) {
                return;
            }

            _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
                // Didn't unlock. Someone else's problem.
                return;
            }

            self.lock.commonPostUnlock();
        }
    };

    pub const HeldWrite = struct {
        lock: *RwLock,

        pub fn release(self: HeldWrite) void {
            // See if we can leave it locked for writing, and pass the lock to the next writer
            // in the queue to grab the lock.
            if (self.lock.writer_queue.get()) |node| {
                self.lock.loop.onNextTick(node);
                return;
            }

            // We need to release the write lock. Check if any readers are waiting to grab the lock.
            if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) {
                // Switch to a read lock.
                _ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.ReadLock, AtomicOrder.SeqCst);
                while (self.lock.reader_queue.get()) |node| {
                    self.lock.loop.onNextTick(node);
                }
                return;
            }

            _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            _ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst);

            self.lock.commonPostUnlock();
        }
    };

    pub fn init(loop: *Loop) RwLock {
        return RwLock{
            .loop = loop,
            .shared_state = State.Unlocked,
            .writer_queue = Queue.init(),
            .writer_queue_empty_bit = 1,
            .reader_queue = Queue.init(),
            .reader_queue_empty_bit = 1,
            .reader_lock_count = 0,
        };
    }

    /// Must be called when not locked. Not thread safe.
    /// All calls to acquire() and release() must complete before calling deinit().
    pub fn deinit(self: *RwLock) void {
        assert(self.shared_state == State.Unlocked);
        while (self.writer_queue.get()) |node| cancel node.data;
        while (self.reader_queue.get()) |node| cancel node.data;
    }

    pub async fn acquireRead(self: *RwLock) HeldRead {
        _ = @atomicRmw(usize, &self.reader_lock_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

        suspend |handle| {
            // TODO explicitly put this memory in the coroutine frame #1194
            var my_tick_node = Loop.NextTickNode{
                .data = handle,
                .next = undefined,
            };

            self.reader_queue.put(&my_tick_node);

            // At this point, we are in the reader_queue, so we might have already been resumed and this coroutine
            // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.

            // We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1,
            // some actor will attempt to grab the lock.
            _ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

            // Here we don't care if we are the one to do the locking or if it was already locked for reading.
            const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst)) |old_state| old_state == State.ReadLock else true;
            if (have_read_lock) {
                // Give out all the read locks.
                if (self.reader_queue.get()) |first_node| {
                    while (self.reader_queue.get()) |node| {
                        self.loop.onNextTick(node);
                    }
                    resume first_node.data;
                }
            }
        }
        return HeldRead{ .lock = self };
    }

    pub async fn acquireWrite(self: *RwLock) HeldWrite {
        suspend |handle| {
            // TODO explicitly put this memory in the coroutine frame #1194
            var my_tick_node = Loop.NextTickNode{
                .data = handle,
                .next = undefined,
            };

            self.writer_queue.put(&my_tick_node);

            // At this point, we are in the writer_queue, so we might have already been resumed and this coroutine
            // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.

            // We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1,
            // some actor will attempt to grab the lock.
            _ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

            // Here we must be the one to acquire the write lock. It cannot already be locked.
            if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) == null) {
                // We now have a write lock.
                if (self.writer_queue.get()) |node| {
                    // Whether this node is us or someone else, we tail resume it.
                    resume node.data;
                }
            }
        }
        return HeldWrite{ .lock = self };
    }

    fn commonPostUnlock(self: *RwLock) void {
        while (true) {
            // There might be a writer_queue item or a reader_queue item
            // If we check and both are empty, we can be done, because the other actors will try to
            // obtain the lock.
            // But if there's a writer_queue item or a reader_queue item,
            // we are the actor which must loop and attempt to grab the lock again.
            if (@atomicLoad(u8, &self.writer_queue_empty_bit, AtomicOrder.SeqCst) == 0) {
                if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
                // If there's an item in the writer queue, give them the lock, and we're done.
                if (self.writer_queue.get()) |node| {
                    self.loop.onNextTick(node);
                    return;
                }
                // Release the lock again.
                _ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                _ = @atomicRmw(u8, &self.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst);
                continue;
            }

            if (@atomicLoad(u8, &self.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) {
                if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
                // If there are any items in the reader queue, give out all the reader locks, and we're done.
                if (self.reader_queue.get()) |first_node| {
                    self.loop.onNextTick(first_node);
                    while (self.reader_queue.get()) |node| {
                        self.loop.onNextTick(node);
                    }
                    return;
                }
                // Release the lock again.
                _ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
                    // Didn't unlock. Someone else's problem.
                    return;
                }
                continue;
            }
            return;
        }
    }
};

test "std.event.RwLock" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    const allocator = &da.allocator;

    var loop: Loop = undefined;
    try loop.initMultiThreaded(allocator);
    defer loop.deinit();

    var lock = RwLock.init(&loop);
    defer lock.deinit();

    const handle = try async<allocator> testLock(&loop, &lock);
    defer cancel handle;
    loop.run();

    const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
    assert(mem.eql(i32, shared_test_data, expected_result));
}

async fn testLock(loop: *Loop, lock: *RwLock) void {
    // TODO explicitly put next tick node memory in the coroutine frame #1194
    suspend |p| {
        resume p;
    }

    var read_nodes: [100]Loop.NextTickNode = undefined;
    for (read_nodes) |*read_node| {
        read_node.data = async readRunner(lock) catch @panic("out of memory");
        loop.onNextTick(read_node);
    }

    var write_nodes: [shared_it_count]Loop.NextTickNode = undefined;
    for (write_nodes) |*write_node| {
        write_node.data = async writeRunner(lock) catch @panic("out of memory");
        loop.onNextTick(write_node);
    }

    for (write_nodes) |*write_node| {
        await @ptrCast(promise->void, write_node.data);
    }
    for (read_nodes) |*read_node| {
        await @ptrCast(promise->void, read_node.data);
    }
}

const shared_it_count = 10;
var shared_test_data = [1]i32{0} ** 10;
var shared_test_index: usize = 0;
var shared_count: usize = 0;

async fn writeRunner(lock: *RwLock) void {
    suspend; // resumed by onNextTick

    var i: usize = 0;
    while (i < shared_test_data.len) : (i += 1) {
        std.os.time.sleep(0, 100000);
        const lock_promise = async lock.acquireWrite() catch @panic("out of memory");
        const handle = await lock_promise;
        defer handle.release();

        shared_count += 1;
        while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
            shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
        }
        shared_test_index = 0;
    }
}

async fn readRunner(lock: *RwLock) void {
    suspend; // resumed by onNextTick
    std.os.time.sleep(0, 1);

    var i: usize = 0;
    while (i < shared_test_data.len) : (i += 1) {
        const lock_promise = async lock.acquireRead() catch @panic("out of memory");
        const handle = await lock_promise;
        defer handle.release();

        assert(shared_test_index == 0);
        assert(shared_test_data[i] == @intCast(i32, shared_count));
    }
}
std/event/rwlocked.zig (new file, 58 lines)
@@ -0,0 +1,58 @@
const std = @import("../index.zig");
const RwLock = std.event.RwLock;
const Loop = std.event.Loop;

/// Thread-safe async/await RW lock that protects one piece of data.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub fn RwLocked(comptime T: type) type {
    return struct {
        lock: RwLock,
        locked_data: T,

        const Self = this;

        pub const HeldReadLock = struct {
            value: *const T,
            held: RwLock.HeldRead,

            pub fn release(self: HeldReadLock) void {
                self.held.release();
            }
        };

        pub const HeldWriteLock = struct {
            value: *T,
            held: RwLock.HeldWrite,

            pub fn release(self: HeldWriteLock) void {
                self.held.release();
            }
        };

        pub fn init(loop: *Loop, data: T) Self {
            return Self{
                .lock = RwLock.init(loop),
                .locked_data = data,
            };
        }

        pub fn deinit(self: *Self) void {
            self.lock.deinit();
        }

        pub async fn acquireRead(self: *Self) HeldReadLock {
            return HeldReadLock{
                .held = await (async self.lock.acquireRead() catch unreachable),
                .value = &self.locked_data,
            };
        }

        pub async fn acquireWrite(self: *Self) HeldWriteLock {
            return HeldWriteLock{
                .held = await (async self.lock.acquireWrite() catch unreachable),
                .value = &self.locked_data,
            };
        }
    };
}
std/event/tcp.zig
@@ -61,7 +61,7 @@ pub const Server = struct {

     /// Stop listening
     pub fn close(self: *Server) void {
-        self.loop.removeFd(self.sockfd.?);
+        self.loop.linuxRemoveFd(self.sockfd.?);
         std.os.close(self.sockfd.?);
     }

@@ -116,7 +116,7 @@ pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File
     errdefer std.os.close(sockfd);

     try std.os.posixConnectAsync(sockfd, &address.os_addr);
-    try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT);
+    try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET);
     try std.os.posixGetSockOptConnectError(sockfd);

     return std.os.File.openHandle(sockfd);
std/hash_map.zig
@@ -163,6 +163,16 @@ pub fn HashMap(comptime K: type, comptime V: type, comptime hash: fn (key: K) u3
         };
     }

+        pub fn clone(self: Self) !Self {
+            var other = Self.init(self.allocator);
+            try other.initCapacity(self.entries.len);
+            var it = self.iterator();
+            while (it.next()) |entry| {
+                try other.put(entry.key, entry.value);
+            }
+            return other;
+        }
+
         fn initCapacity(hm: *Self, capacity: usize) !void {
             hm.entries = try hm.allocator.alloc(Entry, capacity);
             hm.size = 0;
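Aside (not part of the diff): a minimal sketch of using the new HashMap.clone. It assumes the era's HashMap API as used elsewhere in std at this commit (init takes an allocator, put returns an error union, get returns an optional entry, deinit frees the table); the hash_i32/eql_i32 helpers and all values are made up for illustration.

const std = @import("std");
const assert = std.debug.assert;

fn hash_i32(x: i32) u32 {
    return @bitCast(u32, x);
}

fn eql_i32(a: i32, b: i32) bool {
    return a == b;
}

test "HashMap.clone sketch" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    var original = std.HashMap(i32, i32, hash_i32, eql_i32).init(&da.allocator);
    defer original.deinit();
    try original.put(1, 11);
    try original.put(2, 22);

    var copy = try original.clone();
    defer copy.deinit();

    // Mutating the clone must not affect the original.
    try copy.put(2, 99);
    assert(original.get(2).?.value == 22);
    assert(copy.get(2).?.value == 99);
}

Since clone copies entry by entry through put into a freshly allocated table, later mutations of the clone leave the original untouched, which is what the final asserts check.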
std/index.zig
@@ -9,6 +9,7 @@ pub const LinkedList = @import("linked_list.zig").LinkedList;
 pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList;
 pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
 pub const DynLib = @import("dynamic_library.zig").DynLib;
+pub const Mutex = @import("mutex.zig").Mutex;

 pub const atomic = @import("atomic/index.zig");
 pub const base64 = @import("base64.zig");
@@ -48,6 +49,7 @@ test "std" {
     _ = @import("hash_map.zig");
     _ = @import("linked_list.zig");
     _ = @import("segmented_list.zig");
+    _ = @import("mutex.zig");

     _ = @import("base64.zig");
     _ = @import("build.zig");
std/mutex.zig (new file, 27 lines)
@@ -0,0 +1,27 @@
const std = @import("index.zig");
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
const assert = std.debug.assert;

/// TODO use syscalls instead of a spinlock
pub const Mutex = struct {
    lock: u8, // TODO use a bool

    pub const Held = struct {
        mutex: *Mutex,

        pub fn release(self: Held) void {
            assert(@atomicRmw(u8, &self.mutex.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
        }
    };

    pub fn init() Mutex {
        return Mutex{ .lock = 0 };
    }

    pub fn acquire(self: *Mutex) Held {
        while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
        return Held{ .mutex = self };
    }
};