From 097a84d8d1a041a66c9b0b51c7f89e70dec13b0a Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Wed, 10 Jun 2020 21:20:46 +0200 Subject: Implement proper proxy shutdown --- proxy.c | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------- proxy.h | 2 ++ server.c | 15 +++++---- 3 files changed, 103 insertions(+), 23 deletions(-) diff --git a/proxy.c b/proxy.c index c61ad9c..38f5cea 100644 --- a/proxy.c +++ b/proxy.c @@ -1,3 +1,6 @@ +#include +#include + #include "main.h" #include "uring.h" #include "server.h" @@ -7,20 +10,83 @@ void proxy_refdump(struct server_proxy *proxy) { + uring_task_refdump(&proxy->task); uring_task_refdump(&proxy->clienttask); uring_task_refdump(&proxy->servertask); } static void -proxy_client_free(struct uring_task *task) +format_bytes(char *buf, size_t len, uint64_t val) { - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + uint64_t tmp; + const char *suffix = "B"; + + tmp = val * 10; + if (val > 1125899906842624ULL) { + if (val > (9223372036854775807ULL / 10)) + tmp = val / 112589990684262ULL; + else + tmp /= 1125899906842624ULL; + suffix = "PiB"; + } else if (val > 1099511627776ULL) { + tmp /= 1099511627776ULL; + suffix = "TiB"; + } else if (val > 1073741824ULL) { + tmp /= 1073741824ULL; + suffix = "GiB"; + } else if (val > 1048576) { + tmp /= 1048576; + suffix = "MiB"; + } else if (val > 1024) { + tmp /= 1024; + suffix = "KiB"; + } + + snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); +} + +static void +format_time(char *buf, size_t len, time_t diff) +{ + unsigned hh, mm, ss; + + hh = diff / 3600; + diff %= 3600; + mm = diff / 60; + diff %= 60; + ss = diff; + + snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); +} + +static void +proxy_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, task); + char cts[100]; + char stc[100]; + char duration[100]; fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); - /* + + format_time(duration, sizeof(duration), time(NULL) - proxy->begin); + format_bytes(cts, sizeof(cts), proxy->client_bytes); + format_bytes(stc, sizeof(stc), proxy->server_bytes); + + fprintf(stderr, "%s: proxy connection %s -> %s closed (CtS: %s, StC: %s), duration %s\n", + proxy->scfg->name, proxy->clientstr, proxy->serverstr, cts, stc, + duration); + list_del(&proxy->list); xfree(proxy); - */ +} + +static void +proxy_client_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + fprintf(stderr, "%s: %s client connection closed\n", __func__, proxy->scfg->name); } static void @@ -28,24 +94,24 @@ proxy_server_free(struct uring_task *task) { struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); - /* - list_del(&proxy->list); - xfree(proxy); - */ + fprintf(stderr, "%s: %s server connection closed\n", __func__, proxy->scfg->name); } void proxy_delete(struct cfg *cfg, struct server_proxy *proxy) { - char cts[] = "13kB"; - char stc[] = "28kB"; - char duration[] = "00:12:23"; + fprintf(stderr, "%s: shutting down proxy 0x%p\n", __func__, proxy); - fprintf(stderr, "%s: would delete proxy 0x%p\n", __func__, proxy); + /* FIXME: review half-open proxy situation */ + if (proxy->servertask.fd >= 0) + uring_cancel(cfg, &proxy->servertask); - fprintf(stderr, "%s: proxy connection %s -> %s closed (CtS: %s, StC: %s), duration %s\n", - proxy->scfg->name, proxy->clientstr, proxy->serverstr, cts, stc, duration); + if (proxy->clienttask.fd >= 0) + uring_cancel(cfg, &proxy->clienttask); + + uring_task_put(cfg, &proxy->servertask); + uring_task_put(cfg, &proxy->clienttask); + uring_task_put(cfg, &proxy->task); } static void proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res); @@ -57,6 +123,7 @@ proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { + uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } @@ -73,6 +140,7 @@ proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { + uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } @@ -90,6 +158,7 @@ proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { + uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } @@ -106,6 +175,7 @@ proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: result was %i\n", __func__, res); if (res <= 0) { + uring_task_close_fd(cfg, task); proxy_delete(cfg, proxy); return; } @@ -131,6 +201,7 @@ proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res) fprintf(stderr, "%s: proxy connection %s -> %s opened\n", proxy->scfg->name, proxy->clientstr, proxy->serverstr); + proxy->begin = time(NULL); uring_tbuf_read(cfg, &proxy->clienttask, proxy_client_data_in); uring_tbuf_read(cfg, &proxy->servertask, proxy_server_data_in); } @@ -191,12 +262,16 @@ proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, in proxy->cfd = fd; proxy->scfg = scfg; proxy->client = *client; + uring_task_init(&proxy->task, "proxy", &scfg->task, proxy_free); + sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr)); - uring_task_init(&proxy->clienttask, "proxy_client", &scfg->task, proxy_client_free); + uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, proxy_client_free); uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); uring_task_set_fd(&proxy->clienttask, fd); - uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free); + + uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, proxy_server_free); uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); + list_add(&proxy->list, &scfg->proxys); proxy_connect_next_remote(cfg, proxy); diff --git a/proxy.h b/proxy.h index 62ac5b7..735a12e 100644 --- a/proxy.h +++ b/proxy.h @@ -16,7 +16,9 @@ struct server_proxy { uint64_t server_bytes; int sfd; + time_t begin; unsigned next_remote; + struct uring_task task; struct server *scfg; struct list_head list; }; diff --git a/server.c b/server.c index 9ce133a..f317d36 100644 --- a/server.c +++ b/server.c @@ -79,6 +79,7 @@ void server_delete(struct cfg *cfg, struct server *scfg) { struct server_local *local, *ltmp; + struct server_proxy *proxy, *ptmp; struct sockaddr_in46 *remote, *rtmp; struct sockaddr_in46 *rcon, *rctmp; @@ -87,18 +88,20 @@ server_delete(struct cfg *cfg, struct server *scfg) idle_delete(cfg, scfg); rcon_delete(cfg, scfg); - list_for_each_entry_safe(remote, rtmp, &scfg->remotes, list) { - list_del(&remote->list); - xfree(remote); - } + list_for_each_entry_safe(local, ltmp, &scfg->locals, list) + uring_cancel(cfg, &local->task); + + list_for_each_entry_safe(proxy, ptmp, &scfg->proxys, list) + proxy_delete(cfg, proxy); list_for_each_entry_safe(rcon, rctmp, &scfg->rcons, list) { list_del(&rcon->list); xfree(rcon); } - list_for_each_entry_safe(local, ltmp, &scfg->locals, list) { - uring_cancel(cfg, &local->task); + list_for_each_entry_safe(remote, rtmp, &scfg->remotes, list) { + list_del(&remote->list); + xfree(remote); } uring_poll_cancel(cfg, &scfg->exec_task); -- cgit v1.2.3