#include #include #include #include #include #include #include "main.h" #include "uring.h" #include "config.h" struct uring_ev { struct io_uring uring; struct io_uring_params uring_params; struct cfg *cfg; struct uring_task task; }; void uring_task_refdump(struct uring_task *task) { char buf[4096]; struct uring_task *tmp; buf[0] = '\0'; for (tmp = task; tmp; tmp = tmp->parent) { size_t prefix; char *dst; if (tmp->parent) prefix = strlen("->") + strlen(tmp->name); else prefix = strlen(tmp->name); memmove(&buf[prefix], &buf[0], strlen(buf) + 1); dst = &buf[0]; if (tmp->parent) { *dst++ = '-'; *dst++ = '>'; } memcpy(dst, tmp->name, strlen(tmp->name)); } fprintf(stderr, "%s (0x%p parent 0x%p free 0x%p fd %i ref %u)\n", buf, task, task->parent, task->free, task->fd, task->refcount); } void uring_task_put(struct cfg *cfg, struct uring_task *task) { struct uring_task *parent = task->parent; fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); task->refcount--; if (task->refcount > 0) return; if (task->refcount < 0) error("Negative refcount!\n"); if (task->fd >= 0) { uring_task_close_fd(cfg, task); /* We'll be called again once the fd is closed */ return; } if (task->free) task->free(task); if (parent) uring_task_put(NULL, parent); } void uring_task_get(struct cfg *cfg, struct uring_task *task) { fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); if (task->refcount < 0) error("Negative refcount!\n"); task->refcount++; } void uring_task_set_fd(struct uring_task *task, int fd) { task->fd = fd; } void uring_task_close_fd(struct cfg *cfg, struct uring_task *task) { fprintf(stderr, "%s called with task 0x%p\n", __func__, task); if (task->fd < 0) return; uring_close(cfg, task, task->fd, NULL); task->fd = -1; } struct uring_task * uring_parent(struct cfg *cfg) { if (!cfg) die("%s: called with null cfg\n"); if (!cfg->uev) die("%s: called with uninitialized uring\n"); return &cfg->uev->task; } void uring_task_init(struct uring_task *task, const char *name, struct uring_task *parent, void (*free)(struct uring_task *)) { if (!free) die("uring_task_init called without destructor\n"); task->refcount = 1; task->fd = -1; task->parent = parent; task->free = free; task->dead = false; task->name = name; if (task->parent) uring_task_get(NULL, task->parent); } void uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback); if (!sqe) perrordie("io_uring_sqe"); if (task) { uring_task_get(cfg, task); task->callback = callback; } io_uring_prep_close(sqe, fd); io_uring_sqe_set_data(sqe, task); fprintf(stderr, "%s: done\n", __func__); } void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("io_uring_sqe"); if (task->fd < 0) { error("uring_write called with no fd set\n"); return; } uring_task_get(cfg, task); task->callback = callback; io_uring_prep_write(sqe, task->fd, buf, len, 0); io_uring_sqe_set_data(sqe, task); } void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("io_uring_sqe"); if (task->fd < 0) { error("uring_read called with no fd set\n"); return; } uring_task_get(cfg, task); task->callback = callback; io_uring_prep_read(sqe, task->fd, buf, len, offset); io_uring_sqe_set_data(sqe, task); } void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("io_uring_sqe"); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); io_uring_sqe_set_data(sqe, task); } void uring_sendmsg(struct cfg *cfg, struct uring_task *task, struct msghdr *msg, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("io_uring_sqe"); uring_task_get(cfg, task); task->callback = callback; io_uring_prep_sendmsg(sqe, task->fd, msg, 0); io_uring_sqe_set_data(sqe, task); } void uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("io_uring_sqe"); if (task->fd < 0) { error("uring_connect called with no fd set\n"); return; } uring_task_get(cfg, task); task->callback = callback; io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen); io_uring_sqe_set_data(sqe, task); } void uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *addr, callback_t callback) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); addr->addrlen = sizeof(addr->storage); if (!sqe) perrordie("io_uring_sqe"); if (task->fd < 0) { error("uring_accept called with no fd set\n"); return; } uring_task_get(cfg, task); task->callback = callback; io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, 0); io_uring_sqe_set_data(sqe, task); } void uring_cancel(struct cfg *cfg, struct uring_task *task) { struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); if (!sqe) perrordie("io_uring_sqe"); task->dead = true; io_uring_prep_cancel(sqe, task, 0); io_uring_sqe_set_data(sqe, NULL); } static void uring_free(struct uring_task *task) { struct uring_ev *uev = container_of(task, struct uring_ev, task); fprintf(stderr, "%s: called\n", __func__); io_uring_queue_exit(&uev->uring); uev->cfg->uev = NULL; free(uev); } void uring_refdump(struct uring_ev *uev) { uring_task_refdump(&uev->task); } void uring_init(struct cfg *cfg) { struct uring_ev *uev; uev = zmalloc(sizeof(*uev)); if (!uev) perrordie("malloc"); if (io_uring_queue_init_params(4096, &uev->uring, &uev->uring_params) < 0) perrordie("io_uring_queue_init_params"); fprintf(stderr, "uring initialized, features: 0x%08x\n", uev->uring_params.features); uring_task_init(&uev->task, "uev", &cfg->task, uring_free); cfg->uev = uev; uev->cfg = cfg; } void uring_event_loop(struct cfg *cfg) { while (true) { struct io_uring_cqe *cqe; unsigned nr, head; int r; io_uring_submit(&cfg->uev->uring); r = io_uring_wait_cqe(&cfg->uev->uring, &cqe); if (r < 0) { if (errno == EINTR) continue; else perrordie("io_uring_wait_cqe"); } nr = 0; io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) { struct uring_task *task = io_uring_cqe_get_data(cqe); fprintf(stderr, "%s: got CEQ (res: %i, task: 0x%p, cb: 0x%p)\n", __func__, cqe->res, task, task ? task->callback : NULL); if (task && task->callback) task->callback(cfg, task, cqe->res); if (task) uring_task_put(cfg, task); if (exiting) return; nr++; } printf("%s: %u CQEs treated\n", __func__, nr); io_uring_cq_advance(&cfg->uev->uring, nr); } }