From ea053d96f7e89e053d4af8d39b04c5428760345f Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Tue, 23 Jun 2020 20:56:22 +0200 Subject: Big renaming, move some more functionality to shared lib --- minecproxy/uring.c | 759 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 759 insertions(+) create mode 100644 minecproxy/uring.c (limited to 'minecproxy/uring.c') diff --git a/minecproxy/uring.c b/minecproxy/uring.c new file mode 100644 index 0000000..e979471 --- /dev/null +++ b/minecproxy/uring.c @@ -0,0 +1,759 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include + +#include "main.h" +#include "uring.h" + +struct uring_ev { + struct io_uring uring; + struct io_uring_params uring_params; + struct uring_task task; + + /* for testing if the kernel supports splice */ + int pipe[2]; + int tfd; +}; + +enum cqe_type { + CQE_TYPE_NORMAL = 0x0, + CQE_TYPE_CANCEL = 0x1, + CQE_TYPE_CLOSE = 0x2, + CQE_TYPE_POLL_CANCEL = 0x3 +}; + +#define CQE_TYPE_PTR_MASK 0x3 + +uint64_t sqe_count = 0; +uint64_t cqe_count = 0; + +static struct io_uring_sqe * +get_sqe(struct uring_task *task) +{ + struct io_uring_sqe *sqe; + + assert_die(task, "invalid arguments"); + + sqe = io_uring_get_sqe(&cfg->uring->uring); + if (!sqe) { + io_uring_submit(&cfg->uring->uring); + sqe = io_uring_get_sqe(&cfg->uring->uring); + if (!sqe) + die("failed to get an sqe!"); + } + + sqe_count++; + uring_task_get(task); + return sqe; +} + +void +uring_task_refdump(struct uring_task *task) +{ + char buf[4096]; + struct uring_task *tmp; + + assert_return(task); + + if (!debug_enabled(DBG_REF)) + return; + + buf[0] = '\0'; + for (tmp = task; tmp; tmp = tmp->parent) { + size_t prefix; + char *dst; + + if (tmp->parent) + prefix = strlen("->") + strlen(tmp->name); + else + prefix = strlen(tmp->name); + + memmove(&buf[prefix], &buf[0], strlen(buf) + 1); + + dst = &buf[0]; + if (tmp->parent) { + *dst++ = '-'; + *dst++ = '>'; + } + + memcpy(dst, tmp->name, strlen(tmp->name)); + } + + info("%s (0x%p parent 0x%p free 0x%p fd %i ref %u)", + buf, task, task->parent, task->free, task->fd, + task->refcount); +} + +/* + * Similar to uring_task_put, but can be called from other tasks + * while the task is active. + */ +void +uring_task_destroy(struct uring_task *task) +{ + assert_return(task); + assert_return_silent(!task->dead); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (task->fd >= 0) { + struct io_uring_sqe *sqe; + + sqe = get_sqe(task); + io_uring_prep_cancel(sqe, task, 0); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CANCEL)); + } + + task->dead = true; + uring_task_put(task); +} + +void +uring_task_put(struct uring_task *task) +{ + struct uring_task *parent; + + assert_return(task); + + debug(DBG_REF, "task %s (%p), refcount %u -> %u", + task->name, task, task->refcount, task->refcount - 1); + + task->refcount--; + + if (task->refcount > 0) + return; + + if (task->refcount < 0) + error("Negative refcount!"); + + if (task->fd >= 0) { + uring_task_close_fd(task); + /* We'll be called again once the fd is closed */ + return; + } + + parent = task->parent; + if (task->free) + task->free(task); + + if (parent) { + debug(DBG_REF, "putting parent %s (%p)", parent->name, parent); + uring_task_put(parent); + } +} + +void +uring_task_get(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_REF, "task %s (%p), refcount %u -> %u", + task->name, task, task->refcount, task->refcount + 1); + + if (task->refcount < 0) + error("Negative refcount!"); + + task->refcount++; +} + +void +uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) +{ + assert_return(task && tbuf); + + debug(DBG_UR, "task %s (%p), buf %p, refcount %u", + task->name, task, tbuf, task->refcount); + + /* iov_len and msg_namelen are set at send/receive time */ + tbuf->iov.iov_base = tbuf->buf; + tbuf->msg.msg_name = &task->saddr.storage; + tbuf->msg.msg_iov = &tbuf->iov; + tbuf->msg.msg_iovlen = 1; + tbuf->msg.msg_control = NULL; + tbuf->msg.msg_controllen = 0; + tbuf->msg.msg_flags = 0; + task->tbuf = tbuf; +} + +void +uring_task_set_fd(struct uring_task *task, int fd) +{ + assert_return(task); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, fd, task->refcount); + + task->fd = fd; +} + +void +uring_task_close_fd(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (task->fd < 0) + return; + + uring_close(task, task->fd); + task->fd = -1; +} + +struct uring_task * +uring_parent() +{ + assert_die(cfg->uring, "invalid arguments"); + + return &cfg->uring->task; +} + +void +uring_task_init(struct uring_task *task, const char *name, + struct uring_task *parent, void (*free)(struct uring_task *)) +{ + static bool first = true; + + assert_die(task && !empty_str(name) && free, "invalid arguments"); + + if (first) + first = false; + else + assert_die(parent, "called without a parent task"); + + task->refcount = 1; + task->fd = -1; + task->parent = parent; + task->free = free; + task->dead = false; + task->name = name; + task->tbuf = NULL; + + if (task->parent) { + debug(DBG_REF, "task %s (%p), refcount %u, " + "getting parent %s (%p), refcount %u", + task->name, task, task->refcount, + task->parent->name, task->parent, task->parent->refcount); + uring_task_get(task->parent); + } +} + +void +uring_close(struct uring_task *task, int fd) +{ + struct io_uring_sqe *sqe; + + assert_return(task && fd >= 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + io_uring_prep_close(sqe, fd); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE)); +} + +static void +uring_tbuf_write_cb(struct uring_task *task, int res) +{ + int r; + + assert_return(task && task->tbuf && task->final_cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (res < 0) { + r = res; + goto finished; + } + + /* We wrote some more data */ + task->tbuf->done += res; + if (task->tbuf->done >= task->tbuf->len || res == 0) { + r = task->tbuf->len; + goto finished; + } + + uring_write(task, task->tbuf->buf + task->tbuf->done, + task->tbuf->len - task->tbuf->done, + uring_tbuf_write_cb); + return; + +finished: + task->final_cb(task, r); +} + +void +uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb) +{ + assert_return(task && task->fd >= 0 && task->tbuf && task->tbuf->len > 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + task->tbuf->done = 0; + task->final_cb = final_cb; + uring_write(task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); +} + +void +uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && buf && len > 0 && cb && task->fd >= 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_write(sqe, task->fd, buf, len, 0); + io_uring_sqe_set_data(sqe, task); +} + +static void +uring_tbuf_read_until_cb(struct uring_task *task, int res) +{ + int r; + + assert_return(task && task->tbuf && task->final_cb && task->is_complete_cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (res < 0) { + r = res; + goto finished; + } + + task->tbuf->len += res; + r = task->is_complete_cb(task, res); + if (r < 0) { + r = res; + goto finished; + } else if (r > 0) { + r = task->tbuf->len; + goto finished; + } + + /* Need to read more */ + if (task->tbuf->len >= sizeof(task->tbuf->buf)) { + r = E2BIG; + goto finished; + } + + uring_read_offset(task, task->tbuf->buf + task->tbuf->len, + sizeof(task->tbuf->buf) - task->tbuf->len, + task->tbuf->len, uring_tbuf_read_until_cb); + return; + +finished: + task->final_cb(task, r); +} + +void +uring_tbuf_read_until(struct uring_task *task, + rutask_cb_t is_complete_cb, utask_cb_t final_cb) +{ + assert_return(task && task->fd >= 0 && task->tbuf && is_complete_cb && final_cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + task->tbuf->len = 0; + task->is_complete_cb = is_complete_cb; + task->final_cb = final_cb; + uring_read(task, &task->tbuf->buf, sizeof(task->tbuf->buf), + uring_tbuf_read_until_cb); +} + +static int +uring_tbuf_eof(struct uring_task *task, int res) +{ + assert_return(task && task->tbuf, -EINVAL); + assert_task_alive_or(DBG_UR, task, return -EINTR); + + if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) + return -E2BIG; + + if (res > 0) + return 0; + + task->tbuf->buf[task->tbuf->len] = '\0'; + task->tbuf->len++; + return 1; +} + +void +uring_tbuf_read_until_eof(struct uring_task *task, + utask_cb_t final_cb) +{ + assert_return(task && task->tbuf && final_cb); + + uring_tbuf_read_until(task, uring_tbuf_eof, final_cb); +} + +static int +uring_tbuf_have_data(struct uring_task *task, int res) +{ + assert_return(task, -EINVAL); + + if (res < 0) + return res; + else + return 1; +} + +void +uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb) +{ + assert_return(task && final_cb); + + uring_tbuf_read_until(task, uring_tbuf_have_data, final_cb); +} + +void +uring_read_offset(struct uring_task *task, void *buf, size_t len, off_t offset, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && buf && len > 0 && task->fd >= 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_read(sqe, task->fd, buf, len, offset); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_openat(struct uring_task *task, const char *path, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && !empty_str(path) && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && task->tbuf && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->tbuf->done = 0; + task->tbuf->len = 0; + task->tbuf->iov.iov_len = sizeof(task->tbuf->buf); + task->tbuf->msg.msg_namelen = task->saddr.addrlen; + task->cb = cb; + io_uring_prep_recvmsg(sqe, task->fd, &task->tbuf->msg, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && task->tbuf && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->tbuf->done = 0; + task->tbuf->iov.iov_len = task->tbuf->len; + task->tbuf->msg.msg_namelen = task->saddr.addrlen; + task->cb = cb; + io_uring_prep_sendmsg(sqe, task->fd, &task->tbuf->msg, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && saddr && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&saddr->storage, saddr->addrlen); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && saddr && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + saddr->addrlen = sizeof(saddr->storage); + task->cb = cb; + io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&saddr->storage, &saddr->addrlen, SOCK_CLOEXEC); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && fd_in >= 0 && fd_out >= 0 && cb); + + debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u", + task->name, task, fd_in, fd_out, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && poll_mask && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_poll_add(sqe, task->fd, poll_mask); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_poll_cancel(struct uring_task *task) +{ + struct io_uring_sqe *sqe; + + assert_return(task); + + if (task->fd < 0) { + /* not an error, no need to print error msg */ + return; + } + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->dead = true; + io_uring_prep_poll_remove(sqe, task); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL)); +} + +static void +uring_free(struct uring_task *task) +{ + struct uring_ev *uring = container_of(task, struct uring_ev, task); + + assert_return(task); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + io_uring_queue_exit(&uring->uring); + cfg->uring = NULL; + xfree(uring); +} + +void +uring_refdump() +{ + assert_return(cfg->uring); + + uring_task_refdump(&cfg->uring->task); +} + +void +uring_delete() +{ + struct uring_task *task; + + assert_return(cfg->uring); + + task = &cfg->uring->task; + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + uring_task_put(task); +} + +static void +uring_splice_test_cb(struct uring_task *task, int res) +{ + struct uring_ev *uring = container_of(task, struct uring_ev, task); + + assert_die(task && uring == cfg->uring, "splice test failed"); + + uring_close(task, uring->tfd); + uring_close(task, uring->pipe[PIPE_RD]); + uring_close(task, uring->pipe[PIPE_WR]); + + uring->tfd = -1; + uring->pipe[PIPE_RD] = -1; + uring->pipe[PIPE_WR] = -1; + + if (res >= 0) { + cfg->splice_supported = true; + debug(DBG_UR, "splice supported"); + } else if (res == -EINVAL) + debug(DBG_UR, "splice not supported"); + else + error("splice check failed: %i\n", res); +} + +void +uring_init() +{ + struct uring_ev *uring; + + assert_return(!cfg->uring); + + uring = zmalloc(sizeof(*uring)); + if (!uring) + die("malloc: %m"); + + if (io_uring_queue_init_params(4096, &uring->uring, &uring->uring_params) < 0) + die("io_uring_queue_init_params"); + + debug(DBG_UR, "uring initialized, features: 0x%08x", + uring->uring_params.features); + + uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free); + cfg->uring = uring; + + /* splice check, a bit convoluted, but seems to be no simpler way */ + cfg->splice_supported = false; + if (pipe2(uring->pipe, O_CLOEXEC) < 0) + die("pipe2: %m"); + uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY); + if (uring->tfd < 0) + die("open(\"/dev/null\"): %m"); + uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb); +} + +static inline void +uring_print_cqe(const char *type, struct uring_task *task, + struct io_uring_cqe *cqe) +{ + assert_return(!empty_str(type) && task && cqe); + + debug(DBG_UR, "got CQE " + "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)", + type, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task->name ? task->name : "", + task, + task->fd, + task->cb); +} + +void +uring_event_loop() +{ + while (true) { + struct io_uring_cqe *cqe; + unsigned nr, head; + int r; + + io_uring_submit(&cfg->uring->uring); + r = io_uring_wait_cqe(&cfg->uring->uring, &cqe); + if (r < 0) { + if (errno == EINTR) + continue; + else + die("io_uring_wait_cqe: %i", r); + } + + nr = 0; + io_uring_for_each_cqe(&cfg->uring->uring, head, cqe) { + struct uring_task *task = io_uring_cqe_get_data(cqe); + bool do_cb; + enum cqe_type cqe_type; + + cqe_count++; + + cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK); + task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK); + + if (!task) + die("null task"); + + switch (cqe_type) { + case CQE_TYPE_CANCEL: + uring_print_cqe("cancel", task, cqe); + do_cb = false; + break; + + case CQE_TYPE_CLOSE: + uring_print_cqe("close", task, cqe); + do_cb = false; + break; + + case CQE_TYPE_POLL_CANCEL: + uring_print_cqe("poll_cancel", task, cqe); + do_cb = false; + break; + + case CQE_TYPE_NORMAL: + uring_print_cqe("standard", task, cqe); + do_cb = true; + break; + + default: + die("unknown CQE type"); + } + + if (do_cb && task->cb) + task->cb(task, cqe->res); + + uring_task_put(task); + + if (exiting) + return; + + nr++; + } + + io_uring_cq_advance(&cfg->uring->uring, nr); + } +} + -- cgit v1.2.3