From 94ffb0a282872c2f4b14f757fa1aef2302aeaabb Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 30 Aug 2021 11:55:22 -0600 Subject: [PATCH 01/16] io-wq: fix race between adding work and activating a free worker The attempt to find and activate a free worker for new work is currently combined with creating a new one if we don't find one, but that opens io-wq up to a race where the worker that is found and activated can put itself to sleep without knowing that it has been selected to perform this new work. Fix this by moving the activation into where we add the new work item, then we can retain it within the wqe->lock scope and elimiate the race with the worker itself checking inside the lock, but sleeping outside of it. Cc: stable@vger.kernel.org Reported-by: Andres Freund Signed-off-by: Jens Axboe --- fs/io-wq.c | 51 ++++++++++++++++++++++++--------------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index cd9bd095fb1b..94f8f2ecb8e5 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -236,9 +236,9 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) * We need a worker. If we find a free one, we're good. If not, and we're * below the max number of workers, create one. */ -static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) +static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) { - bool ret; + bool do_create = false, first = false; /* * Most likely an attempt to queue unbounded work on an io_wq that @@ -247,26 +247,18 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) if (unlikely(!acct->max_workers)) pr_warn_once("io-wq is not configured for unbound workers"); - rcu_read_lock(); - ret = io_wqe_activate_free_worker(wqe); - rcu_read_unlock(); - - if (!ret) { - bool do_create = false, first = false; - - raw_spin_lock(&wqe->lock); - if (acct->nr_workers < acct->max_workers) { - if (!acct->nr_workers) - first = true; - acct->nr_workers++; - do_create = true; - } - raw_spin_unlock(&wqe->lock); - if (do_create) { - atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - create_io_worker(wqe->wq, wqe, acct->index, first); - } + raw_spin_lock(&wqe->lock); + if (acct->nr_workers < acct->max_workers) { + if (!acct->nr_workers) + first = true; + acct->nr_workers++; + do_create = true; + } + raw_spin_unlock(&wqe->lock); + if (do_create) { + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + create_io_worker(wqe->wq, wqe, acct->index, first); } } @@ -794,7 +786,8 @@ append: static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); - bool do_wake; + unsigned work_flags = work->flags; + bool do_create; /* * If io-wq is exiting for this task, or if the request has explicitly @@ -809,12 +802,16 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) raw_spin_lock(&wqe->lock); io_wqe_insert_work(wqe, work); wqe->flags &= ~IO_WQE_FLAG_STALLED; - do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) || - !atomic_read(&acct->nr_running); + + rcu_read_lock(); + do_create = !io_wqe_activate_free_worker(wqe); + rcu_read_unlock(); + raw_spin_unlock(&wqe->lock); - if (do_wake) - io_wqe_wake_worker(wqe, acct); + if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || + !atomic_read(&acct->nr_running))) + io_wqe_create_worker(wqe, acct); } void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) From 7b3188e7ed54102a5dcc73d07727f41fb528f7c8 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 30 Aug 2021 19:37:41 -0600 Subject: [PATCH 02/16] io_uring: IORING_OP_WRITE needs hash_reg_file set During some testing, it became evident that using IORING_OP_WRITE doesn't hash buffered writes like the other writes commands do. That's simply an oversight, and can cause performance regressions when doing buffered writes with this command. Correct that and add the flag, so that buffered writes are correctly hashed when using the non-iovec based write command. Cc: stable@vger.kernel.org Fixes: 3a6820f2bb8a ("io_uring: add non-vectored read/write commands") Signed-off-by: Jens Axboe --- fs/io_uring.c | 1 + 1 file changed, 1 insertion(+) diff --git a/fs/io_uring.c b/fs/io_uring.c index 7cc458e0b636..473a977c7979 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -995,6 +995,7 @@ static const struct io_op_def io_op_defs[] = { }, [IORING_OP_WRITE] = { .needs_file = 1, + .hash_reg_file = 1, .unbound_nonreg_file = 1, .pollout = 1, .plug = 1, From 7db304375e11741e5940f9bc549155035bfb4dc1 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Sat, 21 Aug 2021 23:07:51 +0800 Subject: [PATCH 03/16] io_uring: retry in case of short read on block device In case of buffered reading from block device, when short read happens, we should retry to read more, otherwise the IO will be completed partially, for example, the following fio expects to read 2MB, but it can only read 1M or less bytes: fio --name=onessd --filename=/dev/nvme0n1 --filesize=2M \ --rw=randread --bs=2M --direct=0 --overwrite=0 --numjobs=1 \ --iodepth=1 --time_based=0 --runtime=2 --ioengine=io_uring \ --registerfiles --fixedbufs --gtod_reduce=1 --group_reporting Fix the issue by allowing short read retry for block device, which sets FMODE_BUF_RASYNC really. Fixes: 9a173346bd9e ("io_uring: fix short read retries for non-reg files") Cc: Pavel Begunkov Signed-off-by: Ming Lei Reviewed-by: Pavel Begunkov Link: https://lore.kernel.org/r/20210821150751.1290434-1-ming.lei@redhat.com Signed-off-by: Jens Axboe --- fs/io_uring.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 473a977c7979..364e77d5fe6c 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3382,6 +3382,12 @@ static inline int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter) return -EINVAL; } +static bool need_read_all(struct io_kiocb *req) +{ + return req->flags & REQ_F_ISREG || + S_ISBLK(file_inode(req->file)->i_mode); +} + static int io_read(struct io_kiocb *req, unsigned int issue_flags) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; @@ -3436,7 +3442,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) } else if (ret == -EIOCBQUEUED) { goto out_free; } else if (ret <= 0 || ret == io_size || !force_nonblock || - (req->flags & REQ_F_NOWAIT) || !(req->flags & REQ_F_ISREG)) { + (req->flags & REQ_F_NOWAIT) || !need_read_all(req)) { /* read all, failed, already did sync or don't want to retry */ goto done; } From 08bdbd39b58474d762242e1fadb7f2eb9ffcca71 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 31 Aug 2021 06:57:25 -0600 Subject: [PATCH 04/16] io-wq: ensure that hash wait lock is IRQ disabling A previous commit removed the IRQ safety of the worker and wqe locks, but that left one spot of the hash wait lock now being done without already having IRQs disabled. Ensure that we use the right locking variant for the hashed waitqueue lock. Fixes: a9a4aa9fbfc5 ("io-wq: wqe and worker locks no longer need to be IRQ safe") Signed-off-by: Jens Axboe --- fs/io-wq.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 94f8f2ecb8e5..a94060b72f84 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -405,7 +405,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) { struct io_wq *wq = wqe->wq; - spin_lock(&wq->hash->wait.lock); + spin_lock_irq(&wq->hash->wait.lock); if (list_empty(&wqe->wait.entry)) { __add_wait_queue(&wq->hash->wait, &wqe->wait); if (!test_bit(hash, &wq->hash->map)) { @@ -413,7 +413,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) list_del_init(&wqe->wait.entry); } } - spin_unlock(&wq->hash->wait.lock); + spin_unlock_irq(&wq->hash->wait.lock); } /* From c6d3d9cbd659de8f2176b4e4721149c88ac096d4 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Tue, 31 Aug 2021 14:13:10 +0100 Subject: [PATCH 05/16] io_uring: fix queueing half-created requests [ 27.259845] general protection fault, probably for non-canonical address 0xdffffc0000000005: 0000 [#1] SMP KASAN PTI [ 27.261043] KASAN: null-ptr-deref in range [0x0000000000000028-0x000000000000002f] [ 27.263730] RIP: 0010:sock_from_file+0x20/0x90 [ 27.272444] Call Trace: [ 27.272736] io_sendmsg+0x98/0x600 [ 27.279216] io_issue_sqe+0x498/0x68d0 [ 27.281142] __io_queue_sqe+0xab/0xb50 [ 27.285830] io_req_task_submit+0xbf/0x1b0 [ 27.286306] tctx_task_work+0x178/0xad0 [ 27.288211] task_work_run+0xe2/0x190 [ 27.288571] exit_to_user_mode_prepare+0x1a1/0x1b0 [ 27.289041] syscall_exit_to_user_mode+0x19/0x50 [ 27.289521] do_syscall_64+0x48/0x90 [ 27.289871] entry_SYSCALL_64_after_hwframe+0x44/0xae io_req_complete_failed() -> io_req_complete_post() -> io_req_task_queue() still would try to enqueue hard linked request, which can be half prepared (e.g. failed init), so we can't allow that to happen. Fixes: a8295b982c46d ("io_uring: fix failed linkchain code logic") Reported-by: syzbot+f9704d1878e290eddf73@syzkaller.appspotmail.com Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/70b513848c1000f88bd75965504649c6bb1415c0.1630415423.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- fs/io_uring.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 364e77d5fe6c..78f3d3ac2280 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1823,6 +1823,17 @@ static void io_req_complete_failed(struct io_kiocb *req, long res) io_req_complete_post(req, res, 0); } +static void io_req_complete_fail_submit(struct io_kiocb *req) +{ + /* + * We don't submit, fail them all, for that replace hardlinks with + * normal links. Extra REQ_F_LINK is tolerated. + */ + req->flags &= ~REQ_F_HARDLINK; + req->flags |= REQ_F_LINK; + io_req_complete_failed(req, req->result); +} + /* * Don't initialise the fields below on every allocation, but do that in * advance and keep them valid across allocations. @@ -6723,7 +6734,7 @@ static inline void io_queue_sqe(struct io_kiocb *req) if (likely(!(req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL)))) { __io_queue_sqe(req); } else if (req->flags & REQ_F_FAIL) { - io_req_complete_failed(req, req->result); + io_req_complete_fail_submit(req); } else { int ret = io_req_prep_async(req); From b8ce1b9d25ccf81e1bbabd45b963ed98b2222df8 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Tue, 31 Aug 2021 14:13:11 +0100 Subject: [PATCH 06/16] io_uring: don't submit half-prepared drain request [ 3784.910888] BUG: kernel NULL pointer dereference, address: 0000000000000020 [ 3784.910904] RIP: 0010:__io_file_supports_nowait+0x5/0xc0 [ 3784.910926] Call Trace: [ 3784.910928] ? io_read+0x17c/0x480 [ 3784.910945] io_issue_sqe+0xcb/0x1840 [ 3784.910953] __io_queue_sqe+0x44/0x300 [ 3784.910959] io_req_task_submit+0x27/0x70 [ 3784.910962] tctx_task_work+0xeb/0x1d0 [ 3784.910966] task_work_run+0x61/0xa0 [ 3784.910968] io_run_task_work_sig+0x53/0xa0 [ 3784.910975] __x64_sys_io_uring_enter+0x22/0x30 [ 3784.910977] do_syscall_64+0x3d/0x90 [ 3784.910981] entry_SYSCALL_64_after_hwframe+0x44/0xae io_drain_req() goes before checks for REQ_F_FAIL, which protect us from submitting under-prepared request (e.g. failed in io_init_req(). Fail such drained requests as well. Fixes: a8295b982c46d ("io_uring: fix failed linkchain code logic") Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/e411eb9924d47a131b1e200b26b675df0c2b7627.1630415423.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- fs/io_uring.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/fs/io_uring.c b/fs/io_uring.c index 78f3d3ac2280..4a5eb9e856f0 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -6238,6 +6238,11 @@ static bool io_drain_req(struct io_kiocb *req) int ret; u32 seq; + if (req->flags & REQ_F_FAIL) { + io_req_complete_fail_submit(req); + return true; + } + /* * If we need to drain a request in the middle of a link, drain the * head request and the next request/link after the current link. From 0242f6426ea78fbe3933b44f8c55ae93ec37f6cc Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 31 Aug 2021 13:53:00 -0600 Subject: [PATCH 07/16] io-wq: fix queue stalling race We need to set the stalled bit early, before we drop the lock for adding us to the stall hash queue. If not, then we can race with new work being queued between adding us to the stall hash and io_worker_handle_work() marking us stalled. Signed-off-by: Jens Axboe --- fs/io-wq.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index a94060b72f84..aa9656eb832e 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -436,8 +436,7 @@ static bool io_worker_can_run_work(struct io_worker *worker, } static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, - struct io_worker *worker, - bool *stalled) + struct io_worker *worker) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; @@ -475,10 +474,14 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, } if (stall_hash != -1U) { + /* + * Set this before dropping the lock to avoid racing with new + * work being added and clearing the stalled bit. + */ + wqe->flags |= IO_WQE_FLAG_STALLED; raw_spin_unlock(&wqe->lock); io_wait_on_hash(wqe, stall_hash); raw_spin_lock(&wqe->lock); - *stalled = true; } return NULL; @@ -518,7 +521,6 @@ static void io_worker_handle_work(struct io_worker *worker) do { struct io_wq_work *work; - bool stalled; get_next: /* * If we got some work, mark us as busy. If we didn't, but @@ -527,12 +529,9 @@ get_next: * can't make progress, any work completion or insertion will * clear the stalled flag. */ - stalled = false; - work = io_get_next_work(wqe, worker, &stalled); + work = io_get_next_work(wqe, worker); if (work) __io_worker_busy(wqe, worker, work); - else if (stalled) - wqe->flags |= IO_WQE_FLAG_STALLED; raw_spin_unlock(&wqe->lock); if (!work) From f95dc207b93da9c88ddbb7741ec3730c6657b88e Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 31 Aug 2021 13:57:32 -0600 Subject: [PATCH 08/16] io-wq: split bounded and unbounded work into separate lists We've got a few issues that all boil down to the fact that we have one list of pending work items, yet two different types of workers to serve them. This causes some oddities around workers switching type and even hashed work vs regular work on the same bounded list. Just separate them out cleanly, similarly to how we already do accounting of what is running. That provides a clean separation and removes some corner cases that can cause stalls when handling IO that is punted to io-wq. Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state") Signed-off-by: Jens Axboe --- fs/io-wq.c | 156 +++++++++++++++++++++++------------------------------ 1 file changed, 68 insertions(+), 88 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index aa9656eb832e..8cba77a937a1 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -32,7 +32,7 @@ enum { }; enum { - IO_WQE_FLAG_STALLED = 1, /* stalled on hash */ + IO_ACCT_STALLED_BIT = 0, /* stalled on hash */ }; /* @@ -71,25 +71,24 @@ struct io_wqe_acct { unsigned max_workers; int index; atomic_t nr_running; + struct io_wq_work_list work_list; + unsigned long flags; }; enum { IO_WQ_ACCT_BOUND, IO_WQ_ACCT_UNBOUND, + IO_WQ_ACCT_NR, }; /* * Per-node worker thread pool */ struct io_wqe { - struct { - raw_spinlock_t lock; - struct io_wq_work_list work_list; - unsigned flags; - } ____cacheline_aligned_in_smp; + raw_spinlock_t lock; + struct io_wqe_acct acct[2]; int node; - struct io_wqe_acct acct[2]; struct hlist_nulls_head free_list; struct list_head all_list; @@ -195,11 +194,10 @@ static void io_worker_exit(struct io_worker *worker) do_exit(0); } -static inline bool io_wqe_run_queue(struct io_wqe *wqe) - __must_hold(wqe->lock) +static inline bool io_acct_run_queue(struct io_wqe_acct *acct) { - if (!wq_list_empty(&wqe->work_list) && - !(wqe->flags & IO_WQE_FLAG_STALLED)) + if (!wq_list_empty(&acct->work_list) && + !test_bit(IO_ACCT_STALLED_BIT, &acct->flags)) return true; return false; } @@ -208,7 +206,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe) * Check head of free list for an available worker. If one isn't available, * caller must create one. */ -static bool io_wqe_activate_free_worker(struct io_wqe *wqe) +static bool io_wqe_activate_free_worker(struct io_wqe *wqe, + struct io_wqe_acct *acct) __must_hold(RCU) { struct hlist_nulls_node *n; @@ -222,6 +221,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) { if (!io_worker_get(worker)) continue; + if (io_wqe_get_acct(worker) != acct) { + io_worker_release(worker); + continue; + } if (wake_up_process(worker->task)) { io_worker_release(worker); return true; @@ -340,7 +343,7 @@ static void io_wqe_dec_running(struct io_worker *worker) if (!(worker->flags & IO_WORKER_F_UP)) return; - if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) { + if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); io_queue_worker_create(wqe, worker, acct); @@ -355,29 +358,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, struct io_wq_work *work) __must_hold(wqe->lock) { - bool worker_bound, work_bound; - - BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1); - if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; hlist_nulls_del_init_rcu(&worker->nulls_node); } - - /* - * If worker is moving from bound to unbound (or vice versa), then - * ensure we update the running accounting. - */ - worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; - work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; - if (worker_bound != work_bound) { - int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND; - io_wqe_dec_running(worker); - worker->flags ^= IO_WORKER_F_BOUND; - wqe->acct[index].nr_workers--; - wqe->acct[index ^ 1].nr_workers++; - io_wqe_inc_running(worker); - } } /* @@ -416,44 +400,23 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) spin_unlock_irq(&wq->hash->wait.lock); } -/* - * We can always run the work if the worker is currently the same type as - * the work (eg both are bound, or both are unbound). If they are not the - * same, only allow it if incrementing the worker count would be allowed. - */ -static bool io_worker_can_run_work(struct io_worker *worker, - struct io_wq_work *work) -{ - struct io_wqe_acct *acct; - - if (!(worker->flags & IO_WORKER_F_BOUND) != - !(work->flags & IO_WQ_WORK_UNBOUND)) - return true; - - /* not the same type, check if we'd go over the limit */ - acct = io_work_get_acct(worker->wqe, work); - return acct->nr_workers < acct->max_workers; -} - -static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, +static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, struct io_worker *worker) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; unsigned int stall_hash = -1U; + struct io_wqe *wqe = worker->wqe; - wq_list_for_each(node, prev, &wqe->work_list) { + wq_list_for_each(node, prev, &acct->work_list) { unsigned int hash; work = container_of(node, struct io_wq_work, list); - if (!io_worker_can_run_work(worker, work)) - break; - /* not hashed, can run anytime */ if (!io_wq_is_hashed(work)) { - wq_list_del(&wqe->work_list, node, prev); + wq_list_del(&acct->work_list, node, prev); return work; } @@ -464,7 +427,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, /* hashed, can run if not already running */ if (!test_and_set_bit(hash, &wqe->wq->hash->map)) { wqe->hash_tail[hash] = NULL; - wq_list_cut(&wqe->work_list, &tail->list, prev); + wq_list_cut(&acct->work_list, &tail->list, prev); return work; } if (stall_hash == -1U) @@ -478,7 +441,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, * Set this before dropping the lock to avoid racing with new * work being added and clearing the stalled bit. */ - wqe->flags |= IO_WQE_FLAG_STALLED; + set_bit(IO_ACCT_STALLED_BIT, &acct->flags); raw_spin_unlock(&wqe->lock); io_wait_on_hash(wqe, stall_hash); raw_spin_lock(&wqe->lock); @@ -515,6 +478,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { + struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); @@ -529,7 +493,7 @@ get_next: * can't make progress, any work completion or insertion will * clear the stalled flag. */ - work = io_get_next_work(wqe, worker); + work = io_get_next_work(acct, worker); if (work) __io_worker_busy(wqe, worker, work); @@ -563,10 +527,10 @@ get_next: if (hash != -1U && !next_hashed) { clear_bit(hash, &wq->hash->map); + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); if (wq_has_sleeper(&wq->hash->wait)) wake_up(&wq->hash->wait); raw_spin_lock(&wqe->lock); - wqe->flags &= ~IO_WQE_FLAG_STALLED; /* skip unnecessary unlock-lock wqe->lock */ if (!work) goto get_next; @@ -581,6 +545,7 @@ get_next: static int io_wqe_worker(void *data) { struct io_worker *worker = data; + struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; char buf[TASK_COMM_LEN]; @@ -596,7 +561,7 @@ static int io_wqe_worker(void *data) set_current_state(TASK_INTERRUPTIBLE); loop: raw_spin_lock(&wqe->lock); - if (io_wqe_run_queue(wqe)) { + if (io_acct_run_queue(acct)) { io_worker_handle_work(worker); goto loop; } @@ -764,12 +729,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) { + struct io_wqe_acct *acct = io_work_get_acct(wqe, work); unsigned int hash; struct io_wq_work *tail; if (!io_wq_is_hashed(work)) { append: - wq_list_add_tail(&work->list, &wqe->work_list); + wq_list_add_tail(&work->list, &acct->work_list); return; } @@ -779,7 +745,7 @@ append: if (!tail) goto append; - wq_list_add_after(&work->list, &tail->list, &wqe->work_list); + wq_list_add_after(&work->list, &tail->list, &acct->work_list); } static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) @@ -800,10 +766,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) raw_spin_lock(&wqe->lock); io_wqe_insert_work(wqe, work); - wqe->flags &= ~IO_WQE_FLAG_STALLED; + clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); rcu_read_lock(); - do_create = !io_wqe_activate_free_worker(wqe); + do_create = !io_wqe_activate_free_worker(wqe, acct); rcu_read_unlock(); raw_spin_unlock(&wqe->lock); @@ -855,6 +821,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, struct io_wq_work *work, struct io_wq_work_node *prev) { + struct io_wqe_acct *acct = io_work_get_acct(wqe, work); unsigned int hash = io_get_work_hash(work); struct io_wq_work *prev_work = NULL; @@ -866,7 +833,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, else wqe->hash_tail[hash] = NULL; } - wq_list_del(&wqe->work_list, &work->list, prev); + wq_list_del(&acct->work_list, &work->list, prev); } static void io_wqe_cancel_pending_work(struct io_wqe *wqe, @@ -874,22 +841,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe, { struct io_wq_work_node *node, *prev; struct io_wq_work *work; + int i; retry: raw_spin_lock(&wqe->lock); - wq_list_for_each(node, prev, &wqe->work_list) { - work = container_of(node, struct io_wq_work, list); - if (!match->fn(work, match->data)) - continue; - io_wqe_remove_pending(wqe, work, prev); - raw_spin_unlock(&wqe->lock); - io_run_cancel(work, wqe); - match->nr_pending++; - if (!match->cancel_all) - return; + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = io_get_acct(wqe, i == 0); - /* not safe to continue after unlock */ - goto retry; + wq_list_for_each(node, prev, &acct->work_list) { + work = container_of(node, struct io_wq_work, list); + if (!match->fn(work, match->data)) + continue; + io_wqe_remove_pending(wqe, work, prev); + raw_spin_unlock(&wqe->lock); + io_run_cancel(work, wqe); + match->nr_pending++; + if (!match->cancel_all) + return; + + /* not safe to continue after unlock */ + goto retry; + } } raw_spin_unlock(&wqe->lock); } @@ -950,18 +922,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { struct io_wqe *wqe = container_of(wait, struct io_wqe, wait); + int i; list_del_init(&wait->entry); rcu_read_lock(); - io_wqe_activate_free_worker(wqe); + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = &wqe->acct[i]; + + if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags)) + io_wqe_activate_free_worker(wqe, acct); + } rcu_read_unlock(); return 1; } struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { - int ret, node; + int ret, node, i; struct io_wq *wq; if (WARN_ON_ONCE(!data->free_work || !data->do_work)) @@ -996,18 +974,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) cpumask_copy(wqe->cpu_mask, cpumask_of_node(node)); wq->wqes[node] = wqe; wqe->node = alloc_node; - wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND; - wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; - atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = task_rlimit(current, RLIMIT_NPROC); - atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); - wqe->wait.func = io_wqe_hash_wake; INIT_LIST_HEAD(&wqe->wait.entry); + wqe->wait.func = io_wqe_hash_wake; + for (i = 0; i < IO_WQ_ACCT_NR; i++) { + struct io_wqe_acct *acct = &wqe->acct[i]; + + acct->index = i; + atomic_set(&acct->nr_running, 0); + INIT_WQ_LIST(&acct->work_list); + } wqe->wq = wq; raw_spin_lock_init(&wqe->lock); - INIT_WQ_LIST(&wqe->work_list); INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); INIT_LIST_HEAD(&wqe->all_list); } @@ -1189,7 +1169,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) for_each_node(node) { struct io_wqe_acct *acct; - for (i = 0; i < 2; i++) { + for (i = 0; i < IO_WQ_ACCT_NR; i++) { acct = &wq->wqes[node]->acct[i]; prev = max_t(int, acct->max_workers, prev); if (new_count[i]) From 15e20db2e0cecce0bfc6a67b69e55020fe9cda00 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 1 Sep 2021 11:18:41 -0600 Subject: [PATCH 09/16] io-wq: only exit on fatal signals If the application uses io_uring and also relies heavily on signals for communication, that can cause io-wq workers to spuriously exit just because the parent has a signal pending. Just ignore signals unless they are fatal. Signed-off-by: Jens Axboe --- fs/io-wq.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 8cba77a937a1..027eb4e13e3b 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -575,7 +575,9 @@ loop: if (!get_signal(&ksig)) continue; - break; + if (fatal_signal_pending(current)) + break; + continue; } if (ret) continue; From 05c5f4ee4da7086cceacc78bf3a080e314c241fa Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 1 Sep 2021 13:01:17 -0600 Subject: [PATCH 10/16] io-wq: get rid of FIXED worker flag It makes the logic easier to follow if we just get rid of the fixed worker flag, and simply ensure that we never exit the last worker in the group. This also means that no particular worker is special. Just track the last timeout state, and if we have hit it and no work is pending, check if there are other workers. If yes, then we can exit this one safely. Signed-off-by: Jens Axboe --- fs/io-wq.c | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 027eb4e13e3b..50ea07764a99 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -23,8 +23,7 @@ enum { IO_WORKER_F_UP = 1, /* up and active */ IO_WORKER_F_RUNNING = 2, /* account as running */ IO_WORKER_F_FREE = 4, /* worker on free list */ - IO_WORKER_F_FIXED = 8, /* static idle worker */ - IO_WORKER_F_BOUND = 16, /* is doing bounded work */ + IO_WORKER_F_BOUND = 8, /* is doing bounded work */ }; enum { @@ -132,7 +131,7 @@ struct io_cb_cancel_data { bool cancel_all; }; -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first); +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); static void io_wqe_dec_running(struct io_worker *worker); static bool io_worker_get(struct io_worker *worker) @@ -241,7 +240,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe, */ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) { - bool do_create = false, first = false; + bool do_create = false; /* * Most likely an attempt to queue unbounded work on an io_wq that @@ -252,8 +251,6 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) raw_spin_lock(&wqe->lock); if (acct->nr_workers < acct->max_workers) { - if (!acct->nr_workers) - first = true; acct->nr_workers++; do_create = true; } @@ -261,7 +258,7 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) if (do_create) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); - create_io_worker(wqe->wq, wqe, acct->index, first); + create_io_worker(wqe->wq, wqe, acct->index); } } @@ -278,7 +275,7 @@ static void create_worker_cb(struct callback_head *cb) struct io_wq *wq; struct io_wqe *wqe; struct io_wqe_acct *acct; - bool do_create = false, first = false; + bool do_create = false; worker = container_of(cb, struct io_worker, create_work); wqe = worker->wqe; @@ -286,14 +283,12 @@ static void create_worker_cb(struct callback_head *cb) acct = &wqe->acct[worker->create_index]; raw_spin_lock(&wqe->lock); if (acct->nr_workers < acct->max_workers) { - if (!acct->nr_workers) - first = true; acct->nr_workers++; do_create = true; } raw_spin_unlock(&wqe->lock); if (do_create) { - create_io_worker(wq, wqe, worker->create_index, first); + create_io_worker(wq, wqe, worker->create_index); } else { atomic_dec(&acct->nr_running); io_worker_ref_put(wq); @@ -548,6 +543,7 @@ static int io_wqe_worker(void *data) struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + bool last_timeout = false; char buf[TASK_COMM_LEN]; worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); @@ -565,6 +561,13 @@ loop: io_worker_handle_work(worker); goto loop; } + /* timed out, exit unless we're the last worker */ + if (last_timeout && acct->nr_workers > 1) { + raw_spin_unlock(&wqe->lock); + __set_current_state(TASK_RUNNING); + break; + } + last_timeout = false; __io_worker_idle(wqe, worker); raw_spin_unlock(&wqe->lock); if (io_flush_signals()) @@ -579,11 +582,7 @@ loop: break; continue; } - if (ret) - continue; - /* timed out, exit unless we're the fixed worker */ - if (!(worker->flags & IO_WORKER_F_FIXED)) - break; + last_timeout = !ret; } if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { @@ -634,7 +633,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk) raw_spin_unlock(&worker->wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first) +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) { struct io_wqe_acct *acct = &wqe->acct[index]; struct io_worker *worker; @@ -675,8 +674,6 @@ fail: worker->flags |= IO_WORKER_F_FREE; if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; - if (first && (worker->flags & IO_WORKER_F_BOUND)) - worker->flags |= IO_WORKER_F_FIXED; raw_spin_unlock(&wqe->lock); wake_up_new_task(tsk); } From 3146cba99aa284b1d4a10fbd923df953f1d18035 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 1 Sep 2021 11:20:10 -0600 Subject: [PATCH 11/16] io-wq: make worker creation resilient against signals If a task is queueing async work and also handling signals, then we can run into the case where create_io_thread() is interrupted and returns failure because of that. If this happens for creating the first worker in a group, then that worker will never get created and we can hang the ring. If we do get a fork failure, retry from task_work. With signals we have to be a bit careful as we cannot simply queue as task_work, as we'll still have signals pending at that point. Punt over a normal workqueue first and then create from task_work after that. Lastly, ensure that we handle fatal worker creations. Worker creation failures are normally not fatal, only if we fail to create one in an empty worker group can we not make progress. Right now that is ignored, ensure that we handle that and run cancel on the work item. There are two paths that create new workers - one is the "existing worker going to sleep", and the other is "no workers found for this work, create one". The former is never fatal, as workers do exist in the group. Only the latter needs to be carefully handled. Signed-off-by: Jens Axboe --- fs/io-wq.c | 237 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 173 insertions(+), 64 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 50ea07764a99..d80e4a735677 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -54,7 +54,10 @@ struct io_worker { struct callback_head create_work; int create_index; - struct rcu_head rcu; + union { + struct rcu_head rcu; + struct work_struct work; + }; }; #if BITS_PER_LONG == 64 @@ -131,8 +134,11 @@ struct io_cb_cancel_data { bool cancel_all; }; -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); static void io_wqe_dec_running(struct io_worker *worker); +static bool io_acct_cancel_pending_work(struct io_wqe *wqe, + struct io_wqe_acct *acct, + struct io_cb_cancel_data *match); static bool io_worker_get(struct io_worker *worker) { @@ -238,7 +244,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe, * We need a worker. If we find a free one, we're good. If not, and we're * below the max number of workers, create one. */ -static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) +static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) { bool do_create = false; @@ -258,8 +264,10 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) if (do_create) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); - create_io_worker(wqe->wq, wqe, acct->index); + return create_io_worker(wqe->wq, wqe, acct->index); } + + return true; } static void io_wqe_inc_running(struct io_worker *worker) @@ -297,9 +305,11 @@ static void create_worker_cb(struct callback_head *cb) io_worker_release(worker); } -static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker, - struct io_wqe_acct *acct) +static bool io_queue_worker_create(struct io_worker *worker, + struct io_wqe_acct *acct, + task_work_func_t func) { + struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; /* raced with exit, just ignore create call */ @@ -317,16 +327,17 @@ static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker, test_and_set_bit_lock(0, &worker->create_state)) goto fail_release; - init_task_work(&worker->create_work, create_worker_cb); + init_task_work(&worker->create_work, func); worker->create_index = acct->index; if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) - return; + return true; clear_bit_unlock(0, &worker->create_state); fail_release: io_worker_release(worker); fail: atomic_dec(&acct->nr_running); io_worker_ref_put(wq); + return false; } static void io_wqe_dec_running(struct io_worker *worker) @@ -341,7 +352,7 @@ static void io_wqe_dec_running(struct io_worker *worker) if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); - io_queue_worker_create(wqe, worker, acct); + io_queue_worker_create(worker, acct, create_worker_cb); } } @@ -633,36 +644,9 @@ void io_wq_worker_sleeping(struct task_struct *tsk) raw_spin_unlock(&worker->wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, + struct task_struct *tsk) { - struct io_wqe_acct *acct = &wqe->acct[index]; - struct io_worker *worker; - struct task_struct *tsk; - - __set_current_state(TASK_RUNNING); - - worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); - if (!worker) - goto fail; - - refcount_set(&worker->ref, 1); - worker->nulls_node.pprev = NULL; - worker->wqe = wqe; - spin_lock_init(&worker->lock); - init_completion(&worker->ref_done); - - tsk = create_io_thread(io_wqe_worker, worker, wqe->node); - if (IS_ERR(tsk)) { - kfree(worker); -fail: - atomic_dec(&acct->nr_running); - raw_spin_lock(&wqe->lock); - acct->nr_workers--; - raw_spin_unlock(&wqe->lock); - io_worker_ref_put(wq); - return; - } - tsk->pf_io_worker = worker; worker->task = tsk; set_cpus_allowed_ptr(tsk, wqe->cpu_mask); @@ -672,12 +656,118 @@ fail: hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); list_add_tail_rcu(&worker->all_list, &wqe->all_list); worker->flags |= IO_WORKER_F_FREE; - if (index == IO_WQ_ACCT_BOUND) - worker->flags |= IO_WORKER_F_BOUND; raw_spin_unlock(&wqe->lock); wake_up_new_task(tsk); } +static bool io_wq_work_match_all(struct io_wq_work *work, void *data) +{ + return true; +} + +static inline bool io_should_retry_thread(long err) +{ + switch (err) { + case -EAGAIN: + case -ERESTARTSYS: + case -ERESTARTNOINTR: + case -ERESTARTNOHAND: + return true; + default: + return false; + } +} + +static void create_worker_cont(struct callback_head *cb) +{ + struct io_worker *worker; + struct task_struct *tsk; + struct io_wqe *wqe; + + worker = container_of(cb, struct io_worker, create_work); + clear_bit_unlock(0, &worker->create_state); + wqe = worker->wqe; + tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + if (!IS_ERR(tsk)) { + io_init_new_worker(wqe, worker, tsk); + io_worker_release(worker); + return; + } else if (!io_should_retry_thread(PTR_ERR(tsk))) { + struct io_wqe_acct *acct = io_wqe_get_acct(worker); + + atomic_dec(&acct->nr_running); + raw_spin_lock(&wqe->lock); + acct->nr_workers--; + if (!acct->nr_workers) { + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_all, + .cancel_all = true, + }; + + while (io_acct_cancel_pending_work(wqe, acct, &match)) + raw_spin_lock(&wqe->lock); + } + raw_spin_unlock(&wqe->lock); + io_worker_ref_put(wqe->wq); + return; + } + + /* re-create attempts grab a new worker ref, drop the existing one */ + io_worker_release(worker); + schedule_work(&worker->work); +} + +static void io_workqueue_create(struct work_struct *work) +{ + struct io_worker *worker = container_of(work, struct io_worker, work); + struct io_wqe_acct *acct = io_wqe_get_acct(worker); + + if (!io_queue_worker_create(worker, acct, create_worker_cont)) { + clear_bit_unlock(0, &worker->create_state); + io_worker_release(worker); + } +} + +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +{ + struct io_wqe_acct *acct = &wqe->acct[index]; + struct io_worker *worker; + struct task_struct *tsk; + + __set_current_state(TASK_RUNNING); + + worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); + if (!worker) { +fail: + atomic_dec(&acct->nr_running); + raw_spin_lock(&wqe->lock); + acct->nr_workers--; + raw_spin_unlock(&wqe->lock); + io_worker_ref_put(wq); + return false; + } + + refcount_set(&worker->ref, 1); + worker->wqe = wqe; + spin_lock_init(&worker->lock); + init_completion(&worker->ref_done); + + if (index == IO_WQ_ACCT_BOUND) + worker->flags |= IO_WORKER_F_BOUND; + + tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + if (!IS_ERR(tsk)) { + io_init_new_worker(wqe, worker, tsk); + } else if (!io_should_retry_thread(PTR_ERR(tsk))) { + goto fail; + } else { + INIT_WORK(&worker->work, io_workqueue_create); + schedule_work(&worker->work); + } + + return true; +} + /* * Iterate the passed in list and call the specific function for each * worker that isn't exiting @@ -710,11 +800,6 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) return false; } -static bool io_wq_work_match_all(struct io_wq_work *work, void *data) -{ - return true; -} - static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) { struct io_wq *wq = wqe->wq; @@ -759,6 +844,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) */ if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) || (work->flags & IO_WQ_WORK_CANCEL)) { +run_cancel: io_run_cancel(work, wqe); return; } @@ -774,8 +860,20 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) raw_spin_unlock(&wqe->lock); if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || - !atomic_read(&acct->nr_running))) - io_wqe_create_worker(wqe, acct); + !atomic_read(&acct->nr_running))) { + bool did_create; + + did_create = io_wqe_create_worker(wqe, acct); + if (unlikely(!did_create)) { + raw_spin_lock(&wqe->lock); + /* fatal condition, failed to create the first worker */ + if (!acct->nr_workers) { + raw_spin_unlock(&wqe->lock); + goto run_cancel; + } + raw_spin_unlock(&wqe->lock); + } + } } void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) @@ -835,31 +933,42 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, wq_list_del(&acct->work_list, &work->list, prev); } -static void io_wqe_cancel_pending_work(struct io_wqe *wqe, - struct io_cb_cancel_data *match) +static bool io_acct_cancel_pending_work(struct io_wqe *wqe, + struct io_wqe_acct *acct, + struct io_cb_cancel_data *match) + __releases(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; - int i; + wq_list_for_each(node, prev, &acct->work_list) { + work = container_of(node, struct io_wq_work, list); + if (!match->fn(work, match->data)) + continue; + io_wqe_remove_pending(wqe, work, prev); + raw_spin_unlock(&wqe->lock); + io_run_cancel(work, wqe); + match->nr_pending++; + /* not safe to continue after unlock */ + return true; + } + + return false; +} + +static void io_wqe_cancel_pending_work(struct io_wqe *wqe, + struct io_cb_cancel_data *match) +{ + int i; retry: raw_spin_lock(&wqe->lock); for (i = 0; i < IO_WQ_ACCT_NR; i++) { struct io_wqe_acct *acct = io_get_acct(wqe, i == 0); - wq_list_for_each(node, prev, &acct->work_list) { - work = container_of(node, struct io_wq_work, list); - if (!match->fn(work, match->data)) - continue; - io_wqe_remove_pending(wqe, work, prev); - raw_spin_unlock(&wqe->lock); - io_run_cancel(work, wqe); - match->nr_pending++; - if (!match->cancel_all) - return; - - /* not safe to continue after unlock */ - goto retry; + if (io_acct_cancel_pending_work(wqe, acct, match)) { + if (match->cancel_all) + goto retry; + return; } } raw_spin_unlock(&wqe->lock); @@ -1013,7 +1122,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data) { struct io_worker *worker; - if (cb->func != create_worker_cb) + if (cb->func != create_worker_cb || cb->func != create_worker_cont) return false; worker = container_of(cb, struct io_worker, create_work); return worker->wqe->wq == data; From fa84693b3c896460831fe0750554121121a23da8 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 1 Sep 2021 14:15:59 -0600 Subject: [PATCH 12/16] io_uring: ensure IORING_REGISTER_IOWQ_MAX_WORKERS works with SQPOLL SQPOLL has a different thread doing submissions, we need to check for that and use the right task context when updating the worker values. Just hold the sqd->lock across the operation, this ensures that the thread cannot go away while we poke at ->io_uring. Link: https://github.com/axboe/liburing/issues/420 Fixes: 2e480058ddc2 ("io-wq: provide a way to limit max number of workers") Reported-by: Johannes Lundberg Tested-by: Johannes Lundberg Signed-off-by: Jens Axboe --- fs/io_uring.c | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 4a5eb9e856f0..4ad0d17dc92d 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -10323,26 +10323,46 @@ static int io_unregister_iowq_aff(struct io_ring_ctx *ctx) static int io_register_iowq_max_workers(struct io_ring_ctx *ctx, void __user *arg) { - struct io_uring_task *tctx = current->io_uring; + struct io_uring_task *tctx = NULL; + struct io_sq_data *sqd = NULL; __u32 new_count[2]; int i, ret; - if (!tctx || !tctx->io_wq) - return -EINVAL; if (copy_from_user(new_count, arg, sizeof(new_count))) return -EFAULT; for (i = 0; i < ARRAY_SIZE(new_count); i++) if (new_count[i] > INT_MAX) return -EINVAL; + if (ctx->flags & IORING_SETUP_SQPOLL) { + sqd = ctx->sq_data; + if (sqd) { + mutex_lock(&sqd->lock); + tctx = sqd->thread->io_uring; + } + } else { + tctx = current->io_uring; + } + + ret = -EINVAL; + if (!tctx || !tctx->io_wq) + goto err; + ret = io_wq_max_workers(tctx->io_wq, new_count); if (ret) - return ret; + goto err; + + if (sqd) + mutex_unlock(&sqd->lock); if (copy_to_user(arg, new_count, sizeof(new_count))) return -EFAULT; return 0; +err: + if (sqd) + mutex_unlock(&sqd->lock); + return ret; } static bool io_register_op_must_quiesce(int op) From 636378535afb837f165beb7de3907896480cf3b2 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Thu, 2 Sep 2021 00:38:22 +0100 Subject: [PATCH 13/16] io_uring: don't disable kiocb_done() CQE batching Not passing issue_flags from kiocb_done() into __io_complete_rw() means that completion batching for this case is disabled, e.g. for most of buffered reads. Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/b2689462835c3ee28a5999ef4f9a581e24be04a2.1630539342.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- fs/io_uring.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 4ad0d17dc92d..9f3f8a802abd 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2656,7 +2656,7 @@ static void __io_complete_rw(struct io_kiocb *req, long res, long res2, { if (__io_complete_rw_common(req, res)) return; - __io_req_complete(req, 0, req->result, io_put_rw_kbuf(req)); + __io_req_complete(req, issue_flags, req->result, io_put_rw_kbuf(req)); } static void io_complete_rw(struct kiocb *kiocb, long res, long res2) From 8d4ad41e3e8e4b907f088f25aee4a92f3f864027 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Thu, 2 Sep 2021 00:38:23 +0100 Subject: [PATCH 14/16] io_uring: prolong tctx_task_work() with flushing io_submit_flush_completions() may enqueue linked requests for task_work execution, so don't leave tctx_task_work() right after the tw list is exhausted, but try to flush and then retry. Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/0755d4c2c36301447c63bdd4146c10477cea4249.1630539342.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- fs/io_uring.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fs/io_uring.c b/fs/io_uring.c index 9f3f8a802abd..3a7145a38653 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2102,6 +2102,9 @@ static void tctx_task_work(struct callback_head *cb) while (1) { struct io_wq_work_node *node; + if (!tctx->task_list.first && locked && ctx->submit_state.compl_nr) + io_submit_flush_completions(ctx); + spin_lock_irq(&tctx->task_lock); node = tctx->task_list.first; INIT_WQ_LIST(&tctx->task_list); From 31efe48eb5dc4de3e31e84b54f287e9665410ab3 Mon Sep 17 00:00:00 2001 From: Xiaoguang Wang Date: Fri, 3 Sep 2021 22:24:36 +0800 Subject: [PATCH 15/16] io_uring: fix possible poll event lost in multi shot mode IIUC, IORING_POLL_ADD_MULTI is similar to epoll's edge-triggered mode, that means once one pure poll request returns one event(cqe), we'll need to read or write continually until EAGAIN is returned, then I think there is a possible poll event lost race in multi shot mode: t1 poll request add | | t2 | | t3 event happens | | t4 task work add | | t5 | task work run | t6 | commit one cqe | t7 | | user app handles cqe t8 | new event happen | t9 | add back to waitqueue | t10 | After t6 but before t9, if new event happens, there'll be no wakeup operation, and if user app has picked up this cqe in t7, read or write until EAGAIN is returned. In t8, new event happens and will be lost, though this race window maybe small. To fix this possible race, add poll request back to waitqueue before committing cqe. Fixes: 88e41cf928a6 ("io_uring: add multishot mode for IORING_OP_POLL_ADD") Signed-off-by: Xiaoguang Wang Link: https://lore.kernel.org/r/20210903142436.5767-1-xiaoguang.wang@linux.alibaba.com Signed-off-by: Jens Axboe --- fs/io_uring.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 3a7145a38653..30d959416eba 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -5098,7 +5098,7 @@ static void io_poll_remove_double(struct io_kiocb *req) } } -static bool io_poll_complete(struct io_kiocb *req, __poll_t mask) +static bool __io_poll_complete(struct io_kiocb *req, __poll_t mask) __must_hold(&req->ctx->completion_lock) { struct io_ring_ctx *ctx = req->ctx; @@ -5120,10 +5120,19 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask) if (flags & IORING_CQE_F_MORE) ctx->cq_extra++; - io_commit_cqring(ctx); return !(flags & IORING_CQE_F_MORE); } +static inline bool io_poll_complete(struct io_kiocb *req, __poll_t mask) + __must_hold(&req->ctx->completion_lock) +{ + bool done; + + done = __io_poll_complete(req, mask); + io_commit_cqring(req->ctx); + return done; +} + static void io_poll_task_func(struct io_kiocb *req, bool *locked) { struct io_ring_ctx *ctx = req->ctx; @@ -5134,7 +5143,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked) } else { bool done; - done = io_poll_complete(req, req->result); + done = __io_poll_complete(req, req->result); if (done) { io_poll_remove_double(req); hash_del(&req->hash_node); @@ -5142,6 +5151,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked) req->result = 0; add_wait_queue(req->poll.head, &req->poll.wait); } + io_commit_cqring(ctx); spin_unlock(&ctx->completion_lock); io_cqring_ev_posted(ctx); From 2fc2a7a62eb58650e71b4550cf6fa6cc0a75b2d2 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 3 Sep 2021 16:55:26 -0600 Subject: [PATCH 16/16] io_uring: io_uring_complete() trace should take an integer It currently takes a long, and while that's normally OK, the io_uring limit is an int. Internally in io_uring it's an int, but sometimes it's passed as a long. That can yield confusing results where a completions seems to generate a huge result: ou-sqp-1297-1298 [001] ...1 788.056371: io_uring_complete: ring 000000000e98e046, user_data 0x0, result 4294967171, cflags 0 which is due to -ECANCELED being stored in an unsigned, and then passed in as a long. Using the right int type, the trace looks correct: iou-sqp-338-339 [002] ...1 15.633098: io_uring_complete: ring 00000000e0ac60cf, user_data 0x0, result -125, cflags 0 Cc: stable@vger.kernel.org Signed-off-by: Jens Axboe --- include/trace/events/io_uring.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/trace/events/io_uring.h b/include/trace/events/io_uring.h index e4e44a2b4aa9..0dd30de00e5b 100644 --- a/include/trace/events/io_uring.h +++ b/include/trace/events/io_uring.h @@ -295,14 +295,14 @@ TRACE_EVENT(io_uring_fail_link, */ TRACE_EVENT(io_uring_complete, - TP_PROTO(void *ctx, u64 user_data, long res, unsigned cflags), + TP_PROTO(void *ctx, u64 user_data, int res, unsigned cflags), TP_ARGS(ctx, user_data, res, cflags), TP_STRUCT__entry ( __field( void *, ctx ) __field( u64, user_data ) - __field( long, res ) + __field( int, res ) __field( unsigned, cflags ) ), @@ -313,7 +313,7 @@ TRACE_EVENT(io_uring_complete, __entry->cflags = cflags; ), - TP_printk("ring %p, user_data 0x%llx, result %ld, cflags %x", + TP_printk("ring %p, user_data 0x%llx, result %d, cflags %x", __entry->ctx, (unsigned long long)__entry->user_data, __entry->res, __entry->cflags) );