ceph: use ceph_pagelist for mds reconnect message; change encoding (protocol change)

Use the ceph_pagelist to encode the MDS reconnect message.  We change the
message encoding (protocol change!) at the same time to make our life
easier (we don't know how many snaprealms we have when we start encoding).

An empty message implies the session is closed/does not exist.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2009-12-23 12:21:51 -08:00
parent 58bb3b374b
commit 93cea5bebf
2 changed files with 57 additions and 101 deletions

View File

@ -39,7 +39,7 @@
#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 5 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 22 /* server/client */
#define CEPH_MDSC_PROTOCOL 30 /* server/client */
#define CEPH_MDSC_PROTOCOL 31 /* server/client */
#define CEPH_MONC_PROTOCOL 15 /* server/client */

View File

@ -9,6 +9,7 @@
#include "messenger.h"
#include "decode.h"
#include "auth.h"
#include "pagelist.h"
/*
* A cluster of MDS (metadata server) daemons is responsible for
@ -1971,20 +1972,12 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
/*
* Encode information about a cap for a reconnect with the MDS.
*/
struct encode_caps_data {
void **pp;
void *end;
int *num_caps;
};
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
struct ceph_mds_cap_reconnect *rec;
struct ceph_mds_cap_reconnect rec;
struct ceph_inode_info *ci;
struct encode_caps_data *data = (struct encode_caps_data *)arg;
void *p = *(data->pp);
void *end = data->end;
struct ceph_pagelist *pagelist = arg;
char *path;
int pathlen, err;
u64 pathbase;
@ -1995,8 +1988,9 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued));
ceph_decode_need(&p, end, sizeof(u64), needmore);
ceph_encode_64(&p, ceph_ino(inode));
err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
if (err)
return err;
dentry = d_find_alias(inode);
if (dentry) {
@ -2009,33 +2003,29 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
path = NULL;
pathlen = 0;
}
ceph_decode_need(&p, end, pathlen+4, needmore);
ceph_encode_string(&p, end, path, pathlen);
err = ceph_pagelist_encode_string(pagelist, path, pathlen);
if (err)
goto out;
ceph_decode_need(&p, end, sizeof(*rec), needmore);
rec = p;
p += sizeof(*rec);
BUG_ON(p > end);
spin_lock(&inode->i_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
rec->cap_id = cpu_to_le64(cap->cap_id);
rec->pathbase = cpu_to_le64(pathbase);
rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec->issued = cpu_to_le32(cap->issued);
rec->size = cpu_to_le64(inode->i_size);
ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
ceph_encode_timespec(&rec->atime, &inode->i_atime);
rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.cap_id = cpu_to_le64(cap->cap_id);
rec.pathbase = cpu_to_le64(pathbase);
rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.issued = cpu_to_le32(cap->issued);
rec.size = cpu_to_le64(inode->i_size);
ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
ceph_encode_timespec(&rec.atime, &inode->i_atime);
rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
spin_unlock(&inode->i_lock);
err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
out:
kfree(path);
dput(dentry);
(*data->num_caps)++;
*(data->pp) = p;
return 0;
needmore:
return -ENOSPC;
return err;
}
@ -2053,19 +2043,26 @@ needmore:
*/
static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
{
struct ceph_mds_session *session;
struct ceph_mds_session *session = NULL;
struct ceph_msg *reply;
int newlen, len = 4 + 1;
void *p, *end;
int err;
int num_caps, num_realms = 0;
int got;
u64 next_snap_ino = 0;
__le32 *pnum_caps, *pnum_realms;
struct encode_caps_data iter_args;
struct ceph_pagelist *pagelist;
pr_info("reconnect to recovering mds%d\n", mds);
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
goto fail_nopagelist;
ceph_pagelist_init(pagelist);
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
if (IS_ERR(reply)) {
err = PTR_ERR(reply);
goto fail_nomsg;
}
/* find session */
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex); /* drop lock for duration */
@ -2081,12 +2078,6 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
/* replay unsafe requests */
replay_unsafe_requests(mdsc, session);
/* estimate needed space */
len += session->s_nr_caps *
(100+sizeof(struct ceph_mds_cap_reconnect));
pr_info("estimating i need %d bytes for %d caps\n",
len, session->s_nr_caps);
} else {
dout("no session for mds%d, will send short reconnect\n",
mds);
@ -2094,41 +2085,18 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
down_read(&mdsc->snap_rwsem);
retry:
/* build reply */
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
if (IS_ERR(reply)) {
err = PTR_ERR(reply);
pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
len, mds);
goto out;
}
p = reply->front.iov_base;
end = p + len;
if (!session) {
ceph_encode_8(&p, 1); /* session was closed */
ceph_encode_32(&p, 0);
if (!session)
goto send;
}
dout("session %p state %s\n", session,
session_state_name(session->s_state));
/* traverse this session's caps */
ceph_encode_8(&p, 0);
pnum_caps = p;
ceph_encode_32(&p, session->s_nr_caps);
num_caps = 0;
iter_args.pp = &p;
iter_args.end = end;
iter_args.num_caps = &num_caps;
err = iterate_session_caps(session, encode_caps_cb, &iter_args);
if (err == -ENOSPC)
goto needmore;
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
if (err)
goto fail;
err = iterate_session_caps(session, encode_caps_cb, pagelist);
if (err < 0)
goto out;
*pnum_caps = cpu_to_le32(num_caps);
/*
* snaprealms. we provide mds with the ino, seq (version), and
@ -2136,14 +2104,9 @@ retry:
* it will tell us.
*/
next_snap_ino = 0;
/* save some space for the snaprealm count */
pnum_realms = p;
ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
p += sizeof(*pnum_realms);
num_realms = 0;
while (1) {
struct ceph_snap_realm *realm;
struct ceph_mds_snaprealm_reconnect *sr_rec;
struct ceph_mds_snaprealm_reconnect sr_rec;
got = radix_tree_gang_lookup(&mdsc->snap_realms,
(void **)&realm, next_snap_ino, 1);
if (!got)
@ -2151,22 +2114,19 @@ retry:
dout(" adding snap realm %llx seq %lld parent %llx\n",
realm->ino, realm->seq, realm->parent_ino);
ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
sr_rec = p;
sr_rec->ino = cpu_to_le64(realm->ino);
sr_rec->seq = cpu_to_le64(realm->seq);
sr_rec->parent = cpu_to_le64(realm->parent_ino);
p += sizeof(*sr_rec);
num_realms++;
sr_rec.ino = cpu_to_le64(realm->ino);
sr_rec.seq = cpu_to_le64(realm->seq);
sr_rec.parent = cpu_to_le64(realm->parent_ino);
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
if (err)
goto fail;
next_snap_ino = realm->ino + 1;
}
*pnum_realms = cpu_to_le32(num_realms);
send:
reply->front.iov_len = p - reply->front.iov_base;
reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
dout("final len was %u (guessed %d)\n",
(unsigned)reply->front.iov_len, len);
reply->pagelist = pagelist;
reply->hdr.data_len = cpu_to_le32(pagelist->length);
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
if (session) {
@ -2183,18 +2143,14 @@ out:
mutex_lock(&mdsc->mutex);
return;
needmore:
/*
* we need a larger buffer. this doesn't very accurately
* factor in snap realms, but it's safe.
*/
num_caps += num_realms;
newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
len, num_caps, session->s_nr_caps, newlen);
len = newlen;
fail:
ceph_msg_put(reply);
goto retry;
fail_nomsg:
ceph_pagelist_release(pagelist);
kfree(pagelist);
fail_nopagelist:
pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
goto out;
}