From a683051b05930d1dd2766b98494bbd124817a6dd Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Fri, 5 Jun 2020 19:34:52 +0200 Subject: Implement more proxy functionality --- proxy.c | 154 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- proxy.h | 13 ++++-- uring.c | 19 ++++++++ uring.h | 3 ++ 4 files changed, 166 insertions(+), 23 deletions(-) diff --git a/proxy.c b/proxy.c index 00090b1..28760e4 100644 --- a/proxy.c +++ b/proxy.c @@ -7,41 +7,140 @@ void proxy_refdump(struct server_proxy *proxy) { - uring_task_refdump(&proxy->task); + uring_task_refdump(&proxy->clienttask); + uring_task_refdump(&proxy->servertask); } static void -proxy_free(struct uring_task *task) +proxy_client_free(struct uring_task *task) { - struct server_proxy *proxy = container_of(task, struct server_proxy, task); + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); + /* list_del(&proxy->list); free(proxy); + */ } static void -proxy_connected(struct cfg *cfg, struct uring_task *task, int res) +proxy_server_free(struct uring_task *task) { - //struct server_proxy *proxy = container_of(task, struct server_proxy, task); + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); + /* + list_del(&proxy->list); + free(proxy); + */ +} + +static void proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res); + +static void +proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + fprintf(stderr, "%s: result was %i\n", __func__, res); + if (res <= 0) + return; + + if (res != proxy->clientlen) { + fprintf(stderr, "%s: short write\n", __func__); + return; + } + + uring_read(cfg, task, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in); +} + +static void +proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + fprintf(stderr, "%s: result was %i\n", __func__, res); + if (res <= 0) + return; + + proxy->clientlen = res; + uring_write(cfg, task, proxy->clientbuf, res, proxy_client_data_out); +} + +static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res); + +static void +proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + fprintf(stderr, "%s: result was %i\n", __func__, res); + if (res <= 0) + return; + + if (res != proxy->serverlen) { + fprintf(stderr, "%s: short write\n", __func__); + return; + } + + uring_read(cfg, task, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in); +} + +static void +proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + fprintf(stderr, "%s: result was %i\n", __func__, res); + if (res <= 0) + return; + + proxy->serverlen = res; + uring_write(cfg, task, proxy->serverbuf, res, proxy_server_data_out); +} + +static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy); + +static void +proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); fprintf(stderr, "%s: connected %i\n", __func__, res); - return; + if (res < 0) { + proxy_connect_next_remote(cfg, proxy); + return; + } + + uring_read(cfg, &proxy->clienttask, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in); + uring_read(cfg, &proxy->servertask, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in); } -struct server_proxy * -proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd) +static void +proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy) { - struct server_proxy *proxy; - struct sockaddr_in46 *remote; + struct sockaddr_in46 *remote, *tmp; + struct server *scfg = proxy->scfg; int sfd; + unsigned i = 0; - proxy = zmalloc(sizeof(*proxy)); - if (!proxy) { - perror("malloc"); - return NULL; +again: + remote = NULL; + list_for_each_entry(tmp, &scfg->remotes, list) { + if (i == proxy->next_remote) { + remote = tmp; + break; + } + i++; + } + + if (!remote) { + fprintf(stderr, "No more remote addresses to attempt\n"); + /* FIXME: put tasks */ + return; } - remote = list_first_entry(&scfg->remotes, struct sockaddr_in46, list); + proxy->next_remote++; proxy->server = *remote; sockaddr_to_str(&proxy->server, proxy->serverstr, sizeof(proxy->serverstr)); fprintf(stderr, "%s: attempting proxy connection to %s (len %u)\n", @@ -50,17 +149,32 @@ proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, in sfd = socket(proxy->server.storage.ss_family, SOCK_STREAM, 0); if (sfd < 0) { perror("socket"); - uring_close(cfg, NULL, fd, NULL); - free(proxy); + goto again; + } + + uring_task_set_fd(&proxy->servertask, sfd); + uring_connect(cfg, &proxy->servertask, &proxy->server, proxy_server_connected); +} + +struct server_proxy * +proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd) +{ + struct server_proxy *proxy; + + proxy = zmalloc(sizeof(*proxy)); + if (!proxy) { + perror("malloc"); return NULL; } + proxy->scfg = scfg; proxy->client = *client; sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr)); - uring_task_init(&proxy->task, "server_proxy", &scfg->task, proxy_free); - uring_task_set_fd(&proxy->task, sfd); + uring_task_init(&proxy->clienttask, "proxy_client", &scfg->task, proxy_client_free); + uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free); + uring_task_set_fd(&proxy->clienttask, fd); list_add(&proxy->list, &scfg->proxys); - uring_connect(cfg, &proxy->task, &proxy->server, proxy_connected); + proxy_connect_next_remote(cfg, proxy); return proxy; } diff --git a/proxy.h b/proxy.h index 5b09cbc..4d68a5d 100644 --- a/proxy.h +++ b/proxy.h @@ -4,11 +4,18 @@ struct server_proxy { struct sockaddr_in46 client; char clientstr[ADDRSTRLEN]; + char clientbuf[4096]; + size_t clientlen; + struct uring_task clienttask; + struct sockaddr_in46 server; char serverstr[ADDRSTRLEN]; - struct uring_task task; - char buf[4096]; - size_t len; + char serverbuf[4096]; + size_t serverlen; + struct uring_task servertask; + + unsigned next_remote; + struct server *scfg; struct list_head list; }; diff --git a/uring.c b/uring.c index c43a8fb..96e3a67 100644 --- a/uring.c +++ b/uring.c @@ -141,6 +141,25 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac fprintf(stderr, "%s: done\n", __func__); } +void +uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + + if (!sqe) + perrordie("io_uring_sqe"); + + if (task->fd < 0) { + error("uring_write called with no fd set\n"); + return; + } + + uring_task_get(cfg, task); + task->callback = callback; + io_uring_prep_write(sqe, task->fd, buf, len, 0); + io_uring_sqe_set_data(sqe, task); +} + void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { diff --git a/uring.h b/uring.h index 4c5ee17..a851a73 100644 --- a/uring.h +++ b/uring.h @@ -18,6 +18,9 @@ void uring_task_init(struct uring_task *task, const char *name, void uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback); +void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, + size_t len, callback_t callback); + void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback); -- cgit v1.2.3