nfs-ganesha 1.4

handle_mapping_db.c

Go to the documentation of this file.
00001 #include "config.h"
00002 #include "handle_mapping.h"
00003 #include "handle_mapping_db.h"
00004 #include "handle_mapping_internal.h"
00005 #include "../fsal_internal.h"
00006 #include <sqlite3.h>
00007 #include <sys/types.h>
00008 #include <dirent.h>
00009 #include <fnmatch.h>
00010 #include <pthread.h>
00011 
00012 /* sqlite check macros */
00013 
00014 #define CheckTable( _p_conn_, _code_, _msg_str_, _result_ ) do { \
00015  if ( (_code_) != SQLITE_OK ) \
00016     {                                                                             \
00017         LogCrit(COMPONENT_FSAL, "SQLite command failed in %s line %i", __FUNCTION__, __LINE__); \
00018         LogCrit(COMPONENT_FSAL, "%s (%d)", (_msg_str_?_msg_str_:sqlite3_errmsg(_p_conn_)), _code_ ); \
00019         if (_msg_str_) { sqlite3_free( _msg_str_); _msg_str_ = NULL; }     \
00020         if (_result_) { sqlite3_free_table( _result_ ); _result_ = NULL; } \
00021         return HANDLEMAP_DB_ERROR;  \
00022     } \
00023   } while (0)
00024 
00025 #define CheckCommand( _p_conn_, _code_, _msg_str_ ) do { \
00026   if ( (_code_) != SQLITE_OK ) \
00027     {                                                                             \
00028         LogCrit(COMPONENT_FSAL, "SQLite command failed in %s line %i", __FUNCTION__, __LINE__); \
00029         LogCrit(COMPONENT_FSAL, "%s (%d)", (_msg_str_?_msg_str_:sqlite3_errmsg(_p_conn_)), _code_ ); \
00030         if (_msg_str_) {sqlite3_free( _msg_str_); _msg_str_ = NULL ; } \
00031         return HANDLEMAP_DB_ERROR;  \
00032     } \
00033   } while (0)
00034 
00035 #define CheckPrepare( _p_conn_, _code_ ) do { \
00036   if ( (_code_) != SQLITE_OK)                        \
00037     {                                                                             \
00038         LogCrit(COMPONENT_FSAL, "SQLite prepare statement failed in %s line %i", __FUNCTION__, __LINE__); \
00039         LogCrit(COMPONENT_FSAL, "%s (%d)", sqlite3_errmsg( _p_conn_ ), _code_ ); \
00040         return HANDLEMAP_DB_ERROR;  \
00041     } \
00042   } while (0)
00043 
00044 #define CheckBind( _p_conn_, _code_, _stmt_ ) do { \
00045   if ( (_code_) != SQLITE_OK)                                                        \
00046     {                                                                                \
00047         LogCrit(COMPONENT_FSAL, "SQLite parameter binding failed in %s line %i", __FUNCTION__, __LINE__); \
00048         LogCrit(COMPONENT_FSAL, "%s (%d)", sqlite3_errmsg( _p_conn_ ), _code_ ); \
00049         sqlite3_clear_bindings( _stmt_ ); \
00050         return HANDLEMAP_DB_ERROR;  \
00051     }  \
00052   } while (0)
00053 
00054 #define CheckStep( _p_conn_, _code_, _stmt_ ) do { \
00055   if ( (_code_) != SQLITE_OK  && (_code_) != SQLITE_ROW && (_code_) != SQLITE_DONE ) \
00056     {                                                                                \
00057         LogCrit(COMPONENT_FSAL, "SQLite command failed in %s line %i", __FUNCTION__, __LINE__); \
00058         LogCrit(COMPONENT_FSAL, "%s (%d)", sqlite3_errmsg( _p_conn_ ), _code_ ); \
00059         sqlite3_reset( _stmt_ );    \
00060         return HANDLEMAP_DB_ERROR;  \
00061     }  \
00062   } while (0)
00063 
00064 /* Type of DB operations */
00065 typedef enum
00066 {
00067   LOAD = 1,
00068   INSERT,
00069   DELETE
00070 } db_op_type;
00071 
00072 /* DB operation arguments */
00073 typedef struct db_op_item__
00074 {
00075   db_op_type op_type;
00076 
00077   /* operation info */
00078 
00079   union
00080   {
00081     struct
00082     {
00083       nfs23_map_handle_t nfs23_digest;
00084       fsal_handle_t fsal_handle;
00085     } fh_info;
00086 
00087     hash_table_t *hash;
00088   } op_arg;
00089 
00090   /* for chained list */
00091   struct db_op_item__ *p_next;
00092 
00093 } db_op_item_t;
00094 
00095 /* the queue for each DB flusher thread */
00096 typedef struct flusher_queue__
00097 {
00098   /* the queue for high priority operations */
00099   db_op_item_t *highprio_first;
00100   db_op_item_t *highprio_last;
00101 
00102   /* the queue for low priority operations */
00103   db_op_item_t *lowprio_first;
00104   db_op_item_t *lowprio_last;
00105 
00106   /* number of operations pending */
00107   unsigned int nb_waiting;
00108 
00109   pthread_mutex_t queues_mutex;
00110 
00111   pthread_cond_t work_avail_condition;
00112   pthread_cond_t work_done_condition;
00113 
00114   /* status (used for work_done_condition) */
00115   enum
00116   { NOT_READY, IDLE, WORKING, FINISHED } status;
00117 
00118 } flusher_queue_t;
00119 
00120 #define LOAD_ALL_STATEMENT  0
00121 #define INSERT_STATEMENT    1
00122 #define DELETE_STATEMENT    2
00123 
00124 #define STATEMENT_COUNT     3
00125 
00126 /* thread info */
00127 typedef struct db_thread_info__
00128 {
00129   pthread_t thr_id;
00130   unsigned int thr_index;
00131 
00132   flusher_queue_t work_queue;
00133 
00134   /* SQLite database connection */
00135   sqlite3 *db_conn;
00136 
00137   /* prepared statement table */
00138   sqlite3_stmt *prep_stmt[STATEMENT_COUNT];
00139 
00140   /* this pool is accessed by submitter
00141    * and by the db thread */
00142   pthread_mutex_t pool_mutex;
00143   pool_t *dbop_pool;
00144 
00145 } db_thread_info_t;
00146 
00147 static char dbmap_dir[MAXPATHLEN];
00148 static char db_tmpdir[MAXPATHLEN];
00149 static unsigned int nb_db_threads;
00150 static int synchronous;
00151 
00152 /* used for clean shutdown */
00153 static int do_terminate = FALSE;
00154 
00155 /* all information and context for threads */
00156 static db_thread_info_t db_thread[MAX_DB];
00157 
00158 /* Initialize basic structures for a thread */
00159 static int init_db_thread_info(db_thread_info_t * p_thr_info,
00160                                unsigned int nb_dbop_prealloc)
00161 {
00162   unsigned int i;
00163 
00164   if(!p_thr_info)
00165     return HANDLEMAP_INTERNAL_ERROR;
00166 
00167   memset(p_thr_info, 0, sizeof(db_thread_info_t));
00168 
00169   p_thr_info->work_queue.highprio_first = NULL;
00170   p_thr_info->work_queue.highprio_last = NULL;
00171   p_thr_info->work_queue.lowprio_first = NULL;
00172   p_thr_info->work_queue.lowprio_last = NULL;
00173 
00174   p_thr_info->work_queue.nb_waiting = 0;
00175 
00176   if(pthread_mutex_init(&p_thr_info->work_queue.queues_mutex, NULL))
00177     return HANDLEMAP_SYSTEM_ERROR;
00178 
00179   if(pthread_cond_init(&p_thr_info->work_queue.work_avail_condition, NULL))
00180     return HANDLEMAP_SYSTEM_ERROR;
00181 
00182   if(pthread_cond_init(&p_thr_info->work_queue.work_done_condition, NULL))
00183     return HANDLEMAP_SYSTEM_ERROR;
00184 
00185   /* init thread status */
00186   p_thr_info->work_queue.status = NOT_READY;
00187 
00188   p_thr_info->db_conn = NULL;
00189 
00190   for(i = 0; i < STATEMENT_COUNT; i++)
00191     p_thr_info->prep_stmt[i] = NULL;
00192 
00193   /* init memory pool */
00194 
00195   if(pthread_mutex_init(&p_thr_info->pool_mutex, NULL))
00196     return HANDLEMAP_SYSTEM_ERROR;
00197 
00198   p_thr_info->dbop_pool =
00199        pool_init(NULL, sizeof(db_op_item_t),
00200                  pool_basic_substrate,
00201                  NULL, NULL, NULL);
00202 
00203   return HANDLEMAP_SUCCESS;
00204 }
00205 
00206 /* Called by a thread to initialize its database access.
00207  * After this call:
00208  *  - database connection is established
00209  *  - schema is created
00210  *  - prepared statements are ready to be used
00211  */
00212 static int init_database_access(db_thread_info_t * p_thr_info)
00213 {
00214   char db_file[MAXPATHLEN];
00215   int rc;
00216   char **result = NULL;
00217   int rows, cols;
00218   char *errmsg = NULL;
00219   const char *unparsed;
00220 
00221   /* first open the database file */
00222 
00223   snprintf(db_file, MAXPATHLEN, "%s/%s.%u", dbmap_dir, DB_FILE_PREFIX,
00224            p_thr_info->thr_index);
00225 
00226   rc = sqlite3_open(db_file, &p_thr_info->db_conn);
00227 
00228   if(rc != 0)
00229     {
00230       if(p_thr_info->db_conn)
00231         {
00232           LogCrit(COMPONENT_FSAL,
00233                   "ERROR: could not connect to SQLite3 database (file %s): %s",
00234                   db_file, sqlite3_errmsg(p_thr_info->db_conn));
00235           sqlite3_close(p_thr_info->db_conn);
00236         }
00237       else
00238         {
00239           LogCrit(COMPONENT_FSAL,
00240                   "ERROR: could not connect to SQLite3 database (file %s): status=%d",
00241                   db_file, rc);
00242         }
00243       return HANDLEMAP_DB_ERROR;
00244     }
00245 
00246   /* Now check, that the map table exists */
00247   rc = sqlite3_get_table(p_thr_info->db_conn,
00248                          "SELECT name FROM sqlite_master WHERE type = 'table' AND name = '"
00249                          MAP_TABLE "'", &result, &rows, &cols, &errmsg);
00250 
00251   CheckTable(p_thr_info->db_conn, rc, errmsg, result);
00252 
00253   /* no need for the result, just the number of rows returned */
00254   sqlite3_free_table(result);
00255 
00256   if(rows != 1)
00257     {
00258       /* table must be created */
00259       rc = sqlite3_exec(p_thr_info->db_conn,
00260                         "CREATE TABLE " MAP_TABLE " ( "
00261                         OBJID_FIELD "   BIGINT NOT NULL, "
00262                         HASH_FIELD "    INT NOT NULL, "
00263                         HANDLE_FIELD "  TEXT, "
00264                         "PRIMARY KEY(" OBJID_FIELD ", " HASH_FIELD ") )",
00265                         NULL, NULL, &errmsg);
00266 
00267       CheckCommand(p_thr_info->db_conn, rc, errmsg);
00268 
00269     }
00270 
00271   /* Now, create prepared statements */
00272 
00273   rc = sqlite3_prepare_v2(p_thr_info->db_conn,
00274                           "SELECT " OBJID_FIELD "," HASH_FIELD "," HANDLE_FIELD " FROM "
00275                           MAP_TABLE, -1, &(p_thr_info->prep_stmt[LOAD_ALL_STATEMENT]),
00276                           &unparsed);
00277 
00278   CheckPrepare(p_thr_info->db_conn, rc);
00279 
00280   rc = sqlite3_prepare_v2(p_thr_info->db_conn,
00281                           "INSERT INTO " MAP_TABLE "(" OBJID_FIELD "," HASH_FIELD ","
00282                           HANDLE_FIELD ") " "VALUES (?1, ?2, ?3 )", -1,
00283                           &(p_thr_info->prep_stmt[INSERT_STATEMENT]), &unparsed);
00284 
00285   CheckPrepare(p_thr_info->db_conn, rc);
00286 
00287   rc = sqlite3_prepare_v2(p_thr_info->db_conn,
00288                           "DELETE FROM " MAP_TABLE " WHERE " OBJID_FIELD "=?1 AND "
00289                           HASH_FIELD "=?2", -1,
00290                           &(p_thr_info->prep_stmt[DELETE_STATEMENT]), &unparsed);
00291 
00292   CheckPrepare(p_thr_info->db_conn, rc);
00293 
00294   /* Everything is OK now ! */
00295   return HANDLEMAP_SUCCESS;
00296 
00297 }                               /* init_database_access */
00298 
00299 static int db_load_operation(db_thread_info_t * p_info, hash_table_t * p_hash)
00300 {
00301   /* the object id to be inserted to hash table */
00302   uint64_t object_id;
00303   unsigned int handle_hash;
00304   const char *fsal_handle_str;
00305   fsal_handle_t fsal_handle;
00306   unsigned int nb_loaded = 0;
00307   int rc;
00308   struct timeval t1;
00309   struct timeval t2;
00310   struct timeval tdiff;
00311 
00312   gettimeofday(&t1, NULL);
00313 
00314   rc = sqlite3_step(p_info->prep_stmt[LOAD_ALL_STATEMENT]);
00315   CheckStep(p_info->db_conn, rc, p_info->prep_stmt[LOAD_ALL_STATEMENT]);
00316 
00317   /* something to read */
00318   while(rc == SQLITE_ROW)
00319     {
00320       object_id = sqlite3_column_int64(p_info->prep_stmt[LOAD_ALL_STATEMENT], 0);
00321       handle_hash = sqlite3_column_int(p_info->prep_stmt[LOAD_ALL_STATEMENT], 1);
00322       fsal_handle_str = sqlite3_column_text(p_info->prep_stmt[LOAD_ALL_STATEMENT], 2);
00323 
00324       /* convert hexa string representation to binary data */
00325       sscanHandle(&fsal_handle, fsal_handle_str);
00326 
00327       /* now insert it to the hash table */
00328 
00329       rc = handle_mapping_hash_add(p_hash, object_id, handle_hash, &fsal_handle);
00330 
00331       if(rc == 0)
00332         nb_loaded++;
00333       else
00334         LogCrit(COMPONENT_FSAL,
00335                 "ERROR %d adding entry to hash table <object_id=%llu, FH_hash=%u, FSAL_Handle=%s>",
00336                 rc, (unsigned long long)object_id, handle_hash, fsal_handle_str);
00337 
00338       rc = sqlite3_step(p_info->prep_stmt[LOAD_ALL_STATEMENT]);
00339       CheckStep(p_info->db_conn, rc, p_info->prep_stmt[LOAD_ALL_STATEMENT]);
00340 
00341     }
00342 
00343   /* clear results */
00344   sqlite3_reset(p_info->prep_stmt[LOAD_ALL_STATEMENT]);
00345 
00346   /* print time and item count */
00347 
00348   gettimeofday(&t2, NULL);
00349   timersub(&t2, &t1, &tdiff);
00350 
00351   LogEvent(COMPONENT_FSAL, "Reloaded %u items in %d.%06ds",
00352             nb_loaded, (int)tdiff.tv_sec, (int)tdiff.tv_usec);
00353 
00354   return HANDLEMAP_SUCCESS;
00355 
00356 }                               /* db_load_operation */
00357 
00358 static int db_insert_operation(db_thread_info_t * p_info,
00359                                nfs23_map_handle_t * p_nfs23_digest,
00360                                fsal_handle_t * p_handle)
00361 {
00362   int rc;
00363   char handle_str[2 * sizeof(fsal_handle_t) + 1];
00364 
00365   rc = sqlite3_bind_int64(p_info->prep_stmt[INSERT_STATEMENT], 1,
00366                           p_nfs23_digest->object_id);
00367   CheckBind(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]);
00368 
00369   rc = sqlite3_bind_int(p_info->prep_stmt[INSERT_STATEMENT], 2,
00370                         p_nfs23_digest->handle_hash);
00371   CheckBind(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]);
00372 
00373   snprintHandle(handle_str, 2 * sizeof(fsal_handle_t) + 1, p_handle);
00374 
00375   rc = sqlite3_bind_text(p_info->prep_stmt[INSERT_STATEMENT], 3, handle_str, -1,
00376                          SQLITE_STATIC);
00377   CheckBind(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]);
00378 
00379   rc = sqlite3_step(p_info->prep_stmt[INSERT_STATEMENT]);
00380   CheckStep(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]);
00381 
00382   /* clear results */
00383   sqlite3_reset(p_info->prep_stmt[INSERT_STATEMENT]);
00384 
00385   return HANDLEMAP_SUCCESS;
00386 
00387 }                               /* db_insert_operation */
00388 
00389 static int db_delete_operation(db_thread_info_t * p_info,
00390                                nfs23_map_handle_t * p_nfs23_digest)
00391 {
00392   int rc;
00393 
00394   rc = sqlite3_bind_int64(p_info->prep_stmt[DELETE_STATEMENT], 1,
00395                           p_nfs23_digest->object_id);
00396   CheckBind(p_info->db_conn, rc, p_info->prep_stmt[DELETE_STATEMENT]);
00397 
00398   rc = sqlite3_bind_int(p_info->prep_stmt[DELETE_STATEMENT], 2,
00399                         p_nfs23_digest->handle_hash);
00400   CheckBind(p_info->db_conn, rc, p_info->prep_stmt[DELETE_STATEMENT]);
00401 
00402   rc = sqlite3_step(p_info->prep_stmt[DELETE_STATEMENT]);
00403   CheckStep(p_info->db_conn, rc, p_info->prep_stmt[DELETE_STATEMENT]);
00404 
00405   /* clear results */
00406   sqlite3_reset(p_info->prep_stmt[DELETE_STATEMENT]);
00407 
00408   return HANDLEMAP_SUCCESS;
00409 
00410 }                               /* db_delete_operation */
00411 
00412 /* push a task to the queue */
00413 static int dbop_push(flusher_queue_t * p_queue, db_op_item_t * p_op)
00414 {
00415   P(p_queue->queues_mutex);
00416 
00417   /* add an item at the end of the queue */
00418   switch (p_op->op_type)
00419     {
00420     case LOAD:
00421     case INSERT:
00422 
00423       /* high priority operations */
00424 
00425       p_op->p_next = NULL;
00426 
00427       if(p_queue->highprio_last == NULL)
00428         {
00429           /* first operation */
00430           p_queue->highprio_first = p_op;
00431           p_queue->highprio_last = p_op;
00432         }
00433       else
00434         {
00435           p_queue->highprio_last->p_next = p_op;
00436           p_queue->highprio_last = p_op;
00437         }
00438 
00439       p_queue->nb_waiting++;
00440 
00441       break;
00442 
00443     case DELETE:
00444 
00445       /* low priority operation */
00446 
00447       p_op->p_next = NULL;
00448 
00449       if(p_queue->lowprio_last == NULL)
00450         {
00451           /* first operation */
00452           p_queue->lowprio_first = p_op;
00453           p_queue->lowprio_last = p_op;
00454         }
00455       else
00456         {
00457           p_queue->lowprio_last->p_next = p_op;
00458           p_queue->lowprio_last = p_op;
00459         }
00460 
00461       p_queue->nb_waiting++;
00462 
00463       break;
00464 
00465     default:
00466       LogCrit(COMPONENT_FSAL,
00467               "ERROR in dbop_push: Invalid operation type %d", p_op->op_type);
00468     }
00469 
00470   /* there now some work available */
00471   pthread_cond_signal(&p_queue->work_avail_condition);
00472 
00473   V(p_queue->queues_mutex);
00474 
00475   return HANDLEMAP_SUCCESS;
00476 
00477 }
00478 
00479 static void *database_worker_thread(void *arg)
00480 {
00481   db_thread_info_t *p_info = (db_thread_info_t *) arg;
00482   int rc;
00483   db_op_item_t *to_be_done = NULL;
00484   char thread_name[256];
00485 
00486   /* initialize logging */
00487   snprintf(thread_name, 256, "DB thread #%u", p_info->thr_index);
00488   SetNameFunction(thread_name);
00489 
00490   /* initialize memory management */
00491 
00492   rc = init_database_access(p_info);
00493 
00494   if(rc != HANDLEMAP_SUCCESS)
00495     {
00496       /* Failed init */
00497       LogCrit(COMPONENT_FSAL, "ERROR: Database initialization error %d", rc);
00498       exit(rc);
00499     }
00500 
00501   /* main loop */
00502   while(1)
00503     {
00504 
00505       /* Is "work done" or "work available" condition verified ? */
00506 
00507       P(p_info->work_queue.queues_mutex);
00508 
00509       /* nothing to be done ? */
00510       while(p_info->work_queue.highprio_first == NULL
00511             && p_info->work_queue.lowprio_first == NULL)
00512         {
00513           to_be_done = NULL;
00514           p_info->work_queue.status = IDLE;
00515           pthread_cond_signal(&p_info->work_queue.work_done_condition);
00516 
00517           /* if termination is requested, exit */
00518           if(do_terminate)
00519             {
00520               p_info->work_queue.status = FINISHED;
00521               V(p_info->work_queue.queues_mutex);
00522               return (void *)p_info;
00523             }
00524 
00525           /* else, wait for something to do */
00526           pthread_cond_wait(&p_info->work_queue.work_avail_condition,
00527                             &p_info->work_queue.queues_mutex);
00528 
00529         }
00530 
00531       /* there is something to do : first check the highest priority list,
00532        * then the lower priority.
00533        */
00534 
00535       if(p_info->work_queue.highprio_first != NULL)
00536         {
00537           /* take the next item in the list */
00538           to_be_done = p_info->work_queue.highprio_first;
00539           p_info->work_queue.highprio_first = to_be_done->p_next;
00540 
00541           /* still any entries in the list ? */
00542           if(p_info->work_queue.highprio_first == NULL)
00543             p_info->work_queue.highprio_last = NULL;
00544           /* it it the last entry ? */
00545           else if(p_info->work_queue.highprio_first->p_next == NULL)
00546             p_info->work_queue.highprio_last = p_info->work_queue.highprio_first;
00547 
00548           /* something to do */
00549           p_info->work_queue.status = WORKING;
00550         }
00551       else if(p_info->work_queue.lowprio_first != NULL)
00552         {
00553           /* take the next item in the list */
00554           to_be_done = p_info->work_queue.lowprio_first;
00555           p_info->work_queue.lowprio_first = to_be_done->p_next;
00556 
00557           /* still any entries in the list ? */
00558           if(p_info->work_queue.lowprio_first == NULL)
00559             p_info->work_queue.lowprio_last = NULL;
00560           /* it it the last entry ? */
00561           else if(p_info->work_queue.lowprio_first->p_next == NULL)
00562             p_info->work_queue.lowprio_last = p_info->work_queue.lowprio_first;
00563 
00564           /* something to do */
00565           p_info->work_queue.status = WORKING;
00566         }
00567 
00568       p_info->work_queue.nb_waiting--;
00569 
00570       V(p_info->work_queue.queues_mutex);
00571 
00572       /* PROCESS THE REQUEST */
00573 
00574       switch (to_be_done->op_type)
00575         {
00576         case LOAD:
00577           db_load_operation(p_info, to_be_done->op_arg.hash);
00578           break;
00579 
00580         case INSERT:
00581           db_insert_operation(p_info, &to_be_done->op_arg.fh_info.nfs23_digest,
00582                               &to_be_done->op_arg.fh_info.fsal_handle);
00583           break;
00584 
00585         case DELETE:
00586           db_delete_operation(p_info, &to_be_done->op_arg.fh_info.nfs23_digest);
00587           break;
00588 
00589         default:
00590           LogCrit(COMPONENT_FSAL, "ERROR: Invalid operation type %d",
00591                   to_be_done->op_type);
00592         }
00593 
00594       /* free the db operation item */
00595       P(p_info->pool_mutex);
00596       pool_free(&p_info->dbop_pool, to_be_done);
00597       V(p_info->pool_mutex);
00598 
00599     }                           /* loop forever */
00600 
00601   return (void *)p_info;
00602 }
00603 
00609 int handlemap_db_count(const char *dir)
00610 {
00611   DIR *dir_hdl;
00612   struct dirent direntry;
00613   struct dirent *cookie;
00614   int rc;
00615   char db_pattern[MAXPATHLEN];
00616 
00617   unsigned int count = 0;
00618   int end_of_dir = FALSE;
00619 
00620   snprintf(db_pattern, MAXPATHLEN, "%s.*[0-9]", DB_FILE_PREFIX);
00621 
00622   dir_hdl = opendir(dir);
00623 
00624   if(dir_hdl == NULL)
00625     {
00626       LogCrit(COMPONENT_FSAL, "ERROR: could not access directory %s: %s",
00627               dir, strerror(errno));
00628       return -HANDLEMAP_SYSTEM_ERROR;
00629     }
00630 
00631   do
00632     {
00633       rc = readdir_r(dir_hdl, &direntry, &cookie);
00634 
00635       if(rc == 0 && cookie != NULL)
00636         {
00637           /* go to the next loop if the entry is . or .. */
00638           if(!strcmp(".", direntry.d_name) || !strcmp("..", direntry.d_name))
00639             continue;
00640 
00641           /* does it match the expected db pattern ? */
00642           if(!fnmatch(db_pattern, direntry.d_name, FNM_PATHNAME))
00643             count++;
00644 
00645         }
00646       else if(rc == 0 && cookie == NULL)
00647         {
00648           /* end of dir */
00649           end_of_dir = TRUE;
00650         }
00651       else if(errno != 0)
00652         {
00653           /* error */
00654           LogCrit(COMPONENT_FSAL, "ERROR: error reading directory %s: %s",
00655                   dir, strerror(errno));
00656 
00657           closedir(dir_hdl);
00658           return -HANDLEMAP_SYSTEM_ERROR;
00659         }
00660       else
00661         {
00662           /* end of dir */
00663           end_of_dir = TRUE;
00664         }
00665 
00666     }
00667   while(!end_of_dir);
00668 
00669   closedir(dir_hdl);
00670 
00671   return count;
00672 
00673 }                               /* handlemap_db_count */
00674 
00675 unsigned int select_db_queue(const nfs23_map_handle_t * p_nfs23_digest)
00676 {
00677   unsigned int h =
00678       ((p_nfs23_digest->object_id * 1049) ^ p_nfs23_digest->handle_hash) % 2477;
00679 
00680   h = h % nb_db_threads;
00681 
00682   return h;
00683 }
00684 
00692 int handlemap_db_init(const char *db_dir,
00693                       const char *tmp_dir,
00694                       unsigned int db_count,
00695                       unsigned int nb_dbop_prealloc, int synchronous_insert)
00696 {
00697   unsigned int i;
00698   int rc;
00699 
00700   /* first, save the parameters */
00701 
00702   strncpy(dbmap_dir, db_dir, MAXPATHLEN);
00703   strncpy(db_tmpdir, tmp_dir, MAXPATHLEN);
00704 
00705   if(db_count > MAX_DB)
00706     return HANDLEMAP_INVALID_PARAM;
00707 
00708   nb_db_threads = db_count;
00709   synchronous = synchronous_insert;
00710   /* set global database engine info */
00711 
00712   sqlite3_temp_directory = db_tmpdir;
00713 
00714   /* initialize structures for each thread and launch it */
00715 
00716   for(i = 0; i < nb_db_threads; i++)
00717     {
00719       rc = init_db_thread_info(&db_thread[i], 100);
00720       if(rc)
00721         return rc;
00722 
00723       db_thread[i].thr_index = i;
00724 
00725       rc = pthread_create(&db_thread[i].thr_id, NULL, database_worker_thread,
00726                           &db_thread[i]);
00727       if(rc)
00728         return HANDLEMAP_SYSTEM_ERROR;
00729     }
00730 
00731   /* I'm ready to serve, my Lord ! */
00732   return HANDLEMAP_SUCCESS;
00733 }
00734 
00735 /* wait that a thread has done all its jobs */
00736 static void wait_thread_jobs_finished(db_thread_info_t * p_thr_info)
00737 {
00738 
00739   P(p_thr_info->work_queue.queues_mutex);
00740 
00741   /* wait until the thread has no more tasks in its queue
00742    * and it is no more working
00743    */
00744   while(p_thr_info->work_queue.highprio_first != NULL
00745         || p_thr_info->work_queue.lowprio_first != NULL
00746         || p_thr_info->work_queue.status == WORKING)
00747     pthread_cond_wait(&p_thr_info->work_queue.work_done_condition,
00748                       &p_thr_info->work_queue.queues_mutex);
00749 
00750   V(p_thr_info->work_queue.queues_mutex);
00751 
00752 }
00753 
00760 int handlemap_db_reaload_all(hash_table_t * target_hash)
00761 {
00762   unsigned int i;
00763   db_op_item_t *new_task;
00764   int rc;
00765 
00766   /* give the job to all threads */
00767   for(i = 0; i < nb_db_threads; i++)
00768     {
00769       /* get a new db operation  */
00770       P(db_thread[i].pool_mutex);
00771 
00772       new_task = pool_alloc(&db_thread[i].dbop_pool, NULL);
00773 
00774       V(db_thread[i].pool_mutex);
00775 
00776       if(!new_task)
00777         return HANDLEMAP_SYSTEM_ERROR;
00778 
00779       /* can you fill it ? */
00780       new_task->op_type = LOAD;
00781       new_task->op_arg.hash = target_hash;
00782 
00783       rc = dbop_push(&db_thread[i].work_queue, new_task);
00784 
00785       if(rc)
00786         return rc;
00787     }
00788 
00789   /* wait for all threads to finish their job */
00790 
00791   for(i = 0; i < nb_db_threads; i++)
00792     {
00793       wait_thread_jobs_finished(&db_thread[i]);
00794     }
00795 
00796   return HANDLEMAP_SUCCESS;
00797 
00798 }                               /* handlemap_db_reaload_all */
00799 
00804 int handlemap_db_insert(nfs23_map_handle_t * p_in_nfs23_digest,
00805                         fsal_handle_t * p_in_handle)
00806 {
00807   unsigned int i;
00808   db_op_item_t *new_task;
00809   int rc;
00810 
00811   if(!synchronous)
00812     {
00813       /* which thread is going to handle this inode ? */
00814 
00815       i = select_db_queue(p_in_nfs23_digest);
00816 
00817       /* get a new db operation  */
00818       P(db_thread[i].pool_mutex);
00819 
00820       new_task = pool_alloc(db_thread[i].dbop_pool, NULL);
00821 
00822       V(db_thread[i].pool_mutex);
00823 
00824       if(!new_task)
00825         return HANDLEMAP_SYSTEM_ERROR;
00826 
00827       /* fill the task info */
00828       new_task->op_type = INSERT;
00829       new_task->op_arg.fh_info.nfs23_digest = *p_in_nfs23_digest;
00830       new_task->op_arg.fh_info.fsal_handle = *p_in_handle;
00831 
00832       rc = dbop_push(&db_thread[i].work_queue, new_task);
00833 
00834       if(rc)
00835         return rc;
00836     }
00837   /* else: @todo not supported yet */
00838 
00839   return HANDLEMAP_SUCCESS;
00840 
00841 }
00842 
00848 int handlemap_db_delete(nfs23_map_handle_t * p_in_nfs23_digest)
00849 {
00850   unsigned int i;
00851   db_op_item_t *new_task;
00852   int rc;
00853 
00854   /* which thread is going to handle this inode ? */
00855 
00856   i = select_db_queue(p_in_nfs23_digest);
00857 
00858   /* get a new db operation  */
00859   P(db_thread[i].pool_mutex);
00860 
00861   new_task = pool_alloc(db_thread[i].dbop_pool, NULL);
00862 
00863   V(db_thread[i].pool_mutex);
00864 
00865   if(!new_task)
00866     return HANDLEMAP_SYSTEM_ERROR;
00867 
00868   /* fill the task info */
00869   new_task->op_type = DELETE;
00870   new_task->op_arg.fh_info.nfs23_digest = *p_in_nfs23_digest;
00871 
00872   rc = dbop_push(&db_thread[i].work_queue, new_task);
00873 
00874   if(rc)
00875     return rc;
00876 
00877   return HANDLEMAP_SUCCESS;
00878 
00879 }
00880 
00885 int handlemap_db_flush()
00886 {
00887   unsigned int i;
00888   struct timeval t1;
00889   struct timeval t2;
00890   struct timeval tdiff;
00891   unsigned int to_sync = 0;
00892 
00893   for(i = 0; i < nb_db_threads; i++)
00894     {
00895       to_sync += db_thread[i].work_queue.nb_waiting;
00896     }
00897 
00898   LogEvent(COMPONENT_FSAL,
00899            "Waiting for database synchronization (%u operations pending)",
00900            to_sync);
00901 
00902   gettimeofday(&t1, NULL);
00903 
00904   /* wait for all threads to finish their job */
00905 
00906   for(i = 0; i < nb_db_threads; i++)
00907     {
00908       wait_thread_jobs_finished(&db_thread[i]);
00909     }
00910 
00911   gettimeofday(&t2, NULL);
00912 
00913   timersub(&t2, &t1, &tdiff);
00914 
00915   LogEvent(COMPONENT_FSAL, "Database synchronized in %d.%06ds",
00916            (int)tdiff.tv_sec, (int)tdiff.tv_usec);
00917 
00918   return HANDLEMAP_SUCCESS;
00919 
00920 }