summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-22 15:17:36 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-22 15:17:36 +0200
commitfcd154280c21746db7d994ed1be77f20f91c90c0 (patch)
tree78796c8560c004ad4c4d81747a6d7546be4ca039
parenta7fd6536f76144d7e2d18caa71f4abe516299b91 (diff)
Add basic splice support to server-proxy (untested)
-rw-r--r--main.h1
-rw-r--r--server-proxy.c153
-rw-r--r--server-proxy.h2
-rw-r--r--uring.c55
-rw-r--r--uring.h2
5 files changed, 194 insertions, 19 deletions
diff --git a/main.h b/main.h
index 05c5c82..84d0a1c 100644
--- a/main.h
+++ b/main.h
@@ -146,6 +146,7 @@ struct cfg {
char *cfg_path;
bool do_igmp;
char *igmp_iface;
+ bool splice_supported;
struct uring_ev *uring;
struct server_cfg_monitor *server_cfg_monitor;
diff --git a/server-proxy.c b/server-proxy.c
index a0affbd..60f4b33 100644
--- a/server-proxy.c
+++ b/server-proxy.c
@@ -1,9 +1,12 @@
+#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
#include "main.h"
#include "uring.h"
@@ -118,15 +121,26 @@ proxy_delete(struct server_proxy *proxy)
assert_return(proxy);
+ if (cfg->splice_supported) {
+ uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]);
+ uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]);
+ uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]);
+ uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]);
+ }
+ uring_task_set_fd(&proxy->servertask, proxy->sfd);
uring_task_destroy(&proxy->servertask);
+ uring_task_set_fd(&proxy->clienttask, proxy->cfd);
uring_task_destroy(&proxy->clienttask);
uring_task_destroy(&proxy->task);
}
-static void proxy_client_data_in(struct uring_task *task, int res);
+/*
+ * These four functions provide the fallback read-write mode
+ */
+static void proxy_client_read(struct uring_task *task, int res);
static void
-proxy_client_data_out(struct uring_task *task, int res)
+proxy_client_written(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
@@ -134,7 +148,7 @@ proxy_client_data_out(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
@@ -142,11 +156,11 @@ proxy_client_data_out(struct uring_task *task, int res)
proxy->client_bytes += res;
uring_task_set_fd(&proxy->clienttask, proxy->cfd);
- uring_tbuf_read(task, proxy_client_data_in);
+ uring_tbuf_read(task, proxy_client_read);
}
static void
-proxy_client_data_in(struct uring_task *task, int res)
+proxy_client_read(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
@@ -154,21 +168,20 @@ proxy_client_data_in(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
}
uring_task_set_fd(&proxy->clienttask, proxy->sfd);
- uring_tbuf_write(task, proxy_client_data_out);
+ uring_tbuf_write(task, proxy_client_written);
}
-static void proxy_server_data_in(struct uring_task *task,
- int res);
+static void proxy_server_read(struct uring_task *task, int res);
static void
-proxy_server_data_out(struct uring_task *task, int res)
+proxy_server_written(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
@@ -176,7 +189,7 @@ proxy_server_data_out(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
@@ -184,11 +197,11 @@ proxy_server_data_out(struct uring_task *task, int res)
proxy->server_bytes += res;
uring_task_set_fd(&proxy->servertask, proxy->sfd);
- uring_tbuf_read(&proxy->servertask, proxy_server_data_in);
+ uring_tbuf_read(&proxy->servertask, proxy_server_read);
}
static void
-proxy_server_data_in(struct uring_task *task, int res)
+proxy_server_read(struct uring_task *task, int res)
{
struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
@@ -196,14 +209,89 @@ proxy_server_data_in(struct uring_task *task, int res)
assert_task_alive(DBG_PROXY, task);
if (res <= 0) {
- debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res);
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
uring_task_close_fd(task);
proxy_delete(proxy);
return;
}
uring_task_set_fd(&proxy->servertask, proxy->cfd);
- uring_tbuf_write(task, proxy_server_data_out);
+ uring_tbuf_write(task, proxy_server_written);
+}
+
+/*
+ * These four functions provide the splice fd->pipe->fd mode
+ */
+static void proxy_client_spliced_in(struct uring_task *task, int res);
+
+static void
+proxy_client_spliced_out(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
+}
+
+static void
+proxy_client_spliced_in(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out);
+}
+
+static void proxy_server_spliced_in(struct uring_task *task, int res);
+
+static void
+proxy_server_spliced_out(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
+}
+
+static void
+proxy_server_spliced_in(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out);
}
static void
@@ -229,8 +317,15 @@ proxy_connected_cb(struct connection *conn, bool connected)
proxy->server_conn.remote.addrstr);
proxy->begin = time(NULL);
- uring_tbuf_read(&proxy->clienttask, proxy_client_data_in);
- uring_tbuf_read(&proxy->servertask, proxy_server_data_in);
+ if (cfg->splice_supported) {
+ debug(DBG_PROXY, "handling proxy connection with splice");
+ uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
+ uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
+ } else {
+ debug(DBG_PROXY, "handling proxy connection with read-write");
+ uring_tbuf_read(&proxy->clienttask, proxy_client_read);
+ uring_tbuf_read(&proxy->servertask, proxy_server_read);
+ }
}
void
@@ -253,7 +348,19 @@ proxy_new(struct server *server, struct saddr *client, int fd)
proxy = zmalloc(sizeof(*proxy));
if (!proxy) {
error("malloc: %m");
- return NULL;
+ goto out;
+ }
+
+ if (cfg->splice_supported) {
+ if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) {
+ error("pipe2: %m");
+ goto out_free;
+ }
+
+ if (pipe2(proxy->spipe, O_CLOEXEC) < 0) {
+ error("pipe2: %m");
+ goto out_close_cpipe;
+ }
}
proxy->sfd = -1;
@@ -278,6 +385,14 @@ proxy_new(struct server *server, struct saddr *client, int fd)
&proxy->server_conn, proxy_connected_cb);
return proxy;
+
+out_close_cpipe:
+ uring_close(&server->task, proxy->cpipe[PIPE_RD]);
+ uring_close(&server->task, proxy->cpipe[PIPE_WR]);
+out_free:
+ xfree(proxy);
+out:
+ return NULL;
}
static void
@@ -293,7 +408,7 @@ local_accept(struct uring_task *task, int res)
debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name);
if (res < 0) {
- error("result was %i", res);
+ error("res: %i", res);
goto out;
}
diff --git a/server-proxy.h b/server-proxy.h
index dc8300d..14d176d 100644
--- a/server-proxy.h
+++ b/server-proxy.h
@@ -6,12 +6,14 @@ struct server_proxy {
struct uring_task_buf clientbuf;
struct uring_task clienttask;
uint64_t client_bytes;
+ int cpipe[2];
int cfd;
struct connection server_conn;
struct uring_task_buf serverbuf;
struct uring_task servertask;
uint64_t server_bytes;
+ int spipe[2];
int sfd;
time_t begin;
diff --git a/uring.c b/uring.c
index 31dfe26..3437d7e 100644
--- a/uring.c
+++ b/uring.c
@@ -1,9 +1,11 @@
+#define _GNU_SOURCE
#include <liburing.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
+#include <unistd.h>
#include "main.h"
#include "uring.h"
@@ -12,6 +14,10 @@ struct uring_ev {
struct io_uring uring;
struct io_uring_params uring_params;
struct uring_task task;
+
+ /* for testing if the kernel supports splice */
+ int pipe[2];
+ int tfd;
};
enum cqe_type {
@@ -519,6 +525,22 @@ uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb)
}
void
+uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb)
+{
+ struct io_uring_sqe *sqe;
+
+ assert_return(task && fd_in >= 0 && fd_out >= 0 && cb);
+
+ debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u",
+ task->name, task, fd_in, fd_out, task->refcount);
+
+ sqe = get_sqe(task);
+ task->cb = cb;
+ io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb)
{
struct io_uring_sqe *sqe;
@@ -592,6 +614,30 @@ uring_delete()
uring_task_put(task);
}
+static void
+uring_splice_test_cb(struct uring_task *task, int res)
+{
+ struct uring_ev *uring = container_of(task, struct uring_ev, task);
+
+ assert_die(task && uring == cfg->uring, "splice test failed");
+
+ uring_close(task, uring->tfd);
+ uring_close(task, uring->pipe[PIPE_RD]);
+ uring_close(task, uring->pipe[PIPE_WR]);
+
+ uring->tfd = -1;
+ uring->pipe[PIPE_RD] = -1;
+ uring->pipe[PIPE_WR] = -1;
+
+ if (res >= 0) {
+ cfg->splice_supported = true;
+ debug(DBG_UR, "splice supported");
+ } else if (res == -EINVAL)
+ debug(DBG_UR, "splice not supported");
+ else
+ error("splice check failed: %i\n", res);
+}
+
void
uring_init()
{
@@ -611,6 +657,15 @@ uring_init()
uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free);
cfg->uring = uring;
+
+ /* splice check, a bit convoluted, but seems to be no simpler way */
+ cfg->splice_supported = false;
+ if (pipe2(uring->pipe, O_CLOEXEC) < 0)
+ die("pipe2: %m");
+ uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY);
+ if (uring->tfd < 0)
+ die("open(\"/dev/null\"): %m");
+ uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb);
}
static inline void
diff --git a/uring.h b/uring.h
index 9f686b3..9c33104 100644
--- a/uring.h
+++ b/uring.h
@@ -56,6 +56,8 @@ void uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb);
void uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb);
+void uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb);
+
void uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb);
void uring_poll_cancel(struct uring_task *task);