From 72cb71c99131db200871dac9e17acefdf97292e7 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Tue, 13 May 2014 16:25:42 +0200 Subject: Added kiro_attach_qp to kiro-rdma.h that creates a new QP for a rdma_cm_id and attaches it. Changed kiro_destroy_connection to work on a rdma_cm_id instead. Changed kiro-server accordingly. Restructured kiro-server to use an event loop thread to listen for new client connections. Restructured kiro-server to no longer memorize the client connections. It is currently unneccessary as no control-flow is exchanged. --- kiro-client.c | 4 +- kiro-rdma.h | 38 +++++++++++--- kiro-server.c | 163 ++++++++++++++++++++++++++++++++++++++-------------------- test-client.c | 10 ++-- 4 files changed, 146 insertions(+), 69 deletions(-) diff --git a/kiro-client.c b/kiro-client.c index b6a515e..807d7d5 100644 --- a/kiro-client.c +++ b/kiro-client.c @@ -69,8 +69,8 @@ static void kiro_client_init (KiroClient *self) static void kiro_client_finalize (GObject *object) { - KiroClient *self = KIRO_CLIENT(object); - KiroClientPrivate * priv = KIRO_CLIENT_GET_PRIVATE(self); + //KiroClient *self = KIRO_CLIENT(object); + //KiroClientPrivate * priv = KIRO_CLIENT_GET_PRIVATE(self); //PASS } diff --git a/kiro-rdma.h b/kiro-rdma.h index 696c880..fa16fd1 100644 --- a/kiro-rdma.h +++ b/kiro-rdma.h @@ -77,6 +77,32 @@ struct kiro_rdma_mem { }; +static int kiro_attach_qp (struct rdma_cm_id *id) +{ + if(!id) + return -1; + + id->pd = ibv_alloc_pd(id->verbs); + id->send_cq_channel = ibv_create_comp_channel(id->verbs); + id->recv_cq_channel = id->send_cq_channel; //we use one shared completion channel + id->send_cq = ibv_create_cq(id->verbs, 1, id, id->send_cq_channel, 0); + id->recv_cq = id->send_cq; //we use one shared completion queue + + struct ibv_qp_init_attr qp_attr; + memset(&qp_attr, 0, sizeof(struct ibv_qp_init_attr)); + qp_attr.qp_context = (uintptr_t)id; + qp_attr.send_cq = id->send_cq; + qp_attr.recv_cq = id->recv_cq; + qp_attr.qp_type = IBV_QPT_RC; + qp_attr.cap.max_send_wr = 1; + qp_attr.cap.max_recv_wr = 1; + qp_attr.cap.max_send_sge = 1; + qp_attr.cap.max_recv_sge = 1; + + return rdma_create_qp(id, id->pd, &qp_attr); +} + + static int kiro_register_rdma_memory (struct ibv_pd *pd, struct ibv_mr **mr, void *mem, size_t mem_size, int access) { @@ -186,21 +212,17 @@ static void kiro_destroy_connection_context (struct kiro_connection_context **ct } -static void kiro_destroy_connection (struct kiro_connection **conn) +static void kiro_destroy_connection (struct rdma_cm_id **conn) { if(!(*conn)) return; - - if(!(*conn)->id) - return; - rdma_disconnect((*conn)->id); - struct kiro_connection_context *ctx = (struct kiro_connection_context *)((*conn)->id->context); + rdma_disconnect(*conn); + struct kiro_connection_context *ctx = (struct kiro_connection_context *)((*conn)->context); if(ctx) kiro_destroy_connection_context(&ctx); - rdma_destroy_ep((*conn)->id); - free(*conn); + rdma_destroy_ep(*conn); *conn = NULL; } diff --git a/kiro-server.c b/kiro-server.c index c017b07..52304c8 100644 --- a/kiro-server.c +++ b/kiro-server.c @@ -50,9 +50,13 @@ struct _KiroServerPrivate { /* 'Real' private structures */ /* (Not accessible by properties) */ - struct rdma_event_channel *ec; // Main Event Channel - struct rdma_cm_id *base; // Base-Listening-Connection - struct kiro_connection *client; // Connection to the client + struct rdma_event_channel *ec; // Main Event Channel + struct rdma_cm_id *base; // Base-Listening-Connection + struct kiro_connection *client; // Connection to the client + pthread_t event_listener; // Pointer to the completion-listener thread of this connection + pthread_mutex_t mtx; // Mutex to signal the listener-thread termination + void *mem; // Pointer to the server buffer + size_t mem_size; // Server Buffer Size in bytes }; @@ -82,13 +86,23 @@ kiro_server_class_init (KiroServerClass *klass) } -static int connect_client (struct kiro_connection *client) +static int connect_client (struct rdma_cm_id *client) { + if(!client) + return -1; + + if( -1 == kiro_attach_qp(client)) + { + printf("Could not create a QP for the new connection.\n"); + rdma_destroy_id(client); + return -1; + } + struct kiro_connection_context *ctx = (struct kiro_connection_context *)calloc(1,sizeof(struct kiro_connection_context)); if(!ctx) { printf("Failed to create connection context.\n"); - rdma_destroy_id(client->id); + rdma_destroy_id(client); return -1; } @@ -100,25 +114,25 @@ static int connect_client (struct kiro_connection *client) goto error; } - ctx->cf_mr_recv = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); - ctx->cf_mr_send = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); + ctx->cf_mr_recv = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); + ctx->cf_mr_send = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); if(!ctx->cf_mr_recv || !ctx->cf_mr_send) { printf("Failed to register control message memory.\n"); goto error; } ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof(struct kiro_ctrl_msg); - client->id->context = ctx; + client->context = ctx; - if(rdma_post_recv(client->id, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) + if(rdma_post_recv(client, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { printf("Posting preemtive receive for connection failed.\n"); goto error; } - if(rdma_accept(client->id, NULL)) + if(rdma_accept(client, NULL)) { - printf("Failed to establish connection to the server.\n"); + printf("Failed to establish connection to the client with error: %i.\n", errno); goto error; } printf("Client Connected.\n"); @@ -126,16 +140,16 @@ static int connect_client (struct kiro_connection *client) error: - rdma_reject(client->id, NULL, 0); + rdma_reject(client, NULL, 0); kiro_destroy_connection_context(&ctx); - rdma_destroy_id(client->id); + rdma_destroy_id(client); return -1; } -static int welcome_client (struct kiro_connection *client, void *mem, size_t mem_size) +static int welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) { - struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->id->context); + struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->context); ctx->rdma_mr = (struct kiro_rdma_mem *)calloc(1, sizeof(struct kiro_rdma_mem)); if(!ctx->rdma_mr) { @@ -145,7 +159,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem ctx->rdma_mr->mem = mem; ctx->rdma_mr->size = mem_size; - ctx->rdma_mr->mr = rdma_reg_read(client->id, ctx->rdma_mr->mem, ctx->rdma_mr->size); + ctx->rdma_mr->mr = rdma_reg_read(client, ctx->rdma_mr->mem, ctx->rdma_mr->size); if(!ctx->rdma_mr->mr) { printf("Failed to register RDMA Memory Region.\n"); @@ -157,7 +171,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem msg->msg_type = KIRO_ACK_RDMA; msg->peer_mri = *(ctx->rdma_mr->mr); - if(rdma_post_send(client->id, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) + if(rdma_post_send(client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { printf("Failure while trying to post SEND.\n"); kiro_destroy_rdma_memory(ctx->rdma_mr); @@ -166,7 +180,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem struct ibv_wc wc; - if(rdma_get_send_comp(client->id, &wc) < 0) + if(rdma_get_send_comp(client, &wc) < 0) { printf("Failed to post RDMA MRI to client.\n"); kiro_destroy_rdma_memory(ctx->rdma_mr); @@ -178,6 +192,71 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem } +void * event_loop (void *self) +{ + KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE((KiroServer *)self); + struct rdma_cm_event *active_event; + + int stop = 0; + + while(0 == stop) { + if(0 <= rdma_get_cm_event(priv->ec, &active_event)) + { + + struct rdma_cm_event *ev = malloc(sizeof(*active_event)); + if(!ev) + { + printf("Unable to allocate memory for Event handling!\n"); + rdma_ack_cm_event(active_event); + continue; + } + memcpy(ev, active_event, sizeof(*active_event)); + rdma_ack_cm_event(active_event); + + if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) + { + + /* + priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection)); + if(!(priv->client)) + { + printf("Failed to create container for client connection.\n"); + free(ev); + continue; + } + priv->client->identifier = 0; //First Client + priv->client->id = ev->id; + */ + + if(0 == connect_client(ev->id)) + { + // Connection set-up successfully! (Server) + // Post a welcoming "Recieve" for handshaking + welcome_client(ev->id, priv->mem, priv->mem_size); + } + } + else if(ev->event == RDMA_CM_EVENT_DISCONNECTED) + { + printf("Got disconnect request.\n"); + //pthread_mutex_unlock(&(priv->mtx)); + kiro_destroy_connection(&(ev->id)); + printf("Connection closed successfully\n"); + } + free(ev); + } + + // Mutex will be freed as a signal to stop request + if(0 == pthread_mutex_trylock(&(priv->mtx))) + stop = 1; + } + + printf("Closing Event Listener Thread\n"); + return NULL; +} + + + + int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, size_t mem_size) { KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE(self); @@ -245,43 +324,10 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s rdma_destroy_ep(priv->base); return -1; } - printf("Enpoint listening.\n"); - - - priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection)); - if(!(priv->client)) - { - printf("Failed to create container for client connection.\n"); - return -1; - } - priv->client->identifier = 0; //First Client - - printf("Waiting for connection request.\n"); - if(rdma_get_request(priv->base, &(priv->client->id))) - { - printf("Failure waiting for clienet connection.\n"); - rdma_destroy_ep(priv->base); - return -1; - } - printf("Connection Request received.\n"); - - - if(connect_client(priv->client)) - { - printf("Client connection failed!\n"); - rdma_destroy_ep(priv->base); - free(priv->client); - return -1; - } - - if(welcome_client(priv->client, mem, mem_size)) - { - printf("Failed to setup client communication.\n"); - kiro_destroy_connection(&(priv->client)); - rdma_destroy_id(priv->base); - return -1; - } + priv->mem = mem; + priv->mem_size = mem_size; + priv->ec = rdma_create_event_channel(); int oldflags = fcntl (priv->ec->fd, F_GETFL, 0); /* Only change the FD Mode if we were able to get its flags */ @@ -293,10 +339,15 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s if(rdma_migrate_id(priv->base, priv->ec)) { printf("Was unable to migrate connection to new Event Channel.\n"); - kiro_destroy_connection(&(priv->client)); - rdma_destroy_id(priv->base); + rdma_destroy_ep(priv->base); return -1; } + + pthread_mutex_init(&(priv->mtx), NULL); + pthread_mutex_lock(&(priv->mtx)); + pthread_create(&(priv->event_listener), NULL, event_loop, self); + + printf("Enpoint listening.\n"); sleep(1); return 0; diff --git a/test-client.c b/test-client.c index 65a3c08..469aa5e 100644 --- a/test-client.c +++ b/test-client.c @@ -38,9 +38,13 @@ int main ( int argc, char *argv[] ) return -1; } KiroClient *client = g_object_new(KIRO_TYPE_CLIENT, NULL); - if(-1 != kiro_client_connect(client, argv[1], argv[2])) - kiro_client_sync(client); + if(-1 == kiro_client_connect(client, argv[1], argv[2])) + { + g_object_unref(client); + return -1; + } + kiro_client_sync(client); KiroTrb *trb = g_object_new(KIRO_TYPE_TRB, NULL); kiro_trb_adopt(trb, kiro_client_get_memory(client)); @@ -67,7 +71,7 @@ int main ( int argc, char *argv[] ) int cont = 1; - struct KiroTrbInfo *header = (struct KiroTrbInfo *)kiro_trb_get_raw_buffer(trb); + //struct KiroTrbInfo *header = (struct KiroTrbInfo *)kiro_trb_get_raw_buffer(trb); while(cont) { -- cgit v1.2.3