From afa80c32554dcd02737e2c341c27b6a9fa7f3e80 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Thu, 11 Jun 2020 16:05:59 +0200 Subject: Further improve reference counting and tracking in uring --- uring.c | 130 +++++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 84 insertions(+), 46 deletions(-) (limited to 'uring.c') diff --git a/uring.c b/uring.c index a2668b0..78a084a 100644 --- a/uring.c +++ b/uring.c @@ -17,10 +17,13 @@ struct uring_ev { }; static struct io_uring_sqe * -get_sqe(struct cfg *cfg) +get_sqe(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe; + if (!cfg || !task) + die("%s: invalid parameters\n", __func__); + sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) { io_uring_submit(&cfg->uev->uring); @@ -29,6 +32,7 @@ get_sqe(struct cfg *cfg) perrordie("Failed to get an sqe!\n"); } + uring_task_get(cfg, task); return sqe; } @@ -64,15 +68,6 @@ uring_task_refdump(struct uring_task *task) task->refcount); } -static void -uring_cancel(struct cfg *cfg, struct uring_task *task) -{ - struct io_uring_sqe *sqe = get_sqe(cfg); - - io_uring_prep_cancel(sqe, task, 0); - io_uring_sqe_set_data(sqe, NULL); -} - /* * Similar to uring_task_put, but can be called from other tasks * while the task is active. @@ -80,11 +75,21 @@ uring_cancel(struct cfg *cfg, struct uring_task *task) void uring_task_destroy(struct cfg *cfg, struct uring_task *task) { - fprintf(stderr, "%s: called with task %s (%p), fd %i and refcount %u\n", + fprintf(stderr, "%s: called (task: %s (%p), fd: %i, refcount: %u)\n", __func__, task->name, task, task->fd, task->refcount); - if (task->fd >= 0) - uring_cancel(cfg, task); + if (!task) { + error("%s: called with no task\n", __func__); + return; + } + + if (task->fd >= 0) { + struct io_uring_sqe *sqe; + + sqe = get_sqe(cfg, task); + io_uring_prep_cancel(sqe, task, 0); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task + 0x1)); + } task->dead = true; uring_task_put(cfg, task); @@ -121,7 +126,7 @@ uring_task_put(struct cfg *cfg, struct uring_task *task) task->free(task); if (parent) - uring_task_put(NULL, parent); + uring_task_put(cfg, parent); } void @@ -161,7 +166,7 @@ uring_task_close_fd(struct cfg *cfg, struct uring_task *task) if (task->fd < 0) return; - uring_close(cfg, task, task->fd, NULL); + uring_close(cfg, task, task->fd); task->fd = -1; } @@ -207,20 +212,21 @@ uring_task_init(struct uring_task *task, const char *name, } void -uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback) +uring_close(struct cfg *cfg, struct uring_task *task, int fd) { - struct io_uring_sqe *sqe = get_sqe(cfg); + struct io_uring_sqe *sqe; - fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback); + fprintf(stderr, "%s: called (task: %p (%s), fd: %i)\n", __func__, task, task->name, fd); - if (task) { - uring_task_get(cfg, task); - task->callback = callback; + if (!task || fd < 0) { + error("%s: invalid parameters (task: %p (%s), fd: %i)\n", __func__, task, task->name, fd); + return; } + + sqe = get_sqe(cfg, task); + task = (void *)((uintptr_t)task + 0x2); io_uring_prep_close(sqe, fd); io_uring_sqe_set_data(sqe, task); - - fprintf(stderr, "%s: done\n", __func__); } static void @@ -276,8 +282,7 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_write(sqe, task->fd, buf, len, 0); io_uring_sqe_set_data(sqe, task); @@ -384,8 +389,7 @@ uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t le return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_read(sqe, task->fd, buf, len, offset); io_uring_sqe_set_data(sqe, task); @@ -394,9 +398,9 @@ uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t le void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback) { - struct io_uring_sqe *sqe = get_sqe(cfg); - - uring_task_get(cfg, task); + struct io_uring_sqe *sqe; + + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); io_uring_sqe_set_data(sqe, task); @@ -405,9 +409,9 @@ uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callbac void uring_sendmsg(struct cfg *cfg, struct uring_task *task, struct msghdr *msg, callback_t callback) { - struct io_uring_sqe *sqe = get_sqe(cfg); + struct io_uring_sqe *sqe; - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_sendmsg(sqe, task->fd, msg, 0); io_uring_sqe_set_data(sqe, task); @@ -423,8 +427,7 @@ uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *ad return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen); io_uring_sqe_set_data(sqe, task); @@ -440,9 +443,8 @@ uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *add return; } - sqe = get_sqe(cfg); + sqe = get_sqe(cfg, task); addr->addrlen = sizeof(addr->storage); - uring_task_get(cfg, task); task->callback = callback; io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, SOCK_CLOEXEC); io_uring_sqe_set_data(sqe, task); @@ -458,8 +460,7 @@ uring_poll(struct cfg *cfg, struct uring_task *task, short poll_mask, callback_t return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_poll_add(sqe, task->fd, poll_mask); io_uring_sqe_set_data(sqe, task); @@ -475,9 +476,10 @@ uring_poll_cancel(struct cfg *cfg, struct uring_task *task) return; } - sqe = get_sqe(cfg); + sqe = get_sqe(cfg, task); task->dead = true; io_uring_prep_poll_remove(sqe, task); + /* FIXME: should not set this to NULL */ io_uring_sqe_set_data(sqe, NULL); } @@ -544,12 +546,49 @@ uring_event_loop(struct cfg *cfg) nr = 0; io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) { struct uring_task *task = io_uring_cqe_get_data(cqe); - - fprintf(stderr, "%s: got CEQ (res: %i, task: %s (%p), cb: %p)\n", - __func__, cqe->res, task ? task->name : "", - task, task ? task->callback : NULL); - - if (task && task->callback) + bool do_callback; + + if (!task) + die("%s: null task\n", __func__); + + if (task != NULL && ((uintptr_t)task & 0x1)) { + task = (void *)((uintptr_t)task - 0x1); + fprintf(stderr, "%s: got cancellation CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", + __func__, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task ? task->name : "", + task, + task ? task->fd : -1, + task ? task->callback : NULL); + do_callback = false; + } else if (task != NULL && ((uintptr_t)task & 0x2)) { + task = (void *)((uintptr_t)task - 0x2); + fprintf(stderr, "%s: got close CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", + __func__, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task ? task->name : "", + task, + task ? task->fd : -1, + task ? task->callback : NULL); + do_callback = false; + } else { + fprintf(stderr, "%s: got CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", + __func__, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task ? task->name : "", + task, + task ? task->fd : -1, + task ? task->callback : NULL); + do_callback = true; + } + + if (!task) + error("%s: null task!?\n", __func__); + + if (do_callback && task && task->callback) task->callback(cfg, task, cqe->res); if (task) @@ -561,7 +600,6 @@ uring_event_loop(struct cfg *cfg) nr++; } - //printf("%s: %u CQEs treated\n", __func__, nr); io_uring_cq_advance(&cfg->uev->uring, nr); } } -- cgit v1.2.3