#include #include #include "main.h" #include "uring.h" #include "server.h" #include "proxy.h" #include "utils.h" void proxy_refdump(struct server_proxy *proxy) { uring_task_refdump(&proxy->task); uring_task_refdump(&proxy->clienttask); uring_task_refdump(&proxy->servertask); } static void format_bytes(char *buf, size_t len, uint64_t val) { uint64_t tmp; const char *suffix = "B"; tmp = val * 10; if (val > 1125899906842624ULL) { if (val > (9223372036854775807ULL / 10)) tmp = val / 112589990684262ULL; else tmp /= 1125899906842624ULL; suffix = "PiB"; } else if (val > 1099511627776ULL) { tmp /= 1099511627776ULL; suffix = "TiB"; } else if (val > 1073741824ULL) { tmp /= 1073741824ULL; suffix = "GiB"; } else if (val > 1048576) { tmp /= 1048576; suffix = "MiB"; } else if (val > 1024) { tmp /= 1024; suffix = "KiB"; } snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); } static void format_time(char *buf, size_t len, time_t diff) { unsigned hh, mm, ss; hh = diff / 3600; diff %= 3600; mm = diff / 60; diff %= 60; ss = diff; snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); } static void proxy_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, task); char cts[100]; char stc[100]; char duration[100]; fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); format_time(duration, sizeof(duration), time(NULL) - proxy->begin); format_bytes(cts, sizeof(cts), proxy->client_bytes); format_bytes(stc, sizeof(stc), proxy->server_bytes); fprintf(stderr, "%s: proxy connection %s -> %s closed (CtS: %s, StC: %s), duration %s\n", proxy->scfg->name, proxy->clientstr, proxy->serverstr, cts, stc, duration); list_del(&proxy->list); xfree(proxy); } static void proxy_client_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); fprintf(stderr, "%s: %s client connection closed\n", __func__, proxy->scfg->name); } static void proxy_server_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); fprintf(stderr, "%s: %s server connection closed\n", __func__, proxy->scfg->name); } void proxy_delete(struct cfg *cfg, struct server_proxy *proxy) { fprintf(stderr, "%s: shutting down proxy 0x%p\n", __func__, proxy); /* FIXME: review half-open proxy situation */ if (proxy->servertask.fd >= 0) uring_cancel(cfg, &proxy->servertask); if (proxy->clienttask.fd >= 0) uring_cancel(cfg, &proxy->clienttask); uring_task_put(cfg, &proxy->servertask); uring_task_put(cfg, &proxy->clienttask); uring_task_put(cfg, &proxy->task); } static void proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res); static void proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } proxy->client_bytes += res; uring_task_set_fd(&proxy->clienttask, proxy->cfd); uring_tbuf_read(cfg, task, proxy_client_data_in); } static void proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } uring_task_set_fd(&proxy->clienttask, proxy->sfd); uring_tbuf_write(cfg, task, proxy_client_data_out); } static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res); static void proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } proxy->server_bytes += res; uring_task_set_fd(&proxy->servertask, proxy->sfd); uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in); } static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } uring_task_set_fd(&proxy->servertask, proxy->cfd); uring_tbuf_write(cfg, task, proxy_server_data_out); } static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy); static void proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); fprintf(stderr, "%s: connected %i\n", __func__, res); if (res < 0) { proxy->sfd = -1; uring_task_close_fd(cfg, task); proxy_connect_next_remote(cfg, proxy); return; } fprintf(stderr, "%s: proxy connection %s -> %s opened\n", proxy->scfg->name, proxy->clientstr, proxy->serverstr); proxy->begin = time(NULL); uring_tbuf_read(cfg, &proxy->clienttask, proxy_client_data_in); uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in); } static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy) { struct sockaddr_in46 *remote, *tmp; struct server *scfg = proxy->scfg; int sfd; unsigned i = 0; again: remote = NULL; list_for_each_entry(tmp, &scfg->remotes, list) { if (i == proxy->next_remote) { remote = tmp; break; } i++; } if (!remote) { fprintf(stderr, "No more remote addresses to attempt\n"); /* FIXME: put tasks */ return; } proxy->next_remote++; proxy->server = *remote; sockaddr_to_str(&proxy->server, proxy->serverstr, sizeof(proxy->serverstr)); fprintf(stderr, "%s: attempting proxy connection to %s (len %u)\n", scfg->name, proxy->serverstr, proxy->server.addrlen); sfd = socket(proxy->server.storage.ss_family, SOCK_STREAM, 0); if (sfd < 0) { perror("socket"); goto again; } proxy->sfd = sfd; uring_task_set_fd(&proxy->servertask, sfd); uring_connect(cfg, &proxy->servertask, &proxy->server, proxy_server_connected); } struct server_proxy * proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd) { struct server_proxy *proxy; proxy = zmalloc(sizeof(*proxy)); if (!proxy) { perror("malloc"); return NULL; } proxy->sfd = -1; proxy->cfd = fd; proxy->scfg = scfg; proxy->client = *client; uring_task_init(&proxy->task, "proxy", &scfg->task, proxy_free); sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr)); uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, proxy_client_free); uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); uring_task_set_fd(&proxy->clienttask, fd); uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, proxy_server_free); uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); list_add(&proxy->list, &scfg->proxys); proxy_connect_next_remote(cfg, proxy); return proxy; }