/* SPDX-License-Identifier: GPL-2.0 */ #include #include #include #include #include #include #include #include #include "main.h" #include "uring.h" #include "ptimer.h" #include "server.h" #include "server-proxy.h" static void format_bytes(char *buf, size_t len, uint64_t val) { uint64_t tmp; const char *suffix = "B"; assert_return(buf && len > 0); tmp = val * 10; if (val > 1152921504606846976ULL) { tmp = val / 115292150460684697ULL; suffix = "EiB"; } else if (val > 1125899906842624ULL) { tmp /= 1125899906842624ULL; suffix = "PiB"; } else if (val > 1099511627776ULL) { tmp /= 1099511627776ULL; suffix = "TiB"; } else if (val > 1073741824ULL) { tmp /= 1073741824ULL; suffix = "GiB"; } else if (val > 1048576) { tmp /= 1048576; suffix = "MiB"; } else if (val > 1024) { tmp /= 1024; suffix = "KiB"; } snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); } static void format_time(char *buf, size_t len, time_t diff) { unsigned hh, mm, ss; assert_return(buf && len > 0); hh = diff / 3600; diff %= 3600; mm = diff / 60; diff %= 60; ss = diff; snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); } static void proxy_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, task); char cts[100]; char stc[100]; char duration[100]; assert_return(task); debug(DBG_PROXY, "server: %s, src: %s, dst: %s", proxy->server->scfg.name, proxy->client_conn.remote.addrstr, proxy->server_conn.remote.addrstr); if (proxy->begin > 0) { format_time(duration, sizeof(duration), time(NULL) - proxy->begin); format_bytes(cts, sizeof(cts), proxy->client_bytes); format_bytes(stc, sizeof(stc), proxy->server_bytes); info("%s: proxy connection %s -> %s closed " "(CtS: %s, StC: %s), duration %s", proxy->server->scfg.name, proxy->client_conn.remote.addrstr, proxy->server_conn.remote.addrstr, cts, stc, duration); } list_del(&proxy->list); xfree(proxy); } static void proxy_client_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); assert_return(task); debug(DBG_PROXY, "%s: client connection closed", proxy->server->scfg.name); } static void proxy_server_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); assert_return(task); debug(DBG_PROXY, "%s: server connection closed", proxy->server->scfg.name); } void proxy_delete(struct server_proxy *proxy) { debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->scfg.name, proxy); assert_return(proxy); ptimer_del_task(&proxy->ptask); if (cfg->splice_supported) { uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]); uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]); uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]); uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]); } uring_task_set_fd(&proxy->servertask, proxy->sfd); uring_task_destroy(&proxy->servertask); uring_task_set_fd(&proxy->clienttask, proxy->cfd); uring_task_destroy(&proxy->clienttask); uring_task_destroy(&proxy->task); } /* * These four functions provide the fallback read-write mode */ static void proxy_client_read_in(struct uring_task *task, int res); static void proxy_client_written_out(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } proxy->client_bytes += res; uring_task_set_fd(&proxy->clienttask, proxy->cfd); uring_tbuf_read(task, proxy_client_read_in); } static void proxy_client_read_in(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->clienttask, proxy->sfd); uring_tbuf_write(task, proxy_client_written_out); } static void proxy_server_read_in(struct uring_task *task, int res); static void proxy_server_written_out(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } proxy->server_bytes += res; uring_task_set_fd(&proxy->servertask, proxy->sfd); uring_tbuf_read(&proxy->servertask, proxy_server_read_in); } static void proxy_server_read_in(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } uring_task_set_fd(&proxy->servertask, proxy->cfd); uring_tbuf_write(task, proxy_server_written_out); } /* * These four functions provide the splice fd->pipe->fd mode */ static void proxy_client_spliced_in(struct uring_task *task, int res); static void proxy_client_spliced_out(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } proxy->client_bytes += res; uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); } static void proxy_client_spliced_in(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out); } static void proxy_server_spliced_in(struct uring_task *task, int res); static void proxy_server_spliced_out(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } proxy->server_bytes += res; uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); } static void proxy_server_spliced_in(struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); assert_return(task); assert_task_alive(DBG_PROXY, task); if (res <= 0) { debug(DBG_PROXY, "%s: res: %i", proxy->server->scfg.name, res); proxy_delete(proxy); return; } uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out); } static void proxy_connected_cb(struct connection *conn, bool connected) { struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); assert_return(conn); assert_task_alive(DBG_PROXY, &proxy->clienttask); assert_task_alive(DBG_PROXY, &proxy->servertask); proxy->connecting = false; if (!connected) { error("%s: proxy connection to remote server failed", proxy->server->scfg.name); if (!proxy->ptask.active) proxy_delete(proxy); return; } ptimer_del_task(&proxy->ptask); proxy->sfd = proxy->servertask.fd; verbose("%s: proxy connection %s -> %s opened", proxy->server->scfg.name, proxy->client_conn.remote.addrstr, proxy->server_conn.remote.addrstr); proxy->begin = time(NULL); if (cfg->splice_supported) { debug(DBG_PROXY, "handling proxy connection with splice"); uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); } else { debug(DBG_PROXY, "handling proxy connection with read-write"); uring_tbuf_read(&proxy->clienttask, proxy_client_read_in); uring_tbuf_read(&proxy->servertask, proxy_server_read_in); } } void proxy_refdump(struct server_proxy *proxy) { assert_return(proxy); uring_task_refdump(&proxy->task); uring_task_refdump(&proxy->clienttask); uring_task_refdump(&proxy->servertask); } static void proxy_connect_timer_cb(struct ptimer_task *ptask) { struct server_proxy *proxy = container_of(ptask, struct server_proxy, ptask); assert_return(ptask); if (proxy->connecting) return; proxy->connecting = true; connect_any(&proxy->servertask, &proxy->server->scfg.remotes, &proxy->server_conn, proxy_connected_cb); } struct server_proxy *proxy_new(struct server *server, struct saddr *client, int fd) { struct server_proxy *proxy; assert_return(server && client && fd > 0, NULL); proxy = zmalloc(sizeof(*proxy)); if (!proxy) { error("malloc: %m"); goto out; } if (cfg->splice_supported) { if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) { error("pipe2: %m"); goto out_free; } if (pipe2(proxy->spipe, O_CLOEXEC) < 0) { error("pipe2: %m"); goto out_close_cpipe; } } proxy->sfd = -1; proxy->cfd = fd; proxy->server = server; uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); connection_set_local(&proxy->client_conn, fd); connection_set_remote(&proxy->client_conn, client); uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, proxy_client_free); uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); uring_task_set_fd(&proxy->clienttask, fd); uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, proxy_server_free); uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); list_add(&proxy->list, &server->proxys); if (server->state != SERVER_STATE_RUNNING) { if (server_start(server) && cfg->proxy_connection_interval > 0 && cfg->proxy_connection_attempts > 0) { ptask_init(&proxy->ptask, cfg->proxy_connection_interval, cfg->proxy_connection_attempts, proxy_connect_timer_cb); ptimer_add_task(&proxy->ptask); } } proxy->connecting = true; connect_any(&proxy->servertask, &server->scfg.remotes, &proxy->server_conn, proxy_connected_cb); return proxy; out_close_cpipe: uring_close(&server->task, proxy->cpipe[PIPE_RD]); uring_close(&server->task, proxy->cpipe[PIPE_WR]); out_free: xfree(proxy); out: return NULL; } static void local_accept(struct uring_task *task, int res) { struct server_local *local = container_of(task, struct server_local, task); struct server *server = container_of(task->parent, struct server, task); struct server_proxy *proxy; assert_return(task); assert_task_alive(DBG_PROXY, task); debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->scfg.name); if (res < 0) { error("res: %i", res); goto out; } saddr_set_addrstr(&local->client); verbose("%s: incoming proxy connection: %s -> %s", server->scfg.name, local->client.addrstr, local->local.addrstr); if (list_empty(&server->scfg.remotes)) { /* This shouldn't be possible, checked before opening local */ error("server->scfg.remotes empty!"); uring_close(&local->task, res); goto out; } proxy = proxy_new(server, &local->client, res); if (!proxy) uring_close(&local->task, res); out: uring_accept(&local->task, &local->client, local_accept); } bool local_open(struct server_local *local) { int sfd; int option; int r; assert_return(local && local->server, false); sfd = socket(local->local.st.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); if (sfd < 0) { error("socket: %m"); goto error; } option = true; if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { error("setsockopt: %m"); goto error; } /* IPv4 and IPv6 sockets are handled separately */ if (local->local.st.ss_family == AF_INET6) { option = true; if (setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, &option, sizeof(option)) < 0) error("setsockopt: %m"); } /* The MC protocol expects the client to send data first */ if (cfg->socket_defer) { option = true; if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) error("setsockopt: %m"); } /* * This has the advantage that interfaces don't need to be up but * it means that cfg errors will not be caught. */ if (cfg->socket_freebind) { option = true; if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) error("setsockopt: %m"); } socket_set_low_latency(sfd, cfg->socket_keepalive, cfg->socket_iptos, cfg->socket_nodelay); verbose("%s: attempting to bind to %s", local->server->scfg.name, local->local.addrstr); r = bind(sfd, (struct sockaddr *)&local->local.st, local->local.addrlen); if (r < 0) { error("bind(%s): %m", local->local.addrstr); goto error; } r = listen(sfd, 100); if (r < 0) { error("listen(%s): %m", local->local.addrstr); goto error; } uring_task_set_fd(&local->task, sfd); uring_accept(&local->task, &local->client, local_accept); return true; error: if (sfd >= 0) uring_close(&local->task, sfd); return false; } void local_refdump(struct server_local *local) { assert_return(local); uring_task_refdump(&local->task); } static void local_free(struct uring_task *task) { struct server_local *local = container_of(task, struct server_local, task); assert_return(task); debug(DBG_PROXY, "task %p, local %p", task, local); list_del(&local->list); xfree(local); } void local_delete(struct server_local *local) { assert_return(local); uring_task_destroy(&local->task); } struct server_local *local_new(struct server *server, struct saddr *saddr) { struct server_local *local; assert_return(server && saddr, NULL); local = zmalloc(sizeof(*local)); if (!local) { error("malloc: %m"); xfree(saddr); return NULL; } debug(DBG_PROXY, "%s adding local: %s", server->scfg.name, saddr->addrstr); local->local = *saddr; local->server = server; uring_task_init(&local->task, "local", &server->task, local_free); xfree(saddr); return local; }