From f945a7040e0a93c32b8c382dbd5b23cf675de2fa Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Mon, 1 Dec 2014 19:21:35 +0100 Subject: KIRO Client now has a communication event handler for receives --- src/kiro-client.c | 103 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 29 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 714a003..2b042c0 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -56,7 +56,8 @@ struct _KiroClientPrivate { gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down GMainLoop *main_loop; // Main loop of the server for event polling and handling - GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the connection manager event channel GThread *main_thread; // Main KIRO client thread }; @@ -147,6 +148,59 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) } + +static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + (void) condition; + + KiroClientPrivate *priv = (KiroClientPrivate *)data; + struct ibv_wc wc; + + if (rdma_get_recv_comp (priv->conn, &wc) < 0) { + g_critical ("Failure waiting for POST from server: %s", strerror (errno)); + return FALSE; + } + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; + g_debug ("Received a message from the Server of type: %u", type); + + if (type == KIRO_ACK_RDMA) { + g_debug ("Got RDMI Access information from Server"); + ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); + g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); + ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); + + if (!ctx->rdma_mr) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); + rdma_disconnect (priv->conn); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + return FALSE; + } + } + + //Post a generic receive in order to stay responsive to any messages from + //the server + if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + return FALSE; + } + + return TRUE; +} + + gpointer start_client_main_loop (gpointer data) { @@ -195,6 +249,11 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) g_critical ("Endpoint creation failed: %s", strerror (errno)); return -1; } + // make sure the receive queue pushes an event onto its completion channel + // This is needed in order to see any events on the recv_cq on its + // respective completion_channel file descriptor. We will use this + // mechanismn in our main loop to poll for those events. + ibv_req_notify_cq (priv->conn->recv_cq, 0); g_debug ("Route to server resolved"); struct kiro_connection_context *ctx = (struct kiro_connection_context *)g_try_malloc (sizeof (struct kiro_connection_context)); @@ -218,6 +277,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg); priv->conn->context = ctx; + //Post an preemtive receive for the servers welcome message if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno)); kiro_destroy_connection_context (&ctx); @@ -232,40 +292,21 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) return -1; } - g_message ("Connection to server established"); priv->ec = priv->conn->channel; //For easy access - struct ibv_wc wc; - - if (rdma_get_recv_comp (priv->conn, &wc) < 0) { - g_critical ("Failure waiting for POST from server: %s", strerror (errno)); - rdma_disconnect (priv->conn); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; - } - - g_debug ("Got RDMI Access information from Server"); - ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); - g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); - ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); - - if (!ctx->rdma_mr) { - g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); - rdma_disconnect (priv->conn); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; - } priv->main_loop = g_main_loop_new (NULL, FALSE); - priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); - g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); + priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_rdma_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref // any longer - g_io_channel_unref (priv->g_io_ec); + g_io_channel_unref (priv->conn_ec); + g_io_channel_unref (priv->rdma_ec); + g_message ("Connection to server established"); g_message ("Connected to %s:%s", address, port); return 0; } @@ -375,8 +416,12 @@ kiro_client_disconnect (KiroClient *self) // We don't need the connection management IO channel container any more. // Unref and thus free it. - g_io_channel_unref (priv->g_io_ec); - priv->g_io_ec = NULL; + g_io_channel_unref (priv->conn_ec); + priv->conn_ec = NULL; + + // The same goes for the cp channels + g_io_channel_unref (priv->rdma_ec); + priv->rdma_ec = NULL; priv->close_signal = FALSE; -- cgit v1.2.3 From 8579e596df0bebee274dcadf766ae425bad9b1e8 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 3 Dec 2014 14:52:24 +0100 Subject: Fixed a race condition in kiro_client_connect --- src/kiro-client.c | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 2b042c0..0bb95fa 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -57,7 +57,7 @@ struct _KiroClientPrivate { gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down GMainLoop *main_loop; // Main loop of the server for event polling and handling GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel - GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the communication event channel GThread *main_thread; // Main KIRO client thread }; @@ -90,7 +90,6 @@ kiro_client_init (KiroClient *self) { KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); memset (priv, 0, sizeof (&priv)); - //Hack to make the 'unused function' from the kiro-rdma include go away... kiro_attach_qp (NULL); } @@ -164,7 +163,7 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) g_critical ("Failure waiting for POST from server: %s", strerror (errno)); return FALSE; } - + struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; g_debug ("Received a message from the Server of type: %u", type); @@ -269,9 +268,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) if (!ctx->cf_mr_recv || !ctx->cf_mr_send) { g_critical ("Failed to register control message memory (Out of memory?)"); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + goto fail; } ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg); @@ -280,20 +277,23 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) //Post an preemtive receive for the servers welcome message if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno)); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + goto fail; } if (rdma_connect (priv->conn, NULL)) { g_critical ("Failed to establish connection to the server: %s", strerror (errno)); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + goto fail; } - priv->ec = priv->conn->channel; //For easy access + g_message ("Connection to server established. Waiting for response."); + if (!process_rdma_event (NULL, 0, (gpointer)priv)) { + g_critical ("No RDMA access information received from the server. Failed to connect."); + goto fail; + } + + g_message ("Connected to %s:%s", address, port); + priv->ec = priv->conn->channel; //For easy access priv->main_loop = g_main_loop_new (NULL, FALSE); priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); @@ -306,9 +306,13 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) g_io_channel_unref (priv->conn_ec); g_io_channel_unref (priv->rdma_ec); - g_message ("Connection to server established"); - g_message ("Connected to %s:%s", address, port); return 0; + +fail: + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + priv->conn = NULL; + return -1; } -- cgit v1.2.3 From cc2059982024af79136b9420eaec6fcfedabf3fb Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 3 Dec 2014 19:29:03 +0100 Subject: KIRO Server now has a message event handler for receives --- src/kiro-client.c | 7 +-- src/kiro-rdma.h | 6 ++- src/kiro-server.c | 138 +++++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 120 insertions(+), 31 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 0bb95fa..bb2645c 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -154,7 +154,8 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) // Right now, we don't need 'source' and 'condition' // Tell the compiler to ignore them by (void)-ing them (void) source; - (void) condition; + //(void) condition; + g_debug ("Message condidition: %i", condition); KiroClientPrivate *priv = (KiroClientPrivate *)data; struct ibv_wc wc; @@ -297,8 +298,8 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) priv->main_loop = g_main_loop_new (NULL, FALSE); priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); - g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); - g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_rdma_event, (gpointer)priv); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv); + g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index af502ec..361dabc 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -36,11 +36,15 @@ struct kiro_connection_context { struct ibv_mr peer_mr; // RDMA Memory Region Information of the peer + void *container; // Make the connection aware of its container (if any) + enum { KIRO_IDLE, KIRO_MRI_REQUESTED, // Memory Region Information Requested KIRO_RDMA_ESTABLISHED, // MRI Exchange complete. RDMA is ready - KIRO_RDMA_ACTIVE // RDMA Operation is being performed + KIRO_RDMA_ACTIVE, // RDMA Operation is being performed + KIRO_PING, // PING Message + KIRO_PONG // PONG Message (PING reply) } rdma_state; }; diff --git a/src/kiro-server.c b/src/kiro-server.c index bedba95..1694679 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -59,7 +59,7 @@ struct _KiroServerPrivate { gboolean close_signal; // Flag used to signal event listening to stop for server shutdown GMainLoop *main_loop; // Main loop of the server for event polling and handling - GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel GThread *main_thread; // Main KIRO server thread }; @@ -67,6 +67,15 @@ struct _KiroServerPrivate { G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT); +struct kiro_client_connection { + + guint id; // Client identification (Easy access) + GIOChannel *rcv_ec; // GLib IO Channel encapsulation for receive completions for the client + guint source_id; // ID of the source created by g_io_add_watch, needed to remove it again + struct rdma_cm_id *conn; // Connection Manager ID of the client +}; + + KiroServer * kiro_server_new (void) { @@ -212,6 +221,42 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) } +static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + //(void) condition; + g_debug ("Message condition: %i", condition); + + struct kiro_client_connection *cc = (struct kiro_client_connection *)data; + struct ibv_wc wc; + + if (rdma_get_recv_comp (cc->conn, &wc) < 0) { + g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); + return FALSE; + } + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context; + guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; + g_debug ("Received a message from Client %u of type %u", cc->id, type); + + //Post a generic receive in order to stay responsive to any messages from + //the client + if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (cc->conn); + return FALSE; + } + + return TRUE; +} + + static gboolean process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) { @@ -245,29 +290,63 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) return TRUE; } - g_debug ("Got connection request from client"); + do { + g_debug ("Got connection request from client"); + struct kiro_client_connection *cc = (struct kiro_client_connection *)g_try_malloc (sizeof (struct kiro_client_connection)); + if (!cc) { + errno = ENOMEM; + rdma_reject (ev->id, NULL, 0); + goto fail; + } + + if (connect_client (ev->id)) + goto fail; - if (0 == connect_client (ev->id)) { // Post a welcoming "Receive" for handshaking - if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) { - // Connection set-up successfully! (Server) - struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); - ctx->identifier = priv->next_client_id++; - priv->clients = g_list_append (priv->clients, (gpointer)ev->id); - g_debug ("Client connection assigned with ID %u", ctx->identifier); - g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); - } - } - else - g_warning ("Failed to accept client connection: %s", strerror (errno)); + if (welcome_client (ev->id, priv->mem, priv->mem_size)) + goto fail; + + // Connection set-up successfully! (Server) + // ctx was created by 'welcome_client' + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); + ctx->identifier = priv->next_client_id++; + ctx->container = cc; // Make the connection aware of its container + + // Fill the client connection container. Also create a + // g_io_channel wrapper for the new clients receive queue event + // channel and add a main_loop watch to it. + cc->id = ctx->identifier; + cc->conn = ev->id; + cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd); + ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel + cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc); + g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more + + priv->clients = g_list_append (priv->clients, (gpointer)cc); + g_debug ("Client connection assigned with ID %u", ctx->identifier); + g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); + break; + + fail: + g_warning ("Failed to accept client connection: %s", strerror (errno)); + + } while(0); } else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) { - GList *client = g_list_find (priv->clients, (gconstpointer) ev->id); + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); + if (!ctx->container) { + g_debug ("Got disconnect request from unknown client"); + return FALSE; + } + + GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container); if (client) { - struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); g_debug ("Got disconnect request from client ID %u", ctx->identifier); + struct kiro_client_connection *cc = (struct kiro_client_connection *)ctx->container; + g_source_remove (cc->source_id); // this also unrefs the GIOChannel of the source. Nice. priv->clients = g_list_delete_link (priv->clients, client); + g_free (cc); } else g_debug ("Got disconnect request from unknown client"); @@ -281,6 +360,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) struct ibv_pd *pd = ev->id->pd; kiro_destroy_connection (& (ev->id)); g_free (pd); + g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients)); } @@ -324,7 +404,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void int rtn = rdma_getaddrinfo (addr_c, port_c, &hints, &res_addrinfo); g_free (addr_c); g_free (port_c); - + if (rtn) { g_critical ("Failed to create address information: %s", strerror (errno)); return -1; @@ -382,13 +462,13 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void } priv->main_loop = g_main_loop_new (NULL, FALSE); - priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); - g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Server main loop", start_server_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref // any longer - g_io_channel_unref (priv->g_io_ec); + g_io_channel_unref (priv->conn_ec); g_message ("Enpoint listening"); @@ -400,19 +480,23 @@ static void disconnect_client (gpointer data, gpointer user_data) { (void)user_data; - + if (data) { - struct rdma_cm_id *id = (struct rdma_cm_id *)data; + struct kiro_client_connection *cc = (struct kiro_client_connection *)data; + struct rdma_cm_id *id = cc->conn; struct kiro_connection_context *ctx = (struct kiro_connection_context *) (id->context); g_debug ("Disconnecting client: %u", ctx->identifier); + g_source_remove (cc->source_id); + // Note: // The ProtectionDomain needs to be buffered and freed manually. // Each connecting client is attached with its own pd, which we // create manually. So we also need to clean it up manually. // This needs to be done AFTER the connection is brought down, so we // buffer the pointer to the pd and clean it up afterwards. - struct ibv_pd *pd = id->pd; - kiro_destroy_connection (&id); + struct ibv_pd *pd = cc->conn->pd; + kiro_destroy_connection (&(cc->conn)); + g_free (cc); g_free (pd); } } @@ -432,7 +516,7 @@ kiro_server_stop (KiroServer *self) //Shut down event listening priv->close_signal = TRUE; g_debug ("Event handling stopped"); - + g_list_foreach (priv->clients, disconnect_client, NULL); g_list_free (priv->clients); @@ -448,8 +532,8 @@ kiro_server_stop (KiroServer *self) // We don't need the connection management IO channel container any more. // Unref and thus free it. - g_io_channel_unref (priv->g_io_ec); - priv->g_io_ec = NULL; + g_io_channel_unref (priv->conn_ec); + priv->conn_ec = NULL; priv->close_signal = FALSE; // kiro_destroy_connection would try to call rdma_disconnect on the given -- cgit v1.2.3 From b71e3e75e13a40838c1b386fdeff3afd2f453e9f Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Thu, 4 Dec 2014 18:33:54 +0100 Subject: Fixed a race condition concerning message handling in kiro client --- src/kiro-client.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index bb2645c..084e4b8 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -197,6 +197,11 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) return FALSE; } + // make sure the next incoming work completion causes an event on the + // receive completion channel. We will poll() the channels file descriptor + // for this in the kiro client main loop. + ibv_req_notify_cq (priv->conn->recv_cq, 0); + return TRUE; } @@ -249,11 +254,6 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) g_critical ("Endpoint creation failed: %s", strerror (errno)); return -1; } - // make sure the receive queue pushes an event onto its completion channel - // This is needed in order to see any events on the recv_cq on its - // respective completion_channel file descriptor. We will use this - // mechanismn in our main loop to poll for those events. - ibv_req_notify_cq (priv->conn->recv_cq, 0); g_debug ("Route to server resolved"); struct kiro_connection_context *ctx = (struct kiro_connection_context *)g_try_malloc (sizeof (struct kiro_connection_context)); -- cgit v1.2.3 From b948c151b9f45db2b90a9ecf95ffd4f54edf3924 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Fri, 5 Dec 2014 19:31:09 +0100 Subject: Fixed a problem with the kiro server getting stuck in the RDMA event handler --- src/kiro-server.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/kiro-server.c b/src/kiro-server.c index 1694679..ad4593b 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -233,10 +233,15 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) struct kiro_client_connection *cc = (struct kiro_client_connection *)data; struct ibv_wc wc; - if (rdma_get_recv_comp (cc->conn, &wc) < 0) { + if (ibv_poll_cq (cc->conn->recv_cq, 1, &wc) < 0) { g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); return FALSE; } + void *cq_ctx; + struct ibv_cq *cq; + int err = ibv_get_cq_event (cc->conn->recv_cq_channel, &cq, &cq_ctx); + if (!err) + ibv_ack_cq_events (cq, 1); struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context; guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; @@ -253,6 +258,7 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) return FALSE; } + ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel return TRUE; } @@ -306,6 +312,8 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) if (welcome_client (ev->id, priv->mem, priv->mem_size)) goto fail; + ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel + // Connection set-up successfully! (Server) // ctx was created by 'welcome_client' struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); @@ -318,7 +326,6 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) cc->id = ctx->identifier; cc->conn = ev->id; cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd); - ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc); g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more -- cgit v1.2.3 From 50a297a290c78c7feb0a4918efba82edd019590b Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 10 Dec 2014 15:27:32 +0100 Subject: Fixed KIRO client getting stuck in the RDMA event handler Fix #8: KIRO Server and Client now have routines to handle async communication Fix #6: Added kiro_client_ping_server and respective event handling to server Changed kiro-test-latency to use the new kiro_client_ping_server instead --- src/kiro-client.c | 152 ++++++++++++++++++++++++++++++++++++++++++--- src/kiro-client.h | 13 ++++ src/kiro-rdma.h | 14 ++--- src/kiro-server.c | 19 +++++- test/test-client-latency.c | 32 ++++------ 5 files changed, 195 insertions(+), 35 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 084e4b8..6e140b5 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -65,6 +65,11 @@ struct _KiroClientPrivate { G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT); +// Temporary storage and lock for PING timing +G_LOCK_DEFINE (ping_time); +volatile struct timeval ping_time; + + KiroClient * kiro_client_new (void) { @@ -92,6 +97,8 @@ kiro_client_init (KiroClient *self) memset (priv, 0, sizeof (&priv)); //Hack to make the 'unused function' from the kiro-rdma include go away... kiro_attach_qp (NULL); + ping_time.tv_sec = -1; + ping_time.tv_usec = -1; } @@ -160,10 +167,15 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) KiroClientPrivate *priv = (KiroClientPrivate *)data; struct ibv_wc wc; - if (rdma_get_recv_comp (priv->conn, &wc) < 0) { - g_critical ("Failure waiting for POST from server: %s", strerror (errno)); - return FALSE; + if (ibv_poll_cq (priv->conn->recv_cq, 1, &wc) < 0) { + g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); + return FALSE; } + void *cq_ctx; + struct ibv_cq *cq; + int err = ibv_get_cq_event (priv->conn->recv_cq_channel, &cq, &cq_ctx); + if (!err) + ibv_ack_cq_events (cq, 1); struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; @@ -185,6 +197,22 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) return FALSE; } } + if (type == KIRO_PONG) { + G_LOCK (ping_time); + struct timeval local_time; + gettimeofday (&local_time, NULL); + + if (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) { + g_debug ("Received PONG message from server"); + ping_time.tv_sec = local_time.tv_sec; + ping_time.tv_usec = local_time.tv_usec; + } + else { + g_debug ("Received unexpected PONG message from server"); + } + + G_UNLOCK (ping_time); + } //Post a generic receive in order to stay responsive to any messages from //the server @@ -202,6 +230,7 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) // for this in the kiro client main loop. ibv_req_notify_cq (priv->conn->recv_cq, 0); + g_debug ("Finished RDMA event handling"); return TRUE; } @@ -287,6 +316,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) } g_message ("Connection to server established. Waiting for response."); + ibv_req_notify_cq (priv->conn->recv_cq, 0); // Make the respective Queue push events onto the channel if (!process_rdma_event (NULL, 0, (gpointer)priv)) { g_critical ("No RDMA access information received from the server. Failed to connect."); goto fail; @@ -355,11 +385,119 @@ kiro_client_sync (KiroClient *self) } fail: - kiro_destroy_connection (&(priv->conn)); + kiro_destroy_connection (&(priv->conn)); return -1; } +gboolean +ping_timeout (gpointer data) { + + //Not needed. Void it to prevent 'unused variable' warning + (void) data; + + G_LOCK (ping_time); + + // Maybe the server did answer while dispatching the timeout? + if (ping_time.tv_sec != 0 || ping_time.tv_usec != 0) { + goto done; + } + + ping_time.tv_usec = -1; + ping_time.tv_sec = -1; + + +done: + G_UNLOCK (ping_time); + + // Return FALSE to automtically stop the timeout from reoccuring + return FALSE; +} + + +gint +kiro_client_ping_server (KiroClient *self) +{ + // Will be returned. -1 for error. + gint t_usec = 0; + + KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); + if (!priv->conn) { + g_warning ("Client not connected"); + return -1; + } + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + + struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *)(ctx->cf_mr_send->mem); + msg->msg_type = KIRO_PING; + + G_LOCK (ping_time); + ping_time.tv_sec = 0; + ping_time.tv_usec = 0; + struct timeval local_time; + gettimeofday (&local_time, NULL); + + if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { + g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno)); + t_usec = -1; + goto end; + } + G_UNLOCK (ping_time); + + struct ibv_wc wc; + if (rdma_get_send_comp (priv->conn, &wc) < 0) { + g_warning ("Failure during PING send: %s", strerror (errno)); + t_usec = -1; + goto end; + } + + // Set a two-second timeout for the ping + guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL); + + //Wait for ping response + while (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) {}; + + + G_LOCK (ping_time); + // No response from the server. Timeout kicked in + // (Note: The timeout callback has already deregistered itself. We don't + // need to do that here again) + if (ping_time.tv_sec == -1 && ping_time.tv_usec == -1) { + g_message ("PING timed out."); + G_UNLOCK (ping_time); + t_usec = -1; + goto end; + } + + // Remove the timeout + GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout); + if (timeout_source) { + g_source_destroy (timeout_source); + } + + gint secs = ping_time.tv_sec - local_time.tv_sec; + + // tv_usecs wraps back to 0 at 1000000us (1s). + // This might cause our calculation to produce negative numbers when time > 1s. + for (int i = 0; i < secs; i++) { + ping_time.tv_usec += 1000 * 1000; + } + t_usec = ping_time.tv_usec - local_time.tv_usec; + gint millis = (gint)(t_usec/1000.); + G_UNLOCK (ping_time); + + g_debug ("Server responded to PING in: %is, %ims, %ius", secs, millis, t_usec); + +end: + G_LOCK (ping_time); + ping_time.tv_sec = -1; + ping_time.tv_usec = -1; + G_UNLOCK (ping_time); + return t_usec; +} + + void * kiro_client_get_memory (KiroClient *self) { @@ -377,7 +515,7 @@ kiro_client_get_memory (KiroClient *self) } -size_t +size_t kiro_client_get_memory_size (KiroClient *self) { KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); @@ -431,13 +569,13 @@ kiro_client_disconnect (KiroClient *self) priv->close_signal = FALSE; //kiro_destroy_connection does not free RDMA memory. Therefore, we need to - //cache the memory pointer and free the memory afterwards manually + //cache the memory pointer and free the memory afterwards manually struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->conn->context); void *rdma_mem = ctx->rdma_mr->mem; kiro_destroy_connection (&(priv->conn)); free (rdma_mem); - // priv->ec is just an easy-access pointer. Don't free it. Just NULL it + // priv->ec is just an easy-access pointer. Don't free it. Just NULL it priv->ec = NULL; g_message ("Client disconnected from server"); } diff --git a/src/kiro-client.h b/src/kiro-client.h index 9c6036d..3be2621 100644 --- a/src/kiro-client.h +++ b/src/kiro-client.h @@ -159,6 +159,19 @@ void kiro_client_disconnect (KiroClient *client); */ int kiro_client_sync (KiroClient *client); +/** + * kiro_client_ping_server - Sends a PING to the server + * @client: (transfer none): The #KiroServer to send the PING from + * Returns: + * A #guint telling the time (in microseconds) how long it took for the + * connected #KiroServer to reply + * Description: + * Sends a PING package to the connected #KiroServer and waits for a PONG + * package from that server. The time between sending the PING and receiving + * the PONG (in microseconds) is measured and returned by this function. + */ +gint kiro_client_ping_server (KiroClient *client); + /** * kiro_client_get_memory - Return a pointer to the current client memory * @client: (transfer none): The #KiroClient to get the memory from diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index 361dabc..5b4895f 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -19,6 +19,7 @@ #include #include #include +#include #ifndef __KIRO_RDMA_H__ #define __KIRO_RDMA_H__ @@ -42,9 +43,7 @@ struct kiro_connection_context { KIRO_IDLE, KIRO_MRI_REQUESTED, // Memory Region Information Requested KIRO_RDMA_ESTABLISHED, // MRI Exchange complete. RDMA is ready - KIRO_RDMA_ACTIVE, // RDMA Operation is being performed - KIRO_PING, // PING Message - KIRO_PONG // PONG Message (PING reply) + KIRO_RDMA_ACTIVE // RDMA Operation is being performed } rdma_state; }; @@ -55,11 +54,12 @@ struct kiro_ctrl_msg { enum { KIRO_REQ_RDMA, // Requesting RDMA Access to/from the peer KIRO_ACK_RDMA, // acknowledge RDMA Request and provide Memory Region Information - KIRO_REJ_RDMA // RDMA Request rejected :( (peer_mri will be invalid) + KIRO_REJ_RDMA, // RDMA Request rejected :( (peer_mri will be invalid) + KIRO_PING, // PING Message + KIRO_PONG // PONG Message (PING reply) } msg_type; struct ibv_mr peer_mri; - }; @@ -89,8 +89,8 @@ kiro_attach_qp (struct rdma_cm_id *id) qp_attr.send_cq = id->send_cq; qp_attr.recv_cq = id->recv_cq; qp_attr.qp_type = IBV_QPT_RC; - qp_attr.cap.max_send_wr = 1; - qp_attr.cap.max_recv_wr = 1; + qp_attr.cap.max_send_wr = 10; + qp_attr.cap.max_recv_wr = 10; qp_attr.cap.max_send_sge = 1; qp_attr.cap.max_recv_sge = 1; return rdma_create_qp (id, id->pd, &qp_attr); diff --git a/src/kiro-server.c b/src/kiro-server.c index ad4593b..e7a3908 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -247,18 +247,35 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; g_debug ("Received a message from Client %u of type %u", cc->id, type); + if (type == KIRO_PING) { + struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); + msg->msg_type = KIRO_PONG; + + if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { + g_warning ("Failure while trying to post PONG send: %s", strerror (errno)); + goto done; + } + + if (rdma_get_send_comp (cc->conn, &wc) < 0) { + g_warning ("An error occured while sending PONG: %s", strerror (errno)); + } + } + +done: //Post a generic receive in order to stay responsive to any messages from //the client if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { //TODO: Connection teardown in an event handler routine? Not a good //idea... - g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); + g_critical ("Posting generic receive for event handling failed: %s", strerror (errno)); kiro_destroy_connection_context (&ctx); rdma_destroy_ep (cc->conn); return FALSE; } ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel + + g_debug ("Finished RDMA event handling"); return TRUE; } diff --git a/test/test-client-latency.c b/test/test-client-latency.c index d05747d..208c37c 100644 --- a/test/test-client-latency.c +++ b/test/test-client-latency.c @@ -6,7 +6,7 @@ #include -int +int main ( int argc, char *argv[] ) { if (argc < 3) { @@ -15,38 +15,30 @@ main ( int argc, char *argv[] ) } KiroClient *client = kiro_client_new (); - KiroTrb *trb = kiro_trb_new (); if (-1 == kiro_client_connect (client, argv[1], argv[2])) { kiro_client_free (client); return -1; } - kiro_client_sync (client); - kiro_trb_adopt (trb, kiro_client_get_memory (client)); + int iterations = 10000; - GTimer *timer = g_timer_new (); -while (1) { - g_timer_reset (timer); +while (1) { int i = 0; - while(i < 50000) { - kiro_client_sync (client); + float ping_us = 0; + int fail_count = 0; + while(i < iterations) { + float tmp = kiro_client_ping_server (client); + if (tmp < 0) + fail_count++; + else + ping_us += tmp; i++; } - double elapsed = g_timer_elapsed (timer, NULL); - printf ("Average Latency: %fus\n", (elapsed/50000.)*1000*1000); + printf ("Average Latency: %fus\n", ping_us/(float)(iterations - fail_count)); } - g_timer_stop (timer); kiro_client_free (client); - kiro_trb_free (trb); return 0; } - - - - - - - -- cgit v1.2.3 From 3e0184e13e7d16f532e0c691eb70e55706589f67 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 10 Dec 2014 16:15:23 +0100 Subject: Added a missing pointer cleanup --- src/kiro-server.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/kiro-server.c b/src/kiro-server.c index e7a3908..ff6c0f8 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -371,6 +371,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) g_source_remove (cc->source_id); // this also unrefs the GIOChannel of the source. Nice. priv->clients = g_list_delete_link (priv->clients, client); g_free (cc); + ctx->container = NULL; } else g_debug ("Got disconnect request from unknown client"); -- cgit v1.2.3 From 0dc8c19937b52dfb793672226183697b6987b9fe Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 10 Dec 2014 16:20:24 +0100 Subject: Push release version to 2 (0.2.0) --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a2491de..31c74f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,9 +5,9 @@ set(CMAKE_INCLUDE_CURRENT_DIR TRUE) set(TARNAME "kiro") set(LIBKIRO_VERSION_MAJOR "0") -set(LIBKIRO_VERSION_MINOR "1") +set(LIBKIRO_VERSION_MINOR "2") set(LIBKIRO_VERSION_PATCH "0") -set(LIBKIRO_VERSION_RELEASE "1") +set(LIBKIRO_VERSION_RELEASE "2") set(LIBKIRO_VERSION_STRING "${LIBKIRO_VERSION_MAJOR}.${LIBKIRO_VERSION_MINOR}.${LIBKIRO_VERSION_PATCH}") set(VERSION "${LIBKIRO_VERSION_STRING}") set(LIBKIRO_DESCRIPTION "Small InfiniBand communication Server and Client") -- cgit v1.2.3