nfs-ganesha 1.4
|
00001 #include "config.h" 00002 #include "handle_mapping.h" 00003 #include "handle_mapping_db.h" 00004 #include "handle_mapping_internal.h" 00005 #include "../fsal_internal.h" 00006 #include <sqlite3.h> 00007 #include <sys/types.h> 00008 #include <dirent.h> 00009 #include <fnmatch.h> 00010 #include <pthread.h> 00011 00012 /* sqlite check macros */ 00013 00014 #define CheckTable( _p_conn_, _code_, _msg_str_, _result_ ) do { \ 00015 if ( (_code_) != SQLITE_OK ) \ 00016 { \ 00017 LogCrit(COMPONENT_FSAL, "SQLite command failed in %s line %i", __FUNCTION__, __LINE__); \ 00018 LogCrit(COMPONENT_FSAL, "%s (%d)", (_msg_str_?_msg_str_:sqlite3_errmsg(_p_conn_)), _code_ ); \ 00019 if (_msg_str_) { sqlite3_free( _msg_str_); _msg_str_ = NULL; } \ 00020 if (_result_) { sqlite3_free_table( _result_ ); _result_ = NULL; } \ 00021 return HANDLEMAP_DB_ERROR; \ 00022 } \ 00023 } while (0) 00024 00025 #define CheckCommand( _p_conn_, _code_, _msg_str_ ) do { \ 00026 if ( (_code_) != SQLITE_OK ) \ 00027 { \ 00028 LogCrit(COMPONENT_FSAL, "SQLite command failed in %s line %i", __FUNCTION__, __LINE__); \ 00029 LogCrit(COMPONENT_FSAL, "%s (%d)", (_msg_str_?_msg_str_:sqlite3_errmsg(_p_conn_)), _code_ ); \ 00030 if (_msg_str_) {sqlite3_free( _msg_str_); _msg_str_ = NULL ; } \ 00031 return HANDLEMAP_DB_ERROR; \ 00032 } \ 00033 } while (0) 00034 00035 #define CheckPrepare( _p_conn_, _code_ ) do { \ 00036 if ( (_code_) != SQLITE_OK) \ 00037 { \ 00038 LogCrit(COMPONENT_FSAL, "SQLite prepare statement failed in %s line %i", __FUNCTION__, __LINE__); \ 00039 LogCrit(COMPONENT_FSAL, "%s (%d)", sqlite3_errmsg( _p_conn_ ), _code_ ); \ 00040 return HANDLEMAP_DB_ERROR; \ 00041 } \ 00042 } while (0) 00043 00044 #define CheckBind( _p_conn_, _code_, _stmt_ ) do { \ 00045 if ( (_code_) != SQLITE_OK) \ 00046 { \ 00047 LogCrit(COMPONENT_FSAL, "SQLite parameter binding failed in %s line %i", __FUNCTION__, __LINE__); \ 00048 LogCrit(COMPONENT_FSAL, "%s (%d)", sqlite3_errmsg( _p_conn_ ), _code_ ); \ 00049 sqlite3_clear_bindings( _stmt_ ); \ 00050 return HANDLEMAP_DB_ERROR; \ 00051 } \ 00052 } while (0) 00053 00054 #define CheckStep( _p_conn_, _code_, _stmt_ ) do { \ 00055 if ( (_code_) != SQLITE_OK && (_code_) != SQLITE_ROW && (_code_) != SQLITE_DONE ) \ 00056 { \ 00057 LogCrit(COMPONENT_FSAL, "SQLite command failed in %s line %i", __FUNCTION__, __LINE__); \ 00058 LogCrit(COMPONENT_FSAL, "%s (%d)", sqlite3_errmsg( _p_conn_ ), _code_ ); \ 00059 sqlite3_reset( _stmt_ ); \ 00060 return HANDLEMAP_DB_ERROR; \ 00061 } \ 00062 } while (0) 00063 00064 /* Type of DB operations */ 00065 typedef enum 00066 { 00067 LOAD = 1, 00068 INSERT, 00069 DELETE 00070 } db_op_type; 00071 00072 /* DB operation arguments */ 00073 typedef struct db_op_item__ 00074 { 00075 db_op_type op_type; 00076 00077 /* operation info */ 00078 00079 union 00080 { 00081 struct 00082 { 00083 nfs23_map_handle_t nfs23_digest; 00084 fsal_handle_t fsal_handle; 00085 } fh_info; 00086 00087 hash_table_t *hash; 00088 } op_arg; 00089 00090 /* for chained list */ 00091 struct db_op_item__ *p_next; 00092 00093 } db_op_item_t; 00094 00095 /* the queue for each DB flusher thread */ 00096 typedef struct flusher_queue__ 00097 { 00098 /* the queue for high priority operations */ 00099 db_op_item_t *highprio_first; 00100 db_op_item_t *highprio_last; 00101 00102 /* the queue for low priority operations */ 00103 db_op_item_t *lowprio_first; 00104 db_op_item_t *lowprio_last; 00105 00106 /* number of operations pending */ 00107 unsigned int nb_waiting; 00108 00109 pthread_mutex_t queues_mutex; 00110 00111 pthread_cond_t work_avail_condition; 00112 pthread_cond_t work_done_condition; 00113 00114 /* status (used for work_done_condition) */ 00115 enum 00116 { NOT_READY, IDLE, WORKING, FINISHED } status; 00117 00118 } flusher_queue_t; 00119 00120 #define LOAD_ALL_STATEMENT 0 00121 #define INSERT_STATEMENT 1 00122 #define DELETE_STATEMENT 2 00123 00124 #define STATEMENT_COUNT 3 00125 00126 /* thread info */ 00127 typedef struct db_thread_info__ 00128 { 00129 pthread_t thr_id; 00130 unsigned int thr_index; 00131 00132 flusher_queue_t work_queue; 00133 00134 /* SQLite database connection */ 00135 sqlite3 *db_conn; 00136 00137 /* prepared statement table */ 00138 sqlite3_stmt *prep_stmt[STATEMENT_COUNT]; 00139 00140 /* this pool is accessed by submitter 00141 * and by the db thread */ 00142 pthread_mutex_t pool_mutex; 00143 pool_t *dbop_pool; 00144 00145 } db_thread_info_t; 00146 00147 static char dbmap_dir[MAXPATHLEN]; 00148 static char db_tmpdir[MAXPATHLEN]; 00149 static unsigned int nb_db_threads; 00150 static int synchronous; 00151 00152 /* used for clean shutdown */ 00153 static int do_terminate = FALSE; 00154 00155 /* all information and context for threads */ 00156 static db_thread_info_t db_thread[MAX_DB]; 00157 00158 /* Initialize basic structures for a thread */ 00159 static int init_db_thread_info(db_thread_info_t * p_thr_info, 00160 unsigned int nb_dbop_prealloc) 00161 { 00162 unsigned int i; 00163 00164 if(!p_thr_info) 00165 return HANDLEMAP_INTERNAL_ERROR; 00166 00167 memset(p_thr_info, 0, sizeof(db_thread_info_t)); 00168 00169 p_thr_info->work_queue.highprio_first = NULL; 00170 p_thr_info->work_queue.highprio_last = NULL; 00171 p_thr_info->work_queue.lowprio_first = NULL; 00172 p_thr_info->work_queue.lowprio_last = NULL; 00173 00174 p_thr_info->work_queue.nb_waiting = 0; 00175 00176 if(pthread_mutex_init(&p_thr_info->work_queue.queues_mutex, NULL)) 00177 return HANDLEMAP_SYSTEM_ERROR; 00178 00179 if(pthread_cond_init(&p_thr_info->work_queue.work_avail_condition, NULL)) 00180 return HANDLEMAP_SYSTEM_ERROR; 00181 00182 if(pthread_cond_init(&p_thr_info->work_queue.work_done_condition, NULL)) 00183 return HANDLEMAP_SYSTEM_ERROR; 00184 00185 /* init thread status */ 00186 p_thr_info->work_queue.status = NOT_READY; 00187 00188 p_thr_info->db_conn = NULL; 00189 00190 for(i = 0; i < STATEMENT_COUNT; i++) 00191 p_thr_info->prep_stmt[i] = NULL; 00192 00193 /* init memory pool */ 00194 00195 if(pthread_mutex_init(&p_thr_info->pool_mutex, NULL)) 00196 return HANDLEMAP_SYSTEM_ERROR; 00197 00198 p_thr_info->dbop_pool = 00199 pool_init(NULL, sizeof(db_op_item_t), 00200 pool_basic_substrate, 00201 NULL, NULL, NULL); 00202 00203 return HANDLEMAP_SUCCESS; 00204 } 00205 00206 /* Called by a thread to initialize its database access. 00207 * After this call: 00208 * - database connection is established 00209 * - schema is created 00210 * - prepared statements are ready to be used 00211 */ 00212 static int init_database_access(db_thread_info_t * p_thr_info) 00213 { 00214 char db_file[MAXPATHLEN]; 00215 int rc; 00216 char **result = NULL; 00217 int rows, cols; 00218 char *errmsg = NULL; 00219 const char *unparsed; 00220 00221 /* first open the database file */ 00222 00223 snprintf(db_file, MAXPATHLEN, "%s/%s.%u", dbmap_dir, DB_FILE_PREFIX, 00224 p_thr_info->thr_index); 00225 00226 rc = sqlite3_open(db_file, &p_thr_info->db_conn); 00227 00228 if(rc != 0) 00229 { 00230 if(p_thr_info->db_conn) 00231 { 00232 LogCrit(COMPONENT_FSAL, 00233 "ERROR: could not connect to SQLite3 database (file %s): %s", 00234 db_file, sqlite3_errmsg(p_thr_info->db_conn)); 00235 sqlite3_close(p_thr_info->db_conn); 00236 } 00237 else 00238 { 00239 LogCrit(COMPONENT_FSAL, 00240 "ERROR: could not connect to SQLite3 database (file %s): status=%d", 00241 db_file, rc); 00242 } 00243 return HANDLEMAP_DB_ERROR; 00244 } 00245 00246 /* Now check, that the map table exists */ 00247 rc = sqlite3_get_table(p_thr_info->db_conn, 00248 "SELECT name FROM sqlite_master WHERE type = 'table' AND name = '" 00249 MAP_TABLE "'", &result, &rows, &cols, &errmsg); 00250 00251 CheckTable(p_thr_info->db_conn, rc, errmsg, result); 00252 00253 /* no need for the result, just the number of rows returned */ 00254 sqlite3_free_table(result); 00255 00256 if(rows != 1) 00257 { 00258 /* table must be created */ 00259 rc = sqlite3_exec(p_thr_info->db_conn, 00260 "CREATE TABLE " MAP_TABLE " ( " 00261 OBJID_FIELD " BIGINT NOT NULL, " 00262 HASH_FIELD " INT NOT NULL, " 00263 HANDLE_FIELD " TEXT, " 00264 "PRIMARY KEY(" OBJID_FIELD ", " HASH_FIELD ") )", 00265 NULL, NULL, &errmsg); 00266 00267 CheckCommand(p_thr_info->db_conn, rc, errmsg); 00268 00269 } 00270 00271 /* Now, create prepared statements */ 00272 00273 rc = sqlite3_prepare_v2(p_thr_info->db_conn, 00274 "SELECT " OBJID_FIELD "," HASH_FIELD "," HANDLE_FIELD " FROM " 00275 MAP_TABLE, -1, &(p_thr_info->prep_stmt[LOAD_ALL_STATEMENT]), 00276 &unparsed); 00277 00278 CheckPrepare(p_thr_info->db_conn, rc); 00279 00280 rc = sqlite3_prepare_v2(p_thr_info->db_conn, 00281 "INSERT INTO " MAP_TABLE "(" OBJID_FIELD "," HASH_FIELD "," 00282 HANDLE_FIELD ") " "VALUES (?1, ?2, ?3 )", -1, 00283 &(p_thr_info->prep_stmt[INSERT_STATEMENT]), &unparsed); 00284 00285 CheckPrepare(p_thr_info->db_conn, rc); 00286 00287 rc = sqlite3_prepare_v2(p_thr_info->db_conn, 00288 "DELETE FROM " MAP_TABLE " WHERE " OBJID_FIELD "=?1 AND " 00289 HASH_FIELD "=?2", -1, 00290 &(p_thr_info->prep_stmt[DELETE_STATEMENT]), &unparsed); 00291 00292 CheckPrepare(p_thr_info->db_conn, rc); 00293 00294 /* Everything is OK now ! */ 00295 return HANDLEMAP_SUCCESS; 00296 00297 } /* init_database_access */ 00298 00299 static int db_load_operation(db_thread_info_t * p_info, hash_table_t * p_hash) 00300 { 00301 /* the object id to be inserted to hash table */ 00302 uint64_t object_id; 00303 unsigned int handle_hash; 00304 const char *fsal_handle_str; 00305 fsal_handle_t fsal_handle; 00306 unsigned int nb_loaded = 0; 00307 int rc; 00308 struct timeval t1; 00309 struct timeval t2; 00310 struct timeval tdiff; 00311 00312 gettimeofday(&t1, NULL); 00313 00314 rc = sqlite3_step(p_info->prep_stmt[LOAD_ALL_STATEMENT]); 00315 CheckStep(p_info->db_conn, rc, p_info->prep_stmt[LOAD_ALL_STATEMENT]); 00316 00317 /* something to read */ 00318 while(rc == SQLITE_ROW) 00319 { 00320 object_id = sqlite3_column_int64(p_info->prep_stmt[LOAD_ALL_STATEMENT], 0); 00321 handle_hash = sqlite3_column_int(p_info->prep_stmt[LOAD_ALL_STATEMENT], 1); 00322 fsal_handle_str = sqlite3_column_text(p_info->prep_stmt[LOAD_ALL_STATEMENT], 2); 00323 00324 /* convert hexa string representation to binary data */ 00325 sscanHandle(&fsal_handle, fsal_handle_str); 00326 00327 /* now insert it to the hash table */ 00328 00329 rc = handle_mapping_hash_add(p_hash, object_id, handle_hash, &fsal_handle); 00330 00331 if(rc == 0) 00332 nb_loaded++; 00333 else 00334 LogCrit(COMPONENT_FSAL, 00335 "ERROR %d adding entry to hash table <object_id=%llu, FH_hash=%u, FSAL_Handle=%s>", 00336 rc, (unsigned long long)object_id, handle_hash, fsal_handle_str); 00337 00338 rc = sqlite3_step(p_info->prep_stmt[LOAD_ALL_STATEMENT]); 00339 CheckStep(p_info->db_conn, rc, p_info->prep_stmt[LOAD_ALL_STATEMENT]); 00340 00341 } 00342 00343 /* clear results */ 00344 sqlite3_reset(p_info->prep_stmt[LOAD_ALL_STATEMENT]); 00345 00346 /* print time and item count */ 00347 00348 gettimeofday(&t2, NULL); 00349 timersub(&t2, &t1, &tdiff); 00350 00351 LogEvent(COMPONENT_FSAL, "Reloaded %u items in %d.%06ds", 00352 nb_loaded, (int)tdiff.tv_sec, (int)tdiff.tv_usec); 00353 00354 return HANDLEMAP_SUCCESS; 00355 00356 } /* db_load_operation */ 00357 00358 static int db_insert_operation(db_thread_info_t * p_info, 00359 nfs23_map_handle_t * p_nfs23_digest, 00360 fsal_handle_t * p_handle) 00361 { 00362 int rc; 00363 char handle_str[2 * sizeof(fsal_handle_t) + 1]; 00364 00365 rc = sqlite3_bind_int64(p_info->prep_stmt[INSERT_STATEMENT], 1, 00366 p_nfs23_digest->object_id); 00367 CheckBind(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]); 00368 00369 rc = sqlite3_bind_int(p_info->prep_stmt[INSERT_STATEMENT], 2, 00370 p_nfs23_digest->handle_hash); 00371 CheckBind(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]); 00372 00373 snprintHandle(handle_str, 2 * sizeof(fsal_handle_t) + 1, p_handle); 00374 00375 rc = sqlite3_bind_text(p_info->prep_stmt[INSERT_STATEMENT], 3, handle_str, -1, 00376 SQLITE_STATIC); 00377 CheckBind(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]); 00378 00379 rc = sqlite3_step(p_info->prep_stmt[INSERT_STATEMENT]); 00380 CheckStep(p_info->db_conn, rc, p_info->prep_stmt[INSERT_STATEMENT]); 00381 00382 /* clear results */ 00383 sqlite3_reset(p_info->prep_stmt[INSERT_STATEMENT]); 00384 00385 return HANDLEMAP_SUCCESS; 00386 00387 } /* db_insert_operation */ 00388 00389 static int db_delete_operation(db_thread_info_t * p_info, 00390 nfs23_map_handle_t * p_nfs23_digest) 00391 { 00392 int rc; 00393 00394 rc = sqlite3_bind_int64(p_info->prep_stmt[DELETE_STATEMENT], 1, 00395 p_nfs23_digest->object_id); 00396 CheckBind(p_info->db_conn, rc, p_info->prep_stmt[DELETE_STATEMENT]); 00397 00398 rc = sqlite3_bind_int(p_info->prep_stmt[DELETE_STATEMENT], 2, 00399 p_nfs23_digest->handle_hash); 00400 CheckBind(p_info->db_conn, rc, p_info->prep_stmt[DELETE_STATEMENT]); 00401 00402 rc = sqlite3_step(p_info->prep_stmt[DELETE_STATEMENT]); 00403 CheckStep(p_info->db_conn, rc, p_info->prep_stmt[DELETE_STATEMENT]); 00404 00405 /* clear results */ 00406 sqlite3_reset(p_info->prep_stmt[DELETE_STATEMENT]); 00407 00408 return HANDLEMAP_SUCCESS; 00409 00410 } /* db_delete_operation */ 00411 00412 /* push a task to the queue */ 00413 static int dbop_push(flusher_queue_t * p_queue, db_op_item_t * p_op) 00414 { 00415 P(p_queue->queues_mutex); 00416 00417 /* add an item at the end of the queue */ 00418 switch (p_op->op_type) 00419 { 00420 case LOAD: 00421 case INSERT: 00422 00423 /* high priority operations */ 00424 00425 p_op->p_next = NULL; 00426 00427 if(p_queue->highprio_last == NULL) 00428 { 00429 /* first operation */ 00430 p_queue->highprio_first = p_op; 00431 p_queue->highprio_last = p_op; 00432 } 00433 else 00434 { 00435 p_queue->highprio_last->p_next = p_op; 00436 p_queue->highprio_last = p_op; 00437 } 00438 00439 p_queue->nb_waiting++; 00440 00441 break; 00442 00443 case DELETE: 00444 00445 /* low priority operation */ 00446 00447 p_op->p_next = NULL; 00448 00449 if(p_queue->lowprio_last == NULL) 00450 { 00451 /* first operation */ 00452 p_queue->lowprio_first = p_op; 00453 p_queue->lowprio_last = p_op; 00454 } 00455 else 00456 { 00457 p_queue->lowprio_last->p_next = p_op; 00458 p_queue->lowprio_last = p_op; 00459 } 00460 00461 p_queue->nb_waiting++; 00462 00463 break; 00464 00465 default: 00466 LogCrit(COMPONENT_FSAL, 00467 "ERROR in dbop_push: Invalid operation type %d", p_op->op_type); 00468 } 00469 00470 /* there now some work available */ 00471 pthread_cond_signal(&p_queue->work_avail_condition); 00472 00473 V(p_queue->queues_mutex); 00474 00475 return HANDLEMAP_SUCCESS; 00476 00477 } 00478 00479 static void *database_worker_thread(void *arg) 00480 { 00481 db_thread_info_t *p_info = (db_thread_info_t *) arg; 00482 int rc; 00483 db_op_item_t *to_be_done = NULL; 00484 char thread_name[256]; 00485 00486 /* initialize logging */ 00487 snprintf(thread_name, 256, "DB thread #%u", p_info->thr_index); 00488 SetNameFunction(thread_name); 00489 00490 /* initialize memory management */ 00491 00492 rc = init_database_access(p_info); 00493 00494 if(rc != HANDLEMAP_SUCCESS) 00495 { 00496 /* Failed init */ 00497 LogCrit(COMPONENT_FSAL, "ERROR: Database initialization error %d", rc); 00498 exit(rc); 00499 } 00500 00501 /* main loop */ 00502 while(1) 00503 { 00504 00505 /* Is "work done" or "work available" condition verified ? */ 00506 00507 P(p_info->work_queue.queues_mutex); 00508 00509 /* nothing to be done ? */ 00510 while(p_info->work_queue.highprio_first == NULL 00511 && p_info->work_queue.lowprio_first == NULL) 00512 { 00513 to_be_done = NULL; 00514 p_info->work_queue.status = IDLE; 00515 pthread_cond_signal(&p_info->work_queue.work_done_condition); 00516 00517 /* if termination is requested, exit */ 00518 if(do_terminate) 00519 { 00520 p_info->work_queue.status = FINISHED; 00521 V(p_info->work_queue.queues_mutex); 00522 return (void *)p_info; 00523 } 00524 00525 /* else, wait for something to do */ 00526 pthread_cond_wait(&p_info->work_queue.work_avail_condition, 00527 &p_info->work_queue.queues_mutex); 00528 00529 } 00530 00531 /* there is something to do : first check the highest priority list, 00532 * then the lower priority. 00533 */ 00534 00535 if(p_info->work_queue.highprio_first != NULL) 00536 { 00537 /* take the next item in the list */ 00538 to_be_done = p_info->work_queue.highprio_first; 00539 p_info->work_queue.highprio_first = to_be_done->p_next; 00540 00541 /* still any entries in the list ? */ 00542 if(p_info->work_queue.highprio_first == NULL) 00543 p_info->work_queue.highprio_last = NULL; 00544 /* it it the last entry ? */ 00545 else if(p_info->work_queue.highprio_first->p_next == NULL) 00546 p_info->work_queue.highprio_last = p_info->work_queue.highprio_first; 00547 00548 /* something to do */ 00549 p_info->work_queue.status = WORKING; 00550 } 00551 else if(p_info->work_queue.lowprio_first != NULL) 00552 { 00553 /* take the next item in the list */ 00554 to_be_done = p_info->work_queue.lowprio_first; 00555 p_info->work_queue.lowprio_first = to_be_done->p_next; 00556 00557 /* still any entries in the list ? */ 00558 if(p_info->work_queue.lowprio_first == NULL) 00559 p_info->work_queue.lowprio_last = NULL; 00560 /* it it the last entry ? */ 00561 else if(p_info->work_queue.lowprio_first->p_next == NULL) 00562 p_info->work_queue.lowprio_last = p_info->work_queue.lowprio_first; 00563 00564 /* something to do */ 00565 p_info->work_queue.status = WORKING; 00566 } 00567 00568 p_info->work_queue.nb_waiting--; 00569 00570 V(p_info->work_queue.queues_mutex); 00571 00572 /* PROCESS THE REQUEST */ 00573 00574 switch (to_be_done->op_type) 00575 { 00576 case LOAD: 00577 db_load_operation(p_info, to_be_done->op_arg.hash); 00578 break; 00579 00580 case INSERT: 00581 db_insert_operation(p_info, &to_be_done->op_arg.fh_info.nfs23_digest, 00582 &to_be_done->op_arg.fh_info.fsal_handle); 00583 break; 00584 00585 case DELETE: 00586 db_delete_operation(p_info, &to_be_done->op_arg.fh_info.nfs23_digest); 00587 break; 00588 00589 default: 00590 LogCrit(COMPONENT_FSAL, "ERROR: Invalid operation type %d", 00591 to_be_done->op_type); 00592 } 00593 00594 /* free the db operation item */ 00595 P(p_info->pool_mutex); 00596 pool_free(&p_info->dbop_pool, to_be_done); 00597 V(p_info->pool_mutex); 00598 00599 } /* loop forever */ 00600 00601 return (void *)p_info; 00602 } 00603 00609 int handlemap_db_count(const char *dir) 00610 { 00611 DIR *dir_hdl; 00612 struct dirent direntry; 00613 struct dirent *cookie; 00614 int rc; 00615 char db_pattern[MAXPATHLEN]; 00616 00617 unsigned int count = 0; 00618 int end_of_dir = FALSE; 00619 00620 snprintf(db_pattern, MAXPATHLEN, "%s.*[0-9]", DB_FILE_PREFIX); 00621 00622 dir_hdl = opendir(dir); 00623 00624 if(dir_hdl == NULL) 00625 { 00626 LogCrit(COMPONENT_FSAL, "ERROR: could not access directory %s: %s", 00627 dir, strerror(errno)); 00628 return -HANDLEMAP_SYSTEM_ERROR; 00629 } 00630 00631 do 00632 { 00633 rc = readdir_r(dir_hdl, &direntry, &cookie); 00634 00635 if(rc == 0 && cookie != NULL) 00636 { 00637 /* go to the next loop if the entry is . or .. */ 00638 if(!strcmp(".", direntry.d_name) || !strcmp("..", direntry.d_name)) 00639 continue; 00640 00641 /* does it match the expected db pattern ? */ 00642 if(!fnmatch(db_pattern, direntry.d_name, FNM_PATHNAME)) 00643 count++; 00644 00645 } 00646 else if(rc == 0 && cookie == NULL) 00647 { 00648 /* end of dir */ 00649 end_of_dir = TRUE; 00650 } 00651 else if(errno != 0) 00652 { 00653 /* error */ 00654 LogCrit(COMPONENT_FSAL, "ERROR: error reading directory %s: %s", 00655 dir, strerror(errno)); 00656 00657 closedir(dir_hdl); 00658 return -HANDLEMAP_SYSTEM_ERROR; 00659 } 00660 else 00661 { 00662 /* end of dir */ 00663 end_of_dir = TRUE; 00664 } 00665 00666 } 00667 while(!end_of_dir); 00668 00669 closedir(dir_hdl); 00670 00671 return count; 00672 00673 } /* handlemap_db_count */ 00674 00675 unsigned int select_db_queue(const nfs23_map_handle_t * p_nfs23_digest) 00676 { 00677 unsigned int h = 00678 ((p_nfs23_digest->object_id * 1049) ^ p_nfs23_digest->handle_hash) % 2477; 00679 00680 h = h % nb_db_threads; 00681 00682 return h; 00683 } 00684 00692 int handlemap_db_init(const char *db_dir, 00693 const char *tmp_dir, 00694 unsigned int db_count, 00695 unsigned int nb_dbop_prealloc, int synchronous_insert) 00696 { 00697 unsigned int i; 00698 int rc; 00699 00700 /* first, save the parameters */ 00701 00702 strncpy(dbmap_dir, db_dir, MAXPATHLEN); 00703 strncpy(db_tmpdir, tmp_dir, MAXPATHLEN); 00704 00705 if(db_count > MAX_DB) 00706 return HANDLEMAP_INVALID_PARAM; 00707 00708 nb_db_threads = db_count; 00709 synchronous = synchronous_insert; 00710 /* set global database engine info */ 00711 00712 sqlite3_temp_directory = db_tmpdir; 00713 00714 /* initialize structures for each thread and launch it */ 00715 00716 for(i = 0; i < nb_db_threads; i++) 00717 { 00719 rc = init_db_thread_info(&db_thread[i], 100); 00720 if(rc) 00721 return rc; 00722 00723 db_thread[i].thr_index = i; 00724 00725 rc = pthread_create(&db_thread[i].thr_id, NULL, database_worker_thread, 00726 &db_thread[i]); 00727 if(rc) 00728 return HANDLEMAP_SYSTEM_ERROR; 00729 } 00730 00731 /* I'm ready to serve, my Lord ! */ 00732 return HANDLEMAP_SUCCESS; 00733 } 00734 00735 /* wait that a thread has done all its jobs */ 00736 static void wait_thread_jobs_finished(db_thread_info_t * p_thr_info) 00737 { 00738 00739 P(p_thr_info->work_queue.queues_mutex); 00740 00741 /* wait until the thread has no more tasks in its queue 00742 * and it is no more working 00743 */ 00744 while(p_thr_info->work_queue.highprio_first != NULL 00745 || p_thr_info->work_queue.lowprio_first != NULL 00746 || p_thr_info->work_queue.status == WORKING) 00747 pthread_cond_wait(&p_thr_info->work_queue.work_done_condition, 00748 &p_thr_info->work_queue.queues_mutex); 00749 00750 V(p_thr_info->work_queue.queues_mutex); 00751 00752 } 00753 00760 int handlemap_db_reaload_all(hash_table_t * target_hash) 00761 { 00762 unsigned int i; 00763 db_op_item_t *new_task; 00764 int rc; 00765 00766 /* give the job to all threads */ 00767 for(i = 0; i < nb_db_threads; i++) 00768 { 00769 /* get a new db operation */ 00770 P(db_thread[i].pool_mutex); 00771 00772 new_task = pool_alloc(&db_thread[i].dbop_pool, NULL); 00773 00774 V(db_thread[i].pool_mutex); 00775 00776 if(!new_task) 00777 return HANDLEMAP_SYSTEM_ERROR; 00778 00779 /* can you fill it ? */ 00780 new_task->op_type = LOAD; 00781 new_task->op_arg.hash = target_hash; 00782 00783 rc = dbop_push(&db_thread[i].work_queue, new_task); 00784 00785 if(rc) 00786 return rc; 00787 } 00788 00789 /* wait for all threads to finish their job */ 00790 00791 for(i = 0; i < nb_db_threads; i++) 00792 { 00793 wait_thread_jobs_finished(&db_thread[i]); 00794 } 00795 00796 return HANDLEMAP_SUCCESS; 00797 00798 } /* handlemap_db_reaload_all */ 00799 00804 int handlemap_db_insert(nfs23_map_handle_t * p_in_nfs23_digest, 00805 fsal_handle_t * p_in_handle) 00806 { 00807 unsigned int i; 00808 db_op_item_t *new_task; 00809 int rc; 00810 00811 if(!synchronous) 00812 { 00813 /* which thread is going to handle this inode ? */ 00814 00815 i = select_db_queue(p_in_nfs23_digest); 00816 00817 /* get a new db operation */ 00818 P(db_thread[i].pool_mutex); 00819 00820 new_task = pool_alloc(db_thread[i].dbop_pool, NULL); 00821 00822 V(db_thread[i].pool_mutex); 00823 00824 if(!new_task) 00825 return HANDLEMAP_SYSTEM_ERROR; 00826 00827 /* fill the task info */ 00828 new_task->op_type = INSERT; 00829 new_task->op_arg.fh_info.nfs23_digest = *p_in_nfs23_digest; 00830 new_task->op_arg.fh_info.fsal_handle = *p_in_handle; 00831 00832 rc = dbop_push(&db_thread[i].work_queue, new_task); 00833 00834 if(rc) 00835 return rc; 00836 } 00837 /* else: @todo not supported yet */ 00838 00839 return HANDLEMAP_SUCCESS; 00840 00841 } 00842 00848 int handlemap_db_delete(nfs23_map_handle_t * p_in_nfs23_digest) 00849 { 00850 unsigned int i; 00851 db_op_item_t *new_task; 00852 int rc; 00853 00854 /* which thread is going to handle this inode ? */ 00855 00856 i = select_db_queue(p_in_nfs23_digest); 00857 00858 /* get a new db operation */ 00859 P(db_thread[i].pool_mutex); 00860 00861 new_task = pool_alloc(db_thread[i].dbop_pool, NULL); 00862 00863 V(db_thread[i].pool_mutex); 00864 00865 if(!new_task) 00866 return HANDLEMAP_SYSTEM_ERROR; 00867 00868 /* fill the task info */ 00869 new_task->op_type = DELETE; 00870 new_task->op_arg.fh_info.nfs23_digest = *p_in_nfs23_digest; 00871 00872 rc = dbop_push(&db_thread[i].work_queue, new_task); 00873 00874 if(rc) 00875 return rc; 00876 00877 return HANDLEMAP_SUCCESS; 00878 00879 } 00880 00885 int handlemap_db_flush() 00886 { 00887 unsigned int i; 00888 struct timeval t1; 00889 struct timeval t2; 00890 struct timeval tdiff; 00891 unsigned int to_sync = 0; 00892 00893 for(i = 0; i < nb_db_threads; i++) 00894 { 00895 to_sync += db_thread[i].work_queue.nb_waiting; 00896 } 00897 00898 LogEvent(COMPONENT_FSAL, 00899 "Waiting for database synchronization (%u operations pending)", 00900 to_sync); 00901 00902 gettimeofday(&t1, NULL); 00903 00904 /* wait for all threads to finish their job */ 00905 00906 for(i = 0; i < nb_db_threads; i++) 00907 { 00908 wait_thread_jobs_finished(&db_thread[i]); 00909 } 00910 00911 gettimeofday(&t2, NULL); 00912 00913 timersub(&t2, &t1, &tdiff); 00914 00915 LogEvent(COMPONENT_FSAL, "Database synchronized in %d.%06ds", 00916 (int)tdiff.tv_sec, (int)tdiff.tv_usec); 00917 00918 return HANDLEMAP_SUCCESS; 00919 00920 }