From 77071eb45391c9f0bbc593bcf2c10605ed1d5d17 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Mon, 22 Jun 2020 12:00:05 +0200 Subject: Rename proxy and cfgdir to more descriptive names --- server-proxy.c | 424 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 424 insertions(+) create mode 100644 server-proxy.c (limited to 'server-proxy.c') diff --git a/server-proxy.c b/server-proxy.c new file mode 100644 index 0000000..a0affbd --- /dev/null +++ b/server-proxy.c @@ -0,0 +1,424 @@ +#include +#include +#include +#include +#include +#include + +#include "main.h" +#include "uring.h" +#include "server.h" +#include "server-proxy.h" +#include "utils.h" + +static void +format_bytes(char *buf, size_t len, uint64_t val) +{ + uint64_t tmp; + const char *suffix = "B"; + + assert_return(buf && len > 0); + + tmp = val * 10; + if (val > 1152921504606846976ULL) { + tmp = val / 115292150460684697ULL; + suffix= "EiB"; + } else if (val > 1125899906842624ULL) { + tmp /= 1125899906842624ULL; + suffix = "PiB"; + } else if (val > 1099511627776ULL) { + tmp /= 1099511627776ULL; + suffix = "TiB"; + } else if (val > 1073741824ULL) { + tmp /= 1073741824ULL; + suffix = "GiB"; + } else if (val > 1048576) { + tmp /= 1048576; + suffix = "MiB"; + } else if (val > 1024) { + tmp /= 1024; + suffix = "KiB"; + } + + snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); +} + +static void +format_time(char *buf, size_t len, time_t diff) +{ + unsigned hh, mm, ss; + + assert_return(buf && len > 0); + + hh = diff / 3600; + diff %= 3600; + mm = diff / 60; + diff %= 60; + ss = diff; + + snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); +} + +static void +proxy_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, task); + char cts[100]; + char stc[100]; + char duration[100]; + + assert_return(task); + + debug(DBG_PROXY, "server: %s, src: %s, dst: %s", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr); + + if (proxy->begin > 0) { + format_time(duration, sizeof(duration), time(NULL) - proxy->begin); + format_bytes(cts, sizeof(cts), proxy->client_bytes); + format_bytes(stc, sizeof(stc), proxy->server_bytes); + + info("%s: proxy connection %s -> %s closed " + "(CtS: %s, StC: %s), duration %s", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr, + cts, stc, duration); + } + + list_del(&proxy->list); + xfree(proxy); +} + +static void +proxy_client_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + + debug(DBG_PROXY, "%s: client connection closed", proxy->server->name); +} + +static void +proxy_server_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + + debug(DBG_PROXY, "%s: server connection closed", proxy->server->name); +} + +void +proxy_delete(struct server_proxy *proxy) +{ + debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy); + + assert_return(proxy); + + uring_task_destroy(&proxy->servertask); + uring_task_destroy(&proxy->clienttask); + uring_task_destroy(&proxy->task); +} + +static void proxy_client_data_in(struct uring_task *task, int res); + +static void +proxy_client_data_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + proxy->client_bytes += res; + uring_task_set_fd(&proxy->clienttask, proxy->cfd); + uring_tbuf_read(task, proxy_client_data_in); +} + +static void +proxy_client_data_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + uring_task_set_fd(&proxy->clienttask, proxy->sfd); + uring_tbuf_write(task, proxy_client_data_out); +} + +static void proxy_server_data_in(struct uring_task *task, + int res); + +static void +proxy_server_data_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + proxy->server_bytes += res; + uring_task_set_fd(&proxy->servertask, proxy->sfd); + uring_tbuf_read(&proxy->servertask, proxy_server_data_in); +} + +static void +proxy_server_data_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + uring_task_set_fd(&proxy->servertask, proxy->cfd); + uring_tbuf_write(task, proxy_server_data_out); +} + +static void +proxy_connected_cb(struct connection *conn, bool connected) +{ + struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); + + assert_return(conn); + assert_task_alive(DBG_PROXY, &proxy->clienttask); + assert_task_alive(DBG_PROXY, &proxy->servertask); + + if (!connected) { + error("%s: proxy connection to remote server failed", + proxy->server->name); + proxy_delete(proxy); + return; + } + + proxy->sfd = proxy->servertask.fd; + verbose("%s: proxy connection %s -> %s opened", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr); + proxy->begin = time(NULL); + + uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); + uring_tbuf_read(&proxy->servertask, proxy_server_data_in); +} + +void +proxy_refdump(struct server_proxy *proxy) +{ + assert_return(proxy); + + uring_task_refdump(&proxy->task); + uring_task_refdump(&proxy->clienttask); + uring_task_refdump(&proxy->servertask); +} + +struct server_proxy * +proxy_new(struct server *server, struct saddr *client, int fd) +{ + struct server_proxy *proxy; + + assert_return(server && client && fd > 0, NULL); + + proxy = zmalloc(sizeof(*proxy)); + if (!proxy) { + error("malloc: %m"); + return NULL; + } + + proxy->sfd = -1; + proxy->cfd = fd; + proxy->server = server; + uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); + + connection_set_local(&proxy->client_conn, fd); + connection_set_remote(&proxy->client_conn, client); + + uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, + proxy_client_free); + uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); + uring_task_set_fd(&proxy->clienttask, fd); + + uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, + proxy_server_free); + uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); + + list_add(&proxy->list, &server->proxys); + connect_any(&proxy->servertask, &server->remotes, + &proxy->server_conn, proxy_connected_cb); + + return proxy; +} + +static void +local_accept(struct uring_task *task, int res) +{ + struct server_local *local = container_of(task, struct server_local, task); + struct server *server = container_of(task->parent, struct server, task); + struct server_proxy *proxy; + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); + + if (res < 0) { + error("result was %i", res); + goto out; + } + + saddr_set_addrstr(&local->client); + + verbose("%s: incoming proxy connection: %s -> %s", + server->name, local->client.addrstr, local->local.addrstr); + + if (list_empty(&server->remotes)) { + /* This shouldn't be possible, checked before opening local */ + error("server->remotes empty!"); + uring_close(&local->task, res); + goto out; + } + + proxy = proxy_new(server, &local->client, res); + if (!proxy) + uring_close(&local->task, res); + +out: + uring_accept(&local->task, &local->client, local_accept); +} + +bool +local_open(struct server_local *local) +{ + int sfd; + int option; + int r; + + assert_return(local && local->server, false); + + sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (sfd < 0) { + error("socket: %m"); + goto error; + } + + option = true; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { + error("setsockopt: %m"); + goto error; + } + + /* The MC protocol expects the client to send data first */ + /* FIXME: could make this configurable */ + option = true; + if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) + error("setsockopt: %m"); + + /* FIXME: could make this configurable */ + option = true; + if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) + error("setsockopt: %m"); + + socket_set_low_latency(sfd); + + r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen); + if (r < 0) { + error("bind: %m"); + goto error; + } + + r = listen(sfd, 100); + if (r < 0) { + error("listen: %m"); + goto error; + } + + uring_task_set_fd(&local->task, sfd); + uring_accept(&local->task, &local->client, local_accept); + return true; + +error: + if (sfd >= 0) + uring_close(&local->task, sfd); + return false; +} + +void +local_refdump(struct server_local *local) +{ + assert_return(local); + + uring_task_refdump(&local->task); +} + +static void +local_free(struct uring_task *task) +{ + struct server_local *local = container_of(task, struct server_local, task); + + assert_return(task); + + debug(DBG_PROXY, "task %p, local %p", task, local); + list_del(&local->list); + xfree(local); +} + +void +local_delete(struct server_local *local) +{ + assert_return(local); + + uring_task_destroy(&local->task); +} + +struct server_local * +local_new(struct server *server, struct saddr *saddr) +{ + struct server_local *local; + + assert_return(server && saddr, NULL); + + local = zmalloc(sizeof(*local)); + if (!local) { + error("malloc: %m"); + return NULL; + } + + debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr); + local->local = *saddr; + local->server = server; + uring_task_init(&local->task, "local", &server->task, local_free); + xfree(saddr); + return local; +} + -- cgit v1.2.3