diff options
Diffstat (limited to 'proxy.c')
-rw-r--r-- | proxy.c | 424 |
1 files changed, 0 insertions, 424 deletions
diff --git a/proxy.c b/proxy.c deleted file mode 100644 index 7edfffd..0000000 --- a/proxy.c +++ /dev/null @@ -1,424 +0,0 @@ -#include <stdio.h> -#include <unistd.h> -#include <time.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netinet/tcp.h> - -#include "main.h" -#include "uring.h" -#include "server.h" -#include "proxy.h" -#include "utils.h" - -static void -format_bytes(char *buf, size_t len, uint64_t val) -{ - uint64_t tmp; - const char *suffix = "B"; - - assert_return(buf && len > 0); - - tmp = val * 10; - if (val > 1152921504606846976ULL) { - tmp = val / 115292150460684697ULL; - suffix= "EiB"; - } else if (val > 1125899906842624ULL) { - tmp /= 1125899906842624ULL; - suffix = "PiB"; - } else if (val > 1099511627776ULL) { - tmp /= 1099511627776ULL; - suffix = "TiB"; - } else if (val > 1073741824ULL) { - tmp /= 1073741824ULL; - suffix = "GiB"; - } else if (val > 1048576) { - tmp /= 1048576; - suffix = "MiB"; - } else if (val > 1024) { - tmp /= 1024; - suffix = "KiB"; - } - - snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); -} - -static void -format_time(char *buf, size_t len, time_t diff) -{ - unsigned hh, mm, ss; - - assert_return(buf && len > 0); - - hh = diff / 3600; - diff %= 3600; - mm = diff / 60; - diff %= 60; - ss = diff; - - snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); -} - -static void -proxy_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, task); - char cts[100]; - char stc[100]; - char duration[100]; - - assert_return(task); - - debug(DBG_PROXY, "server: %s, src: %s, dst: %s", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr); - - if (proxy->begin > 0) { - format_time(duration, sizeof(duration), time(NULL) - proxy->begin); - format_bytes(cts, sizeof(cts), proxy->client_bytes); - format_bytes(stc, sizeof(stc), proxy->server_bytes); - - info("%s: proxy connection %s -> %s closed " - "(CtS: %s, StC: %s), duration %s", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr, - cts, stc, duration); - } - - list_del(&proxy->list); - xfree(proxy); -} - -static void -proxy_client_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - - debug(DBG_PROXY, "%s: client connection closed", proxy->server->name); -} - -static void -proxy_server_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - - debug(DBG_PROXY, "%s: server connection closed", proxy->server->name); -} - -void -proxy_delete(struct server_proxy *proxy) -{ - debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy); - - assert_return(proxy); - - uring_task_destroy(&proxy->servertask); - uring_task_destroy(&proxy->clienttask); - uring_task_destroy(&proxy->task); -} - -static void proxy_client_data_in(struct uring_task *task, int res); - -static void -proxy_client_data_out(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - proxy->client_bytes += res; - uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_tbuf_read(task, proxy_client_data_in); -} - -static void -proxy_client_data_in(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_tbuf_write(task, proxy_client_data_out); -} - -static void proxy_server_data_in(struct uring_task *task, - int res); - -static void -proxy_server_data_out(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - proxy->server_bytes += res; - uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); -} - -static void -proxy_server_data_in(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_tbuf_write(task, proxy_server_data_out); -} - -static void -proxy_connected_cb(struct connection *conn, bool connected) -{ - struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); - - assert_return(conn); - assert_task_alive(DBG_PROXY, &proxy->clienttask); - assert_task_alive(DBG_PROXY, &proxy->servertask); - - if (!connected) { - error("%s: proxy connection to remote server failed", - proxy->server->name); - proxy_delete(proxy); - return; - } - - proxy->sfd = proxy->servertask.fd; - verbose("%s: proxy connection %s -> %s opened", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr); - proxy->begin = time(NULL); - - uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); -} - -void -proxy_refdump(struct server_proxy *proxy) -{ - assert_return(proxy); - - uring_task_refdump(&proxy->task); - uring_task_refdump(&proxy->clienttask); - uring_task_refdump(&proxy->servertask); -} - -struct server_proxy * -proxy_new(struct server *server, struct saddr *client, int fd) -{ - struct server_proxy *proxy; - - assert_return(server && client && fd > 0, NULL); - - proxy = zmalloc(sizeof(*proxy)); - if (!proxy) { - error("malloc: %m"); - return NULL; - } - - proxy->sfd = -1; - proxy->cfd = fd; - proxy->server = server; - uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); - - connection_set_local(&proxy->client_conn, fd); - connection_set_remote(&proxy->client_conn, client); - - uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, - proxy_client_free); - uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); - uring_task_set_fd(&proxy->clienttask, fd); - - uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, - proxy_server_free); - uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); - - list_add(&proxy->list, &server->proxys); - connect_any(&proxy->servertask, &server->remotes, - &proxy->server_conn, proxy_connected_cb); - - return proxy; -} - -static void -local_accept(struct uring_task *task, int res) -{ - struct server_local *local = container_of(task, struct server_local, task); - struct server *server = container_of(task->parent, struct server, task); - struct server_proxy *proxy; - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); - - if (res < 0) { - error("result was %i", res); - goto out; - } - - saddr_set_addrstr(&local->client); - - verbose("%s: incoming proxy connection: %s -> %s", - server->name, local->client.addrstr, local->local.addrstr); - - if (list_empty(&server->remotes)) { - /* This shouldn't be possible, checked before opening local */ - error("server->remotes empty!"); - uring_close(&local->task, res); - goto out; - } - - proxy = proxy_new(server, &local->client, res); - if (!proxy) - uring_close(&local->task, res); - -out: - uring_accept(&local->task, &local->client, local_accept); -} - -bool -local_open(struct server_local *local) -{ - int sfd; - int option; - int r; - - assert_return(local && local->server, false); - - sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); - if (sfd < 0) { - error("socket: %m"); - goto error; - } - - option = true; - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { - error("setsockopt: %m"); - goto error; - } - - /* The MC protocol expects the client to send data first */ - /* FIXME: could make this configurable */ - option = true; - if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) - error("setsockopt: %m"); - - /* FIXME: could make this configurable */ - option = true; - if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) - error("setsockopt: %m"); - - socket_set_low_latency(sfd); - - r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen); - if (r < 0) { - error("bind: %m"); - goto error; - } - - r = listen(sfd, 100); - if (r < 0) { - error("listen: %m"); - goto error; - } - - uring_task_set_fd(&local->task, sfd); - uring_accept(&local->task, &local->client, local_accept); - return true; - -error: - if (sfd >= 0) - uring_close(&local->task, sfd); - return false; -} - -void -local_refdump(struct server_local *local) -{ - assert_return(local); - - uring_task_refdump(&local->task); -} - -static void -local_free(struct uring_task *task) -{ - struct server_local *local = container_of(task, struct server_local, task); - - assert_return(task); - - debug(DBG_PROXY, "task %p, local %p", task, local); - list_del(&local->list); - xfree(local); -} - -void -local_delete(struct server_local *local) -{ - assert_return(local); - - uring_task_destroy(&local->task); -} - -struct server_local * -local_new(struct server *server, struct saddr *saddr) -{ - struct server_local *local; - - assert_return(server && saddr, NULL); - - local = zmalloc(sizeof(*local)); - if (!local) { - error("malloc: %m"); - return NULL; - } - - debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr); - local->local = *saddr; - local->server = server; - uring_task_init(&local->task, "local", &server->task, local_free); - xfree(saddr); - return local; -} - |