summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-05 19:34:52 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-05 19:34:52 +0200
commita683051b05930d1dd2766b98494bbd124817a6dd (patch)
treedcc2ab910af026c8cf32df5dc7d548a665098082
parent3d2d0d4b3aa64aba018b049edf9c0396d5a598d5 (diff)
Implement more proxy functionality
-rw-r--r--proxy.c154
-rw-r--r--proxy.h13
-rw-r--r--uring.c19
-rw-r--r--uring.h3
4 files changed, 166 insertions, 23 deletions
diff --git a/proxy.c b/proxy.c
index 00090b1..28760e4 100644
--- a/proxy.c
+++ b/proxy.c
@@ -7,41 +7,140 @@
void
proxy_refdump(struct server_proxy *proxy)
{
- uring_task_refdump(&proxy->task);
+ uring_task_refdump(&proxy->clienttask);
+ uring_task_refdump(&proxy->servertask);
}
static void
-proxy_free(struct uring_task *task)
+proxy_client_free(struct uring_task *task)
{
- struct server_proxy *proxy = container_of(task, struct server_proxy, task);
+ struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
+ fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name);
+ /*
list_del(&proxy->list);
free(proxy);
+ */
}
static void
-proxy_connected(struct cfg *cfg, struct uring_task *task, int res)
+proxy_server_free(struct uring_task *task)
{
- //struct server_proxy *proxy = container_of(task, struct server_proxy, task);
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name);
+ /*
+ list_del(&proxy->list);
+ free(proxy);
+ */
+}
+
+static void proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res);
+
+static void
+proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
+
+ fprintf(stderr, "%s: result was %i\n", __func__, res);
+ if (res <= 0)
+ return;
+
+ if (res != proxy->clientlen) {
+ fprintf(stderr, "%s: short write\n", __func__);
+ return;
+ }
+
+ uring_read(cfg, task, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in);
+}
+
+static void
+proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask);
+
+ fprintf(stderr, "%s: result was %i\n", __func__, res);
+ if (res <= 0)
+ return;
+
+ proxy->clientlen = res;
+ uring_write(cfg, task, proxy->clientbuf, res, proxy_client_data_out);
+}
+
+static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res);
+
+static void
+proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ fprintf(stderr, "%s: result was %i\n", __func__, res);
+ if (res <= 0)
+ return;
+
+ if (res != proxy->serverlen) {
+ fprintf(stderr, "%s: short write\n", __func__);
+ return;
+ }
+
+ uring_read(cfg, task, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in);
+}
+
+static void
+proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
+
+ fprintf(stderr, "%s: result was %i\n", __func__, res);
+ if (res <= 0)
+ return;
+
+ proxy->serverlen = res;
+ uring_write(cfg, task, proxy->serverbuf, res, proxy_server_data_out);
+}
+
+static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy);
+
+static void
+proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res)
+{
+ struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);
fprintf(stderr, "%s: connected %i\n", __func__, res);
- return;
+ if (res < 0) {
+ proxy_connect_next_remote(cfg, proxy);
+ return;
+ }
+
+ uring_read(cfg, &proxy->clienttask, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in);
+ uring_read(cfg, &proxy->servertask, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in);
}
-struct server_proxy *
-proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd)
+static void
+proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy)
{
- struct server_proxy *proxy;
- struct sockaddr_in46 *remote;
+ struct sockaddr_in46 *remote, *tmp;
+ struct server *scfg = proxy->scfg;
int sfd;
+ unsigned i = 0;
- proxy = zmalloc(sizeof(*proxy));
- if (!proxy) {
- perror("malloc");
- return NULL;
+again:
+ remote = NULL;
+ list_for_each_entry(tmp, &scfg->remotes, list) {
+ if (i == proxy->next_remote) {
+ remote = tmp;
+ break;
+ }
+ i++;
+ }
+
+ if (!remote) {
+ fprintf(stderr, "No more remote addresses to attempt\n");
+ /* FIXME: put tasks */
+ return;
}
- remote = list_first_entry(&scfg->remotes, struct sockaddr_in46, list);
+ proxy->next_remote++;
proxy->server = *remote;
sockaddr_to_str(&proxy->server, proxy->serverstr, sizeof(proxy->serverstr));
fprintf(stderr, "%s: attempting proxy connection to %s (len %u)\n",
@@ -50,17 +149,32 @@ proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, in
sfd = socket(proxy->server.storage.ss_family, SOCK_STREAM, 0);
if (sfd < 0) {
perror("socket");
- uring_close(cfg, NULL, fd, NULL);
- free(proxy);
+ goto again;
+ }
+
+ uring_task_set_fd(&proxy->servertask, sfd);
+ uring_connect(cfg, &proxy->servertask, &proxy->server, proxy_server_connected);
+}
+
+struct server_proxy *
+proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd)
+{
+ struct server_proxy *proxy;
+
+ proxy = zmalloc(sizeof(*proxy));
+ if (!proxy) {
+ perror("malloc");
return NULL;
}
+ proxy->scfg = scfg;
proxy->client = *client;
sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr));
- uring_task_init(&proxy->task, "server_proxy", &scfg->task, proxy_free);
- uring_task_set_fd(&proxy->task, sfd);
+ uring_task_init(&proxy->clienttask, "proxy_client", &scfg->task, proxy_client_free);
+ uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free);
+ uring_task_set_fd(&proxy->clienttask, fd);
list_add(&proxy->list, &scfg->proxys);
- uring_connect(cfg, &proxy->task, &proxy->server, proxy_connected);
+ proxy_connect_next_remote(cfg, proxy);
return proxy;
}
diff --git a/proxy.h b/proxy.h
index 5b09cbc..4d68a5d 100644
--- a/proxy.h
+++ b/proxy.h
@@ -4,11 +4,18 @@
struct server_proxy {
struct sockaddr_in46 client;
char clientstr[ADDRSTRLEN];
+ char clientbuf[4096];
+ size_t clientlen;
+ struct uring_task clienttask;
+
struct sockaddr_in46 server;
char serverstr[ADDRSTRLEN];
- struct uring_task task;
- char buf[4096];
- size_t len;
+ char serverbuf[4096];
+ size_t serverlen;
+ struct uring_task servertask;
+
+ unsigned next_remote;
+ struct server *scfg;
struct list_head list;
};
diff --git a/uring.c b/uring.c
index c43a8fb..96e3a67 100644
--- a/uring.c
+++ b/uring.c
@@ -142,6 +142,25 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac
}
void
+uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback)
+{
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
+
+ if (!sqe)
+ perrordie("io_uring_sqe");
+
+ if (task->fd < 0) {
+ error("uring_write called with no fd set\n");
+ return;
+ }
+
+ uring_task_get(cfg, task);
+ task->callback = callback;
+ io_uring_prep_write(sqe, task->fd, buf, len, 0);
+ io_uring_sqe_set_data(sqe, task);
+}
+
+void
uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring);
diff --git a/uring.h b/uring.h
index 4c5ee17..a851a73 100644
--- a/uring.h
+++ b/uring.h
@@ -18,6 +18,9 @@ void uring_task_init(struct uring_task *task, const char *name,
void uring_close(struct cfg *cfg, struct uring_task *task, int fd,
callback_t callback);
+void uring_write(struct cfg *cfg, struct uring_task *task, void *buf,
+ size_t len, callback_t callback);
+
void uring_read(struct cfg *cfg, struct uring_task *task, void *buf,
size_t len, off_t offset, callback_t callback);