#include #include #include #include #include #include #include "main.h" #include "uring.h" #include "config.h" struct uring_ev { struct io_uring uring; struct io_uring_params uring_params; struct cfg *cfg; struct uring_task task; }; static struct io_uring_sqe * get_sqe(struct cfg *cfg) { struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) { io_uring_submit(&cfg->uev->uring); sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("Failed to get an sqe!\n"); } return sqe; } void uring_task_refdump(struct uring_task *task) { char buf[4096]; struct uring_task *tmp; buf[0] = '\0'; for (tmp = task; tmp; tmp = tmp->parent) { size_t prefix; char *dst; if (tmp->parent) prefix = strlen("->") + strlen(tmp->name); else prefix = strlen(tmp->name); memmove(&buf[prefix], &buf[0], strlen(buf) + 1); dst = &buf[0]; if (tmp->parent) { *dst++ = '-'; *dst++ = '>'; } memcpy(dst, tmp->name, strlen(tmp->name)); } fprintf(stderr, "%s (0x%p parent 0x%p free 0x%p fd %i ref %u)\n", buf, task, task->parent, task->free, task->fd, task->refcount); } static void uring_cancel(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe = get_sqe(cfg); io_uring_prep_cancel(sqe, task, 0); io_uring_sqe_set_data(sqe, NULL); } /* * Similar to uring_task_put, but can be called from other tasks * while the task is active. */ void uring_task_destroy(struct cfg *cfg, struct uring_task *task) { fprintf(stderr, "%s: called with task %s (%p), fd %i and refcount %u\n", __func__, task->name, task, task->fd, task->refcount); if (task->fd >= 0) uring_cancel(cfg, task); task->dead = true; uring_task_put(cfg, task); } void uring_task_put(struct cfg *cfg, struct uring_task *task) { struct uring_task *parent = task->parent; fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n", __func__, task->name, task, task->refcount); task->refcount--; if (task->refcount > 0) return; if (task->refcount < 0) error("Negative refcount!\n"); if (task->fd >= 0) { uring_task_close_fd(cfg, task); /* We'll be called again once the fd is closed */ return; } if (parent) fprintf(stderr, "%s: task %s (%p) putting parent %s (%p)\n", __func__, task->name, task, task->parent->name, task->parent); if (task->free) task->free(task); if (parent) uring_task_put(NULL, parent); } void uring_task_get(struct cfg *cfg, struct uring_task *task) { fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n", __func__, task->name, task, task->refcount); if (task->refcount < 0) error("Negative refcount!\n"); task->refcount++; } void uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) { task->tbuf = tbuf; } void uring_task_set_fd(struct uring_task *task, int fd) { task->fd = fd; } void uring_task_close_fd(struct cfg *cfg, struct uring_task *task) { fprintf(stderr, "%s: called with task %s (0x%p)\n", __func__, task->name, task); if (task->fd < 0) return; uring_close(cfg, task, task->fd, NULL); task->fd = -1; } struct uring_task * uring_parent(struct cfg *cfg) { if (!cfg) die("%s: called with null cfg\n"); if (!cfg->uev) die("%s: called with uninitialized uring\n"); return &cfg->uev->task; } void uring_task_init(struct uring_task *task, const char *name, struct uring_task *parent, void (*free)(struct uring_task *)) { static bool first = true; if (first) first = false; else if (!parent) die("uring_task_init called without a parent\n"); if (!free) die("uring_task_init called without destructor\n"); task->refcount = 1; task->fd = -1; task->parent = parent; task->free = free; task->dead = false; task->name = name; task->tbuf = NULL; if (task->parent) { fprintf(stderr, "%s: task %s (%p) getting parent %s (%p)\n", __func__, task->name, task, task->parent->name, task->parent); uring_task_get(NULL, task->parent); } } void uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback) { struct io_uring_sqe *sqe = get_sqe(cfg); fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback); if (task) { uring_task_get(cfg, task); task->callback = callback; } io_uring_prep_close(sqe, fd); io_uring_sqe_set_data(sqe, task); fprintf(stderr, "%s: done\n", __func__); } static void uring_tbuf_write_cb(struct cfg *cfg, struct uring_task *task, int res) { int r; if (!task || !task->tbuf || !task->final_callback) perrordie("%s: missing parameters\n"); if (res < 0) { r = res; goto finished; } /* We wrote some more data */ task->tbuf->done += res; if (task->tbuf->done >= task->tbuf->len || res == 0) { r = task->tbuf->len; goto finished; } uring_write(cfg, task, task->tbuf->buf + task->tbuf->done, task->tbuf->len - task->tbuf->done, uring_tbuf_write_cb); return; finished: task->final_callback(cfg, task, r); return; } void uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback) { if (!task || task->fd < 0 || !task->tbuf || task->tbuf->len < 0) { error("%s invalid parameters\n", __func__); return; } task->tbuf->done = 0; task->final_callback = callback; uring_write(cfg, task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); } void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_write called with no fd set\n"); return; } sqe = get_sqe(cfg); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_write(sqe, task->fd, buf, len, 0); io_uring_sqe_set_data(sqe, task); } static void uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res) { int r; if (!task || !task->tbuf || !task->final_callback || !task->complete_callback) { error("%s: invalid parameters\n", __func__); return; } if (res < 0) { r = res; goto finished; } task->tbuf->len += res; r = task->complete_callback(cfg, task, res); if (r < 0) { r = res; goto finished; } else if (r > 0) { r = task->tbuf->len; goto finished; } /* Need to read more */ if (task->tbuf->len >= sizeof(task->tbuf->buf)) { r = E2BIG; goto finished; } uring_read_offset(cfg, task, task->tbuf->buf + task->tbuf->len, sizeof(task->tbuf->buf) - task->tbuf->len, task->tbuf->len, uring_tbuf_read_until_cb); return; finished: task->final_callback(cfg, task, r); return; } void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, rcallback_t complete, callback_t callback) { if (!task || task->fd < 0 || !task->tbuf || !complete) { error("%s: invalid parameters\n", __func__); return; } task->tbuf->len = 0; task->complete_callback = complete; task->final_callback = callback; uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), uring_tbuf_read_until_cb); } static int uring_tbuf_eof(struct cfg *cfg, struct uring_task *task, int res) { if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) return -E2BIG; if (res > 0) return 0; task->tbuf->buf[task->tbuf->len] = '\0'; task->tbuf->len++; return 1; } void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback) { uring_tbuf_read_until(cfg, task, uring_tbuf_eof, callback); } static int uring_tbuf_have_data(struct cfg *cfg, struct uring_task *task, int res) { if (res < 0) return res; else return 1; } void uring_tbuf_read(struct cfg *cfg, struct uring_task *task, callback_t callback) { uring_tbuf_read_until(cfg, task, uring_tbuf_have_data, callback); } void uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_read called with no fd set\n"); return; } sqe = get_sqe(cfg); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_read(sqe, task->fd, buf, len, offset); io_uring_sqe_set_data(sqe, task); } void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback) { struct io_uring_sqe *sqe = get_sqe(cfg); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); io_uring_sqe_set_data(sqe, task); } void uring_sendmsg(struct cfg *cfg, struct uring_task *task, struct msghdr *msg, callback_t callback) { struct io_uring_sqe *sqe = get_sqe(cfg); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_sendmsg(sqe, task->fd, msg, 0); io_uring_sqe_set_data(sqe, task); } void uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_connect called with no fd set\n"); return; } sqe = get_sqe(cfg); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen); io_uring_sqe_set_data(sqe, task); } void uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_accept called with no fd set\n"); return; } sqe = get_sqe(cfg); addr->addrlen = sizeof(addr->storage); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, 0); io_uring_sqe_set_data(sqe, task); } void uring_poll(struct cfg *cfg, struct uring_task *task, short poll_mask, callback_t callback) { struct io_uring_sqe *sqe; if (task->fd < 0) { error("uring_poll called with no fd set\n"); return; } sqe = get_sqe(cfg); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_poll_add(sqe, task->fd, poll_mask); io_uring_sqe_set_data(sqe, task); } void uring_poll_cancel(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe; if (task->fd < 0) { fprintf(stderr, "uring_poll_cancel called with no fd set\n"); return; } sqe = get_sqe(cfg); task->dead = true; io_uring_prep_poll_remove(sqe, task); io_uring_sqe_set_data(sqe, NULL); } static void uring_free(struct uring_task *task) { struct uring_ev *uev = container_of(task, struct uring_ev, task); fprintf(stderr, "%s: called\n", __func__); io_uring_queue_exit(&uev->uring); uev->cfg->uev = NULL; xfree(uev); } void uring_refdump(struct uring_ev *uev) { uring_task_refdump(&uev->task); } void uring_delete(struct cfg *cfg) { uring_task_put(cfg, &cfg->uev->task); } void uring_init(struct cfg *cfg) { struct uring_ev *uev; uev = zmalloc(sizeof(*uev)); if (!uev) perrordie("malloc"); if (io_uring_queue_init_params(4096, &uev->uring, &uev->uring_params) < 0) perrordie("io_uring_queue_init_params"); fprintf(stderr, "uring initialized, features: 0x%08x\n", uev->uring_params.features); uring_task_init(&uev->task, "uev", &cfg->task, uring_free); cfg->uev = uev; uev->cfg = cfg; } void uring_event_loop(struct cfg *cfg) { while (true) { struct io_uring_cqe *cqe; unsigned nr, head; int r; io_uring_submit(&cfg->uev->uring); r = io_uring_wait_cqe(&cfg->uev->uring, &cqe); if (r < 0) { if (errno == EINTR) continue; else perrordie("io_uring_wait_cqe"); } nr = 0; io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) { struct uring_task *task = io_uring_cqe_get_data(cqe); fprintf(stderr, "%s: got CEQ (res: %i, task: 0x%p, cb: 0x%p)\n", __func__, cqe->res, task, task ? task->callback : NULL); if (task && task->callback) task->callback(cfg, task, cqe->res); if (task) uring_task_put(cfg, task); if (exiting) return; nr++; } printf("%s: %u CQEs treated\n", __func__, nr); io_uring_cq_advance(&cfg->uev->uring, nr); } }