summaryrefslogtreecommitdiff
path: root/uring.c
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-11 16:05:59 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-11 16:05:59 +0200
commitafa80c32554dcd02737e2c341c27b6a9fa7f3e80 (patch)
tree3e5ba8d16f65c5c1d62ec773fe92ea321907f777 /uring.c
parente9ba9f703ec1afa09e004de097ba5d8666c481ea (diff)
Further improve reference counting and tracking in uring
Diffstat (limited to 'uring.c')
-rw-r--r--uring.c130
1 files changed, 84 insertions, 46 deletions
diff --git a/uring.c b/uring.c
index a2668b0..78a084a 100644
--- a/uring.c
+++ b/uring.c
@@ -17,10 +17,13 @@ struct uring_ev {
};
static struct io_uring_sqe *
-get_sqe(struct cfg *cfg)
+get_sqe(struct cfg *cfg, struct uring_task *task)
{
struct io_uring_sqe *sqe;
+ if (!cfg || !task)
+ die("%s: invalid parameters\n", __func__);
+
sqe = io_uring_get_sqe(&cfg->uev->uring);
if (!sqe) {
io_uring_submit(&cfg->uev->uring);
@@ -29,6 +32,7 @@ get_sqe(struct cfg *cfg)
perrordie("Failed to get an sqe!\n");
}
+ uring_task_get(cfg, task);
return sqe;
}
@@ -64,15 +68,6 @@ uring_task_refdump(struct uring_task *task)
task->refcount);
}
-static void
-uring_cancel(struct cfg *cfg, struct uring_task *task)
-{
- struct io_uring_sqe *sqe = get_sqe(cfg);
-
- io_uring_prep_cancel(sqe, task, 0);
- io_uring_sqe_set_data(sqe, NULL);
-}
-
/*
* Similar to uring_task_put, but can be called from other tasks
* while the task is active.
@@ -80,11 +75,21 @@ uring_cancel(struct cfg *cfg, struct uring_task *task)
void
uring_task_destroy(struct cfg *cfg, struct uring_task *task)
{
- fprintf(stderr, "%s: called with task %s (%p), fd %i and refcount %u\n",
+ fprintf(stderr, "%s: called (task: %s (%p), fd: %i, refcount: %u)\n",
__func__, task->name, task, task->fd, task->refcount);
- if (task->fd >= 0)
- uring_cancel(cfg, task);
+ if (!task) {
+ error("%s: called with no task\n", __func__);
+ return;
+ }
+
+ if (task->fd >= 0) {
+ struct io_uring_sqe *sqe;
+
+ sqe = get_sqe(cfg, task);
+ io_uring_prep_cancel(sqe, task, 0);
+ io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task + 0x1));
+ }
task->dead = true;
uring_task_put(cfg, task);
@@ -121,7 +126,7 @@ uring_task_put(struct cfg *cfg, struct uring_task *task)
task->free(task);
if (parent)
- uring_task_put(NULL, parent);
+ uring_task_put(cfg, parent);
}
void
@@ -161,7 +166,7 @@ uring_task_close_fd(struct cfg *cfg, struct uring_task *task)
if (task->fd < 0)
return;
- uring_close(cfg, task, task->fd, NULL);
+ uring_close(cfg, task, task->fd);
task->fd = -1;
}
@@ -207,20 +212,21 @@ uring_task_init(struct uring_task *task, const char *name,
}
void
-uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback)
+uring_close(struct cfg *cfg, struct uring_task *task, int fd)
{
- struct io_uring_sqe *sqe = get_sqe(cfg);
+ struct io_uring_sqe *sqe;
- fprintf(stderr, "%s: called with task 0x%p and cb 0x%p\n", __func__, task, callback);
+ fprintf(stderr, "%s: called (task: %p (%s), fd: %i)\n", __func__, task, task->name, fd);
- if (task) {
- uring_task_get(cfg, task);
- task->callback = callback;
+ if (!task || fd < 0) {
+ error("%s: invalid parameters (task: %p (%s), fd: %i)\n", __func__, task, task->name, fd);
+ return;
}
+
+ sqe = get_sqe(cfg, task);
+ task = (void *)((uintptr_t)task + 0x2);
io_uring_prep_close(sqe, fd);
io_uring_sqe_set_data(sqe, task);
-
- fprintf(stderr, "%s: done\n", __func__);
}
static void
@@ -276,8 +282,7 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal
return;
}
- sqe = get_sqe(cfg);
- uring_task_get(cfg, task);
+ sqe = get_sqe(cfg, task);
task->callback = callback;
io_uring_prep_write(sqe, task->fd, buf, len, 0);
io_uring_sqe_set_data(sqe, task);
@@ -384,8 +389,7 @@ uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t le
return;
}
- sqe = get_sqe(cfg);
- uring_task_get(cfg, task);
+ sqe = get_sqe(cfg, task);
task->callback = callback;
io_uring_prep_read(sqe, task->fd, buf, len, offset);
io_uring_sqe_set_data(sqe, task);
@@ -394,9 +398,9 @@ uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t le
void
uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback)
{
- struct io_uring_sqe *sqe = get_sqe(cfg);
-
- uring_task_get(cfg, task);
+ struct io_uring_sqe *sqe;
+
+ sqe = get_sqe(cfg, task);
task->callback = callback;
io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0);
io_uring_sqe_set_data(sqe, task);
@@ -405,9 +409,9 @@ uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callbac
void
uring_sendmsg(struct cfg *cfg, struct uring_task *task, struct msghdr *msg, callback_t callback)
{
- struct io_uring_sqe *sqe = get_sqe(cfg);
+ struct io_uring_sqe *sqe;
- uring_task_get(cfg, task);
+ sqe = get_sqe(cfg, task);
task->callback = callback;
io_uring_prep_sendmsg(sqe, task->fd, msg, 0);
io_uring_sqe_set_data(sqe, task);
@@ -423,8 +427,7 @@ uring_connect(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *ad
return;
}
- sqe = get_sqe(cfg);
- uring_task_get(cfg, task);
+ sqe = get_sqe(cfg, task);
task->callback = callback;
io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&addr->storage, addr->addrlen);
io_uring_sqe_set_data(sqe, task);
@@ -440,9 +443,8 @@ uring_accept(struct cfg *cfg, struct uring_task *task, struct sockaddr_in46 *add
return;
}
- sqe = get_sqe(cfg);
+ sqe = get_sqe(cfg, task);
addr->addrlen = sizeof(addr->storage);
- uring_task_get(cfg, task);
task->callback = callback;
io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&addr->storage, &addr->addrlen, SOCK_CLOEXEC);
io_uring_sqe_set_data(sqe, task);
@@ -458,8 +460,7 @@ uring_poll(struct cfg *cfg, struct uring_task *task, short poll_mask, callback_t
return;
}
- sqe = get_sqe(cfg);
- uring_task_get(cfg, task);
+ sqe = get_sqe(cfg, task);
task->callback = callback;
io_uring_prep_poll_add(sqe, task->fd, poll_mask);
io_uring_sqe_set_data(sqe, task);
@@ -475,9 +476,10 @@ uring_poll_cancel(struct cfg *cfg, struct uring_task *task)
return;
}
- sqe = get_sqe(cfg);
+ sqe = get_sqe(cfg, task);
task->dead = true;
io_uring_prep_poll_remove(sqe, task);
+ /* FIXME: should not set this to NULL */
io_uring_sqe_set_data(sqe, NULL);
}
@@ -544,12 +546,49 @@ uring_event_loop(struct cfg *cfg)
nr = 0;
io_uring_for_each_cqe(&cfg->uev->uring, head, cqe) {
struct uring_task *task = io_uring_cqe_get_data(cqe);
-
- fprintf(stderr, "%s: got CEQ (res: %i, task: %s (%p), cb: %p)\n",
- __func__, cqe->res, task ? task->name : "<none>",
- task, task ? task->callback : NULL);
-
- if (task && task->callback)
+ bool do_callback;
+
+ if (!task)
+ die("%s: null task\n", __func__);
+
+ if (task != NULL && ((uintptr_t)task & 0x1)) {
+ task = (void *)((uintptr_t)task - 0x1);
+ fprintf(stderr, "%s: got cancellation CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n",
+ __func__,
+ cqe->res,
+ cqe->res < 0 ? strerror(-cqe->res) : "ok",
+ task ? task->name : "<none>",
+ task,
+ task ? task->fd : -1,
+ task ? task->callback : NULL);
+ do_callback = false;
+ } else if (task != NULL && ((uintptr_t)task & 0x2)) {
+ task = (void *)((uintptr_t)task - 0x2);
+ fprintf(stderr, "%s: got close CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n",
+ __func__,
+ cqe->res,
+ cqe->res < 0 ? strerror(-cqe->res) : "ok",
+ task ? task->name : "<none>",
+ task,
+ task ? task->fd : -1,
+ task ? task->callback : NULL);
+ do_callback = false;
+ } else {
+ fprintf(stderr, "%s: got CQE (res: %i (%s), task: %s (%p), fd: %i, cb: %p)\n",
+ __func__,
+ cqe->res,
+ cqe->res < 0 ? strerror(-cqe->res) : "ok",
+ task ? task->name : "<none>",
+ task,
+ task ? task->fd : -1,
+ task ? task->callback : NULL);
+ do_callback = true;
+ }
+
+ if (!task)
+ error("%s: null task!?\n", __func__);
+
+ if (do_callback && task && task->callback)
task->callback(cfg, task, cqe->res);
if (task)
@@ -561,7 +600,6 @@ uring_event_loop(struct cfg *cfg)
nr++;
}
- //printf("%s: %u CQEs treated\n", __func__, nr);
io_uring_cq_advance(&cfg->uev->uring, nr);
}
}