From 8c27290245b7bcc7cd2f72f3b4a7562294b43bbe Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Tue, 23 Jun 2020 16:25:36 +0200 Subject: Split directories better --- uring.c | 759 ---------------------------------------------------------------- 1 file changed, 759 deletions(-) delete mode 100644 uring.c (limited to 'uring.c') diff --git a/uring.c b/uring.c deleted file mode 100644 index e979471..0000000 --- a/uring.c +++ /dev/null @@ -1,759 +0,0 @@ -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include - -#include "main.h" -#include "uring.h" - -struct uring_ev { - struct io_uring uring; - struct io_uring_params uring_params; - struct uring_task task; - - /* for testing if the kernel supports splice */ - int pipe[2]; - int tfd; -}; - -enum cqe_type { - CQE_TYPE_NORMAL = 0x0, - CQE_TYPE_CANCEL = 0x1, - CQE_TYPE_CLOSE = 0x2, - CQE_TYPE_POLL_CANCEL = 0x3 -}; - -#define CQE_TYPE_PTR_MASK 0x3 - -uint64_t sqe_count = 0; -uint64_t cqe_count = 0; - -static struct io_uring_sqe * -get_sqe(struct uring_task *task) -{ - struct io_uring_sqe *sqe; - - assert_die(task, "invalid arguments"); - - sqe = io_uring_get_sqe(&cfg->uring->uring); - if (!sqe) { - io_uring_submit(&cfg->uring->uring); - sqe = io_uring_get_sqe(&cfg->uring->uring); - if (!sqe) - die("failed to get an sqe!"); - } - - sqe_count++; - uring_task_get(task); - return sqe; -} - -void -uring_task_refdump(struct uring_task *task) -{ - char buf[4096]; - struct uring_task *tmp; - - assert_return(task); - - if (!debug_enabled(DBG_REF)) - return; - - buf[0] = '\0'; - for (tmp = task; tmp; tmp = tmp->parent) { - size_t prefix; - char *dst; - - if (tmp->parent) - prefix = strlen("->") + strlen(tmp->name); - else - prefix = strlen(tmp->name); - - memmove(&buf[prefix], &buf[0], strlen(buf) + 1); - - dst = &buf[0]; - if (tmp->parent) { - *dst++ = '-'; - *dst++ = '>'; - } - - memcpy(dst, tmp->name, strlen(tmp->name)); - } - - info("%s (0x%p parent 0x%p free 0x%p fd %i ref %u)", - buf, task, task->parent, task->free, task->fd, - task->refcount); -} - -/* - * Similar to uring_task_put, but can be called from other tasks - * while the task is active. - */ -void -uring_task_destroy(struct uring_task *task) -{ - assert_return(task); - assert_return_silent(!task->dead); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - if (task->fd >= 0) { - struct io_uring_sqe *sqe; - - sqe = get_sqe(task); - io_uring_prep_cancel(sqe, task, 0); - io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CANCEL)); - } - - task->dead = true; - uring_task_put(task); -} - -void -uring_task_put(struct uring_task *task) -{ - struct uring_task *parent; - - assert_return(task); - - debug(DBG_REF, "task %s (%p), refcount %u -> %u", - task->name, task, task->refcount, task->refcount - 1); - - task->refcount--; - - if (task->refcount > 0) - return; - - if (task->refcount < 0) - error("Negative refcount!"); - - if (task->fd >= 0) { - uring_task_close_fd(task); - /* We'll be called again once the fd is closed */ - return; - } - - parent = task->parent; - if (task->free) - task->free(task); - - if (parent) { - debug(DBG_REF, "putting parent %s (%p)", parent->name, parent); - uring_task_put(parent); - } -} - -void -uring_task_get(struct uring_task *task) -{ - assert_return(task); - - debug(DBG_REF, "task %s (%p), refcount %u -> %u", - task->name, task, task->refcount, task->refcount + 1); - - if (task->refcount < 0) - error("Negative refcount!"); - - task->refcount++; -} - -void -uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) -{ - assert_return(task && tbuf); - - debug(DBG_UR, "task %s (%p), buf %p, refcount %u", - task->name, task, tbuf, task->refcount); - - /* iov_len and msg_namelen are set at send/receive time */ - tbuf->iov.iov_base = tbuf->buf; - tbuf->msg.msg_name = &task->saddr.storage; - tbuf->msg.msg_iov = &tbuf->iov; - tbuf->msg.msg_iovlen = 1; - tbuf->msg.msg_control = NULL; - tbuf->msg.msg_controllen = 0; - tbuf->msg.msg_flags = 0; - task->tbuf = tbuf; -} - -void -uring_task_set_fd(struct uring_task *task, int fd) -{ - assert_return(task); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, fd, task->refcount); - - task->fd = fd; -} - -void -uring_task_close_fd(struct uring_task *task) -{ - assert_return(task); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - if (task->fd < 0) - return; - - uring_close(task, task->fd); - task->fd = -1; -} - -struct uring_task * -uring_parent() -{ - assert_die(cfg->uring, "invalid arguments"); - - return &cfg->uring->task; -} - -void -uring_task_init(struct uring_task *task, const char *name, - struct uring_task *parent, void (*free)(struct uring_task *)) -{ - static bool first = true; - - assert_die(task && !empty_str(name) && free, "invalid arguments"); - - if (first) - first = false; - else - assert_die(parent, "called without a parent task"); - - task->refcount = 1; - task->fd = -1; - task->parent = parent; - task->free = free; - task->dead = false; - task->name = name; - task->tbuf = NULL; - - if (task->parent) { - debug(DBG_REF, "task %s (%p), refcount %u, " - "getting parent %s (%p), refcount %u", - task->name, task, task->refcount, - task->parent->name, task->parent, task->parent->refcount); - uring_task_get(task->parent); - } -} - -void -uring_close(struct uring_task *task, int fd) -{ - struct io_uring_sqe *sqe; - - assert_return(task && fd >= 0); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - io_uring_prep_close(sqe, fd); - io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE)); -} - -static void -uring_tbuf_write_cb(struct uring_task *task, int res) -{ - int r; - - assert_return(task && task->tbuf && task->final_cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - if (res < 0) { - r = res; - goto finished; - } - - /* We wrote some more data */ - task->tbuf->done += res; - if (task->tbuf->done >= task->tbuf->len || res == 0) { - r = task->tbuf->len; - goto finished; - } - - uring_write(task, task->tbuf->buf + task->tbuf->done, - task->tbuf->len - task->tbuf->done, - uring_tbuf_write_cb); - return; - -finished: - task->final_cb(task, r); -} - -void -uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb) -{ - assert_return(task && task->fd >= 0 && task->tbuf && task->tbuf->len > 0); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - task->tbuf->done = 0; - task->final_cb = final_cb; - uring_write(task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); -} - -void -uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && buf && len > 0 && cb && task->fd >= 0); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->cb = cb; - io_uring_prep_write(sqe, task->fd, buf, len, 0); - io_uring_sqe_set_data(sqe, task); -} - -static void -uring_tbuf_read_until_cb(struct uring_task *task, int res) -{ - int r; - - assert_return(task && task->tbuf && task->final_cb && task->is_complete_cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - if (res < 0) { - r = res; - goto finished; - } - - task->tbuf->len += res; - r = task->is_complete_cb(task, res); - if (r < 0) { - r = res; - goto finished; - } else if (r > 0) { - r = task->tbuf->len; - goto finished; - } - - /* Need to read more */ - if (task->tbuf->len >= sizeof(task->tbuf->buf)) { - r = E2BIG; - goto finished; - } - - uring_read_offset(task, task->tbuf->buf + task->tbuf->len, - sizeof(task->tbuf->buf) - task->tbuf->len, - task->tbuf->len, uring_tbuf_read_until_cb); - return; - -finished: - task->final_cb(task, r); -} - -void -uring_tbuf_read_until(struct uring_task *task, - rutask_cb_t is_complete_cb, utask_cb_t final_cb) -{ - assert_return(task && task->fd >= 0 && task->tbuf && is_complete_cb && final_cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - task->tbuf->len = 0; - task->is_complete_cb = is_complete_cb; - task->final_cb = final_cb; - uring_read(task, &task->tbuf->buf, sizeof(task->tbuf->buf), - uring_tbuf_read_until_cb); -} - -static int -uring_tbuf_eof(struct uring_task *task, int res) -{ - assert_return(task && task->tbuf, -EINVAL); - assert_task_alive_or(DBG_UR, task, return -EINTR); - - if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) - return -E2BIG; - - if (res > 0) - return 0; - - task->tbuf->buf[task->tbuf->len] = '\0'; - task->tbuf->len++; - return 1; -} - -void -uring_tbuf_read_until_eof(struct uring_task *task, - utask_cb_t final_cb) -{ - assert_return(task && task->tbuf && final_cb); - - uring_tbuf_read_until(task, uring_tbuf_eof, final_cb); -} - -static int -uring_tbuf_have_data(struct uring_task *task, int res) -{ - assert_return(task, -EINVAL); - - if (res < 0) - return res; - else - return 1; -} - -void -uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb) -{ - assert_return(task && final_cb); - - uring_tbuf_read_until(task, uring_tbuf_have_data, final_cb); -} - -void -uring_read_offset(struct uring_task *task, void *buf, size_t len, off_t offset, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && buf && len > 0 && task->fd >= 0); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->cb = cb; - io_uring_prep_read(sqe, task->fd, buf, len, offset); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_openat(struct uring_task *task, const char *path, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && !empty_str(path) && cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->cb = cb; - io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && task->fd >= 0 && task->tbuf && cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->tbuf->done = 0; - task->tbuf->len = 0; - task->tbuf->iov.iov_len = sizeof(task->tbuf->buf); - task->tbuf->msg.msg_namelen = task->saddr.addrlen; - task->cb = cb; - io_uring_prep_recvmsg(sqe, task->fd, &task->tbuf->msg, 0); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && task->fd >= 0 && task->tbuf && cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->tbuf->done = 0; - task->tbuf->iov.iov_len = task->tbuf->len; - task->tbuf->msg.msg_namelen = task->saddr.addrlen; - task->cb = cb; - io_uring_prep_sendmsg(sqe, task->fd, &task->tbuf->msg, 0); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && task->fd >= 0 && saddr && cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->cb = cb; - io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&saddr->storage, saddr->addrlen); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && task->fd >= 0 && saddr && cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - saddr->addrlen = sizeof(saddr->storage); - task->cb = cb; - io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&saddr->storage, &saddr->addrlen, SOCK_CLOEXEC); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && fd_in >= 0 && fd_out >= 0 && cb); - - debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u", - task->name, task, fd_in, fd_out, task->refcount); - - sqe = get_sqe(task); - task->cb = cb; - io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb) -{ - struct io_uring_sqe *sqe; - - assert_return(task && task->fd >= 0 && poll_mask && cb); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->cb = cb; - io_uring_prep_poll_add(sqe, task->fd, poll_mask); - io_uring_sqe_set_data(sqe, task); -} - -void -uring_poll_cancel(struct uring_task *task) -{ - struct io_uring_sqe *sqe; - - assert_return(task); - - if (task->fd < 0) { - /* not an error, no need to print error msg */ - return; - } - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - sqe = get_sqe(task); - task->dead = true; - io_uring_prep_poll_remove(sqe, task); - io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL)); -} - -static void -uring_free(struct uring_task *task) -{ - struct uring_ev *uring = container_of(task, struct uring_ev, task); - - assert_return(task); - - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - io_uring_queue_exit(&uring->uring); - cfg->uring = NULL; - xfree(uring); -} - -void -uring_refdump() -{ - assert_return(cfg->uring); - - uring_task_refdump(&cfg->uring->task); -} - -void -uring_delete() -{ - struct uring_task *task; - - assert_return(cfg->uring); - - task = &cfg->uring->task; - debug(DBG_UR, "task %s (%p), fd %i, refcount %u", - task->name, task, task->fd, task->refcount); - - uring_task_put(task); -} - -static void -uring_splice_test_cb(struct uring_task *task, int res) -{ - struct uring_ev *uring = container_of(task, struct uring_ev, task); - - assert_die(task && uring == cfg->uring, "splice test failed"); - - uring_close(task, uring->tfd); - uring_close(task, uring->pipe[PIPE_RD]); - uring_close(task, uring->pipe[PIPE_WR]); - - uring->tfd = -1; - uring->pipe[PIPE_RD] = -1; - uring->pipe[PIPE_WR] = -1; - - if (res >= 0) { - cfg->splice_supported = true; - debug(DBG_UR, "splice supported"); - } else if (res == -EINVAL) - debug(DBG_UR, "splice not supported"); - else - error("splice check failed: %i\n", res); -} - -void -uring_init() -{ - struct uring_ev *uring; - - assert_return(!cfg->uring); - - uring = zmalloc(sizeof(*uring)); - if (!uring) - die("malloc: %m"); - - if (io_uring_queue_init_params(4096, &uring->uring, &uring->uring_params) < 0) - die("io_uring_queue_init_params"); - - debug(DBG_UR, "uring initialized, features: 0x%08x", - uring->uring_params.features); - - uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free); - cfg->uring = uring; - - /* splice check, a bit convoluted, but seems to be no simpler way */ - cfg->splice_supported = false; - if (pipe2(uring->pipe, O_CLOEXEC) < 0) - die("pipe2: %m"); - uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY); - if (uring->tfd < 0) - die("open(\"/dev/null\"): %m"); - uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb); -} - -static inline void -uring_print_cqe(const char *type, struct uring_task *task, - struct io_uring_cqe *cqe) -{ - assert_return(!empty_str(type) && task && cqe); - - debug(DBG_UR, "got CQE " - "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)", - type, - cqe->res, - cqe->res < 0 ? strerror(-cqe->res) : "ok", - task->name ? task->name : "", - task, - task->fd, - task->cb); -} - -void -uring_event_loop() -{ - while (true) { - struct io_uring_cqe *cqe; - unsigned nr, head; - int r; - - io_uring_submit(&cfg->uring->uring); - r = io_uring_wait_cqe(&cfg->uring->uring, &cqe); - if (r < 0) { - if (errno == EINTR) - continue; - else - die("io_uring_wait_cqe: %i", r); - } - - nr = 0; - io_uring_for_each_cqe(&cfg->uring->uring, head, cqe) { - struct uring_task *task = io_uring_cqe_get_data(cqe); - bool do_cb; - enum cqe_type cqe_type; - - cqe_count++; - - cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK); - task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK); - - if (!task) - die("null task"); - - switch (cqe_type) { - case CQE_TYPE_CANCEL: - uring_print_cqe("cancel", task, cqe); - do_cb = false; - break; - - case CQE_TYPE_CLOSE: - uring_print_cqe("close", task, cqe); - do_cb = false; - break; - - case CQE_TYPE_POLL_CANCEL: - uring_print_cqe("poll_cancel", task, cqe); - do_cb = false; - break; - - case CQE_TYPE_NORMAL: - uring_print_cqe("standard", task, cqe); - do_cb = true; - break; - - default: - die("unknown CQE type"); - } - - if (do_cb && task->cb) - task->cb(task, cqe->res); - - uring_task_put(task); - - if (exiting) - return; - - nr++; - } - - io_uring_cq_advance(&cfg->uring->uring, nr); - } -} - -- cgit v1.2.3