nfs-ganesha 1.4

cache_inode_lru.c

Go to the documentation of this file.
00001 /*
00002  * vim:expandtab:shiftwidth=8:tabstop=8:
00003  *
00004  * Copyright (C) 2010, The Linux Box Corporation
00005  * Contributor : Matt Benjamin <matt@linuxbox.com>
00006  *
00007  * Some portions Copyright CEA/DAM/DIF  (2008)
00008  * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
00009  *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
00010  *
00011  *
00012  * This program is free software; you can redistribute it and/or
00013  * modify it under the terms of the GNU Lesser General Public
00014  * License as published by the Free Software Foundation; either
00015  * version 3 of the License, or (at your option) any later version.
00016  *
00017  * This program is distributed in the hope that it will be useful,
00018  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00019  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00020  * Lesser General Public License for more details.
00021  *
00022  * You should have received a copy of the GNU Lesser General Public
00023  * License along with this library; if not, write to the Free Software
00024  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00025  * 02110-1301 USA
00026  *
00027  * -------------
00028  */
00029 
00030 #ifdef HAVE_CONFIG_H
00031 #include "config.h"
00032 #endif
00033 
00034 #ifdef _SOLARIS
00035 #include "solaris_port.h"
00036 #endif                          /* _SOLARIS */
00037 
00038 #include "abstract_atomic.h"
00039 #include <unistd.h>
00040 #include <sys/types.h>
00041 #include <sys/param.h>
00042 #include <time.h>
00043 #include <pthread.h>
00044 #include <assert.h>
00045 #include <sys/time.h>
00046 #include <sys/resource.h>
00047 #include <stdio.h>
00048 #include "nlm_list.h"
00049 #include "fsal.h"
00050 #include "nfs_core.h"
00051 #include "log.h"
00052 #include "cache_inode.h"
00053 #include "cache_inode_lru.h"
00054 
00121 /* Forward Declaration */
00122 
00123 static void *lru_thread(void *arg);
00124 struct lru_state lru_state;
00125 
00130 struct lru_q_base
00131 {
00132      struct glist_head q; /* LRU is at HEAD, MRU at tail */
00133      pthread_mutex_t mtx;
00134      uint64_t size;
00135 };
00136 
/* Cache-line padding macros from MCAS */

#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */

/* Declares a char array member of CACHE_LINE_SIZE bytes named
   __pad<_n>, used to pad a structure by one cache line. */
#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]

/*
 * Allocate (_s) bytes plus two spare cache lines with malloc() and
 * round the address up to the next CACHE_LINE_SIZE boundary.
 *
 * The pointer/integer round trip goes through uintptr_t (rather than
 * unsigned long) so the address is never truncated on platforms where
 * long is narrower than a pointer, and the mask is built in the same
 * width.
 *
 * The result may differ from the pointer malloc() returned, so it must
 * not be handed to free().  NOTE(review): when malloc() fails the
 * expression evaluates to a null pointer on platforms where NULL
 * converts to integer 0; callers should check for that.
 */
#define ALIGNED_ALLOC(_s)                                               \
     ((void *)(((uintptr_t)malloc((_s) + CACHE_LINE_SIZE * 2) +         \
                (uintptr_t)(CACHE_LINE_SIZE - 1)) &                     \
               ~(uintptr_t)(CACHE_LINE_SIZE - 1)))
00144 
00149 struct lru_q_
00150 {
00151      struct lru_q_base lru;
00152      struct lru_q_base lru_pinned; /* uncollectable, due to state */
00153      CACHE_PAD(0);
00154 };
00155 
00165 static struct lru_q_ LRU_1[LRU_N_Q_LANES];
00166 static struct lru_q_ LRU_2[LRU_N_Q_LANES];
00167 
00178 size_t open_fd_count = 0;
00179 
00200 static pthread_mutex_t lru_mtx;
00201 static pthread_cond_t lru_cv;
00202 
00203 static const uint32_t FD_FALLBACK_LIMIT = 0x400;
00204 
/*
 * Book-keeping for the single reaper thread: its pthread id and a flag
 * word carrying the LRU_SLEEPING and LRU_SHUTDOWN bits.
 */
static struct lru_thread_state
{
     pthread_t thread_id; /* filled in by pthread_create at init */
     uint32_t flags;      /* LRU_SLEEPING | LRU_SHUTDOWN bits */
} lru_thread_state;
00214 
00221 static inline void
00222 lru_init_queue(struct lru_q_base *q)
00223 {
00224      init_glist(&q->q);
00225      pthread_mutex_init(&q->mtx, NULL);
00226      q->size = 0;
00227 }
00228 
00243 static inline struct lru_q_base *
00244 lru_select_queue(uint32_t flags, uint32_t lane)
00245 {
00246      assert(lane < LRU_N_Q_LANES);
00247      if (flags & LRU_ENTRY_PINNED) {
00248           if (flags & LRU_ENTRY_L2) {
00249                return &LRU_2[lane].lru_pinned;
00250           } else {
00251                return &LRU_1[lane].lru_pinned;
00252           }
00253      } else {
00254           if (flags & LRU_ENTRY_L2) {
00255                return &LRU_2[lane].lru;
00256           } else {
00257                return &LRU_1[lane].lru;
00258           }
00259      }
00260 }
00261 
00273 static inline uint32_t
00274 lru_lane_of_entry(cache_entry_t *entry)
00275 {
00276     return (uint32_t) (((uintptr_t) entry) % LRU_N_Q_LANES);
00277 }
00278 
00296 static inline void
00297 lru_insert_entry(cache_inode_lru_t *lru, uint32_t flags, uint32_t lane)
00298 {
00299      /* Destination LRU */
00300      struct lru_q_base *d = NULL;
00301 
00302      d = lru_select_queue(flags, lane);
00303      pthread_mutex_lock(&d->mtx);
00304      glist_add(&d->q, &lru->q);
00305      ++(d->size);
00306      pthread_mutex_unlock(&d->mtx);
00307 
00308      /* Set the flags on the entry to exactly the set of LRU_ENTRY_L2
00309         and LRU_ENTRY_PINNED supplied in the flags argument. */
00310 
00311      lru->flags &= ~(LRU_ENTRY_L2 | LRU_ENTRY_PINNED);
00312      lru->flags |= (flags & (LRU_ENTRY_L2 | LRU_ENTRY_PINNED));
00313      lru->lane = lane;
00314 }
00315 
00329 static inline void
00330 lru_remove_entry(cache_inode_lru_t *lru)
00331 {
00332      if (lru->lane == LRU_NO_LANE) {
00333           return;
00334      }
00335 
00336      /* Source LRU */
00337      struct lru_q_base *s = NULL;
00338      s = lru_select_queue(lru->flags, lru->lane);
00339      pthread_mutex_lock(&s->mtx);
00340      glist_del(&lru->q);
00341      --(s->size);
00342      pthread_mutex_unlock(&s->mtx);
00343      lru->flags &= ~(LRU_ENTRY_L2 | LRU_ENTRY_PINNED);
00344      /* Anyone interested in this entry should back off immediately. */
00345      lru->lane = LRU_NO_LANE;
00346 }
00347 
00360 static inline void
00361 lru_move_entry(cache_inode_lru_t *lru,
00362                uint32_t flags,
00363                uint32_t lane)
00364 {
00365      /* Source LRU */
00366      struct lru_q_base *s = NULL;
00367      /* Destination LRU */
00368      struct lru_q_base *d = NULL;
00369 
00370      if ((lru->lane == LRU_NO_LANE) &&
00371          (lane == LRU_NO_LANE)) {
00372           /* From nothing, to nothing. */
00373           return;
00374      } else if (lru->lane == LRU_NO_LANE) {
00375           lru_insert_entry(lru, flags, lane);
00376           return;
00377      } else if (lane == LRU_NO_LANE) {
00378           lru_remove_entry(lru);
00379           return;
00380      }
00381 
00382      s = lru_select_queue(lru->flags, lru->lane);
00383      d = lru_select_queue(flags, lane);
00384 
00385      if (s == d) {
00386           pthread_mutex_lock(&s->mtx);
00387      } else if (s < d) {
00388           pthread_mutex_lock(&s->mtx);
00389           pthread_mutex_lock(&d->mtx);
00390      } else if (s > d) {
00391           pthread_mutex_lock(&d->mtx);
00392           pthread_mutex_lock(&s->mtx);
00393      }
00394 
00395      glist_del(&lru->q);
00396      --(s->size);
00397 
00398      /* When moving from L2 to L1, add to the LRU, otherwise add to
00399         the MRU.  (In general we don't want to promote things except
00400         on initial reference, but promoting things on move makes more
00401         sense than demoting them.) */
00402      if ((lru->flags & LRU_ENTRY_L2) &&
00403          !(flags & LRU_ENTRY_L2)) {
00404           glist_add_tail(&d->q, &lru->q);
00405      } else {
00406           glist_add(&d->q, &lru->q);
00407      }
00408      ++(d->size);
00409 
00410      pthread_mutex_unlock(&s->mtx);
00411      if (s != d) {
00412           pthread_mutex_unlock(&d->mtx);
00413      }
00414 
00415      lru->flags &= ~(LRU_ENTRY_L2 | LRU_ENTRY_PINNED);
00416      lru->flags |= (flags & (LRU_ENTRY_L2 | LRU_ENTRY_PINNED));
00417 }
00418 
00427 static inline void
00428 cache_inode_lru_clean(cache_entry_t *entry)
00429 {
00430      fsal_status_t fsal_status = {0, 0};
00431      cache_inode_status_t cache_status = CACHE_INODE_SUCCESS;
00432 
00433      /* Clean an LRU entry re-use.  */
00434      assert((entry->lru.refcount == LRU_SENTINEL_REFCOUNT) ||
00435             (entry->lru.refcount == (LRU_SENTINEL_REFCOUNT - 1)));
00436 
00437      if (cache_inode_fd(entry)) {
00438           cache_inode_close(entry, CACHE_INODE_FLAG_REALLYCLOSE,
00439                             &cache_status);
00440           if (cache_status != CACHE_INODE_SUCCESS) {
00441                LogCrit(COMPONENT_CACHE_INODE_LRU,
00442                        "Error closing file in cleanup: %d.",
00443                        cache_status);
00444           }
00445      }
00446 
00447      /* Clean up the associated ressources in the FSAL */
00448      if (FSAL_IS_ERROR(fsal_status
00449                        = FSAL_CleanObjectResources(&entry->handle))) {
00450           LogCrit(COMPONENT_CACHE_INODE,
00451                   "cache_inode_lru_clean: Couldn't free FSAL ressources "
00452                   "fsal_status.major=%u", fsal_status.major);
00453      }
00454 
00455      cache_inode_clean_internal(entry);
00456      entry->lru.refcount = 0;
00457      cache_inode_clean_entry(entry);
00458 }
00459 
00475 static inline cache_inode_lru_t *
00476 lru_try_reap_entry(struct lru_q_base *q)
00477 {
00478      cache_inode_lru_t *lru = NULL;
00479 
00480      pthread_mutex_lock(&q->mtx);
00481      lru = glist_first_entry(&q->q, cache_inode_lru_t, q);
00482      if (!lru) {
00483           pthread_mutex_unlock(&q->mtx);
00484           return NULL;
00485      }
00486 
00487      atomic_inc_int64_t(&lru->refcount);
00488      pthread_mutex_unlock(&q->mtx);
00489      pthread_mutex_lock(&lru->mtx);
00490      if ((lru->flags & LRU_ENTRY_CONDEMNED) ||
00491          (lru->flags & LRU_ENTRY_KILLED)) {
00492           atomic_dec_int64_t(&lru->refcount);
00493           pthread_mutex_unlock(&lru->mtx);
00494           return NULL;
00495      }
00496      if ((lru->refcount > (LRU_SENTINEL_REFCOUNT + 1)) ||
00497          (lru->flags & LRU_ENTRY_PINNED)) {
00498           /* Any more than the sentinel and our reference count
00499              and someone else has a reference.  Plus someone may
00500              have moved it to the pin queue while we were waiting. */
00501           atomic_dec_int64_t(&lru->refcount);
00502           pthread_mutex_unlock(&lru->mtx);
00503           return NULL;
00504      }
00505      /* At this point, we have legitimate access to the entry,
00506         and we go through the disposal/recycling discipline. */
00507 
00508      /* Make sure the entry is still where we think it is. */
00509      q = lru_select_queue(lru->flags, lru->lane);
00510      pthread_mutex_lock(&q->mtx);
00511      if (lru->refcount > LRU_SENTINEL_REFCOUNT + 1) {
00512           /* Someone took a reference while we were waiting for the
00513              queue.  */
00514           atomic_dec_int64_t(&lru->refcount);
00515           pthread_mutex_unlock(&lru->mtx);
00516           pthread_mutex_unlock(&q->mtx);
00517           return NULL;
00518      }
00519      /* Drop the refcount to 0, set the flag to tell other threads to
00520         stop access immediately. */
00521      lru->refcount = 0;
00522      lru->flags = LRU_ENTRY_CONDEMNED;
00523      glist_del(&lru->q);
00524      --(q->size);
00525      lru->lane = LRU_NO_LANE;
00526      /* Drop all locks and give other threads a chance to abandon the
00527         entry. */
00528      pthread_mutex_unlock(&lru->mtx);
00529      pthread_mutex_unlock(&q->mtx);
00530      pthread_yield();
00531 
00532      return lru;
00533 }
00534 
/* Nanoseconds per second. */
static const uint32_t S_NSECS = 1000000000U;
/* Nanoseconds per millisecond. */
static const uint32_t MS_NSECS = 1000000U;
00537 
00550 static bool_t
00551 lru_thread_delay_ms(unsigned long ms)
00552 {
00553      time_t now = time(NULL);
00554      uint64_t nsecs = (S_NSECS * now) + (MS_NSECS * ms);
00555      struct timespec then = {
00556           .tv_sec = nsecs / S_NSECS,
00557           .tv_nsec = nsecs % S_NSECS
00558      };
00559      bool_t woke = FALSE;
00560 
00561      pthread_mutex_lock(&lru_mtx);
00562      lru_thread_state.flags |= LRU_SLEEPING;
00563      woke = (pthread_cond_timedwait(&lru_cv, &lru_mtx, &then) != ETIMEDOUT);
00564      lru_thread_state.flags &= ~LRU_SLEEPING;
00565      pthread_mutex_unlock(&lru_mtx);
00566      return woke;
00567 }
00568 
00618 static void *
00619 lru_thread(void *arg __attribute__((unused)))
00620 {
00621      /* Index */
00622      size_t lane = 0;
00623      /* Temporary holder for flags */
00624      uint32_t tmpflags = lru_state.flags;
00625      /* True if we are taking extreme measures to reclaim FDs. */
00626      bool_t extremis = FALSE;
00627      /* True if we were explicitly woke. */
00628      bool_t woke = FALSE;
00629 
00630      SetNameFunction("lru_thread");
00631 
00632      while (1) {
00633           if (lru_thread_state.flags & LRU_SHUTDOWN)
00634                break;
00635 
00636           extremis = (open_fd_count > lru_state.fds_hiwat);
00637           LogFullDebug(COMPONENT_CACHE_INODE_LRU,
00638                        "Reaper awakes.");
00639 
00640           if (!woke) {
00641                /* If we make it all the way through a timed sleep
00642                   without being woken, we assume we aren't racing
00643                   against the impossible. */
00644                lru_state.futility = 0;
00645           }
00646 
00647           uint64_t t_count = 0;
00648 
00649           /* First, sum the queue counts.  This lets us know where we
00650              are relative to our watermarks. */
00651 
00652           for (lane = 0; lane < LRU_N_Q_LANES; ++lane) {
00653                pthread_mutex_lock(&LRU_1[lane].lru.mtx);
00654                t_count += LRU_1[lane].lru.size;
00655                pthread_mutex_unlock(&LRU_1[lane].lru.mtx);
00656 
00657                pthread_mutex_lock(&LRU_1[lane].lru_pinned.mtx);
00658                t_count += LRU_1[lane].lru_pinned.size;
00659                pthread_mutex_unlock(&LRU_1[lane].lru_pinned.mtx);
00660 
00661                pthread_mutex_lock(&LRU_2[lane].lru.mtx);
00662                t_count += LRU_2[lane].lru.size;
00663                pthread_mutex_unlock(&LRU_2[lane].lru.mtx);
00664 
00665                pthread_mutex_lock(&LRU_2[lane].lru_pinned.mtx);
00666                t_count += LRU_2[lane].lru_pinned.size;
00667                pthread_mutex_unlock(&LRU_2[lane].lru_pinned.mtx);
00668           }
00669 
00670           LogFullDebug(COMPONENT_CACHE_INODE_LRU,
00671                        "%zu entries in cache.",
00672                        t_count);
00673 
00674           if (tmpflags & LRU_STATE_RECLAIMING) {
00675               if (t_count < lru_state.entries_lowat) {
00676                   tmpflags &= ~LRU_STATE_RECLAIMING;
00677                   LogFullDebug(COMPONENT_CACHE_INODE_LRU,
00678                                "Entry count below low water mark.  "
00679                                "Disabling reclaim.");
00680                }
00681           } else {
00682               if (t_count > lru_state.entries_hiwat) {
00683                   tmpflags |= LRU_STATE_RECLAIMING;
00684                   LogFullDebug(COMPONENT_CACHE_INODE_LRU,
00685                                "Entry count above high water mark.  "
00686                                "Enabling reclaim.");
00687                }
00688           }
00689 
00690           /* Update global state */
00691           pthread_mutex_lock(&lru_mtx);
00692 
00693           lru_state.last_count = t_count;
00694           lru_state.flags = tmpflags;
00695 
00696           pthread_mutex_unlock(&lru_mtx);
00697 
00698           /* Reap file descriptors.  This is a preliminary example of
00699              the L2 functionality rather than something we expect to
00700              be permanent.  (It will have to adapt heavily to the new
00701              FSAL API, for example.) */
00702 
00703           if (atomic_fetch_size_t(&open_fd_count)
00704               < lru_state.fds_lowat) {
00705                LogDebug(COMPONENT_CACHE_INODE_LRU,
00706                         "FD count is %zd and low water mark is "
00707                         "%d: not reaping.",
00708                         open_fd_count,
00709                         lru_state.fds_lowat);
00710                if (cache_inode_gc_policy.use_fd_cache &&
00711                    !lru_state.caching_fds) {
00712                     lru_state.caching_fds = TRUE;
00713                     LogInfo(COMPONENT_CACHE_INODE_LRU,
00714                             "Re-enabling FD cache.");
00715                }
00716           } else {
00717                /* The count of open file descriptors before this run
00718                   of the reaper. */
00719                size_t formeropen = open_fd_count;
00720                /* Total work done in all passes so far.  If this
00721                   exceeds the window, stop. */
00722                size_t totalwork = 0;
00723                /* The current count (after reaping) of open FDs */
00724                size_t currentopen = 0;
00725                /* Work done in the most recent pass of all queues.  if
00726                   value is less than the work to do in a single queue,
00727                   don't spin through more passes. */
00728                size_t workpass = 0;
00729 
00730                LogDebug(COMPONENT_CACHE_INODE_LRU,
00731                         "Starting to reap.");
00732 
00733                if (extremis) {
00734                     LogDebug(COMPONENT_CACHE_INODE_LRU,
00735                                  "Open FDs over high water mark, "
00736                                  "reapring aggressively.");
00737                }
00738 
00739                do {
00740                     workpass = 0;
00741                     for (lane = 0; lane < LRU_N_Q_LANES; ++lane) {
00742                          /* The amount of work done on this lane on
00743                             this pass. */
00744                          size_t workdone = 0;
00745                          /* The current entry being examined. */
00746                          cache_inode_lru_t *lru = NULL;
00747                          /* Number of entries closed in this run. */
00748                          size_t closed = 0;
00749 
00750                          LogDebug(COMPONENT_CACHE_INODE_LRU,
00751                                   "Reaping up to %d entries from lane %zd",
00752                                   lru_state.per_lane_work,
00753                                   lane);
00754 
00755                          pthread_mutex_lock(&LRU_1[lane].lru.mtx);
00756                          while ((workdone < lru_state.per_lane_work) &&
00757                                 (lru = glist_first_entry(&LRU_1[lane].lru.q,
00758                                                          cache_inode_lru_t,
00759                                                          q))) {
00760                               cache_inode_status_t cache_status
00761                                    = CACHE_INODE_SUCCESS;
00762                               cache_entry_t *entry
00763                                    = container_of(lru, cache_entry_t, lru);
00764 
00765                               /* We currently hold the lane queue
00766                                  fragment mutex.  Due to lock
00767                                  ordering, we are forbidden from
00768                                  acquiring the LRU mutex directly.
00769                                  therefore, we increase the reference
00770                                  count of the entry and drop the
00771                                  queue fragment mutex. */
00772 
00773                               atomic_inc_int64_t(&lru->refcount);
00774                               pthread_mutex_unlock(&LRU_1[lane].lru.mtx);
00775 
00776                               /* Acquire the entry mutex.  If the entry
00777                                  is condemned, removed, pinned, or in
00778                                  L2, we have no interest in it. Also
00779                                  decrement the refcount (since we just
00780                                  incremented it.) */
00781 
00782                               pthread_mutex_lock(&lru->mtx);
00783                               atomic_dec_int64_t(&lru->refcount);
00784                               if ((lru->flags & LRU_ENTRY_CONDEMNED) ||
00785                                   (lru->flags & LRU_ENTRY_PINNED) ||
00786                                   (lru->flags & LRU_ENTRY_L2) ||
00787                                   (lru->flags & LRU_ENTRY_KILLED) ||
00788                                   (lru->lane == LRU_NO_LANE)) {
00789                                    /* Drop the entry lock, thenr
00790                                       eacquire the queue lock so we
00791                                       can make another trip through
00792                                       the loop. */
00793                                    pthread_mutex_unlock(&lru->mtx);
00794                                    pthread_mutex_lock(&LRU_1[lane].lru.mtx);
00795                                    /* By definition, if any of these
00796                                       flags are set, the entry isn't
00797                                       in this queue fragment any more. */
00798                                    continue;
00799                               }
00800 
00801                               if (cache_inode_fd(entry)) {
00802                                    cache_inode_close(
00803                                         entry,
00804                                         CACHE_INODE_FLAG_REALLYCLOSE,
00805                                         &cache_status);
00806                                    if (cache_status != CACHE_INODE_SUCCESS) {
00807                                         LogCrit(COMPONENT_CACHE_INODE_LRU,
00808                                                 "Error closing file in "
00809                                                 "LRU thread.");
00810                                    } else
00811                                      ++closed;
00812                               }
00813                               /* Move the entry to L2 whatever the
00814                                  result of examining it.*/
00815                               lru_move_entry(lru, LRU_ENTRY_L2,
00816                                              lru->lane);
00817                               pthread_mutex_unlock(&lru->mtx);
00818                               ++workdone;
00819                               /* Reacquire the lock on the queue
00820                                  fragment for the next run through
00821                                  the loop. */
00822                               pthread_mutex_lock(&LRU_1[lane].lru.mtx);
00823                          }
00824                          pthread_mutex_unlock(&LRU_1[lane].lru.mtx);
00825                          LogDebug(COMPONENT_CACHE_INODE_LRU,
00826                                   "Actually processed %zd entries on lane %zd "
00827                                   "closing %zd descriptors",
00828                                   workdone,
00829                                   lane,
00830                                   closed);
00831                          workpass += workdone;
00832                     }
00833                     totalwork += workpass;
00834                } while (extremis &&
00835                         (workpass >= lru_state.per_lane_work) &&
00836                         (totalwork < lru_state.biggest_window));
00837 
00838                currentopen = open_fd_count;
00839                if (extremis &&
00840                    ((currentopen > formeropen) ||
00841                     (formeropen - currentopen <
00842                      (((formeropen - lru_state.fds_hiwat) *
00843                        cache_inode_gc_policy.required_progress) /
00844                       100)))) {
00845                     if (++lru_state.futility >
00846                         cache_inode_gc_policy.futility_count) {
00847                          LogCrit(COMPONENT_CACHE_INODE_LRU,
00848                                  "Futility count exceeded.  The LRU thread is "
00849                                  "unable to make progress in reclaiming FDs."
00850                                  "Disabling FD cache.");
00851                          lru_state.caching_fds = FALSE;
00852                     }
00853                }
00854           }
00855 
00856           LogDebug(COMPONENT_CACHE_INODE_LRU,
00857                   "open_fd_count: %zd  t_count:%"PRIu64"\n",
00858                   open_fd_count, t_count);
00859 
00860           woke = lru_thread_delay_ms(lru_state.threadwait);
00861      }
00862 
00863      LogEvent(COMPONENT_CACHE_INODE_LRU,
00864               "Shutting down LRU thread.");
00865 
00866      return NULL;
00867 }
00868 
00869 /* Public functions */
00870 
00875 void
00876 cache_inode_lru_pkginit(void)
00877 {
00878      /* The attributes governing the LRU reaper thread. */
00879      pthread_attr_t attr_thr;
00880      /* Index for initializing lanes */
00881      size_t ix = 0;
00882      /* Return code from system calls */
00883      int code = 0;
00884      /* Rlimit for open file descriptors */
00885      struct rlimit rlim = {
00886           .rlim_cur = RLIM_INFINITY,
00887           .rlim_max = RLIM_INFINITY
00888      };
00889 
00890      open_fd_count = 0;
00891 
00892      /* Repurpose some GC policy */
00893      lru_state.flags = LRU_STATE_NONE;
00894 
00895      /* Set high and low watermark for cache entries.  This seems a
00896         bit fishy, so come back and revisit this. */
00897      lru_state.entries_hiwat
00898           = cache_inode_gc_policy.entries_hwmark;
00899      lru_state.entries_lowat
00900           = cache_inode_gc_policy.entries_lwmark;
00901 
00902      /* Find out the system-imposed file descriptor limit */
00903      if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
00904           code = errno;
00905           LogCrit(COMPONENT_CACHE_INODE_LRU,
00906                   "Call to getrlimit failed with error %d.  "
00907                   "This should not happen.  Assigning default of %d.",
00908                   code, FD_FALLBACK_LIMIT);
00909           lru_state.fds_system_imposed = FD_FALLBACK_LIMIT;
00910      } else {
00911           if (rlim.rlim_cur < rlim.rlim_max) {
00912                /* Save the old soft value so we can fall back to it
00913                   if setrlimit fails. */
00914                rlim_t old_soft = rlim.rlim_cur;
00915                LogInfo(COMPONENT_CACHE_INODE_LRU,
00916                        "Attempting to increase soft limit from %jd "
00917                        "to hard limit of %jd",
00918                        rlim.rlim_cur, rlim.rlim_max);
00919                rlim.rlim_cur = rlim.rlim_max;
00920                if (setrlimit(RLIMIT_NOFILE, &rlim) < 0) {
00921                     code = errno;
00922                     LogWarn(COMPONENT_CACHE_INODE_LRU,
00923                             "Attempt to raise soft FD limit to hard FD limit "
00924                             "failed with error %d.  Sticking to soft limit.",
00925                             code);
00926                     rlim.rlim_cur = old_soft;
00927                }
00928           }
00929           if (rlim.rlim_cur == RLIM_INFINITY) {
00930                FILE *const nr_open = fopen("/proc/sys/fs/nr_open",
00931                                            "r");
00932                if (!(nr_open &&
00933                      (fscanf(nr_open,
00934                              "%"SCNu32"\n",
00935                              &lru_state.fds_system_imposed) == 1) &&
00936                      (fclose(nr_open) == 0))) {
00937                     code = errno;
00938                     LogMajor(COMPONENT_CACHE_INODE_LRU,
00939                              "The rlimit on open file descriptors is infinite "
00940                              "and the attempt to find the system maximum "
00941                              "failed with error %d.  "
00942                              "Assigning the default fallback of %d which is "
00943                              "almost certainly too small.  If you are on a "
00944                              "Linux system, this should never happen.  If "
00945                              "you are running some other system, please set "
00946                              "an rlimit on file descriptors (for example, "
00947                              "with ulimit) for this process and consider "
00948                              "editing " __FILE__ "to add support for finding "
00949                              "your system's maximum.", code,
00950                              FD_FALLBACK_LIMIT);
00951                     lru_state.fds_system_imposed = FD_FALLBACK_LIMIT;
00952                }
00953           } else {
00954                lru_state.fds_system_imposed = rlim.rlim_cur;
00955           }
00956           LogInfo(COMPONENT_CACHE_INODE_LRU,
00957                   "Setting the system-imposed limit on FDs to %d.",
00958                   lru_state.fds_system_imposed);
00959      }
00960 
00961 
00962      lru_state.fds_hard_limit = (cache_inode_gc_policy.fd_limit_percent *
00963                                  lru_state.fds_system_imposed) / 100;
00964      lru_state.fds_hiwat = (cache_inode_gc_policy.fd_hwmark_percent *
00965                             lru_state.fds_system_imposed) / 100;
00966      lru_state.fds_lowat = (cache_inode_gc_policy.fd_lwmark_percent *
00967                             lru_state.fds_system_imposed) / 100;
00968      lru_state.futility = 0;
00969 
00970      lru_state.per_lane_work
00971           = (cache_inode_gc_policy.reaper_work / LRU_N_Q_LANES);
00972      lru_state.biggest_window = (cache_inode_gc_policy.biggest_window *
00973                                  lru_state.fds_system_imposed) / 100;
00974 
00975      lru_state.last_count = 0;
00976 
00977      lru_state.threadwait
00978           = 1000 * cache_inode_gc_policy.lru_run_interval;
00979 
00980      lru_state.caching_fds = cache_inode_gc_policy.use_fd_cache;
00981 
00982      pthread_mutex_init(&lru_mtx, NULL);
00983      pthread_cond_init(&lru_cv, NULL);
00984 
00985      for (ix = 0; ix < LRU_N_Q_LANES; ++ix) {
00986           /* L1, unpinned */
00987           lru_init_queue(&LRU_1[ix].lru);
00988           /* L1, pinned */
00989           lru_init_queue(&LRU_1[ix].lru_pinned);
00990           /* L2, unpinned */
00991           lru_init_queue(&LRU_2[ix].lru);
00992           /* L2, pinned */
00993           lru_init_queue(&LRU_2[ix].lru_pinned);
00994      }
00995 
00996      if (pthread_attr_init(&attr_thr) != 0) {
00997           LogCrit(COMPONENT_CACHE_INODE_LRU,
00998                   "can't init pthread's attributes");
00999      }
01000 
01001      if (pthread_attr_setscope(&attr_thr, PTHREAD_SCOPE_SYSTEM)
01002          != 0) {
01003           LogCrit(COMPONENT_CACHE_INODE_LRU, "can't set pthread's scope");
01004      }
01005 
01006      if (pthread_attr_setdetachstate(&attr_thr, PTHREAD_CREATE_JOINABLE)
01007          != 0) {
01008           LogCrit(COMPONENT_CACHE_INODE_LRU, "can't set pthread's join state");
01009      }
01010 
01011      if (pthread_attr_setstacksize(&attr_thr, THREAD_STACK_SIZE)
01012          != 0) {
01013           LogCrit(COMPONENT_CACHE_INODE_LRU, "can't set pthread's stack size");
01014      }
01015 
01016      /* spawn LRU background thread */
01017      code = pthread_create(&lru_thread_state.thread_id, &attr_thr, lru_thread,
01018                           NULL);
01019      if (code != 0) {
01020           code = errno;
01021           LogFatal(COMPONENT_CACHE_INODE_LRU,
01022                    "Unable to start lru reaper thread, error code %d.",
01023                    code);
01024      }
01025 }
01026 
01031 void
01032 cache_inode_lru_pkgshutdown(void)
01033 {
01034      /* Post and wait for shutdown of LRU background thread */
01035      pthread_mutex_lock(&lru_mtx);
01036      lru_thread_state.flags |= LRU_SHUTDOWN;
01037      lru_wake_thread(LRU_FLAG_NONE);
01038      pthread_mutex_unlock(&lru_mtx);
01039 }
01040 
01055 cache_entry_t *
01056 cache_inode_lru_get(cache_inode_status_t *status,
01057                     uint32_t flags)
01058 {
01059      /* The lane from which we harvest (or into which we store) the
01060         new entry.  Usually the lane assigned to this thread. */
01061      uint32_t lane = 0;
01062      /* The LRU entry */
01063      cache_inode_lru_t *lru = NULL;
01064      /* The Cache entry being created */
01065      cache_entry_t *entry = NULL;
01066 
01067      /* If we are in reclaim state, try to find an entry to recycle. */
01068      pthread_mutex_lock(&lru_mtx);
01069      if (lru_state.flags & LRU_STATE_RECLAIMING) {
01070           pthread_mutex_unlock(&lru_mtx);
01071 
01072           /* Search through logical L2 entry. */
01073           for (lane = 0; lane < LRU_N_Q_LANES; ++lane) {
01074                lru = lru_try_reap_entry(&LRU_2[lane].lru);
01075                if (lru)
01076                     break;
01077           }
01078 
01079           /* Search through logical L1 if nothing was found in L2
01080              (fall through, otherwise.) */
01081           if (!lru) {
01082                for (lane = 0; lane < LRU_N_Q_LANES; ++lane) {
01083                     lru = lru_try_reap_entry(&LRU_1[lane].lru);
01084                     if (lru)
01085                          break;
01086                }
01087           }
01088 
01089           /* If we found an entry, we hold a lock on it and it is
01090              ready to be recycled. */
01091           if (lru) {
01092                entry = container_of(lru, cache_entry_t, lru);
01093                if (entry) {
01094                     LogFullDebug(COMPONENT_CACHE_INODE_LRU,
01095                                  "Recycling entry at %p.",
01096                                  entry);
01097                }
01098                cache_inode_lru_clean(entry);
01099           }
01100      } else {
01101           pthread_mutex_unlock(&lru_mtx);
01102      }
01103 
01104      if (!lru) {
01105           entry = pool_alloc(cache_inode_entry_pool, NULL);
01106           if(entry == NULL) {
01107                LogCrit(COMPONENT_CACHE_INODE_LRU,
01108                        "can't allocate a new entry from cache pool");
01109                *status = CACHE_INODE_MALLOC_ERROR;
01110                goto out;
01111           }
01112           if (pthread_mutex_init(&entry->lru.mtx, NULL) != 0) {
01113                pool_free(cache_inode_entry_pool, entry);
01114                LogCrit(COMPONENT_CACHE_INODE_LRU,
01115                        "pthread_mutex_init of lru.mtx returned %d (%s)",
01116                        errno,
01117                        strerror(errno));
01118                entry = NULL;
01119                *status = CACHE_INODE_INIT_ENTRY_FAILED;
01120                goto out;
01121           }
01122      }
01123 
01124      assert(entry);
01125      /* Set the sentinel refcount.  Since the entry isn't in a queue,
01126         nobody can bump the refcount yet. */
01127      entry->lru.refcount = 2;
01128      entry->lru.pin_refcnt = 0;
01129      entry->lru.flags = 0;
01130      pthread_mutex_lock(&entry->lru.mtx);
01131      lru_insert_entry(&entry->lru, 0,
01132                       lru_lane_of_entry(entry));
01133      pthread_mutex_unlock(&entry->lru.mtx);
01134 
01135      *status = CACHE_INODE_SUCCESS;
01136 
01137 out:
01138      return (entry);
01139 }
01140 
01154 cache_inode_status_t
01155 cache_inode_inc_pin_ref(cache_entry_t *entry)
01156 {
01157      cache_inode_status_t rc = CACHE_INODE_SUCCESS;
01158 
01159      pthread_mutex_lock(&entry->lru.mtx);
01160 
01161      if (entry->lru.flags & LRU_ENTRY_UNPINNABLE) {
01162           pthread_mutex_unlock(&entry->lru.mtx);
01163           return CACHE_INODE_DEAD_ENTRY;
01164      }
01165 
01166      if (!entry->lru.pin_refcnt && !(entry->lru.flags & LRU_ENTRY_PINNED)) {
01167           lru_move_entry(&entry->lru, LRU_ENTRY_PINNED,
01168                          entry->lru.lane);
01169      }
01170      entry->lru.pin_refcnt++;
01171 
01172      /* Also take an LRU reference */
01173      atomic_inc_int64_t(&entry->lru.refcount);
01174 
01175      pthread_mutex_unlock(&entry->lru.mtx);
01176 
01177      return rc;
01178 }
01179 
01190 void
01191 cache_inode_unpinnable(cache_entry_t *entry)
01192 {
01193      pthread_mutex_lock(&entry->lru.mtx);
01194      entry->lru.flags |= LRU_ENTRY_UNPINNABLE;
01195      pthread_mutex_unlock(&entry->lru.mtx);
01196 }
01197 
01210 cache_inode_status_t
01211 cache_inode_dec_pin_ref(cache_entry_t *entry)
01212 {
01213      pthread_mutex_lock(&entry->lru.mtx);
01214      assert(entry->lru.pin_refcnt);
01215      /* Make sure at least one other LRU reference is held,
01216       * caller should separately hold an LRU reference
01217       */
01218      assert(entry->lru.refcount > 1);
01219      entry->lru.pin_refcnt--;
01220      if (!entry->lru.pin_refcnt && (entry->lru.flags & LRU_ENTRY_PINNED)) {
01221           lru_move_entry(&entry->lru, 0, entry->lru.lane);
01222      }
01223 
01224      /* Also release an LRU reference */
01225      atomic_dec_int64_t(&entry->lru.refcount);
01226 
01227      pthread_mutex_unlock(&entry->lru.mtx);
01228 
01229      return CACHE_INODE_SUCCESS;
01230 }
01231 
01246 cache_inode_status_t
01247 cache_inode_lru_ref(cache_entry_t *entry,
01248                     uint32_t flags)
01249 {
01250      pthread_mutex_lock(&entry->lru.mtx);
01251 
01252      /* Refuse to grant a reference if we're below the sentinel value
01253         or the entry is being removed or recycled. */
01254      if ((entry->lru.refcount == 0) ||
01255          (entry->lru.flags & LRU_ENTRY_CONDEMNED)) {
01256           pthread_mutex_unlock(&entry->lru.mtx);
01257           return CACHE_INODE_DEAD_ENTRY;
01258      }
01259 
01260      /* These shouldn't ever be set */
01261      flags &= ~(LRU_ENTRY_PINNED | LRU_ENTRY_L2);
01262 
01263      /* Initial and Scan are mutually exclusive. */
01264 
01265      assert(!((flags & LRU_REQ_INITIAL) &&
01266               (flags & LRU_REQ_SCAN)));
01267 
01268      atomic_inc_int64_t(&entry->lru.refcount);
01269 
01270      /* Move an entry forward if this is an initial reference. */
01271 
01272      if (flags & LRU_REQ_INITIAL) {
01273           lru_move_entry(&entry->lru,
01274                          /* Pinned stays pinned */
01275                          flags | (entry->lru.flags &
01276                                   LRU_ENTRY_PINNED),
01277                          entry->lru.lane);
01278      } else if ((flags & LRU_REQ_SCAN) &&
01279                 (entry->lru.flags & LRU_ENTRY_L2)) {
01280           lru_move_entry(&entry->lru,
01281                          /* Pinned stays pinned, L2 stays in L2. A
01282                             reference got for SCAN must not be used
01283                             to open an FD. */
01284                          flags | (entry->lru.flags &
01285                                   LRU_ENTRY_PINNED) |
01286                          LRU_ENTRY_L2,
01287                          entry->lru.lane);
01288      }
01289 
01290      pthread_mutex_unlock(&entry->lru.mtx);
01291 
01292      return CACHE_INODE_SUCCESS;
01293 }
01294 
01307 void cache_inode_lru_kill(cache_entry_t *entry)
01308 {
01309      pthread_mutex_lock(&entry->lru.mtx);
01310      if (entry->lru.flags & LRU_ENTRY_KILLED) {
01311           pthread_mutex_unlock(&entry->lru.mtx);
01312      } else {
01313           entry->lru.flags |= LRU_ENTRY_KILLED;
01314           /* cache_inode_lru_unref always either unlocks or destroys
01315              the entry. */
01316           cache_inode_lru_unref(entry, LRU_FLAG_LOCKED);
01317      }
01318 }
01319 
01336 void
01337 cache_inode_lru_unref(cache_entry_t *entry,
01338                       uint32_t flags)
01339 {
01340      if (!(flags & LRU_FLAG_LOCKED)) {
01341           pthread_mutex_lock(&entry->lru.mtx);
01342      }
01343 
01344      assert(entry->lru.refcount >= 1);
01345 
01346      if (entry->lru.refcount == 1) {
01347           struct lru_q_base *q
01348                = lru_select_queue(entry->lru.flags,
01349                                   entry->lru.lane);
01350           pthread_mutex_lock(&q->mtx);
01351           atomic_dec_int64_t(&entry->lru.refcount);
01352           if (entry->lru.refcount == 0) {
01353                /* Refcount has fallen to zero.  Remove the entry from
01354                   the queue and mark it as dead. */
01355                entry->lru.flags = LRU_ENTRY_CONDEMNED;
01356                glist_del(&entry->lru.q);
01357                --(q->size);
01358                entry->lru.lane = LRU_NO_LANE;
01359                /* Give other threads a chance to see that */
01360                pthread_mutex_unlock(&entry->lru.mtx);
01361                pthread_mutex_unlock(&q->mtx);
01362                pthread_yield();
01363                /* We should not need to hold the LRU mutex at this
01364                   point.  The hash table locks will ensure that by
01365                   the time this function completes successfully,
01366                   other threads will either have received
01367                   CACHE_INDOE_DEAD_ENTRY in the attempt to gain a
01368                   reference, or we will have removed the hash table
01369                   entry. */
01370                cache_inode_lru_clean(entry);
01371 
01372                pthread_mutex_destroy(&entry->lru.mtx);
01373                pool_free(cache_inode_entry_pool, entry);
01374                return;
01375           } else {
01376                pthread_mutex_unlock(&q->mtx);
01377           }
01378      } else {
01379           /* We may decrement the reference count without the queue
01380              lock, since it cannot go to 0. */
01381           atomic_dec_int64_t(&entry->lru.refcount);
01382      }
01383 
01384      pthread_mutex_unlock(&entry->lru.mtx);
01385 }
01386 
01397 void lru_wake_thread(uint32_t flags)
01398 {
01399      if (lru_thread_state.flags & LRU_SLEEPING)
01400           pthread_cond_signal(&lru_cv);
01401 }