From fcd154280c21746db7d994ed1be77f20f91c90c0 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Mon, 22 Jun 2020 15:17:36 +0200 Subject: Add basic splice support to server-proxy (untested) --- server-proxy.c | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 134 insertions(+), 19 deletions(-) (limited to 'server-proxy.c') diff --git a/server-proxy.c b/server-proxy.c index a0affbd..60f4b33 100644 --- a/server-proxy.c +++ b/server-proxy.c @@ -1,9 +1,12 @@ +#define _GNU_SOURCE #include #include #include #include #include #include +#include +#include #include "main.h" #include "uring.h" @@ -118,15 +121,26 @@ proxy_delete(struct server_proxy *proxy) assert_return(proxy); + if (cfg->splice_supported) { + uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]); + } + uring_task_set_fd(&proxy->servertask, proxy->sfd); uring_task_destroy(&proxy->servertask); + uring_task_set_fd(&proxy->clienttask, proxy->cfd); uring_task_destroy(&proxy->clienttask); uring_task_destroy(&proxy->task); } -static void proxy_client_data_in(struct uring_task *task, int res); +/* + * These four functions provide the fallback read-write mode + */ +static void proxy_client_read(struct uring_task *task, int res); static void -proxy_client_data_out(struct uring_task *task, int res) +proxy_client_written(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); @@ -134,7 +148,7 @@ proxy_client_data_out(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; @@ -142,11 +156,11 @@ proxy_client_data_out(struct uring_task *task, int res) proxy->client_bytes += res; uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_tbuf_read(task, proxy_client_data_in); + uring_tbuf_read(task, proxy_client_read); } static void -proxy_client_data_in(struct uring_task *task, int res) +proxy_client_read(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); @@ -154,21 +168,20 @@ proxy_client_data_in(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_tbuf_write(task, proxy_client_data_out); + uring_tbuf_write(task, proxy_client_written); } -static void proxy_server_data_in(struct uring_task *task, - int res); +static void proxy_server_read(struct uring_task *task, int res); static void -proxy_server_data_out(struct uring_task *task, int res) +proxy_server_written(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); @@ -176,7 +189,7 @@ proxy_server_data_out(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; @@ -184,11 +197,11 @@ proxy_server_data_out(struct uring_task *task, int res) proxy->server_bytes += res; uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); + uring_tbuf_read(&proxy->servertask, proxy_server_read); } static void -proxy_server_data_in(struct uring_task *task, int res) +proxy_server_read(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); @@ -196,14 +209,89 @@ proxy_server_data_in(struct uring_task *task, int res) assert_task_alive(DBG_PROXY, task); if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); uring_task_close_fd(task); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_tbuf_write(task, proxy_server_data_out); + uring_tbuf_write(task, proxy_server_written); +} + +/* + * These four functions provide the splice fd->pipe->fd mode + */ +static void proxy_client_spliced_in(struct uring_task *task, int res); + +static void +proxy_client_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); +} + +static void +proxy_client_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out); +} + +static void proxy_server_spliced_in(struct uring_task *task, int res); + +static void +proxy_server_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); +} + +static void +proxy_server_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out); } static void @@ -229,8 +317,15 @@ proxy_connected_cb(struct connection *conn, bool connected) proxy->server_conn.remote.addrstr); proxy->begin = time(NULL); - uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); + if (cfg->splice_supported) { + debug(DBG_PROXY, "handling proxy connection with splice"); + uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); + uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); + } else { + debug(DBG_PROXY, "handling proxy connection with read-write"); + uring_tbuf_read(&proxy->clienttask, proxy_client_read); + uring_tbuf_read(&proxy->servertask, proxy_server_read); + } } void @@ -253,7 +348,19 @@ proxy_new(struct server *server, struct saddr *client, int fd) proxy = zmalloc(sizeof(*proxy)); if (!proxy) { error("malloc: %m"); - return NULL; + goto out; + } + + if (cfg->splice_supported) { + if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_free; + } + + if (pipe2(proxy->spipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_close_cpipe; + } } proxy->sfd = -1; @@ -278,6 +385,14 @@ proxy_new(struct server *server, struct saddr *client, int fd) &proxy->server_conn, proxy_connected_cb); return proxy; + +out_close_cpipe: + uring_close(&server->task, proxy->cpipe[PIPE_RD]); + uring_close(&server->task, proxy->cpipe[PIPE_WR]); +out_free: + xfree(proxy); +out: + return NULL; } static void @@ -293,7 +408,7 @@ local_accept(struct uring_task *task, int res) debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); if (res < 0) { - error("result was %i", res); + error("res: %i", res); goto out; } -- cgit v1.2.3