summary | refs | log | tree | commit | diff
path: root/uring.c
diff options
context:
space:
mode:
author David Härdeman <david@hardeman.nu> 2020-06-23 16:25:36 +0200
committer David Härdeman <david@hardeman.nu> 2020-06-23 16:25:36 +0200
commit 8c27290245b7bcc7cd2f72f3b4a7562294b43bbe (patch)
tree 54bae7909a94bfc598df7b88d9794742daf0bb31 /uring.c
parent 973ae757342b91e3e6aafd07e0c0a24af84aad98 (diff)
Split directories better
Diffstat (limited to 'uring.c')
-rw-r--r-- uring.c 759
1 files changed, 0 insertions, 759 deletions
diff --git a/uring.c b/uring.c
deleted file mode 100644
index e979471..0000000
--- a/uring.c
+++ /dev/null
@@ -1,759 +0,0 @@
-#define _GNU_SOURCE
-#include <liburing.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "main.h"
-#include "uring.h"
-
-/*
- * Event-loop state stored in cfg->uring: the io_uring instance, the
- * parameters it was created with, and the task returned by uring_parent().
- */
-struct uring_ev {
- struct io_uring uring;
- struct io_uring_params uring_params;
- struct uring_task task;
-
- /* for testing if the kernel supports splice */
- int pipe[2];
- int tfd;
-};
-
-/*
- * Kind of completion. The value is OR'ed into the low bits of the task
- * pointer stored as SQE user data and decoded again in uring_event_loop().
- * Only CQE_TYPE_NORMAL completions invoke the task callback.
- */
-enum cqe_type {
- CQE_TYPE_NORMAL = 0x0,
- CQE_TYPE_CANCEL = 0x1,
- CQE_TYPE_CLOSE = 0x2,
- CQE_TYPE_POLL_CANCEL = 0x3
-};
-
-/* Bits of the user-data pointer that carry the cqe_type tag */
-#define CQE_TYPE_PTR_MASK 0x3
-
-/* Running totals: SQEs handed out by get_sqe(), CQEs seen by uring_event_loop() */
-uint64_t sqe_count = 0;
-uint64_t cqe_count = 0;
-
-/*
- * Fetch a free submission queue entry on behalf of @task.
- *
- * If the queue is full, the pending entries are submitted once and the
- * request is retried; a second failure is fatal. Every entry handed out
- * bumps sqe_count and takes a reference on @task.
- */
-static struct io_uring_sqe *
-get_sqe(struct uring_task *task)
-{
-	struct io_uring *ring;
-	struct io_uring_sqe *entry;
-
-	assert_die(task, "invalid arguments");
-
-	ring = &cfg->uring->uring;
-	entry = io_uring_get_sqe(ring);
-	if (entry == NULL) {
-		/* Queue full: flush what is pending and try once more */
-		io_uring_submit(ring);
-		entry = io_uring_get_sqe(ring);
-		if (entry == NULL)
-			die("failed to get an sqe!");
-	}
-
-	sqe_count++;
-	uring_task_get(task);
-	return entry;
-}
-
-/*
- * Log the ancestry and reference state of @task when DBG_REF debugging is on.
- *
- * The chain is rendered root first ("root->...->parent->task") by walking the
- * parent pointers and prepending each name to a fixed-size buffer. Ancestors
- * that would no longer fit are left out instead of overrunning the buffer.
- */
-void
-uring_task_refdump(struct uring_task *task)
-{
-	char buf[4096];
-	size_t used = 0;
-	struct uring_task *tmp;
-
-	assert_return(task);
-
-	if (!debug_enabled(DBG_REF))
-		return;
-
-	buf[0] = '\0';
-	for (tmp = task; tmp; tmp = tmp->parent) {
-		size_t namelen = strlen(tmp->name);
-		size_t prefix = namelen;
-		char *dst;
-
-		if (tmp->parent)
-			prefix += strlen("->");
-
-		/* used < sizeof(buf) always holds, so the subtraction cannot wrap */
-		if (prefix >= sizeof(buf) - used)
-			break;
-
-		/* Shift the existing text (and its NUL) to make room in front */
-		memmove(&buf[prefix], &buf[0], used + 1);
-
-		dst = &buf[0];
-		if (tmp->parent) {
-			*dst++ = '-';
-			*dst++ = '>';
-		}
-
-		memcpy(dst, tmp->name, namelen);
-		used += prefix;
-	}
-
-	info("%s (0x%p parent 0x%p free 0x%p fd %i ref %u)",
-	     buf, task, task->parent, task->free, task->fd,
-	     task->refcount);
-}
-
-/*
- * Tear down @task from the outside, e.g. from another task, while it may
- * still be active (compare uring_task_put).
- *
- * Nothing happens if the task is already marked dead. If the task has an fd,
- * a cancel request tagged CQE_TYPE_CANCEL is queued for it. The task is then
- * marked dead and one reference is dropped.
- */
-void
-uring_task_destroy(struct uring_task *task)
-{
-	assert_return(task);
-	assert_return_silent(!task->dead);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	if (task->fd >= 0) {
-		struct io_uring_sqe *cancel_sqe = get_sqe(task);
-		uintptr_t tagged = (uintptr_t)task | CQE_TYPE_CANCEL;
-
-		io_uring_prep_cancel(cancel_sqe, task, 0);
-		io_uring_sqe_set_data(cancel_sqe, (void *)tagged);
-	}
-
-	task->dead = true;
-	uring_task_put(task);
-}
-
-/*
- * Drop one reference on @task.
- *
- * When the count reaches zero and the task still has an fd, an asynchronous
- * close is queued and the function returns. Otherwise the task's free
- * callback (if any) runs and one reference is dropped on the parent.
- */
-void
-uring_task_put(struct uring_task *task)
-{
- struct uring_task *parent;
-
- assert_return(task);
-
- debug(DBG_REF, "task %s (%p), refcount %u -> %u",
- task->name, task, task->refcount, task->refcount - 1);
-
- task->refcount--;
-
- if (task->refcount > 0)
- return;
-
- /*
-  * NOTE(review): refcount is printed with %u; if the field is unsigned
-  * this check can never fire - confirm the field type.
-  */
- if (task->refcount < 0)
- error("Negative refcount!");
-
- if (task->fd >= 0) {
- uring_task_close_fd(task);
- /* We'll be called again once the fd is closed */
- return;
- }
-
- /* Read the parent first: task->free() may release the task itself */
- parent = task->parent;
- if (task->free)
- task->free(task);
-
- if (parent) {
- debug(DBG_REF, "putting parent %s (%p)", parent->name, parent);
- uring_task_put(parent);
- }
-}
-
-/* Take one additional reference on @task, logging the transition. */
-void
-uring_task_get(struct uring_task *task)
-{
-	assert_return(task);
-
-	debug(DBG_REF, "task %s (%p), refcount %u -> %u",
-	      task->name, task, task->refcount, task->refcount + 1);
-
-	if (task->refcount < 0)
-		error("Negative refcount!");
-
-	task->refcount += 1;
-}
-
-/*
- * Attach @tbuf to @task and set up its msghdr for single-buffer I/O.
- *
- * The lengths that vary per operation (iov_len, msg_namelen) are filled in
- * by the send/receive helpers, not here.
- */
-void
-uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf)
-{
-	assert_return(task && tbuf);
-
-	debug(DBG_UR, "task %s (%p), buf %p, refcount %u",
-	      task->name, task, tbuf, task->refcount);
-
-	/* One iovec pointing at the task buffer */
-	tbuf->iov.iov_base = tbuf->buf;
-	tbuf->msg.msg_iov = &tbuf->iov;
-	tbuf->msg.msg_iovlen = 1;
-
-	/* The address storage lives in the task */
-	tbuf->msg.msg_name = &task->saddr.storage;
-
-	/* No ancillary data, no flags */
-	tbuf->msg.msg_control = NULL;
-	tbuf->msg.msg_controllen = 0;
-	tbuf->msg.msg_flags = 0;
-
-	task->tbuf = tbuf;
-}
-
-/* Record @fd as the file descriptor of @task. */
-void
-uring_task_set_fd(struct uring_task *task, int fd)
-{
-	assert_return(task);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, fd, task->refcount);
-
-	task->fd = fd;
-}
-
-/*
- * Queue an asynchronous close of the task's fd, if it has one, and mark the
- * task as no longer having an fd.
- */
-void
-uring_task_close_fd(struct uring_task *task)
-{
-	assert_return(task);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	if (task->fd >= 0) {
-		uring_close(task, task->fd);
-		task->fd = -1;
-	}
-}
-
-/* Return the task embedded in the global uring state; dies if it is not set up. */
-struct uring_task *
-uring_parent()
-{
-	struct uring_ev *ev;
-
-	assert_die(cfg->uring, "invalid arguments");
-
-	ev = cfg->uring;
-	return &ev->task;
-}
-
-/*
- * Initialise @task with one reference, no fd and no buffer.
- *
- * Only the very first task ever initialised may omit @parent; every later
- * call without one is fatal. A reference is taken on @parent when given.
- * @name is stored as a pointer, not copied.
- */
-void
-uring_task_init(struct uring_task *task, const char *name,
-		struct uring_task *parent, void (*free)(struct uring_task *))
-{
-	static bool root_pending = true;
-
-	assert_die(task && !empty_str(name) && free, "invalid arguments");
-
-	if (!root_pending)
-		assert_die(parent, "called without a parent task");
-	root_pending = false;
-
-	task->name = name;
-	task->parent = parent;
-	task->free = free;
-	task->refcount = 1;
-	task->fd = -1;
-	task->dead = false;
-	task->tbuf = NULL;
-
-	if (!parent)
-		return;
-
-	debug(DBG_REF, "task %s (%p), refcount %u, "
-	      "getting parent %s (%p), refcount %u",
-	      task->name, task, task->refcount,
-	      parent->name, parent, parent->refcount);
-	uring_task_get(parent);
-}
-
-/*
- * Queue an asynchronous close of @fd on behalf of @task.
- *
- * The request is tagged CQE_TYPE_CLOSE, so its completion does not invoke the
- * task callback. @fd need not be task->fd, so the debug line reports the
- * descriptor that is actually being closed.
- */
-void
-uring_close(struct uring_task *task, int fd)
-{
-	struct io_uring_sqe *sqe;
-
-	assert_return(task && fd >= 0);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, fd, task->refcount);
-
-	sqe = get_sqe(task);
-	io_uring_prep_close(sqe, fd);
-	io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE));
-}
-
-/*
- * Completion handler for the writes issued by uring_tbuf_write().
- *
- * Accumulates the bytes written in tbuf->done and queues the remainder after
- * a partial write. final_cb receives the negative error on failure, or
- * tbuf->len once everything is written or a write returns zero bytes.
- */
-static void
-uring_tbuf_write_cb(struct uring_task *task, int res)
-{
-	struct uring_task_buf *tbuf;
-	int result;
-
-	assert_return(task && task->tbuf && task->final_cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	tbuf = task->tbuf;
-	if (res < 0) {
-		result = res;
-	} else {
-		/* We wrote some more data */
-		tbuf->done += res;
-		if (res != 0 && tbuf->done < tbuf->len) {
-			/* Partial write: queue the rest and wait for it */
-			uring_write(task, tbuf->buf + tbuf->done,
-				    tbuf->len - tbuf->done,
-				    uring_tbuf_write_cb);
-			return;
-		}
-		result = tbuf->len;
-	}
-
-	task->final_cb(task, result);
-}
-
-/*
- * Write the first tbuf->len bytes of the task buffer to the task's fd,
- * continuing after partial writes. @final_cb is called with the outcome.
- */
-void
-uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb)
-{
-	assert_return(task && task->fd >= 0 && task->tbuf && task->tbuf->len > 0);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	/* Start from scratch; uring_tbuf_write_cb tracks the progress */
-	task->final_cb = final_cb;
-	task->tbuf->done = 0;
-	uring_write(task, &task->tbuf->buf, task->tbuf->len,
-		    uring_tbuf_write_cb);
-}
-
-/*
- * Queue a write of @len bytes from @buf to the task's fd (offset 0).
- * @cb is stored in task->cb, which the event loop calls for the completion.
- */
-void
-uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && buf && len > 0 && cb && task->fd >= 0);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	task->cb = cb;
-	io_uring_prep_write(entry, task->fd, buf, len, 0);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Completion handler for each read issued by uring_tbuf_read_until().
- *
- * Adds the bytes read to tbuf->len and asks is_complete_cb whether the data
- * is complete. Further reads are appended to the buffered data until the
- * callback reports completion, an error occurs or the buffer is full.
- *
- * final_cb receives: the negative error of a failed read, tbuf->len when the
- * callback reports completion, or -E2BIG when the buffer is full before the
- * data is complete.
- */
-static void
-uring_tbuf_read_until_cb(struct uring_task *task, int res)
-{
-	int r;
-
-	assert_return(task && task->tbuf && task->final_cb && task->is_complete_cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	if (res < 0) {
-		r = res;
-		goto finished;
-	}
-
-	task->tbuf->len += res;
-	r = task->is_complete_cb(task, res);
-	if (r < 0) {
-		/*
-		 * NOTE(review): the callback's error code is replaced by the
-		 * byte count of the last read here - confirm that callers
-		 * rely on this before changing it.
-		 */
-		r = res;
-		goto finished;
-	} else if (r > 0) {
-		r = task->tbuf->len;
-		goto finished;
-	}
-
-	/* Need to read more */
-	if (task->tbuf->len >= sizeof(task->tbuf->buf)) {
-		/* Errors are negative; a positive value would look like a length */
-		r = -E2BIG;
-		goto finished;
-	}
-
-	uring_read_offset(task, task->tbuf->buf + task->tbuf->len,
-			  sizeof(task->tbuf->buf) - task->tbuf->len,
-			  task->tbuf->len, uring_tbuf_read_until_cb);
-	return;
-
-finished:
-	task->final_cb(task, r);
-}
-
-/*
- * Read into the task buffer until @is_complete_cb reports that the data is
- * complete, then hand the outcome to @final_cb.
- *
- * The buffer is treated as empty at the start (tbuf->len is reset to 0).
- */
-void
-uring_tbuf_read_until(struct uring_task *task,
-		      rutask_cb_t is_complete_cb, utask_cb_t final_cb)
-{
-	assert_return(task && task->fd >= 0 && task->tbuf && is_complete_cb && final_cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	task->is_complete_cb = is_complete_cb;
-	task->final_cb = final_cb;
-	task->tbuf->len = 0;
-
-	uring_read(task, &task->tbuf->buf, sizeof(task->tbuf->buf),
-		   uring_tbuf_read_until_cb);
-}
-
-/*
- * Completion test for uring_tbuf_read_until_eof().
- *
- * Returns 0 (keep reading) while reads still return data. Once a read
- * returns nothing, the buffer is NUL-terminated, the terminator is counted
- * in tbuf->len, and 1 is returned. Returns -E2BIG when there is no room left
- * for more data plus the terminator.
- */
-static int
-uring_tbuf_eof(struct uring_task *task, int res)
-{
-	assert_return(task && task->tbuf, -EINVAL);
-	assert_task_alive_or(DBG_UR, task, return -EINTR);
-
-	if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
-		return -E2BIG;
-
-	if (res <= 0) {
-		/* End of input: terminate the string */
-		task->tbuf->buf[task->tbuf->len] = '\0';
-		task->tbuf->len++;
-		return 1;
-	}
-
-	return 0;
-}
-
-/*
- * Read into the task buffer until a read returns no more data, using
- * uring_tbuf_eof as the completion test, then call @final_cb.
- */
-void
-uring_tbuf_read_until_eof(struct uring_task *task,
-			  utask_cb_t final_cb)
-{
-	assert_return(task && task->tbuf && final_cb);
-
-	uring_tbuf_read_until(task, uring_tbuf_eof, final_cb);
-}
-
-/*
- * Completion test for uring_tbuf_read(): any non-negative read result counts
- * as complete (1); a negative result is passed through unchanged.
- */
-static int
-uring_tbuf_have_data(struct uring_task *task, int res)
-{
-	assert_return(task, -EINVAL);
-
-	return res < 0 ? res : 1;
-}
-
-/*
- * Perform a single read into the task buffer and pass the outcome to
- * @final_cb; uring_tbuf_have_data ends the read loop after the first result.
- */
-void
-uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb)
-{
-	assert_return(task && final_cb);
-
-	uring_tbuf_read_until(task, uring_tbuf_have_data, final_cb);
-}
-
-/*
- * Queue a read of up to @len bytes from the task's fd at @offset into @buf.
- * @cb is stored in task->cb, which the event loop calls for the completion.
- */
-void
-uring_read_offset(struct uring_task *task, void *buf, size_t len, off_t offset, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && buf && len > 0 && task->fd >= 0);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	task->cb = cb;
-	io_uring_prep_read(entry, task->fd, buf, len, offset);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue an open of @path, resolved relative to the current working
- * directory, read-only and close-on-exec. @cb is stored in task->cb.
- */
-void
-uring_openat(struct uring_task *task, const char *path, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && !empty_str(path) && cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	task->cb = cb;
-	io_uring_prep_openat(entry, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue a recvmsg on the task's fd into the task buffer.
- *
- * The buffer bookkeeping is reset, the iovec is sized to the whole buffer
- * and msg_namelen is taken from task->saddr.addrlen. @cb is stored in
- * task->cb.
- */
-void
-uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && task->fd >= 0 && task->tbuf && cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-
-	/* Empty buffer, full capacity available for the incoming message */
-	task->tbuf->len = 0;
-	task->tbuf->done = 0;
-	task->tbuf->iov.iov_len = sizeof(task->tbuf->buf);
-	task->tbuf->msg.msg_namelen = task->saddr.addrlen;
-
-	task->cb = cb;
-	io_uring_prep_recvmsg(entry, task->fd, &task->tbuf->msg, 0);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue a sendmsg of the first tbuf->len bytes of the task buffer on the
- * task's fd, with msg_namelen taken from task->saddr.addrlen. @cb is stored
- * in task->cb.
- */
-void
-uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && task->fd >= 0 && task->tbuf && cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-
-	/* Send exactly what is currently buffered */
-	task->tbuf->iov.iov_len = task->tbuf->len;
-	task->tbuf->done = 0;
-	task->tbuf->msg.msg_namelen = task->saddr.addrlen;
-
-	task->cb = cb;
-	io_uring_prep_sendmsg(entry, task->fd, &task->tbuf->msg, 0);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue a connect of the task's fd to the address held in @saddr.
- * @cb is stored in task->cb.
- */
-void
-uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-	struct sockaddr *addr;
-
-	assert_return(task && task->fd >= 0 && saddr && cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	task->cb = cb;
-	addr = (struct sockaddr *)&saddr->storage;
-	io_uring_prep_connect(entry, task->fd, addr, saddr->addrlen);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue an accept on the task's fd. The peer address is written to @saddr,
- * whose addrlen is first set to the full size of its storage. The accepted
- * socket is requested with SOCK_CLOEXEC. @cb is stored in task->cb.
- */
-void
-uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-	struct sockaddr *addr;
-
-	assert_return(task && task->fd >= 0 && saddr && cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	saddr->addrlen = sizeof(saddr->storage);
-	task->cb = cb;
-	addr = (struct sockaddr *)&saddr->storage;
-	io_uring_prep_accept(entry, task->fd, addr, &saddr->addrlen, SOCK_CLOEXEC);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue a splice of up to 4096 bytes from @fd_in to @fd_out with
- * SPLICE_F_MOVE, passing -1 for both offsets. @cb is stored in task->cb.
- */
-void
-uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && fd_in >= 0 && fd_out >= 0 && cb);
-
-	debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u",
-	      task->name, task, fd_in, fd_out, task->refcount);
-
-	entry = get_sqe(task);
-	task->cb = cb;
-	io_uring_prep_splice(entry, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue a poll request for the events in @poll_mask on the task's fd.
- * @cb is stored in task->cb.
- */
-void
-uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task && task->fd >= 0 && poll_mask && cb);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	task->cb = cb;
-	io_uring_prep_poll_add(entry, task->fd, poll_mask);
-	io_uring_sqe_set_data(entry, task);
-}
-
-/*
- * Queue removal of the poll request registered with @task as user data and
- * mark the task dead. The removal is tagged CQE_TYPE_POLL_CANCEL. A task
- * without an fd is silently ignored.
- */
-void
-uring_poll_cancel(struct uring_task *task)
-{
-	struct io_uring_sqe *entry;
-
-	assert_return(task);
-
-	/* not an error, no need to print error msg */
-	if (task->fd < 0)
-		return;
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	entry = get_sqe(task);
-	task->dead = true;
-	io_uring_prep_poll_remove(entry, task);
-	io_uring_sqe_set_data(entry, (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL));
-}
-
-/*
- * Free callback of the uring task: shuts down the io_uring instance, clears
- * cfg->uring and releases the enclosing struct uring_ev.
- */
-static void
-uring_free(struct uring_task *task)
-{
-	struct uring_ev *ev = container_of(task, struct uring_ev, task);
-
-	assert_return(task);
-
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      task->name, task, task->fd, task->refcount);
-
-	io_uring_queue_exit(&ev->uring);
-	cfg->uring = NULL;
-	xfree(ev);
-}
-
-/* Dump the reference state of the uring task, if the uring is set up. */
-void
-uring_refdump()
-{
-	struct uring_ev *ev;
-
-	assert_return(cfg->uring);
-
-	ev = cfg->uring;
-	uring_task_refdump(&ev->task);
-}
-
-/* Drop one reference on the uring task, if the uring is set up. */
-void
-uring_delete()
-{
-	struct uring_task *uring_task;
-
-	assert_return(cfg->uring);
-
-	uring_task = &cfg->uring->task;
-	debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
-	      uring_task->name, uring_task, uring_task->fd,
-	      uring_task->refcount);
-
-	uring_task_put(uring_task);
-}
-
-/*
- * Completion of the splice probe queued by uring_init().
- *
- * Queues closes for the three probe descriptors and records the outcome:
- * a non-negative result means splice works, -EINVAL means it is not
- * supported, and any other error is logged.
- */
-static void
-uring_splice_test_cb(struct uring_task *task, int res)
-{
-	struct uring_ev *ev = container_of(task, struct uring_ev, task);
-
-	assert_die(task && ev == cfg->uring, "splice test failed");
-
-	/* The probe descriptors are no longer needed */
-	uring_close(task, ev->tfd);
-	ev->tfd = -1;
-	uring_close(task, ev->pipe[PIPE_RD]);
-	ev->pipe[PIPE_RD] = -1;
-	uring_close(task, ev->pipe[PIPE_WR]);
-	ev->pipe[PIPE_WR] = -1;
-
-	if (res == -EINVAL) {
-		debug(DBG_UR, "splice not supported");
-	} else if (res < 0) {
-		error("splice check failed: %i\n", res);
-	} else {
-		cfg->splice_supported = true;
-		debug(DBG_UR, "splice supported");
-	}
-}
-
-/*
- * Create the global io_uring state and queue the splice support probe.
- *
- * Allocates the uring_ev, sets up a 4096-entry ring, registers the uring
- * task as a child of cfg->task and publishes it in cfg->uring. The probe
- * splices from /dev/null into a pipe; uring_splice_test_cb records the
- * result. Any setup failure is fatal.
- */
-void
-uring_init()
-{
-	struct uring_ev *ev;
-
-	assert_return(!cfg->uring);
-
-	ev = zmalloc(sizeof(*ev));
-	if (!ev)
-		die("malloc: %m");
-
-	if (io_uring_queue_init_params(4096, &ev->uring, &ev->uring_params) < 0)
-		die("io_uring_queue_init_params");
-
-	debug(DBG_UR, "uring initialized, features: 0x%08x",
-	      ev->uring_params.features);
-
-	uring_task_init(&ev->task, "io_uring", &cfg->task, uring_free);
-	cfg->uring = ev;
-
-	/* splice check, a bit convoluted, but seems to be no simpler way */
-	cfg->splice_supported = false;
-
-	if (pipe2(ev->pipe, O_CLOEXEC) < 0)
-		die("pipe2: %m");
-
-	ev->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY);
-	if (ev->tfd < 0)
-		die("open(\"/dev/null\"): %m");
-
-	uring_splice(&ev->task, ev->tfd, ev->pipe[PIPE_WR], uring_splice_test_cb);
-}
-
-/*
- * Emit a DBG_UR debug line describing a completion: the caller-supplied
- * @type label, the result code with its strerror() text ("ok" when the
- * result is not negative), and the task's name, address, fd and callback.
- */
-static inline void
-uring_print_cqe(const char *type, struct uring_task *task,
- struct io_uring_cqe *cqe)
-{
- assert_return(!empty_str(type) && task && cqe);
-
- debug(DBG_UR, "got CQE "
- "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)",
- type,
- cqe->res,
- cqe->res < 0 ? strerror(-cqe->res) : "ok",
- task->name ? task->name : "<none>",
- task,
- task->fd,
- task->cb);
-}
-
-/*
- * Main loop: submit queued requests, wait for completions and dispatch them.
- *
- * The user data of each completion is a task pointer whose low bits carry a
- * cqe_type tag. Only CQE_TYPE_NORMAL completions invoke task->cb; every
- * completion drops the task reference taken in get_sqe(). The function
- * returns after handling a completion once `exiting` is set.
- */
-void
-uring_event_loop()
-{
-	while (true) {
-		struct io_uring_cqe *cqe;
-		unsigned nr, head;
-		int r;
-
-		io_uring_submit(&cfg->uring->uring);
-		r = io_uring_wait_cqe(&cfg->uring->uring, &cqe);
-		if (r < 0) {
-			/*
-			 * liburing reports failures as -errno in the return
-			 * value; errno itself is not set.
-			 */
-			if (r == -EINTR)
-				continue;
-			else
-				die("io_uring_wait_cqe: %i", r);
-		}
-
-		nr = 0;
-		io_uring_for_each_cqe(&cfg->uring->uring, head, cqe) {
-			struct uring_task *task = io_uring_cqe_get_data(cqe);
-			bool do_cb;
-			enum cqe_type cqe_type;
-
-			cqe_count++;
-
-			/* Split the tagged pointer into its type and the task */
-			cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK);
-			task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK);
-
-			if (!task)
-				die("null task");
-
-			switch (cqe_type) {
-			case CQE_TYPE_CANCEL:
-				uring_print_cqe("cancel", task, cqe);
-				do_cb = false;
-				break;
-
-			case CQE_TYPE_CLOSE:
-				uring_print_cqe("close", task, cqe);
-				do_cb = false;
-				break;
-
-			case CQE_TYPE_POLL_CANCEL:
-				uring_print_cqe("poll_cancel", task, cqe);
-				do_cb = false;
-				break;
-
-			case CQE_TYPE_NORMAL:
-				uring_print_cqe("standard", task, cqe);
-				do_cb = true;
-				break;
-
-			default:
-				die("unknown CQE type");
-			}
-
-			if (do_cb && task->cb)
-				task->cb(task, cqe->res);
-
-			/* Release the reference get_sqe() took for this request */
-			uring_task_put(task);
-
-			if (exiting)
-				return;
-
-			nr++;
-		}
-
-		/* Mark the handled completions as consumed */
-		io_uring_cq_advance(&cfg->uring->uring, nr);
-	}
-}
-