/* nfs-ganesha 1.4 */
00001 /* 00002 * vim:expandtab:shiftwidth=8:tabstop=8: 00003 * 00004 * Copyright CEA/DAM/DIF (2011) 00005 * contributeur : Philippe DENIEL philippe.deniel@cea.fr 00006 * Thomas LEIBOVICI thomas.leibovici@cea.fr 00007 * 00008 * 00009 * This program is free software; you can redistribute it and/or 00010 * modify it under the terms of the GNU Lesser General Public 00011 * License as published by the Free Software Foundation; either 00012 * version 3 of the License, or (at your option) any later version. 00013 * 00014 * This program is distributed in the hope that it will be useful, 00015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 * Lesser General Public License for more details. 00018 * 00019 * You should have received a copy of the GNU Lesser General Public 00020 * License along with this library; if not, write to the Free Software 00021 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 00022 * 00023 * --------------------------------------- 00024 */ 00025 00035 #ifdef HAVE_CONFIG_H 00036 #include "config.h" 00037 #endif 00038 00039 #ifdef _SOLARIS 00040 #include "solaris_port.h" 00041 #endif 00042 00043 #include <stdio.h> 00044 #include <string.h> 00045 #include <pthread.h> 00046 #include <fcntl.h> 00047 #include <sys/file.h> /* for having FNDELAY */ 00048 #include <sys/select.h> 00049 #include <poll.h> 00050 #include "HashData.h" 00051 #include "HashTable.h" 00052 #include "log.h" 00053 #include "abstract_mem.h" 00054 #include "nfs_init.h" 00055 #include "nfs_core.h" 00056 #include "cache_inode.h" 00057 #include "nfs_exports.h" 00058 #include "nfs_creds.h" 00059 #include "nfs_proto_functions.h" 00060 #include "nfs_dupreq.h" 00061 #include "nfs_file_handle.h" 00062 #include "nfs_stat.h" 00063 #include "SemN.h" 00064 #include "9p.h" 00065 00066 #ifndef _USE_TIRPC_IPV6 00067 #define P_FAMILY AF_INET 00068 #else 00069 #define P_FAMILY 
AF_INET6 00070 #endif 00071 00072 void DispatchWork9P( request_data_t *preq, unsigned int worker_index) 00073 { 00074 LogDebug(COMPONENT_DISPATCH, 00075 "Awaking Worker Thread #%u for 9P request %p, tcpsock=%lu", 00076 worker_index, preq, preq->r_u._9p.pconn->sockfd); 00077 00078 P(workers_data[worker_index].wcb.tcb_mutex); 00079 P(workers_data[worker_index].request_pool_mutex); 00080 00081 glist_add_tail(&workers_data[worker_index].pending_request, &preq->pending_req_queue); 00082 workers_data[worker_index].pending_request_len++; 00083 00084 if(pthread_cond_signal(&(workers_data[worker_index].wcb.tcb_condvar)) == -1) 00085 { 00086 V(workers_data[worker_index].request_pool_mutex); 00087 V(workers_data[worker_index].wcb.tcb_mutex); 00088 LogMajor(COMPONENT_THREAD, 00089 "Error %d (%s) while signalling Worker Thread #%u... Exiting", 00090 errno, strerror(errno), worker_index); 00091 Fatal(); 00092 } 00093 V(workers_data[worker_index].request_pool_mutex); 00094 V(workers_data[worker_index].wcb.tcb_mutex); 00095 } 00096 00097 00110 void * _9p_socket_thread( void * Arg ) 00111 { 00112 long int tcp_sock = (long int)Arg; 00113 int rc = -1 ; 00114 struct pollfd fds[1] ; 00115 int fdcount = 1 ; 00116 static char my_name[MAXNAMLEN]; 00117 struct sockaddr_in addrpeer ; 00118 socklen_t addrpeerlen = sizeof( addrpeer ) ; 00119 char strcaller[MAXNAMLEN] ; 00120 request_data_t *preq = NULL; 00121 unsigned int worker_index; 00122 00123 char * _9pmsg ; 00124 uint32_t * p_9pmsglen = NULL ; 00125 00126 _9p_conn_t _9p_conn ; 00127 00128 int readlen = 0 ; 00129 int total_readlen = 0 ; 00130 00131 snprintf(my_name, MAXNAMLEN, "9p_sock_mgr#fd=%ld", tcp_sock); 00132 SetNameFunction(my_name); 00133 00134 /* Init the _9p_conn_t structure */ 00135 _9p_conn.sockfd = tcp_sock ; 00136 00137 if( gettimeofday( &_9p_conn.birth, NULL ) == -1 ) 00138 LogFatal( COMPONENT_9P, "Can get connection's time of birth" ) ; 00139 00140 if( ( rc = getpeername( tcp_sock, (struct sockaddr *)&addrpeer, 
&addrpeerlen) ) == -1 ) 00141 { 00142 LogMajor(COMPONENT_9P, 00143 "Cannot get peername to tcp socket for 9p, error %d (%s)", errno, strerror(errno)); 00144 strncpy( strcaller, "(unresolved)", MAXNAMLEN ) ; 00145 } 00146 else 00147 { 00148 snprintf(strcaller, MAXNAMLEN, "0x%x=%d.%d.%d.%d", 00149 ntohl(addrpeer.sin_addr.s_addr), 00150 (ntohl(addrpeer.sin_addr.s_addr) & 0xFF000000) >> 24, 00151 (ntohl(addrpeer.sin_addr.s_addr) & 0x00FF0000) >> 16, 00152 (ntohl(addrpeer.sin_addr.s_addr) & 0x0000FF00) >> 8, 00153 (ntohl(addrpeer.sin_addr.s_addr) & 0x000000FF)); 00154 00155 LogEvent( COMPONENT_9P, "9p socket #%ld is connected to %s", tcp_sock, strcaller ) ; 00156 } 00157 00158 /* Set up the structure used by poll */ 00159 memset( (char *)fds, 0, sizeof( struct pollfd ) ) ; 00160 fds[0].fd = tcp_sock ; 00161 fds[0].events = POLLIN|POLLPRI| POLLRDBAND|POLLRDNORM|POLLRDHUP|POLLHUP|POLLERR|POLLNVAL; 00162 00163 00164 for( ;; ) /* Infinite loop */ 00165 { 00166 00167 if( ( rc = poll( fds, fdcount, -1 ) ) == -1 ) /* timeout = -1 =>Wait indefinitely for incoming events */ 00168 { 00169 /* Interruption if not an issue */ 00170 if( errno == EINTR ) 00171 continue ; 00172 00173 LogCrit( COMPONENT_9P, 00174 "Got error %u (%s) on fd %ld connect to %s while polling on socket", 00175 errno, strerror( errno ), tcp_sock, strcaller ) ; 00176 00177 } 00178 00179 if( fds[0].revents & POLLNVAL ) 00180 { 00181 LogEvent( COMPONENT_9P, "Client %s on socket %lu produced POLLNVAL", strcaller, tcp_sock ) ; 00182 close( tcp_sock ); 00183 return NULL ; 00184 } 00185 00186 if( fds[0].revents & (POLLERR|POLLHUP|POLLRDHUP) ) 00187 { 00188 LogEvent( COMPONENT_9P, "Client %s on socket %lu has shut down and closed", strcaller, tcp_sock ) ; 00189 close( tcp_sock ); 00190 return NULL ; 00191 } 00192 00193 if( fds[0].revents & (POLLIN|POLLRDNORM) ) 00194 { 00195 /* choose a worker depending on its queue length */ 00196 worker_index = nfs_core_select_worker_queue( WORKER_INDEX_ANY ); 00197 00198 /* Get a 
preq from the worker's pool */ 00199 P(workers_data[worker_index].request_pool_mutex); 00200 00201 preq = pool_alloc( request_pool, NULL ) ; 00202 00203 V(workers_data[worker_index].request_pool_mutex); 00204 00205 /* Prepare to read the message */ 00206 preq->rtype = _9P_REQUEST ; 00207 _9pmsg = preq->r_u._9p._9pmsg ; 00208 preq->r_u._9p.pconn = &_9p_conn ; 00209 00210 /* An incoming 9P request: the msg has a 4 bytes header 00211 showing the size of the msg including the header */ 00212 if( (readlen = recv( fds[0].fd, _9pmsg ,_9P_HDR_SIZE, 0) == _9P_HDR_SIZE ) ) 00213 { 00214 p_9pmsglen = (uint32_t *)_9pmsg ; 00215 00216 LogFullDebug( COMPONENT_9P, 00217 "Received message of size %u from client %s on socket %lu", 00218 *p_9pmsglen, strcaller, tcp_sock ) ; 00219 00220 if( *p_9pmsglen < _9P_HDR_SIZE ) 00221 { 00222 LogEvent( COMPONENT_9P, 00223 "Badly formed 9P message: Header is too small for client %s on socket %lu: readlen=%u expected=%u", 00224 strcaller, tcp_sock, readlen, *p_9pmsglen - _9P_HDR_SIZE ) ; 00225 00226 /* Release the entry */ 00227 P(workers_data[worker_index].request_pool_mutex); 00228 pool_free(request_pool, preq ) ; 00229 00230 workers_data[worker_index].passcounter += 1; 00231 V(workers_data[worker_index].request_pool_mutex); 00232 00233 continue ; /* Maybe, use something smarter here */ 00234 } 00235 00236 total_readlen = 0 ; 00237 while( total_readlen < (*p_9pmsglen - _9P_HDR_SIZE) ) 00238 { 00239 readlen = recv( fds[0].fd, 00240 (char *)(_9pmsg + _9P_HDR_SIZE + total_readlen), 00241 *p_9pmsglen - _9P_HDR_SIZE - total_readlen, 0 ) ; 00242 00243 /* Signal management */ 00244 if( readlen < 0 && errno == EINTR ) 00245 continue ; 00246 00247 /* Error management */ 00248 if( readlen < 0 ) 00249 { 00250 LogEvent( COMPONENT_9P, "Badly formed 9P header for client %s on socket %lu", strcaller, tcp_sock ) ; 00251 00252 /* Release the entry */ 00253 P(workers_data[worker_index].request_pool_mutex); 00254 pool_free( request_pool, preq ) ; 00255 
workers_data[worker_index].passcounter += 1; 00256 V(workers_data[worker_index].request_pool_mutex); 00257 00258 continue ; 00259 } 00260 00261 /* After this point, read() is supposed to be OK */ 00262 total_readlen += readlen ; 00263 } /* while */ 00264 00265 /* Message was OK push it the request to the right worker */ 00266 DispatchWork9P(preq, worker_index); 00267 } 00268 else if( readlen == 0 ) 00269 { 00270 LogEvent( COMPONENT_9P, "Client %s on socket %lu has shut down", strcaller, tcp_sock ) ; 00271 close( tcp_sock ); 00272 return NULL ; 00273 } 00274 } /* if( fds[0].revents & (POLLIN|POLLRDNORM) ) */ 00275 } /* for( ;; ) */ 00276 00277 return NULL ; 00278 } /* _9p_socket_thread */ 00279 00290 int _9p_create_socket( void ) 00291 { 00292 int sock = -1 ; 00293 int one = 1 ; 00294 struct sockaddr_in sinaddr; 00295 #ifdef _USE_TIRPC_IPV6 00296 struct sockaddr_in6 sinaddr_tcp6; 00297 struct netbuf netbuf_tcp6; 00298 struct t_bind bindaddr_tcp6; 00299 struct __rpc_sockinfo si_tcp6; 00300 #endif 00301 00302 if( ( sock= socket(P_FAMILY, SOCK_STREAM, IPPROTO_TCP) ) == -1 ) 00303 { 00304 LogFatal(COMPONENT_9P_DISPATCH, 00305 "Cannot allocate a tcp socket for 9p, error %d (%s)", errno, strerror(errno)); 00306 return -1 ; 00307 } 00308 00309 if(setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) 00310 { 00311 LogFatal(COMPONENT_9P_DISPATCH, 00312 "Bad tcp socket options for 9p, error %d (%s)", errno, strerror(errno)); 00313 return -1 ; 00314 } 00315 00316 socket_setoptions(sock); 00317 00318 #ifndef _USE_TIRPC_IPV6 00319 memset( &sinaddr, 0, sizeof(sinaddr)); 00320 sinaddr.sin_family = AF_INET; 00321 sinaddr.sin_addr.s_addr = nfs_param.core_param.bind_addr.sin_addr.s_addr; 00322 sinaddr.sin_port = htons(nfs_param._9p_param._9p_port); 00323 00324 if(bind(sock, (struct sockaddr *)&sinaddr, sizeof(sinaddr)) == -1) 00325 { 00326 LogFatal(COMPONENT_9P_DISPATCH, 00327 "Cannot bind 9p tcp socket, error %d (%s)", 00328 errno, strerror(errno)); 00329 return -1 ; 00330 
} 00331 00332 if( listen( sock, 20 ) == -1 ) 00333 { 00334 LogFatal(COMPONENT_DISPATCH, 00335 "Cannot bind 9p socket, error %d (%s)", 00336 errno, strerror(errno)); 00337 return -1 ; 00338 } 00339 #else 00340 memset(&sinaddr_tcp6, 0, sizeof(sinaddr_tcp6)); 00341 sinaddr_tcp6.sin6_family = AF_INET6; 00342 sinaddr_tcp6.sin6_addr = in6addr_any; /* All the interfaces on the machine are used */ 00343 sinaddr_tcp6.sin6_port = htons(nfs_param.core_param._9p_port); 00344 00345 netbuf_tcp6.maxlen = sizeof(sinaddr_tcp6); 00346 netbuf_tcp6.len = sizeof(sinaddr_tcp6); 00347 netbuf_tcp6.buf = &sinaddr_tcp6; 00348 00349 bindaddr_tcp6.qlen = SOMAXCONN; 00350 bindaddr_tcp6.addr = netbuf_tcp6; 00351 00352 if(!__rpc_fd2sockinfo(sock, &si_tcp6)) 00353 { 00354 LogFatal(COMPONENT_DISPATCH, 00355 "Cannot get 9p socket info for tcp6 socket rc=%d errno=%d (%s)", 00356 rc, errno, strerror(errno)); 00357 return -1 ; 00358 } 00359 00360 if(bind( sock, 00361 (struct sockaddr *)bindaddr_tcp6.addr.buf, 00362 (socklen_t) si_nfs_tcp6.si_alen) == -1) 00363 { 00364 LogFatal(COMPONENT_DISPATCH, 00365 "Cannot bind 9p tcp6 socket, error %d (%s)", 00366 errno, strerror(errno)); 00367 return -1 ; 00368 } 00369 00370 if( listen( sock, 20 ) == -1 ) 00371 { 00372 LogFatal(COMPONENT_DISPATCH, 00373 "Cannot bind 9p tcp6 socket, error %d (%s)", 00374 errno, strerror(errno)); 00375 return -1 ; 00376 } 00377 00378 #endif 00379 00380 return sock ; 00381 } /* _9p_create_socket */ 00382 00393 void _9p_dispatcher_svc_run( long int sock ) 00394 { 00395 int rc = 0; 00396 struct sockaddr_in addr; 00397 socklen_t addrlen = sizeof( addr ) ; 00398 long int newsock = -1 ; 00399 pthread_attr_t attr_thr; 00400 pthread_t tcp_thrid ; 00401 00402 /* Init for thread parameter (mostly for scheduling) */ 00403 if(pthread_attr_init(&attr_thr) != 0) 00404 LogDebug(COMPONENT_9P_DISPATCH, "can't init pthread's attributes"); 00405 00406 if(pthread_attr_setscope(&attr_thr, PTHREAD_SCOPE_SYSTEM) != 0) 00407 
LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's scope"); 00408 00409 if(pthread_attr_setdetachstate(&attr_thr, PTHREAD_CREATE_JOINABLE) != 0) 00410 LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's join state"); 00411 00412 if(pthread_attr_setstacksize(&attr_thr, THREAD_STACK_SIZE) != 0) 00413 LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's stack size"); 00414 00415 00416 LogEvent( COMPONENT_9P_DISPATCH, "9P dispatcher started" ) ; 00417 while(TRUE) 00418 { 00419 if( ( newsock = accept( sock, (struct sockaddr *)&addr, &addrlen ) ) < 0 ) 00420 { 00421 LogCrit( COMPONENT_9P_DISPATCH, "accept failed" ) ; 00422 continue ; 00423 } 00424 00425 /* Starting the thread dedicated to signal handling */ 00426 if( ( rc = pthread_create( &tcp_thrid, &attr_thr, _9p_socket_thread, (void *)newsock ) ) != 0 ) 00427 { 00428 LogFatal(COMPONENT_THREAD, 00429 "Could not create 9p socket manager thread, error = %d (%s)", 00430 errno, strerror(errno)); 00431 } 00432 } /* while */ 00433 return; 00434 } /* _9p_dispatcher_svc_run */ 00435 00436 00448 void * _9p_dispatcher_thread(void *Arg) 00449 { 00450 int _9p_socket = -1 ; 00451 00452 SetNameFunction("_9p_dispatch_thr"); 00453 00454 /* Calling dispatcher main loop */ 00455 LogInfo(COMPONENT_9P_DISPATCH, 00456 "Entering nfs/rpc dispatcher"); 00457 00458 LogDebug(COMPONENT_9P_DISPATCH, 00459 "My pthread id is %p", (caddr_t) pthread_self()); 00460 00461 /* Set up the _9p_socket */ 00462 if( ( _9p_socket = _9p_create_socket() ) == -1 ) 00463 { 00464 LogCrit( COMPONENT_9P_DISPATCH, 00465 "Can't get socket for 9p dispatcher" ) ; 00466 exit( 1 ) ; 00467 } 00468 00469 _9p_dispatcher_svc_run( _9p_socket ); 00470 00471 return NULL; 00472 } /* _9p_dispatcher_thread */ 00473