diff options
-rw-r--r-- | main.c | 4 | ||||
-rw-r--r-- | server.c | 5 | ||||
-rw-r--r-- | uring.c | 130 | ||||
-rw-r--r-- | uring.h | 3 |
4 files changed, 91 insertions, 51 deletions
@@ -64,9 +64,9 @@ cfg_free(struct uring_task *task) fprintf(stderr, "%s: called\n", __func__); systemd_delete(cfg); - xfree(cfg); fprintf(stderr, "All resources free, exiting\n"); exiting = true; + /* The cfg struct is free:d in main() */ } static void @@ -324,6 +324,8 @@ main(int argc, char **argv) fprintf(stderr, "Event loop exited\n"); + xfree(cfg); + debug_resource_usage(); exit(EXIT_SUCCESS); @@ -207,14 +207,15 @@ server_local_accept(struct cfg *cfg, struct uring_task *task, int res) scfg->name, pbuf, lbuf, local->addrstr); if (list_empty(&scfg->remotes)) { + /* This shouldn't be possible, checked before opening local */ error("scfg->remotes empty!\n"); - uring_close(cfg, NULL, res, NULL); + uring_close(cfg, &local->task, res); goto out; } proxy = proxy_new(cfg, scfg, &local->peer, res); if (!proxy) - uring_close(cfg, NULL, res, NULL); + uring_close(cfg, &local->task, res); out: uring_accept(cfg, &local->task, &local->peer, server_local_accept); @@ -17,10 +17,13 @@ struct uring_ev { }; static struct io_uring_sqe * -get_sqe(struct cfg *cfg) +get_sqe(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe; + if (!cfg || !task) + die("%s: invalid parameters\n", __func__); + sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) { io_uring_submit(&cfg->uev->uring); @@ -29,6 +32,7 @@ get_sqe(struct cfg *cfg) perrordie("Failed to get an sqe!\n"); } + uring_task_get(cfg, task); return sqe; } @@ -64,15 +68,6 @@ uring_task_refdump(struct uring_task *task) task->refcount); } -static void -uring_cancel(struct cfg *cfg, struct uring_task *task) -{ - struct io_uring_sqe *sqe = get_sqe(cfg); - - io_uring_prep_cancel(sqe, task, 0); - io_uring_sqe_set_data(sqe, NULL); -} - /* * Similar to uring_task_put, but can be called from other tasks * while the task is active. @@ -80,11 +75,21 @@ uring_cancel(struct cfg *cfg, struct uring_task *task) void uring_task_destroy(struct cfg *cfg, struct uring_task *task) { - fprintf(stderr, "%s: called with task %s (%p), fd %i and refcount %u\n", + fprintf(stderr, "%s: called (task: %s (%p), fd: %i, refcount: %u)\n", __func__, task->name, task, task->fd, task->refcount); - if (task->fd >= 0) - uring_cancel(cfg, task); + if (!task) { + error("%s: called with no task\n", __func__); + return; + } + + if (task->fd >= 0) { + struct io_uring_sqe *sqe; + + sqe = get_sqe(cfg, task); + io_uring_prep_cancel(sqe, task, 0); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task + 0x1)); + } task->dead = true; uring_task_put(cfg, task); @@ -121,7 +126,7 @@ uring_task_put(struct cfg *cfg, struct uring_task *task) task->free(task); if (parent) - uring_task_put(NULL, parent); + uring_task_put(cfg, parent); } void @@ -161,7 +166,7 @@ uring_task_close_fd(struct cfg *cfg, struct uring_task *task) if (task->fd < 0) return; - uring_close(cfg, task, task->fd, NULL); + uring_close(cfg, task, task->fd); task->fd = -1; } @@ -207,20 +212,21 @@ uring_task_init(struct uring_task *task, const char *name, } void -uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback) +uring_close(struct cfg *cfg, struct uring_task *task, int fd) { - struct io_uring_sqe *sqe = get_sqe(cfg); + struct io_uring_sqe *sqe; - fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback); + fprintf(stderr, "%s: called (task: %p (%s), fd: %i)\n", __func__, task, task->name, fd); - if (task) { - uring_task_get(cfg, task); - task->callback = callback; + if (!task || fd < 0) { + error("%s: invalid parameters (task: %p (%s), fd: %i)\n", __func__, task, task->name, fd); + return; } + + sqe = get_sqe(cfg, task); + task = (void *)((uintptr_t)task + 0x2); io_uring_prep_close(sqe, fd); io_uring_sqe_set_data(sqe, task); - - fprintf(stderr, "%s: done\n", __func__); } static void @@ -276,8 +282,7 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_write(sqe, task->fd, buf, len, 0); io_uring_sqe_set_data(sqe, task); @@ -384,8 +389,7 @@ uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t le return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_read(sqe, task->fd, buf, len, offset); io_uring_sqe_set_data(sqe, task); @@ -394,9 +398,9 @@ uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t le void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback) { - struct io_uring_sqe *sqe = get_sqe(cfg); - - uring_task_get(cfg, task); + struct io_uring_sqe *sqe; + + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); io_uring_sqe_set_data(sqe, task); @@ -405,9 +409,9 @@ uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callbac void uring_sendmsg(struct cfg *cfg, struct uring_task *task, struct msghdr *msg, callback_t callback) { - struct io_uring_sqe *sqe = get_sqe(cfg); + struct io_uring_sqe *sqe; - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_sendmsg(sqe, task->fd, msg, 0); io_uring_sqe_set_data(sqe, task); @@ -423,8 +427,7 @@ uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *ad return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen); io_uring_sqe_set_data(sqe, task); @@ -440,9 +443,8 @@ uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *add return; } - sqe = get_sqe(cfg); + sqe = get_sqe(cfg, task); addr->addrlen = sizeof(addr->storage); - uring_task_get(cfg, task); task->callback = callback; io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, SOCK_CLOEXEC); io_uring_sqe_set_data(sqe, task); @@ -458,8 +460,7 @@ uring_poll(struct cfg *cfg, struct uring_task *task, short poll_mask, callback_t return; } - sqe = get_sqe(cfg); - uring_task_get(cfg, task); + sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_poll_add(sqe, task->fd, poll_mask); io_uring_sqe_set_data(sqe, task); @@ -475,9 +476,10 @@ uring_poll_cancel(struct cfg *cfg, struct uring_task *task) return; } - sqe = get_sqe(cfg); + sqe = get_sqe(cfg, task); task->dead = true; io_uring_prep_poll_remove(sqe, task); + /* FIXME: should not set this to NULL */ io_uring_sqe_set_data(sqe, NULL); } @@ -544,12 +546,49 @@ uring_event_loop(struct cfg *cfg) nr = 0; io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) { struct uring_task *task = io_uring_cqe_get_data(cqe); - - fprintf(stderr, "%s: got CEQ (res: %i, task: %s (%p), cb: %p)\n", - __func__, cqe->res, task ? task->name : "<none>", - task, task ? task->callback : NULL); - - if (task && task->callback) + bool do_callback; + + if (!task) + die("%s: null task\n", __func__); + + if (task != NULL && ((uintptr_t)task & 0x1)) { + task = (void *)((uintptr_t)task - 0x1); + fprintf(stderr, "%s: got cancellation CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", + __func__, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task ? task->name : "<none>", + task, + task ? task->fd : -1, + task ? task->callback : NULL); + do_callback = false; + } else if (task != NULL && ((uintptr_t)task & 0x2)) { + task = (void *)((uintptr_t)task - 0x2); + fprintf(stderr, "%s: got close CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", + __func__, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task ? task->name : "<none>", + task, + task ? task->fd : -1, + task ? task->callback : NULL); + do_callback = false; + } else { + fprintf(stderr, "%s: got CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", + __func__, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task ? task->name : "<none>", + task, + task ? task->fd : -1, + task ? task->callback : NULL); + do_callback = true; + } + + if (!task) + error("%s: null task!?\n", __func__); + + if (do_callback && task && task->callback) task->callback(cfg, task, cqe->res); if (task) @@ -561,7 +600,6 @@ uring_event_loop(struct cfg *cfg) nr++; } - //printf("%s: %u CQEs treated\n", __func__, nr); io_uring_cq_advance(&cfg->uev->uring, nr); } } @@ -21,8 +21,7 @@ void uring_task_init(struct uring_task *task, const char *name, struct uring_task *parent, void (*free)(struct uring_task *)); -void uring_close(struct cfg *cfg, struct uring_task *task, int fd, - callback_t callback); +void uring_close(struct cfg *cfg, struct uring_task *task, int fd); void uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback); |