nfs-ganesha 1.4

9p_dispatcher.c

Go to the documentation of this file.
00001 /*
00002  * vim:expandtab:shiftwidth=8:tabstop=8:
00003  *
00004  * Copyright CEA/DAM/DIF  (2011)
00005  * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
00006  *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
00007  *
00008  *
00009  * This program is free software; you can redistribute it and/or
00010  * modify it under the terms of the GNU Lesser General Public
00011  * License as published by the Free Software Foundation; either
00012  * version 3 of the License, or (at your option) any later version.
00013  *
00014  * This program is distributed in the hope that it will be useful,
00015  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017  * Lesser General Public License for more details.
00018  *
00019  * You should have received a copy of the GNU Lesser General Public
00020  * License along with this library; if not, write to the Free Software
00021  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
00022  *
00023  * ---------------------------------------
00024  */
00025 
00035 #ifdef HAVE_CONFIG_H
00036 #include "config.h"
00037 #endif
00038 
00039 #ifdef _SOLARIS
00040 #include "solaris_port.h"
00041 #endif
00042 
00043 #include <stdio.h>
00044 #include <string.h>
00045 #include <pthread.h>
00046 #include <fcntl.h>
00047 #include <sys/file.h>           /* for having FNDELAY */
00048 #include <sys/select.h>
00049 #include <poll.h>
00050 #include "HashData.h"
00051 #include "HashTable.h"
00052 #include "log.h"
00053 #include "abstract_mem.h"
00054 #include "nfs_init.h"
00055 #include "nfs_core.h"
00056 #include "cache_inode.h"
00057 #include "nfs_exports.h"
00058 #include "nfs_creds.h"
00059 #include "nfs_proto_functions.h"
00060 #include "nfs_dupreq.h"
00061 #include "nfs_file_handle.h"
00062 #include "nfs_stat.h"
00063 #include "SemN.h"
00064 #include "9p.h"
00065 
/* Address family handed to socket() when the 9P listening socket is created:
 * AF_INET6 when built with _USE_TIRPC_IPV6, AF_INET otherwise. */
#ifdef _USE_TIRPC_IPV6
  #define P_FAMILY AF_INET6
#else
  #define P_FAMILY AF_INET
#endif
00071 
00072 void DispatchWork9P( request_data_t *preq, unsigned int worker_index)
00073 {
00074   LogDebug(COMPONENT_DISPATCH,
00075            "Awaking Worker Thread #%u for 9P request %p, tcpsock=%lu",
00076            worker_index, preq, preq->r_u._9p.pconn->sockfd);
00077 
00078   P(workers_data[worker_index].wcb.tcb_mutex);
00079   P(workers_data[worker_index].request_pool_mutex);
00080 
00081   glist_add_tail(&workers_data[worker_index].pending_request, &preq->pending_req_queue);
00082   workers_data[worker_index].pending_request_len++;
00083 
00084   if(pthread_cond_signal(&(workers_data[worker_index].wcb.tcb_condvar)) == -1)
00085     {
00086       V(workers_data[worker_index].request_pool_mutex);
00087       V(workers_data[worker_index].wcb.tcb_mutex);
00088       LogMajor(COMPONENT_THREAD,
00089                "Error %d (%s) while signalling Worker Thread #%u... Exiting",
00090                errno, strerror(errno), worker_index);
00091       Fatal();
00092     }
00093   V(workers_data[worker_index].request_pool_mutex);
00094   V(workers_data[worker_index].wcb.tcb_mutex);
00095 }
00096 
00097 
00110 void * _9p_socket_thread( void * Arg )
00111 {
00112   long int tcp_sock = (long int)Arg;
00113   int rc = -1 ;
00114   struct pollfd fds[1] ;
00115   int fdcount = 1 ;
00116   static char my_name[MAXNAMLEN];
00117   struct sockaddr_in addrpeer ;
00118   socklen_t addrpeerlen = sizeof( addrpeer ) ;
00119   char strcaller[MAXNAMLEN] ;
00120   request_data_t *preq = NULL;
00121   unsigned int worker_index;
00122 
00123   char * _9pmsg ;
00124   uint32_t * p_9pmsglen = NULL ;
00125 
00126   _9p_conn_t _9p_conn ;
00127 
00128   int readlen = 0  ;
00129   int total_readlen = 0  ;
00130 
00131   snprintf(my_name, MAXNAMLEN, "9p_sock_mgr#fd=%ld", tcp_sock);
00132   SetNameFunction(my_name);
00133 
00134   /* Init the _9p_conn_t structure */
00135   _9p_conn.sockfd = tcp_sock ;
00136 
00137   if( gettimeofday( &_9p_conn.birth, NULL ) == -1 )
00138    LogFatal( COMPONENT_9P, "Can get connection's time of birth" ) ;
00139 
00140   if( ( rc =  getpeername( tcp_sock, (struct sockaddr *)&addrpeer, &addrpeerlen) ) == -1 )
00141    {
00142       LogMajor(COMPONENT_9P,
00143                "Cannot get peername to tcp socket for 9p, error %d (%s)", errno, strerror(errno));
00144       strncpy( strcaller, "(unresolved)", MAXNAMLEN ) ;
00145    }
00146   else
00147    {
00148      snprintf(strcaller, MAXNAMLEN, "0x%x=%d.%d.%d.%d",
00149               ntohl(addrpeer.sin_addr.s_addr),
00150              (ntohl(addrpeer.sin_addr.s_addr) & 0xFF000000) >> 24,
00151              (ntohl(addrpeer.sin_addr.s_addr) & 0x00FF0000) >> 16,
00152              (ntohl(addrpeer.sin_addr.s_addr) & 0x0000FF00) >> 8,
00153              (ntohl(addrpeer.sin_addr.s_addr) & 0x000000FF));
00154 
00155      LogEvent( COMPONENT_9P, "9p socket #%ld is connected to %s", tcp_sock, strcaller ) ; 
00156    }
00157 
00158   /* Set up the structure used by poll */
00159   memset( (char *)fds, 0, sizeof( struct pollfd ) ) ;
00160   fds[0].fd = tcp_sock ;
00161   fds[0].events = POLLIN|POLLPRI| POLLRDBAND|POLLRDNORM|POLLRDHUP|POLLHUP|POLLERR|POLLNVAL;
00162 
00163 
00164   for( ;; ) /* Infinite loop */
00165    {
00166     
00167      if( ( rc = poll( fds, fdcount, -1 ) ) == -1 ) /* timeout = -1 =>Wait indefinitely for incoming events */
00168       {
00169          /* Interruption if not an issue */
00170          if( errno == EINTR )
00171            continue ;
00172 
00173          LogCrit( COMPONENT_9P,
00174                   "Got error %u (%s) on fd %ld connect to %s while polling on socket", 
00175                   errno, strerror( errno ), tcp_sock, strcaller ) ;
00176 
00177       }
00178 
00179      if( fds[0].revents & POLLNVAL )
00180       {
00181         LogEvent( COMPONENT_9P, "Client %s on socket %lu produced POLLNVAL", strcaller, tcp_sock ) ;
00182                   close( tcp_sock );
00183         return NULL ;
00184       }
00185 
00186      if( fds[0].revents & (POLLERR|POLLHUP|POLLRDHUP) )
00187       {
00188         LogEvent( COMPONENT_9P, "Client %s on socket %lu has shut down and closed", strcaller, tcp_sock ) ;
00189                   close( tcp_sock );
00190         return NULL ;
00191       }
00192 
00193      if( fds[0].revents & (POLLIN|POLLRDNORM) )
00194       {
00195         /* choose a worker depending on its queue length */
00196         worker_index = nfs_core_select_worker_queue( WORKER_INDEX_ANY );
00197 
00198         /* Get a preq from the worker's pool */
00199         P(workers_data[worker_index].request_pool_mutex);
00200 
00201         preq = pool_alloc( request_pool, NULL ) ;
00202 
00203         V(workers_data[worker_index].request_pool_mutex);
00204 
00205         /* Prepare to read the message */
00206         preq->rtype = _9P_REQUEST ;
00207         _9pmsg = preq->r_u._9p._9pmsg ;
00208         preq->r_u._9p.pconn = &_9p_conn ;
00209 
00210         /* An incoming 9P request: the msg has a 4 bytes header
00211            showing the size of the msg including the header */
00212         if( (readlen = recv( fds[0].fd, _9pmsg ,_9P_HDR_SIZE, 0) == _9P_HDR_SIZE ) )
00213          {
00214            p_9pmsglen = (uint32_t *)_9pmsg ;
00215 
00216             LogFullDebug( COMPONENT_9P,
00217                           "Received message of size %u from client %s on socket %lu",
00218                           *p_9pmsglen, strcaller, tcp_sock ) ;
00219 
00220             if( *p_9pmsglen < _9P_HDR_SIZE ) 
00221               {
00222                 LogEvent( COMPONENT_9P, 
00223                           "Badly formed 9P message: Header is too small for client %s on socket %lu: readlen=%u expected=%u", 
00224                           strcaller, tcp_sock, readlen, *p_9pmsglen - _9P_HDR_SIZE ) ;
00225 
00226                 /* Release the entry */
00227                 P(workers_data[worker_index].request_pool_mutex);
00228                 pool_free(request_pool, preq ) ;
00229 
00230                 workers_data[worker_index].passcounter += 1;
00231                 V(workers_data[worker_index].request_pool_mutex);
00232 
00233                 continue ; /* Maybe, use something smarter here */
00234               }
00235 
00236             total_readlen = 0 ;
00237             while( total_readlen < (*p_9pmsglen - _9P_HDR_SIZE) )
00238              {
00239                  readlen = recv( fds[0].fd,
00240                                  (char *)(_9pmsg + _9P_HDR_SIZE + total_readlen),  
00241                                  *p_9pmsglen - _9P_HDR_SIZE - total_readlen, 0 ) ;
00242                
00243                  /* Signal management */
00244                  if( readlen < 0 && errno == EINTR )
00245                     continue ;
00246 
00247                  /* Error management */
00248                  if( readlen < 0 )  
00249                   {
00250                     LogEvent( COMPONENT_9P, "Badly formed 9P header for client %s on socket %lu", strcaller, tcp_sock ) ;
00251 
00252                     /* Release the entry */
00253                     P(workers_data[worker_index].request_pool_mutex);
00254                     pool_free( request_pool, preq ) ;
00255                     workers_data[worker_index].passcounter += 1;
00256                     V(workers_data[worker_index].request_pool_mutex);
00257 
00258                     continue ;
00259                   }
00260                  
00261                  /* After this point, read() is supposed to be OK */
00262                  total_readlen += readlen ;
00263              } /* while */
00264              
00265              /* Message was OK push it the request to the right worker */
00266              DispatchWork9P(preq, worker_index);
00267          }
00268         else if( readlen == 0 )
00269          {
00270            LogEvent( COMPONENT_9P, "Client %s on socket %lu has shut down", strcaller, tcp_sock ) ;
00271            close( tcp_sock );
00272            return NULL ;
00273          }
00274       } /* if( fds[0].revents & (POLLIN|POLLRDNORM) ) */
00275    } /* for( ;; ) */
00276  
00277   return NULL ;
00278 } /* _9p_socket_thread */
00279 
00290 int _9p_create_socket( void )
00291 {
00292   int sock = -1 ;
00293   int one = 1 ;
00294   struct sockaddr_in sinaddr;
00295 #ifdef _USE_TIRPC_IPV6
00296   struct sockaddr_in6 sinaddr_tcp6;
00297   struct netbuf netbuf_tcp6;
00298   struct t_bind bindaddr_tcp6;
00299   struct __rpc_sockinfo si_tcp6;
00300 #endif
00301 
00302   if( ( sock= socket(P_FAMILY, SOCK_STREAM, IPPROTO_TCP) ) == -1 )
00303     {
00304           LogFatal(COMPONENT_9P_DISPATCH,
00305                    "Cannot allocate a tcp socket for 9p, error %d (%s)", errno, strerror(errno));
00306           return -1 ;
00307     }
00308 
00309   if(setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
00310     {
00311         LogFatal(COMPONENT_9P_DISPATCH,
00312                  "Bad tcp socket options for 9p, error %d (%s)", errno, strerror(errno));
00313         return -1 ;
00314     }
00315 
00316   socket_setoptions(sock);
00317 
00318 #ifndef _USE_TIRPC_IPV6
00319   memset( &sinaddr, 0, sizeof(sinaddr));
00320   sinaddr.sin_family      = AF_INET;
00321   sinaddr.sin_addr.s_addr = nfs_param.core_param.bind_addr.sin_addr.s_addr;
00322   sinaddr.sin_port        = htons(nfs_param._9p_param._9p_port);
00323 
00324   if(bind(sock, (struct sockaddr *)&sinaddr, sizeof(sinaddr)) == -1)
00325    {
00326      LogFatal(COMPONENT_9P_DISPATCH,
00327               "Cannot bind 9p tcp socket, error %d (%s)",
00328               errno, strerror(errno));
00329      return -1 ;
00330    }
00331 
00332   if( listen( sock, 20 ) == -1 )
00333    {
00334        LogFatal(COMPONENT_DISPATCH,
00335                 "Cannot bind 9p socket, error %d (%s)",
00336                 errno, strerror(errno));
00337        return -1 ;
00338    }
00339 #else
00340   memset(&sinaddr_tcp6, 0, sizeof(sinaddr_tcp6));
00341   sinaddr_tcp6.sin6_family = AF_INET6;
00342   sinaddr_tcp6.sin6_addr   = in6addr_any;     /* All the interfaces on the machine are used */
00343   sinaddr_tcp6.sin6_port   = htons(nfs_param.core_param._9p_port);
00344 
00345   netbuf_tcp6.maxlen = sizeof(sinaddr_tcp6);
00346   netbuf_tcp6.len    = sizeof(sinaddr_tcp6);
00347   netbuf_tcp6.buf    = &sinaddr_tcp6;
00348 
00349   bindaddr_tcp6.qlen = SOMAXCONN;
00350   bindaddr_tcp6.addr = netbuf_tcp6;
00351 
00352   if(!__rpc_fd2sockinfo(sock, &si_tcp6))
00353    {
00354      LogFatal(COMPONENT_DISPATCH,
00355               "Cannot get 9p socket info for tcp6 socket rc=%d errno=%d (%s)",
00356               rc, errno, strerror(errno));
00357      return -1 ;
00358    }
00359 
00360   if(bind( sock,
00361           (struct sockaddr *)bindaddr_tcp6.addr.buf,
00362           (socklen_t) si_nfs_tcp6.si_alen) == -1)
00363    {
00364        LogFatal(COMPONENT_DISPATCH,
00365                 "Cannot bind 9p tcp6 socket, error %d (%s)",
00366                 errno, strerror(errno));
00367        return -1 ;
00368    }
00369 
00370   if( listen( sock, 20 ) == -1 )
00371    {
00372        LogFatal(COMPONENT_DISPATCH,
00373                 "Cannot bind 9p tcp6 socket, error %d (%s)",
00374                 errno, strerror(errno));
00375        return -1 ;
00376    }
00377 
00378 #endif
00379 
00380   return sock ;
00381 } /* _9p_create_socket */
00382 
00393 void _9p_dispatcher_svc_run( long int sock )
00394 {
00395   int rc = 0;
00396   struct sockaddr_in addr;
00397   socklen_t addrlen = sizeof( addr ) ;
00398   long int newsock = -1 ;
00399   pthread_attr_t attr_thr;
00400   pthread_t tcp_thrid ;
00401 
00402   /* Init for thread parameter (mostly for scheduling) */
00403   if(pthread_attr_init(&attr_thr) != 0)
00404     LogDebug(COMPONENT_9P_DISPATCH, "can't init pthread's attributes");
00405 
00406   if(pthread_attr_setscope(&attr_thr, PTHREAD_SCOPE_SYSTEM) != 0)
00407     LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's scope");
00408 
00409   if(pthread_attr_setdetachstate(&attr_thr, PTHREAD_CREATE_JOINABLE) != 0)
00410     LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's join state");
00411 
00412   if(pthread_attr_setstacksize(&attr_thr, THREAD_STACK_SIZE) != 0)
00413     LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's stack size");
00414 
00415 
00416   LogEvent( COMPONENT_9P_DISPATCH, "9P dispatcher started" ) ;
00417   while(TRUE)
00418     {
00419       if( ( newsock = accept( sock, (struct sockaddr *)&addr, &addrlen ) ) < 0 )
00420        {
00421          LogCrit( COMPONENT_9P_DISPATCH, "accept failed" ) ;
00422          continue ; 
00423        }
00424 
00425       /* Starting the thread dedicated to signal handling */
00426       if( ( rc = pthread_create( &tcp_thrid, &attr_thr, _9p_socket_thread, (void *)newsock ) ) != 0 )
00427        {
00428          LogFatal(COMPONENT_THREAD,
00429                   "Could not create 9p socket manager thread, error = %d (%s)",
00430                   errno, strerror(errno));
00431        }
00432     }                           /* while */
00433   return;
00434 }                               /* _9p_dispatcher_svc_run */ 
00435 
00436 
00448 void * _9p_dispatcher_thread(void *Arg)
00449 {
00450   int _9p_socket = -1 ;
00451 
00452   SetNameFunction("_9p_dispatch_thr");
00453 
00454   /* Calling dispatcher main loop */
00455   LogInfo(COMPONENT_9P_DISPATCH,
00456           "Entering nfs/rpc dispatcher");
00457 
00458   LogDebug(COMPONENT_9P_DISPATCH,
00459            "My pthread id is %p", (caddr_t) pthread_self());
00460 
00461   /* Set up the _9p_socket */
00462   if( ( _9p_socket =  _9p_create_socket() ) == -1 )
00463    {
00464      LogCrit( COMPONENT_9P_DISPATCH,
00465               "Can't get socket for 9p dispatcher" ) ;
00466      exit( 1 ) ;
00467    }
00468 
00469   _9p_dispatcher_svc_run( _9p_socket );
00470 
00471   return NULL;
00472 }                               /* _9p_dispatcher_thread */
00473