From db66484c4300f5f0e857eff01d15fd3593002a79 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Fri, 5 Jun 2020 14:09:18 +0200 Subject: Initial commit --- uring.c | 294 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 uring.c (limited to 'uring.c') diff --git a/uring.c b/uring.c new file mode 100644 index 0000000..c43a8fb --- /dev/null +++ b/uring.c @@ -0,0 +1,294 @@ +#include +#include +#include +#include +#include +#include + +#include "main.h" +#include "uring.h" +#include "config.h" + +struct uring_ev { + struct io_uring uring; + struct io_uring_params uring_params; + struct uring_task task; +}; + +void +uring_task_refdump(struct uring_task *task) +{ + char buf[4096]; + struct uring_task *tmp; + + buf[0] = '\0'; + for (tmp = task; tmp; tmp = tmp->parent) { + size_t prefix; + char *dst; + + if (tmp->parent) + prefix = strlen("->") + strlen(tmp->name); + else + prefix = strlen(tmp->name); + + memmove(&buf[prefix], &buf[0], strlen(buf) + 1); + + dst = &buf[0]; + if (tmp->parent) { + *dst++ = '-'; + *dst++ = '>'; + } + + memcpy(dst, tmp->name, strlen(tmp->name)); + } + + fprintf(stderr, "%s (0x%p parent 0x%p free 0x%p fd %i ref %u)\n", + buf, task, task->parent, task->free, task->fd, + task->refcount); +} + +void +uring_task_put(struct cfg *cfg, struct uring_task *task) +{ + struct uring_task *parent = task->parent; + + fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); + task->refcount--; + + if (task->refcount > 0) + return; + + if (task->refcount < 0) + error("Negative refcount!\n"); + + if (task->fd >= 0) { + uring_task_close_fd(cfg, task); + /* We'll be called again once the fd is closed */ + return; + } + + if (task->free) + task->free(task); + + if (parent) + uring_task_put(NULL, parent); +} + +void +uring_task_get(struct cfg *cfg, struct uring_task *task) +{ + fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); + + if (task->refcount < 0) + error("Negative refcount!\n"); + + task->refcount++; +} + +void +uring_task_set_fd(struct uring_task *task, int fd) +{ + if (task->fd >= 0) + error("Leaking fd %i\n", task->fd); + + task->fd = fd; +} + +void +uring_task_close_fd(struct cfg *cfg, struct uring_task *task) +{ + fprintf(stderr, "%s called with task 0x%p\n", __func__, task); + if (task->fd < 0) + return; + + uring_close(cfg, task, task->fd, NULL); + task->fd = -1; +} + +void +uring_task_init(struct uring_task *task, const char *name, struct uring_task *parent, void (*free)(struct uring_task *)) +{ + if (!free) + die("uring_task_init called without destructor\n"); + + task->refcount = 1; + task->fd = -1; + task->parent = parent; + task->free = free; + task->dead = false; + task->name = name; + + if (task->parent) + uring_task_get(NULL, task->parent); +} + +void +uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + + fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback); + if (!sqe) + perrordie("io_uring_sqe"); + + if (task) { + uring_task_get(cfg, task); + task->callback = callback; + } + io_uring_prep_close(sqe, fd); + io_uring_sqe_set_data(sqe, task); + + fprintf(stderr, "%s: done\n", __func__); +} + +void +uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + + if (!sqe) + perrordie("io_uring_sqe"); + + if (task->fd < 0) { + error("uring_read called with no fd set\n"); + return; + } + + uring_task_get(cfg, task); + task->callback = callback; + io_uring_prep_read(sqe, task->fd, buf, len, offset); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + + if (!sqe) + perrordie("io_uring_sqe"); + + uring_task_get(cfg, task); + task->callback = callback; + io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + + if (!sqe) + perrordie("io_uring_sqe"); + + if (task->fd < 0) { + error("uring_connect called with no fd set\n"); + return; + } + + uring_task_get(cfg, task); + task->callback = callback; + io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + addr->addrlen = sizeof(addr->storage); + + if (!sqe) + perrordie("io_uring_sqe"); + + if (task->fd < 0) { + error("uring_accept called with no fd set\n"); + return; + } + + uring_task_get(cfg, task); + task->callback = callback; + io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_cancel(struct cfg *cfg, struct uring_task *task) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + + if (!sqe) + perrordie("io_uring_sqe"); + + task->dead = true; + io_uring_prep_cancel(sqe, task, 0); + io_uring_sqe_set_data(sqe, NULL); +} + +static void +uring_free(struct uring_task *task) +{ + fprintf(stderr, "%s called\n", __func__); +} + +void +uring_refdump(struct uring_ev *uev) +{ + uring_task_refdump(&uev->task); +} + +void +uring_init(struct cfg *cfg) +{ + struct uring_ev *uev; + + uev = zmalloc(sizeof(*uev)); + if (!uev) + perrordie("malloc"); + + if (io_uring_queue_init_params(4096, &uev->uring, &uev->uring_params) < 0) + perrordie("io_uring_queue_init_params"); + + fprintf(stderr, "uring initialized, features: 0x%08x\n", uev->uring_params.features); + + uring_task_init(&uev->task, "uev", &cfg->task, uring_free); + cfg->uev = uev; +} + +int +uring_event_loop(struct cfg *cfg) +{ + while (true) { + struct io_uring_cqe *cqe; + unsigned nr, head; + int r; + + io_uring_submit(&cfg->uev->uring); + r = io_uring_wait_cqe(&cfg->uev->uring, &cqe); + if (r < 0) { + if (errno == EINTR) + continue; + else + perrordie("io_uring_wait_cqe"); + } + + nr = 0; + io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) { + struct uring_task *task = io_uring_cqe_get_data(cqe); + + fprintf(stderr, "%s: got CEQ (res: %i, task: 0x%p, cb: 0x%p)\n", __func__, cqe->res, task, task ? task->callback : NULL); + if (task && task->callback) + task->callback(cfg, task, cqe->res); + nr++; + if (task) + uring_task_put(cfg, task); + } + + printf("%s: %u CQEs treated\n", __func__, nr); + io_uring_cq_advance(&cfg->uev->uring, nr); + } + + return 0; +} + -- cgit v1.2.3