nfs-ganesha 1.4

nfs_rpc_dispatcher_thread.c

Go to the documentation of this file.
00001 /*
00002  * vim:expandtab:shiftwidth=8:tabstop=8:
00003  *
00004  * Copyright CEA/DAM/DIF  (2008)
00005  * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
00006  *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
00007  *
00008  *
00009  * This program is free software; you can redistribute it and/or
00010  * modify it under the terms of the GNU Lesser General Public
00011  * License as published by the Free Software Foundation; either
00012  * version 3 of the License, or (at your option) any later version.
00013  *
00014  * This program is distributed in the hope that it will be useful,
00015  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017  * Lesser General Public License for more details.
00018  *
00019  * You should have received a copy of the GNU Lesser General Public
00020  * License along with this library; if not, write to the Free Software
00021  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
00022  *
00023  * ---------------------------------------
00024  */
00025 
00037 #ifdef HAVE_CONFIG_H
00038 #include "config.h"
00039 #endif
00040 
00041 #ifdef _SOLARIS
00042 #include "solaris_port.h"
00043 #endif
00044 
00045 #include <stdio.h>
00046 #include <string.h>
00047 #include <pthread.h>
00048 #include <fcntl.h>
00049 #include <sys/file.h>           /* for having FNDELAY */
00050 #include <sys/select.h>
00051 #include <poll.h>
00052 #include <assert.h>
00053 #include "HashData.h"
00054 #include "HashTable.h"
00055 #include "log.h"
00056 #include "ganesha_rpc.h"
00057 #include "nfs23.h"
00058 #include "nfs4.h"
00059 #include "mount.h"
00060 #include "nlm4.h"
00061 #include "rquota.h"
00062 #include "nfs_init.h"
00063 #include "nfs_core.h"
00064 #include "cache_inode.h"
00065 #include "nfs_exports.h"
00066 #include "nfs_creds.h"
00067 #include "nfs_proto_functions.h"
00068 #include "nfs_dupreq.h"
00069 #include "nfs_file_handle.h"
00070 #include "nfs_stat.h"
00071 #include "SemN.h"
00072 #include "nfs_tcb.h"
00073 
00074 #ifndef _USE_TIRPC_IPV6
00075   #define P_FAMILY AF_INET
00076 #else
00077   #define P_FAMILY AF_INET6
00078 #endif
00079 
00080 static pthread_mutex_t lock_worker_selection = PTHREAD_MUTEX_INITIALIZER;
00081 
00082 /* TI-RPC event channels.  Each channel is a thread servicing an event
00083  * demultiplexer. */
00084 
00085 struct rpc_evchan {
00086     uint32_t chan_id;
00087     pthread_t thread_id;
00088 };
00089 
00090 #define N_TCP_EVENT_CHAN  3 /* we don't really want to have too many, relative to the
00091                              * number of available cores. */
00092 #define UDP_EVENT_CHAN    0 /* put udp on a dedicated channel */
00093 #define TCP_RDVS_CHAN     1 /* accepts new tcp connections */
00094 #define TCP_EVCHAN_0      2
00095 #define N_EVENT_CHAN N_TCP_EVENT_CHAN + 2
00096 
00097 static struct rpc_evchan rpc_evchan[N_EVENT_CHAN];
00098 
00099 static u_int nfs_rpc_rdvs(SVCXPRT *xprt, SVCXPRT *newxprt, const u_int flags,
00100                           void *u_data);
00101 static bool_t nfs_rpc_getreq_ng(SVCXPRT *xprt /*, int chan_id */);
00102 static void nfs_rpc_free_xprt(SVCXPRT *xprt);
00103 
00115 void nfs_rpc_dispatch_dummy(struct svc_req *ptr_req, SVCXPRT * ptr_svc)
00116 {
00117   LogMajor(COMPONENT_DISPATCH,
00118            "NFS DISPATCH DUMMY: Possible error, function nfs_rpc_dispatch_dummy should never be called");
00119   return;
00120 }                               /* nfs_rpc_dispatch_dummy */
00121 
00122 const char *tags[] = {
00123   "NFS",
00124   "MNT",
00125 #ifdef _USE_NLM
00126   "NLM",
00127 #endif
00128 #ifdef _USE_RQUOTA
00129   "RQUOTA",
00130 #endif
00131 };
00132 
00133 typedef struct proto_data
00134 {
00135   struct sockaddr_in sinaddr;
00136 #ifdef _USE_TIRPC_IPV6
00137   struct sockaddr_in6 sinaddr_udp6;
00138   struct sockaddr_in6 sinaddr_tcp6;
00139   struct netbuf netbuf_udp6;
00140   struct netbuf netbuf_tcp6;
00141   struct t_bind bindaddr_udp6;
00142   struct t_bind bindaddr_tcp6;
00143   struct __rpc_sockinfo si_udp6;
00144   struct __rpc_sockinfo si_tcp6;
00145 #endif
00146 } proto_data;
00147 
00148 proto_data pdata[P_COUNT];
00149 
00150 struct netconfig *netconfig_udpv4;
00151 struct netconfig *netconfig_tcpv4;
00152 #ifdef _USE_TIRPC_IPV6
00153 struct netconfig *netconfig_udpv6;
00154 struct netconfig *netconfig_tcpv6;
00155 #endif
00156 
00157 /* RPC Service Sockets and Transports */
00158 int udp_socket[P_COUNT];
00159 int tcp_socket[P_COUNT];
00160 SVCXPRT *udp_xprt[P_COUNT];
00161 SVCXPRT *tcp_xprt[P_COUNT];
00162 
00167 static void unregister(const rpcprog_t prog, const rpcvers_t vers1, const rpcvers_t vers2)
00168 {
00169   rpcvers_t vers;
00170   for(vers = vers1; vers <= vers2; vers++)
00171    {
00172      rpcb_unset(prog, vers, netconfig_udpv4);
00173      rpcb_unset(prog, vers, netconfig_tcpv4);
00174 #ifdef _USE_TIRPC_IPV6
00175      rpcb_unset(prog, vers, netconfig_udpv6);
00176      rpcb_unset(prog, vers, netconfig_tcpv6);
00177 #endif
00178    }
00179 }
00180 
00181 static void unregister_rpc(void)
00182 {
00183   unregister(nfs_param.core_param.program[P_NFS], NFS_V2, NFS_V4);
00184   unregister(nfs_param.core_param.program[P_MNT], MOUNT_V1, MOUNT_V3);
00185 #ifdef _USE_NLM
00186   unregister(nfs_param.core_param.program[P_NLM], 1, NLM4_VERS);
00187 #endif
00188 #ifdef _USE_RQUOTA
00189   unregister(nfs_param.core_param.program[P_RQUOTA], RQUOTAVERS, EXT_RQUOTAVERS);
00190 #endif
00191 }
00192 
00193 #ifdef _USE_NLM
00194 #define test_for_additional_nfs_protocols(p) \
00195   ((p != P_MNT && p != P_NLM) || \
00196   (nfs_param.core_param.core_options & (CORE_OPTION_NFSV2 | CORE_OPTION_NFSV3)) != 0)
00197 #else
00198 #define test_for_additional_nfs_protocols(p) \
00199   (p != P_MNT || \
00200   (nfs_param.core_param.core_options & (CORE_OPTION_NFSV2 | CORE_OPTION_NFSV3)) != 0)
00201 #endif
00202 
00212 static void close_rpc_fd()
00213 {
00214     protos p;
00215 
00216     for(p = P_NFS; p < P_COUNT; p++) {
00217         if (udp_socket[p] != -1) {
00218             close(udp_socket[p]);
00219         }
00220         if (tcp_socket[p] != -1) {
00221             close(tcp_socket[p]);
00222         }
00223     }
00224 }
00225 
00226 
00227 void Create_udp(protos prot)
00228 {
00229     udp_xprt[prot] = svc_dg_create(udp_socket[prot],
00230                                    nfs_param.core_param.max_send_buffer_size,
00231                                    nfs_param.core_param.max_recv_buffer_size);
00232     if(udp_xprt[prot] == NULL)
00233         LogFatal(COMPONENT_DISPATCH,
00234                  "Cannot allocate %s/UDP SVCXPRT", tags[prot]);
00235 
00236     /* Hook xp_getreq */
00237     (void) SVC_CONTROL(udp_xprt[prot], SVCSET_XP_GETREQ, nfs_rpc_getreq_ng);
00238 
00239     /* Hook xp_free_xprt (finalize/free private data) */
00240     (void) SVC_CONTROL(udp_xprt[prot], SVCSET_XP_FREE_XPRT, nfs_rpc_free_xprt);
00241 
00242     /* Setup private data */
00243     (udp_xprt[prot])->xp_u1 = alloc_gsh_xprt_private(XPRT_PRIVATE_FLAG_REF);
00244 
00245     /* bind xprt to channel--unregister it from the global event
00246      * channel (if applicable) */
00247     (void) svc_rqst_evchan_reg(rpc_evchan[UDP_EVENT_CHAN].chan_id, udp_xprt[prot],
00248                                SVC_RQST_FLAG_XPRT_UREG);
00249 
00250     /* XXXX why are we doing this?  Is it also stale (see below)? */
00251 #ifdef _USE_TIRPC_IPV6
00252     udp_xprt[prot]->xp_netid = Str_Dup(netconfig_udpv6->nc_netid);
00253     udp_xprt[prot]->xp_tp    = Str_Dup(netconfig_udpv6->nc_device);
00254 #endif
00255 }
00256 
00257 void Create_tcp(protos prot)
00258 {
00259 #if 0
00260     /* XXXX By itself, non-block mode will currently stall, so, we probably
00261      * will remove this. */
00262     int maxrec = nfs_param.core_param.max_recv_buffer_size;
00263     rpc_control(RPC_SVC_CONNMAXREC_SET, &maxrec);
00264 #endif
00265 
00266     tcp_xprt[prot] = svc_vc_create2(tcp_socket[prot],
00267                                     nfs_param.core_param.max_send_buffer_size,
00268                                     nfs_param.core_param.max_recv_buffer_size,
00269                                     SVC_VC_CREATE_FLAG_LISTEN);
00270     if(tcp_xprt[prot] == NULL)
00271         LogFatal(COMPONENT_DISPATCH,
00272                  "Cannot allocate %s/TCP SVCXPRT", tags[prot]);
00273 
00274     /* bind xprt to channel--unregister it from the global event
00275      * channel (if applicable) */
00276     (void) svc_rqst_evchan_reg(rpc_evchan[TCP_RDVS_CHAN].chan_id,
00277                                tcp_xprt[prot], SVC_RQST_FLAG_XPRT_UREG);
00278 
00279     /* Hook xp_getreq */
00280     (void) SVC_CONTROL(tcp_xprt[prot], SVCSET_XP_GETREQ, nfs_rpc_getreq_ng);
00281 
00282     /* Hook xp_rdvs -- allocate new xprts to event channels */
00283     (void) SVC_CONTROL(tcp_xprt[prot], SVCSET_XP_RDVS, nfs_rpc_rdvs);
00284 
00285     /* Hook xp_free_xprt (finalize/free private data) */
00286     (void) SVC_CONTROL(tcp_xprt[prot], SVCSET_XP_FREE_XPRT, nfs_rpc_free_xprt);
00287 
00288     /* Setup private data */
00289     (tcp_xprt[prot])->xp_u1 = alloc_gsh_xprt_private(XPRT_PRIVATE_FLAG_REF);
00290 
00291 /* XXXX the following code cannot compile (socket, binadaddr_udp6 are gone)
00292  * (Matt) */
00293 #ifdef _USE_TIRPC_IPV6
00294     if(listen(socket, pdata[prot].bindaddr_udp6.qlen) != 0)
00295         LogFatal(COMPONENT_DISPATCH,
00296                  "Cannot listen on  %s/TCPv6 SVCXPRT, errno=%u (%s)",
00297                  tags[prot], errno, strerror(errno));
00298     /* XXX what if we errored above? */
00299     tcp_xprt[prot]->xp_netid = Str_Dup(netconfig_tcpv6->nc_netid);
00300     tcp_xprt[prot]->xp_tp    = Str_Dup(netconfig_tcpv6->nc_device);
00301 #endif
00302 }
00303 
00308 void Create_SVCXPRTs(void)
00309 {
00310   protos p;
00311 
00312   LogFullDebug(COMPONENT_DISPATCH, "Allocation of the SVCXPRT");
00313   for(p = P_NFS; p < P_COUNT; p++)
00314     if(test_for_additional_nfs_protocols(p))
00315       {
00316         Create_udp(p);
00317         Create_tcp(p);
00318       }
00319 }
00320 
00325 void Bind_sockets(void)
00326 {
00327   protos p;
00328 
00329   for(p = P_NFS; p < P_COUNT; p++)
00330     if(test_for_additional_nfs_protocols(p))
00331       {
00332         proto_data *pdatap = &pdata[p];
00333 #ifndef _USE_TIRPC_IPV6
00334         memset(&pdatap->sinaddr, 0, sizeof(pdatap->sinaddr));
00335         pdatap->sinaddr.sin_family      = AF_INET;
00336         pdatap->sinaddr.sin_addr.s_addr = nfs_param.core_param.bind_addr.sin_addr.s_addr;
00337         pdatap->sinaddr.sin_port        = htons(nfs_param.core_param.port[p]);
00338 
00339         if(bind(udp_socket[p],
00340                 (struct sockaddr *)&pdatap->sinaddr, sizeof(pdatap->sinaddr)) == -1)
00341           LogFatal(COMPONENT_DISPATCH,
00342                    "Cannot bind %s udp socket, error %d (%s)",
00343                    tags[p], errno, strerror(errno));
00344 
00345         if(bind(tcp_socket[p],
00346                 (struct sockaddr *)&pdatap->sinaddr, sizeof(pdatap->sinaddr)) == -1)
00347           LogFatal(COMPONENT_DISPATCH,
00348                    "Cannot bind %s tcp socket, error %d (%s)",
00349                    tags[p], errno, strerror(errno));
00350 #else
00351         memset(&pdatap->sinaddr_udp6, 0, sizeof(pdatap->sinaddr_udp6));
00352         pdatap->sinaddr_udp6.sin6_family = AF_INET6;
00353         pdatap->sinaddr_udp6.sin6_addr   = in6addr_any;     /* All the interfaces on the machine are used */
00354         pdatap->sinaddr_udp6.sin6_port   = htons(nfs_param.core_param.port[p]);
00355 
00356         pdatap->netbuf_udp6.maxlen = sizeof(pdatap->sinaddr_udp6);
00357         pdatap->netbuf_udp6.len    = sizeof(pdatap->sinaddr_udp6);
00358         pdatap->netbuf_udp6.buf    = &pdatap->sinaddr_udp6;
00359 
00360         pdatap->bindaddr_udp6.qlen = SOMAXCONN;
00361         pdatap->bindaddr_udp6.addr = pdatap->netbuf_udp6;
00362 
00363         if(!__rpc_fd2sockinfo(udp_socket[p], &pdatap->si_udp6))
00364           LogFatal(COMPONENT_DISPATCH,
00365                    "Cannot get %s socket info for udp6 socket rc=%d errno=%d (%s)",
00366                    tags[p], rc, errno, strerror(errno));
00367 
00368         if(bind(udp_socket[p],
00369                 (struct sockaddr *)pdatap->bindaddr_udp6.addr.buf,
00370                 (socklen_t) si_nfs_udp6.si_alen) == -1)
00371           LogFatal(COMPONENT_DISPATCH,
00372                    "Cannot bind %s udp6 socket, error %d (%s)",
00373                    tags[p], errno, strerror(errno));
00374 
00375         memset(&pdatap->sinaddr_tcp6, 0, sizeof(pdatap->sinaddr_tcp6));
00376         pdatap->sinaddr_tcp6.sin6_family = AF_INET6;
00377         pdatap->sinaddr_tcp6.sin6_addr   = in6addr_any;     /* All the interfaces on the machine are used */
00378         pdatap->sinaddr_tcp6.sin6_port   = htons(nfs_param.core_param.port[p]);
00379 
00380         pdatap->netbuf_tcp6.maxlen = sizeof(pdatap->sinaddr_tcp6);
00381         pdatap->netbuf_tcp6.len    = sizeof(pdatap->sinaddr_tcp6);
00382         pdatap->netbuf_tcp6.buf    = &pdatap->sinaddr_tcp6;
00383 
00384         pdatap->bindaddr_tcp6.qlen = SOMAXCONN;
00385         pdatap->bindaddr_tcp6.addr = pdatap->netbuf_tcp6;
00386 
00387         if(!__rpc_fd2sockinfo(tcp_socket[p], &pdatap->si_tcp6))
00388           LogFatal(COMPONENT_DISPATCH,
00389                    "Cannot get %s socket info for tcp6 socket rc=%d errno=%d (%s)",
00390                    tags[p], rc, errno, strerror(errno));
00391 
00392         if(bind(tcp_socket[p],
00393                 (struct sockaddr *)pdatap->bindaddr_tcp6.addr.buf,
00394                 (socklen_t) si_nfs_tcp6.si_alen) == -1)
00395           LogFatal(COMPONENT_DISPATCH,
00396                    "Cannot bind %s tcp6 socket, error %d (%s)",
00397                    tags[p], errno, strerror(errno));
00398 #endif
00399       }
00400 }
00401 
00402 void Clean_RPC(void)
00403 {
00404   //TODO: consider the need to call Svc_dg_destroy for UDP & ?? for TCP based services
00405   unregister_rpc();
00406   close_rpc_fd();
00407 }
00408 
00409 cleanup_list_element clean_rpc = {NULL, Clean_RPC};
00410 
00411 #define UDP_REGISTER(prot, vers, netconfig) \
00412     svc_register(udp_xprt[prot], nfs_param.core_param.program[prot], (u_long) vers, \
00413                  nfs_rpc_dispatch_dummy, IPPROTO_UDP)
00414 
00415 #define TCP_REGISTER(prot, vers, netconfig) \
00416     svc_register(tcp_xprt[prot], nfs_param.core_param.program[prot], (u_long) vers, \
00417                  nfs_rpc_dispatch_dummy, IPPROTO_TCP)
00418 
00419 void Register_program(protos prot, int flag, int vers)
00420 {
00421   if((nfs_param.core_param.core_options & flag) != 0)
00422     {
00423       LogInfo(COMPONENT_DISPATCH,
00424               "Registering %s V%d/UDP",
00425               tags[prot], (int)vers);
00426 
00427       /* XXXX fix svc_register! */
00428       if(!UDP_REGISTER(prot, vers, netconfig_udpv4))
00429         LogFatal(COMPONENT_DISPATCH,
00430                  "Cannot register %s V%d on UDP",
00431                  tags[prot], (int)vers);
00432 
00433 #ifdef _USE_TIRPC_IPV6
00434       LogInfo(COMPONENT_DISPATCH,
00435               "Registering %s V%d/UDPv6",
00436               tags[prot], (int)vers);
00437       if(!UDP_REGISTER(prot, vers, netconfig_udpv6))
00438         LogFatal(COMPONENT_DISPATCH,
00439                  "Cannot register %s V%d on UDPv6",
00440                  tags[prot], (int)vers);
00441 #endif
00442 
00443 #ifndef _NO_TCP_REGISTER
00444       LogInfo(COMPONENT_DISPATCH,
00445               "Registering %s V%d/TCP",
00446               tags[prot], (int)vers);
00447 
00448       if(!TCP_REGISTER(prot, vers, netconfig_tcpv4))
00449         LogFatal(COMPONENT_DISPATCH,
00450                  "Cannot register %s V%d on TCP",
00451                  tags[prot], (int)vers);
00452 
00453 #ifdef _USE_TIRPC_IPV6
00454       LogInfo(COMPONENT_DISPATCH,
00455               "Registering %s V%d/TCPv6",
00456               tags[prot], (int)vers);
00457       if(!TCP_REGISTER(prot, vers, netconfig_tcpv6))
00458         LogFatal(COMPONENT_DISPATCH,
00459                  "Cannot register %s V%d on TCPv6",
00460                  tags[prot], (int)vers);
00461 #endif /* _USE_TIRPC_IPV6 */
00462 #endif /* _NO_TCP_REGISTER */
00463     }
00464 }
00465 
00475 void nfs_Init_svc()
00476 {
00477     protos p;
00478     svc_init_params svc_params;
00479     int ix, code __attribute__((unused)) = 0;
00480     int one = 1;
00481 
00482     LogInfo(COMPONENT_DISPATCH, "NFS INIT: Core options = %d",
00483             nfs_param.core_param.core_options);
00484 
00485     LogInfo(COMPONENT_DISPATCH, "NFS INIT: using TIRPC");
00486 
00487     /* New TI-RPC package init function */
00488     svc_params.flags = SVC_INIT_EPOLL; /* use EPOLL event mgmt */
00489     svc_params.flags |= SVC_INIT_NOREG_XPRTS; /* don't call xprt_register */
00490     svc_params.max_connections = nfs_param.core_param.nb_max_fd;
00491     svc_params.max_events = 1024; /* length of epoll event queue */
00492 
00493     svc_init(&svc_params);
00494 
00495   /* Redirect TI-RPC allocators, log channel */
00496   if (!tirpc_control(TIRPC_SET_WARNX, (warnx_t) rpc_warnx))
00497       LogCrit(COMPONENT_INIT, "Failed redirecting TI-RPC __warnx");
00498 
00499 #define TIRPC_SET_ALLOCATORS 0
00500 #if TIRPC_SET_ALLOCATORS
00501   if (!tirpc_control(TIRPC_SET_MALLOC, (mem_alloc_t) gsh_malloc))
00502       LogCrit(COMPONENT_INIT, "Failed redirecting TI-RPC alloc");
00503 
00504   if (!tirpc_control(TIRPC_SET_MEM_FREE, (mem_free_t) gsh_free_size))
00505       LogCrit(COMPONENT_INIT, "Failed redirecting TI-RPC mem_free");
00506 
00507   if (!tirpc_control(TIRPC_SET_FREE, (std_free_t) gsh_free))
00508       LogCrit(COMPONENT_INIT, "Failed redirecting TI-RPC __free");
00509 #endif /* TIRPC_SET_ALLOCATORS */
00510 
00511     for (ix = 0; ix < N_EVENT_CHAN; ++ix) {
00512         rpc_evchan[ix].chan_id = 0;
00513         if ((code = svc_rqst_new_evchan(&rpc_evchan[ix].chan_id, NULL /* u_data */,
00514                                         SVC_RQST_FLAG_NONE)))
00515             LogFatal(COMPONENT_DISPATCH,
00516                      "Cannot create TI-RPC event channel (%d, %d)", ix, code);
00517         /* XXX bail?? */
00518     }
00519 
00520   /* Get the netconfig entries from /etc/netconfig */
00521     if((netconfig_udpv4 = (struct netconfig *)getnetconfigent("udp")) == NULL)
00522         LogFatal(COMPONENT_DISPATCH,
00523                  "Cannot get udp netconfig, cannot get a entry for udp in netconfig file. Check file /etc/netconfig...");
00524 
00525     /* Get the netconfig entries from /etc/netconfig */
00526     if((netconfig_tcpv4 = (struct netconfig *)getnetconfigent("tcp")) == NULL)
00527         LogFatal(COMPONENT_DISPATCH,
00528                  "Cannot get tcp netconfig, cannot get a entry for tcp in netconfig file. Check file /etc/netconfig...");
00529 
00530     /* A short message to show that /etc/netconfig parsing was a success */
00531     LogFullDebug(COMPONENT_DISPATCH, "netconfig found for UDPv4 and TCPv4");
00532 
00533 #ifdef _USE_TIRPC_IPV6
00534     LogInfo(COMPONENT_DISPATCH, "NFS INIT: Using IPv6");
00535 
00536     /* Get the netconfig entries from /etc/netconfig */
00537     if((netconfig_udpv6 = (struct netconfig *)getnetconfigent("udp6")) == NULL)
00538         LogFatal(COMPONENT_DISPATCH,
00539                  "Cannot get udp6 netconfig, cannot get a entry for udp6 in netconfig file. Check file /etc/netconfig...");
00540 
00541   /* Get the netconfig entries from /etc/netconfig */
00542     if((netconfig_tcpv6 = (struct netconfig *)getnetconfigent("tcp6")) == NULL)
00543         LogFatal(COMPONENT_DISPATCH,
00544                  "Cannot get tcp6 netconfig, cannot get a entry for tcp in netconfig file. Check file /etc/netconfig...");
00545 
00546     /* A short message to show that /etc/netconfig parsing was a success */
00547     LogFullDebug(COMPONENT_DISPATCH, "netconfig found for UDPv6 and TCPv6");
00548 #endif
00549 
00550   /* Allocate the UDP and TCP sockets for the RPC */
00551   LogFullDebug(COMPONENT_DISPATCH, "Allocation of the sockets");
00552   for(p = P_NFS; p < P_COUNT; p++)
00553     if(test_for_additional_nfs_protocols(p))
00554       {
00555         /* Initialize all the sockets to -1 because it makes some code later easier */
00556         udp_socket[p] = -1;
00557         tcp_socket[p] = -1;
00558 
00559         udp_socket[p] = socket(P_FAMILY, SOCK_DGRAM, IPPROTO_UDP);
00560 
00561         if(udp_socket[p] == -1)
00562           LogFatal(COMPONENT_DISPATCH,
00563                    "Cannot allocate a udp socket for %s, error %d (%s)",
00564                    tags[p], errno, strerror(errno));
00565 
00566         tcp_socket[p] = socket(P_FAMILY, SOCK_STREAM, IPPROTO_TCP);
00567 
00568         if(tcp_socket[p] == -1)
00569           LogFatal(COMPONENT_DISPATCH,
00570                    "Cannot allocate a tcp socket for %s, error %d (%s)",
00571                    tags[p], errno, strerror(errno));
00572 
00573         /* Use SO_REUSEADDR in order to avoid wait the 2MSL timeout */
00574         if(setsockopt(udp_socket[p],
00575                       SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
00576           LogFatal(COMPONENT_DISPATCH,
00577                    "Bad udp socket options for %s, error %d (%s)",
00578                    tags[p], errno, strerror(errno));
00579 
00580         if(setsockopt(tcp_socket[p],
00581                       SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)))
00582           LogFatal(COMPONENT_DISPATCH,
00583                    "Bad tcp socket options for %s, error %d (%s)",
00584                    tags[p], errno, strerror(errno));
00585 
00586         /* We prefer using non-blocking socket in the specific case */
00587         if(fcntl(udp_socket[p], F_SETFL, FNDELAY) == -1)
00588           LogFatal(COMPONENT_DISPATCH,
00589                    "Cannot set udp socket for %s as non blocking, error %d (%s)",
00590                    tags[p], errno, strerror(errno));
00591       }
00592 
00593   socket_setoptions(tcp_socket[P_NFS]);
00594 
00595   if((nfs_param.core_param.core_options & (CORE_OPTION_NFSV2 | CORE_OPTION_NFSV3)) != 0)
00596     {
00597 #ifdef _USE_NLM
00598      /* Some log that can be useful when debug ONC/RPC and RPCSEC_GSS matter */
00599      LogDebug(COMPONENT_DISPATCH, "Socket numbers are: nfs_udp=%u  nfs_tcp=%u "
00600               "mnt_udp=%u  mnt_tcp=%u nlm_tcp=%u nlm_udp=%u",
00601               udp_socket[P_NFS],
00602               tcp_socket[P_NFS],
00603               udp_socket[P_MNT],
00604               tcp_socket[P_MNT],
00605               udp_socket[P_NLM],
00606               tcp_socket[P_NLM]);
00607 #else
00608       /* Some log that can be useful when debug ONC/RPC and RPCSEC_GSS matter */
00609       LogDebug(COMPONENT_DISPATCH, "Socket numbers are: nfs_udp=%u  nfs_tcp=%u "
00610                "mnt_udp=%u  mnt_tcp=%u",
00611                udp_socket[P_NFS],
00612                tcp_socket[P_NFS],
00613                udp_socket[P_MNT],
00614                tcp_socket[P_MNT]);
00615 #endif                          /* USE_NLM */
00616     }
00617   else
00618     {
00619       /* Some log that can be useful when debug ONC/RPC and RPCSEC_GSS matter */
00620       LogDebug(COMPONENT_DISPATCH, "Socket numbers are: nfs_udp=%u  nfs_tcp=%u",
00621                udp_socket[P_NFS],
00622                tcp_socket[P_NFS]);
00623     }
00624 
00625 #ifdef _USE_RQUOTA
00626   /* Some log that can be useful when debug ONC/RPC and RPCSEC_GSS matter */
00627   LogDebug(COMPONENT_DISPATCH,
00628            "Socket numbers are: rquota_udp=%u  rquota_tcp=%u",
00629            udp_socket[P_RQUOTA],
00630            tcp_socket[P_RQUOTA]) ;
00631 #endif
00632 
00633   /* Bind the tcp and udp sockets */
00634   Bind_sockets();
00635 
00636   /* Unregister from portmapper/rpcbind */
00637   unregister_rpc();
00638   RegisterCleanup(&clean_rpc);
00639 
00640   /* Set up well-known xprt handles */
00641   Create_SVCXPRTs();
00642 
00643 #ifdef _HAVE_GSSAPI
00644   /* Acquire RPCSEC_GSS basis if needed */
00645   if(nfs_param.krb5_param.active_krb5 == TRUE)
00646     {
00647       if(Svcauth_gss_import_name(nfs_param.krb5_param.svc.principal) != TRUE)
00648         {
00649           LogFatal(COMPONENT_DISPATCH,
00650                    "Could not import principal name %s into GSSAPI",
00651                    nfs_param.krb5_param.svc.principal);
00652         }
00653       else
00654         {
00655           LogInfo(COMPONENT_DISPATCH,
00656                   "Successfully imported principal %s into GSSAPI",
00657                   nfs_param.krb5_param.svc.principal);
00658 
00659           /* Trying to acquire a credentials for checking name's validity */
00660           if(!Svcauth_gss_acquire_cred())
00661             LogCrit(COMPONENT_DISPATCH,
00662                      "Cannot acquire credentials for principal %s",
00663                      nfs_param.krb5_param.svc.principal);
00664           else
00665             LogInfo(COMPONENT_DISPATCH,
00666                     "Principal %s is suitable for acquiring credentials",
00667                     nfs_param.krb5_param.svc.principal);
00668         }
00669     }
00670 #endif                          /* _HAVE_GSSAPI */
00671 
00672 #ifndef _NO_PORTMAPPER
00673   /* Perform all the RPC registration, for UDP and TCP, for NFS_V2, NFS_V3 and NFS_V4 */
00674   Register_program(P_NFS, CORE_OPTION_NFSV2, NFS_V2);
00675   Register_program(P_NFS, CORE_OPTION_NFSV3, NFS_V3);
00676   Register_program(P_NFS, CORE_OPTION_NFSV4, NFS_V4);
00677   Register_program(P_MNT, (CORE_OPTION_NFSV2 | CORE_OPTION_NFSV3), MOUNT_V1);
00678   Register_program(P_MNT, CORE_OPTION_NFSV3, MOUNT_V3);
00679 #ifdef _USE_NLM
00680   Register_program(P_NLM, CORE_OPTION_NFSV3, NLM4_VERS);
00681 #endif                          /* USE_NLM */
00682 #ifdef _USE_RQUOTA
00683   Register_program(P_RQUOTA, CORE_OPTION_ALL_VERS, RQUOTAVERS);
00684   Register_program(P_RQUOTA, CORE_OPTION_ALL_VERS, EXT_RQUOTAVERS);
00685 #endif                          /* USE_QUOTA */
00686 #endif                          /* _NO_PORTMAPPER */
00687 
00688 }                               /* nfs_Init_svc */
00689 
00690 /*
00691  * Start service threads.
00692  */
00693 void nfs_rpc_dispatch_threads(pthread_attr_t *attr_thr)
00694 {
00695     int ix, code = 0;
00696 
00697     /* Start event channel service threads */
00698     for (ix = 0; ix < N_EVENT_CHAN; ++ix) {
00699         if((code = pthread_create(&rpc_evchan[ix].thread_id,
00700                                   attr_thr,
00701                                   rpc_dispatcher_thread,
00702                                   (void *) &rpc_evchan[ix].chan_id)) != 0) {
00703             LogFatal(COMPONENT_THREAD,
00704                    "Could not create rpc_dispatcher_thread #%u, error = %d (%s)",
00705                      ix, errno, strerror(errno));
00706         }
00707     }
00708     LogEvent(COMPONENT_THREAD,
00709              "%d rpc dispatcher threads were started successfully",
00710              N_EVENT_CHAN); 
00711 }
00712 
00713 /*
00714  * Rendezvous callout.  This routine will be called by TI-RPC after newxprt
00715  * has been accepted.
00716  *
00717  * Register newxprt on a TCP event channel.  Balancing events/channels could
00718  * become involved.  To start with, just cycle through them as new connections
00719  * are accepted.
00720  */
00721 static u_int nfs_rpc_rdvs(SVCXPRT *xprt, SVCXPRT *newxprt, const u_int flags,
00722                           void *u_data)
00723 {
00724     static uint32_t next_chan = TCP_EVCHAN_0;
00725     pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
00726     uint32_t tchan;
00727 
00728     pthread_mutex_lock(&mtx);
00729 
00730     tchan = next_chan;
00731     assert((next_chan >= TCP_EVCHAN_0) && (next_chan < N_EVENT_CHAN));
00732     if (++next_chan >= N_EVENT_CHAN)
00733         next_chan = TCP_EVCHAN_0;
00734 
00735     /* setup private data (freed when xprt is destroyed) */
00736     newxprt->xp_u1 = alloc_gsh_xprt_private(XPRT_PRIVATE_FLAG_REF);
00737 
00738     pthread_mutex_unlock(&mtx);
00739 
00740     (void) svc_rqst_evchan_reg(rpc_evchan[tchan].chan_id, newxprt,
00741                                SVC_RQST_FLAG_NONE);
00742 
00743     return (0);
00744 }
00745 
00746 static void nfs_rpc_free_xprt(SVCXPRT *xprt)
00747 {
00748     if (xprt->xp_u1)
00749         free_gsh_xprt_private(xprt->xp_u1);
00750 }
00751 
00757 /* PhD: Please note that I renamed this function, added
00758  * it prototype to include/nfs_core.h and removed its "static" tag.
00759  * This is done to share this code with the 9P implementation */
00760 
00761 static inline worker_available_rc
00762 worker_available(unsigned long worker_index, unsigned int avg_number_pending)
00763 {
00764   worker_available_rc rc = WORKER_AVAILABLE;
00765   P(workers_data[worker_index].wcb.tcb_mutex);
00766   switch(workers_data[worker_index].wcb.tcb_state)
00767     {
00768       case STATE_AWAKE:
00769       case STATE_AWAKEN:
00770         /* Choose only fully initialized workers and that does not gc. */
00771         if(workers_data[worker_index].wcb.tcb_ready == FALSE)
00772           {
00773             LogFullDebug(COMPONENT_THREAD,
00774                          "worker thread #%lu is not ready", worker_index);
00775             rc = WORKER_PAUSED;
00776           }
00777         else if(workers_data[worker_index].gc_in_progress == TRUE)
00778           {
00779             LogFullDebug(COMPONENT_THREAD,
00780                          "worker thread #%lu is doing garbage collection", worker_index);
00781             rc = WORKER_GC;
00782           }
00783         else if(workers_data[worker_index].pending_request_len >= avg_number_pending)
00784           {
00785             rc = WORKER_BUSY;
00786           }
00787         break;
00788 
00789       case STATE_STARTUP:
00790       case STATE_PAUSE:
00791       case STATE_PAUSED:
00792         rc = WORKER_ALL_PAUSED;
00793         break;
00794 
00795       case STATE_EXIT:
00796         rc = WORKER_EXIT;
00797         break;
00798     }
00799   V(workers_data[worker_index].wcb.tcb_mutex);
00800 
00801   return rc;
00802 }
00803 
00804 unsigned int
00805 nfs_core_select_worker_queue(unsigned int avoid_index)
00806 {
00807   #define NO_VALUE_CHOOSEN  1000000
00808   unsigned int worker_index = NO_VALUE_CHOOSEN;
00809   unsigned int avg_number_pending = NO_VALUE_CHOOSEN;
00810   unsigned int total_number_pending = 0;
00811   unsigned int i;
00812   unsigned int cpt = 0;
00813 
00814   static unsigned int counter;
00815   static unsigned int last;
00816   worker_available_rc rc_worker;
00817 
00818   P(lock_worker_selection);
00819   counter++;
00820 
00821   /* Calculate the average queue length if counter is bigger than configured value. */
00822   if(counter > nfs_param.core_param.nb_call_before_queue_avg)
00823     {
00824       for(i = 0; i < nfs_param.core_param.nb_worker; i++)
00825         {
00826           total_number_pending += workers_data[i].pending_request_len;
00827         }
00828       avg_number_pending = total_number_pending / nfs_param.core_param.nb_worker;
00829       /* Reset counter. */
00830       counter = 0;
00831     }
00832 
00833   /* Choose the queue whose length is smaller than average. */
00834   for(i = (last + 1) % nfs_param.core_param.nb_worker, cpt = 0;
00835       cpt < nfs_param.core_param.nb_worker;
00836       cpt++, i = (i + 1) % nfs_param.core_param.nb_worker)
00837     {
00838       /* Avoid worker at avoid_index (provided to permit a worker thread to avoid
00839        * dispatching work to itself). */
00840       if (i == avoid_index)
00841           continue;
00842 
00843       /* Choose only fully initialized workers and that does not gc. */
00844       rc_worker = worker_available(i, avg_number_pending);
00845       if(rc_worker == WORKER_AVAILABLE)
00846       {
00847         worker_index = i;
00848         break;
00849       }
00850       else if(rc_worker == WORKER_ALL_PAUSED)
00851       {
00852         /* Wait for the threads to awaken */
00853         wait_for_threads_to_awaken();
00854       }
00855       else if(rc_worker == WORKER_EXIT)
00856       {
00857         /* do nothing */
00858       }
00859     } /* for */
00860 
00861   if(worker_index == NO_VALUE_CHOOSEN)
00862     worker_index = (last + 1) % nfs_param.core_param.nb_worker;
00863 
00864   last = worker_index;
00865 
00866   V(lock_worker_selection);
00867 
00868   return worker_index;
00869 
00870 } /* nfs_core_select_worker_queue */
00871 
00875 request_data_t *
00876 nfs_rpc_get_nfsreq(nfs_worker_data_t *worker, uint32_t flags)
00877 {
00878     request_data_t *nfsreq = NULL;
00879 
00880     nfsreq = pool_alloc(request_pool, NULL);
00881 
00882     return (nfsreq);
00883 }
00884 
00885 process_status_t
00886 dispatch_rpc_subrequest(nfs_worker_data_t *mydata,
00887                         request_data_t *onfsreq)
00888 {
00889   char *cred_area;
00890   struct rpc_msg *msg;
00891   struct svc_req *req;
00892   request_data_t *nfsreq = NULL;
00893   unsigned int worker_index;
00894   process_status_t rc = PROCESS_DONE;
00895 
00896   /* choose a worker who is not us */
00897   worker_index = nfs_core_select_worker_queue(mydata->worker_index);
00898 
00899   LogDebug(COMPONENT_DISPATCH,
00900            "Use request from Worker Thread #%u's pool, xprt->xp_fd=%d, "
00901            "thread has %d pending requests",
00902            worker_index, onfsreq->r_u.nfs->xprt->xp_fd,
00903            workers_data[worker_index].pending_request_len);
00904 
00905   /* Get a nfsreq from the worker's pool */
00906   nfsreq = pool_alloc(request_pool, NULL);
00907 
00908   if(nfsreq == NULL)
00909     {
00910       LogMajor(COMPONENT_DISPATCH,
00911                "Unable to allocate request. Exiting...");
00912       Fatal();
00913     }
00914 
00915   /* Set the request as NFS already-read */
00916   nfsreq->rtype = NFS_REQUEST;
00917 
00918   /* tranfer onfsreq */
00919   nfsreq->r_u.nfs = onfsreq->r_u.nfs;
00920 
00921   /* And fixup onfsreq */
00922   onfsreq->r_u.nfs = pool_alloc(request_data_pool, NULL);
00923 
00924   if(onfsreq->r_u.nfs == NULL)
00925     {
00926       LogMajor(COMPONENT_DISPATCH,
00927                "Empty request data pool! Exiting...");
00928       Fatal();
00929     }
00930 
00931   /* Set up cred area */
00932   cred_area = onfsreq->r_u.nfs->cred_area;
00933   req = &(onfsreq->r_u.nfs->req);
00934   msg = &(onfsreq->r_u.nfs->msg);
00935 
00936   msg->rm_call.cb_cred.oa_base = cred_area;
00937   msg->rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
00938   req->rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);
00939 
00940   /* Set up xprt */
00941   onfsreq->r_u.nfs->xprt = nfsreq->r_u.nfs->xprt;
00942   req->rq_xprt = onfsreq->r_u.nfs->xprt;
00943 
00944   /* count as 1 ref */
00945   gsh_xprt_ref(req->rq_xprt, XPRT_PRIVATE_FLAG_LOCKED);
00946 
00947   /* Hand it off */
00948   DispatchWorkNFS(nfsreq, worker_index);
00949 
00950   return (rc);
00951 }
00952 
00957 process_status_t dispatch_rpc_request(SVCXPRT *xprt)
00958 {
00959   char *cred_area;
00960   struct rpc_msg *pmsg;
00961   struct svc_req *preq;
00962   request_data_t *nfsreq = NULL;
00963   unsigned int worker_index;
00964   process_status_t rc = PROCESS_DONE;
00965 
00966   /* A few thread manage only mount protocol, check for this */
00967 
00968   /* Get a worker to do the job */
00969 #ifndef _NO_MOUNT_LIST
00970   if((udp_socket[P_MNT] == xprt->xp_fd) ||
00971      (tcp_socket[P_MNT] == xprt->xp_fd))
00972     {
00973       /* worker #0 is dedicated to mount protocol */
00974       worker_index = 0;
00975     }
00976   else
00977 #endif
00978     {
00979        /* choose a worker depending on its queue length */
00980        worker_index = nfs_core_select_worker_queue( WORKER_INDEX_ANY );
00981     }
00982 
00983   LogFullDebug(COMPONENT_DISPATCH,
00984                "Use request from Worker Thread #%u's pool, xprt->xp_fd=%d, thread "
00985                "has %d pending requests",
00986                worker_index, xprt->xp_fd,
00987                workers_data[worker_index].pending_request_len);
00988 
00989   /* Get a nfsreq from the worker's pool */
00990   nfsreq = pool_alloc(request_pool, NULL);
00991 
00992   if(nfsreq == NULL)
00993     {
00994       LogMajor(COMPONENT_DISPATCH,
00995                "Unable to allocate request.  Exiting...");
00996       Fatal();
00997     }
00998 
00999   /* Set the request as NFS with xprt hand-off */
01000   nfsreq->rtype = NFS_REQUEST_LEADER ;
01001 
01002   nfsreq->r_u.nfs = pool_alloc(request_data_pool, NULL);
01003   if(nfsreq->r_u.nfs == NULL)
01004     {
01005       LogMajor(COMPONENT_DISPATCH,
01006                "Unable to allocate request data.  Exiting...");
01007       Fatal();
01008     }
01009 
01010   /* Set up cred area */
01011   cred_area = nfsreq->r_u.nfs->cred_area;
01012   preq = &(nfsreq->r_u.nfs->req);
01013   pmsg = &(nfsreq->r_u.nfs->msg);
01014 
01015   pmsg->rm_call.cb_cred.oa_base = cred_area;
01016   pmsg->rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
01017   preq->rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);
01018 
01019   /* Set up xprt */
01020   nfsreq->r_u.nfs->xprt = xprt;
01021   preq->rq_xprt = xprt;
01022 
01023   /* Count as 1 ref */
01024   gsh_xprt_ref(xprt, XPRT_PRIVATE_FLAG_NONE);
01025 
01026   /* Hand it off */
01027   DispatchWorkNFS(nfsreq, worker_index);
01028 
01029   return (rc);
01030 }
01031 
01032 static bool_t
01033 nfs_rpc_getreq_ng(SVCXPRT *xprt /*, int chan_id */)
01034 {
01035     /* Ok, in the new world, TI-RPC's job is merely to tell us there is activity
01036      * on a specific xprt handle.
01037      *
01038      * Note that we have a builtin mechanism to bind, unbind, and (in response
01039      * to connect events, through a new callout made from within the rendezvous
01040      * in vc xprts) rebind/rebalance xprt handles to independent event channels,
01041      * each with their own platform event demultiplexer.  The current callout
01042      * is one event (request, or, if applicable, new vc connect) on the active
01043      * xprt handle xprt.
01044      *
01045      * We are a blocking call from the svc_run thread specific to our current
01046      * event channel (whatever it is).  Our goal is to hand off processing of
01047      * xprt to a request dispatcher thread as quickly as possible, to minimize
01048      * latency of all xprts on this channel.
01049      *
01050      * Next, the preferred dispatch thread should be, I speculate, one which has
01051      * (most) recently handled a request for this xprt.
01052      */
01053 
01054     /*
01055      * UDP RPCs are quite simple: everything comes to the same socket, so
01056      * several SVCXPRT can be defined, one per tbuf to handle the stuff
01057      * TCP RPCs are more complex:
01058      *   - a unique SVCXPRT exists that deals with initial tcp rendez vous.
01059      *     It does the accept with the client, but recv no message from the
01060      *     client. But SVC_RECV on it creates a new SVCXPRT dedicated to the
01061      *     client. This specific SVXPRT is bound on TCPSocket
01062      *
01063      * while receiving something on the Svc_fdset, I must know if this is a UDP
01064      * request, an initial TCP request or a TCP socket from an already connected
01065      * client.
01066      * This is how to distinguish the cases:
01067      * UDP connections are bound to socket NFS_UDPSocket
01068      * TCP initial connections are bound to socket NFS_TCPSocket
01069      * all the other cases are requests from already connected TCP Clients
01070      */
01071 
01072     /* The following actions are now purely diagnostic, the only side effect is a message to
01073      * the log. */
01074     int code  __attribute__((unused)) = 0;
01075     int rpc_fd = xprt->xp_fd;
01076 
01077     if(udp_socket[P_NFS] == rpc_fd)
01078         LogFullDebug(COMPONENT_DISPATCH, "A NFS UDP request fd %d",
01079                      rpc_fd);
01080     else if(udp_socket[P_MNT] == rpc_fd)
01081         LogFullDebug(COMPONENT_DISPATCH, "A MOUNT UDP request %d",
01082                      rpc_fd);
01083 #ifdef _USE_NLM
01084     else if(udp_socket[P_NLM] == rpc_fd)
01085         LogFullDebug(COMPONENT_DISPATCH, "A NLM UDP request %d",
01086                      rpc_fd);
01087 #endif                          /* _USE_NLM */
01088 #ifdef _USE_QUOTA
01089     else if(udp_socket[P_RQUOTA] == rpc_fd)
01090         LogFullDebug(COMPONENT_DISPATCH, "A RQUOTA UDP request %d",
01091                      rpc_fd);
01092 #endif                        /* _USE_QUOTA */
01093     else if(tcp_socket[P_NFS] == rpc_fd) {
01094         /*
01095          * This is an initial tcp connection
01096          * There is no RPC message, this is only a TCP connect.
01097          * In this case, the SVC_RECV only produces a new connected socket (it does
01098          * just a call to accept)
01099          */
01100         LogFullDebug(COMPONENT_DISPATCH,
01101                      "An initial NFS TCP request from a new client %d",
01102                      rpc_fd);
01103     }
01104     else if(tcp_socket[P_MNT] == rpc_fd)
01105         LogFullDebug(COMPONENT_DISPATCH,
01106                      "An initial MOUNT TCP request from a new client %d",
01107                      rpc_fd);
01108 #ifdef _USE_NLM
01109     else if(tcp_socket[P_NLM] == rpc_fd)
01110         LogFullDebug(COMPONENT_DISPATCH,
01111                      "An initial NLM request from a new client %d",
01112                      rpc_fd);
01113 #endif                          /* _USE_NLM */
01114 #ifdef _USE_QUOTA
01115     else if(tcp_socket[P_RQUOTA] == rpc_fd)
01116         LogFullDebug(COMPONENT_DISPATCH,
01117                      "An initial RQUOTA request from a new client %d",
01118                      rpc_fd);
01119 #endif                          /* _USE_QUOTA */
01120     else
01121         LogDebug(COMPONENT_DISPATCH,
01122                  "A NFS TCP request from an already connected client %d",
01123                  rpc_fd);
01124 
01125     /* Block events in the interval from initial dispatch to the
01126      * completion of SVC_RECV */
01127     (void) svc_rqst_block_events(xprt, SVC_RQST_FLAG_NONE);
01128 
01129     dispatch_rpc_request(xprt);
01130 
01131     return (TRUE);
01132 }
01133 
01146 int print_pending_request(LRU_data_t data, char *str)
01147 {
01148   return snprintf(str, LRU_DISPLAY_STRLEN, "not implemented for now");
01149 }                               /* print_pending_request */
01150 
01161 void *rpc_dispatcher_thread(void *arg)
01162 {
01163     int32_t chan_id = *((int32_t *) arg);
01164     
01165     SetNameFunction("dispatch_thr");
01166 
01167     /* Calling dispatcher main loop */
01168     LogInfo(COMPONENT_DISPATCH,
01169             "Entering nfs/rpc dispatcher");
01170 
01171     LogDebug(COMPONENT_DISPATCH,
01172              "My pthread id is %p", (caddr_t) pthread_self());
01173 
01174     svc_rqst_thrd_run(chan_id, SVC_RQST_FLAG_NONE);
01175 
01176     return (NULL);
01177 }                               /* rpc_dispatcher_thread */
01178 
01191 void constructor_nfs_request_data_t(void *ptr, void *parameters)
01192 {
01193   nfs_request_data_t * pdata = (nfs_request_data_t *) ptr;
01194   memset(pdata, 0, sizeof(nfs_request_data_t));
01195 }
01196 
01208 void constructor_request_data_t(void *ptr, void *parameters)
01209 {
01210   request_data_t * pdata = (request_data_t *) ptr;
01211   memset(pdata, 0, sizeof(request_data_t));
01212 }