diff options
-rw-r--r-- | src/kiro-client.c | 103 |
1 files changed, 74 insertions, 29 deletions
diff --git a/src/kiro-client.c b/src/kiro-client.c index 714a003..2b042c0 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -56,7 +56,8 @@ struct _KiroClientPrivate { gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down GMainLoop *main_loop; // Main loop of the server for event polling and handling - GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the connection manager event channel GThread *main_thread; // Main KIRO client thread }; @@ -147,6 +148,59 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) } + +static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + (void) condition; + + KiroClientPrivate *priv = (KiroClientPrivate *)data; + struct ibv_wc wc; + + if (rdma_get_recv_comp (priv->conn, &wc) < 0) { + g_critical ("Failure waiting for POST from server: %s", strerror (errno)); + return FALSE; + } + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; + g_debug ("Received a message from the Server of type: %u", type); + + if (type == KIRO_ACK_RDMA) { + g_debug ("Got RDMI Access information from Server"); + ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); + g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); + ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); + + if (!ctx->rdma_mr) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); + rdma_disconnect (priv->conn); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + return FALSE; + } + } + + //Post a generic receive in order to stay responsive to any messages from + //the server + if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + return FALSE; + } + + return TRUE; +} + + gpointer start_client_main_loop (gpointer data) { @@ -195,6 +249,11 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) g_critical ("Endpoint creation failed: %s", strerror (errno)); return -1; } + // make sure the receive queue pushes an event onto its completion channel + // This is needed in order to see any events on the recv_cq on its + // respective completion_channel file descriptor. We will use this + // mechanismn in our main loop to poll for those events. + ibv_req_notify_cq (priv->conn->recv_cq, 0); g_debug ("Route to server resolved"); struct kiro_connection_context *ctx = (struct kiro_connection_context *)g_try_malloc (sizeof (struct kiro_connection_context)); @@ -218,6 +277,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg); priv->conn->context = ctx; + //Post an preemtive receive for the servers welcome message if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno)); kiro_destroy_connection_context (&ctx); @@ -232,40 +292,21 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) return -1; } - g_message ("Connection to server established"); priv->ec = priv->conn->channel; //For easy access - struct ibv_wc wc; - - if (rdma_get_recv_comp (priv->conn, &wc) < 0) { - g_critical ("Failure waiting for POST from server: %s", strerror (errno)); - rdma_disconnect (priv->conn); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; - } - - g_debug ("Got RDMI Access information from Server"); - ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); - g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); - ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); - - if (!ctx->rdma_mr) { - g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); - rdma_disconnect (priv->conn); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; - } priv->main_loop = g_main_loop_new (NULL, FALSE); - priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); - g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); + priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_rdma_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref // any longer - g_io_channel_unref (priv->g_io_ec); + g_io_channel_unref (priv->conn_ec); + g_io_channel_unref (priv->rdma_ec); + g_message ("Connection to server established"); g_message ("Connected to %s:%s", address, port); return 0; } @@ -375,8 +416,12 @@ kiro_client_disconnect (KiroClient *self) // We don't need the connection management IO channel container any more. // Unref and thus free it. - g_io_channel_unref (priv->g_io_ec); - priv->g_io_ec = NULL; + g_io_channel_unref (priv->conn_ec); + priv->conn_ec = NULL; + + // The same goes for the cp channels + g_io_channel_unref (priv->rdma_ec); + priv->rdma_ec = NULL; priv->close_signal = FALSE; |