From 8c27290245b7bcc7cd2f72f3b4a7562294b43bbe Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Tue, 23 Jun 2020 16:25:36 +0200 Subject: Split directories better --- server-proxy.c | 578 --------------------------------------------------------- 1 file changed, 578 deletions(-) delete mode 100644 server-proxy.c (limited to 'server-proxy.c') diff --git a/server-proxy.c b/server-proxy.c deleted file mode 100644 index 4cbbb87..0000000 --- a/server-proxy.c +++ /dev/null @@ -1,578 +0,0 @@ -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include - -#include "main.h" -#include "uring.h" -#include "ptimer.h" -#include "server.h" -#include "server-proxy.h" -#include "utils.h" - -static void -format_bytes(char *buf, size_t len, uint64_t val) -{ - uint64_t tmp; - const char *suffix = "B"; - - assert_return(buf && len > 0); - - tmp = val * 10; - if (val > 1152921504606846976ULL) { - tmp = val / 115292150460684697ULL; - suffix= "EiB"; - } else if (val > 1125899906842624ULL) { - tmp /= 1125899906842624ULL; - suffix = "PiB"; - } else if (val > 1099511627776ULL) { - tmp /= 1099511627776ULL; - suffix = "TiB"; - } else if (val > 1073741824ULL) { - tmp /= 1073741824ULL; - suffix = "GiB"; - } else if (val > 1048576) { - tmp /= 1048576; - suffix = "MiB"; - } else if (val > 1024) { - tmp /= 1024; - suffix = "KiB"; - } - - snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); -} - -static void -format_time(char *buf, size_t len, time_t diff) -{ - unsigned hh, mm, ss; - - assert_return(buf && len > 0); - - hh = diff / 3600; - diff %= 3600; - mm = diff / 60; - diff %= 60; - ss = diff; - - snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); -} - -static void -proxy_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, task); - char cts[100]; - char stc[100]; - char duration[100]; - - assert_return(task); - - debug(DBG_PROXY, "server: %s, src: %s, dst: %s", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr); - - if (proxy->begin > 0) { - format_time(duration, sizeof(duration), time(NULL) - proxy->begin); - format_bytes(cts, sizeof(cts), proxy->client_bytes); - format_bytes(stc, sizeof(stc), proxy->server_bytes); - - info("%s: proxy connection %s -> %s closed " - "(CtS: %s, StC: %s), duration %s", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr, - cts, stc, duration); - } - - list_del(&proxy->list); - xfree(proxy); -} - -static void -proxy_client_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - - debug(DBG_PROXY, "%s: client connection closed", proxy->server->name); -} - -static void -proxy_server_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - - debug(DBG_PROXY, "%s: server connection closed", proxy->server->name); -} - -void -proxy_delete(struct server_proxy *proxy) -{ - debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy); - - assert_return(proxy); - - ptimer_del_task(&proxy->ptask); - - if (cfg->splice_supported) { - uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]); - uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]); - uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]); - uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]); - } - - uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_task_destroy(&proxy->servertask); - uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_task_destroy(&proxy->clienttask); - uring_task_destroy(&proxy->task); -} - -/* - * These four functions provide the fallback read-write mode - */ -static void proxy_client_read(struct uring_task *task, int res); - -static void -proxy_client_written(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - proxy->client_bytes += res; - uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_tbuf_read(task, proxy_client_read); -} - -static void -proxy_client_read(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_tbuf_write(task, proxy_client_written); -} - -static void proxy_server_read(struct uring_task *task, int res); - -static void -proxy_server_written(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - proxy->server_bytes += res; - uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_tbuf_read(&proxy->servertask, proxy_server_read); -} - -static void -proxy_server_read(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_tbuf_write(task, proxy_server_written); -} - -/* - * These four functions provide the splice fd->pipe->fd mode - */ -static void proxy_client_spliced_in(struct uring_task *task, int res); - -static void -proxy_client_spliced_out(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); -} - -static void -proxy_client_spliced_in(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out); -} - -static void proxy_server_spliced_in(struct uring_task *task, int res); - -static void -proxy_server_spliced_out(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); -} - -static void -proxy_server_spliced_in(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); - proxy_delete(proxy); - return; - } - - uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out); -} - -static void -proxy_connected_cb(struct connection *conn, bool connected) -{ - struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); - - assert_return(conn); - assert_task_alive(DBG_PROXY, &proxy->clienttask); - assert_task_alive(DBG_PROXY, &proxy->servertask); - - proxy->connecting = false; - if (!connected) { - error("%s: proxy connection to remote server failed", - proxy->server->name); - if (!proxy->ptask.active) - proxy_delete(proxy); - return; - } - - ptimer_del_task(&proxy->ptask); - - proxy->sfd = proxy->servertask.fd; - verbose("%s: proxy connection %s -> %s opened", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr); - proxy->begin = time(NULL); - - if (cfg->splice_supported) { - debug(DBG_PROXY, "handling proxy connection with splice"); - uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); - uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); - } else { - debug(DBG_PROXY, "handling proxy connection with read-write"); - uring_tbuf_read(&proxy->clienttask, proxy_client_read); - uring_tbuf_read(&proxy->servertask, proxy_server_read); - } -} - -void -proxy_refdump(struct server_proxy *proxy) -{ - assert_return(proxy); - - uring_task_refdump(&proxy->task); - uring_task_refdump(&proxy->clienttask); - uring_task_refdump(&proxy->servertask); -} - -static void -proxy_connect_timer_cb(struct ptimer_task *ptask) -{ - struct server_proxy *proxy = container_of(ptask, struct server_proxy, ptask); - - assert_return(ptask); - - if (proxy->connecting) - return; - - proxy->connecting = true; - connect_any(&proxy->servertask, &proxy->server->remotes, - &proxy->server_conn, proxy_connected_cb); -} - -struct server_proxy * -proxy_new(struct server *server, struct saddr *client, int fd) -{ - struct server_proxy *proxy; - - assert_return(server && client && fd > 0, NULL); - - proxy = zmalloc(sizeof(*proxy)); - if (!proxy) { - error("malloc: %m"); - goto out; - } - - if (cfg->splice_supported) { - if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) { - error("pipe2: %m"); - goto out_free; - } - - if (pipe2(proxy->spipe, O_CLOEXEC) < 0) { - error("pipe2: %m"); - goto out_close_cpipe; - } - } - - proxy->sfd = -1; - proxy->cfd = fd; - proxy->server = server; - uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); - - connection_set_local(&proxy->client_conn, fd); - connection_set_remote(&proxy->client_conn, client); - - uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, - proxy_client_free); - uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); - uring_task_set_fd(&proxy->clienttask, fd); - - uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, - proxy_server_free); - uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); - - list_add(&proxy->list, &server->proxys); - - if (server->state != SERVER_STATE_RUNNING) { - if (server_start(server) && - cfg->proxy_connection_interval > 0 && - cfg->proxy_connection_attempts > 0) { - ptask_init(&proxy->ptask, - cfg->proxy_connection_interval, - cfg->proxy_connection_attempts, - proxy_connect_timer_cb); - ptimer_add_task(&proxy->ptask); - } - } - - proxy->connecting = true; - connect_any(&proxy->servertask, &server->remotes, - &proxy->server_conn, proxy_connected_cb); - - return proxy; - -out_close_cpipe: - uring_close(&server->task, proxy->cpipe[PIPE_RD]); - uring_close(&server->task, proxy->cpipe[PIPE_WR]); -out_free: - xfree(proxy); -out: - return NULL; -} - -static void -local_accept(struct uring_task *task, int res) -{ - struct server_local *local = container_of(task, struct server_local, task); - struct server *server = container_of(task->parent, struct server, task); - struct server_proxy *proxy; - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); - - if (res < 0) { - error("res: %i", res); - goto out; - } - - saddr_set_addrstr(&local->client); - - verbose("%s: incoming proxy connection: %s -> %s", - server->name, local->client.addrstr, local->local.addrstr); - - if (list_empty(&server->remotes)) { - /* This shouldn't be possible, checked before opening local */ - error("server->remotes empty!"); - uring_close(&local->task, res); - goto out; - } - - proxy = proxy_new(server, &local->client, res); - if (!proxy) - uring_close(&local->task, res); - -out: - uring_accept(&local->task, &local->client, local_accept); -} - -bool -local_open(struct server_local *local) -{ - int sfd; - int option; - int r; - - assert_return(local && local->server, false); - - sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); - if (sfd < 0) { - error("socket: %m"); - goto error; - } - - option = true; - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { - error("setsockopt: %m"); - goto error; - } - - /* The MC protocol expects the client to send data first */ - if (cfg->socket_defer) { - option = true; - if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) - error("setsockopt: %m"); - } - - /* - * This has the advantage that interfaces don't need to be up but - * it means that cfg errors will not be caught. - */ - if (cfg->socket_freebind) { - option = true; - if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) - error("setsockopt: %m"); - } - - socket_set_low_latency(sfd); - - r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen); - if (r < 0) { - error("bind: %m"); - goto error; - } - - r = listen(sfd, 100); - if (r < 0) { - error("listen: %m"); - goto error; - } - - uring_task_set_fd(&local->task, sfd); - uring_accept(&local->task, &local->client, local_accept); - return true; - -error: - if (sfd >= 0) - uring_close(&local->task, sfd); - return false; -} - -void -local_refdump(struct server_local *local) -{ - assert_return(local); - - uring_task_refdump(&local->task); -} - -static void -local_free(struct uring_task *task) -{ - struct server_local *local = container_of(task, struct server_local, task); - - assert_return(task); - - debug(DBG_PROXY, "task %p, local %p", task, local); - list_del(&local->list); - xfree(local); -} - -void -local_delete(struct server_local *local) -{ - assert_return(local); - - uring_task_destroy(&local->task); -} - -struct server_local * -local_new(struct server *server, struct saddr *saddr) -{ - struct server_local *local; - - assert_return(server && saddr, NULL); - - local = zmalloc(sizeof(*local)); - if (!local) { - error("malloc: %m"); - return NULL; - } - - debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr); - local->local = *saddr; - local->server = server; - uring_task_init(&local->task, "local", &server->task, local_free); - xfree(saddr); - return local; -} - -- cgit v1.2.3