author    David Härdeman <david@hardeman.nu>    2020-06-23 20:56:22 +0200
committer David Härdeman <david@hardeman.nu>    2020-06-23 20:56:22 +0200
commit    ea053d96f7e89e053d4af8d39b04c5428760345f (patch)
tree      8182ca73675ad3933b0f38cb48a99c69101309b4 /minecproxy/uring.c
parent    8c27290245b7bcc7cd2f72f3b4a7562294b43bbe (diff)
Big renaming, move some more functionality to shared lib
Diffstat (limited to 'minecproxy/uring.c')
-rw-r--r--    minecproxy/uring.c    759
1 file changed, 759 insertions, 0 deletions
diff --git a/minecproxy/uring.c b/minecproxy/uring.c
new file mode 100644
index 0000000..e979471
--- /dev/null
+++ b/minecproxy/uring.c
@@ -0,0 +1,759 @@
+#define _GNU_SOURCE
+#include <liburing.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "main.h"
+#include "uring.h"
+
+struct uring_ev {
+ struct io_uring uring;
+ struct io_uring_params uring_params;
+ struct uring_task task;
+
+ /* for testing if the kernel supports splice */
+ int pipe[2];
+ int tfd;
+};
+
+enum cqe_type {
+ CQE_TYPE_NORMAL = 0x0,
+ CQE_TYPE_CANCEL = 0x1,
+ CQE_TYPE_CLOSE = 0x2,
+ CQE_TYPE_POLL_CANCEL = 0x3
+};
+
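+/*
+ * CQEs carry the task pointer in user_data with the cqe_type stored in
+ * the two low bits; this relies on struct uring_task being at least
+ * 4-byte aligned, which holds for any malloc'ed struct.
+ */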
+#define CQE_TYPE_PTR_MASK 0x3
+
+uint64_t sqe_count = 0;
+uint64_t cqe_count = 0;
+
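+/*
+ * Fetch a free SQE, flushing the submission queue once if the ring is
+ * full. Each SQE pins its task with a reference; the matching put
+ * happens when the CQE is reaped in uring_event_loop().
+ */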
+static struct io_uring_sqe *
+get_sqe(struct uring_task *task)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_die(task, "invalid arguments");
+
+ sqe = io_uring_get_sqe(&cfg->uring->uring);
+ if (!sqe) {
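+		/* SQ ring full: submit what is queued to free up slots, then retry */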
+ io_uring_submit(&cfg->uring->uring);
+ sqe = io_uring_get_sqe(&cfg->uring->uring);
+ if (!sqe)
+ die("failed to get an sqe!");
+ }
+
+ sqe_count++;
+ uring_task_get(task);
+ return sqe;
+}
+
+void
+uring_task_refdump(struct uring_task *task)
+{
+ char buf[4096];
+ struct uring_task *tmp;
+
+ assert_return(task);
+
+ if (!debug_enabled(DBG_REF))
+ return;
+
+ buf[0] = '\0';
+ for (tmp = task; tmp; tmp = tmp->parent) {
+ size_t prefix;
+ char *dst;
+
+ if (tmp->parent)
+ prefix = strlen("->") + strlen(tmp->name);
+ else
+ prefix = strlen(tmp->name);
+
+ memmove(&buf[prefix], &buf[0], strlen(buf) + 1);
+
+ dst = &buf[0];
+ if (tmp->parent) {
+ *dst++ = '-';
+ *dst++ = '>';
+ }
+
+ memcpy(dst, tmp->name, strlen(tmp->name));
+ }
+
+	info("%s (%p parent %p free %p fd %i ref %u)",
+ buf, task, task->parent, task->free, task->fd,
+ task->refcount);
+}
+
+/*
+ * Similar to uring_task_put, but can be called from other tasks
+ * while the task is active.
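+ * It queues a kernel-side cancel for any SQE still tagged with the
+ * task before dropping the reference.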
+ */
+void
+uring_task_destroy(struct uring_task *task)
+{
+ assert_return(task);
+ assert_return_silent(!task->dead);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ if (task->fd >= 0) {
+ struct io_uring_sqe *sqe;
+
+ sqe = get_sqe(task);
+ io_uring_prep_cancel(sqe, task, 0);
+ io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CANCEL));
+ }
+
+ task->dead = true;
+ uring_task_put(task);
+}
+
+void
+uring_task_put(struct uring_task *task)
+{
+ struct uring_task *parent;
+
+ assert_return(task);
+
+ debug(DBG_REF, "task %s (%p), refcount %u -> %u",
+ task->name, task, task->refcount, task->refcount - 1);
+
+ task->refcount--;
+
+ if (task->refcount > 0)
+ return;
+
+ if (task->refcount < 0)
+ error("Negative refcount!");
+
+ if (task->fd >= 0) {
+ uring_task_close_fd(task);
+ /* We'll be called again once the fd is closed */
+ return;
+ }
+
+ parent = task->parent;
+ if (task->free)
+ task->free(task);
+
+ if (parent) {
+ debug(DBG_REF, "putting parent %s (%p)", parent->name, parent);
+ uring_task_put(parent);
+ }
+}
+
+void
+uring_task_get(struct uring_task *task)
+{
+ assert_return(task);
+
+ debug(DBG_REF, "task %s (%p), refcount %u -> %u",
+ task->name, task, task->refcount, task->refcount + 1);
+
+ if (task->refcount < 0)
+ error("Negative refcount!");
+
+ task->refcount++;
+}
+
+void
+uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf)
+{
+ assert_return(task && tbuf);
+
+ debug(DBG_UR, "task %s (%p), buf %p, refcount %u",
+ task->name, task, tbuf, task->refcount);
+
+ /* iov_len and msg_namelen are set at send/receive time */
+ tbuf->iov.iov_base = tbuf->buf;
+ tbuf->msg.msg_name = &task->saddr.storage;
+ tbuf->msg.msg_iov = &tbuf->iov;
+ tbuf->msg.msg_iovlen = 1;
+ tbuf->msg.msg_control = NULL;
+ tbuf->msg.msg_controllen = 0;
+ tbuf->msg.msg_flags = 0;
+ task->tbuf = tbuf;
+}
+
+void
+uring_task_set_fd(struct uring_task *task, int fd)
+{
+ assert_return(task);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, fd, task->refcount);
+
+ task->fd = fd;
+}
+
+void
+uring_task_close_fd(struct uring_task *task)
+{
+ assert_return(task);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ if (task->fd < 0)
+ return;
+
+ uring_close(task, task->fd);
+ task->fd = -1;
+}
+
+struct uring_task *
+uring_parent(void)
+{
+ assert_die(cfg->uring, "invalid arguments");
+
+ return &cfg->uring->task;
+}
+
+void
+uring_task_init(struct uring_task *task, const char *name,
+ struct uring_task *parent, void (*free)(struct uring_task *))
+{
+ static bool first = true;
+
+ assert_die(task && !empty_str(name) && free, "invalid arguments");
+
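+	/* only the very first task created (the root task) may lack a parent */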
+ if (first)
+ first = false;
+ else
+ assert_die(parent, "called without a parent task");
+
+ task->refcount = 1;
+ task->fd = -1;
+ task->parent = parent;
+ task->free = free;
+ task->dead = false;
+ task->name = name;
+ task->tbuf = NULL;
+
+ if (task->parent) {
+ debug(DBG_REF, "task %s (%p), refcount %u, "
+ "getting parent %s (%p), refcount %u",
+ task->name, task, task->refcount,
+ task->parent->name, task->parent, task->parent->refcount);
+ uring_task_get(task->parent);
+ }
+}
+
+void
+uring_close(struct uring_task *task, int fd)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && fd >= 0);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ io_uring_prep_close(sqe, fd);
+ io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE));
+}
+
+static void
+uring_tbuf_write_cb(struct uring_task *task, int res)
+{
+ int r;
+
+ assert_return(task && task->tbuf && task->final_cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ if (res < 0) {
+ r = res;
+ goto finished;
+ }
+
+ /* We wrote some more data */
+ task->tbuf->done += res;
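+	/* complete on a full write, or on a zero-length write to avoid spinning */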
+ if (task->tbuf->done >= task->tbuf->len || res == 0) {
+ r = task->tbuf->len;
+ goto finished;
+ }
+
+ uring_write(task, task->tbuf->buf + task->tbuf->done,
+ task->tbuf->len - task->tbuf->done,
+ uring_tbuf_write_cb);
+ return;
+
+finished:
+ task->final_cb(task, r);
+}
+
+void
+uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb)
+{
+ assert_return(task && task->fd >= 0 && task->tbuf && task->tbuf->len > 0);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ task->tbuf->done = 0;
+ task->final_cb = final_cb;
+	uring_write(task, task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb);
+}
+
+void
+uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && buf && len > 0 && cb && task->fd >= 0);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
+ io_uring_prep_write(sqe, task->fd, buf, len, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+static void
+uring_tbuf_read_until_cb(struct uring_task *task, int res)
+{
+ int r;
+
+ assert_return(task && task->tbuf && task->final_cb && task->is_complete_cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ if (res < 0) {
+ r = res;
+ goto finished;
+ }
+
+ task->tbuf->len += res;
+ r = task->is_complete_cb(task, res);
+	if (r < 0) {
+		/* propagate the error from is_complete_cb, not the byte count */
+		goto finished;
+ } else if (r > 0) {
+ r = task->tbuf->len;
+ goto finished;
+ }
+
+ /* Need to read more */
+ if (task->tbuf->len >= sizeof(task->tbuf->buf)) {
+		r = -E2BIG;
+ goto finished;
+ }
+
+ uring_read_offset(task, task->tbuf->buf + task->tbuf->len,
+ sizeof(task->tbuf->buf) - task->tbuf->len,
+ task->tbuf->len, uring_tbuf_read_until_cb);
+ return;
+
+finished:
+ task->final_cb(task, r);
+}
+
+void
+uring_tbuf_read_until(struct uring_task *task,
+ rutask_cb_t is_complete_cb, utask_cb_t final_cb)
+{
+ assert_return(task && task->fd >= 0 && task->tbuf && is_complete_cb && final_cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ task->tbuf->len = 0;
+ task->is_complete_cb = is_complete_cb;
+ task->final_cb = final_cb;
+	uring_read(task, task->tbuf->buf, sizeof(task->tbuf->buf),
+ uring_tbuf_read_until_cb);
+}
+
+static int
+uring_tbuf_eof(struct uring_task *task, int res)
+{
+ assert_return(task && task->tbuf, -EINVAL);
+ assert_task_alive_or(DBG_UR, task, return -EINTR);
+
+ if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
+ return -E2BIG;
+
+ if (res > 0)
+ return 0;
+
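+	/* res == 0 means EOF: NUL-terminate the buffer and report completion */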
+ task->tbuf->buf[task->tbuf->len] = '\0';
+ task->tbuf->len++;
+ return 1;
+}
+
+void
+uring_tbuf_read_until_eof(struct uring_task *task,
+ utask_cb_t final_cb)
+{
+ assert_return(task && task->tbuf && final_cb);
+
+ uring_tbuf_read_until(task, uring_tbuf_eof, final_cb);
+}
+
+static int
+uring_tbuf_have_data(struct uring_task *task, int res)
+{
+ assert_return(task, -EINVAL);
+
+ if (res < 0)
+ return res;
+ else
+ return 1;
+}
+
+void
+uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb)
+{
+ assert_return(task && final_cb);
+
+ uring_tbuf_read_until(task, uring_tbuf_have_data, final_cb);
+}
+
+void
+uring_read_offset(struct uring_task *task, void *buf, size_t len, off_t offset, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+	assert_return(task && buf && len > 0 && cb && task->fd >= 0);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
+ io_uring_prep_read(sqe, task->fd, buf, len, offset);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_openat(struct uring_task *task, const char *path, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && !empty_str(path) && cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
+ io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && task->fd >= 0 && task->tbuf && cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->tbuf->done = 0;
+ task->tbuf->len = 0;
+ task->tbuf->iov.iov_len = sizeof(task->tbuf->buf);
+ task->tbuf->msg.msg_namelen = task->saddr.addrlen;
+ task->cb = cb;
+ io_uring_prep_recvmsg(sqe, task->fd, &task->tbuf->msg, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && task->fd >= 0 && task->tbuf && cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->tbuf->done = 0;
+ task->tbuf->iov.iov_len = task->tbuf->len;
+ task->tbuf->msg.msg_namelen = task->saddr.addrlen;
+ task->cb = cb;
+ io_uring_prep_sendmsg(sqe, task->fd, &task->tbuf->msg, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && task->fd >= 0 && saddr && cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
+ io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&saddr->storage, saddr->addrlen);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && task->fd >= 0 && saddr && cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ saddr->addrlen = sizeof(saddr->storage);
+ task->cb = cb;
+ io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&saddr->storage, &saddr->addrlen, SOCK_CLOEXEC);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && fd_in >= 0 && fd_out >= 0 && cb);
+
+ debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u",
+ task->name, task, fd_in, fd_out, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
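+	/* offsets of -1 make both fds use (and update) their file offsets */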
+ io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && task->fd >= 0 && poll_mask && cb);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
+ io_uring_prep_poll_add(sqe, task->fd, poll_mask);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_poll_cancel(struct uring_task *task)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task);
+
+ if (task->fd < 0) {
+ /* not an error, no need to print error msg */
+ return;
+ }
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ sqe = get_sqe(task);
+ task->dead = true;
+ io_uring_prep_poll_remove(sqe, task);
+ io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL));
+}
+
+static void
+uring_free(struct uring_task *task)
+{
+ struct uring_ev *uring = container_of(task, struct uring_ev, task);
+
+ assert_return(task);
+
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ io_uring_queue_exit(&uring->uring);
+ cfg->uring = NULL;
+ xfree(uring);
+}
+
+void
+uring_refdump(void)
+{
+ assert_return(cfg->uring);
+
+ uring_task_refdump(&cfg->uring->task);
+}
+
+void
+uring_delete(void)
+{
+ struct uring_task *task;
+
+ assert_return(cfg->uring);
+
+ task = &cfg->uring->task;
+ debug(DBG_UR, "task %s (%p), fd %i, refcount %u",
+ task->name, task, task->fd, task->refcount);
+
+ uring_task_put(task);
+}
+
+static void
+uring_splice_test_cb(struct uring_task *task, int res)
+{
+ struct uring_ev *uring = container_of(task, struct uring_ev, task);
+
+ assert_die(task && uring == cfg->uring, "splice test failed");
+
+ uring_close(task, uring->tfd);
+ uring_close(task, uring->pipe[PIPE_RD]);
+ uring_close(task, uring->pipe[PIPE_WR]);
+
+ uring->tfd = -1;
+ uring->pipe[PIPE_RD] = -1;
+ uring->pipe[PIPE_WR] = -1;
+
+ if (res >= 0) {
+ cfg->splice_supported = true;
+ debug(DBG_UR, "splice supported");
+	} else if (res == -EINVAL) {
+		debug(DBG_UR, "splice not supported");
+	} else {
+		error("splice check failed: %i", res);
+	}
+}
+
+void
+uring_init(void)
+{
+ struct uring_ev *uring;
+
+ assert_return(!cfg->uring);
+
+ uring = zmalloc(sizeof(*uring));
+ if (!uring)
+ die("malloc: %m");
+
+ if (io_uring_queue_init_params(4096, &uring->uring, &uring->uring_params) < 0)
+ die("io_uring_queue_init_params");
+
+ debug(DBG_UR, "uring initialized, features: 0x%08x",
+ uring->uring_params.features);
+
+ uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free);
+ cfg->uring = uring;
+
+ /* splice check, a bit convoluted, but seems to be no simpler way */
+ cfg->splice_supported = false;
+ if (pipe2(uring->pipe, O_CLOEXEC) < 0)
+ die("pipe2: %m");
+ uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY);
+ if (uring->tfd < 0)
+ die("open(\"/dev/null\"): %m");
+ uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb);
+}
+
+static inline void
+uring_print_cqe(const char *type, struct uring_task *task,
+ struct io_uring_cqe *cqe)
+{
+ assert_return(!empty_str(type) && task && cqe);
+
+ debug(DBG_UR, "got CQE "
+ "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)",
+ type,
+ cqe->res,
+ cqe->res < 0 ? strerror(-cqe->res) : "ok",
+ task->name ? task->name : "<none>",
+ task,
+ task->fd,
+ task->cb);
+}
+
+void
+uring_event_loop(void)
+{
+ while (true) {
+ struct io_uring_cqe *cqe;
+ unsigned nr, head;
+ int r;
+
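+		/* flush anything queued since the last loop, then block for a completion */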
+ io_uring_submit(&cfg->uring->uring);
+ r = io_uring_wait_cqe(&cfg->uring->uring, &cqe);
+ if (r < 0) {
+			/* io_uring_wait_cqe returns a negated errno */
+			if (r == -EINTR)
+				continue;
+			else
+				die("io_uring_wait_cqe: %i", r);
+ }
+
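+		/* drain every completion already in the CQ ring in one batch */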
+ nr = 0;
+ io_uring_for_each_cqe(&cfg->uring->uring, head, cqe) {
+ struct uring_task *task = io_uring_cqe_get_data(cqe);
+ bool do_cb;
+ enum cqe_type cqe_type;
+
+ cqe_count++;
+
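+			/* split user_data back into the task pointer and its type tag */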
+ cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK);
+ task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK);
+
+ if (!task)
+ die("null task");
+
+ switch (cqe_type) {
+ case CQE_TYPE_CANCEL:
+ uring_print_cqe("cancel", task, cqe);
+ do_cb = false;
+ break;
+
+ case CQE_TYPE_CLOSE:
+ uring_print_cqe("close", task, cqe);
+ do_cb = false;
+ break;
+
+ case CQE_TYPE_POLL_CANCEL:
+ uring_print_cqe("poll_cancel", task, cqe);
+ do_cb = false;
+ break;
+
+ case CQE_TYPE_NORMAL:
+ uring_print_cqe("standard", task, cqe);
+ do_cb = true;
+ break;
+
+ default:
+ die("unknown CQE type");
+ }
+
+ if (do_cb && task->cb)
+ task->cb(task, cqe->res);
+
+ uring_task_put(task);
+
+ if (exiting)
+ return;
+
+ nr++;
+ }
+
+ io_uring_cq_advance(&cfg->uring->uring, nr);
+ }
+}
+