summaryrefslogtreecommitdiff
path: root/server-proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'server-proxy.c')
-rw-r--r--server-proxy.c153
1 files changed, 134 insertions, 19 deletions
diff --git a/server-proxy.c b/server-proxy.c
index a0affbd..60f4b33 100644
--- a/server-proxy.c
+++ b/server-proxy.c
@@ -1,9 +1,12 @@
+#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
#include "main.h"
#include "uring.h"
@@ -118,15 +121,26 @@ proxy_delete(struct server_proxy *proxy)
assert_return(proxy);
+ if (cfg->splice_supported) {
+ uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]);
+ uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]);
+ uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]);
+ uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]);
+ }
+ uring_task_set_fd(&proxy->servertask, proxy->sfd);
uring_task_destroy(&proxy->servertask);
+ uring_task_set_fd(&proxy->clienttask, proxy->cfd);
uring_task_destroy(&proxy->clienttask);
uring_task_destroy(&proxy->task);
}
-static void proxy_client_data_in(struct uring_task *task, int res);
+/*
+ * These four functions provide the fallback read-write mode
+ */
+static void proxy_client_read(struct uring_task *task, int res);
static void
-proxy_client_data_out(struct uring_task *task, int res)
+proxy_client_written(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
@@ -134,7 +148,7 @@ proxy_client_data_out(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
@@ -142,11 +156,11 @@ proxy_client_data_out(struct uring_task *task, int res)
proxy->client_bytes += res;
uring_task_set_fd(&proxy->clienttask, proxy->cfd);
- uring_tbuf_read(task, proxy_client_data_in);
+ uring_tbuf_read(task, proxy_client_read);
}
static void
-proxy_client_data_in(struct uring_task *task, int res)
+proxy_client_read(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
@@ -154,21 +168,20 @@ proxy_client_data_in(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
}
uring_task_set_fd(&proxy->clienttask, proxy->sfd);
- uring_tbuf_write(task, proxy_client_data_out);
+ uring_tbuf_write(task, proxy_client_written);
}
-static void proxy_server_data_in(struct uring_task *task,
- int res);
+static void proxy_server_read(struct uring_task *task, int res);
static void
-proxy_server_data_out(struct uring_task *task, int res)
+proxy_server_written(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
@@ -176,7 +189,7 @@ proxy_server_data_out(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
@@ -184,11 +197,11 @@ proxy_server_data_out(struct uring_task *task, int res)
proxy->server_bytes += res;
uring_task_set_fd(&proxy->servertask, proxy->sfd);
- uring_tbuf_read(&proxy->servertask, proxy_server_data_in);
+ uring_tbuf_read(&proxy->servertask, proxy_server_read);
}
static void
-proxy_server_data_in(struct uring_task *task, int res)
+proxy_server_read(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
@@ -196,14 +209,89 @@ proxy_server_data_in(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
}
uring_task_set_fd(&proxy->servertask, proxy->cfd);
- uring_tbuf_write(task, proxy_server_data_out);
+ uring_tbuf_write(task, proxy_server_written);
+}
+
+/*
+ * These four functions provide the splice fd->pipe->fd mode
+ */
+static void proxy_client_spliced_in(struct uring_task *task, int res);
+
+static void
+proxy_client_spliced_out(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
+}
+
+static void
+proxy_client_spliced_in(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out);
+}
+
+static void proxy_server_spliced_in(struct uring_task *task, int res);
+
+static void
+proxy_server_spliced_out(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
+}
+
+static void
+proxy_server_spliced_in(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out);
}
static void
@@ -229,8 +317,15 @@ proxy_connected_cb(struct connection *conn, bool connected)
proxy->server_conn.remote.addrstr);
proxy->begin = time(NULL);
- uring_tbuf_read(&proxy->clienttask, proxy_client_data_in);
- uring_tbuf_read(&proxy->servertask, proxy_server_data_in);
+ if (cfg->splice_supported) {
+ debug(DBG_PROXY, "handling proxy connection with splice");
+ uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
+ uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
+ } else {
+ debug(DBG_PROXY, "handling proxy connection with read-write");
+ uring_tbuf_read(&proxy->clienttask, proxy_client_read);
+ uring_tbuf_read(&proxy->servertask, proxy_server_read);
+ }
}
void
@@ -253,7 +348,19 @@ proxy_new(struct server *server, struct saddr *client, int fd)
proxy = zmalloc(sizeof(*proxy));
if (!proxy) {
error("malloc: %m");
- return NULL;
+ goto out;
+ }
+
+ if (cfg->splice_supported) {
+ if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) {
+ error("pipe2: %m");
+ goto out_free;
+ }
+
+ if (pipe2(proxy->spipe, O_CLOEXEC) < 0) {
+ error("pipe2: %m");
+ goto out_close_cpipe;
+ }
}
proxy->sfd = -1;
@@ -278,6 +385,14 @@ proxy_new(struct server *server, struct saddr *client, int fd)
&proxy->server_conn, proxy_connected_cb);
return proxy;
+
+out_close_cpipe:
+ uring_close(&server->task, proxy->cpipe[PIPE_RD]);
+ uring_close(&server->task, proxy->cpipe[PIPE_WR]);
+out_free:
+ xfree(proxy);
+out:
+ return NULL;
}
static void
@@ -293,7 +408,7 @@ local_accept(struct uring_task *task, int res)
debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name);
if (res < 0) {
- error("result was %i", res);
+ error("res: %i", res);
goto out;
}