From cc2059982024af79136b9420eaec6fcfedabf3fb Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 3 Dec 2014 19:29:03 +0100 Subject: KIRO Server now has a message event handler for receives --- src/kiro-client.c | 7 +-- src/kiro-rdma.h | 6 ++- src/kiro-server.c | 138 +++++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 120 insertions(+), 31 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 0bb95fa..bb2645c 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -154,7 +154,8 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) // Right now, we don't need 'source' and 'condition' // Tell the compiler to ignore them by (void)-ing them (void) source; - (void) condition; + //(void) condition; + g_debug ("Message condidition: %i", condition); KiroClientPrivate *priv = (KiroClientPrivate *)data; struct ibv_wc wc; @@ -297,8 +298,8 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) priv->main_loop = g_main_loop_new (NULL, FALSE); priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); - g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); - g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_rdma_event, (gpointer)priv); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv); + g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index af502ec..361dabc 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -36,11 +36,15 @@ struct kiro_connection_context { struct ibv_mr peer_mr; // RDMA Memory Region Information of the peer + void *container; // Make the connection aware of its container (if any) + enum { KIRO_IDLE, KIRO_MRI_REQUESTED, // Memory Region Information Requested KIRO_RDMA_ESTABLISHED, // MRI Exchange complete. RDMA is ready - KIRO_RDMA_ACTIVE // RDMA Operation is being performed + KIRO_RDMA_ACTIVE, // RDMA Operation is being performed + KIRO_PING, // PING Message + KIRO_PONG // PONG Message (PING reply) } rdma_state; }; diff --git a/src/kiro-server.c b/src/kiro-server.c index bedba95..1694679 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -59,7 +59,7 @@ struct _KiroServerPrivate { gboolean close_signal; // Flag used to signal event listening to stop for server shutdown GMainLoop *main_loop; // Main loop of the server for event polling and handling - GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel GThread *main_thread; // Main KIRO server thread }; @@ -67,6 +67,15 @@ struct _KiroServerPrivate { G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT); +struct kiro_client_connection { + + guint id; // Client identification (Easy access) + GIOChannel *rcv_ec; // GLib IO Channel encapsulation for receive completions for the client + guint source_id; // ID of the source created by g_io_add_watch, needed to remove it again + struct rdma_cm_id *conn; // Connection Manager ID of the client +}; + + KiroServer * kiro_server_new (void) { @@ -212,6 +221,42 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) } +static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + //(void) condition; + g_debug ("Message condition: %i", condition); + + struct kiro_client_connection *cc = (struct kiro_client_connection *)data; + struct ibv_wc wc; + + if (rdma_get_recv_comp (cc->conn, &wc) < 0) { + g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); + return FALSE; + } + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context; + guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; + g_debug ("Received a message from Client %u of type %u", cc->id, type); + + //Post a generic receive in order to stay responsive to any messages from + //the client + if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (cc->conn); + return FALSE; + } + + return TRUE; +} + + static gboolean process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) { @@ -245,29 +290,63 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) return TRUE; } - g_debug ("Got connection request from client"); + do { + g_debug ("Got connection request from client"); + struct kiro_client_connection *cc = (struct kiro_client_connection *)g_try_malloc (sizeof (struct kiro_client_connection)); + if (!cc) { + errno = ENOMEM; + rdma_reject (ev->id, NULL, 0); + goto fail; + } + + if (connect_client (ev->id)) + goto fail; - if (0 == connect_client (ev->id)) { // Post a welcoming "Receive" for handshaking - if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) { - // Connection set-up successfully! (Server) - struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); - ctx->identifier = priv->next_client_id++; - priv->clients = g_list_append (priv->clients, (gpointer)ev->id); - g_debug ("Client connection assigned with ID %u", ctx->identifier); - g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); - } - } - else - g_warning ("Failed to accept client connection: %s", strerror (errno)); + if (welcome_client (ev->id, priv->mem, priv->mem_size)) + goto fail; + + // Connection set-up successfully! (Server) + // ctx was created by 'welcome_client' + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); + ctx->identifier = priv->next_client_id++; + ctx->container = cc; // Make the connection aware of its container + + // Fill the client connection container. Also create a + // g_io_channel wrapper for the new clients receive queue event + // channel and add a main_loop watch to it. + cc->id = ctx->identifier; + cc->conn = ev->id; + cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd); + ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel + cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc); + g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more + + priv->clients = g_list_append (priv->clients, (gpointer)cc); + g_debug ("Client connection assigned with ID %u", ctx->identifier); + g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); + break; + + fail: + g_warning ("Failed to accept client connection: %s", strerror (errno)); + + } while(0); } else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) { - GList *client = g_list_find (priv->clients, (gconstpointer) ev->id); + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); + if (!ctx->container) { + g_debug ("Got disconnect request from unknown client"); + return FALSE; + } + + GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container); if (client) { - struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); g_debug ("Got disconnect request from client ID %u", ctx->identifier); + struct kiro_client_connection *cc = (struct kiro_client_connection *)ctx->container; + g_source_remove (cc->source_id); // this also unrefs the GIOChannel of the source. Nice. priv->clients = g_list_delete_link (priv->clients, client); + g_free (cc); } else g_debug ("Got disconnect request from unknown client"); @@ -281,6 +360,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) struct ibv_pd *pd = ev->id->pd; kiro_destroy_connection (& (ev->id)); g_free (pd); + g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients)); } @@ -324,7 +404,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void int rtn = rdma_getaddrinfo (addr_c, port_c, &hints, &res_addrinfo); g_free (addr_c); g_free (port_c); - + if (rtn) { g_critical ("Failed to create address information: %s", strerror (errno)); return -1; @@ -382,13 +462,13 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void } priv->main_loop = g_main_loop_new (NULL, FALSE); - priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); - g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Server main loop", start_server_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref // any longer - g_io_channel_unref (priv->g_io_ec); + g_io_channel_unref (priv->conn_ec); g_message ("Enpoint listening"); @@ -400,19 +480,23 @@ static void disconnect_client (gpointer data, gpointer user_data) { (void)user_data; - + if (data) { - struct rdma_cm_id *id = (struct rdma_cm_id *)data; + struct kiro_client_connection *cc = (struct kiro_client_connection *)data; + struct rdma_cm_id *id = cc->conn; struct kiro_connection_context *ctx = (struct kiro_connection_context *) (id->context); g_debug ("Disconnecting client: %u", ctx->identifier); + g_source_remove (cc->source_id); + // Note: // The ProtectionDomain needs to be buffered and freed manually. // Each connecting client is attached with its own pd, which we // create manually. So we also need to clean it up manually. // This needs to be done AFTER the connection is brought down, so we // buffer the pointer to the pd and clean it up afterwards. - struct ibv_pd *pd = id->pd; - kiro_destroy_connection (&id); + struct ibv_pd *pd = cc->conn->pd; + kiro_destroy_connection (&(cc->conn)); + g_free (cc); g_free (pd); } } @@ -432,7 +516,7 @@ kiro_server_stop (KiroServer *self) //Shut down event listening priv->close_signal = TRUE; g_debug ("Event handling stopped"); - + g_list_foreach (priv->clients, disconnect_client, NULL); g_list_free (priv->clients); @@ -448,8 +532,8 @@ kiro_server_stop (KiroServer *self) // We don't need the connection management IO channel container any more. // Unref and thus free it. - g_io_channel_unref (priv->g_io_ec); - priv->g_io_ec = NULL; + g_io_channel_unref (priv->conn_ec); + priv->conn_ec = NULL; priv->close_signal = FALSE; // kiro_destroy_connection would try to call rdma_disconnect on the given -- cgit v1.2.3