summaryrefslogtreecommitdiff
path: root/uring.c
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-05 14:09:18 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-05 14:09:18 +0200
commitdb66484c4300f5f0e857eff01d15fd3593002a79 (patch)
treea787b9f0da1243ae0391d5931ecb9cb0f29d3ee4 /uring.c
Initial commit
Diffstat (limited to 'uring.c')
-rw-r--r--uring.c294
1 files changed, 294 insertions, 0 deletions
diff --git a/uring.c b/uring.c
new file mode 100644
index 0000000..c43a8fb
--- /dev/null
+++ b/uring.c
@@ -0,0 +1,294 @@
+#include <liburing.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+
+#include "main.h"
+#include "uring.h"
+#include "config.h"
+
+struct uring_ev {
+ struct io_uring uring;
+ struct io_uring_params uring_params;
+ struct uring_task task;
+};
+
+void
+uring_task_refdump(struct uring_task *task)
+{
+ char buf[4096];
+ struct uring_task *tmp;
+
+ buf[0] = '\0';
+ for (tmp = task; tmp; tmp = tmp->parent) {
+ size_t prefix;
+ char *dst;
+
+ if (tmp->parent)
+ prefix = strlen("->") + strlen(tmp->name);
+ else
+ prefix = strlen(tmp->name);
+
+ memmove(&buf[prefix], &buf[0], strlen(buf) + 1);
+
+ dst = &buf[0];
+ if (tmp->parent) {
+ *dst++ = '-';
+ *dst++ = '>';
+ }
+
+ memcpy(dst, tmp->name, strlen(tmp->name));
+ }
+
+ fprintf(stderr, "%s (0x%p parent 0x%p free 0x%p fd %i ref %u)\n",
+ buf, task, task->parent, task->free, task->fd,
+ task->refcount);
+}
+
+void
+uring_task_put(struct cfg *cfg, struct uring_task *task)
+{
+ struct uring_task *parent = task->parent;
+
+ fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount);
+ task->refcount--;
+
+ if (task->refcount > 0)
+ return;
+
+ if (task->refcount < 0)
+ error("Negative refcount!\n");
+
+ if (task->fd >= 0) {
+ uring_task_close_fd(cfg, task);
+ /* We'll be called again once the fd is closed */
+ return;
+ }
+
+ if (task->free)
+ task->free(task);
+
+ if (parent)
+ uring_task_put(NULL, parent);
+}
+
+void
+uring_task_get(struct cfg *cfg, struct uring_task *task)
+{
+ fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount);
+
+ if (task->refcount < 0)
+ error("Negative refcount!\n");
+
+ task->refcount++;
+}
+
+void
+uring_task_set_fd(struct uring_task *task, int fd)
+{
+ if (task->fd >= 0)
+ error("Leaking fd %i\n", task->fd);
+
+ task->fd = fd;
+}
+
+void
+uring_task_close_fd(struct cfg *cfg, struct uring_task *task)
+{
+ fprintf(stderr, "%s called with task 0x%p\n", __func__, task);
+ if (task->fd < 0)
+ return;
+
+ uring_close(cfg, task, task->fd, NULL);
+ task->fd = -1;
+}
+
+void
+uring_task_init(struct uring_task *task, const char *name, struct uring_task *parent, void (*free)(struct uring_task *))
+{
+ if (!free)
+ die("uring_task_init called without destructor\n");
+
+ task->refcount = 1;
+ task->fd = -1;
+ task->parent = parent;
+ task->free = free;
+ task->dead = false;
+ task->name = name;
+
+ if (task->parent)
+ uring_task_get(NULL, task->parent);
+}
+
+void
+uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+
+ fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback);
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ if (task) {
+ uring_task_get(cfg, task);
+ task->callback = callback;
+ }
+ io_uring_prep_close(sqe, fd);
+ io_uring_sqe_set_data(sqe, task);
+
+ fprintf(stderr, "%s: done\n", __func__);
+}
+
+void
+uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ if (task->fd < 0) {
+ error("uring_read called with no fd set\n");
+ return;
+ }
+
+ uring_task_get(cfg, task);
+ task->callback = callback;
+ io_uring_prep_read(sqe, task->fd, buf, len, offset);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ uring_task_get(cfg, task);
+ task->callback = callback;
+ io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ if (task->fd < 0) {
+ error("uring_connect called with no fd set\n");
+ return;
+ }
+
+ uring_task_get(cfg, task);
+ task->callback = callback;
+ io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+ addr->addrlen = sizeof(addr->storage);
+
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ if (task->fd < 0) {
+ error("uring_accept called with no fd set\n");
+ return;
+ }
+
+ uring_task_get(cfg, task);
+ task->callback = callback;
+ io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
+uring_cancel(struct cfg *cfg, struct uring_task *task)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ task->dead = true;
+ io_uring_prep_cancel(sqe, task, 0);
+ io_uring_sqe_set_data(sqe, NULL);
+}
+
+static void
+uring_free(struct uring_task *task)
+{
+ fprintf(stderr, "%s called\n", __func__);
+}
+
+void
+uring_refdump(struct uring_ev *uev)
+{
+ uring_task_refdump(&uev->task);
+}
+
+void
+uring_init(struct cfg *cfg)
+{
+ struct uring_ev *uev;
+
+ uev = zmalloc(sizeof(*uev));
+ if (!uev)
+ perrordie("malloc");
+
+ if (io_uring_queue_init_params(4096, &uev->uring, &uev->uring_params) < 0)
+ perrordie("io_uring_queue_init_params");
+
+ fprintf(stderr, "uring initialized, features: 0x%08x\n", uev->uring_params.features);
+
+ uring_task_init(&uev->task, "uev", &cfg->task, uring_free);
+ cfg->uev = uev;
+}
+
+int
+uring_event_loop(struct cfg *cfg)
+{
+ while (true) {
+ struct io_uring_cqe *cqe;
+ unsigned nr, head;
+ int r;
+
+ io_uring_submit(&cfg->uev->uring);
+ r = io_uring_wait_cqe(&cfg->uev->uring, &cqe);
+ if (r < 0) {
+ if (errno == EINTR)
+ continue;
+ else
+ perrordie("io_uring_wait_cqe");
+ }
+
+ nr = 0;
+ io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) {
+ struct uring_task *task = io_uring_cqe_get_data(cqe);
+
+ fprintf(stderr, "%s: got CEQ (res: %i, task: 0x%p, cb: 0x%p)\n", __func__, cqe->res, task, task ? task->callback : NULL);
+ if (task && task->callback)
+ task->callback(cfg, task, cqe->res);
+ nr++;
+ if (task)
+ uring_task_put(cfg, task);
+ }
+
+ printf("%s: %u CQEs treated\n", __func__, nr);
+ io_uring_cq_advance(&cfg->uev->uring, nr);
+ }
+
+ return 0;
+}
+