From 9daf6a0e6b461c6c2a16f810f722b9d10504bf90 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Wed, 10 Jun 2020 13:28:41 +0200 Subject: Introduce a common task buffer and convert cfgdir and rcon to use it --- uring.c | 140 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 138 insertions(+), 2 deletions(-) (limited to 'uring.c') diff --git a/uring.c b/uring.c index d74b590..561be25 100644 --- a/uring.c +++ b/uring.c @@ -69,7 +69,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task) { struct uring_task *parent = task->parent; - fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); + fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n", + __func__, task->name, task, task->refcount); task->refcount--; if (task->refcount > 0) @@ -94,7 +95,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task) void uring_task_get(struct cfg *cfg, struct uring_task *task) { - fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); + fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n", + __func__, task->name, task, task->refcount); if (task->refcount < 0) error("Negative refcount!\n"); @@ -102,6 +104,12 @@ uring_task_get(struct cfg *cfg, struct uring_task *task) task->refcount++; } +void +uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) +{ + task->tbuf = tbuf; +} + void uring_task_set_fd(struct uring_task *task, int fd) { @@ -143,6 +151,7 @@ uring_task_init(struct uring_task *task, const char *name, struct uring_task *pa task->free = free; task->dead = false; task->name = name; + task->tbuf = NULL; if (task->parent) uring_task_get(NULL, task->parent); @@ -165,6 +174,49 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac fprintf(stderr, "%s: done\n", __func__); } +static void +uring_tbuf_write_cb(struct cfg *cfg, struct uring_task *task, int res) +{ + int r; + + if (!task || !task->tbuf || !task->final_callback) + perrordie("%s: missing parameters\n"); + + if (res < 0) { + r = res; + goto finished; + } + + /* We wrote some more data */ + task->tbuf->done += res; + if (task->tbuf->done >= task->tbuf->len) { + r = task->tbuf->len; + goto finished; + } + + uring_write(cfg, task, task->tbuf->buf + task->tbuf->done, + task->tbuf->len - task->tbuf->done, + uring_tbuf_write_cb); + return; + +finished: + task->final_callback(cfg, task, r); + return; +} + +void +uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback) +{ + if (!task || task->fd < 0 || !task->tbuf || task->tbuf->len < 0) { + error("%s invalid parameters\n", __func__); + return; + } + + task->tbuf->done = 0; + task->final_callback = callback; + uring_write(cfg, task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); +} + void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) { @@ -182,6 +234,90 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal io_uring_sqe_set_data(sqe, task); } +static int +uring_tbuf_complete_eof(struct cfg *cfg, struct uring_task *task, int res) +{ + if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) + return -E2BIG; + + if (res > 0) + return 1; + + task->tbuf->buf[task->tbuf->len] = '\0'; + task->tbuf->len++; + return 0; +} + +static void +uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res) +{ + int r; + + if (!task || !task->tbuf || !task->final_callback || !task->complete_callback) { + error("%s: invalid parameters\n", __func__); + return; + } + + if (res < 0) { + r = res; + goto finished; + } + + task->tbuf->len += res; + r = task->complete_callback(cfg, task, res); + if (r < 0) { + r = res; + goto finished; + } else if (r > 0) { + r = task->tbuf->len; + goto finished; + } + + /* Need to read more */ + if (task->tbuf->len >= sizeof(task->tbuf->buf)) { + r = E2BIG; + goto finished; + } + + uring_read(cfg, task, task->tbuf->buf + task->tbuf->len, + sizeof(task->tbuf->buf) - task->tbuf->len, + task->tbuf->len, uring_tbuf_read_until_cb); + return; + +finished: + task->final_callback(cfg, task, r); + return; +} + +void +uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, + rcallback_t complete, callback_t callback) +{ + if (!task) + error("task\n"); + if (task->fd < 0) + error("task->fd\n"); + if (!task->tbuf) + error("task->tbuf\n"); + if (!complete) + error("complete\n"); + if (!task || task->fd < 0 || !task->tbuf || !complete) { + error("%s: invalid parameters\n", __func__); + return; + } + + task->tbuf->len = 0; + task->complete_callback = complete; + task->final_callback = callback; + uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), 0, uring_tbuf_read_until_cb); +} + +void +uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback) +{ + uring_tbuf_read_until(cfg, task, uring_tbuf_complete_eof, callback); +} + void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { -- cgit v1.2.3