summaryrefslogtreecommitdiff
path: root/uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'uring.c')
-rw-r--r--uring.c140
1 files changed, 138 insertions, 2 deletions
diff --git a/uring.c b/uring.c
index d74b590..561be25 100644
--- a/uring.c
+++ b/uring.c
@@ -69,7 +69,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task)
{
struct uring_task *parent = task->parent;
- fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount);
+ fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n",
+ __func__, task->name, task, task->refcount);
task->refcount--;
if (task->refcount > 0)
@@ -94,7 +95,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task)
void
uring_task_get(struct cfg *cfg, struct uring_task *task)
{
- fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount);
+ fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n",
+ __func__, task->name, task, task->refcount);
if (task->refcount < 0)
error("Negative refcount!\n");
@@ -103,6 +105,12 @@ uring_task_get(struct cfg *cfg, struct uring_task *task)
}
void
+uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf)
+{
+ task->tbuf = tbuf;
+}
+
+void
uring_task_set_fd(struct uring_task *task, int fd)
{
task->fd = fd;
@@ -143,6 +151,7 @@ uring_task_init(struct uring_task *task, const char *name, struct uring_task *pa
task->free = free;
task->dead = false;
task->name = name;
+ task->tbuf = NULL;
if (task->parent)
uring_task_get(NULL, task->parent);
@@ -165,6 +174,49 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac
fprintf(stderr, "%s: done\n", __func__);
}
+static void
+uring_tbuf_write_cb(struct cfg *cfg, struct uring_task *task, int res)
+{
+ int r;
+
+ if (!task || !task->tbuf || !task->final_callback)
+ perrordie("%s: missing parameters\n");
+
+ if (res < 0) {
+ r = res;
+ goto finished;
+ }
+
+ /* We wrote some more data */
+ task->tbuf->done += res;
+ if (task->tbuf->done >= task->tbuf->len) {
+ r = task->tbuf->len;
+ goto finished;
+ }
+
+ uring_write(cfg, task, task->tbuf->buf + task->tbuf->done,
+ task->tbuf->len - task->tbuf->done,
+ uring_tbuf_write_cb);
+ return;
+
+finished:
+ task->final_callback(cfg, task, r);
+ return;
+}
+
+void
+uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback)
+{
+ if (!task || task->fd < 0 || !task->tbuf || task->tbuf->len < 0) {
+ error("%s invalid parameters\n", __func__);
+ return;
+ }
+
+ task->tbuf->done = 0;
+ task->final_callback = callback;
+ uring_write(cfg, task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb);
+}
+
void
uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback)
{
@@ -182,6 +234,90 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal
io_uring_sqe_set_data(sqe, task);
}
+static int
+uring_tbuf_complete_eof(struct cfg *cfg, struct uring_task *task, int res)
+{
+ if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
+ return -E2BIG;
+
+ if (res > 0)
+ return 1;
+
+ task->tbuf->buf[task->tbuf->len] = '\0';
+ task->tbuf->len++;
+ return 0;
+}
+
+static void
+uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res)
+{
+ int r;
+
+ if (!task || !task->tbuf || !task->final_callback || !task->complete_callback) {
+ error("%s: invalid parameters\n", __func__);
+ return;
+ }
+
+ if (res < 0) {
+ r = res;
+ goto finished;
+ }
+
+ task->tbuf->len += res;
+ r = task->complete_callback(cfg, task, res);
+ if (r < 0) {
+ r = res;
+ goto finished;
+ } else if (r > 0) {
+ r = task->tbuf->len;
+ goto finished;
+ }
+
+ /* Need to read more */
+ if (task->tbuf->len >= sizeof(task->tbuf->buf)) {
+ r = E2BIG;
+ goto finished;
+ }
+
+ uring_read(cfg, task, task->tbuf->buf + task->tbuf->len,
+ sizeof(task->tbuf->buf) - task->tbuf->len,
+ task->tbuf->len, uring_tbuf_read_until_cb);
+ return;
+
+finished:
+ task->final_callback(cfg, task, r);
+ return;
+}
+
+void
+uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task,
+ rcallback_t complete, callback_t callback)
+{
+ if (!task)
+ error("task\n");
+ if (task->fd < 0)
+ error("task->fd\n");
+ if (!task->tbuf)
+ error("task->tbuf\n");
+ if (!complete)
+ error("complete\n");
+ if (!task || task->fd < 0 || !task->tbuf || !complete) {
+ error("%s: invalid parameters\n", __func__);
+ return;
+ }
+
+ task->tbuf->len = 0;
+ task->complete_callback = complete;
+ task->final_callback = callback;
+ uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), 0, uring_tbuf_read_until_cb);
+}
+
+void
+uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback)
+{
+ uring_tbuf_read_until(cfg, task, uring_tbuf_complete_eof, callback);
+}
+
void
uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)
{