/* SPDX-License-Identifier: GPL-2.0 */
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <liburing.h>

#include "main.h"
#include "uring.h"

struct uring_ev {
	struct io_uring uring;
	struct io_uring_params uring_params;
	struct uring_task task;
	/* for testing if the kernel supports splice */
	int pipe[2];
	int tfd;
};

/* clang-format off */
enum cqe_type {
	CQE_TYPE_NORMAL		= 0x0,
	CQE_TYPE_CANCEL		= 0x1,
	CQE_TYPE_CLOSE		= 0x2,
	CQE_TYPE_POLL_CANCEL	= 0x3
};
/* clang-format on */

#define CQE_TYPE_PTR_MASK 0x3

uint64_t sqe_count = 0;
uint64_t cqe_count = 0;

static struct io_uring_sqe *
get_sqe(struct uring_task *task)
{
	struct io_uring_sqe *sqe;

	assert_die(task, "invalid arguments");

	sqe = io_uring_get_sqe(&cfg->uring->uring);
	if (!sqe) {
		io_uring_submit(&cfg->uring->uring);
		sqe = io_uring_get_sqe(&cfg->uring->uring);
		if (!sqe)
			die("failed to get an sqe!");
	}

	sqe_count++;
	uring_task_get(task);
	return sqe;
}

void
uring_task_refdump(struct uring_task *task)
{
	char buf[4096];
	struct uring_task *tmp;

	assert_return(task);

	if (!debug_enabled(DBG_REF))
		return;

	buf[0] = '\0';
	for (tmp = task; tmp; tmp = tmp->parent) {
		size_t prefix;
		char *dst;

		if (tmp->parent)
			prefix = STRLEN("->") + strlen(tmp->name);
		else
			prefix = strlen(tmp->name);

		memmove(&buf[prefix], &buf[0], strlen(buf) + 1);
		dst = &buf[0];
		if (tmp->parent) {
			*dst++ = '-';
			*dst++ = '>';
		}
		memcpy(dst, tmp->name, strlen(tmp->name));
	}

	info("%s (0x%p parent 0x%p free 0x%p fd %i ref %u)",
	     buf, task, task->parent, task->free, task->fd, task->refcount);
}

/*
 * Similar to uring_task_put, but can be called from other tasks
 * while the task is active.
 */
void
uring_task_destroy(struct uring_task *task)
{
	assert_return(task);
	assert_return_silent(!task->dead);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	if (task->fd >= 0) {
		struct io_uring_sqe *sqe;

		sqe = get_sqe(task);
		io_uring_prep_cancel(sqe, task, 0);
		io_uring_sqe_set_data(sqe,
				      (void *)((uintptr_t)task | CQE_TYPE_CANCEL));
	}

	task->dead = true;
	uring_task_put(task);
}

void
uring_task_put(struct uring_task *task)
{
	struct uring_task *parent;

	assert_return(task);

	debug(DBG_REF, "task %s (%p), refcount %u -> %u",
	      task->name, task, task->refcount, task->refcount - 1);

	task->refcount--;
	if (task->refcount > 0)
		return;

	if (task->refcount < 0)
		error("Negative refcount!");

	if (task->fd >= 0) {
		uring_task_close_fd(task);
		/* We'll be called again once the fd is closed */
		return;
	}

	parent = task->parent;

	if (task->free)
		task->free(task);

	if (parent) {
		debug(DBG_REF, "putting parent %s (%p)", parent->name, parent);
		uring_task_put(parent);
	}
}

void
uring_task_get(struct uring_task *task)
{
	assert_return(task);

	debug(DBG_REF, "task %s (%p), refcount %u -> %u",
	      task->name, task, task->refcount, task->refcount + 1);

	if (task->refcount < 0)
		error("Negative refcount!");

	task->refcount++;
}

void
uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf)
{
	assert_return(task && tbuf);

	debug(DBG_UR, "task %s (%p), buf %p, refcount %u",
	      task->name, task, tbuf, task->refcount);

	/* iov_len and msg_namelen are set at send/receive time */
	tbuf->iov.iov_base = tbuf->buf;
	tbuf->msg.msg_name = &task->saddr.st;
	tbuf->msg.msg_iov = &tbuf->iov;
	tbuf->msg.msg_iovlen = 1;
	tbuf->msg.msg_control = NULL;
	tbuf->msg.msg_controllen = 0;
	tbuf->msg.msg_flags = 0;
	task->tbuf = tbuf;
}

void
uring_task_set_fd(struct uring_task *task, int fd)
{
	assert_return(task);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, fd, task->refcount);

	task->fd = fd;
}
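/*
 * Closing the fd is asynchronous: uring_close() queues an IORING_OP_CLOSE
 * SQE tagged CQE_TYPE_CLOSE, and the extra reference taken by get_sqe()
 * is dropped by the event loop when that close completes. uring_task_put()
 * relies on this: once the refcount reaches zero it only requests the
 * close here, and the task is actually freed when the close completion
 * drops the final reference.
 */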
void
uring_task_close_fd(struct uring_task *task)
{
	assert_return(task);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	if (task->fd < 0)
		return;

	uring_close(task, task->fd);
	task->fd = -1;
}

struct uring_task *
uring_parent()
{
	assert_die(cfg->uring, "invalid arguments");

	return &cfg->uring->task;
}

void
uring_task_init(struct uring_task *task, const char *name,
		struct uring_task *parent, void (*free)(struct uring_task *))
{
	static bool first = true;

	assert_die(task && !empty_str(name) && free, "invalid arguments");

	if (first)
		first = false;
	else
		assert_die(parent, "called without a parent task");

	task->refcount = 1;
	task->fd = -1;
	task->parent = parent;
	task->free = free;
	task->dead = false;
	task->name = name;
	task->tbuf = NULL;

	if (task->parent) {
		debug(DBG_REF, "task %s (%p), refcount %u, "
		      "getting parent %s (%p), refcount %u",
		      task->name, task, task->refcount,
		      task->parent->name, task->parent,
		      task->parent->refcount);
		uring_task_get(task->parent);
	}
}

void
uring_close(struct uring_task *task, int fd)
{
	struct io_uring_sqe *sqe;

	assert_return(task && fd >= 0);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	io_uring_prep_close(sqe, fd);
	io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE));
}

static void
uring_tbuf_write_cb(struct uring_task *task, int res)
{
	int r;

	assert_return(task && task->tbuf && task->final_cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	if (res < 0) {
		r = res;
		goto finished;
	}

	/* We wrote some more data */
	task->tbuf->done += res;

	if (task->tbuf->done >= task->tbuf->len || res == 0) {
		r = task->tbuf->len;
		goto finished;
	}

	uring_write(task, task->tbuf->buf + task->tbuf->done,
		    task->tbuf->len - task->tbuf->done, uring_tbuf_write_cb);
	return;

finished:
	task->final_cb(task, r);
}

void
uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb)
{
	assert_return(task && task->fd >= 0 && task->tbuf &&
		      task->tbuf->len > 0);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	task->tbuf->done = 0;
	task->final_cb = final_cb;
	uring_write(task, &task->tbuf->buf, task->tbuf->len,
		    uring_tbuf_write_cb);
}

void
uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && buf && len > 0 && cb && task->fd >= 0);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->cb = cb;
	io_uring_prep_write(sqe, task->fd, buf, len, 0);
	io_uring_sqe_set_data(sqe, task);
}

static void
uring_tbuf_read_until_cb(struct uring_task *task, int res)
{
	int r;

	assert_return(task && task->tbuf && task->final_cb &&
		      task->is_complete_cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	if (res < 0) {
		r = res;
		goto finished;
	}

	task->tbuf->len += res;

	r = task->is_complete_cb(task, res);
	if (r < 0) {
		r = res;
		goto finished;
	} else if (r > 0) {
		r = task->tbuf->len;
		goto finished;
	}

	/* Need to read more */
	if (task->tbuf->len >= sizeof(task->tbuf->buf)) {
		r = -E2BIG;
		goto finished;
	}

	uring_read_offset(task, task->tbuf->buf + task->tbuf->len,
			  sizeof(task->tbuf->buf) - task->tbuf->len,
			  task->tbuf->len, uring_tbuf_read_until_cb);
	return;

finished:
	task->final_cb(task, r);
}
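/*
 * uring_tbuf_read_until() reads into task->tbuf until is_complete_cb
 * decides the data is complete: a return value > 0 means done, 0 means
 * keep reading, < 0 aborts the loop. Each completion adds to tbuf->len
 * and re-issues a read at that offset; final_cb receives the total length
 * on success, the read error if a read fails, or -E2BIG if the buffer
 * fills up first.
 *
 * Rough usage sketch (the names "ctx" and "on_data" are illustrative
 * only, not part of this file):
 *
 *	static void on_data(struct uring_task *task, int res) { ... }
 *	...
 *	uring_task_set_buf(task, &ctx->tbuf);
 *	uring_task_set_fd(task, fd);
 *	uring_tbuf_read_until_eof(task, on_data);
 */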
void
uring_tbuf_read_until(struct uring_task *task, rutask_cb_t is_complete_cb,
		      utask_cb_t final_cb)
{
	assert_return(task && task->fd >= 0 && task->tbuf &&
		      is_complete_cb && final_cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	task->tbuf->len = 0;
	task->is_complete_cb = is_complete_cb;
	task->final_cb = final_cb;
	uring_read(task, &task->tbuf->buf, sizeof(task->tbuf->buf),
		   uring_tbuf_read_until_cb);
}

static int
uring_tbuf_eof(struct uring_task *task, int res)
{
	assert_return(task && task->tbuf, -EINVAL);
	assert_task_alive_or(DBG_UR, task, return -EINTR);

	if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
		return -E2BIG;

	if (res > 0)
		return 0;

	task->tbuf->buf[task->tbuf->len] = '\0';
	task->tbuf->len++;
	return 1;
}

void
uring_tbuf_read_until_eof(struct uring_task *task, utask_cb_t final_cb)
{
	assert_return(task && task->tbuf && final_cb);

	uring_tbuf_read_until(task, uring_tbuf_eof, final_cb);
}

static int
uring_tbuf_have_data(struct uring_task *task, int res)
{
	assert_return(task, -EINVAL);

	if (res < 0)
		return res;
	else
		return 1;
}

void
uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb)
{
	assert_return(task && final_cb);

	uring_tbuf_read_until(task, uring_tbuf_have_data, final_cb);
}

void
uring_read_offset(struct uring_task *task, void *buf, size_t len,
		  off_t offset, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && buf && len > 0 && task->fd >= 0);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->cb = cb;
	io_uring_prep_read(sqe, task->fd, buf, len, offset);
	io_uring_sqe_set_data(sqe, task);
}

void
uring_openat(struct uring_task *task, int dfd, const char *path, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && !empty_str(path) && cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->cb = cb;
	io_uring_prep_openat(sqe, dfd, path, O_RDONLY | O_CLOEXEC, 0);
	io_uring_sqe_set_data(sqe, task);
	/* We need to do this here since path may go away */
	io_uring_submit(&cfg->uring->uring);
}

void
uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && task->fd >= 0 && task->tbuf && cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->tbuf->done = 0;
	task->tbuf->len = 0;
	task->tbuf->iov.iov_len = sizeof(task->tbuf->buf);
	task->tbuf->msg.msg_namelen = task->saddr.addrlen;
	task->cb = cb;
	io_uring_prep_recvmsg(sqe, task->fd, &task->tbuf->msg, 0);
	io_uring_sqe_set_data(sqe, task);
}

void
uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && task->fd >= 0 && task->tbuf && cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->tbuf->done = 0;
	task->tbuf->iov.iov_len = task->tbuf->len;
	task->tbuf->msg.msg_namelen = task->saddr.addrlen;
	task->cb = cb;
	io_uring_prep_sendmsg(sqe, task->fd, &task->tbuf->msg, 0);
	io_uring_sqe_set_data(sqe, task);
}

void
uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && task->fd >= 0 && saddr && cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->cb = cb;
	io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&saddr->st,
			      saddr->addrlen);
	io_uring_sqe_set_data(sqe, task);
}
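/*
 * saddr->addrlen is reset to the full storage size before the accept is
 * queued; the kernel rewrites it in place with the actual peer address
 * length when the completion arrives, and SOCK_CLOEXEC is set on the
 * accepted socket.
 */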
void
uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && task->fd >= 0 && saddr && cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	saddr->addrlen = sizeof(saddr->st);
	task->cb = cb;
	io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&saddr->st,
			     &saddr->addrlen, SOCK_CLOEXEC);
	io_uring_sqe_set_data(sqe, task);
}

void
uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && fd_in >= 0 && fd_out >= 0 && cb);

	debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u",
	      task->name, task, fd_in, fd_out, task->refcount);

	sqe = get_sqe(task);
	task->cb = cb;
	io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE);
	io_uring_sqe_set_data(sqe, task);
}

void
uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb)
{
	struct io_uring_sqe *sqe;

	assert_return(task && task->fd >= 0 && poll_mask && cb);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->cb = cb;
	io_uring_prep_poll_add(sqe, task->fd, poll_mask);
	io_uring_sqe_set_data(sqe, task);
}

void
uring_poll_cancel(struct uring_task *task)
{
	struct io_uring_sqe *sqe;

	assert_return(task);

	if (task->fd < 0) {
		/* not an error, no need to print error msg */
		return;
	}

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	sqe = get_sqe(task);
	task->dead = true;
	io_uring_prep_poll_remove(sqe, task);
	io_uring_sqe_set_data(sqe,
			      (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL));
}

static void
uring_free(struct uring_task *task)
{
	struct uring_ev *uring = container_of(task, struct uring_ev, task);

	assert_return(task);

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	io_uring_queue_exit(&uring->uring);
	cfg->uring = NULL;
	xfree(uring);
}

void
uring_refdump()
{
	assert_return(cfg->uring);

	uring_task_refdump(&cfg->uring->task);
}

void
uring_delete()
{
	struct uring_task *task;

	assert_return(cfg->uring);

	task = &cfg->uring->task;

	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
	      task->name, task, task->fd, task->refcount);

	uring_task_put(task);
}

static void
uring_splice_test_cb(struct uring_task *task, int res)
{
	struct uring_ev *uring = container_of(task, struct uring_ev, task);

	assert_die(task && uring == cfg->uring, "splice test failed");

	uring_close(task, uring->tfd);
	uring_close(task, uring->pipe[PIPE_RD]);
	uring_close(task, uring->pipe[PIPE_WR]);
	uring->tfd = -1;
	uring->pipe[PIPE_RD] = -1;
	uring->pipe[PIPE_WR] = -1;

	if (res >= 0) {
		cfg->splice_supported = true;
		debug(DBG_UR, "splice supported");
	} else if (res == -EINVAL)
		debug(DBG_UR, "splice not supported");
	else
		error("splice check failed: %i\n", res);
}

void
uring_init()
{
	struct uring_ev *uring;

	assert_return(!cfg->uring);

	uring = zmalloc(sizeof(*uring));
	if (!uring)
		die("malloc: %m");

	if (io_uring_queue_init_params(4096, &uring->uring,
				       &uring->uring_params) < 0)
		die("io_uring_queue_init_params");

	debug(DBG_UR, "uring initialized, features: 0x%08x",
	      uring->uring_params.features);

	uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free);
	cfg->uring = uring;

	/* splice check, a bit convoluted, but seems to be no simpler way */
	cfg->splice_supported = false;

	if (pipe2(uring->pipe, O_CLOEXEC) < 0)
		die("pipe2: %m");

	uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY);
	if (uring->tfd < 0)
		die("open(\"/dev/null\"): %m");

	uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR],
		     uring_splice_test_cb);
}
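/*
 * Completion handling: the event loop submits any pending SQEs, waits for
 * at least one CQE, then drains everything that is currently available.
 * The two low bits of the user_data pointer encode the CQE type (see enum
 * cqe_type), which assumes tasks are at least 4-byte aligned. Only
 * CQE_TYPE_NORMAL completions invoke task->cb; cancel, close and
 * poll-cancel completions merely drop the reference taken by get_sqe().
 */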
static inline void
uring_print_cqe(const char *type, struct uring_task *task,
		struct io_uring_cqe *cqe)
{
	assert_return(!empty_str(type) && task && cqe);

	debug(DBG_UR, "got CQE "
	      "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)",
	      type, cqe->res, cqe->res < 0 ? strerror(-cqe->res) : "ok",
	      task->name ? task->name : "", task, task->fd, task->cb);
}

void
uring_event_loop()
{
	while (true) {
		struct io_uring_cqe *cqe;
		unsigned nr, head;
		int r;

		io_uring_submit(&cfg->uring->uring);

		r = io_uring_wait_cqe(&cfg->uring->uring, &cqe);
		if (r < 0) {
			if (r == -EINTR)
				continue;
			else
				die("io_uring_wait_cqe: %i", r);
		}

		nr = 0;
		io_uring_for_each_cqe(&cfg->uring->uring, head, cqe) {
			struct uring_task *task = io_uring_cqe_get_data(cqe);
			bool do_cb;
			enum cqe_type cqe_type;

			cqe_count++;

			cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK);
			task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK);

			if (!task)
				die("null task");

			switch (cqe_type) {
			case CQE_TYPE_CANCEL:
				uring_print_cqe("cancel", task, cqe);
				do_cb = false;
				break;
			case CQE_TYPE_CLOSE:
				uring_print_cqe("close", task, cqe);
				do_cb = false;
				break;
			case CQE_TYPE_POLL_CANCEL:
				uring_print_cqe("poll_cancel", task, cqe);
				do_cb = false;
				break;
			case CQE_TYPE_NORMAL:
				uring_print_cqe("standard", task, cqe);
				do_cb = true;
				break;
			default:
				die("unknown CQE type");
			}

			if (do_cb && task->cb)
				task->cb(task, cqe->res);

			uring_task_put(task);

			if (exiting)
				return;

			nr++;
		}

		io_uring_cq_advance(&cfg->uring->uring, nr);
	}
}