diff options
author | David Härdeman <david@hardeman.nu> | 2020-06-22 15:17:36 +0200 |
---|---|---|
committer | David Härdeman <david@hardeman.nu> | 2020-06-22 15:17:36 +0200 |
commit | fcd154280c21746db7d994ed1be77f20f91c90c0 (patch) | |
tree | 78796c8560c004ad4c4d81747a6d7546be4ca039 | |
parent | a7fd6536f76144d7e2d18caa71f4abe516299b91 (diff) |
Add basic splice support to server-proxy (untested)
-rw-r--r-- | main.h | 1 | ||||
-rw-r--r-- | server-proxy.c | 153 | ||||
-rw-r--r-- | server-proxy.h | 2 | ||||
-rw-r--r-- | uring.c | 55 | ||||
-rw-r--r-- | uring.h | 2 |
5 files changed, 194 insertions, 19 deletions
@@ -146,6 +146,7 @@ struct cfg { char *cfg_path; bool do_igmp; char *igmp_iface; + bool splice_supported; struct uring_ev *uring; struct server_cfg_monitor *server_cfg_monitor; diff --git a/server-proxy.c b/server-proxy.c index a0affbd..60f4b33 100644 --- a/server-proxy.c +++ b/server-proxy.c @@ -1,9 +1,12 @@ +#define _GNU_SOURCE #include <stdio.h> #include <unistd.h> #include <time.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> #include "main.h" #include "uring.h" @@ -118,15 +121,26 @@ proxy_delete(struct server_proxy *proxy) assert_return(proxy); + if (cfg->splice_supported) { + uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]); + } + uring_task_set_fd(&proxy->servertask, proxy->sfd); uring_task_destroy(&proxy->servertask); + uring_task_set_fd(&proxy->clienttask, proxy->cfd); uring_task_destroy(&proxy->clienttask); uring_task_destroy(&proxy->task); } -static void proxy_client_data_in(struct uring_task *task, int res); +/* + * These four functions provide the fallback read-write mode + */ +static void proxy_client_read(struct uring_task *task, int res); static void -proxy_client_data_out(struct uring_task *task, int res) +proxy_client_written(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); @@ -134,7 +148,7 @@ proxy_client_data_out(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; @@ -142,11 +156,11 @@ proxy_client_data_out(struct uring_task *task, int res) proxy->client_bytes += res; uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_tbuf_read(task, proxy_client_data_in); + uring_tbuf_read(task, proxy_client_read); } static void -proxy_client_data_in(struct uring_task *task, int res) +proxy_client_read(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); @@ -154,21 +168,20 @@ proxy_client_data_in(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_tbuf_write(task, proxy_client_data_out); + uring_tbuf_write(task, proxy_client_written); } -static void proxy_server_data_in(struct uring_task *task, - int res); +static void proxy_server_read(struct uring_task *task, int res); static void -proxy_server_data_out(struct uring_task *task, int res) +proxy_server_written(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); @@ -176,7 +189,7 @@ proxy_server_data_out(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; @@ -184,11 +197,11 @@ proxy_server_data_out(struct uring_task *task, int res) proxy->server_bytes += res; uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); + uring_tbuf_read(&proxy->servertask, proxy_server_read); } static void -proxy_server_data_in(struct uring_task *task, int res) +proxy_server_read(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); @@ -196,14 +209,89 @@ proxy_server_data_in(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_tbuf_write(task, proxy_server_data_out); + uring_tbuf_write(task, proxy_server_written); +} + +/* + * These four functions provide the splice fd->pipe->fd mode + */ +static void proxy_client_spliced_in(struct uring_task *task, int res); + +static void +proxy_client_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); +} + +static void +proxy_client_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out); +} + +static void proxy_server_spliced_in(struct uring_task *task, int res); + +static void +proxy_server_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); +} + +static void +proxy_server_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out); } static void @@ -229,8 +317,15 @@ proxy_connected_cb(struct connection *conn, bool connected) proxy->server_conn.remote.addrstr); proxy->begin = time(NULL); - uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); + if (cfg->splice_supported) { + debug(DBG_PROXY, "handling proxy connection with splice"); + uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); + uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); + } else { + debug(DBG_PROXY, "handling proxy connection with read-write"); + uring_tbuf_read(&proxy->clienttask, proxy_client_read); + uring_tbuf_read(&proxy->servertask, proxy_server_read); + } } void @@ -253,7 +348,19 @@ proxy_new(struct server *server, struct saddr *client, int fd) proxy = zmalloc(sizeof(*proxy)); if (!proxy) { error("malloc: %m"); - return NULL; + goto out; + } + + if (cfg->splice_supported) { + if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_free; + } + + if (pipe2(proxy->spipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_close_cpipe; + } } proxy->sfd = -1; @@ -278,6 +385,14 @@ proxy_new(struct server *server, struct saddr *client, int fd) &proxy->server_conn, proxy_connected_cb); return proxy; + +out_close_cpipe: + uring_close(&server->task, proxy->cpipe[PIPE_RD]); + uring_close(&server->task, proxy->cpipe[PIPE_WR]); +out_free: + xfree(proxy); +out: + return NULL; } static void @@ -293,7 +408,7 @@ local_accept(struct uring_task *task, int res) debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); if (res < 0) { - error("result was %i", res); + error("res: %i", res); goto out; } diff --git a/server-proxy.h b/server-proxy.h index dc8300d..14d176d 100644 --- a/server-proxy.h +++ b/server-proxy.h @@ -6,12 +6,14 @@ struct server_proxy { struct uring_task_buf clientbuf; struct uring_task clienttask; uint64_t client_bytes; + int cpipe[2]; int cfd; struct connection server_conn; struct uring_task_buf serverbuf; struct uring_task servertask; uint64_t server_bytes; + int spipe[2]; int sfd; time_t begin; @@ -1,9 +1,11 @@ +#define _GNU_SOURCE #include <liburing.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include <string.h> +#include <unistd.h> #include "main.h" #include "uring.h" @@ -12,6 +14,10 @@ struct uring_ev { struct io_uring uring; struct io_uring_params uring_params; struct uring_task task; + + /* for testing if the kernel supports splice */ + int pipe[2]; + int tfd; }; enum cqe_type { @@ -519,6 +525,22 @@ uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) } void +uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && fd_in >= 0 && fd_out >= 0 && cb); + + debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u", + task->name, task, fd_in, fd_out, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE); + io_uring_sqe_set_data(sqe, task); +} + +void uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb) { struct io_uring_sqe *sqe; @@ -592,6 +614,30 @@ uring_delete() uring_task_put(task); } +static void +uring_splice_test_cb(struct uring_task *task, int res) +{ + struct uring_ev *uring = container_of(task, struct uring_ev, task); + + assert_die(task && uring == cfg->uring, "splice test failed"); + + uring_close(task, uring->tfd); + uring_close(task, uring->pipe[PIPE_RD]); + uring_close(task, uring->pipe[PIPE_WR]); + + uring->tfd = -1; + uring->pipe[PIPE_RD] = -1; + uring->pipe[PIPE_WR] = -1; + + if (res >= 0) { + cfg->splice_supported = true; + debug(DBG_UR, "splice supported"); + } else if (res == -EINVAL) + debug(DBG_UR, "splice not supported"); + else + error("splice check failed: %i\n", res); +} + void uring_init() { @@ -611,6 +657,15 @@ uring_init() uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free); cfg->uring = uring; + + /* splice check, a bit convoluted, but seems to be no simpler way */ + cfg->splice_supported = false; + if (pipe2(uring->pipe, O_CLOEXEC) < 0) + die("pipe2: %m"); + uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY); + if (uring->tfd < 0) + die("open(\"/dev/null\"): %m"); + uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb); } static inline void @@ -56,6 +56,8 @@ void uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb); void uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb); +void uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb); + void uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb); void uring_poll_cancel(struct uring_task *task); |