nfs-ganesha 1.4

mfsl_async_synclet.c

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Copyright CEA/DAM/DIF  (2008)
00005  * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
00006  *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
00007  *
00008  *
00009  * This program is free software; you can redistribute it and/or
00010  * modify it under the terms of the GNU Lesser General Public
00011  * License as published by the Free Software Foundation; either
00012  * version 3 of the License, or (at your option) any later version.
00013  * 
00014  * This program is distributed in the hope that it will be useful,
00015  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017  * Lesser General Public License for more details.
00018  * 
00019  * You should have received a copy of the GNU Lesser General Public
00020  * License along with this library; if not, write to the Free Software
00021  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
00022  * 
00023  * ---------------------------------------
00024  */
00025 
00031 #ifdef HAVE_CONFIG_H
00032 #include "config.h"
00033 #endif
00034 
00035 /* fsal_types contains constants and type definitions for FSAL */
00036 #include "fsal_types.h"
00037 #include "fsal.h"
00038 #include "mfsl_types.h"
00039 #include "mfsl.h"
00040 #include "common_utils.h"
00041 #include "LRU_List.h"
00042 
00043 pthread_t mfsl_async_atd_thrid;
00044 pthread_t *mfsl_async_synclet_thrid;
00045 
00046 int once = 0;
00047 
00048 extern mfsl_synclet_data_t *synclet_data;
00049 extern mfsl_parameter_t mfsl_param;
00050 extern unsigned int end_of_mfsl;
00051 
00052 LRU_list_t *async_op_lru;
00053 pthread_mutex_t mutex_async_list;
00054 
00064 fsal_status_t MFSL_async_post(mfsl_async_op_desc_t * popdesc)
00065 {
00066   LRU_entry_t *plru_entry = NULL;
00067   LRU_status_t lru_status;
00068 /* Do not use RPCBIND by default */
00069 #define  _RPCB_PROT_H_RPCGEN
00070 
00071   P(mutex_async_list);
00072 
00073   if((plru_entry = LRU_new_entry(async_op_lru, &lru_status)) == NULL)
00074     {
00075       LogMajor(COMPONENT_MFSL,"Impossible to post async operation in LRU dispatch list");
00076       MFSL_return(ERR_FSAL_SERVERFAULT, (int)lru_status);
00077     }
00078 
00079   plru_entry->buffdata.pdata = (caddr_t) popdesc;
00080   plru_entry->buffdata.len = sizeof(mfsl_async_op_desc_t);
00081 
00082   V(mutex_async_list);
00083 
00084   MFSL_return(ERR_FSAL_NO_ERROR, 0);
00085 }                               /* MFSL_async_post */
00086 
00098 fsal_status_t mfsl_async_process_async_op(mfsl_async_op_desc_t * pasyncopdesc)
00099 {
00100   fsal_status_t fsal_status;
00101   mfsl_context_t *pmfsl_context;
00102 
00103   if(pasyncopdesc == NULL)
00104     {
00105       MFSL_return(ERR_FSAL_INVAL, 0);
00106     }
00107 
00108   /* Calling the function from async op */
00109   LogDebug(COMPONENT_MFSL, "op_type=%u %s", pasyncopdesc->op_type,
00110                   mfsl_async_op_name[pasyncopdesc->op_type]);
00111 
00112   fsal_status = (pasyncopdesc->op_func) (pasyncopdesc);
00113 
00114   if(FSAL_IS_ERROR(fsal_status))
00115     LogMajor(COMPONENT_MFSL, "op_type=%u %s : error (%u,%u)",
00116                     pasyncopdesc->op_type, mfsl_async_op_name[pasyncopdesc->op_type],
00117                     fsal_status.major, fsal_status.minor);
00118 
00119   /* Free the previously allocated structures */
00120   pmfsl_context = (mfsl_context_t *) pasyncopdesc->ptr_mfsl_context;
00121 
00122   P(pmfsl_context->lock);
00123   pool_free(pmfsl_context->pool_async_op, pasyncopdesc);
00124   V(pmfsl_context->lock);
00125 
00126   /* Regular exit */
00127   MFSL_return(ERR_FSAL_NO_ERROR, 0);
00128 }                               /* cache_inode_process_async_op */
00129 
00139 static unsigned int mfsl_async_choose_synclet(void)
00140 {
00141 #define NO_VALUE_CHOOSEN  1000000
00142   unsigned int synclet_chosen = NO_VALUE_CHOOSEN;
00143   unsigned int min_number_pending = NO_VALUE_CHOOSEN;
00144 
00145   unsigned int i;
00146   static unsigned int last;
00147   unsigned int cpt = 0;
00148 
00149   do
00150     {
00151       /* chose the smallest queue */
00152 
00153       for(i = (last + 1) % mfsl_param.nb_synclet, cpt = 0; cpt < mfsl_param.nb_synclet;
00154           cpt++, i = (i + 1) % mfsl_param.nb_synclet)
00155         {
00156           /* Choose only fully initialized workers and that does not gc */
00157 
00158           if(synclet_data[i].op_lru->nb_entry < min_number_pending)
00159             {
00160               synclet_chosen = i;
00161               min_number_pending = synclet_data[i].op_lru->nb_entry;
00162             }
00163         }
00164 
00165     }
00166   while(synclet_chosen == NO_VALUE_CHOOSEN);
00167 
00168   last = synclet_chosen;
00169 
00170   return synclet_chosen;
00171 }                               /* mfsl_async_choose_synclet */
00172 
00183 void *mfsl_async_synclet_refresher_thread(void *Arg)
00184 {
00185   int rc = 0;
00186   unsigned int i = 0;
00187   fsal_status_t fsal_status;
00188   fsal_export_context_t fsal_export_context;
00189   fsal_op_context_t fsal_context;
00190   mfsl_precreated_object_t *pooldirs = NULL;
00191   mfsl_precreated_object_t *poolfiles = NULL;
00192 
00193   SetNameFunction("MFSL_ASYNC Context refresher");
00194 
00195   /* Init FSAL root fsal_op_context */
00196   if(FSAL_IS_ERROR(FSAL_BuildExportContext(&fsal_export_context, NULL, NULL)))
00197     {
00198       /* Failed init */
00199       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not build export context, exiting...");
00200       exit(1);
00201     }
00202 
00203   if(FSAL_IS_ERROR(FSAL_InitClientContext(&fsal_context)))
00204     {
00205       /* Failed init */
00206       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not build thread context, exiting...");
00207       exit(1);
00208     }
00209 
00210   if(FSAL_IS_ERROR(FSAL_GetClientContext(&fsal_context,
00211                                          &fsal_export_context, 0, 0, NULL, 0)))
00212     {
00213       /* Failed init */
00214       LogMajor(COMPONENT_MFSL,"could not build client context, exiting...");
00215       exit(1);
00216     }
00217 
00218   /* Showtime... */
00219   LogEvent(COMPONENT_MFSL, "Started...");
00220 }                               /* mfsl_async_synclet_refresher_thread */
00221 
00232 void *mfsl_async_synclet_thread(void *Arg)
00233 {
00234   long index = 0;
00235   char namestr[64];
00236   int rc = 0;
00237   fsal_status_t fsal_status;
00238   fsal_export_context_t fsal_export_context;
00239   int found = FALSE;
00240   LRU_entry_t *pentry;
00241   mfsl_async_op_desc_t *pasyncopdesc = NULL;
00242 
00243   index = (long)Arg;
00244   sprintf(namestr, "MFSL_ASYNC Synclet #%ld", index);
00245   SetNameFunction(namestr);
00246 
00247   /* Init FSAL root fsal_op_context */
00248   if(FSAL_IS_ERROR(FSAL_BuildExportContext(&fsal_export_context, NULL, NULL)))
00249     {
00250       /* Failed init */
00251       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not build export context, exiting...");
00252       exit(1);
00253     }
00254 
00255   if(FSAL_IS_ERROR(FSAL_InitClientContext(&synclet_data[index].root_fsal_context)))
00256     {
00257       /* Failed init */
00258       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not build thread context, exiting...");
00259       exit(1);
00260     }
00261 
00262   if(FSAL_IS_ERROR(FSAL_GetClientContext(&synclet_data[index].root_fsal_context,
00263                                          &fsal_export_context, 0, 0, NULL, 0)))
00264     {
00265       /* Failed init */
00266       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not build client context, exiting...");
00267       exit(1);
00268     }
00269 
00270   /* Init synclet context */
00271   if(FSAL_IS_ERROR(MFSL_ASYNC_GetSyncletContext(&synclet_data[index].synclet_context,
00272                                                 &synclet_data[index].root_fsal_context)))
00273     {
00274       /* Failed init */
00275       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not be initialized, exiting...");
00276       exit(1);
00277     }
00278 
00279   if(FSAL_IS_ERROR(MFSL_PrepareContext(&synclet_data[index].root_fsal_context)))
00280     {
00281       /* Failed init */
00282       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not be cleaned up before using, exiting...");
00283       exit(1);
00284     }
00285 
00286   if(FSAL_IS_ERROR(mfsl_async_init_symlinkdir(&synclet_data[index].root_fsal_context)))
00287     {
00288       /* Failed init */
00289       LogMajor(COMPONENT_MFSL,"MFSL Synclet context could init symlink's nursery, exiting...");
00290       exit(1);
00291     }
00292 
00293   /* Showtime... */
00294   LogEvent(COMPONENT_MFSL, "Started...");
00295 
00296   while(!end_of_mfsl)
00297     {
00298       P(synclet_data[index].mutex_op_condvar);
00299       while(synclet_data[index].op_lru->nb_entry ==
00300             synclet_data[index].op_lru->nb_invalid)
00301         pthread_cond_wait(&(synclet_data[index].op_condvar),
00302                           &(synclet_data[index].mutex_op_condvar));
00303       V(synclet_data[index].mutex_op_condvar);
00304 
00305       found = FALSE;
00306       P(synclet_data[index].mutex_op_lru);
00307       for(pentry = synclet_data[index].op_lru->LRU; pentry != NULL; pentry = pentry->next)
00308         {
00309           if(pentry->valid_state == LRU_ENTRY_VALID)
00310             {
00311               found = TRUE;
00312               break;
00313             }
00314         }
00315       V(synclet_data[index].mutex_op_lru);
00316 
00317       if(!found)
00318         {
00319           LogMajor(COMPONENT_MFSL,
00320                           "Error : I have been awaken when no pending async operation is available");
00321           LogFullDebug(COMPONENT_MFSL,
00322               "synclet_data[index].op_lru->nb_entry=%u  synclet_data[index].op_lru->nb_invalid=%u\n",
00323                synclet_data[index].op_lru->nb_entry,
00324                synclet_data[index].op_lru->nb_invalid);
00325 
00326           continue;             /* return to main loop */
00327         }
00328 
00329       /* Get the async op to be proceeded */
00330       pasyncopdesc = (mfsl_async_op_desc_t *) (pentry->buffdata.pdata);
00331 
00332       LogDebug(COMPONENT_MFSL, "I will proceed with asyncop %p", pasyncopdesc);
00333 
00334       /* Execute the async op */
00335       fsal_status = mfsl_async_process_async_op(pasyncopdesc);
00336 
00337       /* Now proceed with LRU gc management. First step is to increment the passcounter */
00338       synclet_data[index].passcounter += 1;
00339 
00340       /* Finalize my making the LRU entry invalid so that it is garbagged later */
00341       P(synclet_data[index].mutex_op_lru);
00342       if(LRU_invalidate(synclet_data[index].op_lru, pentry) != LRU_LIST_SUCCESS)
00343         {
00344           LogCrit(COMPONENT_MFSL, 
00345               "Incoherency: released entry for asyncopdesc could not be tagged invalid");
00346         }
00347       V(synclet_data[index].mutex_op_lru);
00348 
00349       /* Init synclet context */
00350       if(FSAL_IS_ERROR
00351          (MFSL_ASYNC_RefreshSyncletContext
00352           (&synclet_data[index].synclet_context, &synclet_data[index].root_fsal_context)))
00353         {
00354           /* Failed init */
00355           LogMajor(COMPONENT_MFSL,"MFSL Synclet context could not be initialized, exiting...");
00356           exit(1);
00357         }
00358 
00359       /* Put the invalid entries back to pool (they have been managed */
00360       if(synclet_data[index].passcounter > mfsl_param.nb_before_gc)
00361         {
00362 
00363           if(LRU_gc_invalid(synclet_data[index].op_lru, NULL) != LRU_LIST_SUCCESS)
00364           LogCrit(COMPONENT_MFSL, 
00365                 "/!\\ : Could not gc on LRU list for pending asynchronous operations");
00366 
00367           synclet_data[index].passcounter = 0;
00368         }
00369 
00370     }                           /* while( 1 ) */
00371 
00372   LogMajor(COMPONENT_MFSL, "Terminated...");
00373 
00374   return NULL;
00375 }                               /* mfsl_async_synclet_thread */
00376 
00386 void *mfsl_async_asynchronous_dispatcher_thread(void *Arg)
00387 {
00388   int rc = 0;
00389   LRU_entry_t *pentry_dispatch = NULL;
00390   LRU_entry_t *pentry_synclet = NULL;
00391   LRU_status_t lru_status;
00392   unsigned int chosen_synclet = 0;
00393   unsigned int passcounter = 0;
00394   struct timeval current;
00395   struct timeval delta;
00396   mfsl_async_op_desc_t *pasyncopdesc = NULL;
00397   SetNameFunction("MFSL_ASYNC ADT");
00398 
00399   /* Structure initialisation */
00400   if((async_op_lru = LRU_Init(mfsl_param.lru_param, &lru_status)) == NULL)
00401     {
00402       LogMajor(COMPONENT_MFSL,"Could not init LRU List");
00403       exit(1);
00404     }
00405 
00406   if((rc = pthread_mutex_init(&mutex_async_list, NULL)) != 0)
00407     return NULL;
00408 
00409   LogEvent(COMPONENT_MFSL, "Started...");
00410   while(!end_of_mfsl)
00411     {
00412       /* Sleep for a while */
00413       usleep(60000);
00414 
00415       // sleep( mfsl_param.adt_sleeptime ) ;
00416       if(gettimeofday(&current, NULL) != 0)
00417         {
00418           /* Could'not get time of day... Stopping, this may need a major failure */
00419           LogCrit(COMPONENT_MFSL, " cannot get time of day...");
00420           continue;
00421         }
00422 
00423       P(mutex_async_list);
00424       for(pentry_dispatch = async_op_lru->LRU; pentry_dispatch != NULL;
00425           pentry_dispatch = pentry_dispatch->next)
00426         {
00427           /* Entries are in chronological order, they should be managed in the asynchronous window only */
00428           if(pentry_dispatch->valid_state == LRU_ENTRY_VALID)
00429             {
00430               /* Get the async op to be proceeded */
00431               pasyncopdesc = (mfsl_async_op_desc_t *) (pentry_dispatch->buffdata.pdata);
00432 
00433               /* Manage ops that are older than the duration of the asynchronous window */
00434               timersub(&current, &(pasyncopdesc->op_time), &delta);
00435 
00436               if(delta.tv_sec < mfsl_param.async_window_sec)
00437                 break;
00438 
00439               if(delta.tv_usec < mfsl_param.async_window_usec)
00440                 break;
00441 
00442               /* Choose a synclet to operate on */
00443               chosen_synclet = mfsl_async_choose_synclet();
00444               pasyncopdesc->related_synclet_index = chosen_synclet;
00445 
00446               /* Insert async op to this synclet's LRU */
00447               P(synclet_data[chosen_synclet].mutex_op_lru);
00448               if((pentry_synclet =
00449                   LRU_new_entry(synclet_data[chosen_synclet].op_lru,
00450                                 &lru_status)) == NULL)
00451                 {
00452                   LogCrit(COMPONENT_MFSL,
00453                                   "Impossible to post async operation in LRU synclet list");
00454                   V(synclet_data[chosen_synclet].mutex_op_lru);
00455                   continue;
00456                 }
00457 
00458               LogDebug(COMPONENT_MFSL, "Asyncop %p is to be managed by synclet %u",
00459                               pentry_dispatch->buffdata.pdata, chosen_synclet);
00460 
00461               pentry_synclet->buffdata.pdata = pentry_dispatch->buffdata.pdata;
00462               pentry_synclet->buffdata.len = pentry_dispatch->buffdata.len;
00463 
00464               V(synclet_data[chosen_synclet].mutex_op_lru);
00465 
00466               P(synclet_data[chosen_synclet].mutex_op_condvar);
00467               if(pthread_cond_signal(&(synclet_data[chosen_synclet].op_condvar)) == -1)
00468                 LogEvent(COMPONENT_MFSL,
00469                                 "Error : pthread_cond_signal failed on condvar for synclect %u",
00470                                 chosen_synclet);
00471               V(synclet_data[chosen_synclet].mutex_op_condvar);
00472 
00473               /* Invalidate the entry in dispatch list */
00474               LRU_invalidate(async_op_lru, pentry_dispatch);
00475             }
00476         }
00477 
00478       /* Increment the passcounter */
00479       passcounter += 1;
00480 
00481       /* Put the invalid entries back to pool (they have been managed */
00482       if(passcounter > mfsl_param.nb_before_gc)
00483         {
00484           if(LRU_gc_invalid(async_op_lru, NULL) != LRU_LIST_SUCCESS)
00485             LogMajor(COMPONENT_MFSL,
00486                 "/!\\ : Could not gc on LRU list for not dispatched asynchronous operations");
00487 
00488           passcounter = 0;
00489         }
00490 
00491       V(mutex_async_list);
00492     }
00493 
00494   LogMajor(COMPONENT_MFSL, "Terminated...");
00495 
00496   /* Should never occur (neverending loop) */
00497   return NULL;
00498 }                               /* mfsl_async_asynchronous_dispatcher_thread */