summaryrefslogtreecommitdiff
path: root/minecproxy/server-proxy.c
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-23 20:56:22 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-23 20:56:22 +0200
commitea053d96f7e89e053d4af8d39b04c5428760345f (patch)
tree8182ca73675ad3933b0f38cb48a99c69101309b4 /minecproxy/server-proxy.c
parent8c27290245b7bcc7cd2f72f3b4a7562294b43bbe (diff)
Big renaming, move some more functionality to shared lib
Diffstat (limited to 'minecproxy/server-proxy.c')
-rw-r--r--minecproxy/server-proxy.c578
1 files changed, 578 insertions, 0 deletions
diff --git a/minecproxy/server-proxy.c b/minecproxy/server-proxy.c
new file mode 100644
index 0000000..d8ff0cf
--- /dev/null
+++ b/minecproxy/server-proxy.c
@@ -0,0 +1,578 @@
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "main.h"
+#include "uring.h"
+#include "ptimer.h"
+#include "server.h"
+#include "server-proxy.h"
+
+static void
+format_bytes(char *buf, size_t len, uint64_t val)
+{
+ uint64_t tmp;
+ const char *suffix = "B";
+
+ assert_return(buf && len > 0);
+
+ tmp = val * 10;
+ if (val > 1152921504606846976ULL) {
+ tmp = val / 115292150460684697ULL;
+ suffix= "EiB";
+ } else if (val > 1125899906842624ULL) {
+ tmp /= 1125899906842624ULL;
+ suffix = "PiB";
+ } else if (val > 1099511627776ULL) {
+ tmp /= 1099511627776ULL;
+ suffix = "TiB";
+ } else if (val > 1073741824ULL) {
+ tmp /= 1073741824ULL;
+ suffix = "GiB";
+ } else if (val > 1048576) {
+ tmp /= 1048576;
+ suffix = "MiB";
+ } else if (val > 1024) {
+ tmp /= 1024;
+ suffix = "KiB";
+ }
+
+ snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix);
+}
+
+static void
+format_time(char *buf, size_t len, time_t diff)
+{
+ unsigned hh, mm, ss;
+
+ assert_return(buf && len > 0);
+
+ hh = diff / 3600;
+ diff %= 3600;
+ mm = diff / 60;
+ diff %= 60;
+ ss = diff;
+
+ snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss);
+}
+
+static void
+proxy_free(struct uring_task *task)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, task);
+ char cts[100];
+ char stc[100];
+ char duration[100];
+
+ assert_return(task);
+
+ debug(DBG_PROXY, "server: %s, src: %s, dst: %s",
+ proxy->server->name,
+ proxy->client_conn.remote.addrstr,
+ proxy->server_conn.remote.addrstr);
+
+ if (proxy->begin > 0) {
+ format_time(duration, sizeof(duration), time(NULL) - proxy->begin);
+ format_bytes(cts, sizeof(cts), proxy->client_bytes);
+ format_bytes(stc, sizeof(stc), proxy->server_bytes);
+
+ info("%s: proxy connection %s -> %s closed "
+ "(CtS: %s, StC: %s), duration %s",
+ proxy->server->name,
+ proxy->client_conn.remote.addrstr,
+ proxy->server_conn.remote.addrstr,
+ cts, stc, duration);
+ }
+
+ list_del(&proxy->list);
+ xfree(proxy);
+}
+
+static void
+proxy_client_free(struct uring_task *task)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
+
+ assert_return(task);
+
+ debug(DBG_PROXY, "%s: client connection closed", proxy->server->name);
+}
+
+static void
+proxy_server_free(struct uring_task *task)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+
+ debug(DBG_PROXY, "%s: server connection closed", proxy->server->name);
+}
+
+void
+proxy_delete(struct server_proxy *proxy)
+{
+ debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy);
+
+ assert_return(proxy);
+
+ ptimer_del_task(&proxy->ptask);
+
+ if (cfg->splice_supported) {
+ uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]);
+ uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]);
+ uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]);
+ uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]);
+ }
+
+ uring_task_set_fd(&proxy->servertask, proxy->sfd);
+ uring_task_destroy(&proxy->servertask);
+ uring_task_set_fd(&proxy->clienttask, proxy->cfd);
+ uring_task_destroy(&proxy->clienttask);
+ uring_task_destroy(&proxy->task);
+}
+
+/*
+ * These four functions provide the fallback read-write mode
+ */
+static void proxy_client_read(struct uring_task *task, int res);
+
+static void
+proxy_client_written(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ proxy->client_bytes += res;
+ uring_task_set_fd(&proxy->clienttask, proxy->cfd);
+ uring_tbuf_read(task, proxy_client_read);
+}
+
+static void
+proxy_client_read(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_task_set_fd(&proxy->clienttask, proxy->sfd);
+ uring_tbuf_write(task, proxy_client_written);
+}
+
+static void proxy_server_read(struct uring_task *task, int res);
+
+static void
+proxy_server_written(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ proxy->server_bytes += res;
+ uring_task_set_fd(&proxy->servertask, proxy->sfd);
+ uring_tbuf_read(&proxy->servertask, proxy_server_read);
+}
+
+static void
+proxy_server_read(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_task_set_fd(&proxy->servertask, proxy->cfd);
+ uring_tbuf_write(task, proxy_server_written);
+}
+
+/*
+ * These four functions provide the splice fd->pipe->fd mode
+ */
+static void proxy_client_spliced_in(struct uring_task *task, int res);
+
+static void
+proxy_client_spliced_out(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
+}
+
+static void
+proxy_client_spliced_in(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out);
+}
+
+static void proxy_server_spliced_in(struct uring_task *task, int res);
+
+static void
+proxy_server_spliced_out(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
+}
+
+static void
+proxy_server_spliced_in(struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ if (res <= 0) {
+ debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
+ proxy_delete(proxy);
+ return;
+ }
+
+ uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out);
+}
+
+static void
+proxy_connected_cb(struct connection *conn, bool connected)
+{
+ struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn);
+
+ assert_return(conn);
+ assert_task_alive(DBG_PROXY, &proxy->clienttask);
+ assert_task_alive(DBG_PROXY, &proxy->servertask);
+
+ proxy->connecting = false;
+ if (!connected) {
+ error("%s: proxy connection to remote server failed",
+ proxy->server->name);
+ if (!proxy->ptask.active)
+ proxy_delete(proxy);
+ return;
+ }
+
+ ptimer_del_task(&proxy->ptask);
+
+ proxy->sfd = proxy->servertask.fd;
+ verbose("%s: proxy connection %s -> %s opened",
+ proxy->server->name,
+ proxy->client_conn.remote.addrstr,
+ proxy->server_conn.remote.addrstr);
+ proxy->begin = time(NULL);
+
+ if (cfg->splice_supported) {
+ debug(DBG_PROXY, "handling proxy connection with splice");
+ uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
+ uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
+ } else {
+ debug(DBG_PROXY, "handling proxy connection with read-write");
+ uring_tbuf_read(&proxy->clienttask, proxy_client_read);
+ uring_tbuf_read(&proxy->servertask, proxy_server_read);
+ }
+}
+
+void
+proxy_refdump(struct server_proxy *proxy)
+{
+ assert_return(proxy);
+
+ uring_task_refdump(&proxy->task);
+ uring_task_refdump(&proxy->clienttask);
+ uring_task_refdump(&proxy->servertask);
+}
+
+static void
+proxy_connect_timer_cb(struct ptimer_task *ptask)
+{
+ struct server_proxy *proxy = container_of(ptask, struct server_proxy, ptask);
+
+ assert_return(ptask);
+
+ if (proxy->connecting)
+ return;
+
+ proxy->connecting = true;
+ connect_any(&proxy->servertask, &proxy->server->remotes,
+ &proxy->server_conn, proxy_connected_cb);
+}
+
+struct server_proxy *
+proxy_new(struct server *server, struct saddr *client, int fd)
+{
+ struct server_proxy *proxy;
+
+ assert_return(server && client && fd > 0, NULL);
+
+ proxy = zmalloc(sizeof(*proxy));
+ if (!proxy) {
+ error("malloc: %m");
+ goto out;
+ }
+
+ if (cfg->splice_supported) {
+ if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) {
+ error("pipe2: %m");
+ goto out_free;
+ }
+
+ if (pipe2(proxy->spipe, O_CLOEXEC) < 0) {
+ error("pipe2: %m");
+ goto out_close_cpipe;
+ }
+ }
+
+ proxy->sfd = -1;
+ proxy->cfd = fd;
+ proxy->server = server;
+ uring_task_init(&proxy->task, "proxy", &server->task, proxy_free);
+
+ connection_set_local(&proxy->client_conn, fd);
+ connection_set_remote(&proxy->client_conn, client);
+
+ uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task,
+ proxy_client_free);
+ uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf);
+ uring_task_set_fd(&proxy->clienttask, fd);
+
+ uring_task_init(&proxy->servertask, "proxy_server", &proxy->task,
+ proxy_server_free);
+ uring_task_set_buf(&proxy->servertask, &proxy->serverbuf);
+
+ list_add(&proxy->list, &server->proxys);
+
+ if (server->state != SERVER_STATE_RUNNING) {
+ if (server_start(server) &&
+ cfg->proxy_connection_interval > 0 &&
+ cfg->proxy_connection_attempts > 0) {
+ ptask_init(&proxy->ptask,
+ cfg->proxy_connection_interval,
+ cfg->proxy_connection_attempts,
+ proxy_connect_timer_cb);
+ ptimer_add_task(&proxy->ptask);
+ }
+ }
+
+ proxy->connecting = true;
+ connect_any(&proxy->servertask, &server->remotes,
+ &proxy->server_conn, proxy_connected_cb);
+
+ return proxy;
+
+out_close_cpipe:
+ uring_close(&server->task, proxy->cpipe[PIPE_RD]);
+ uring_close(&server->task, proxy->cpipe[PIPE_WR]);
+out_free:
+ xfree(proxy);
+out:
+ return NULL;
+}
+
+static void
+local_accept(struct uring_task *task, int res)
+{
+ struct server_local *local = container_of(task, struct server_local, task);
+ struct server *server = container_of(task->parent, struct server, task);
+ struct server_proxy *proxy;
+
+ assert_return(task);
+ assert_task_alive(DBG_PROXY, task);
+
+ debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name);
+
+ if (res < 0) {
+ error("res: %i", res);
+ goto out;
+ }
+
+ saddr_set_addrstr(&local->client);
+
+ verbose("%s: incoming proxy connection: %s -> %s",
+ server->name, local->client.addrstr, local->local.addrstr);
+
+ if (list_empty(&server->remotes)) {
+ /* This shouldn't be possible, checked before opening local */
+ error("server->remotes empty!");
+ uring_close(&local->task, res);
+ goto out;
+ }
+
+ proxy = proxy_new(server, &local->client, res);
+ if (!proxy)
+ uring_close(&local->task, res);
+
+out:
+ uring_accept(&local->task, &local->client, local_accept);
+}
+
+bool
+local_open(struct server_local *local)
+{
+ int sfd;
+ int option;
+ int r;
+
+ assert_return(local && local->server, false);
+
+ sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ if (sfd < 0) {
+ error("socket: %m");
+ goto error;
+ }
+
+ option = true;
+ if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) {
+ error("setsockopt: %m");
+ goto error;
+ }
+
+ /* The MC protocol expects the client to send data first */
+ if (cfg->socket_defer) {
+ option = true;
+ if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0)
+ error("setsockopt: %m");
+ }
+
+ /*
+ * This has the advantage that interfaces don't need to be up but
+ * it means that cfg errors will not be caught.
+ */
+ if (cfg->socket_freebind) {
+ option = true;
+ if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0)
+ error("setsockopt: %m");
+ }
+
+ socket_set_low_latency(sfd, cfg->socket_keepalive,
+ cfg->socket_iptos, cfg->socket_nodelay);
+
+ r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen);
+ if (r < 0) {
+ error("bind: %m");
+ goto error;
+ }
+
+ r = listen(sfd, 100);
+ if (r < 0) {
+ error("listen: %m");
+ goto error;
+ }
+
+ uring_task_set_fd(&local->task, sfd);
+ uring_accept(&local->task, &local->client, local_accept);
+ return true;
+
+error:
+ if (sfd >= 0)
+ uring_close(&local->task, sfd);
+ return false;
+}
+
+void
+local_refdump(struct server_local *local)
+{
+ assert_return(local);
+
+ uring_task_refdump(&local->task);
+}
+
+static void
+local_free(struct uring_task *task)
+{
+ struct server_local *local = container_of(task, struct server_local, task);
+
+ assert_return(task);
+
+ debug(DBG_PROXY, "task %p, local %p", task, local);
+ list_del(&local->list);
+ xfree(local);
+}
+
+void
+local_delete(struct server_local *local)
+{
+ assert_return(local);
+
+ uring_task_destroy(&local->task);
+}
+
+struct server_local *
+local_new(struct server *server, struct saddr *saddr)
+{
+ struct server_local *local;
+
+ assert_return(server && saddr, NULL);
+
+ local = zmalloc(sizeof(*local));
+ if (!local) {
+ error("malloc: %m");
+ return NULL;
+ }
+
+ debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr);
+ local->local = *saddr;
+ local->server = server;
+ uring_task_init(&local->task, "local", &server->task, local_free);
+ xfree(saddr);
+ return local;
+}
+