nfs-ganesha 1.4
/*
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2010, The Linux Box Corporation
 * Contributor : Matt Benjamin <matt@linuxbox.com>
 *
 * Some portions Copyright CEA/DAM/DIF (2008)
 * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
 *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
 *
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 * -------------
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef _SOLARIS
#include "solaris_port.h"
#endif /* _SOLARIS */

#include "abstract_atomic.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/param.h>
#include <time.h>
#include <pthread.h>
#include <assert.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <stdio.h>
#include "nlm_list.h"
#include "fsal.h"
#include "nfs_core.h"
#include "log.h"
#include "cache_inode.h"
#include "cache_inode_lru.h"

/* Forward Declaration */

static void *lru_thread(void *arg);
struct lru_state lru_state;

struct lru_q_base
{
     struct glist_head q;       /* LRU is at HEAD, MRU at tail */
     pthread_mutex_t mtx;
     uint64_t size;
};

/* Cache-line padding macro from MCAS */

#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
#define ALIGNED_ALLOC(_s)                                        \
     ((void *)(((unsigned long)malloc((_s)+CACHE_LINE_SIZE*2) +  \
                CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1)))

struct lru_q_
{
     struct lru_q_base lru;
     struct lru_q_base lru_pinned; /* uncollectable, due to state */
     CACHE_PAD(0);
};

static struct lru_q_ LRU_1[LRU_N_Q_LANES];
static struct lru_q_ LRU_2[LRU_N_Q_LANES];

size_t open_fd_count = 0;

static pthread_mutex_t lru_mtx;
static pthread_cond_t lru_cv;

static const uint32_t FD_FALLBACK_LIMIT = 0x400;

static struct lru_thread_state
{
     pthread_t thread_id;
     uint32_t flags;
} lru_thread_state;

static inline void
lru_init_queue(struct lru_q_base *q)
{
     init_glist(&q->q);
     pthread_mutex_init(&q->mtx, NULL);
     q->size = 0;
}

static inline struct lru_q_base *
lru_select_queue(uint32_t flags, uint32_t lane)
{
     assert(lane < LRU_N_Q_LANES);
     if (flags & LRU_ENTRY_PINNED) {
          if (flags & LRU_ENTRY_L2) {
               return &LRU_2[lane].lru_pinned;
          } else {
               return &LRU_1[lane].lru_pinned;
          }
     } else {
          if (flags & LRU_ENTRY_L2) {
               return &LRU_2[lane].lru;
          } else {
               return &LRU_1[lane].lru;
          }
     }
}

static inline uint32_t
lru_lane_of_entry(cache_entry_t *entry)
{
     return (uint32_t) (((uintptr_t) entry) % LRU_N_Q_LANES);
}

static inline void
lru_insert_entry(cache_inode_lru_t *lru, uint32_t flags, uint32_t lane)
{
     /* Destination LRU */
     struct lru_q_base *d = NULL;

     d = lru_select_queue(flags, lane);
     pthread_mutex_lock(&d->mtx);
     glist_add(&d->q, &lru->q);
     ++(d->size);
     pthread_mutex_unlock(&d->mtx);

     /* Set the flags on the entry to exactly the set of LRU_ENTRY_L2
        and LRU_ENTRY_PINNED supplied in the flags argument. */

     lru->flags &= ~(LRU_ENTRY_L2 | LRU_ENTRY_PINNED);
     lru->flags |= (flags & (LRU_ENTRY_L2 | LRU_ENTRY_PINNED));
     lru->lane = lane;
}

static inline void
lru_remove_entry(cache_inode_lru_t *lru)
{
     if (lru->lane == LRU_NO_LANE) {
          return;
     }

     /* Source LRU */
     struct lru_q_base *s = NULL;
     s = lru_select_queue(lru->flags, lru->lane);
     pthread_mutex_lock(&s->mtx);
     glist_del(&lru->q);
     --(s->size);
     pthread_mutex_unlock(&s->mtx);
     lru->flags &= ~(LRU_ENTRY_L2 | LRU_ENTRY_PINNED);
     /* Anyone interested in this entry should back off immediately. */
     lru->lane = LRU_NO_LANE;
}

static inline void
lru_move_entry(cache_inode_lru_t *lru,
               uint32_t flags,
               uint32_t lane)
{
     /* Source LRU */
     struct lru_q_base *s = NULL;
     /* Destination LRU */
     struct lru_q_base *d = NULL;

     if ((lru->lane == LRU_NO_LANE) &&
         (lane == LRU_NO_LANE)) {
          /* From nothing, to nothing. */
          return;
     } else if (lru->lane == LRU_NO_LANE) {
          lru_insert_entry(lru, flags, lane);
          return;
     } else if (lane == LRU_NO_LANE) {
          lru_remove_entry(lru);
          return;
     }

     s = lru_select_queue(lru->flags, lru->lane);
     d = lru_select_queue(flags, lane);

     /* Take the two queue locks in address order so that concurrent
        movers cannot deadlock. */
     if (s == d) {
          pthread_mutex_lock(&s->mtx);
     } else if (s < d) {
          pthread_mutex_lock(&s->mtx);
          pthread_mutex_lock(&d->mtx);
     } else if (s > d) {
          pthread_mutex_lock(&d->mtx);
          pthread_mutex_lock(&s->mtx);
     }

     glist_del(&lru->q);
     --(s->size);

     /* When moving from L2 to L1, add to the LRU, otherwise add to
        the MRU.  (In general we don't want to promote things except
        on initial reference, but promoting things on move makes more
        sense than demoting them.) */
     if ((lru->flags & LRU_ENTRY_L2) &&
         !(flags & LRU_ENTRY_L2)) {
          glist_add_tail(&d->q, &lru->q);
     } else {
          glist_add(&d->q, &lru->q);
     }
     ++(d->size);

     pthread_mutex_unlock(&s->mtx);
     if (s != d) {
          pthread_mutex_unlock(&d->mtx);
     }

     lru->flags &= ~(LRU_ENTRY_L2 | LRU_ENTRY_PINNED);
     lru->flags |= (flags & (LRU_ENTRY_L2 | LRU_ENTRY_PINNED));
}

static inline void
cache_inode_lru_clean(cache_entry_t *entry)
{
     fsal_status_t fsal_status = {0, 0};
     cache_inode_status_t cache_status = CACHE_INODE_SUCCESS;

     /* Clean an LRU entry for re-use. */
     assert((entry->lru.refcount == LRU_SENTINEL_REFCOUNT) ||
            (entry->lru.refcount == (LRU_SENTINEL_REFCOUNT - 1)));

     if (cache_inode_fd(entry)) {
          cache_inode_close(entry, CACHE_INODE_FLAG_REALLYCLOSE,
                            &cache_status);
          if (cache_status != CACHE_INODE_SUCCESS) {
               LogCrit(COMPONENT_CACHE_INODE_LRU,
                       "Error closing file in cleanup: %d.",
                       cache_status);
          }
     }

     /* Clean up the associated resources in the FSAL */
     if (FSAL_IS_ERROR(fsal_status
                       = FSAL_CleanObjectResources(&entry->handle))) {
          LogCrit(COMPONENT_CACHE_INODE,
                  "cache_inode_lru_clean: Couldn't free FSAL resources "
                  "fsal_status.major=%u", fsal_status.major);
     }

     cache_inode_clean_internal(entry);
     entry->lru.refcount = 0;
     cache_inode_clean_entry(entry);
}

static inline cache_inode_lru_t *
lru_try_reap_entry(struct lru_q_base *q)
{
     cache_inode_lru_t *lru = NULL;

     pthread_mutex_lock(&q->mtx);
     lru = glist_first_entry(&q->q, cache_inode_lru_t, q);
     if (!lru) {
          pthread_mutex_unlock(&q->mtx);
          return NULL;
     }

     atomic_inc_int64_t(&lru->refcount);
     pthread_mutex_unlock(&q->mtx);
     pthread_mutex_lock(&lru->mtx);
     if ((lru->flags & LRU_ENTRY_CONDEMNED) ||
         (lru->flags & LRU_ENTRY_KILLED)) {
          atomic_dec_int64_t(&lru->refcount);
          pthread_mutex_unlock(&lru->mtx);
          return NULL;
     }
     if ((lru->refcount > (LRU_SENTINEL_REFCOUNT + 1)) ||
         (lru->flags & LRU_ENTRY_PINNED)) {
          /* Anything more than the sentinel and our own reference
             means someone else holds a reference.  Also, someone may
             have moved it to the pin queue while we were waiting. */
          atomic_dec_int64_t(&lru->refcount);
          pthread_mutex_unlock(&lru->mtx);
          return NULL;
     }
     /* At this point, we have legitimate access to the entry,
        and we go through the disposal/recycling discipline. */

     /* Make sure the entry is still where we think it is. */
     q = lru_select_queue(lru->flags, lru->lane);
     pthread_mutex_lock(&q->mtx);
     if (lru->refcount > LRU_SENTINEL_REFCOUNT + 1) {
          /* Someone took a reference while we were waiting for the
             queue. */
          atomic_dec_int64_t(&lru->refcount);
          pthread_mutex_unlock(&lru->mtx);
          pthread_mutex_unlock(&q->mtx);
          return NULL;
     }
     /* Drop the refcount to 0, set the flag to tell other threads to
        stop access immediately. */
     lru->refcount = 0;
     lru->flags = LRU_ENTRY_CONDEMNED;
     glist_del(&lru->q);
     --(q->size);
     lru->lane = LRU_NO_LANE;
     /* Drop all locks and give other threads a chance to abandon the
        entry. */
     pthread_mutex_unlock(&lru->mtx);
     pthread_mutex_unlock(&q->mtx);
     pthread_yield();

     return lru;
}

static const uint32_t S_NSECS = 1000000000UL;  /* nsecs in 1s */
static const uint32_t MS_NSECS = 1000000UL;    /* nsecs in 1ms */

static bool_t
lru_thread_delay_ms(unsigned long ms)
{
     time_t now = time(NULL);
     uint64_t nsecs = (S_NSECS * now) + (MS_NSECS * ms);
     struct timespec then = {
          .tv_sec = nsecs / S_NSECS,
          .tv_nsec = nsecs % S_NSECS
     };
     bool_t woke = FALSE;

     pthread_mutex_lock(&lru_mtx);
     lru_thread_state.flags |= LRU_SLEEPING;
     woke = (pthread_cond_timedwait(&lru_cv, &lru_mtx, &then) != ETIMEDOUT);
     lru_thread_state.flags &= ~LRU_SLEEPING;
     pthread_mutex_unlock(&lru_mtx);
     return woke;
}

static void *
lru_thread(void *arg __attribute__((unused)))
{
     /* Index */
     size_t lane = 0;
     /* Temporary holder for flags */
     uint32_t tmpflags = lru_state.flags;
     /* True if we are taking extreme measures to reclaim FDs. */
     bool_t extremis = FALSE;
     /* True if we were explicitly woken. */
     bool_t woke = FALSE;

     SetNameFunction("lru_thread");

     while (1) {
          if (lru_thread_state.flags & LRU_SHUTDOWN)
               break;

          extremis = (open_fd_count > lru_state.fds_hiwat);
          LogFullDebug(COMPONENT_CACHE_INODE_LRU,
                       "Reaper awakes.");

          if (!woke) {
               /* If we make it all the way through a timed sleep
                  without being woken, we assume we aren't racing
                  against the impossible. */
               lru_state.futility = 0;
          }

          uint64_t t_count = 0;

          /* First, sum the queue counts.  This lets us know where we
             are relative to our watermarks. */

          for (lane = 0; lane < LRU_N_Q_LANES; ++lane) {
               pthread_mutex_lock(&LRU_1[lane].lru.mtx);
               t_count += LRU_1[lane].lru.size;
               pthread_mutex_unlock(&LRU_1[lane].lru.mtx);

               pthread_mutex_lock(&LRU_1[lane].lru_pinned.mtx);
               t_count += LRU_1[lane].lru_pinned.size;
               pthread_mutex_unlock(&LRU_1[lane].lru_pinned.mtx);

               pthread_mutex_lock(&LRU_2[lane].lru.mtx);
               t_count += LRU_2[lane].lru.size;
               pthread_mutex_unlock(&LRU_2[lane].lru.mtx);

               pthread_mutex_lock(&LRU_2[lane].lru_pinned.mtx);
               t_count += LRU_2[lane].lru_pinned.size;
               pthread_mutex_unlock(&LRU_2[lane].lru_pinned.mtx);
          }

          LogFullDebug(COMPONENT_CACHE_INODE_LRU,
                       "%zu entries in cache.",
                       t_count);

          if (tmpflags & LRU_STATE_RECLAIMING) {
               if (t_count < lru_state.entries_lowat) {
                    tmpflags &= ~LRU_STATE_RECLAIMING;
                    LogFullDebug(COMPONENT_CACHE_INODE_LRU,
                                 "Entry count below low water mark. "
                                 "Disabling reclaim.");
               }
          } else {
               if (t_count > lru_state.entries_hiwat) {
                    tmpflags |= LRU_STATE_RECLAIMING;
                    LogFullDebug(COMPONENT_CACHE_INODE_LRU,
                                 "Entry count above high water mark. "
                                 "Enabling reclaim.");
               }
          }

          /* Update global state */
          pthread_mutex_lock(&lru_mtx);

          lru_state.last_count = t_count;
          lru_state.flags = tmpflags;

          pthread_mutex_unlock(&lru_mtx);

          /* Reap file descriptors.  This is a preliminary example of
             the L2 functionality rather than something we expect to
             be permanent.  (It will have to adapt heavily to the new
             FSAL API, for example.) */

          if (atomic_fetch_size_t(&open_fd_count)
              < lru_state.fds_lowat) {
               LogDebug(COMPONENT_CACHE_INODE_LRU,
                        "FD count is %zd and low water mark is "
                        "%d: not reaping.",
                        open_fd_count,
                        lru_state.fds_lowat);
               if (cache_inode_gc_policy.use_fd_cache &&
                   !lru_state.caching_fds) {
                    lru_state.caching_fds = TRUE;
                    LogInfo(COMPONENT_CACHE_INODE_LRU,
                            "Re-enabling FD cache.");
               }
          } else {
               /* The count of open file descriptors before this run
                  of the reaper. */
               size_t formeropen = open_fd_count;
               /* Total work done in all passes so far.  If this
                  exceeds the window, stop. */
               size_t totalwork = 0;
               /* The current count (after reaping) of open FDs */
               size_t currentopen = 0;
               /* Work done in the most recent pass of all queues.  If
                  this is less than the work to do in a single queue,
                  don't spin through more passes. */
               size_t workpass = 0;

               LogDebug(COMPONENT_CACHE_INODE_LRU,
                        "Starting to reap.");

               if (extremis) {
                    LogDebug(COMPONENT_CACHE_INODE_LRU,
                             "Open FDs over high water mark, "
                             "reaping aggressively.");
               }

               do {
                    workpass = 0;
                    for (lane = 0; lane < LRU_N_Q_LANES; ++lane) {
                         /* The amount of work done on this lane on
                            this pass. */
                         size_t workdone = 0;
                         /* The current entry being examined. */
                         cache_inode_lru_t *lru = NULL;
                         /* Number of entries closed in this run. */
                         size_t closed = 0;

                         LogDebug(COMPONENT_CACHE_INODE_LRU,
                                  "Reaping up to %d entries from lane %zd",
                                  lru_state.per_lane_work,
                                  lane);

                         pthread_mutex_lock(&LRU_1[lane].lru.mtx);
                         while ((workdone < lru_state.per_lane_work) &&
                                (lru = glist_first_entry(&LRU_1[lane].lru.q,
                                                         cache_inode_lru_t,
                                                         q))) {
                              cache_inode_status_t cache_status
                                   = CACHE_INODE_SUCCESS;
                              cache_entry_t *entry
                                   = container_of(lru, cache_entry_t, lru);

                              /* We currently hold the lane queue
                                 fragment mutex.  Due to lock
                                 ordering, we are forbidden from
                                 acquiring the LRU mutex directly.
                                 Therefore, we increase the reference
                                 count of the entry and drop the
                                 queue fragment mutex. */

                              atomic_inc_int64_t(&lru->refcount);
                              pthread_mutex_unlock(&LRU_1[lane].lru.mtx);

                              /* Acquire the entry mutex.  If the entry
                                 is condemned, removed, pinned, or in
                                 L2, we have no interest in it.  Also
                                 decrement the refcount (since we just
                                 incremented it). */

                              pthread_mutex_lock(&lru->mtx);
                              atomic_dec_int64_t(&lru->refcount);
                              if ((lru->flags & LRU_ENTRY_CONDEMNED) ||
                                  (lru->flags & LRU_ENTRY_PINNED) ||
                                  (lru->flags & LRU_ENTRY_L2) ||
                                  (lru->flags & LRU_ENTRY_KILLED) ||
                                  (lru->lane == LRU_NO_LANE)) {
                                   /* Drop the entry lock, then
                                      reacquire the queue lock so we
                                      can make another trip through
                                      the loop. */
                                   pthread_mutex_unlock(&lru->mtx);
                                   pthread_mutex_lock(&LRU_1[lane].lru.mtx);
                                   /* By definition, if any of these
                                      flags are set, the entry isn't
                                      in this queue fragment any more. */
                                   continue;
                              }

                              if (cache_inode_fd(entry)) {
                                   cache_inode_close(
                                        entry,
                                        CACHE_INODE_FLAG_REALLYCLOSE,
                                        &cache_status);
                                   if (cache_status != CACHE_INODE_SUCCESS) {
                                        LogCrit(COMPONENT_CACHE_INODE_LRU,
                                                "Error closing file in "
                                                "LRU thread.");
                                   } else
                                        ++closed;
                              }
                              /* Move the entry to L2 whatever the
                                 result of examining it. */
                              lru_move_entry(lru, LRU_ENTRY_L2,
                                             lru->lane);
                              pthread_mutex_unlock(&lru->mtx);
                              ++workdone;
                              /* Reacquire the lock on the queue
                                 fragment for the next run through
                                 the loop. */
                              pthread_mutex_lock(&LRU_1[lane].lru.mtx);
                         }
                         pthread_mutex_unlock(&LRU_1[lane].lru.mtx);
                         LogDebug(COMPONENT_CACHE_INODE_LRU,
                                  "Actually processed %zd entries on lane %zd "
                                  "closing %zd descriptors",
                                  workdone,
                                  lane,
                                  closed);
                         workpass += workdone;
                    }
                    totalwork += workpass;
               } while (extremis &&
                        (workpass >= lru_state.per_lane_work) &&
                        (totalwork < lru_state.biggest_window));

               currentopen = open_fd_count;
               if (extremis &&
                   ((currentopen > formeropen) ||
                    (formeropen - currentopen <
                     (((formeropen - lru_state.fds_hiwat) *
                       cache_inode_gc_policy.required_progress) /
                      100)))) {
                    if (++lru_state.futility >
                        cache_inode_gc_policy.futility_count) {
                         LogCrit(COMPONENT_CACHE_INODE_LRU,
                                 "Futility count exceeded. The LRU thread is "
                                 "unable to make progress in reclaiming FDs. "
                                 "Disabling FD cache.");
                         lru_state.caching_fds = FALSE;
                    }
               }
          }

          LogDebug(COMPONENT_CACHE_INODE_LRU,
                   "open_fd_count: %zd t_count:%"PRIu64"\n",
                   open_fd_count, t_count);

          woke = lru_thread_delay_ms(lru_state.threadwait);
     }

     LogEvent(COMPONENT_CACHE_INODE_LRU,
              "Shutting down LRU thread.");

     return NULL;
}

/* Public functions */

void
cache_inode_lru_pkginit(void)
{
     /* The attributes governing the LRU reaper thread. */
     pthread_attr_t attr_thr;
     /* Index for initializing lanes */
     size_t ix = 0;
     /* Return code from system calls */
     int code = 0;
     /* Rlimit for open file descriptors */
     struct rlimit rlim = {
          .rlim_cur = RLIM_INFINITY,
          .rlim_max = RLIM_INFINITY
     };

     open_fd_count = 0;

     /* Repurpose some GC policy */
     lru_state.flags = LRU_STATE_NONE;

     /* Set high and low watermark for cache entries.  This seems a
        bit fishy, so come back and revisit this. */
     lru_state.entries_hiwat
          = cache_inode_gc_policy.entries_hwmark;
     lru_state.entries_lowat
          = cache_inode_gc_policy.entries_lwmark;

     /* Find out the system-imposed file descriptor limit */
     if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
          code = errno;
          LogCrit(COMPONENT_CACHE_INODE_LRU,
                  "Call to getrlimit failed with error %d. "
                  "This should not happen. Assigning default of %d.",
                  code, FD_FALLBACK_LIMIT);
          lru_state.fds_system_imposed = FD_FALLBACK_LIMIT;
     } else {
          if (rlim.rlim_cur < rlim.rlim_max) {
               /* Save the old soft value so we can fall back to it
                  if setrlimit fails. */
               rlim_t old_soft = rlim.rlim_cur;
               LogInfo(COMPONENT_CACHE_INODE_LRU,
                       "Attempting to increase soft limit from %jd "
                       "to hard limit of %jd",
                       rlim.rlim_cur, rlim.rlim_max);
               rlim.rlim_cur = rlim.rlim_max;
               if (setrlimit(RLIMIT_NOFILE, &rlim) < 0) {
                    code = errno;
                    LogWarn(COMPONENT_CACHE_INODE_LRU,
                            "Attempt to raise soft FD limit to hard FD limit "
                            "failed with error %d. Sticking to soft limit.",
                            code);
                    rlim.rlim_cur = old_soft;
               }
          }
          if (rlim.rlim_cur == RLIM_INFINITY) {
               FILE *const nr_open = fopen("/proc/sys/fs/nr_open",
                                           "r");
               if (!(nr_open &&
                     (fscanf(nr_open,
                             "%"SCNu32"\n",
                             &lru_state.fds_system_imposed) == 1) &&
                     (fclose(nr_open) == 0))) {
                    code = errno;
                    LogMajor(COMPONENT_CACHE_INODE_LRU,
                             "The rlimit on open file descriptors is infinite "
                             "and the attempt to find the system maximum "
                             "failed with error %d. "
                             "Assigning the default fallback of %d which is "
                             "almost certainly too small. If you are on a "
                             "Linux system, this should never happen. If "
                             "you are running some other system, please set "
                             "an rlimit on file descriptors (for example, "
                             "with ulimit) for this process and consider "
                             "editing " __FILE__ " to add support for finding "
                             "your system's maximum.", code,
                             FD_FALLBACK_LIMIT);
                    lru_state.fds_system_imposed = FD_FALLBACK_LIMIT;
               }
          } else {
               lru_state.fds_system_imposed = rlim.rlim_cur;
          }
          LogInfo(COMPONENT_CACHE_INODE_LRU,
                  "Setting the system-imposed limit on FDs to %d.",
                  lru_state.fds_system_imposed);
     }


     lru_state.fds_hard_limit = (cache_inode_gc_policy.fd_limit_percent *
                                 lru_state.fds_system_imposed) / 100;
     lru_state.fds_hiwat = (cache_inode_gc_policy.fd_hwmark_percent *
                            lru_state.fds_system_imposed) / 100;
     lru_state.fds_lowat = (cache_inode_gc_policy.fd_lwmark_percent *
                            lru_state.fds_system_imposed) / 100;
     lru_state.futility = 0;

     lru_state.per_lane_work
          = (cache_inode_gc_policy.reaper_work / LRU_N_Q_LANES);
     lru_state.biggest_window = (cache_inode_gc_policy.biggest_window *
                                 lru_state.fds_system_imposed) / 100;

     lru_state.last_count = 0;

     lru_state.threadwait
          = 1000 * cache_inode_gc_policy.lru_run_interval;

     lru_state.caching_fds = cache_inode_gc_policy.use_fd_cache;

     pthread_mutex_init(&lru_mtx, NULL);
     pthread_cond_init(&lru_cv, NULL);

     for (ix = 0; ix < LRU_N_Q_LANES; ++ix) {
          /* L1, unpinned */
          lru_init_queue(&LRU_1[ix].lru);
          /* L1, pinned */
          lru_init_queue(&LRU_1[ix].lru_pinned);
          /* L2, unpinned */
          lru_init_queue(&LRU_2[ix].lru);
          /* L2, pinned */
          lru_init_queue(&LRU_2[ix].lru_pinned);
     }

     if (pthread_attr_init(&attr_thr) != 0) {
          LogCrit(COMPONENT_CACHE_INODE_LRU,
                  "can't init pthread's attributes");
     }

     if (pthread_attr_setscope(&attr_thr, PTHREAD_SCOPE_SYSTEM)
         != 0) {
          LogCrit(COMPONENT_CACHE_INODE_LRU, "can't set pthread's scope");
     }

     if (pthread_attr_setdetachstate(&attr_thr, PTHREAD_CREATE_JOINABLE)
         != 0) {
          LogCrit(COMPONENT_CACHE_INODE_LRU, "can't set pthread's join state");
     }

     if (pthread_attr_setstacksize(&attr_thr, THREAD_STACK_SIZE)
         != 0) {
          LogCrit(COMPONENT_CACHE_INODE_LRU,
                  "can't set pthread's stack size");
size"); 01014 } 01015 01016 /* spawn LRU background thread */ 01017 code = pthread_create(&lru_thread_state.thread_id, &attr_thr, lru_thread, 01018 NULL); 01019 if (code != 0) { 01020 code = errno; 01021 LogFatal(COMPONENT_CACHE_INODE_LRU, 01022 "Unable to start lru reaper thread, error code %d.", 01023 code); 01024 } 01025 } 01026 01031 void 01032 cache_inode_lru_pkgshutdown(void) 01033 { 01034 /* Post and wait for shutdown of LRU background thread */ 01035 pthread_mutex_lock(&lru_mtx); 01036 lru_thread_state.flags |= LRU_SHUTDOWN; 01037 lru_wake_thread(LRU_FLAG_NONE); 01038 pthread_mutex_unlock(&lru_mtx); 01039 } 01040 01055 cache_entry_t * 01056 cache_inode_lru_get(cache_inode_status_t *status, 01057 uint32_t flags) 01058 { 01059 /* The lane from which we harvest (or into which we store) the 01060 new entry. Usually the lane assigned to this thread. */ 01061 uint32_t lane = 0; 01062 /* The LRU entry */ 01063 cache_inode_lru_t *lru = NULL; 01064 /* The Cache entry being created */ 01065 cache_entry_t *entry = NULL; 01066 01067 /* If we are in reclaim state, try to find an entry to recycle. */ 01068 pthread_mutex_lock(&lru_mtx); 01069 if (lru_state.flags & LRU_STATE_RECLAIMING) { 01070 pthread_mutex_unlock(&lru_mtx); 01071 01072 /* Search through logical L2 entry. */ 01073 for (lane = 0; lane < LRU_N_Q_LANES; ++lane) { 01074 lru = lru_try_reap_entry(&LRU_2[lane].lru); 01075 if (lru) 01076 break; 01077 } 01078 01079 /* Search through logical L1 if nothing was found in L2 01080 (fall through, otherwise.) */ 01081 if (!lru) { 01082 for (lane = 0; lane < LRU_N_Q_LANES; ++lane) { 01083 lru = lru_try_reap_entry(&LRU_1[lane].lru); 01084 if (lru) 01085 break; 01086 } 01087 } 01088 01089 /* If we found an entry, we hold a lock on it and it is 01090 ready to be recycled. */ 01091 if (lru) { 01092 entry = container_of(lru, cache_entry_t, lru); 01093 if (entry) { 01094 LogFullDebug(COMPONENT_CACHE_INODE_LRU, 01095 "Recycling entry at %p.", 01096 entry); 01097 } 01098 cache_inode_lru_clean(entry); 01099 } 01100 } else { 01101 pthread_mutex_unlock(&lru_mtx); 01102 } 01103 01104 if (!lru) { 01105 entry = pool_alloc(cache_inode_entry_pool, NULL); 01106 if(entry == NULL) { 01107 LogCrit(COMPONENT_CACHE_INODE_LRU, 01108 "can't allocate a new entry from cache pool"); 01109 *status = CACHE_INODE_MALLOC_ERROR; 01110 goto out; 01111 } 01112 if (pthread_mutex_init(&entry->lru.mtx, NULL) != 0) { 01113 pool_free(cache_inode_entry_pool, entry); 01114 LogCrit(COMPONENT_CACHE_INODE_LRU, 01115 "pthread_mutex_init of lru.mtx returned %d (%s)", 01116 errno, 01117 strerror(errno)); 01118 entry = NULL; 01119 *status = CACHE_INODE_INIT_ENTRY_FAILED; 01120 goto out; 01121 } 01122 } 01123 01124 assert(entry); 01125 /* Set the sentinel refcount. Since the entry isn't in a queue, 01126 nobody can bump the refcount yet. 
     entry->lru.refcount = 2;
     entry->lru.pin_refcnt = 0;
     entry->lru.flags = 0;
     pthread_mutex_lock(&entry->lru.mtx);
     lru_insert_entry(&entry->lru, 0,
                      lru_lane_of_entry(entry));
     pthread_mutex_unlock(&entry->lru.mtx);

     *status = CACHE_INODE_SUCCESS;

out:
     return (entry);
}

cache_inode_status_t
cache_inode_inc_pin_ref(cache_entry_t *entry)
{
     cache_inode_status_t rc = CACHE_INODE_SUCCESS;

     pthread_mutex_lock(&entry->lru.mtx);

     if (entry->lru.flags & LRU_ENTRY_UNPINNABLE) {
          pthread_mutex_unlock(&entry->lru.mtx);
          return CACHE_INODE_DEAD_ENTRY;
     }

     if (!entry->lru.pin_refcnt && !(entry->lru.flags & LRU_ENTRY_PINNED)) {
          lru_move_entry(&entry->lru, LRU_ENTRY_PINNED,
                         entry->lru.lane);
     }
     entry->lru.pin_refcnt++;

     /* Also take an LRU reference */
     atomic_inc_int64_t(&entry->lru.refcount);

     pthread_mutex_unlock(&entry->lru.mtx);

     return rc;
}

void
cache_inode_unpinnable(cache_entry_t *entry)
{
     pthread_mutex_lock(&entry->lru.mtx);
     entry->lru.flags |= LRU_ENTRY_UNPINNABLE;
     pthread_mutex_unlock(&entry->lru.mtx);
}

cache_inode_status_t
cache_inode_dec_pin_ref(cache_entry_t *entry)
{
     pthread_mutex_lock(&entry->lru.mtx);
     assert(entry->lru.pin_refcnt);
     /* Make sure at least one other LRU reference is held; the
      * caller should separately hold an LRU reference.
      */
     assert(entry->lru.refcount > 1);
     entry->lru.pin_refcnt--;
     if (!entry->lru.pin_refcnt && (entry->lru.flags & LRU_ENTRY_PINNED)) {
          lru_move_entry(&entry->lru, 0, entry->lru.lane);
     }

     /* Also release an LRU reference */
     atomic_dec_int64_t(&entry->lru.refcount);

     pthread_mutex_unlock(&entry->lru.mtx);

     return CACHE_INODE_SUCCESS;
}

cache_inode_status_t
cache_inode_lru_ref(cache_entry_t *entry,
                    uint32_t flags)
{
     pthread_mutex_lock(&entry->lru.mtx);

     /* Refuse to grant a reference if we're below the sentinel value
        or the entry is being removed or recycled. */
     if ((entry->lru.refcount == 0) ||
         (entry->lru.flags & LRU_ENTRY_CONDEMNED)) {
          pthread_mutex_unlock(&entry->lru.mtx);
          return CACHE_INODE_DEAD_ENTRY;
     }

     /* These shouldn't ever be set */
     flags &= ~(LRU_ENTRY_PINNED | LRU_ENTRY_L2);

     /* Initial and Scan are mutually exclusive. */

     assert(!((flags & LRU_REQ_INITIAL) &&
              (flags & LRU_REQ_SCAN)));

     atomic_inc_int64_t(&entry->lru.refcount);

     /* Move an entry forward if this is an initial reference. */

     if (flags & LRU_REQ_INITIAL) {
          lru_move_entry(&entry->lru,
                         /* Pinned stays pinned */
                         flags | (entry->lru.flags &
                                  LRU_ENTRY_PINNED),
                         entry->lru.lane);
     } else if ((flags & LRU_REQ_SCAN) &&
                (entry->lru.flags & LRU_ENTRY_L2)) {
          lru_move_entry(&entry->lru,
                         /* Pinned stays pinned, L2 stays in L2.  A
                            reference obtained for SCAN must not be
                            used to open an FD. */
                         flags | (entry->lru.flags &
                                  LRU_ENTRY_PINNED) |
                         LRU_ENTRY_L2,
                         entry->lru.lane);
     }

     pthread_mutex_unlock(&entry->lru.mtx);

     return CACHE_INODE_SUCCESS;
}

void cache_inode_lru_kill(cache_entry_t *entry)
{
     pthread_mutex_lock(&entry->lru.mtx);
     if (entry->lru.flags & LRU_ENTRY_KILLED) {
          pthread_mutex_unlock(&entry->lru.mtx);
     } else {
          entry->lru.flags |= LRU_ENTRY_KILLED;
          /* cache_inode_lru_unref always either unlocks or destroys
             the entry. */
          cache_inode_lru_unref(entry, LRU_FLAG_LOCKED);
     }
}

void
cache_inode_lru_unref(cache_entry_t *entry,
                      uint32_t flags)
{
     if (!(flags & LRU_FLAG_LOCKED)) {
          pthread_mutex_lock(&entry->lru.mtx);
     }

     assert(entry->lru.refcount >= 1);

     if (entry->lru.refcount == 1) {
          struct lru_q_base *q
               = lru_select_queue(entry->lru.flags,
                                  entry->lru.lane);
          pthread_mutex_lock(&q->mtx);
          atomic_dec_int64_t(&entry->lru.refcount);
          if (entry->lru.refcount == 0) {
               /* Refcount has fallen to zero.  Remove the entry from
                  the queue and mark it as dead. */
               entry->lru.flags = LRU_ENTRY_CONDEMNED;
               glist_del(&entry->lru.q);
               --(q->size);
               entry->lru.lane = LRU_NO_LANE;
               /* Give other threads a chance to see that */
               pthread_mutex_unlock(&entry->lru.mtx);
               pthread_mutex_unlock(&q->mtx);
               pthread_yield();
               /* We should not need to hold the LRU mutex at this
                  point.  The hash table locks will ensure that by
                  the time this function completes successfully,
                  other threads will either have received
                  CACHE_INODE_DEAD_ENTRY in the attempt to gain a
                  reference, or we will have removed the hash table
                  entry. */
               cache_inode_lru_clean(entry);

               pthread_mutex_destroy(&entry->lru.mtx);
               pool_free(cache_inode_entry_pool, entry);
               return;
          } else {
               pthread_mutex_unlock(&q->mtx);
          }
     } else {
          /* We may decrement the reference count without the queue
             lock, since it cannot go to 0. */
          atomic_dec_int64_t(&entry->lru.refcount);
     }

     pthread_mutex_unlock(&entry->lru.mtx);
}

void lru_wake_thread(uint32_t flags)
{
     if (lru_thread_state.flags & LRU_SLEEPING)
          pthread_cond_signal(&lru_cv);
}
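
The public functions above define the reference-counting discipline that callers elsewhere in the cache are expected to follow: cache_inode_lru_get() hands back an entry holding the sentinel reference plus one reference for the caller, cache_inode_lru_ref() must succeed before an entry found later (for example, through the hash table) may be used, and every reference is eventually released with cache_inode_lru_unref(). The fragment below is a minimal sketch of that lifecycle, not part of the listing above; lru_usage_sketch() is a hypothetical caller, and the LRU_FLAG_NONE and LRU_REQ_INITIAL flag choices are shown purely for illustration.

/* Illustrative sketch only (not part of the listing above). */
#include "cache_inode.h"
#include "cache_inode_lru.h"

static void
lru_usage_sketch(void)
{
     cache_inode_status_t status = CACHE_INODE_SUCCESS;
     cache_entry_t *entry = NULL;

     /* Allocate (or, in reclaim state, recycle) an entry.  It comes
        back with the sentinel reference plus one for this caller. */
     entry = cache_inode_lru_get(&status, LRU_FLAG_NONE);
     if (entry == NULL)
          return;

     /* A thread that later finds the entry takes its own reference;
        LRU_REQ_INITIAL also moves the entry toward the MRU end of L1. */
     if (cache_inode_lru_ref(entry, LRU_REQ_INITIAL) ==
         CACHE_INODE_DEAD_ENTRY) {
          /* The entry is being condemned or recycled; back off. */
          return;
     }

     /* ... use the entry ... */

     /* Release the reference taken by cache_inode_lru_ref(). */
     cache_inode_lru_unref(entry, LRU_FLAG_NONE);

     /* Release the reference handed out by cache_inode_lru_get().
        Once only the sentinel remains, the entry becomes eligible
        for reaping by lru_thread() or recycling by a future get. */
     cache_inode_lru_unref(entry, LRU_FLAG_NONE);
}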