#include #include #include #include #include #include #include "main.h" #include "uring.h" #include "config.h" struct uring_ev { struct io_uring uring; struct io_uring_params uring_params; struct cfg *cfg; struct uring_task task; }; enum cqe_type { CQE_TYPE_NORMAL = 0x0, CQE_TYPE_CANCEL = 0x1, CQE_TYPE_CLOSE = 0x2, CQE_TYPE_POLL_CANCEL = 0x3 }; #define CQE_TYPE_PTR_MASK 0x3 uint64_t sqe_count = 0; uint64_t cqe_count = 0; static struct io_uring_sqe * get_sqe(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe; if (!cfg || !task) die("invalid parameters"); sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) { io_uring_submit(&cfg->uev->uring); sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("failed to get an sqe!"); } sqe_count++; uring_task_get(cfg, task); return sqe; } void uring_task_refdump(struct uring_task *task) { char buf[4096]; struct uring_task *tmp; buf[0] = '\0'; for (tmp = task; tmp; tmp = tmp->parent) { size_t prefix; char *dst; if (tmp->parent) prefix = strlen("->") + strlen(tmp->name); else prefix = strlen(tmp->name); memmove(&buf[prefix], &buf[0], strlen(buf) + 1); dst = &buf[0]; if (tmp->parent) { *dst++ = '-'; *dst++ = '>'; } memcpy(dst, tmp->name, strlen(tmp->name)); } debug(DBG_REF, "%s (0x%p parent 0x%p free 0x%p fd %i ref %u)\n", buf, task, task->parent, task->free, task->fd, task->refcount); } /* * Similar to uring_task_put, but can be called from other tasks * while the task is active. */ void uring_task_destroy(struct cfg *cfg, struct uring_task *task) { debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); if (!task) { error("called with no task\n"); return; } if (task->fd >= 0) { struct io_uring_sqe *sqe; sqe = get_sqe(cfg, task); io_uring_prep_cancel(sqe, task, 0); io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CANCEL)); } task->dead = true; uring_task_put(cfg, task); } void uring_task_put(struct cfg *cfg, struct uring_task *task) { struct uring_task *parent = task->parent; debug(DBG_REF, "task %s (%p), refcount %u\n", task->name, task, task->refcount); task->refcount--; if (task->refcount > 0) return; if (task->refcount < 0) error("Negative refcount!\n"); if (task->fd >= 0) { uring_task_close_fd(cfg, task); /* We'll be called again once the fd is closed */ return; } if (parent) debug(DBG_REF, "putting parent %s (%p)\n", task->parent->name, task->parent); if (task->free) task->free(task); if (parent) uring_task_put(cfg, parent); } void uring_task_get(struct cfg *cfg, struct uring_task *task) { debug(DBG_REF, "task %s (%p), refcount %u\n", task->name, task, task->refcount); if (task->refcount < 0) error("Negative refcount!\n"); task->refcount++; } void uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) { debug(DBG_UR, "task %s (%p), buf %p, refcount %u\n", task->name, task, tbuf, task->refcount); if (tbuf) { /* iov_len and msg_namelen are set at send/receive time */ tbuf->iov.iov_base = tbuf->buf; tbuf->msg.msg_name = &task->addr.storage; tbuf->msg.msg_iov = &tbuf->iov; tbuf->msg.msg_iovlen = 1; tbuf->msg.msg_control = NULL; tbuf->msg.msg_controllen = 0; tbuf->msg.msg_flags = 0; } task->tbuf = tbuf; } void uring_task_set_fd(struct uring_task *task, int fd) { debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, fd, task->refcount); task->fd = fd; } void uring_task_close_fd(struct cfg *cfg, struct uring_task *task) { debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); if (task->fd < 0) return; uring_close(cfg, task, task->fd); task->fd = -1; } struct uring_task * uring_parent(struct cfg *cfg) { if (!cfg) die("called with null cfg"); if (!cfg->uev) die("called with uninitialized uring"); return &cfg->uev->task; } void uring_task_init(struct uring_task *task, const char *name, struct uring_task *parent, void (*free)(struct uring_task *)) { static bool first = true; if (first) first = false; else if (!parent) die("called without a parent"); if (!free) die("called without destructor"); task->refcount = 1; task->fd = -1; task->parent = parent; task->free = free; task->dead = false; task->name = name; task->tbuf = NULL; if (task->parent) { debug(DBG_REF, "task %s (%p), refcount %u, " "getting parent %s (%p), refcount %u\n", task->name, task, task->refcount, task->parent->name, task->parent, task->parent->refcount); uring_task_get(NULL, task->parent); } } void uring_close(struct cfg *cfg, struct uring_task *task, int fd) { struct io_uring_sqe *sqe; debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); if (!task || fd < 0) { error("invalid parameters (task: %p (%s), fd: %i)\n", task, task->name, fd); return; } sqe = get_sqe(cfg, task); io_uring_prep_close(sqe, fd); io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE)); } static void uring_tbuf_write_cb(struct cfg *cfg, struct uring_task *task, int res) { int r; if (!task || !task->tbuf || !task->final_callback) die("missing parameters"); debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); if (res < 0) { r = res; goto finished; } /* We wrote some more data */ task->tbuf->done += res; if (task->tbuf->done >= task->tbuf->len || res == 0) { r = task->tbuf->len; goto finished; } uring_write(cfg, task, task->tbuf->buf + task->tbuf->done, task->tbuf->len - task->tbuf->done, uring_tbuf_write_cb); return; finished: task->final_callback(cfg, task, r); return; } void uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback) { if (!task || task->fd < 0 || !task->tbuf || task->tbuf->len < 0) { error("invalid parameters\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); task->tbuf->done = 0; task->final_callback = callback; uring_write(cfg, task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); } void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("no fd set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_write(sqe, task->fd, buf, len, 0); io_uring_sqe_set_data(sqe, task); } static void uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res) { int r; if (!task || !task->tbuf || !task->final_callback || !task->complete_callback) { error("invalid parameters\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); if (res < 0) { r = res; goto finished; } task->tbuf->len += res; r = task->complete_callback(cfg, task, res); if (r < 0) { r = res; goto finished; } else if (r > 0) { r = task->tbuf->len; goto finished; } /* Need to read more */ if (task->tbuf->len >= sizeof(task->tbuf->buf)) { r = E2BIG; goto finished; } uring_read_offset(cfg, task, task->tbuf->buf + task->tbuf->len, sizeof(task->tbuf->buf) - task->tbuf->len, task->tbuf->len, uring_tbuf_read_until_cb); return; finished: task->final_callback(cfg, task, r); return; } void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, rcallback_t complete, callback_t callback) { if (!task || task->fd < 0 || !task->tbuf || !complete) { error("%s: invalid parameters\n", __func__); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); task->tbuf->len = 0; task->complete_callback = complete; task->final_callback = callback; uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), uring_tbuf_read_until_cb); } static int uring_tbuf_eof(struct cfg *cfg, struct uring_task *task, int res) { if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) return -E2BIG; if (res > 0) return 0; task->tbuf->buf[task->tbuf->len] = '\0'; task->tbuf->len++; return 1; } void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback) { uring_tbuf_read_until(cfg, task, uring_tbuf_eof, callback); } static int uring_tbuf_have_data(struct cfg *cfg, struct uring_task *task, int res) { if (res < 0) return res; else return 1; } void uring_tbuf_read(struct cfg *cfg, struct uring_task *task, callback_t callback) { uring_tbuf_read_until(cfg, task, uring_tbuf_have_data, callback); } void uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_read called with no fd set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_read(sqe, task->fd, buf, len, offset); io_uring_sqe_set_data(sqe, task); } void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback) { struct io_uring_sqe *sqe; debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); io_uring_sqe_set_data(sqe, task); } void uring_tbuf_recvmsg(struct cfg *cfg, struct uring_task *task, callback_t callback) { struct io_uring_sqe *sqe; if (!task->tbuf) { error("called with no tbuf set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->tbuf->done = 0; task->tbuf->len = 0; task->tbuf->iov.iov_len = sizeof(task->tbuf->buf); task->tbuf->msg.msg_namelen = task->addr.addrlen; task->callback = callback; io_uring_prep_recvmsg(sqe, task->fd, &task->tbuf->msg, 0); io_uring_sqe_set_data(sqe, task); } void uring_tbuf_sendmsg(struct cfg *cfg, struct uring_task *task, callback_t callback) { struct io_uring_sqe *sqe; if (!task->tbuf) { error("%s: called with no tbuf set\n", __func__); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->tbuf->done = 0; task->tbuf->iov.iov_len = task->tbuf->len; task->tbuf->msg.msg_namelen = task->addr.addrlen; task->callback = callback; io_uring_prep_sendmsg(sqe, task->fd, &task->tbuf->msg, 0); io_uring_sqe_set_data(sqe, task); } void uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("fd set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen); io_uring_sqe_set_data(sqe, task); } void uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("no fd set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); addr->addrlen = sizeof(addr->storage); task->callback = callback; io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, SOCK_CLOEXEC); io_uring_sqe_set_data(sqe, task); } void uring_poll(struct cfg *cfg, struct uring_task *task, short poll_mask, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_poll called with no fd set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->callback = callback; io_uring_prep_poll_add(sqe, task->fd, poll_mask); io_uring_sqe_set_data(sqe, task); } void uring_poll_cancel(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("no fd set\n"); return; } debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); sqe = get_sqe(cfg, task); task->dead = true; io_uring_prep_poll_remove(sqe, task); io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL)); } static void uring_free(struct uring_task *task) { struct uring_ev *uev = container_of(task, struct uring_ev, task); debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); io_uring_queue_exit(&uev->uring); uev->cfg->uev = NULL; xfree(uev); } void uring_refdump(struct uring_ev *uev) { uring_task_refdump(&uev->task); } void uring_delete(struct cfg *cfg) { struct uring_task *task = &cfg->uev->task; debug(DBG_UR, "task %s (%p), fd %i, refcount %u\n", task->name, task, task->fd, task->refcount); uring_task_put(cfg, task); } void uring_init(struct cfg *cfg) { struct uring_ev *uev; uev = zmalloc(sizeof(*uev)); if (!uev) perrordie("malloc"); if (io_uring_queue_init_params(4096, &uev->uring, &uev->uring_params) < 0) perrordie("io_uring_queue_init_params"); verbose("uring initialized, features: 0x%08x\n", uev->uring_params.features); uring_task_init(&uev->task, "uev", &cfg->task, uring_free); cfg->uev = uev; uev->cfg = cfg; } static inline void uring_print_cqe(struct cfg *cfg, const char *type, struct uring_task *task, struct io_uring_cqe *cqe) { if (!debug_enabled(DBG_UR)) return; error("got CQE " "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n", type, cqe->res, cqe->res < 0 ? strerror(-cqe->res) : "ok", task->name ? task->name : "", task, task->fd, task->callback); } void uring_event_loop(struct cfg *cfg) { while (true) { struct io_uring_cqe *cqe; unsigned nr, head; int r; io_uring_submit(&cfg->uev->uring); r = io_uring_wait_cqe(&cfg->uev->uring, &cqe); if (r < 0) { if (errno == EINTR) continue; else perrordie("io_uring_wait_cqe"); } nr = 0; io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) { struct uring_task *task = io_uring_cqe_get_data(cqe); bool do_callback; enum cqe_type cqe_type; cqe_count++; cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK); task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK); if (!task) die("null task"); switch (cqe_type) { case CQE_TYPE_CANCEL: uring_print_cqe(cfg, "cancel", task, cqe); do_callback = false; break; case CQE_TYPE_CLOSE: uring_print_cqe(cfg, "close", task, cqe); do_callback = false; break; case CQE_TYPE_POLL_CANCEL: uring_print_cqe(cfg, "poll_cancel", task, cqe); do_callback = false; break; case CQE_TYPE_NORMAL: uring_print_cqe(cfg, "standard", task, cqe); do_callback = true; break; default: die("unknown CQE type"); } if (do_callback && task->callback) task->callback(cfg, task, cqe->res); uring_task_put(cfg, task); if (exiting) return; nr++; } io_uring_cq_advance(&cfg->uev->uring, nr); } }