summaryrefslogtreecommitdiff
path: root/server-proxy.c
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-23 16:25:36 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-23 16:25:36 +0200
commit8c27290245b7bcc7cd2f72f3b4a7562294b43bbe (patch)
tree54bae7909a94bfc598df7b88d9794742daf0bb31 /server-proxy.c
parent973ae757342b91e3e6aafd07e0c0a24af84aad98 (diff)
Split directories better
Diffstat (limited to 'server-proxy.c')
-rw-r--r--server-proxy.c578
1 files changed, 0 insertions, 578 deletions
diff --git a/server-proxy.c b/server-proxy.c
deleted file mode 100644
index 4cbbb87..0000000
--- a/server-proxy.c
+++ /dev/null
@@ -1,578 +0,0 @@
-#define _GNU_SOURCE
-#include <stdio.h>
-#include <unistd.h>
-#include <time.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-#include "main.h"
-#include "uring.h"
-#include "ptimer.h"
-#include "server.h"
-#include "server-proxy.h"
-#include "utils.h"
-
-static void
-format_bytes(char *buf, size_t len, uint64_t val)
-{
- uint64_t tmp;
- const char *suffix = "B";
-
- assert_return(buf && len > 0);
-
- tmp = val * 10;
- if (val > 1152921504606846976ULL) {
- tmp = val / 115292150460684697ULL;
- suffix= "EiB";
- } else if (val > 1125899906842624ULL) {
- tmp /= 1125899906842624ULL;
- suffix = "PiB";
- } else if (val > 1099511627776ULL) {
- tmp /= 1099511627776ULL;
- suffix = "TiB";
- } else if (val > 1073741824ULL) {
- tmp /= 1073741824ULL;
- suffix = "GiB";
- } else if (val > 1048576) {
- tmp /= 1048576;
- suffix = "MiB";
- } else if (val > 1024) {
- tmp /= 1024;
- suffix = "KiB";
- }
-
- snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix);
-}
-
-static void
-format_time(char *buf, size_t len, time_t diff)
-{
- unsigned hh, mm, ss;
-
- assert_return(buf && len > 0);
-
- hh = diff / 3600;
- diff %= 3600;
- mm = diff / 60;
- diff %= 60;
- ss = diff;
-
- snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss);
-}
-
-static void
-proxy_free(struct uring_task *task)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, task);
- char cts[100];
- char stc[100];
- char duration[100];
-
- assert_return(task);
-
- debug(DBG_PROXY, "server: %s, src: %s, dst: %s",
- proxy->server->name,
- proxy->client_conn.remote.addrstr,
- proxy->server_conn.remote.addrstr);
-
- if (proxy->begin > 0) {
- format_time(duration, sizeof(duration), time(NULL) - proxy->begin);
- format_bytes(cts, sizeof(cts), proxy->client_bytes);
- format_bytes(stc, sizeof(stc), proxy->server_bytes);
-
- info("%s: proxy connection %s -> %s closed "
- "(CtS: %s, StC: %s), duration %s",
- proxy->server->name,
- proxy->client_conn.remote.addrstr,
- proxy->server_conn.remote.addrstr,
- cts, stc, duration);
- }
-
- list_del(&proxy->list);
- xfree(proxy);
-}
-
-static void
-proxy_client_free(struct uring_task *task)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
-
- assert_return(task);
-
- debug(DBG_PROXY, "%s: client connection closed", proxy->server->name);
-}
-
-static void
-proxy_server_free(struct uring_task *task)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
-
- debug(DBG_PROXY, "%s: server connection closed", proxy->server->name);
-}
-
-void
-proxy_delete(struct server_proxy *proxy)
-{
- debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy);
-
- assert_return(proxy);
-
- ptimer_del_task(&proxy->ptask);
-
- if (cfg->splice_supported) {
- uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]);
- uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]);
- uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]);
- uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]);
- }
-
- uring_task_set_fd(&proxy->servertask, proxy->sfd);
- uring_task_destroy(&proxy->servertask);
- uring_task_set_fd(&proxy->clienttask, proxy->cfd);
- uring_task_destroy(&proxy->clienttask);
- uring_task_destroy(&proxy->task);
-}
-
-/*
- * These four functions provide the fallback read-write mode
- */
-static void proxy_client_read(struct uring_task *task, int res);
-
-static void
-proxy_client_written(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- proxy->client_bytes += res;
- uring_task_set_fd(&proxy->clienttask, proxy->cfd);
- uring_tbuf_read(task, proxy_client_read);
-}
-
-static void
-proxy_client_read(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- uring_task_set_fd(&proxy->clienttask, proxy->sfd);
- uring_tbuf_write(task, proxy_client_written);
-}
-
-static void proxy_server_read(struct uring_task *task, int res);
-
-static void
-proxy_server_written(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- proxy->server_bytes += res;
- uring_task_set_fd(&proxy->servertask, proxy->sfd);
- uring_tbuf_read(&proxy->servertask, proxy_server_read);
-}
-
-static void
-proxy_server_read(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- uring_task_set_fd(&proxy->servertask, proxy->cfd);
- uring_tbuf_write(task, proxy_server_written);
-}
-
-/*
- * These four functions provide the splice fd->pipe->fd mode
- */
-static void proxy_client_spliced_in(struct uring_task *task, int res);
-
-static void
-proxy_client_spliced_out(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
-}
-
-static void
-proxy_client_spliced_in(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out);
-}
-
-static void proxy_server_spliced_in(struct uring_task *task, int res);
-
-static void
-proxy_server_spliced_out(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
-}
-
-static void
-proxy_server_spliced_in(struct uring_task *task, int res)
-{
- struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- if (res <= 0) {
- debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res);
- proxy_delete(proxy);
- return;
- }
-
- uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out);
-}
-
-static void
-proxy_connected_cb(struct connection *conn, bool connected)
-{
- struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn);
-
- assert_return(conn);
- assert_task_alive(DBG_PROXY, &proxy->clienttask);
- assert_task_alive(DBG_PROXY, &proxy->servertask);
-
- proxy->connecting = false;
- if (!connected) {
- error("%s: proxy connection to remote server failed",
- proxy->server->name);
- if (!proxy->ptask.active)
- proxy_delete(proxy);
- return;
- }
-
- ptimer_del_task(&proxy->ptask);
-
- proxy->sfd = proxy->servertask.fd;
- verbose("%s: proxy connection %s -> %s opened",
- proxy->server->name,
- proxy->client_conn.remote.addrstr,
- proxy->server_conn.remote.addrstr);
- proxy->begin = time(NULL);
-
- if (cfg->splice_supported) {
- debug(DBG_PROXY, "handling proxy connection with splice");
- uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in);
- uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in);
- } else {
- debug(DBG_PROXY, "handling proxy connection with read-write");
- uring_tbuf_read(&proxy->clienttask, proxy_client_read);
- uring_tbuf_read(&proxy->servertask, proxy_server_read);
- }
-}
-
-void
-proxy_refdump(struct server_proxy *proxy)
-{
- assert_return(proxy);
-
- uring_task_refdump(&proxy->task);
- uring_task_refdump(&proxy->clienttask);
- uring_task_refdump(&proxy->servertask);
-}
-
-static void
-proxy_connect_timer_cb(struct ptimer_task *ptask)
-{
- struct server_proxy *proxy = container_of(ptask, struct server_proxy, ptask);
-
- assert_return(ptask);
-
- if (proxy->connecting)
- return;
-
- proxy->connecting = true;
- connect_any(&proxy->servertask, &proxy->server->remotes,
- &proxy->server_conn, proxy_connected_cb);
-}
-
-struct server_proxy *
-proxy_new(struct server *server, struct saddr *client, int fd)
-{
- struct server_proxy *proxy;
-
- assert_return(server && client && fd > 0, NULL);
-
- proxy = zmalloc(sizeof(*proxy));
- if (!proxy) {
- error("malloc: %m");
- goto out;
- }
-
- if (cfg->splice_supported) {
- if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) {
- error("pipe2: %m");
- goto out_free;
- }
-
- if (pipe2(proxy->spipe, O_CLOEXEC) < 0) {
- error("pipe2: %m");
- goto out_close_cpipe;
- }
- }
-
- proxy->sfd = -1;
- proxy->cfd = fd;
- proxy->server = server;
- uring_task_init(&proxy->task, "proxy", &server->task, proxy_free);
-
- connection_set_local(&proxy->client_conn, fd);
- connection_set_remote(&proxy->client_conn, client);
-
- uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task,
- proxy_client_free);
- uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf);
- uring_task_set_fd(&proxy->clienttask, fd);
-
- uring_task_init(&proxy->servertask, "proxy_server", &proxy->task,
- proxy_server_free);
- uring_task_set_buf(&proxy->servertask, &proxy->serverbuf);
-
- list_add(&proxy->list, &server->proxys);
-
- if (server->state != SERVER_STATE_RUNNING) {
- if (server_start(server) &&
- cfg->proxy_connection_interval > 0 &&
- cfg->proxy_connection_attempts > 0) {
- ptask_init(&proxy->ptask,
- cfg->proxy_connection_interval,
- cfg->proxy_connection_attempts,
- proxy_connect_timer_cb);
- ptimer_add_task(&proxy->ptask);
- }
- }
-
- proxy->connecting = true;
- connect_any(&proxy->servertask, &server->remotes,
- &proxy->server_conn, proxy_connected_cb);
-
- return proxy;
-
-out_close_cpipe:
- uring_close(&server->task, proxy->cpipe[PIPE_RD]);
- uring_close(&server->task, proxy->cpipe[PIPE_WR]);
-out_free:
- xfree(proxy);
-out:
- return NULL;
-}
-
-static void
-local_accept(struct uring_task *task, int res)
-{
- struct server_local *local = container_of(task, struct server_local, task);
- struct server *server = container_of(task->parent, struct server, task);
- struct server_proxy *proxy;
-
- assert_return(task);
- assert_task_alive(DBG_PROXY, task);
-
- debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name);
-
- if (res < 0) {
- error("res: %i", res);
- goto out;
- }
-
- saddr_set_addrstr(&local->client);
-
- verbose("%s: incoming proxy connection: %s -> %s",
- server->name, local->client.addrstr, local->local.addrstr);
-
- if (list_empty(&server->remotes)) {
- /* This shouldn't be possible, checked before opening local */
- error("server->remotes empty!");
- uring_close(&local->task, res);
- goto out;
- }
-
- proxy = proxy_new(server, &local->client, res);
- if (!proxy)
- uring_close(&local->task, res);
-
-out:
- uring_accept(&local->task, &local->client, local_accept);
-}
-
-bool
-local_open(struct server_local *local)
-{
- int sfd;
- int option;
- int r;
-
- assert_return(local && local->server, false);
-
- sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0);
- if (sfd < 0) {
- error("socket: %m");
- goto error;
- }
-
- option = true;
- if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) {
- error("setsockopt: %m");
- goto error;
- }
-
- /* The MC protocol expects the client to send data first */
- if (cfg->socket_defer) {
- option = true;
- if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0)
- error("setsockopt: %m");
- }
-
- /*
- * This has the advantage that interfaces don't need to be up but
- * it means that cfg errors will not be caught.
- */
- if (cfg->socket_freebind) {
- option = true;
- if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0)
- error("setsockopt: %m");
- }
-
- socket_set_low_latency(sfd);
-
- r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen);
- if (r < 0) {
- error("bind: %m");
- goto error;
- }
-
- r = listen(sfd, 100);
- if (r < 0) {
- error("listen: %m");
- goto error;
- }
-
- uring_task_set_fd(&local->task, sfd);
- uring_accept(&local->task, &local->client, local_accept);
- return true;
-
-error:
- if (sfd >= 0)
- uring_close(&local->task, sfd);
- return false;
-}
-
-void
-local_refdump(struct server_local *local)
-{
- assert_return(local);
-
- uring_task_refdump(&local->task);
-}
-
-static void
-local_free(struct uring_task *task)
-{
- struct server_local *local = container_of(task, struct server_local, task);
-
- assert_return(task);
-
- debug(DBG_PROXY, "task %p, local %p", task, local);
- list_del(&local->list);
- xfree(local);
-}
-
-void
-local_delete(struct server_local *local)
-{
- assert_return(local);
-
- uring_task_destroy(&local->task);
-}
-
-struct server_local *
-local_new(struct server *server, struct saddr *saddr)
-{
- struct server_local *local;
-
- assert_return(server && saddr, NULL);
-
- local = zmalloc(sizeof(*local));
- if (!local) {
- error("malloc: %m");
- return NULL;
- }
-
- debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr);
- local->local = *saddr;
- local->server = server;
- uring_task_init(&local->task, "local", &server->task, local_free);
- xfree(saddr);
- return local;
-}
-