dlm: change list and timer names

The old terminology of "toss" and "keep" is no longer an
accurate description of the rsb states and lists, so change
the names to "inactive" and "active".  The old names had
also been copied into the scanning code, which is changed
back to use the "scan" name.

- "active" rsb structs have lkb's attached, and are ref counted.
- "inactive" rsb structs have no lkb's attached, are not ref counted.
- "scan" list is for rsb's that can be freed after a timeout period.
- "slow" lists are for infrequent iterations through active or
   inactive rsb structs.
- inactive rsb structs that are directory records will not be put
  on the scan list, since they are not freed based on timeouts.
- inactive rsb structs that are not directory records will be
  put on the scan list to be freed, since they are not longer needed.

Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
David Teigland 2024-06-10 15:02:31 -05:00
parent fa0b54f17a
commit 4f5957a980
9 changed files with 181 additions and 218 deletions

View File

@ -380,7 +380,7 @@ static const struct seq_operations format4_seq_ops;
static int table_seq_show(struct seq_file *seq, void *iter_ptr) static int table_seq_show(struct seq_file *seq, void *iter_ptr)
{ {
struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list); struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_slow_list);
if (seq->op == &format1_seq_ops) if (seq->op == &format1_seq_ops)
print_format1(rsb, seq); print_format1(rsb, seq);
@ -409,9 +409,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
} }
if (seq->op == &format4_seq_ops) if (seq->op == &format4_seq_ops)
list = &ls->ls_toss; list = &ls->ls_slow_inactive;
else else
list = &ls->ls_keep; list = &ls->ls_slow_active;
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
return seq_list_start(list, *pos); return seq_list_start(list, *pos);
@ -423,9 +423,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
struct list_head *list; struct list_head *list;
if (seq->op == &format4_seq_ops) if (seq->op == &format4_seq_ops)
list = &ls->ls_toss; list = &ls->ls_slow_inactive;
else else
list = &ls->ls_keep; list = &ls->ls_slow_active;
return seq_list_next(iter_ptr, list, pos); return seq_list_next(iter_ptr, list, pos);
} }

View File

@ -327,11 +327,11 @@ struct dlm_rsb {
struct list_head res_convertqueue; struct list_head res_convertqueue;
struct list_head res_waitqueue; struct list_head res_waitqueue;
struct list_head res_rsbs_list; struct list_head res_slow_list; /* ls_slow_* */
struct list_head res_scan_list;
struct list_head res_root_list; /* used for recovery */ struct list_head res_root_list; /* used for recovery */
struct list_head res_masters_list; /* used for recovery */ struct list_head res_masters_list; /* used for recovery */
struct list_head res_recover_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */
struct list_head res_toss_q_list;
int res_recover_locks_count; int res_recover_locks_count;
char *res_lvbptr; char *res_lvbptr;
@ -365,7 +365,7 @@ enum rsb_flags {
RSB_RECOVER_CONVERT, RSB_RECOVER_CONVERT,
RSB_RECOVER_GRANT, RSB_RECOVER_GRANT,
RSB_RECOVER_LVB_INVAL, RSB_RECOVER_LVB_INVAL,
RSB_TOSS, RSB_INACTIVE,
}; };
static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@ -572,20 +572,16 @@ struct dlm_ls {
struct xarray ls_lkbxa; struct xarray ls_lkbxa;
rwlock_t ls_lkbxa_lock; rwlock_t ls_lkbxa_lock;
/* an rsb is on rsbtl for primary locking functions,
and on a slow list for recovery/dump iteration */
struct rhashtable ls_rsbtbl; struct rhashtable ls_rsbtbl;
rwlock_t ls_rsbtbl_lock; rwlock_t ls_rsbtbl_lock; /* for ls_rsbtbl and ls_slow */
struct list_head ls_slow_inactive; /* to iterate rsbtbl */
struct list_head ls_slow_active; /* to iterate rsbtbl */
struct list_head ls_toss; struct timer_list ls_scan_timer; /* based on first scan_list rsb toss_time */
struct list_head ls_keep; struct list_head ls_scan_list; /* rsbs ordered by res_toss_time */
spinlock_t ls_scan_lock;
struct timer_list ls_timer;
/* this queue is ordered according the
* absolute res_toss_time jiffies time
* to mod_timer() with the first element
* if necessary.
*/
struct list_head ls_toss_q;
spinlock_t ls_toss_q_lock;
spinlock_t ls_waiters_lock; spinlock_t ls_waiters_lock;
struct list_head ls_waiters; /* lkbs needing a reply */ struct list_head ls_waiters; /* lkbs needing a reply */

View File

@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local); const struct dlm_message *ms, bool local);
static int receive_extralen(const struct dlm_message *ms); static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void toss_rsb(struct kref *kref); static void deactivate_rsb(struct kref *kref);
/* /*
* Lock compatibilty matrix - thanks Steve * Lock compatibilty matrix - thanks Steve
@ -330,8 +330,8 @@ static inline unsigned long rsb_toss_jiffies(void)
static inline void hold_rsb(struct dlm_rsb *r) static inline void hold_rsb(struct dlm_rsb *r)
{ {
/* rsbs in toss state never get referenced */ /* inactive rsbs are not ref counted */
WARN_ON(rsb_flag(r, RSB_TOSS)); WARN_ON(rsb_flag(r, RSB_INACTIVE));
kref_get(&r->res_ref); kref_get(&r->res_ref);
} }
@ -370,15 +370,12 @@ static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
return 0; return 0;
} }
/* When all references to the rsb are gone it's transferred to
the tossed list for later disposal. */
static void put_rsb(struct dlm_rsb *r) static void put_rsb(struct dlm_rsb *r)
{ {
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
int rv; int rv;
rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
&ls->ls_rsbtbl_lock); &ls->ls_rsbtbl_lock);
if (rv) if (rv)
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
@ -391,48 +388,49 @@ void dlm_put_rsb(struct dlm_rsb *r)
/* connected with timer_delete_sync() in dlm_ls_stop() to stop /* connected with timer_delete_sync() in dlm_ls_stop() to stop
* new timers when recovery is triggered and don't run them * new timers when recovery is triggered and don't run them
* again until a dlm_timer_resume() tries it again. * again until a resume_scan_timer() tries it again.
*/ */
static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
{ {
if (!dlm_locking_stopped(ls)) if (!dlm_locking_stopped(ls))
mod_timer(&ls->ls_timer, jiffies); mod_timer(&ls->ls_scan_timer, jiffies);
} }
/* This function tries to resume the timer callback if a rsb /* This function tries to resume the timer callback if a rsb
* is on the toss list and no timer is pending. It might that * is on the scan list and no timer is pending. It might that
* the first entry is on currently executed as timer callback * the first entry is on currently executed as timer callback
* but we don't care if a timer queued up again and does * but we don't care if a timer queued up again and does
* nothing. Should be a rare case. * nothing. Should be a rare case.
*/ */
void dlm_timer_resume(struct dlm_ls *ls) void resume_scan_timer(struct dlm_ls *ls)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
spin_lock_bh(&ls->ls_toss_q_lock); spin_lock_bh(&ls->ls_scan_lock);
r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_toss_q_list); res_scan_list);
if (r && !timer_pending(&ls->ls_timer)) if (r && !timer_pending(&ls->ls_scan_timer))
__rsb_mod_timer(ls, r->res_toss_time); enable_scan_timer(ls, r->res_toss_time);
spin_unlock_bh(&ls->ls_toss_q_lock); spin_unlock_bh(&ls->ls_scan_lock);
} }
/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ /* ls_rsbtbl_lock must be held */
static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
{ {
struct dlm_rsb *first; struct dlm_rsb *first;
spin_lock_bh(&ls->ls_toss_q_lock); spin_lock_bh(&ls->ls_scan_lock);
r->res_toss_time = 0; r->res_toss_time = 0;
/* if the rsb is not queued do nothing */ /* if the rsb is not queued do nothing */
if (list_empty(&r->res_toss_q_list)) if (list_empty(&r->res_scan_list))
goto out; goto out;
/* get the first element before delete */ /* get the first element before delete */
first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
res_toss_q_list); res_scan_list);
list_del_init(&r->res_toss_q_list); list_del_init(&r->res_scan_list);
/* check if the first element was the rsb we deleted */ /* check if the first element was the rsb we deleted */
if (first == r) { if (first == r) {
/* try to get the new first element, if the list /* try to get the new first element, if the list
@ -442,23 +440,19 @@ static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
* if the list isn't empty and a new first element got * if the list isn't empty and a new first element got
* in place, set the new timer expire time. * in place, set the new timer expire time.
*/ */
first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_toss_q_list); res_scan_list);
if (!first) if (!first)
timer_delete(&ls->ls_timer); timer_delete(&ls->ls_scan_timer);
else else
__rsb_mod_timer(ls, first->res_toss_time); enable_scan_timer(ls, first->res_toss_time);
} }
out: out:
spin_unlock_bh(&ls->ls_toss_q_lock); spin_unlock_bh(&ls->ls_scan_lock);
} }
/* Caller must held ls_rsbtbl_lock and need to be called every time static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
* when either the rsb enters toss state or the toss state changes
* the dir/master nodeid.
*/
static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
{ {
int our_nodeid = dlm_our_nodeid(); int our_nodeid = dlm_our_nodeid();
struct dlm_rsb *first; struct dlm_rsb *first;
@ -471,25 +465,25 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
if (!dlm_no_directory(ls) && if (!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) && (r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid)) { (dlm_dir_nodeid(r) == our_nodeid)) {
rsb_delete_toss_timer(ls, r); del_scan(ls, r);
return; return;
} }
spin_lock_bh(&ls->ls_toss_q_lock); spin_lock_bh(&ls->ls_scan_lock);
/* set the new rsb absolute expire time in the rsb */ /* set the new rsb absolute expire time in the rsb */
r->res_toss_time = rsb_toss_jiffies(); r->res_toss_time = rsb_toss_jiffies();
if (list_empty(&ls->ls_toss_q)) { if (list_empty(&ls->ls_scan_list)) {
/* if the queue is empty add the element and it's /* if the queue is empty add the element and it's
* our new expire time * our new expire time
*/ */
list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
__rsb_mod_timer(ls, r->res_toss_time); enable_scan_timer(ls, r->res_toss_time);
} else { } else {
/* check if the rsb was already queued, if so delete /* check if the rsb was already queued, if so delete
* it from the toss queue * it from the toss queue
*/ */
if (!list_empty(&r->res_toss_q_list)) if (!list_empty(&r->res_scan_list))
list_del(&r->res_toss_q_list); list_del(&r->res_scan_list);
/* try to get the maybe new first element and then add /* try to get the maybe new first element and then add
* to this rsb with the oldest expire time to the end * to this rsb with the oldest expire time to the end
@ -497,15 +491,15 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
* rsb expire time is our next expiration if it wasn't * rsb expire time is our next expiration if it wasn't
* the now new first elemet is our new expiration time * the now new first elemet is our new expiration time
*/ */
first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_toss_q_list); res_scan_list);
list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
if (!first) if (!first)
__rsb_mod_timer(ls, r->res_toss_time); enable_scan_timer(ls, r->res_toss_time);
else else
__rsb_mod_timer(ls, first->res_toss_time); enable_scan_timer(ls, first->res_toss_time);
} }
spin_unlock_bh(&ls->ls_toss_q_lock); spin_unlock_bh(&ls->ls_scan_lock);
} }
/* if we hit contention we do in 250 ms a retry to trylock. /* if we hit contention we do in 250 ms a retry to trylock.
@ -515,9 +509,11 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
*/ */
#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
void dlm_rsb_toss_timer(struct timer_list *timer) /* Called by lockspace scan_timer to free unused rsb's. */
void dlm_rsb_scan(struct timer_list *timer)
{ {
struct dlm_ls *ls = from_timer(ls, timer, ls_timer); struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
int our_nodeid = dlm_our_nodeid(); int our_nodeid = dlm_our_nodeid();
struct dlm_rsb *r; struct dlm_rsb *r;
int rv; int rv;
@ -525,76 +521,62 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
while (1) { while (1) {
/* interrupting point to leave iteration when /* interrupting point to leave iteration when
* recovery waits for timer_delete_sync(), recovery * recovery waits for timer_delete_sync(), recovery
* will take care to delete everything in toss queue. * will take care to delete everything in scan list.
*/ */
if (dlm_locking_stopped(ls)) if (dlm_locking_stopped(ls))
break; break;
rv = spin_trylock(&ls->ls_toss_q_lock); rv = spin_trylock(&ls->ls_scan_lock);
if (!rv) { if (!rv) {
/* rearm again try timer */ /* rearm again try timer */
__rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
break; break;
} }
r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_toss_q_list); res_scan_list);
if (!r) { if (!r) {
/* nothing to do anymore next rsb queue will /* the next add_scan will enable the timer again */
* set next mod_timer() expire. spin_unlock(&ls->ls_scan_lock);
*/
spin_unlock(&ls->ls_toss_q_lock);
break; break;
} }
/* test if the first rsb isn't expired yet, if /*
* so we stop freeing rsb from toss queue as * If the first rsb is not yet expired, then stop because the
* the order in queue is ascending to the * list is sorted with nearest expiration first.
* absolute res_toss_time jiffies
*/ */
if (time_before(jiffies, r->res_toss_time)) { if (time_before(jiffies, r->res_toss_time)) {
/* rearm with the next rsb to expire in the future */ /* rearm with the next rsb to expire in the future */
__rsb_mod_timer(ls, r->res_toss_time); enable_scan_timer(ls, r->res_toss_time);
spin_unlock(&ls->ls_toss_q_lock); spin_unlock(&ls->ls_scan_lock);
break; break;
} }
/* in find_rsb_dir/nodir there is a reverse order of this /* in find_rsb_dir/nodir there is a reverse order of this
* lock, however this is only a trylock if we hit some * lock, however this is only a trylock if we hit some
* possible contention we try it again. * possible contention we try it again.
*
* This lock synchronized while holding ls_toss_q_lock
* synchronize everything that rsb_delete_toss_timer()
* or rsb_mod_timer() can't run after this timer callback
* deletes the rsb from the ls_toss_q. Whereas the other
* holders have always a priority to run as this is only
* a caching handling and the other holders might to put
* this rsb out of the toss state.
*/ */
rv = write_trylock(&ls->ls_rsbtbl_lock); rv = write_trylock(&ls->ls_rsbtbl_lock);
if (!rv) { if (!rv) {
spin_unlock(&ls->ls_toss_q_lock); spin_unlock(&ls->ls_scan_lock);
/* rearm again try timer */ /* rearm again try timer */
__rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
break; break;
} }
list_del(&r->res_rsbs_list); list_del(&r->res_slow_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params); dlm_rhash_rsb_params);
/* not necessary to held the ls_rsbtbl_lock when /* ls_rsbtbl_lock is not needed when calling send_remove() */
* calling send_remove()
*/
write_unlock(&ls->ls_rsbtbl_lock); write_unlock(&ls->ls_rsbtbl_lock);
/* remove the rsb out of the toss queue its gone list_del_init(&r->res_scan_list);
* drom DLM now spin_unlock(&ls->ls_scan_lock);
*/
list_del_init(&r->res_toss_q_list);
spin_unlock(&ls->ls_toss_q_lock);
/* no rsb in this state should ever run a timer */ /* An rsb that is a dir record for a remote master rsb
* cannot be removed, and should not have a timer enabled.
*/
WARN_ON(!dlm_no_directory(ls) && WARN_ON(!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) && (r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid)); (dlm_dir_nodeid(r) == our_nodeid));
@ -608,7 +590,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
(dlm_dir_nodeid(r) != our_nodeid)) (dlm_dir_nodeid(r) != our_nodeid))
send_remove(r); send_remove(r);
free_toss_rsb(r); free_inactive_rsb(r);
} }
} }
@ -635,7 +617,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_convertqueue);
INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_waitqueue);
INIT_LIST_HEAD(&r->res_root_list); INIT_LIST_HEAD(&r->res_root_list);
INIT_LIST_HEAD(&r->res_toss_q_list); INIT_LIST_HEAD(&r->res_scan_list);
INIT_LIST_HEAD(&r->res_recover_list); INIT_LIST_HEAD(&r->res_recover_list);
INIT_LIST_HEAD(&r->res_masters_list); INIT_LIST_HEAD(&r->res_masters_list);
@ -689,7 +671,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
* So, if the given rsb is on the toss list, it is moved to the keep list * So, if the given rsb is on the toss list, it is moved to the keep list
* before being returned. * before being returned.
* *
* toss_rsb() happens when all local usage of the rsb is done, i.e. no * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
* more refcounts exist, so the rsb is moved from the keep list to the * more refcounts exist, so the rsb is moved from the keep list to the
* toss list. * toss list.
* *
@ -737,9 +719,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
* *
* If someone sends us a request, we are the dir node, and we do * If someone sends us a request, we are the dir node, and we do
* not find the rsb anywhere, then recreate it. This happens if * not find the rsb anywhere, then recreate it. This happens if
* someone sends us a request after we have removed/freed an rsb * someone sends us a request after we have removed/freed an rsb.
* from our toss list. (They sent a request instead of lookup * (They sent a request instead of lookup because they are using
* because they are using an rsb from their toss list.) * an rsb taken from their scan list.)
*/ */
if (from_local || from_dir || if (from_local || from_dir ||
@ -749,7 +731,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
retry: retry:
/* check if the rsb is in keep state under read lock - likely path */ /* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) { if (error) {
@ -761,9 +743,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
* rsb is active, so we can't check master_nodeid without lock_rsb. * rsb is active, so we can't check master_nodeid without lock_rsb.
*/ */
if (rsb_flag(r, RSB_TOSS)) { if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_toss; goto do_inactive;
} }
kref_get(&r->res_ref); kref_get(&r->res_ref);
@ -771,15 +753,15 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out; goto out;
do_toss: do_inactive:
write_lock_bh(&ls->ls_rsbtbl_lock); write_lock_bh(&ls->ls_rsbtbl_lock);
/* retry lookup under write lock to see if its still in toss state /* retry lookup under write lock to see if its still in inactive state
* if not it's in keep state and we relookup - unlikely path. * if not it's in active state and we relookup - unlikely path.
*/ */
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) { if (!error) {
if (!rsb_flag(r, RSB_TOSS)) { if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry; goto retry;
} }
@ -791,14 +773,14 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
/* /*
* rsb found inactive (master_nodeid may be out of date unless * rsb found inactive (master_nodeid may be out of date unless
* we are the dir_nodeid or were the master) No other thread * we are the dir_nodeid or were the master) No other thread
* is using this rsb because it's on the toss list, so we can * is using this rsb because it's inactive, so we can
* look at or update res_master_nodeid without lock_rsb. * look at or update res_master_nodeid without lock_rsb.
*/ */
if ((r->res_master_nodeid != our_nodeid) && from_other) { if ((r->res_master_nodeid != our_nodeid) && from_other) {
/* our rsb was not master, and another node (not the dir node) /* our rsb was not master, and another node (not the dir node)
has sent us a request */ has sent us a request */
log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
from_nodeid, r->res_master_nodeid, dir_nodeid, from_nodeid, r->res_master_nodeid, dir_nodeid,
r->res_name); r->res_name);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
@ -808,7 +790,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
if ((r->res_master_nodeid != our_nodeid) && from_dir) { if ((r->res_master_nodeid != our_nodeid) && from_dir) {
/* don't think this should ever happen */ /* don't think this should ever happen */
log_error(ls, "find_rsb toss from_dir %d master %d", log_error(ls, "find_rsb inactive from_dir %d master %d",
from_nodeid, r->res_master_nodeid); from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
/* fix it and go on */ /* fix it and go on */
@ -825,14 +807,10 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
r->res_first_lkid = 0; r->res_first_lkid = 0;
} }
list_move(&r->res_rsbs_list, &ls->ls_keep); list_move(&r->res_slow_list, &ls->ls_slow_active);
rsb_clear_flag(r, RSB_TOSS); rsb_clear_flag(r, RSB_INACTIVE);
/* rsb got out of toss state, it becomes alive again
* and we reinit the reference counter that is only
* valid for keep state rsbs
*/
kref_init(&r->res_ref); kref_init(&r->res_ref);
rsb_delete_toss_timer(ls, r); del_scan(ls, r);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
goto out; goto out;
@ -901,7 +879,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
dlm_free_rsb(r); dlm_free_rsb(r);
goto retry; goto retry;
} else if (!error) { } else if (!error) {
list_add(&r->res_rsbs_list, &ls->ls_keep); list_add(&r->res_slow_list, &ls->ls_slow_active);
} }
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
out: out:
@ -924,7 +902,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
retry: retry:
/* check if the rsb is in keep state under read lock - likely path */ /* check if the rsb is in active state under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) { if (error) {
@ -932,9 +910,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
goto do_new; goto do_new;
} }
if (rsb_flag(r, RSB_TOSS)) { if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_toss; goto do_inactive;
} }
/* /*
@ -947,15 +925,15 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
goto out; goto out;
do_toss: do_inactive:
write_lock_bh(&ls->ls_rsbtbl_lock); write_lock_bh(&ls->ls_rsbtbl_lock);
/* retry lookup under write lock to see if its still in toss state /* retry lookup under write lock to see if its still inactive.
* if not it's in keep state and we relookup - unlikely path. * if it's active, repeat lookup - unlikely path.
*/ */
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) { if (!error) {
if (!rsb_flag(r, RSB_TOSS)) { if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry; goto retry;
} }
@ -967,14 +945,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
/* /*
* rsb found inactive. No other thread is using this rsb because * rsb found inactive. No other thread is using this rsb because
* it's on the toss list, so we can look at or update * it's inactive, so we can look at or update res_master_nodeid
* res_master_nodeid without lock_rsb. * without lock_rsb.
*/ */
if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
/* our rsb is not master, and another node has sent us a /* our rsb is not master, and another node has sent us a
request; this should never happen */ request; this should never happen */
log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid, dir_nodeid); from_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
@ -986,21 +964,17 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
(dir_nodeid == our_nodeid)) { (dir_nodeid == our_nodeid)) {
/* our rsb is not master, and we are dir; may as well fix it; /* our rsb is not master, and we are dir; may as well fix it;
this should never happen */ this should never happen */
log_error(ls, "find_rsb toss our %d master %d dir %d", log_error(ls, "find_rsb inactive our %d master %d dir %d",
our_nodeid, r->res_master_nodeid, dir_nodeid); our_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
r->res_master_nodeid = our_nodeid; r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0; r->res_nodeid = 0;
} }
list_move(&r->res_rsbs_list, &ls->ls_keep); list_move(&r->res_slow_list, &ls->ls_slow_active);
rsb_clear_flag(r, RSB_TOSS); rsb_clear_flag(r, RSB_INACTIVE);
/* rsb got out of toss state, it becomes alive again
* and we reinit the reference counter that is only
* valid for keep state rsbs
*/
kref_init(&r->res_ref); kref_init(&r->res_ref);
rsb_delete_toss_timer(ls, r); del_scan(ls, r);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
goto out; goto out;
@ -1031,7 +1005,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
dlm_free_rsb(r); dlm_free_rsb(r);
goto retry; goto retry;
} else if (!error) { } else if (!error) {
list_add(&r->res_rsbs_list, &ls->ls_keep); list_add(&r->res_slow_list, &ls->ls_slow_active);
} }
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
@ -1105,7 +1079,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
} }
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
int from_nodeid, bool toss_list, unsigned int flags, int from_nodeid, bool is_inactive, unsigned int flags,
int *r_nodeid, int *result) int *r_nodeid, int *result)
{ {
int fix_master = (flags & DLM_LU_RECOVER_MASTER); int fix_master = (flags & DLM_LU_RECOVER_MASTER);
@ -1129,9 +1103,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
r->res_nodeid = from_nodeid; r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER); rsb_set_flag(r, RSB_NEW_MASTER);
if (toss_list) { if (is_inactive) {
/* I don't think we should ever find it on toss list. */ /* I don't think we should ever find it inactive. */
log_error(ls, "%s fix_master on toss", __func__); log_error(ls, "%s fix_master inactive", __func__);
dlm_dump_rsb(r); dlm_dump_rsb(r);
} }
} }
@ -1171,7 +1145,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
if (!from_master && !fix_master && if (!from_master && !fix_master &&
(r->res_master_nodeid == from_nodeid)) { (r->res_master_nodeid == from_nodeid)) {
/* this can happen when the master sends remove, the dir node /* this can happen when the master sends remove, the dir node
* finds the rsb on the keep list and ignores the remove, * finds the rsb on the active list and ignores the remove,
* and the former master sends a lookup * and the former master sends a lookup
*/ */
@ -1244,13 +1218,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
retry: retry:
/* check if the rsb is in keep state under read lock - likely path */ /* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) { if (!error) {
if (rsb_flag(r, RSB_TOSS)) { if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_toss; goto do_inactive;
} }
/* because the rsb is active, we need to lock_rsb before /* because the rsb is active, we need to lock_rsb before
@ -1274,16 +1248,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
goto not_found; goto not_found;
} }
do_toss: do_inactive:
/* unlikely path - relookup under write */ /* unlikely path - relookup under write */
write_lock_bh(&ls->ls_rsbtbl_lock); write_lock_bh(&ls->ls_rsbtbl_lock);
/* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock
* check if the rsb is still in toss state, if not relookup
*/
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) { if (!error) {
if (!rsb_flag(r, RSB_TOSS)) { if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
/* something as changed, very unlikely but /* something as changed, very unlikely but
* try again * try again
@ -1295,15 +1266,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
goto not_found; goto not_found;
} }
/* because the rsb is inactive (on toss list), it's not refcounted /* because the rsb is inactive, it's not refcounted and lock_rsb
* and lock_rsb is not used, but is protected by the rsbtbl lock is not used, but is protected by the rsbtbl lock */
*/
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
r_nodeid, result); r_nodeid, result);
rsb_mod_timer(ls, r); add_scan(ls, r);
/* the rsb was inactive (on toss list) */
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
return 0; return 0;
@ -1317,7 +1286,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_dir_nodeid = our_nodeid; r->res_dir_nodeid = our_nodeid;
r->res_master_nodeid = from_nodeid; r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid; r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_TOSS); rsb_set_flag(r, RSB_INACTIVE);
write_lock_bh(&ls->ls_rsbtbl_lock); write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl); error = rsb_insert(r, &ls->ls_rsbtbl);
@ -1335,8 +1304,8 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
goto retry; goto retry;
} }
list_add(&r->res_rsbs_list, &ls->ls_toss); list_add(&r->res_slow_list, &ls->ls_slow_inactive);
rsb_mod_timer(ls, r); add_scan(ls, r);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
if (result) if (result)
@ -1351,7 +1320,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
struct dlm_rsb *r; struct dlm_rsb *r;
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
if (r->res_hash == hash) if (r->res_hash == hash)
dlm_dump_rsb(r); dlm_dump_rsb(r);
} }
@ -1373,15 +1342,15 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
read_unlock_bh(&ls->ls_rsbtbl_lock); read_unlock_bh(&ls->ls_rsbtbl_lock);
} }
static void toss_rsb(struct kref *kref) static void deactivate_rsb(struct kref *kref)
{ {
struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
rsb_set_flag(r, RSB_TOSS); rsb_set_flag(r, RSB_INACTIVE);
list_move(&r->res_rsbs_list, &ls->ls_toss); list_move(&r->res_slow_list, &ls->ls_slow_inactive);
rsb_mod_timer(ls, r); add_scan(ls, r);
if (r->res_lvbptr) { if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr); dlm_free_lvb(r->res_lvbptr);
@ -1395,22 +1364,22 @@ static void unhold_rsb(struct dlm_rsb *r)
{ {
int rv; int rv;
/* rsbs in toss state never get referenced */ /* inactive rsbs are not ref counted */
WARN_ON(rsb_flag(r, RSB_TOSS)); WARN_ON(rsb_flag(r, RSB_INACTIVE));
rv = kref_put(&r->res_ref, toss_rsb); rv = kref_put(&r->res_ref, deactivate_rsb);
DLM_ASSERT(!rv, dlm_dump_rsb(r);); DLM_ASSERT(!rv, dlm_dump_rsb(r););
} }
void free_toss_rsb(struct dlm_rsb *r) void free_inactive_rsb(struct dlm_rsb *r)
{ {
WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
@ -4256,8 +4225,9 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return; return;
} }
/* Look for name in rsb toss state, if it's there, kill it. /*
* If it's in non toss state, it's being used, and we should ignore this * Look for inactive rsb, if it's there, free it.
* If the rsb is active, it's being used, and we should ignore this
* message. This is an expected race between the dir node sending a * message. This is an expected race between the dir node sending a
* request to the master node at the same time as the master node sends * request to the master node at the same time as the master node sends
* a remove to the dir node. The resolution to that race is for the * a remove to the dir node. The resolution to that race is for the
@ -4280,16 +4250,18 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return; return;
} }
if (!rsb_flag(r, RSB_TOSS)) { if (!rsb_flag(r, RSB_INACTIVE)) {
if (r->res_master_nodeid != from_nodeid) { if (r->res_master_nodeid != from_nodeid) {
/* should not happen */ /* should not happen */
log_error(ls, "receive_remove keep from %d master %d", log_error(ls, "receive_remove on active rsb from %d master %d",
from_nodeid, r->res_master_nodeid); from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
/* Ignore the remove message, see race comment above. */
log_debug(ls, "receive_remove from %d master %d first %x %s", log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid, from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name); name);
@ -4298,19 +4270,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
} }
if (r->res_master_nodeid != from_nodeid) { if (r->res_master_nodeid != from_nodeid) {
log_error(ls, "receive_remove toss from %d master %d", log_error(ls, "receive_remove inactive from %d master %d",
from_nodeid, r->res_master_nodeid); from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r); dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
return; return;
} }
list_del(&r->res_rsbs_list); list_del(&r->res_slow_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params); dlm_rhash_rsb_params);
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
free_toss_rsb(r); free_inactive_rsb(r);
} }
static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
@ -5377,7 +5349,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
struct dlm_rsb *r; struct dlm_rsb *r;
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
if (!rsb_flag(r, RSB_RECOVER_GRANT)) if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue; continue;
if (!is_master(r)) { if (!is_master(r)) {

View File

@ -11,7 +11,6 @@
#ifndef __LOCK_DOT_H__ #ifndef __LOCK_DOT_H__
#define __LOCK_DOT_H__ #define __LOCK_DOT_H__
void dlm_rsb_toss_timer(struct timer_list *timer);
void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len);
void dlm_print_lkb(struct dlm_lkb *lkb); void dlm_print_lkb(struct dlm_lkb *lkb);
@ -19,15 +18,15 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq); uint32_t saved_seq);
void dlm_receive_buffer(const union dlm_packet *p, int nodeid); void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2); int dlm_modes_compat(int mode1, int mode2);
void free_toss_rsb(struct dlm_rsb *r); void free_inactive_rsb(struct dlm_rsb *r);
void dlm_put_rsb(struct dlm_rsb *r); void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r);
int dlm_put_lkb(struct dlm_lkb *lkb); int dlm_put_lkb(struct dlm_lkb *lkb);
void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_lock_recovery(struct dlm_ls *ls); void dlm_lock_recovery(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls);
void dlm_timer_resume(struct dlm_ls *ls); void dlm_rsb_scan(struct timer_list *timer);
void resume_scan_timer(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
int len, unsigned int flags, int *r_nodeid, int *result); int len, unsigned int flags, int *r_nodeid, int *result);

View File

@ -412,8 +412,8 @@ static int new_lockspace(const char *name, const char *cluster,
*/ */
ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
INIT_LIST_HEAD(&ls->ls_toss); INIT_LIST_HEAD(&ls->ls_slow_inactive);
INIT_LIST_HEAD(&ls->ls_keep); INIT_LIST_HEAD(&ls->ls_slow_active);
rwlock_init(&ls->ls_rsbtbl_lock); rwlock_init(&ls->ls_rsbtbl_lock);
error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
@ -490,10 +490,9 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_dir_dump_list); INIT_LIST_HEAD(&ls->ls_dir_dump_list);
rwlock_init(&ls->ls_dir_dump_lock); rwlock_init(&ls->ls_dir_dump_lock);
INIT_LIST_HEAD(&ls->ls_toss_q); INIT_LIST_HEAD(&ls->ls_scan_list);
spin_lock_init(&ls->ls_toss_q_lock); spin_lock_init(&ls->ls_scan_lock);
timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
TIMER_DEFERRABLE);
spin_lock_bh(&lslist_lock); spin_lock_bh(&lslist_lock);
ls->ls_create_count = 1; ls->ls_create_count = 1;
@ -723,7 +722,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
* time_shutdown_sync(), we don't care anymore * time_shutdown_sync(), we don't care anymore
*/ */
clear_bit(LSFL_RUNNING, &ls->ls_flags); clear_bit(LSFL_RUNNING, &ls->ls_flags);
timer_shutdown_sync(&ls->ls_timer); timer_shutdown_sync(&ls->ls_scan_timer);
if (ls_count == 1) { if (ls_count == 1) {
dlm_clear_members(ls); dlm_clear_members(ls);

View File

@ -642,7 +642,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
if (new) if (new)
timer_delete_sync(&ls->ls_timer); timer_delete_sync(&ls->ls_scan_timer);
ls->ls_recover_seq++; ls->ls_recover_seq++;
/* activate requestqueue and stop processing */ /* activate requestqueue and stop processing */

View File

@ -882,29 +882,26 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
log_rinfo(ls, "dlm_recover_rsbs %d done", count); log_rinfo(ls, "dlm_recover_rsbs %d done", count);
} }
/* Create a single list of all root rsb's to be used during recovery */ void dlm_clear_inactive(struct dlm_ls *ls)
void dlm_clear_toss(struct dlm_ls *ls)
{ {
struct dlm_rsb *r, *safe; struct dlm_rsb *r, *safe;
unsigned int count = 0; unsigned int count = 0;
write_lock_bh(&ls->ls_rsbtbl_lock); write_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) { list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) {
list_del(&r->res_rsbs_list); list_del(&r->res_slow_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params); dlm_rhash_rsb_params);
/* remove it from the toss queue if its part of it */ if (!list_empty(&r->res_scan_list))
if (!list_empty(&r->res_toss_q_list)) list_del_init(&r->res_scan_list);
list_del_init(&r->res_toss_q_list);
free_toss_rsb(r); free_inactive_rsb(r);
count++; count++;
} }
write_unlock_bh(&ls->ls_rsbtbl_lock); write_unlock_bh(&ls->ls_rsbtbl_lock);
if (count) if (count)
log_rinfo(ls, "dlm_clear_toss %u done", count); log_rinfo(ls, "dlm_clear_inactive %u done", count);
} }

View File

@ -25,7 +25,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc);
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
const struct list_head *root_list); const struct list_head *root_list);
void dlm_recovered_lock(struct dlm_rsb *r); void dlm_recovered_lock(struct dlm_rsb *r);
void dlm_clear_toss(struct dlm_ls *ls); void dlm_clear_inactive(struct dlm_ls *ls);
void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list); void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list);
#endif /* __RECOVER_DOT_H__ */ #endif /* __RECOVER_DOT_H__ */

View File

@ -33,7 +33,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
} }
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
if (r->res_nodeid) if (r->res_nodeid)
continue; continue;
@ -63,12 +63,12 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
struct dlm_rsb *r; struct dlm_rsb *r;
read_lock_bh(&ls->ls_rsbtbl_lock); read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
list_add(&r->res_root_list, root_list); list_add(&r->res_root_list, root_list);
dlm_hold_rsb(r); dlm_hold_rsb(r);
} }
WARN_ON_ONCE(!list_empty(&ls->ls_toss)); WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive));
read_unlock_bh(&ls->ls_rsbtbl_lock); read_unlock_bh(&ls->ls_rsbtbl_lock);
} }
@ -98,16 +98,16 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
spin_lock_bh(&ls->ls_recover_lock); spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) { if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags); set_bit(LSFL_RUNNING, &ls->ls_flags);
/* Schedule next timer if recovery put something on toss. /* Schedule next timer if recovery put something on inactive.
* *
* The rsbs that was queued while recovery on toss hasn't * The rsbs that was queued while recovery on toss hasn't
* started yet because LSFL_RUNNING was set everything * started yet because LSFL_RUNNING was set everything
* else recovery hasn't started as well because ls_in_recovery * else recovery hasn't started as well because ls_in_recovery
* is still hold. So we should not run into the case that * is still hold. So we should not run into the case that
* dlm_timer_resume() queues a timer that can occur in * resume_scan_timer() queues a timer that can occur in
* a no op. * a no op.
*/ */
dlm_timer_resume(ls); resume_scan_timer(ls);
/* unblocks processes waiting to enter the dlm */ /* unblocks processes waiting to enter the dlm */
up_write(&ls->ls_in_recovery); up_write(&ls->ls_in_recovery);
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
@ -131,7 +131,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_callback_suspend(ls); dlm_callback_suspend(ls);
dlm_clear_toss(ls); dlm_clear_inactive(ls);
/* /*
* This list of root rsb's will be the basis of most of the recovery * This list of root rsb's will be the basis of most of the recovery