summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-10 14:51:49 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-10 14:51:49 +0200
commitaf7dbbcbc9fe89deb4951d45b0f6ce839199c88b (patch)
tree86bdf50d190c81ff267f4b26432d3cefa58d3387
parent06edc57d49547c2d5981467550043660c1f5f282 (diff)
Convert proxy to use task_buf as well
-rw-r--r--announce.c4
-rw-r--r--cfgdir.c4
-rw-r--r--idle.c4
-rw-r--r--main.c4
-rw-r--r--proxy.c30
-rw-r--r--proxy.h6
-rw-r--r--uring.c60
-rw-r--r--uring.h12
8 files changed, 63 insertions, 61 deletions
diff --git a/announce.c b/announce.c
index f5233c4..a17bd9a 100644
--- a/announce.c
+++ b/announce.c
@@ -87,7 +87,7 @@ announce_cb(struct cfg *cfg, struct uring_task *task, int res)
fprintf(stderr, "%s: called with value %" PRIu64 "\n", __func__, aev->value);
mcast_send_all(cfg, aev);
- uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), 0, announce_cb);
+ uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), announce_cb);
}
static void
@@ -188,6 +188,6 @@ announce_init(struct cfg *cfg)
aev->mcast_addr.sin_port = htons(4445);
cfg->aev = aev;
- uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), 0, announce_cb);
+ uring_read(cfg, &aev->task, &aev->value, sizeof(aev->value), announce_cb);
}
diff --git a/cfgdir.c b/cfgdir.c
index 8793486..ff35707 100644
--- a/cfgdir.c
+++ b/cfgdir.c
@@ -380,7 +380,7 @@ inotify_cb(struct cfg *cfg, struct uring_task *task, int res)
error("inotify: weird, unknown event: 0x%08x\n", event->mask);
}
- uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), 0, inotify_cb);
+ uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), inotify_cb);
}
void
@@ -432,7 +432,7 @@ cfgdir_init(struct cfg *cfg)
uring_task_init(&iev->task, "iev", uring_parent(cfg), inotify_free);
uring_task_set_fd(&iev->task, ifd);
cfg->iev = iev;
- uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), 0, inotify_cb);
+ uring_read(cfg, &iev->task, iev->buf, sizeof(iev->buf), inotify_cb);
cfgdir = opendir(".");
if (!cfgdir)
diff --git a/idle.c b/idle.c
index ce0dfc4..f17d161 100644
--- a/idle.c
+++ b/idle.c
@@ -390,7 +390,7 @@ idle_cb(struct cfg *cfg, struct uring_task *task, int res)
idle->server->idle_count = 0;
else
idle_check_connect_next_remote(cfg, idle);
- uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), 0, idle_cb);
+ uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), idle_cb);
}
static void
@@ -469,6 +469,6 @@ idle_init(struct cfg *cfg, struct server *server)
idle->server = server;
server->idle = idle;
- uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), 0, idle_cb);
+ uring_read(cfg, &idle->task, &idle->value, sizeof(idle->value), idle_cb);
}
diff --git a/main.c b/main.c
index eace2d0..615231c 100644
--- a/main.c
+++ b/main.c
@@ -203,7 +203,7 @@ signalfd_read(struct cfg *cfg, struct uring_task *task, int res)
server_delete(cfg, server);
fprintf(stderr, "%s: putting sev task 0x%p\n", __func__, &sev->task);
uring_task_put(cfg, &sev->task);
- //uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), 0, signalfd_read);
+ //uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), signalfd_read);
}
}
@@ -283,7 +283,7 @@ signalfd_init(struct cfg *cfg)
cfg->sev = sev;
sev->cfg = cfg;
hack_efd = sfd;
- uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), 0, signalfd_read);
+ uring_read(cfg, &sev->task, &sev->buf, sizeof(sev->buf), signalfd_read);
}
int
diff --git a/proxy.c b/proxy.c
index e5a4eff..98a5af1 100644
--- a/proxy.c
+++ b/proxy.c
@@ -46,13 +46,8 @@ proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res)
if (res <= 0)
return;
- if (res != proxy->clientlen) {
- fprintf(stderr, "%s: short write\n", __func__);
- return;
- }
-
uring_task_set_fd(&proxy->clienttask, proxy->cfd);
- uring_read(cfg, task, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in);
+ uring_tbuf_read(cfg, task, proxy_client_data_in);
}
static void
@@ -64,9 +59,8 @@ proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res)
if (res <= 0)
return;
- proxy->clientlen = res;
uring_task_set_fd(&proxy->clienttask, proxy->sfd);
- uring_write(cfg, task, proxy->clientbuf, res, proxy_client_data_out);
+ uring_tbuf_write(cfg, task, proxy_client_data_out);
}
static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res);
@@ -80,13 +74,8 @@ proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res)
if (res <= 0)
return;
- if (res != proxy->serverlen) {
- fprintf(stderr, "%s: short write\n", __func__);
- return;
- }
-
uring_task_set_fd(&proxy->servertask, proxy->sfd);
- uring_read(cfg, task, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in);
+ uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in);
}
static void
@@ -98,9 +87,8 @@ proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res)
if (res <= 0)
return;
- proxy->serverlen = res;
uring_task_set_fd(&proxy->servertask, proxy->cfd);
- uring_write(cfg, task, proxy->serverbuf, res, proxy_server_data_out);
+ uring_tbuf_write(cfg, task, proxy_server_data_out);
}
static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy);
@@ -112,12 +100,14 @@ proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res)
fprintf(stderr, "%s: connected %i\n", __func__, res);
if (res < 0) {
+ proxy->sfd = -1;
+ uring_task_close_fd(cfg, task);
proxy_connect_next_remote(cfg, proxy);
return;
}
- uring_read(cfg, &proxy->clienttask, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in);
- uring_read(cfg, &proxy->servertask, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in);
+ uring_tbuf_read(cfg, &proxy->clienttask, proxy_client_data_in);
+ uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in);
}
static void
@@ -178,8 +168,10 @@ proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, in
proxy->client = *client;
sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr));
uring_task_init(&proxy->clienttask, "proxy_client", &scfg->task, proxy_client_free);
- uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free);
+ uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf);
uring_task_set_fd(&proxy->clienttask, fd);
+ uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free);
+ uring_task_set_buf(&proxy->servertask, &proxy->serverbuf);
list_add(&proxy->list, &scfg->proxys);
proxy_connect_next_remote(cfg, proxy);
diff --git a/proxy.h b/proxy.h
index e580d80..cbd2990 100644
--- a/proxy.h
+++ b/proxy.h
@@ -4,15 +4,13 @@
struct server_proxy {
struct sockaddr_in46 client;
char clientstr[ADDRSTRLEN];
- char clientbuf[4096];
- size_t clientlen;
+ struct uring_task_buf clientbuf;
struct uring_task clienttask;
int cfd;
struct sockaddr_in46 server;
char serverstr[ADDRSTRLEN];
- char serverbuf[4096];
- size_t serverlen;
+ struct uring_task_buf serverbuf;
struct uring_task servertask;
int sfd;
diff --git a/uring.c b/uring.c
index 30cf513..58d6f2f 100644
--- a/uring.c
+++ b/uring.c
@@ -236,20 +236,6 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal
io_uring_sqe_set_data(sqe, task);
}
-static int
-uring_tbuf_complete_eof(struct cfg *cfg, struct uring_task *task, int res)
-{
- if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
- return -E2BIG;
-
- if (res > 0)
- return 1;
-
- task->tbuf->buf[task->tbuf->len] = '\0';
- task->tbuf->len++;
- return 0;
-}
-
static void
uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res)
{
@@ -281,9 +267,9 @@ uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res)
goto finished;
}
- uring_read(cfg, task, task->tbuf->buf + task->tbuf->len,
- sizeof(task->tbuf->buf) - task->tbuf->len,
- task->tbuf->len, uring_tbuf_read_until_cb);
+ uring_read_offset(cfg, task, task->tbuf->buf + task->tbuf->len,
+ sizeof(task->tbuf->buf) - task->tbuf->len,
+ task->tbuf->len, uring_tbuf_read_until_cb);
return;
finished:
@@ -295,14 +281,6 @@ void
uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task,
rcallback_t complete, callback_t callback)
{
- if (!task)
- error("task\n");
- if (task->fd < 0)
- error("task->fd\n");
- if (!task->tbuf)
- error("task->tbuf\n");
- if (!complete)
- error("complete\n");
if (!task || task->fd < 0 || !task->tbuf || !complete) {
error("%s: invalid parameters\n", __func__);
return;
@@ -311,17 +289,43 @@ uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task,
task->tbuf->len = 0;
task->complete_callback = complete;
task->final_callback = callback;
- uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), 0, uring_tbuf_read_until_cb);
+ uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), uring_tbuf_read_until_cb);
+}
+
+static int
+uring_tbuf_eof(struct cfg *cfg, struct uring_task *task, int res)
+{
+ if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
+ return -E2BIG;
+
+ if (res > 0)
+ return 0;
+
+ task->tbuf->buf[task->tbuf->len] = '\0';
+ task->tbuf->len++;
+ return 1;
}
void
uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback)
{
- uring_tbuf_read_until(cfg, task, uring_tbuf_complete_eof, callback);
+ uring_tbuf_read_until(cfg, task, uring_tbuf_eof, callback);
+}
+
+static int
+uring_tbuf_have_data(struct cfg *cfg, struct uring_task *task, int res)
+{
+ return res;
+}
+
+void
+uring_tbuf_read(struct cfg *cfg, struct uring_task *task, callback_t callback)
+{
+ uring_tbuf_read_until(cfg, task, uring_tbuf_have_data, callback);
}
void
-uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)
+uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)
{
struct io_uring_sqe *sqe;
diff --git a/uring.h b/uring.h
index 935720d..d044f05 100644
--- a/uring.h
+++ b/uring.h
@@ -33,8 +33,16 @@ void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task,
void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task,
callback_t callback);
-void uring_read(struct cfg *cfg, struct uring_task *task, void *buf,
- size_t len, off_t offset, callback_t callback);
+void uring_tbuf_read(struct cfg *cfg, struct uring_task *task, callback_t callback);
+
+void uring_read_offset(struct cfg *cfg, struct uring_task *task, void *buf,
+ size_t len, off_t offset, callback_t callback);
+
+static inline void
+uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback)
+{
+ uring_read_offset(cfg, task, buf, len, 0, callback);
+}
void uring_openat(struct cfg *cfg, struct uring_task *task, const char *path,
callback_t callback);