From af7dbbcbc9fe89deb4951d45b0f6ce839199c88b Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Wed, 10 Jun 2020 14:51:49 +0200 Subject: Convert proxy to use task_buf as well --- announce.c | 4 ++-- cfgdir.c | 4 ++-- idle.c | 4 ++-- main.c | 4 ++-- proxy.c | 30 +++++++++++------------------- proxy.h | 6 ++---- uring.c | 60 ++++++++++++++++++++++++++++++++---------------------------- uring.h | 12 ++++++++++-- 8 files changed, 63 insertions(+), 61 deletions(-) diff --git a/announce.c b/announce.c index f5233c4..a17bd9a 100644 --- a/announce.c +++ b/announce.c @@ -87,7 +87,7 @@ announce_cb(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: called with value %" PRIu64 "\n", __func__, aev->value); mcast_send_all(cfg, aev); - uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), 0, announce_cb); + uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), announce_cb); } static void @@ -188,6 +188,6 @@ announce_init(struct cfg *cfg) aev->mcast_addr.sin_port = htons(4445); cfg->aev = aev; - uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), 0, announce_cb); + uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), announce_cb); } diff --git a/cfgdir.c b/cfgdir.c index 8793486..ff35707 100644 --- a/cfgdir.c +++ b/cfgdir.c @@ -380,7 +380,7 @@ inotify_cb(struct cfg *cfg, struct uring_task *task, int res) error("inotify: weird, unknown event: 0x%08x\n", event->mask); } - uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), 0, inotify_cb); + uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), inotify_cb); } void @@ -432,7 +432,7 @@ cfgdir_init(struct cfg *cfg) uring_task_init(&iev->task, "iev", uring_parent(cfg), inotify_free); uring_task_set_fd(&iev->task, ifd); cfg->iev = iev; - uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), 0, inotify_cb); + uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), inotify_cb); cfgdir = opendir("."); if (!cfgdir) diff --git a/idle.c b/idle.c index ce0dfc4..f17d161 100644 --- a/idle.c +++ b/idle.c @@ -390,7 +390,7 @@ idle_cb(struct cfg *cfg, struct uring_task *task, int res) idle->server->idle_count = 0; else idle_check_connect_next_remote(cfg, idle); - uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), 0, idle_cb); + uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), idle_cb); } static void @@ -469,6 +469,6 @@ idle_init(struct cfg *cfg, struct server *server) idle->server = server; server->idle = idle; - uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), 0, idle_cb); + uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), idle_cb); } diff --git a/main.c b/main.c index eace2d0..615231c 100644 --- a/main.c +++ b/main.c @@ -203,7 +203,7 @@ signalfd_read(struct cfg *cfg, struct uring_task *task, int res) server_delete(cfg, server); fprintf(stderr, "%s: putting sev task 0x%p\n", __func__, &sev->task); uring_task_put(cfg, &sev->task); - //uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), 0, signalfd_read); + //uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), signalfd_read); } } @@ -283,7 +283,7 @@ signalfd_init(struct cfg *cfg) cfg->sev = sev; sev->cfg = cfg; hack_efd = sfd; - uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), 0, signalfd_read); + uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), signalfd_read); } int diff --git a/proxy.c b/proxy.c index e5a4eff..98a5af1 100644 --- a/proxy.c +++ b/proxy.c @@ -46,13 +46,8 @@ proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res) if (res <= 0) return; - if (res != proxy->clientlen) { - fprintf(stderr, "%s: short write\n", __func__); - return; - } - uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_read(cfg, task, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in); + uring_tbuf_read(cfg, task, proxy_client_data_in); } static void @@ -64,9 +59,8 @@ proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res) if (res <= 0) return; - proxy->clientlen = res; uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_write(cfg, task, proxy->clientbuf, res, proxy_client_data_out); + uring_tbuf_write(cfg, task, proxy_client_data_out); } static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res); @@ -80,13 +74,8 @@ proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res) if (res <= 0) return; - if (res != proxy->serverlen) { - fprintf(stderr, "%s: short write\n", __func__); - return; - } - uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_read(cfg, task, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in); + uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in); } static void @@ -98,9 +87,8 @@ proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res) if (res <= 0) return; - proxy->serverlen = res; uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_write(cfg, task, proxy->serverbuf, res, proxy_server_data_out); + uring_tbuf_write(cfg, task, proxy_server_data_out); } static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy); @@ -112,12 +100,14 @@ proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: connected %i\n", __func__, res); if (res < 0) { + proxy->sfd = -1; + uring_task_close_fd(cfg, task); proxy_connect_next_remote(cfg, proxy); return; } - uring_read(cfg, &proxy->clienttask, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in); - uring_read(cfg, &proxy->servertask, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in); + uring_tbuf_read(cfg, &proxy->clienttask, proxy_client_data_in); + uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in); } static void @@ -178,8 +168,10 @@ proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, in proxy->client = *client; sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr)); uring_task_init(&proxy->clienttask, "proxy_client", &scfg->task, proxy_client_free); - uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free); + uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); uring_task_set_fd(&proxy->clienttask, fd); + uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free); + uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); list_add(&proxy->list, &scfg->proxys); proxy_connect_next_remote(cfg, proxy); diff --git a/proxy.h b/proxy.h index e580d80..cbd2990 100644 --- a/proxy.h +++ b/proxy.h @@ -4,15 +4,13 @@ struct server_proxy { struct sockaddr_in46 client; char clientstr[ADDRSTRLEN]; - char clientbuf[4096]; - size_t clientlen; + struct uring_task_buf clientbuf; struct uring_task clienttask; int cfd; struct sockaddr_in46 server; char serverstr[ADDRSTRLEN]; - char serverbuf[4096]; - size_t serverlen; + struct uring_task_buf serverbuf; struct uring_task servertask; int sfd; diff --git a/uring.c b/uring.c index 30cf513..58d6f2f 100644 --- a/uring.c +++ b/uring.c @@ -236,20 +236,6 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal io_uring_sqe_set_data(sqe, task); } -static int -uring_tbuf_complete_eof(struct cfg *cfg, struct uring_task *task, int res) -{ - if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) - return -E2BIG; - - if (res > 0) - return 1; - - task->tbuf->buf[task->tbuf->len] = '\0'; - task->tbuf->len++; - return 0; -} - static void uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res) { @@ -281,9 +267,9 @@ uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res) goto finished; } - uring_read(cfg, task, task->tbuf->buf + task->tbuf->len, - sizeof(task->tbuf->buf) - task->tbuf->len, - task->tbuf->len, uring_tbuf_read_until_cb); + uring_read_offset(cfg, task, task->tbuf->buf + task->tbuf->len, + sizeof(task->tbuf->buf) - task->tbuf->len, + task->tbuf->len, uring_tbuf_read_until_cb); return; finished: @@ -295,14 +281,6 @@ void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, rcallback_t complete, callback_t callback) { - if (!task) - error("task\n"); - if (task->fd < 0) - error("task->fd\n"); - if (!task->tbuf) - error("task->tbuf\n"); - if (!complete) - error("complete\n"); if (!task || task->fd < 0 || !task->tbuf || !complete) { error("%s: invalid parameters\n", __func__); return; @@ -311,17 +289,43 @@ uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, task->tbuf->len = 0; task->complete_callback = complete; task->final_callback = callback; - uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), 0, uring_tbuf_read_until_cb); + uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), uring_tbuf_read_until_cb); +} + +static int +uring_tbuf_eof(struct cfg *cfg, struct uring_task *task, int res) +{ + if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) + return -E2BIG; + + if (res > 0) + return 0; + + task->tbuf->buf[task->tbuf->len] = '\0'; + task->tbuf->len++; + return 1; } void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback) { - uring_tbuf_read_until(cfg, task, uring_tbuf_complete_eof, callback); + uring_tbuf_read_until(cfg, task, uring_tbuf_eof, callback); +} + +static int +uring_tbuf_have_data(struct cfg *cfg, struct uring_task *task, int res) +{ + return res; +} + +void +uring_tbuf_read(struct cfg *cfg, struct uring_task *task, callback_t callback) +{ + uring_tbuf_read_until(cfg, task, uring_tbuf_have_data, callback); } void -uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) +uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { struct io_uring_sqe *sqe; diff --git a/uring.h b/uring.h index 935720d..d044f05 100644 --- a/uring.h +++ b/uring.h @@ -33,8 +33,16 @@ void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback); -void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, - size_t len, off_t offset, callback_t callback); +void uring_tbuf_read(struct cfg *cfg, struct uring_task *task, callback_t callback); + +void uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, + size_t len, off_t offset, callback_t callback); + +static inline void +uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) +{ + uring_read_offset(cfg, task, buf, len, 0, callback); +} void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path, callback_t callback); -- cgit v1.2.3