From daa5de5415849b9a53056ec1e1e88fe4c5c9aa2b Mon Sep 17 00:00:00 2001
From: yangerkun <yangerkun@huawei.com>
Date: Tue, 24 Sep 2019 20:53:34 +0800
Subject: [PATCH 1/2] io_uring: compare cached_cq_tail with cq.head
 in io_uring_poll

After commit 75b28affdd6a ("io_uring: allocate the two rings together"), we
compare sq.head with cached_cq_tail to decide whether there are any
completions the application has not yet reaped (and hence whether to
report EPOLLIN). We should be comparing cq.head instead.
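
For illustration, a completion is pending for the application exactly when
the kernel-side cached_cq_tail has advanced past the cq.head index the
application last consumed. A minimal sketch of that check (io_cq_pending()
is a hypothetical helper, not part of this patch; the fields are the ones
io_uring_poll() already uses):

	/* CQEs posted by the kernel but not yet reaped by the application */
	static unsigned io_cq_pending(struct io_ring_ctx *ctx)
	{
		return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
	}

io_uring_poll() should report EPOLLIN whenever this count is non-zero,
which is what comparing cq.head (rather than sq.head) against
cached_cq_tail achieves.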

Fixes: 75b28affdd6a ("io_uring: allocate the two rings together")
Signed-off-by: yangerkun <yangerkun@huawei.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index ca7570aca430..9b84232e5cc4 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3455,7 +3455,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
 	if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
 	    ctx->rings->sq_ring_entries)
 		mask |= EPOLLOUT | EPOLLWRNORM;
-	if (READ_ONCE(ctx->rings->sq.head) != ctx->cached_cq_tail)
+	if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
 		mask |= EPOLLIN | EPOLLRDNORM;
 
 	return mask;

From bda521624e75c665c407b3d9cece6e7a28178cd8 Mon Sep 17 00:00:00 2001
From: Jens Axboe <axboe@kernel.dk>
Date: Tue, 24 Sep 2019 13:47:15 -0600
Subject: [PATCH 2/2] io_uring: make CQ ring wakeups be more efficient

For batched IO, it's not uncommon for waiters to ask for more than one
IO to complete before being woken up. This is a problem with
wait_event(), since tasks will get woken for every IO that completes,
re-check the condition, then go back to sleep. For batch counts on the
order of what you'd use for high IOPS, that can result in tens of extra
wakeups for the waiting task.

Add a private wake function that checks whether the wake-up count
criterion has been met before calling autoremove_wake_function(). Pavel
reports that one of his test cases runs 40% faster with proper batching
of wakeups.
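
The general shape of that (struct batch_wait, my_events_available() and
my_wake_function() below are illustrative placeholders, not part of this
patch) is a wait_queue_entry whose ->func only falls through to
autoremove_wake_function() once the waiter's batch condition holds:

	struct batch_wait {
		struct wait_queue_entry wq;
		unsigned to_wait;	/* completions the waiter asked for */
	};

	static int my_wake_function(struct wait_queue_entry *curr, unsigned mode,
				    int wake_flags, void *key)
	{
		struct batch_wait *bw = container_of(curr, struct batch_wait, wq);

		/*
		 * Not enough events yet: stay on the waitqueue. A negative
		 * return also stops __wake_up_common() from scanning further
		 * entries for this wakeup.
		 */
		if (my_events_available() < bw->to_wait)
			return -1;
		return autoremove_wake_function(curr, mode, wake_flags, key);
	}

The hunks below implement this as io_should_wake()/io_wake_function() and
pair it with a prepare_to_wait_exclusive()/finish_wait() loop in
io_cqring_wait().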

Reported-by: Pavel Begunkov <asml.silence@gmail.com>
Tested-by: Pavel Begunkov <asml.silence@gmail.com>
Reviewed-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 66 +++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 56 insertions(+), 10 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 9b84232e5cc4..c934f91c51e9 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2768,6 +2768,38 @@ out:
 	return submit;
 }
 
+struct io_wait_queue {
+	struct wait_queue_entry wq;
+	struct io_ring_ctx *ctx;
+	unsigned to_wait;
+	unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+	struct io_ring_ctx *ctx = iowq->ctx;
+
+	/*
+	 * Wake up if we have enough events, or if a timeout occurred since we
+	 * started waiting. For timeouts, we always want to return to userspace,
+	 * regardless of event count.
+	 */
+	return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+			    int wake_flags, void *key)
+{
+	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+							wq);
+
+	if (!io_should_wake(iowq))
+		return -1;
+
+	return autoremove_wake_function(curr, mode, wake_flags, key);
+}
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -2775,8 +2807,16 @@ out:
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			  const sigset_t __user *sig, size_t sigsz)
 {
+	struct io_wait_queue iowq = {
+		.wq = {
+			.private	= current,
+			.func		= io_wake_function,
+			.entry		= LIST_HEAD_INIT(iowq.wq.entry),
+		},
+		.ctx		= ctx,
+		.to_wait	= min_events,
+	};
 	struct io_rings *rings = ctx->rings;
-	unsigned nr_timeouts;
 	int ret;
 
 	if (io_cqring_events(rings) >= min_events)
@@ -2795,15 +2835,21 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			return ret;
 	}
 
-	nr_timeouts = atomic_read(&ctx->cq_timeouts);
-	/*
-	 * Return if we have enough events, or if a timeout occured since
-	 * we started waiting. For timeouts, we always want to return to
-	 * userspace.
-	 */
-	ret = wait_event_interruptible(ctx->wait,
-				io_cqring_events(rings) >= min_events ||
-				atomic_read(&ctx->cq_timeouts) != nr_timeouts);
+	ret = 0;
+	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+	do {
+		prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
+						TASK_INTERRUPTIBLE);
+		if (io_should_wake(&iowq))
+			break;
+		schedule();
+		if (signal_pending(current)) {
+			ret = -ERESTARTSYS;
+			break;
+		}
+	} while (1);
+	finish_wait(&ctx->wait, &iowq.wq);
+
 	restore_saved_sigmask_unless(ret == -ERESTARTSYS);
 	if (ret == -ERESTARTSYS)
 		ret = -EINTR;