From fcd154280c21746db7d994ed1be77f20f91c90c0 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Mon, 22 Jun 2020 15:17:36 +0200 Subject: Add basic splice support to server-proxy (untested) --- main.h | 1 + server-proxy.c | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++------- server-proxy.h | 2 + uring.c | 55 +++++++++++++++++++++ uring.h | 2 + 5 files changed, 194 insertions(+), 19 deletions(-) diff --git a/main.h b/main.h index 05c5c82..84d0a1c 100644 --- a/main.h +++ b/main.h @@ -146,6 +146,7 @@ struct cfg { char *cfg_path; bool do_igmp; char *igmp_iface; + bool splice_supported; struct uring_ev *uring; struct server_cfg_monitor *server_cfg_monitor; diff --git a/server-proxy.c b/server-proxy.c index a0affbd..60f4b33 100644 --- a/server-proxy.c +++ b/server-proxy.c @@ -1,9 +1,12 @@ +#define _GNU_SOURCE #include #include #include #include #include #include +#include +#include #include "main.h" #include "uring.h" @@ -118,15 +121,26 @@ proxy_delete(struct server_proxy *proxy) assert_return(proxy); + if (cfg->splice_supported) { + uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]); + } + uring_task_set_fd(&proxy->servertask, proxy->sfd); uring_task_destroy(&proxy->servertask); + uring_task_set_fd(&proxy->clienttask, proxy->cfd); uring_task_destroy(&proxy->clienttask); uring_task_destroy(&proxy->task); } -static void proxy_client_data_in(struct uring_task *task, int res); +/* + * These four functions provide the fallback read-write mode + */ +static void proxy_client_read(struct uring_task *task, int res); static void -proxy_client_data_out(struct uring_task *task, int res) +proxy_client_written(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); @@ -134,7 +148,7 @@ proxy_client_data_out(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; @@ -142,11 +156,11 @@ proxy_client_data_out(struct uring_task *task, int res) proxy->client_bytes += res; uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_tbuf_read(task, proxy_client_data_in); + uring_tbuf_read(task, proxy_client_read); } static void -proxy_client_data_in(struct uring_task *task, int res) +proxy_client_read(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); @@ -154,21 +168,20 @@ proxy_client_data_in(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_tbuf_write(task, proxy_client_data_out); + uring_tbuf_write(task, proxy_client_written); } -static void proxy_server_data_in(struct uring_task *task, - int res); +static void proxy_server_read(struct uring_task *task, int res); static void -proxy_server_data_out(struct uring_task *task, int res) +proxy_server_written(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); @@ -176,7 +189,7 @@ proxy_server_data_out(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; @@ -184,11 +197,11 @@ proxy_server_data_out(struct uring_task *task, int res) proxy->server_bytes += res; uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); + uring_tbuf_read(&proxy->servertask, proxy_server_read); } static void -proxy_server_data_in(struct uring_task *task, int res) +proxy_server_read(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); @@ -196,14 +209,89 @@ proxy_server_data_in(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_tbuf_write(task, proxy_server_data_out); + uring_tbuf_write(task, proxy_server_written); +} + +/* + * These four functions provide the splice fd->pipe->fd mode + */ +static void proxy_client_spliced_in(struct uring_task *task, int res); + +static void +proxy_client_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); +} + +static void +proxy_client_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out); +} + +static void proxy_server_spliced_in(struct uring_task *task, int res); + +static void +proxy_server_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); +} + +static void +proxy_server_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out); } static void @@ -229,8 +317,15 @@ proxy_connected_cb(struct connection *conn, bool connected) proxy->server_conn.remote.addrstr); proxy->begin = time(NULL); - uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); + if (cfg->splice_supported) { + debug(DBG_PROXY, "handling proxy connection with splice"); + uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); + uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); + } else { + debug(DBG_PROXY, "handling proxy connection with read-write"); + uring_tbuf_read(&proxy->clienttask, proxy_client_read); + uring_tbuf_read(&proxy->servertask, proxy_server_read); + } } void @@ -253,7 +348,19 @@ proxy_new(struct server *server, struct saddr *client, int fd) proxy = zmalloc(sizeof(*proxy)); if (!proxy) { error("malloc: %m"); - return NULL; + goto out; + } + + if (cfg->splice_supported) { + if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_free; + } + + if (pipe2(proxy->spipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_close_cpipe; + } } proxy->sfd = -1; @@ -278,6 +385,14 @@ proxy_new(struct server *server, struct saddr *client, int fd) &proxy->server_conn, proxy_connected_cb); return proxy; + +out_close_cpipe: + uring_close(&server->task, proxy->cpipe[PIPE_RD]); + uring_close(&server->task, proxy->cpipe[PIPE_WR]); +out_free: + xfree(proxy); +out: + return NULL; } static void @@ -293,7 +408,7 @@ local_accept(struct uring_task *task, int res) debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); if (res < 0) { - error("result was %i", res); + error("res: %i", res); goto out; } diff --git a/server-proxy.h b/server-proxy.h index dc8300d..14d176d 100644 --- a/server-proxy.h +++ b/server-proxy.h @@ -6,12 +6,14 @@ struct server_proxy { struct uring_task_buf clientbuf; struct uring_task clienttask; uint64_t client_bytes; + int cpipe[2]; int cfd; struct connection server_conn; struct uring_task_buf serverbuf; struct uring_task servertask; uint64_t server_bytes; + int spipe[2]; int sfd; time_t begin; diff --git a/uring.c b/uring.c index 31dfe26..3437d7e 100644 --- a/uring.c +++ b/uring.c @@ -1,9 +1,11 @@ +#define _GNU_SOURCE #include #include #include #include #include #include +#include #include "main.h" #include "uring.h" @@ -12,6 +14,10 @@ struct uring_ev { struct io_uring uring; struct io_uring_params uring_params; struct uring_task task; + + /* for testing if the kernel supports splice */ + int pipe[2]; + int tfd; }; enum cqe_type { @@ -518,6 +524,22 @@ uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) io_uring_sqe_set_data(sqe, task); } +void +uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && fd_in >= 0 && fd_out >= 0 && cb); + + debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u", + task->name, task, fd_in, fd_out, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE); + io_uring_sqe_set_data(sqe, task); +} + void uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb) { @@ -592,6 +614,30 @@ uring_delete() uring_task_put(task); } +static void +uring_splice_test_cb(struct uring_task *task, int res) +{ + struct uring_ev *uring = container_of(task, struct uring_ev, task); + + assert_die(task && uring == cfg->uring, "splice test failed"); + + uring_close(task, uring->tfd); + uring_close(task, uring->pipe[PIPE_RD]); + uring_close(task, uring->pipe[PIPE_WR]); + + uring->tfd = -1; + uring->pipe[PIPE_RD] = -1; + uring->pipe[PIPE_WR] = -1; + + if (res >= 0) { + cfg->splice_supported = true; + debug(DBG_UR, "splice supported"); + } else if (res == -EINVAL) + debug(DBG_UR, "splice not supported"); + else + error("splice check failed: %i\n", res); +} + void uring_init() { @@ -611,6 +657,15 @@ uring_init() uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free); cfg->uring = uring; + + /* splice check, a bit convoluted, but seems to be no simpler way */ + cfg->splice_supported = false; + if (pipe2(uring->pipe, O_CLOEXEC) < 0) + die("pipe2: %m"); + uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY); + if (uring->tfd < 0) + die("open(\"/dev/null\"): %m"); + uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb); } static inline void diff --git a/uring.h b/uring.h index 9f686b3..9c33104 100644 --- a/uring.h +++ b/uring.h @@ -56,6 +56,8 @@ void uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb); void uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb); +void uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb); + void uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb); void uring_poll_cancel(struct uring_task *task); -- cgit v1.2.3