diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index bba28a5034ba..0eaf1b48c431 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2393,6 +2393,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
 		if ((cap->issued & ci->i_flushing_caps) !=
 		    ci->i_flushing_caps) {
 			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+			/* encode_caps_cb() will also reset these sequence
+			 * numbers. Make sure sequence numbers in the cap
+			 * flush message match the later reconnect message. */
+			cap->seq = 0;
+			cap->issue_seq = 0;
+			cap->mseq = 0;
 			__kick_flushing_caps(mdsc, session, ci,
 					     oldest_flush_tid);
 		} else {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 04f18095e306..cce4e4b9ea57 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -20,6 +20,8 @@
 #include <linux/ceph/auth.h>
 #include <linux/ceph/debugfs.h>
 
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
+
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
  * managing the file system namespace (the directory hierarchy and
@@ -46,9 +48,11 @@
  */
 
 struct ceph_reconnect_state {
-	int nr_caps;
+	struct ceph_mds_session *session;
+	int nr_caps, nr_realms;
 	struct ceph_pagelist *pagelist;
 	unsigned msg_version;
+	bool allow_multi;
 };
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
@@ -2985,6 +2989,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 	mutex_unlock(&mdsc->mutex);
 }
 
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
+{
+	struct ceph_msg *reply;
+	struct ceph_pagelist *_pagelist;
+	struct page *page;
+	__le32 *addr;
+	int err = -ENOMEM;
+
+	if (!recon_state->allow_multi)
+		return -ENOSPC;
+
+	/* can't handle message that contains both caps and realms */
+	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
+
+	/* pre-allocate new pagelist */
+	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
+	if (!_pagelist)
+		return -ENOMEM;
+
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
+	if (!reply)
+		goto fail_msg;
+
+	/* placeholder for nr_caps */
+	err = ceph_pagelist_encode_32(_pagelist, 0);
+	if (err < 0)
+		goto fail;
+
+	if (recon_state->nr_caps) {
+		/* currently encoding caps */
+		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+		if (err)
+			goto fail;
+	} else {
+		/* placeholder for nr_realms (currently encoding realms) */
+		err = ceph_pagelist_encode_32(_pagelist, 0);
+		if (err < 0)
+			goto fail;
+	}
+
+	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+	if (err)
+		goto fail;
+
+	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
+	addr = kmap_atomic(page);
+	if (recon_state->nr_caps) {
+		/* currently encoding caps */
+		*addr = cpu_to_le32(recon_state->nr_caps);
+	} else {
+		/* currently encoding realms */
+		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
+	}
+	kunmap_atomic(addr);
+
+	reply->hdr.version = cpu_to_le16(5);
+	reply->hdr.compat_version = cpu_to_le16(4);
+
+	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
+	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+
+	ceph_con_send(&recon_state->session->s_con, reply);
+	ceph_pagelist_release(recon_state->pagelist);
+
+	recon_state->pagelist = _pagelist;
+	recon_state->nr_caps = 0;
+	recon_state->nr_realms = 0;
+	recon_state->msg_version = 5;
+	return 0;
+fail:
+	ceph_msg_put(reply);
+fail_msg:
+	ceph_pagelist_release(_pagelist);
+	return err;
+}
+
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
@@ -3004,9 +3084,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
 	     inode, ceph_vinop(inode), cap, cap->cap_id,
 	     ceph_cap_string(cap->issued));
-	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
-	if (err)
-		return err;
 
 	spin_lock(&ci->i_ceph_lock);
 	cap->seq = 0;        /* reset cap seq */
@@ -3046,7 +3123,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	if (recon_state->msg_version >= 2) {
 		int num_fcntl_locks, num_flock_locks;
 		struct ceph_filelock *flocks = NULL;
-		size_t struct_len, total_len = 0;
+		size_t struct_len, total_len = sizeof(u64);
 		u8 struct_v = 0;
 
 encode_again:
@@ -3081,7 +3158,7 @@ encode_again:
 
 		if (recon_state->msg_version >= 3) {
 			/* version, compat_version and struct_len */
-			total_len = 2 * sizeof(u8) + sizeof(u32);
+			total_len += 2 * sizeof(u8) + sizeof(u32);
 			struct_v = 2;
 		}
 		/*
@@ -3098,12 +3175,19 @@ encode_again:
 			struct_len += sizeof(u64); /* snap_follows */
 
 		total_len += struct_len;
-		err = ceph_pagelist_reserve(pagelist, total_len);
-		if (err) {
-			kfree(flocks);
-			goto out_err;
+
+		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+			err = send_reconnect_partial(recon_state);
+			if (err)
+				goto out_freeflocks;
+			pagelist = recon_state->pagelist;
 		}
 
+		err = ceph_pagelist_reserve(pagelist, total_len);
+		if (err)
+			goto out_freeflocks;
+
+		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
 		if (recon_state->msg_version >= 3) {
 			ceph_pagelist_encode_8(pagelist, struct_v);
 			ceph_pagelist_encode_8(pagelist, 1);
@@ -3115,7 +3199,7 @@ encode_again:
 				       num_fcntl_locks, num_flock_locks);
 		if (struct_v >= 2)
 			ceph_pagelist_encode_64(pagelist, snap_follows);
-
+out_freeflocks:
 		kfree(flocks);
 	} else {
 		u64 pathbase = 0;
@@ -3136,20 +3220,81 @@ encode_again:
 		}
 
 		err = ceph_pagelist_reserve(pagelist,
-				pathlen + sizeof(u32) + sizeof(rec.v1));
+					    sizeof(u64) + sizeof(u32) +
+					    pathlen + sizeof(rec.v1));
 		if (err) {
-			kfree(path);
-			goto out_err;
+			goto out_freepath;
 		}
 
+		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
 		ceph_pagelist_encode_string(pagelist, path, pathlen);
 		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
-
+out_freepath:
 		kfree(path);
 	}
 
-	recon_state->nr_caps++;
 out_err:
+	if (err >= 0)
+		recon_state->nr_caps++;
+	return err;
+}
+
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
+			      struct ceph_reconnect_state *recon_state)
+{
+	struct rb_node *p;
+	struct ceph_pagelist *pagelist = recon_state->pagelist;
+	int err = 0;
+
+	if (recon_state->msg_version >= 4) {
+		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+		if (err < 0)
+			goto fail;
+	}
+
+	/*
+	 * snaprealms.  We provide the mds with the ino, seq (version), and
+	 * parent for all of our realms.  If the mds has any newer info,
+	 * it will tell us.
+	 */
+	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+		struct ceph_snap_realm *realm =
+		       rb_entry(p, struct ceph_snap_realm, node);
+		struct ceph_mds_snaprealm_reconnect sr_rec;
+
+		if (recon_state->msg_version >= 4) {
+			size_t need = sizeof(u8) * 2 + sizeof(u32) +
+				      sizeof(sr_rec);
+
+			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+				err = send_reconnect_partial(recon_state);
+				if (err)
+					goto fail;
+				pagelist = recon_state->pagelist;
+			}
+
+			err = ceph_pagelist_reserve(pagelist, need);
+			if (err)
+				goto fail;
+
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+		}
+
+		dout(" adding snap realm %llx seq %lld parent %llx\n",
+		     realm->ino, realm->seq, realm->parent_ino);
+		sr_rec.ino = cpu_to_le64(realm->ino);
+		sr_rec.seq = cpu_to_le64(realm->seq);
+		sr_rec.parent = cpu_to_le64(realm->parent_ino);
+
+		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+		if (err)
+			goto fail;
+
+		recon_state->nr_realms++;
+	}
+fail:
 	return err;
 }
 
@@ -3170,18 +3315,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 			       struct ceph_mds_session *session)
 {
 	struct ceph_msg *reply;
-	struct rb_node *p;
 	int mds = session->s_mds;
 	int err = -ENOMEM;
-	int s_nr_caps;
-	struct ceph_pagelist *pagelist;
-	struct ceph_reconnect_state recon_state;
+	struct ceph_reconnect_state recon_state = {
+		.session = session,
+	};
 	LIST_HEAD(dispose);
 
 	pr_info("mds%d reconnect start\n", mds);
 
-	pagelist = ceph_pagelist_alloc(GFP_NOFS);
-	if (!pagelist)
+	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
+	if (!recon_state.pagelist)
 		goto fail_nopagelist;
 
 	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
@@ -3225,63 +3369,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	/* replay unsafe requests */
 	replay_unsafe_requests(mdsc, session);
 
+	ceph_early_kick_flushing_caps(mdsc, session);
+
 	down_read(&mdsc->snap_rwsem);
 
-	/* traverse this session's caps */
-	s_nr_caps = session->s_nr_caps;
-	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
+	/* placeholder for nr_caps */
+	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
 	if (err)
 		goto fail;
 
-	recon_state.nr_caps = 0;
-	recon_state.pagelist = pagelist;
-	if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
 		recon_state.msg_version = 3;
-	else
+		recon_state.allow_multi = true;
+	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
+		recon_state.msg_version = 3;
+	} else {
 		recon_state.msg_version = 2;
+	}
+	/* traverse this session's caps */
 	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
-	if (err < 0)
-		goto fail;
 
 	spin_lock(&session->s_cap_lock);
 	session->s_cap_reconnect = 0;
 	spin_unlock(&session->s_cap_lock);
 
-	/*
-	 * snaprealms.  we provide mds with the ino, seq (version), and
-	 * parent for all of our realms.  If the mds has any newer info,
-	 * it will tell us.
-	 */
-	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
-		struct ceph_snap_realm *realm =
-			rb_entry(p, struct ceph_snap_realm, node);
-		struct ceph_mds_snaprealm_reconnect sr_rec;
+	if (err < 0)
+		goto fail;
 
-		dout(" adding snap realm %llx seq %lld parent %llx\n",
-		     realm->ino, realm->seq, realm->parent_ino);
-		sr_rec.ino = cpu_to_le64(realm->ino);
-		sr_rec.seq = cpu_to_le64(realm->seq);
-		sr_rec.parent = cpu_to_le64(realm->parent_ino);
-		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
-		if (err)
+	/* check if all realms can be encoded into current message */
+	if (mdsc->num_snap_realms) {
+		size_t total_len =
+			recon_state.pagelist->length +
+			mdsc->num_snap_realms *
+			sizeof(struct ceph_mds_snaprealm_reconnect);
+		if (recon_state.msg_version >= 4) {
+			/* number of realms */
+			total_len += sizeof(u32);
+			/* version, compat_version and struct_len */
+			total_len += mdsc->num_snap_realms *
+				     (2 * sizeof(u8) + sizeof(u32));
+		}
+		if (total_len > RECONNECT_MAX_SIZE) {
+			if (!recon_state.allow_multi) {
+				err = -ENOSPC;
+				goto fail;
+			}
+			if (recon_state.nr_caps) {
+				err = send_reconnect_partial(&recon_state);
+				if (err)
+					goto fail;
+			}
+			recon_state.msg_version = 5;
+		}
+	}
+
+	err = encode_snap_realms(mdsc, &recon_state);
+	if (err < 0)
+		goto fail;
+
+	if (recon_state.msg_version >= 5) {
+		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+		if (err < 0)
 			goto fail;
 	}
 
-	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
-
-	/* raced with cap release? */
-	if (s_nr_caps != recon_state.nr_caps) {
-		struct page *page = list_first_entry(&pagelist->head,
-						     struct page, lru);
+	if (recon_state.nr_caps || recon_state.nr_realms) {
+		struct page *page =
+			list_first_entry(&recon_state.pagelist->head,
+					struct page, lru);
 		__le32 *addr = kmap_atomic(page);
-		*addr = cpu_to_le32(recon_state.nr_caps);
+		if (recon_state.nr_caps) {
+			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
+			*addr = cpu_to_le32(recon_state.nr_caps);
+		} else if (recon_state.msg_version >= 4) {
+			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
+		}
 		kunmap_atomic(addr);
 	}
 
-	reply->hdr.data_len = cpu_to_le32(pagelist->length);
-	ceph_msg_data_add_pagelist(reply, pagelist);
+	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+	if (recon_state.msg_version >= 4)
+		reply->hdr.compat_version = cpu_to_le16(4);
 
-	ceph_early_kick_flushing_caps(mdsc, session);
+	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
+	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
 
 	ceph_con_send(&session->s_con, reply);
 
@@ -3292,7 +3463,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	mutex_unlock(&mdsc->mutex);
 
 	up_read(&mdsc->snap_rwsem);
-	ceph_pagelist_release(pagelist);
+	ceph_pagelist_release(recon_state.pagelist);
 	return;
 
 fail:
@@ -3300,7 +3471,7 @@ fail:
 	up_read(&mdsc->snap_rwsem);
 	mutex_unlock(&session->s_mutex);
 fail_nomsg:
-	ceph_pagelist_release(pagelist);
+	ceph_pagelist_release(recon_state.pagelist);
 fail_nopagelist:
 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
 	return;
@@ -3698,6 +3869,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	init_rwsem(&mdsc->snap_rwsem);
 	mdsc->snap_realms = RB_ROOT;
 	INIT_LIST_HEAD(&mdsc->snap_empty);
+	mdsc->num_snap_realms = 0;
 	spin_lock_init(&mdsc->snap_empty_lock);
 	mdsc->last_tid = 0;
 	mdsc->oldest_tid = 0;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 0d3264cf3334..4f962642fee4 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -21,11 +21,13 @@
 #define CEPHFS_FEATURE_REPLY_ENCODING	9
 #define CEPHFS_FEATURE_RECLAIM_CLIENT	10
 #define CEPHFS_FEATURE_LAZY_CAP_WANTED	11
+#define CEPHFS_FEATURE_MULTI_RECONNECT  12
 
 #define CEPHFS_FEATURES_CLIENT_SUPPORTED { 	\
 	0, 1, 2, 3, 4, 5, 6, 7,			\
 	CEPHFS_FEATURE_MIMIC,			\
 	CEPHFS_FEATURE_LAZY_CAP_WANTED,		\
+	CEPHFS_FEATURE_MULTI_RECONNECT,		\
 }
 #define CEPHFS_FEATURES_CLIENT_REQUIRED {}
 
@@ -342,6 +344,7 @@ struct ceph_mds_client {
 	struct rw_semaphore     snap_rwsem;
 	struct rb_root          snap_realms;
 	struct list_head        snap_empty;
+	int			num_snap_realms;
 	spinlock_t              snap_empty_lock;  /* protect snap_empty */
 
 	u64                    last_tid;      /* most recent mds request */
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f74193da0e09..dfc25ceeffed 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -124,6 +124,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
 	INIT_LIST_HEAD(&realm->inodes_with_caps);
 	spin_lock_init(&realm->inodes_with_caps_lock);
 	__insert_snap_realm(&mdsc->snap_realms, realm);
+	mdsc->num_snap_realms++;
+
 	dout("create_snap_realm %llx %p\n", realm->ino, realm);
 	return realm;
 }
@@ -175,6 +177,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 	dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
 	rb_erase(&realm->node, &mdsc->snap_realms);
+	mdsc->num_snap_realms--;
 
 	if (realm->parent) {
 		list_del_init(&realm->child_item);