summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/kiro-client.c103
1 files changed, 74 insertions, 29 deletions
diff --git a/src/kiro-client.c b/src/kiro-client.c
index 714a003..2b042c0 100644
--- a/src/kiro-client.c
+++ b/src/kiro-client.c
@@ -56,7 +56,8 @@ struct _KiroClientPrivate {
gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down
GMainLoop *main_loop; // Main loop of the server for event polling and handling
- GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel
+ GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel
+ GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the connection manager event channel
GThread *main_thread; // Main KIRO client thread
};
@@ -147,6 +148,59 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
}
+
+static gboolean
+process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ // Right now, we don't need 'source' and 'condition'
+ // Tell the compiler to ignore them by (void)-ing them
+ (void) source;
+ (void) condition;
+
+ KiroClientPrivate *priv = (KiroClientPrivate *)data;
+ struct ibv_wc wc;
+
+ if (rdma_get_recv_comp (priv->conn, &wc) < 0) {
+ g_critical ("Failure waiting for POST from server: %s", strerror (errno));
+ return FALSE;
+ }
+
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context;
+ guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;
+ g_debug ("Received a message from the Server of type: %u", type);
+
+ if (type == KIRO_ACK_RDMA) {
+ g_debug ("Got RDMI Access information from Server");
+ ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri);
+ g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length);
+ ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE);
+
+ if (!ctx->rdma_mr) {
+ //TODO: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Failed to allocate memory for receive buffer (Out of memory?)");
+ rdma_disconnect (priv->conn);
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (priv->conn);
+ return FALSE;
+ }
+ }
+
+ //Post a generic receive in order to stay responsive to any messages from
+ //the server
+ if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
+ //TODO: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Posting generic receive for connection failed: %s", strerror (errno));
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (priv->conn);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
gpointer
start_client_main_loop (gpointer data)
{
@@ -195,6 +249,11 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)
g_critical ("Endpoint creation failed: %s", strerror (errno));
return -1;
}
+ // make sure the receive queue pushes an event onto its completion channel
+ // This is needed in order to see any events on the recv_cq on its
+ // respective completion_channel file descriptor. We will use this
+ // mechanismn in our main loop to poll for those events.
+ ibv_req_notify_cq (priv->conn->recv_cq, 0);
g_debug ("Route to server resolved");
struct kiro_connection_context *ctx = (struct kiro_connection_context *)g_try_malloc (sizeof (struct kiro_connection_context));
@@ -218,6 +277,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)
ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg);
priv->conn->context = ctx;
+ //Post an preemtive receive for the servers welcome message
if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno));
kiro_destroy_connection_context (&ctx);
@@ -232,40 +292,21 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)
return -1;
}
- g_message ("Connection to server established");
priv->ec = priv->conn->channel; //For easy access
- struct ibv_wc wc;
-
- if (rdma_get_recv_comp (priv->conn, &wc) < 0) {
- g_critical ("Failure waiting for POST from server: %s", strerror (errno));
- rdma_disconnect (priv->conn);
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
- }
-
- g_debug ("Got RDMI Access information from Server");
- ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri);
- g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length);
- ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE);
-
- if (!ctx->rdma_mr) {
- g_critical ("Failed to allocate memory for receive buffer (Out of memory?)");
- rdma_disconnect (priv->conn);
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
- }
priv->main_loop = g_main_loop_new (NULL, FALSE);
- priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd);
- g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv);
+ priv->conn_ec = g_io_channel_unix_new (priv->ec->fd);
+ priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd);
+ g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv);
+ g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_rdma_event, (gpointer)priv);
priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop);
// We gave control to the main_loop (with add_watch) and don't need our ref
// any longer
- g_io_channel_unref (priv->g_io_ec);
+ g_io_channel_unref (priv->conn_ec);
+ g_io_channel_unref (priv->rdma_ec);
+ g_message ("Connection to server established");
g_message ("Connected to %s:%s", address, port);
return 0;
}
@@ -375,8 +416,12 @@ kiro_client_disconnect (KiroClient *self)
// We don't need the connection management IO channel container any more.
// Unref and thus free it.
- g_io_channel_unref (priv->g_io_ec);
- priv->g_io_ec = NULL;
+ g_io_channel_unref (priv->conn_ec);
+ priv->conn_ec = NULL;
+
+ // The same goes for the cp channels
+ g_io_channel_unref (priv->rdma_ec);
+ priv->rdma_ec = NULL;
priv->close_signal = FALSE;