diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 67d65ac785e9..7420648a1de6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -836,6 +836,7 @@ enum {
 	Opt_lock_timeout,
 	/* int args above */
 	Opt_pool_ns,
+	Opt_compression_hint,
 	/* string args above */
 	Opt_read_only,
 	Opt_read_write,
@@ -844,8 +845,23 @@ enum {
 	Opt_notrim,
 };
 
+enum {
+	Opt_compression_hint_none,
+	Opt_compression_hint_compressible,
+	Opt_compression_hint_incompressible,
+};
+
+static const struct constant_table rbd_param_compression_hint[] = {
+	{"none",		Opt_compression_hint_none},
+	{"compressible",	Opt_compression_hint_compressible},
+	{"incompressible",	Opt_compression_hint_incompressible},
+	{}
+};
+
 static const struct fs_parameter_spec rbd_parameters[] = {
 	fsparam_u32	("alloc_size",			Opt_alloc_size),
+	fsparam_enum	("compression_hint",		Opt_compression_hint,
+			 rbd_param_compression_hint),
 	fsparam_flag	("exclusive",			Opt_exclusive),
 	fsparam_flag	("lock_on_read",		Opt_lock_on_read),
 	fsparam_u32	("lock_timeout",		Opt_lock_timeout),
@@ -867,6 +883,8 @@ struct rbd_options {
 	bool	lock_on_read;
 	bool	exclusive;
 	bool	trim;
+
+	u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
@@ -2253,7 +2271,8 @@ static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
 	    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
 		osd_req_op_alloc_hint_init(osd_req, which++,
 					   rbd_dev->layout.object_size,
-					   rbd_dev->layout.object_size);
+					   rbd_dev->layout.object_size,
+					   rbd_dev->opts->alloc_hint_flags);
 	}
 
 	if (rbd_obj_is_entire(obj_req))
@@ -6331,6 +6350,29 @@ static int rbd_parse_param(struct fs_parameter *param,
 		pctx->spec->pool_ns = param->string;
 		param->string = NULL;
 		break;
+	case Opt_compression_hint:
+		switch (result.uint_32) {
+		case Opt_compression_hint_none:
+			opt->alloc_hint_flags &=
+			    ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
+			      CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
+			break;
+		case Opt_compression_hint_compressible:
+			opt->alloc_hint_flags |=
+			    CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
+			opt->alloc_hint_flags &=
+			    ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+			break;
+		case Opt_compression_hint_incompressible:
+			opt->alloc_hint_flags |=
+			    CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+			opt->alloc_hint_flags &=
+			    ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
+			break;
+		default:
+			BUG();
+		}
+		break;
 	case Opt_read_only:
 		opt->read_only = true;
 		break;
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index ac98ab6ccd3b..a600e0eb6b6f 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -93,7 +93,7 @@ struct rbd_image_header_ondisk {
 	__le32 snap_count;
 	__le32 reserved;
 	__le64 snap_names_len;
-	struct rbd_image_snap_ondisk snaps[0];
+	struct rbd_image_snap_ondisk snaps[];
 } __attribute__((packed));
 
 
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 0a0823d378db..50c635dc7f71 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
 ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
 	export.o caps.o snap.o xattr.o quota.o io.o \
 	mds_client.o mdsmap.o strings.o ceph_frag.o \
-	debugfs.o util.o
+	debugfs.o util.o metric.o
 
 ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
 ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 26be6520d3fb..e0465741c591 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -22,7 +22,7 @@ static inline void ceph_set_cached_acl(struct inode *inode,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 
 	spin_lock(&ci->i_ceph_lock);
-	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
+	if (__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 0))
 		set_cached_acl(inode, type, acl);
 	else
 		forget_cached_acl(inode, type);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6f4678d98df7..01ad09733ac7 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -11,10 +11,12 @@
 #include <linux/task_io_accounting_ops.h>
 #include <linux/signal.h>
 #include <linux/iversion.h>
+#include <linux/ktime.h>
 
 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
+#include "metric.h"
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/striper.h>
 
@@ -216,6 +218,9 @@ static int ceph_sync_readpages(struct ceph_fs_client *fsc,
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);
 
+	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+				 req->r_end_latency, rc);
+
 	ceph_osdc_put_request(req);
 	dout("readpages result %d\n", rc);
 	return rc;
@@ -299,6 +304,7 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req)
 {
 	struct inode *inode = req->r_inode;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_osd_data *osd_data;
 	int rc = req->r_result <= 0 ? req->r_result : 0;
 	int bytes = req->r_result >= 0 ? req->r_result : 0;
@@ -336,6 +342,10 @@ unlock:
 		put_page(page);
 		bytes -= PAGE_SIZE;
 	}
+
+	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+				 req->r_end_latency, rc);
+
 	kfree(osd_data->pages);
 }
 
@@ -643,6 +653,9 @@ static int ceph_sync_writepages(struct ceph_fs_client *fsc,
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);
 
+	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, rc);
+
 	ceph_osdc_put_request(req);
 	if (rc == 0)
 		rc = len;
@@ -794,6 +807,9 @@ static void writepages_finish(struct ceph_osd_request *req)
 		ceph_clear_error_write(ci);
 	}
 
+	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, rc);
+
 	/*
 	 * We lost the cache cap, need to truncate the page before
 	 * it is unlocked, otherwise we'd truncate it later in the
@@ -1852,6 +1868,10 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
 	if (!err)
 		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, err);
+
 out_put:
 	ceph_osdc_put_request(req);
 	if (err == -ECANCELED)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f1acde6fb9a6..972c13aa4225 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -597,6 +597,27 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 	}
 }
 
+/**
+ * change_auth_cap_ses - move inode to appropriate lists when auth caps change
+ * @ci: inode to be moved
+ * @session: new auth caps session
+ */
+static void change_auth_cap_ses(struct ceph_inode_info *ci,
+				struct ceph_mds_session *session)
+{
+	lockdep_assert_held(&ci->i_ceph_lock);
+
+	if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
+		return;
+
+	spin_lock(&session->s_mdsc->cap_dirty_lock);
+	if (!list_empty(&ci->i_dirty_item))
+		list_move(&ci->i_dirty_item, &session->s_cap_dirty);
+	if (!list_empty(&ci->i_flushing_item))
+		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
+	spin_unlock(&session->s_mdsc->cap_dirty_lock);
+}
+
 /*
  * Add a capability under the given MDS session.
  *
@@ -727,6 +748,9 @@ void ceph_add_cap(struct inode *inode,
 	if (flags & CEPH_CAP_FLAG_AUTH) {
 		if (!ci->i_auth_cap ||
 		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
+			if (ci->i_auth_cap &&
+			    ci->i_auth_cap->session != cap->session)
+				change_auth_cap_ses(ci, cap->session);
 			ci->i_auth_cap = cap;
 			cap->mds_wanted = wanted;
 		}
@@ -912,6 +936,20 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
 	return 0;
 }
 
+int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+				   int touch)
+{
+	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	int r;
+
+	r = __ceph_caps_issued_mask(ci, mask, touch);
+	if (r)
+		ceph_update_cap_hit(&fsc->mdsc->metric);
+	else
+		ceph_update_cap_mis(&fsc->mdsc->metric);
+	return r;
+}
+
 /*
  * Return true if mask caps are currently being revoked by an MDS.
  */
@@ -1109,8 +1147,10 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 
 	/* remove from inode's cap rbtree, and clear auth cap */
 	rb_erase(&cap->ci_node, &ci->i_caps);
-	if (ci->i_auth_cap == cap)
+	if (ci->i_auth_cap == cap) {
+		WARN_ON_ONCE(!list_empty(&ci->i_dirty_item));
 		ci->i_auth_cap = NULL;
+	}
 
 	/* remove from session list */
 	spin_lock(&session->s_cap_lock);
@@ -1167,6 +1207,7 @@ struct cap_msg_args {
 	u64			xattr_version;
 	u64			change_attr;
 	struct ceph_buffer	*xattr_buf;
+	struct ceph_buffer	*old_xattr_buf;
 	struct timespec64	atime, mtime, ctime, btime;
 	int			op, caps, wanted, dirty;
 	u32			seq, issue_seq, mseq, time_warp_seq;
@@ -1175,6 +1216,7 @@ struct cap_msg_args {
 	kgid_t			gid;
 	umode_t			mode;
 	bool			inline_data;
+	bool			wake;
 };
 
 /*
@@ -1304,44 +1346,29 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
 }
 
 /*
- * Send a cap msg on the given inode.  Update our caps state, then
- * drop i_ceph_lock and send the message.
+ * Prepare to send a cap message to an MDS. Update the cap state, and populate
+ * the arg struct with the parameters that will need to be sent. This should
+ * be done under the i_ceph_lock to guard against changes to cap state.
  *
  * Make note of max_size reported/requested from mds, revoked caps
  * that have now been implemented.
- *
- * Return non-zero if delayed release, or we experienced an error
- * such that the caller should requeue + retry later.
- *
- * called with i_ceph_lock, then drops it.
- * caller should hold snap_rwsem (read), s_mutex.
  */
-static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-		      int op, int flags, int used, int want, int retain,
-		      int flushing, u64 flush_tid, u64 oldest_flush_tid)
-	__releases(cap->ci->i_ceph_lock)
+static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
+		       int op, int flags, int used, int want, int retain,
+		       int flushing, u64 flush_tid, u64 oldest_flush_tid)
 {
 	struct ceph_inode_info *ci = cap->ci;
 	struct inode *inode = &ci->vfs_inode;
-	struct ceph_buffer *old_blob = NULL;
-	struct cap_msg_args arg;
 	int held, revoking;
-	int wake = 0;
-	int ret;
 
-	/* Don't send anything if it's still being created. Return delayed */
-	if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
-		spin_unlock(&ci->i_ceph_lock);
-		dout("%s async create in flight for %p\n", __func__, inode);
-		return 1;
-	}
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
 	retain &= ~revoking;
 
-	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
-	     inode, cap, cap->session,
+	dout("%s %p cap %p session %p %s -> %s (revoking %s)\n",
+	     __func__, inode, cap, cap->session,
 	     ceph_cap_string(held), ceph_cap_string(held & retain),
 	     ceph_cap_string(revoking));
 	BUG_ON((retain & CEPH_CAP_PIN) == 0);
@@ -1349,60 +1376,62 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	ci->i_ceph_flags &= ~CEPH_I_FLUSH;
 
 	cap->issued &= retain;  /* drop bits we don't want */
-	if (cap->implemented & ~cap->issued) {
-		/*
-		 * Wake up any waiters on wanted -> needed transition.
-		 * This is due to the weird transition from buffered
-		 * to sync IO... we need to flush dirty pages _before_
-		 * allowing sync writes to avoid reordering.
-		 */
-		wake = 1;
-	}
+	/*
+	 * Wake up any waiters on wanted -> needed transition. This is due to
+	 * the weird transition from buffered to sync IO... we need to flush
+	 * dirty pages _before_ allowing sync writes to avoid reordering.
+	 */
+	arg->wake = cap->implemented & ~cap->issued;
 	cap->implemented &= cap->issued | used;
 	cap->mds_wanted = want;
 
-	arg.session = cap->session;
-	arg.ino = ceph_vino(inode).ino;
-	arg.cid = cap->cap_id;
-	arg.follows = flushing ? ci->i_head_snapc->seq : 0;
-	arg.flush_tid = flush_tid;
-	arg.oldest_flush_tid = oldest_flush_tid;
+	arg->session = cap->session;
+	arg->ino = ceph_vino(inode).ino;
+	arg->cid = cap->cap_id;
+	arg->follows = flushing ? ci->i_head_snapc->seq : 0;
+	arg->flush_tid = flush_tid;
+	arg->oldest_flush_tid = oldest_flush_tid;
 
-	arg.size = inode->i_size;
-	ci->i_reported_size = arg.size;
-	arg.max_size = ci->i_wanted_max_size;
-	if (cap == ci->i_auth_cap)
-		ci->i_requested_max_size = arg.max_size;
-
-	if (flushing & CEPH_CAP_XATTR_EXCL) {
-		old_blob = __ceph_build_xattrs_blob(ci);
-		arg.xattr_version = ci->i_xattrs.version;
-		arg.xattr_buf = ci->i_xattrs.blob;
-	} else {
-		arg.xattr_buf = NULL;
+	arg->size = inode->i_size;
+	ci->i_reported_size = arg->size;
+	arg->max_size = ci->i_wanted_max_size;
+	if (cap == ci->i_auth_cap) {
+		if (want & CEPH_CAP_ANY_FILE_WR)
+			ci->i_requested_max_size = arg->max_size;
+		else
+			ci->i_requested_max_size = 0;
 	}
 
-	arg.mtime = inode->i_mtime;
-	arg.atime = inode->i_atime;
-	arg.ctime = inode->i_ctime;
-	arg.btime = ci->i_btime;
-	arg.change_attr = inode_peek_iversion_raw(inode);
+	if (flushing & CEPH_CAP_XATTR_EXCL) {
+		arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
+		arg->xattr_version = ci->i_xattrs.version;
+		arg->xattr_buf = ci->i_xattrs.blob;
+	} else {
+		arg->xattr_buf = NULL;
+		arg->old_xattr_buf = NULL;
+	}
 
-	arg.op = op;
-	arg.caps = cap->implemented;
-	arg.wanted = want;
-	arg.dirty = flushing;
+	arg->mtime = inode->i_mtime;
+	arg->atime = inode->i_atime;
+	arg->ctime = inode->i_ctime;
+	arg->btime = ci->i_btime;
+	arg->change_attr = inode_peek_iversion_raw(inode);
 
-	arg.seq = cap->seq;
-	arg.issue_seq = cap->issue_seq;
-	arg.mseq = cap->mseq;
-	arg.time_warp_seq = ci->i_time_warp_seq;
+	arg->op = op;
+	arg->caps = cap->implemented;
+	arg->wanted = want;
+	arg->dirty = flushing;
 
-	arg.uid = inode->i_uid;
-	arg.gid = inode->i_gid;
-	arg.mode = inode->i_mode;
+	arg->seq = cap->seq;
+	arg->issue_seq = cap->issue_seq;
+	arg->mseq = cap->mseq;
+	arg->time_warp_seq = ci->i_time_warp_seq;
 
-	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+	arg->uid = inode->i_uid;
+	arg->gid = inode->i_gid;
+	arg->mode = inode->i_mode;
+
+	arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
 	if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
 	    !list_empty(&ci->i_cap_snaps)) {
 		struct ceph_cap_snap *capsnap;
@@ -1415,27 +1444,35 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 			}
 		}
 	}
-	arg.flags = flags;
+	arg->flags = flags;
+}
 
-	spin_unlock(&ci->i_ceph_lock);
+/*
+ * Send a cap msg on the given inode.
+ *
+ * Caller should hold snap_rwsem (read), s_mutex.
+ */
+static void __send_cap(struct ceph_mds_client *mdsc, struct cap_msg_args *arg,
+		       struct ceph_inode_info *ci)
+{
+	struct inode *inode = &ci->vfs_inode;
+	int ret;
 
-	ceph_buffer_put(old_blob);
-
-	ret = send_cap_msg(&arg);
+	ret = send_cap_msg(arg);
 	if (ret < 0) {
 		pr_err("error sending cap msg, ino (%llx.%llx) "
 		       "flushing %s tid %llu, requeue\n",
-		       ceph_vinop(inode), ceph_cap_string(flushing),
-		       flush_tid);
+		       ceph_vinop(inode), ceph_cap_string(arg->dirty),
+		       arg->flush_tid);
 		spin_lock(&ci->i_ceph_lock);
 		__cap_delay_requeue(mdsc, ci);
 		spin_unlock(&ci->i_ceph_lock);
 	}
 
-	if (wake)
-		wake_up_all(&ci->i_cap_wq);
+	ceph_buffer_put(arg->old_xattr_buf);
 
-	return ret;
+	if (arg->wake)
+		wake_up_all(&ci->i_cap_wq);
 }
 
 static inline int __send_flush_snap(struct inode *inode,
@@ -1456,6 +1493,7 @@ static inline int __send_flush_snap(struct inode *inode,
 	arg.max_size = 0;
 	arg.xattr_version = capsnap->xattr_version;
 	arg.xattr_buf = capsnap->xattr_blob;
+	arg.old_xattr_buf = NULL;
 
 	arg.atime = capsnap->atime;
 	arg.mtime = capsnap->mtime;
@@ -1479,6 +1517,7 @@ static inline int __send_flush_snap(struct inode *inode,
 
 	arg.inline_data = capsnap->inline_data;
 	arg.flags = 0;
+	arg.wake = false;
 
 	return send_cap_msg(&arg);
 }
@@ -1676,6 +1715,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 	     ceph_cap_string(was | mask));
 	ci->i_dirty_caps |= mask;
 	if (was == 0) {
+		struct ceph_mds_session *session = ci->i_auth_cap->session;
+
 		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
 		swap(ci->i_prealloc_cap_flush, *pcf);
 
@@ -1688,7 +1729,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
 		BUG_ON(!list_empty(&ci->i_dirty_item));
 		spin_lock(&mdsc->cap_dirty_lock);
-		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+		list_add(&ci->i_dirty_item, &session->s_cap_dirty);
 		spin_unlock(&mdsc->cap_dirty_lock);
 		if (ci->i_flushing_caps == 0) {
 			ihold(inode);
@@ -1731,30 +1772,33 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
  * Remove cap_flush from the mdsc's or inode's flushing cap list.
  * Return true if caller needs to wake up flush waiters.
  */
-static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
-			       struct ceph_inode_info *ci,
-			       struct ceph_cap_flush *cf)
+static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
+					 struct ceph_cap_flush *cf)
 {
 	struct ceph_cap_flush *prev;
 	bool wake = cf->wake;
-	if (mdsc) {
-		/* are there older pending cap flushes? */
-		if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
-			prev = list_prev_entry(cf, g_list);
-			prev->wake = true;
-			wake = false;
-		}
-		list_del(&cf->g_list);
-	} else if (ci) {
-		if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
-			prev = list_prev_entry(cf, i_list);
-			prev->wake = true;
-			wake = false;
-		}
-		list_del(&cf->i_list);
-	} else {
-		BUG_ON(1);
+
+	if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
+		prev = list_prev_entry(cf, g_list);
+		prev->wake = true;
+		wake = false;
 	}
+	list_del(&cf->g_list);
+	return wake;
+}
+
+static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
+				       struct ceph_cap_flush *cf)
+{
+	struct ceph_cap_flush *prev;
+	bool wake = cf->wake;
+
+	if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
+		prev = list_prev_entry(cf, i_list);
+		prev->wake = true;
+		wake = false;
+	}
+	list_del(&cf->i_list);
 	return wake;
 }
 
@@ -1953,6 +1997,9 @@ retry_locked:
 	}
 
 	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+		int mflags = 0;
+		struct cap_msg_args arg;
+
 		cap = rb_entry(p, struct ceph_cap, ci_node);
 
 		/* avoid looping forever */
@@ -2030,12 +2077,24 @@ ack:
 			if (mutex_trylock(&session->s_mutex) == 0) {
 				dout("inverting session/ino locks on %p\n",
 				     session);
+				session = ceph_get_mds_session(session);
 				spin_unlock(&ci->i_ceph_lock);
 				if (took_snap_rwsem) {
 					up_read(&mdsc->snap_rwsem);
 					took_snap_rwsem = 0;
 				}
-				mutex_lock(&session->s_mutex);
+				if (session) {
+					mutex_lock(&session->s_mutex);
+					ceph_put_mds_session(session);
+				} else {
+					/*
+					 * Because we take the reference while
+					 * holding the i_ceph_lock, it should
+					 * never be NULL. Throw a warning if it
+					 * ever is.
+					 */
+					WARN_ON_ONCE(true);
+				}
 				goto retry;
 			}
 		}
@@ -2070,6 +2129,9 @@ ack:
 			flushing = ci->i_dirty_caps;
 			flush_tid = __mark_caps_flushing(inode, session, false,
 							 &oldest_flush_tid);
+			if (flags & CHECK_CAPS_FLUSH &&
+			    list_empty(&session->s_cap_dirty))
+				mflags |= CEPH_CLIENT_CAPS_SYNC;
 		} else {
 			flushing = 0;
 			flush_tid = 0;
@@ -2080,9 +2142,12 @@ ack:
 
 		mds = cap->mds;  /* remember mds, so we don't repeat */
 
-		/* __send_cap drops i_ceph_lock */
-		__send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0, cap_used, want,
-			   retain, flushing, flush_tid, oldest_flush_tid);
+		__prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
+			   want, retain, flushing, flush_tid, oldest_flush_tid);
+		spin_unlock(&ci->i_ceph_lock);
+
+		__send_cap(mdsc, &arg, ci);
+
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
@@ -2121,6 +2186,7 @@ retry:
 retry_locked:
 	if (ci->i_dirty_caps && ci->i_auth_cap) {
 		struct ceph_cap *cap = ci->i_auth_cap;
+		struct cap_msg_args arg;
 
 		if (session != cap->session) {
 			spin_unlock(&ci->i_ceph_lock);
@@ -2148,11 +2214,13 @@ retry_locked:
 		flush_tid = __mark_caps_flushing(inode, session, true,
 						 &oldest_flush_tid);
 
-		/* __send_cap drops i_ceph_lock */
-		__send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
+		__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
 			   __ceph_caps_used(ci), __ceph_caps_wanted(ci),
 			   (cap->issued | cap->implemented),
 			   flushing, flush_tid, oldest_flush_tid);
+		spin_unlock(&ci->i_ceph_lock);
+
+		__send_cap(mdsc, &arg, ci);
 	} else {
 		if (!list_empty(&ci->i_cap_flush_list)) {
 			struct ceph_cap_flush *cf =
@@ -2354,15 +2422,19 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
 		first_tid = cf->tid + 1;
 
 		if (cf->caps) {
+			struct cap_msg_args arg;
+
 			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
 			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
-			__send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+			__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
 					 (cf->tid < last_snap_flush ?
 					  CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
 					  __ceph_caps_used(ci),
 					  __ceph_caps_wanted(ci),
 					  (cap->issued | cap->implemented),
 					  cf->caps, cf->tid, oldest_flush_tid);
+			spin_unlock(&ci->i_ceph_lock);
+			__send_cap(mdsc, &arg, ci);
 		} else {
 			struct ceph_cap_snap *capsnap =
 					container_of(cf, struct ceph_cap_snap,
@@ -2446,6 +2518,8 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 	struct ceph_cap *cap;
 	u64 oldest_flush_tid;
 
+	lockdep_assert_held(&session->s_mutex);
+
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
 
 	spin_lock(&mdsc->cap_dirty_lock);
@@ -2685,6 +2759,11 @@ out_unlock:
 	if (snap_rwsem_locked)
 		up_read(&mdsc->snap_rwsem);
 
+	if (!ret)
+		ceph_update_cap_mis(&mdsc->metric);
+	else if (ret == 1)
+		ceph_update_cap_hit(&mdsc->metric);
+
 	dout("get_cap_refs %p ret %d got %s\n", inode,
 	     ret, ceph_cap_string(*got));
 	return ret;
@@ -2937,7 +3016,8 @@ static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
  * If we are releasing a WR cap (from a sync write), finalize any affected
  * cap_snap, and wake up any waiters.
  */
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
+				bool skip_checking_caps)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int last = 0, put = 0, flushsnaps = 0, wake = 0;
@@ -2993,7 +3073,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 	dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
 	     last ? " last" : "", put ? " put" : "");
 
-	if (last)
+	if (last && !skip_checking_caps)
 		ceph_check_caps(ci, 0, NULL);
 	else if (flushsnaps)
 		ceph_flush_snaps(ci, NULL);
@@ -3003,6 +3083,16 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 		iput(inode);
 }
 
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+	__ceph_put_cap_refs(ci, had, false);
+}
+
+void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
+{
+	__ceph_put_cap_refs(ci, had, true);
+}
+
 /*
  * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
  * context.  Adjust per-snap dirty page accounting as appropriate.
@@ -3301,10 +3391,6 @@ static void handle_cap_grant(struct inode *inode,
 				ci->i_requested_max_size = 0;
 			}
 			wake = true;
-		} else if (ci->i_wanted_max_size > ci->i_max_size &&
-			   ci->i_wanted_max_size > ci->i_requested_max_size) {
-			/* CEPH_CAP_OP_IMPORT */
-			wake = true;
 		}
 	}
 
@@ -3380,9 +3466,18 @@ static void handle_cap_grant(struct inode *inode,
 			fill_inline = true;
 	}
 
-	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+	if (ci->i_auth_cap == cap &&
+	    le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
 		if (newcaps & ~extra_info->issued)
 			wake = true;
+
+		if (ci->i_requested_max_size > max_size ||
+		    !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
+			/* re-request max_size if necessary */
+			ci->i_requested_max_size = 0;
+			wake = true;
+		}
+
 		ceph_kick_flushing_inode_caps(session, ci);
 		spin_unlock(&ci->i_ceph_lock);
 		up_read(&session->s_mdsc->snap_rwsem);
@@ -3442,15 +3537,26 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 	bool wake_mdsc = false;
 
 	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
+		/* Is this the one that was flushed? */
 		if (cf->tid == flush_tid)
 			cleaned = cf->caps;
-		if (cf->caps == 0) /* capsnap */
+
+		/* Is this a capsnap? */
+		if (cf->caps == 0)
 			continue;
+
 		if (cf->tid <= flush_tid) {
-			if (__finish_cap_flush(NULL, ci, cf))
-				wake_ci = true;
+			/*
+			 * An earlier or current tid. The FLUSH_ACK should
+			 * represent a superset of this flush's caps.
+			 */
+			wake_ci |= __detach_cap_flush_from_ci(ci, cf);
 			list_add_tail(&cf->i_list, &to_remove);
 		} else {
+			/*
+			 * This is a later one. Any caps in it are still dirty
+			 * so don't count them as cleaned.
+			 */
 			cleaned &= ~cf->caps;
 			if (!cleaned)
 				break;
@@ -3470,10 +3576,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 
 	spin_lock(&mdsc->cap_dirty_lock);
 
-	list_for_each_entry(cf, &to_remove, i_list) {
-		if (__finish_cap_flush(mdsc, NULL, cf))
-			wake_mdsc = true;
-	}
+	list_for_each_entry(cf, &to_remove, i_list)
+		wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);
 
 	if (ci->i_flushing_caps == 0) {
 		if (list_empty(&ci->i_cap_flush_list)) {
@@ -3565,17 +3669,15 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
 		dout(" removing %p cap_snap %p follows %lld\n",
 		     inode, capsnap, follows);
 		list_del(&capsnap->ci_item);
-		if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
-			wake_ci = true;
+		wake_ci |= __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
 
 		spin_lock(&mdsc->cap_dirty_lock);
 
 		if (list_empty(&ci->i_cap_flush_list))
 			list_del_init(&ci->i_flushing_item);
 
-		if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
-			wake_mdsc = true;
-
+		wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc,
+							  &capsnap->cap_flush);
 		spin_unlock(&mdsc->cap_dirty_lock);
 	}
 	spin_unlock(&ci->i_ceph_lock);
@@ -3595,10 +3697,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
  *
  * caller hold s_mutex.
  */
-static void handle_cap_trunc(struct inode *inode,
+static bool handle_cap_trunc(struct inode *inode,
 			     struct ceph_mds_caps *trunc,
 			     struct ceph_mds_session *session)
-	__releases(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int mds = session->s_mds;
@@ -3609,7 +3710,9 @@ static void handle_cap_trunc(struct inode *inode,
 	int implemented = 0;
 	int dirty = __ceph_caps_dirty(ci);
 	int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
-	int queue_trunc = 0;
+	bool queue_trunc = false;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	issued |= implemented | dirty;
 
@@ -3617,10 +3720,7 @@ static void handle_cap_trunc(struct inode *inode,
 	     inode, mds, seq, truncate_size, truncate_seq);
 	queue_trunc = ceph_fill_file_size(inode, issued,
 					  truncate_seq, truncate_size, size);
-	spin_unlock(&ci->i_ceph_lock);
-
-	if (queue_trunc)
-		ceph_queue_vmtruncate(inode);
+	return queue_trunc;
 }
 
 /*
@@ -3694,15 +3794,9 @@ retry:
 			tcap->issue_seq = t_seq - 1;
 			tcap->issued |= issued;
 			tcap->implemented |= issued;
-			if (cap == ci->i_auth_cap)
+			if (cap == ci->i_auth_cap) {
 				ci->i_auth_cap = tcap;
-
-			if (!list_empty(&ci->i_cap_flush_list) &&
-			    ci->i_auth_cap == tcap) {
-				spin_lock(&mdsc->cap_dirty_lock);
-				list_move_tail(&ci->i_flushing_item,
-					       &tcap->session->s_cap_flushing);
-				spin_unlock(&mdsc->cap_dirty_lock);
+				change_auth_cap_ses(ci, tcap->session);
 			}
 		}
 		__ceph_remove_cap(cap, false);
@@ -3771,7 +3865,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
 			      struct ceph_mds_cap_peer *ph,
 			      struct ceph_mds_session *session,
 			      struct ceph_cap **target_cap, int *old_issued)
-	__acquires(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap *cap, *ocap, *new_cap = NULL;
@@ -3796,14 +3889,13 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
 
 	dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
 	     inode, ci, mds, mseq, peer);
-
 retry:
-	spin_lock(&ci->i_ceph_lock);
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
 		if (!new_cap) {
 			spin_unlock(&ci->i_ceph_lock);
 			new_cap = ceph_get_cap(mdsc, NULL);
+			spin_lock(&ci->i_ceph_lock);
 			goto retry;
 		}
 		cap = new_cap;
@@ -3838,9 +3930,6 @@ retry:
 		__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
 	}
 
-	/* make sure we re-request max_size, if necessary */
-	ci->i_requested_max_size = 0;
-
 	*old_issued = issued;
 	*target_cap = cap;
 }
@@ -3869,6 +3958,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	size_t snaptrace_len;
 	void *p, *end;
 	struct cap_extra_info extra_info = {};
+	bool queue_trunc;
 
 	dout("handle_caps from mds%d\n", session->s_mds);
 
@@ -4016,6 +4106,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		} else {
 			down_read(&mdsc->snap_rwsem);
 		}
+		spin_lock(&ci->i_ceph_lock);
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &extra_info.issued);
 		handle_cap_grant(inode, session, cap,
@@ -4052,7 +4143,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		break;
 
 	case CEPH_CAP_OP_TRUNC:
-		handle_cap_trunc(inode, h, session);
+		queue_trunc = handle_cap_trunc(inode, h, session);
+		spin_unlock(&ci->i_ceph_lock);
+		if (queue_trunc)
+			ceph_queue_vmtruncate(inode);
 		break;
 
 	default:
@@ -4121,15 +4215,16 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 /*
  * Flush all dirty caps to the mds
  */
-void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+static void flush_dirty_session_caps(struct ceph_mds_session *s)
 {
+	struct ceph_mds_client *mdsc = s->s_mdsc;
 	struct ceph_inode_info *ci;
 	struct inode *inode;
 
 	dout("flush_dirty_caps\n");
 	spin_lock(&mdsc->cap_dirty_lock);
-	while (!list_empty(&mdsc->cap_dirty)) {
-		ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
+	while (!list_empty(&s->s_cap_dirty)) {
+		ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
 				      i_dirty_item);
 		inode = &ci->vfs_inode;
 		ihold(inode);
@@ -4143,6 +4238,35 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 	dout("flush_dirty_caps done\n");
 }
 
+static void iterate_sessions(struct ceph_mds_client *mdsc,
+			     void (*cb)(struct ceph_mds_session *))
+{
+	int mds;
+
+	mutex_lock(&mdsc->mutex);
+	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
+		struct ceph_mds_session *s;
+
+		if (!mdsc->sessions[mds])
+			continue;
+
+		s = ceph_get_mds_session(mdsc->sessions[mds]);
+		if (!s)
+			continue;
+
+		mutex_unlock(&mdsc->mutex);
+		cb(s);
+		ceph_put_mds_session(s);
+		mutex_lock(&mdsc->mutex);
+	}
+	mutex_unlock(&mdsc->mutex);
+}
+
+void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+{
+	iterate_sessions(mdsc, flush_dirty_session_caps);
+}
+
 void __ceph_touch_fmode(struct ceph_inode_info *ci,
 			struct ceph_mds_client *mdsc, int fmode)
 {
@@ -4269,6 +4393,9 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 				cap->issued &= ~drop;
 				cap->implemented &= ~drop;
 				cap->mds_wanted = wanted;
+				if (cap == ci->i_auth_cap &&
+				    !(wanted & CEPH_CAP_ANY_FILE_WR))
+					ci->i_requested_max_size = 0;
 			} else {
 				dout("encode_inode_release %p cap %p %s"
 				     " (force)\n", inode, cap,
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index dcaed75de9e6..070ed8481340 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -7,6 +7,8 @@
 #include <linux/ctype.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
+#include <linux/math64.h>
+#include <linux/ktime.h>
 
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/mon_client.h>
@@ -18,6 +20,7 @@
 #ifdef CONFIG_DEBUG_FS
 
 #include "mds_client.h"
+#include "metric.h"
 
 static int mdsmap_show(struct seq_file *s, void *p)
 {
@@ -124,6 +127,87 @@ static int mdsc_show(struct seq_file *s, void *p)
 	return 0;
 }
 
+#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) {		\
+	s64 _total, _avg, _min, _max, _sq, _st;				\
+	_avg = ktime_to_us(avg);					\
+	_min = ktime_to_us(min == KTIME_MAX ? 0 : min);			\
+	_max = ktime_to_us(max);					\
+	_total = total - 1;						\
+	_sq = _total > 0 ? DIV64_U64_ROUND_CLOSEST(sq, _total) : 0;	\
+	_st = int_sqrt64(_sq);						\
+	_st = ktime_to_us(_st);						\
+	seq_printf(s, "%-14s%-12lld%-16lld%-16lld%-16lld%lld\n",	\
+		   name, total, _avg, _min, _max, _st);			\
+}
+
+static int metric_show(struct seq_file *s, void *p)
+{
+	struct ceph_fs_client *fsc = s->private;
+	struct ceph_mds_client *mdsc = fsc->mdsc;
+	struct ceph_client_metric *m = &mdsc->metric;
+	int i, nr_caps = 0;
+	s64 total, sum, avg, min, max, sq;
+
+	seq_printf(s, "item          total       avg_lat(us)     min_lat(us)     max_lat(us)     stdev(us)\n");
+	seq_printf(s, "-----------------------------------------------------------------------------------\n");
+
+	spin_lock(&m->read_latency_lock);
+	total = m->total_reads;
+	sum = m->read_latency_sum;
+	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+	min = m->read_latency_min;
+	max = m->read_latency_max;
+	sq = m->read_latency_sq_sum;
+	spin_unlock(&m->read_latency_lock);
+	CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
+
+	spin_lock(&m->write_latency_lock);
+	total = m->total_writes;
+	sum = m->write_latency_sum;
+	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+	min = m->write_latency_min;
+	max = m->write_latency_max;
+	sq = m->write_latency_sq_sum;
+	spin_unlock(&m->write_latency_lock);
+	CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
+
+	spin_lock(&m->metadata_latency_lock);
+	total = m->total_metadatas;
+	sum = m->metadata_latency_sum;
+	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+	min = m->metadata_latency_min;
+	max = m->metadata_latency_max;
+	sq = m->metadata_latency_sq_sum;
+	spin_unlock(&m->metadata_latency_lock);
+	CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
+
+	seq_printf(s, "\n");
+	seq_printf(s, "item          total           miss            hit\n");
+	seq_printf(s, "-------------------------------------------------\n");
+
+	seq_printf(s, "%-14s%-16lld%-16lld%lld\n", "d_lease",
+		   atomic64_read(&m->total_dentries),
+		   percpu_counter_sum(&m->d_lease_mis),
+		   percpu_counter_sum(&m->d_lease_hit));
+
+	mutex_lock(&mdsc->mutex);
+	for (i = 0; i < mdsc->max_sessions; i++) {
+		struct ceph_mds_session *s;
+
+		s = __ceph_lookup_mds_session(mdsc, i);
+		if (!s)
+			continue;
+		nr_caps += s->s_nr_caps;
+		ceph_put_mds_session(s);
+	}
+	mutex_unlock(&mdsc->mutex);
+	seq_printf(s, "%-14s%-16d%-16lld%lld\n", "caps", nr_caps,
+		   percpu_counter_sum(&m->i_caps_mis),
+		   percpu_counter_sum(&m->i_caps_hit));
+
+	return 0;
+}
+
 static int caps_show_cb(struct inode *inode, struct ceph_cap *cap, void *p)
 {
 	struct seq_file *s = p;
@@ -222,6 +306,7 @@ DEFINE_SHOW_ATTRIBUTE(mdsmap);
 DEFINE_SHOW_ATTRIBUTE(mdsc);
 DEFINE_SHOW_ATTRIBUTE(caps);
 DEFINE_SHOW_ATTRIBUTE(mds_sessions);
+DEFINE_SHOW_ATTRIBUTE(metric);
 
 
 /*
@@ -255,6 +340,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
 	debugfs_remove(fsc->debugfs_mdsmap);
 	debugfs_remove(fsc->debugfs_mds_sessions);
 	debugfs_remove(fsc->debugfs_caps);
+	debugfs_remove(fsc->debugfs_metric);
 	debugfs_remove(fsc->debugfs_mdsc);
 }
 
@@ -295,11 +381,17 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
 						fsc,
 						&mdsc_fops);
 
+	fsc->debugfs_metric = debugfs_create_file("metrics",
+						  0400,
+						  fsc->client->debugfs_dir,
+						  fsc,
+						  &metric_fops);
+
 	fsc->debugfs_caps = debugfs_create_file("caps",
-						   0400,
-						   fsc->client->debugfs_dir,
-						   fsc,
-						   &caps_fops);
+						0400,
+						fsc->client->debugfs_dir,
+						fsc,
+						&caps_fops);
 }
 
 
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 4c4202c93b71..39f5311404b0 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,6 +38,8 @@ static int __dir_lease_try_check(const struct dentry *dentry);
 static int ceph_d_init(struct dentry *dentry)
 {
 	struct ceph_dentry_info *di;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+	struct ceph_mds_client *mdsc = fsc->mdsc;
 
 	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
 	if (!di)
@@ -48,6 +50,9 @@ static int ceph_d_init(struct dentry *dentry)
 	di->time = jiffies;
 	dentry->d_fsdata = di;
 	INIT_LIST_HEAD(&di->lease_list);
+
+	atomic64_inc(&mdsc->metric.total_dentries);
+
 	return 0;
 }
 
@@ -344,8 +349,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
 	    __ceph_dir_is_complete_ordered(ci) &&
-	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+	    __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
 		int shared_gen = atomic_read(&ci->i_shared_gen);
+
 		spin_unlock(&ci->i_ceph_lock);
 		err = __dcache_readdir(file, ctx, shared_gen);
 		if (err != -EAGAIN)
@@ -762,7 +768,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 		    !is_root_ceph_dentry(dir, dentry) &&
 		    ceph_test_mount_opt(fsc, DCACHE) &&
 		    __ceph_dir_is_complete(ci) &&
-		    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
+		    __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
 			__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
 			spin_unlock(&ci->i_ceph_lock);
 			dout(" dir %p complete, -ENOENT\n", dir);
@@ -1203,11 +1209,12 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 			op = CEPH_MDS_OP_RENAMESNAP;
 		else
 			return -EROFS;
+	} else if (old_dir != new_dir) {
+		err = ceph_quota_check_rename(mdsc, d_inode(old_dentry),
+					      new_dir);
+		if (err)
+			return err;
 	}
-	/* don't allow cross-quota renames */
-	if ((old_dir != new_dir) &&
-	    (!ceph_quota_is_same_realm(old_dir, new_dir)))
-		return -EXDEV;
 
 	dout("rename dir %p dentry %p to dir %p dentry %p\n",
 	     old_dir, old_dentry, new_dir, new_dentry);
@@ -1709,6 +1716,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 		if (flags & LOOKUP_RCU)
 			return -ECHILD;
 
+		percpu_counter_inc(&mdsc->metric.d_lease_mis);
+
 		op = ceph_snap(dir) == CEPH_SNAPDIR ?
 			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
 		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1740,6 +1749,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 			dout("d_revalidate %p lookup result=%d\n",
 			     dentry, err);
 		}
+	} else {
+		percpu_counter_inc(&mdsc->metric.d_lease_hit);
 	}
 
 	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
@@ -1782,9 +1793,12 @@ static int ceph_d_delete(const struct dentry *dentry)
 static void ceph_d_release(struct dentry *dentry)
 {
 	struct ceph_dentry_info *di = ceph_dentry(dentry);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 
 	dout("d_release %p\n", dentry);
 
+	atomic64_dec(&fsc->mdsc->metric.total_dentries);
+
 	spin_lock(&dentry->d_lock);
 	__dentry_lease_unlist(di);
 	dentry->d_fsdata = NULL;
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 79dc06881e78..e088843a7734 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -172,9 +172,16 @@ struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino)
 static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
 {
 	struct inode *inode = __lookup_inode(sb, ino);
+	int err;
+
 	if (IS_ERR(inode))
 		return ERR_CAST(inode);
-	if (inode->i_nlink == 0) {
+	/* We need LINK caps to reliably check i_nlink */
+	err = ceph_do_getattr(inode, CEPH_CAP_LINK_SHARED, false);
+	if (err)
+		return ERR_PTR(err);
+	/* -ESTALE if inode as been unlinked and no file is open */
+	if ((inode->i_nlink == 0) && (atomic_read(&inode->i_count) == 1)) {
 		iput(inode);
 		return ERR_PTR(-ESTALE);
 	}
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index afdfca965a7f..160644ddaeed 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -11,11 +11,13 @@
 #include <linux/writeback.h>
 #include <linux/falloc.h>
 #include <linux/iversion.h>
+#include <linux/ktime.h>
 
 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
 #include "io.h"
+#include "metric.h"
 
 static __le32 ceph_flags_sys2wire(u32 flags)
 {
@@ -906,6 +908,12 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 		ret = ceph_osdc_start_request(osdc, req, false);
 		if (!ret)
 			ret = ceph_osdc_wait_request(osdc, req);
+
+		ceph_update_read_latency(&fsc->mdsc->metric,
+					 req->r_start_latency,
+					 req->r_end_latency,
+					 ret);
+
 		ceph_osdc_put_request(req);
 
 		i_size = i_size_read(inode);
@@ -1044,6 +1052,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	struct inode *inode = req->r_inode;
 	struct ceph_aio_request *aio_req = req->r_priv;
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_client_metric *metric = &fsc->mdsc->metric;
 
 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
 	BUG_ON(!osd_data->num_bvecs);
@@ -1051,6 +1061,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	dout("ceph_aio_complete_req %p rc %d bytes %u\n",
 	     inode, rc, osd_data->bvec_pos.iter.bi_size);
 
+	/* r_start_latency == 0 means the request was not submitted */
+	if (req->r_start_latency) {
+		if (aio_req->write)
+			ceph_update_write_latency(metric, req->r_start_latency,
+						  req->r_end_latency, rc);
+		else
+			ceph_update_read_latency(metric, req->r_start_latency,
+						 req->r_end_latency, rc);
+	}
+
 	if (rc == -EOLDSNAPC) {
 		struct ceph_aio_work *aio_work;
 		BUG_ON(!aio_req->write);
@@ -1179,6 +1199,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_client_metric *metric = &fsc->mdsc->metric;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct bio_vec *bvecs;
@@ -1295,6 +1316,13 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
+		if (write)
+			ceph_update_write_latency(metric, req->r_start_latency,
+						  req->r_end_latency, ret);
+		else
+			ceph_update_read_latency(metric, req->r_start_latency,
+						 req->r_end_latency, ret);
+
 		size = i_size_read(inode);
 		if (!write) {
 			if (ret == -ENOENT)
@@ -1466,6 +1494,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
+		ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+					  req->r_end_latency, ret);
 out:
 		ceph_osdc_put_request(req);
 		if (ret != 0) {
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 7fef94fd1e55..357c937699d5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -2288,8 +2288,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
 
 	dout("do_getattr inode %p mask %s mode 0%o\n",
 	     inode, ceph_cap_string(mask), inode->i_mode);
-	if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
-		return 0;
+	if (!force && ceph_caps_issued_mask_metric(ceph_inode(inode), mask, 1))
+			return 0;
 
 	mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 7c63abf5bea9..a50497142e59 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -10,6 +10,7 @@
 #include <linux/seq_file.h>
 #include <linux/ratelimit.h>
 #include <linux/bits.h>
+#include <linux/ktime.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -658,6 +659,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
 	if (refcount_dec_and_test(&s->s_ref)) {
 		if (s->s_auth.authorizer)
 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
+		WARN_ON(mutex_is_locked(&s->s_mutex));
 		xa_destroy(&s->s_delegated_inos);
 		kfree(s);
 	}
@@ -753,6 +755,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	INIT_LIST_HEAD(&s->s_cap_releases);
 	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
 
+	INIT_LIST_HEAD(&s->s_cap_dirty);
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 
 	mdsc->sessions[mds] = s;
@@ -801,7 +804,7 @@ void ceph_mdsc_release_request(struct kref *kref)
 	struct ceph_mds_request *req = container_of(kref,
 						    struct ceph_mds_request,
 						    r_kref);
-	ceph_mdsc_release_dir_caps(req);
+	ceph_mdsc_release_dir_caps_no_check(req);
 	destroy_reply_info(&req->r_reply_info);
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
@@ -2201,6 +2204,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 	mutex_init(&req->r_fill_mutex);
 	req->r_mdsc = mdsc;
 	req->r_started = jiffies;
+	req->r_start_latency = ktime_get();
 	req->r_resend_mds = -1;
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
 	INIT_LIST_HEAD(&req->r_unsafe_target_item);
@@ -2547,6 +2551,8 @@ out:
 static void complete_request(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_request *req)
 {
+	req->r_end_latency = ktime_get();
+
 	if (req->r_callback)
 		req->r_callback(mdsc, req);
 	complete_all(&req->r_completion);
@@ -3155,6 +3161,9 @@ out_err:
 
 	/* kick calling process */
 	complete_request(mdsc, req);
+
+	ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
+				     req->r_end_latency, err);
 out:
 	ceph_mdsc_put_request(req);
 	return;
@@ -3393,6 +3402,18 @@ void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
 	}
 }
 
+void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
+{
+	int dcaps;
+
+	dcaps = xchg(&req->r_dir_caps, 0);
+	if (dcaps) {
+		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
+						dcaps);
+	}
+}
+
 /*
  * called under session->mutex.
  */
@@ -3425,7 +3446,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 		if (req->r_session->s_mds != session->s_mds)
 			continue;
 
-		ceph_mdsc_release_dir_caps(req);
+		ceph_mdsc_release_dir_caps_no_check(req);
 
 		__send_request(mdsc, session, req, true);
 	}
@@ -3760,8 +3781,6 @@ fail:
  * recovering MDS might have.
  *
  * This is a relatively heavyweight operation, but it's rare.
- *
- * called with mdsc->mutex held.
  */
 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 			       struct ceph_mds_session *session)
@@ -4015,7 +4034,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 			    oldstate != CEPH_MDS_STATE_STARTING)
 				pr_info("mds%d recovery completed\n", s->s_mds);
 			kick_requests(mdsc, i);
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&s->s_mutex);
+			mutex_lock(&mdsc->mutex);
 			ceph_kick_flushing_caps(mdsc, s);
+			mutex_unlock(&s->s_mutex);
 			wake_up_session_caps(s, RECONNECT);
 		}
 	}
@@ -4323,6 +4346,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 
 {
 	struct ceph_mds_client *mdsc;
+	int err;
 
 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
 	if (!mdsc)
@@ -4331,8 +4355,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	mutex_init(&mdsc->mutex);
 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
 	if (!mdsc->mdsmap) {
-		kfree(mdsc);
-		return -ENOMEM;
+		err = -ENOMEM;
+		goto err_mdsc;
 	}
 
 	fsc->mdsc = mdsc;
@@ -4364,13 +4388,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	spin_lock_init(&mdsc->snap_flush_lock);
 	mdsc->last_cap_flush_tid = 1;
 	INIT_LIST_HEAD(&mdsc->cap_flush_list);
-	INIT_LIST_HEAD(&mdsc->cap_dirty);
 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
 	mdsc->num_cap_flushing = 0;
 	spin_lock_init(&mdsc->cap_dirty_lock);
 	init_waitqueue_head(&mdsc->cap_flushing_wq);
 	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
 	atomic_set(&mdsc->cap_reclaim_pending, 0);
+	err = ceph_metric_init(&mdsc->metric);
+	if (err)
+		goto err_mdsmap;
 
 	spin_lock_init(&mdsc->dentry_list_lock);
 	INIT_LIST_HEAD(&mdsc->dentry_leases);
@@ -4389,6 +4415,12 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	strscpy(mdsc->nodename, utsname()->nodename,
 		sizeof(mdsc->nodename));
 	return 0;
+
+err_mdsmap:
+	kfree(mdsc->mdsmap);
+err_mdsc:
+	kfree(mdsc);
+	return err;
 }
 
 /*
@@ -4646,6 +4678,8 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
 
 	ceph_mdsc_stop(mdsc);
 
+	ceph_metric_destroy(&mdsc->metric);
+
 	fsc->mdsc = NULL;
 	kfree(mdsc);
 	dout("mdsc_destroy %p done\n", mdsc);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 903d9edfd4bf..5e0c4073a6be 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -10,12 +10,15 @@
 #include <linux/spinlock.h>
 #include <linux/refcount.h>
 #include <linux/utsname.h>
+#include <linux/ktime.h>
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/mdsmap.h>
 #include <linux/ceph/auth.h>
 
+#include "metric.h"
+
 /* The first 8 bits are reserved for old ceph releases */
 enum ceph_feature_type {
 	CEPHFS_FEATURE_MIMIC = 8,
@@ -196,8 +199,12 @@ struct ceph_mds_session {
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
 	struct work_struct s_cap_release_work;
 
-	/* protected by mutex */
+	/* See ceph_inode_info->i_dirty_item. */
+	struct list_head  s_cap_dirty;	      /* inodes w/ dirty caps */
+
+	/* See ceph_inode_info->i_flushing_item. */
 	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
+
 	unsigned long     s_renew_requested; /* last time we sent a renew req */
 	u64               s_renew_seq;
 
@@ -297,6 +304,8 @@ struct ceph_mds_request {
 
 	unsigned long r_timeout;  /* optional.  jiffies, 0 is "wait forever" */
 	unsigned long r_started;  /* start time to measure timeout against */
+	unsigned long r_start_latency;  /* start time to measure latency */
+	unsigned long r_end_latency;    /* finish time to measure latency */
 	unsigned long r_request_started; /* start time for mds request only,
 					    used to measure lease durations */
 
@@ -419,7 +428,6 @@ struct ceph_mds_client {
 
 	u64               last_cap_flush_tid;
 	struct list_head  cap_flush_list;
-	struct list_head  cap_dirty;        /* inodes with dirty caps */
 	struct list_head  cap_dirty_migrating; /* ...that are migration... */
 	int               num_cap_flushing; /* # caps we are flushing */
 	spinlock_t        cap_dirty_lock;   /* protects above items */
@@ -454,6 +462,8 @@ struct ceph_mds_client {
 	struct list_head  dentry_leases;     /* fifo list */
 	struct list_head  dentry_dir_leases; /* lru list */
 
+	struct ceph_client_metric metric;
+
 	spinlock_t		snapid_map_lock;
 	struct rb_root		snapid_map_tree;
 	struct list_head	snapid_map_lru;
@@ -497,6 +507,7 @@ extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 				struct inode *dir,
 				struct ceph_mds_request *req);
 extern void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req);
+extern void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req);
 static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
 {
 	kref_get(&req->r_kref);
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
new file mode 100644
index 000000000000..9217f35bc2b9
--- /dev/null
+++ b/fs/ceph/metric.c
@@ -0,0 +1,148 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#include <linux/types.h>
+#include <linux/percpu_counter.h>
+#include <linux/math64.h>
+
+#include "metric.h"
+
+int ceph_metric_init(struct ceph_client_metric *m)
+{
+	int ret;
+
+	if (!m)
+		return -EINVAL;
+
+	atomic64_set(&m->total_dentries, 0);
+	ret = percpu_counter_init(&m->d_lease_hit, 0, GFP_KERNEL);
+	if (ret)
+		return ret;
+
+	ret = percpu_counter_init(&m->d_lease_mis, 0, GFP_KERNEL);
+	if (ret)
+		goto err_d_lease_mis;
+
+	ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
+	if (ret)
+		goto err_i_caps_hit;
+
+	ret = percpu_counter_init(&m->i_caps_mis, 0, GFP_KERNEL);
+	if (ret)
+		goto err_i_caps_mis;
+
+	spin_lock_init(&m->read_latency_lock);
+	m->read_latency_sq_sum = 0;
+	m->read_latency_min = KTIME_MAX;
+	m->read_latency_max = 0;
+	m->total_reads = 0;
+	m->read_latency_sum = 0;
+
+	spin_lock_init(&m->write_latency_lock);
+	m->write_latency_sq_sum = 0;
+	m->write_latency_min = KTIME_MAX;
+	m->write_latency_max = 0;
+	m->total_writes = 0;
+	m->write_latency_sum = 0;
+
+	spin_lock_init(&m->metadata_latency_lock);
+	m->metadata_latency_sq_sum = 0;
+	m->metadata_latency_min = KTIME_MAX;
+	m->metadata_latency_max = 0;
+	m->total_metadatas = 0;
+	m->metadata_latency_sum = 0;
+
+	return 0;
+
+err_i_caps_mis:
+	percpu_counter_destroy(&m->i_caps_hit);
+err_i_caps_hit:
+	percpu_counter_destroy(&m->d_lease_mis);
+err_d_lease_mis:
+	percpu_counter_destroy(&m->d_lease_hit);
+
+	return ret;
+}
+
+void ceph_metric_destroy(struct ceph_client_metric *m)
+{
+	if (!m)
+		return;
+
+	percpu_counter_destroy(&m->i_caps_mis);
+	percpu_counter_destroy(&m->i_caps_hit);
+	percpu_counter_destroy(&m->d_lease_mis);
+	percpu_counter_destroy(&m->d_lease_hit);
+}
+
+static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
+				    ktime_t *min, ktime_t *max,
+				    ktime_t *sq_sump, ktime_t lat)
+{
+	ktime_t total, avg, sq, lsum;
+
+	total = ++(*totalp);
+	lsum = (*lsump += lat);
+
+	if (unlikely(lat < *min))
+		*min = lat;
+	if (unlikely(lat > *max))
+		*max = lat;
+
+	if (unlikely(total == 1))
+		return;
+
+	/* the sq is (lat - old_avg) * (lat - new_avg) */
+	avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1));
+	sq = lat - avg;
+	avg = DIV64_U64_ROUND_CLOSEST(lsum, total);
+	sq = sq * (lat - avg);
+	*sq_sump += sq;
+}
+
+void ceph_update_read_latency(struct ceph_client_metric *m,
+			      ktime_t r_start, ktime_t r_end,
+			      int rc)
+{
+	ktime_t lat = ktime_sub(r_end, r_start);
+
+	if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
+		return;
+
+	spin_lock(&m->read_latency_lock);
+	__update_latency(&m->total_reads, &m->read_latency_sum,
+			 &m->read_latency_min, &m->read_latency_max,
+			 &m->read_latency_sq_sum, lat);
+	spin_unlock(&m->read_latency_lock);
+}
+
+void ceph_update_write_latency(struct ceph_client_metric *m,
+			       ktime_t r_start, ktime_t r_end,
+			       int rc)
+{
+	ktime_t lat = ktime_sub(r_end, r_start);
+
+	if (unlikely(rc && rc != -ETIMEDOUT))
+		return;
+
+	spin_lock(&m->write_latency_lock);
+	__update_latency(&m->total_writes, &m->write_latency_sum,
+			 &m->write_latency_min, &m->write_latency_max,
+			 &m->write_latency_sq_sum, lat);
+	spin_unlock(&m->write_latency_lock);
+}
+
+void ceph_update_metadata_latency(struct ceph_client_metric *m,
+				  ktime_t r_start, ktime_t r_end,
+				  int rc)
+{
+	ktime_t lat = ktime_sub(r_end, r_start);
+
+	if (unlikely(rc && rc != -ENOENT))
+		return;
+
+	spin_lock(&m->metadata_latency_lock);
+	__update_latency(&m->total_metadatas, &m->metadata_latency_sum,
+			 &m->metadata_latency_min, &m->metadata_latency_max,
+			 &m->metadata_latency_sq_sum, lat);
+	spin_unlock(&m->metadata_latency_lock);
+}
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
new file mode 100644
index 000000000000..ccd81285a450
--- /dev/null
+++ b/fs/ceph/metric.h
@@ -0,0 +1,62 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_MDS_METRIC_H
+#define _FS_CEPH_MDS_METRIC_H
+
+#include <linux/types.h>
+#include <linux/percpu_counter.h>
+#include <linux/ktime.h>
+
+/* This is the global metrics */
+struct ceph_client_metric {
+	atomic64_t            total_dentries;
+	struct percpu_counter d_lease_hit;
+	struct percpu_counter d_lease_mis;
+
+	struct percpu_counter i_caps_hit;
+	struct percpu_counter i_caps_mis;
+
+	spinlock_t read_latency_lock;
+	u64 total_reads;
+	ktime_t read_latency_sum;
+	ktime_t read_latency_sq_sum;
+	ktime_t read_latency_min;
+	ktime_t read_latency_max;
+
+	spinlock_t write_latency_lock;
+	u64 total_writes;
+	ktime_t write_latency_sum;
+	ktime_t write_latency_sq_sum;
+	ktime_t write_latency_min;
+	ktime_t write_latency_max;
+
+	spinlock_t metadata_latency_lock;
+	u64 total_metadatas;
+	ktime_t metadata_latency_sum;
+	ktime_t metadata_latency_sq_sum;
+	ktime_t metadata_latency_min;
+	ktime_t metadata_latency_max;
+};
+
+extern int ceph_metric_init(struct ceph_client_metric *m);
+extern void ceph_metric_destroy(struct ceph_client_metric *m);
+
+static inline void ceph_update_cap_hit(struct ceph_client_metric *m)
+{
+	percpu_counter_inc(&m->i_caps_hit);
+}
+
+static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
+{
+	percpu_counter_inc(&m->i_caps_mis);
+}
+
+extern void ceph_update_read_latency(struct ceph_client_metric *m,
+				     ktime_t r_start, ktime_t r_end,
+				     int rc);
+extern void ceph_update_write_latency(struct ceph_client_metric *m,
+				      ktime_t r_start, ktime_t r_end,
+				      int rc);
+extern void ceph_update_metadata_latency(struct ceph_client_metric *m,
+				         ktime_t r_start, ktime_t r_end,
+					 int rc);
+#endif /* _FS_CEPH_MDS_METRIC_H */
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index 19507e2fdb57..198ddde5c1e6 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -264,7 +264,7 @@ restart:
 	return NULL;
 }
 
-bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
+static bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
 {
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(old)->mdsc;
 	struct ceph_snap_realm *old_realm, *new_realm;
@@ -361,8 +361,6 @@ restart:
 		spin_unlock(&ci->i_ceph_lock);
 		switch (op) {
 		case QUOTA_CHECK_MAX_FILES_OP:
-			exceeded = (max && (rvalue >= max));
-			break;
 		case QUOTA_CHECK_MAX_BYTES_OP:
 			exceeded = (max && (rvalue + delta > max));
 			break;
@@ -417,7 +415,7 @@ bool ceph_quota_is_max_files_exceeded(struct inode *inode)
 
 	WARN_ON(!S_ISDIR(inode->i_mode));
 
-	return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 0);
+	return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 1);
 }
 
 /*
@@ -518,3 +516,59 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)
 	return is_updated;
 }
 
+/*
+ * ceph_quota_check_rename - check if a rename can be executed
+ * @mdsc:	MDS client instance
+ * @old:	inode to be copied
+ * @new:	destination inode (directory)
+ *
+ * This function verifies if a rename (e.g. moving a file or directory) can be
+ * executed.  It forces an rstat update in the @new target directory (and in the
+ * source @old as well, if it's a directory).  The actual check is done both for
+ * max_files and max_bytes.
+ *
+ * This function returns 0 if it's OK to do the rename, or, if quotas are
+ * exceeded, -EXDEV (if @old is a directory) or -EDQUOT.
+ */
+int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+			    struct inode *old, struct inode *new)
+{
+	struct ceph_inode_info *ci_old = ceph_inode(old);
+	int ret = 0;
+
+	if (ceph_quota_is_same_realm(old, new))
+		return 0;
+
+	/*
+	 * Get the latest rstat for target directory (and for source, if a
+	 * directory)
+	 */
+	ret = ceph_do_getattr(new, CEPH_STAT_RSTAT, false);
+	if (ret)
+		return ret;
+
+	if (S_ISDIR(old->i_mode)) {
+		ret = ceph_do_getattr(old, CEPH_STAT_RSTAT, false);
+		if (ret)
+			return ret;
+		ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+					   ci_old->i_rbytes);
+		if (!ret)
+			ret = check_quota_exceeded(new,
+						   QUOTA_CHECK_MAX_FILES_OP,
+						   ci_old->i_rfiles +
+						   ci_old->i_rsubdirs);
+		if (ret)
+			ret = -EXDEV;
+	} else {
+		ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+					   i_size_read(old));
+		if (!ret)
+			ret = check_quota_exceeded(new,
+						   QUOTA_CHECK_MAX_FILES_OP, 1);
+		if (ret)
+			ret = -EDQUOT;
+	}
+
+	return ret;
+}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 60aac3aee055..5a6cdd39bc10 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -128,6 +128,7 @@ struct ceph_fs_client {
 	struct dentry *debugfs_congestion_kb;
 	struct dentry *debugfs_bdi;
 	struct dentry *debugfs_mdsc, *debugfs_mdsmap;
+	struct dentry *debugfs_metric;
 	struct dentry *debugfs_mds_sessions;
 #endif
 
@@ -350,7 +351,25 @@ struct ceph_inode_info {
 	struct rb_root i_caps;           /* cap list */
 	struct ceph_cap *i_auth_cap;     /* authoritative cap, if any */
 	unsigned i_dirty_caps, i_flushing_caps;     /* mask of dirtied fields */
-	struct list_head i_dirty_item, i_flushing_item;
+
+	/*
+	 * Link to the the auth cap's session's s_cap_dirty list. s_cap_dirty
+	 * is protected by the mdsc->cap_dirty_lock, but each individual item
+	 * is also protected by the inode's i_ceph_lock. Walking s_cap_dirty
+	 * requires the mdsc->cap_dirty_lock. List presence for an item can
+	 * be tested under the i_ceph_lock. Changing anything requires both.
+	 */
+	struct list_head i_dirty_item;
+
+	/*
+	 * Link to session's s_cap_flushing list. Protected in a similar
+	 * fashion to i_dirty_item, but also by the s_mutex for changes. The
+	 * s_cap_flushing list can be walked while holding either the s_mutex
+	 * or msdc->cap_dirty_lock. List presence can also be checked while
+	 * holding the i_ceph_lock for this inode.
+	 */
+	struct list_head i_flushing_item;
+
 	/* we need to track cap writeback on a per-cap-bit basis, to allow
 	 * overlapping, pipelined cap flushes to the mds.  we can probably
 	 * reduce the tid to 8 bits if we're concerned about inode size. */
@@ -644,6 +663,8 @@ static inline bool __ceph_is_any_real_caps(struct ceph_inode_info *ci)
 
 extern int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented);
 extern int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int t);
+extern int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+					  int t);
 extern int __ceph_caps_issued_other(struct ceph_inode_info *ci,
 				    struct ceph_cap *cap);
 
@@ -656,12 +677,12 @@ static inline int ceph_caps_issued(struct ceph_inode_info *ci)
 	return issued;
 }
 
-static inline int ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask,
-					int touch)
+static inline int ceph_caps_issued_mask_metric(struct ceph_inode_info *ci,
+					       int mask, int touch)
 {
 	int r;
 	spin_lock(&ci->i_ceph_lock);
-	r = __ceph_caps_issued_mask(ci, mask, touch);
+	r = __ceph_caps_issued_mask_metric(ci, mask, touch);
 	spin_unlock(&ci->i_ceph_lock);
 	return r;
 }
@@ -1074,6 +1095,8 @@ extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
 				bool snap_rwsem_locked);
 extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
+					    int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 				       struct ceph_snap_context *snapc);
 extern void ceph_flush_snaps(struct ceph_inode_info *ci,
@@ -1189,13 +1212,14 @@ extern void ceph_handle_quota(struct ceph_mds_client *mdsc,
 			      struct ceph_mds_session *session,
 			      struct ceph_msg *msg);
 extern bool ceph_quota_is_max_files_exceeded(struct inode *inode);
-extern bool ceph_quota_is_same_realm(struct inode *old, struct inode *new);
 extern bool ceph_quota_is_max_bytes_exceeded(struct inode *inode,
 					     loff_t newlen);
 extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode,
 						loff_t newlen);
 extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
 				     struct kstatfs *buf);
+extern int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+				   struct inode *old, struct inode *new);
 extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
 
 #endif /* _FS_CEPH_SUPER_H */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 7b8a070a782d..71ee34d160c3 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -856,7 +856,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 
 	if (ci->i_xattrs.version == 0 ||
 	    !((req_mask & CEPH_CAP_XATTR_SHARED) ||
-	      __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
+	      __ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1))) {
 		spin_unlock(&ci->i_ceph_lock);
 
 		/* security module gets xattr while filling trace */
@@ -914,7 +914,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
 	if (ci->i_xattrs.version == 0 ||
-	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+	    !__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1)) {
 		spin_unlock(&ci->i_ceph_lock);
 		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 525b7c3f1c81..2247e71beb83 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -53,6 +53,8 @@ struct ceph_options {
 	unsigned long osd_keepalive_timeout;	/* jiffies */
 	unsigned long osd_request_timeout;	/* jiffies */
 
+	u32 osd_req_flags;  /* CEPH_OSD_FLAG_*, applied to each OSD request */
+
 	/*
 	 * any type that can't be simply compared or doesn't need
 	 * to be compared should go beyond this point,
@@ -64,6 +66,7 @@ struct ceph_options {
 	int num_mon;
 	char *name;
 	struct ceph_crypto_key *key;
+	struct rb_root crush_locs;
 };
 
 /*
@@ -188,7 +191,7 @@ static inline int calc_pages_for(u64 off, u64 len)
 #define RB_CMP3WAY(a, b) ((a) < (b) ? -1 : (a) > (b))
 
 #define DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, cmpexp, keyexp, nodefld) \
-static void insert_##name(struct rb_root *root, type *t)		\
+static bool __insert_##name(struct rb_root *root, type *t)		\
 {									\
 	struct rb_node **n = &root->rb_node;				\
 	struct rb_node *parent = NULL;					\
@@ -206,11 +209,17 @@ static void insert_##name(struct rb_root *root, type *t)		\
 		else if (cmp > 0)					\
 			n = &(*n)->rb_right;				\
 		else							\
-			BUG();						\
+			return false;					\
 	}								\
 									\
 	rb_link_node(&t->nodefld, parent, n);				\
 	rb_insert_color(&t->nodefld, root);				\
+	return true;							\
+}									\
+static void __maybe_unused insert_##name(struct rb_root *root, type *t)	\
+{									\
+	if (!__insert_##name(root, t))					\
+		BUG();							\
 }									\
 static void erase_##name(struct rb_root *root, type *t)			\
 {									\
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index dbb8a6959a73..ce4ffeb384d7 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -19,7 +19,7 @@ struct ceph_monmap {
 	struct ceph_fsid fsid;
 	u32 epoch;
 	u32 num_mon;
-	struct ceph_entity_inst mon_inst[0];
+	struct ceph_entity_inst mon_inst[];
 };
 
 struct ceph_mon_client;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 9d9f745b98a1..c60b59e9291b 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -8,6 +8,7 @@
 #include <linux/mempool.h>
 #include <linux/rbtree.h>
 #include <linux/refcount.h>
+#include <linux/ktime.h>
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/osdmap.h>
@@ -135,6 +136,7 @@ struct ceph_osd_req_op {
 		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
+			u32 flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 		} alloc_hint;
 		struct {
 			u64 snapid;
@@ -164,6 +166,7 @@ struct ceph_osd_request_target {
 	bool recovery_deletes;
 
 	unsigned int flags;                /* CEPH_OSD_FLAG_* */
+	bool used_replica;
 	bool paused;
 
 	u32 epoch;
@@ -213,6 +216,8 @@ struct ceph_osd_request {
 	/* internal */
 	unsigned long r_stamp;                /* jiffies, send or check time */
 	unsigned long r_start_stamp;          /* jiffies */
+	ktime_t r_start_latency;              /* ktime_t */
+	ktime_t r_end_latency;                /* ktime_t */
 	int r_attempts;
 	u32 r_map_dne_bound;
 
@@ -468,7 +473,8 @@ extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int
 extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				       unsigned int which,
 				       u64 expected_object_size,
-				       u64 expected_write_size);
+				       u64 expected_write_size,
+				       u32 flags);
 
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       struct ceph_snap_context *snapc,
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 5e601975745f..3f4498fef6ad 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -302,9 +302,26 @@ bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
 			      const struct ceph_pg *raw_pgid);
 
+struct crush_loc {
+	char *cl_type_name;
+	char *cl_name;
+};
+
+struct crush_loc_node {
+	struct rb_node cl_node;
+	struct crush_loc cl_loc;  /* pointers into cl_data */
+	char cl_data[];
+};
+
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs);
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2);
+void ceph_clear_crush_locs(struct rb_root *locs);
+
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
+			    struct rb_root *locs);
+
 extern struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map,
 						    u64 id);
-
 extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
 extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id);
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 88ed3c5c04c5..3a518fd0eaad 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -464,6 +464,19 @@ enum {
 
 const char *ceph_osd_watch_op_name(int o);
 
+enum {
+	CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE = 1,
+	CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE = 2,
+	CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ = 4,
+	CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ = 8,
+	CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY = 16,
+	CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE = 32,
+	CEPH_OSD_ALLOC_HINT_FLAG_SHORTLIVED = 64,
+	CEPH_OSD_ALLOC_HINT_FLAG_LONGLIVED = 128,
+	CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE = 256,
+	CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE = 512,
+};
+
 enum {
 	CEPH_OSD_BACKOFF_OP_BLOCK = 1,
 	CEPH_OSD_BACKOFF_OP_ACK_BLOCK = 2,
@@ -517,6 +530,7 @@ struct ceph_osd_op {
 		struct {
 			__le64 expected_object_size;
 			__le64 expected_write_size;
+			__le32 flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 		} __attribute__ ((packed)) alloc_hint;
 		struct {
 			__le64 snapid;
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 54741295c70b..33c16f2de7f6 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -87,7 +87,7 @@ struct crush_rule_mask {
 struct crush_rule {
 	__u32 len;
 	struct crush_rule_mask mask;
-	struct crush_rule_step steps[0];
+	struct crush_rule_step steps[];
 };
 
 #define crush_rule_size(len) (sizeof(struct crush_rule) + \
@@ -301,6 +301,12 @@ struct crush_map {
 
 	__u32 *choose_tries;
 #else
+	/* device/bucket type id -> type name (CrushWrapper::type_map) */
+	struct rb_root type_names;
+
+	/* device/bucket id -> name (CrushWrapper::name_map) */
+	struct rb_root names;
+
 	/* CrushWrapper::choose_args */
 	struct rb_root choose_args;
 #endif
@@ -342,4 +348,10 @@ struct crush_work {
 	struct crush_work_bucket **work; /* Per-bucket working store */
 };
 
+#ifdef __KERNEL__
+/* osdmap.c */
+void clear_crush_names(struct rb_root *root);
+void clear_choose_args(struct crush_map *c);
+#endif
+
 #endif
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 66f22e8aa529..afe0e8184c23 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -176,6 +176,10 @@ int ceph_compare_options(struct ceph_options *new_opt,
 		}
 	}
 
+	ret = ceph_compare_crush_locs(&opt1->crush_locs, &opt2->crush_locs);
+	if (ret)
+		return ret;
+
 	/* any matching mon ip implies a match */
 	for (i = 0; i < opt1->num_mon; i++) {
 		if (ceph_monmap_contains(client->monc.monmap,
@@ -259,6 +263,8 @@ enum {
 	Opt_secret,
 	Opt_key,
 	Opt_ip,
+	Opt_crush_location,
+	Opt_read_from_replica,
 	/* string args above */
 	Opt_share,
 	Opt_crc,
@@ -268,11 +274,25 @@ enum {
 	Opt_abort_on_full,
 };
 
+enum {
+	Opt_read_from_replica_no,
+	Opt_read_from_replica_balance,
+	Opt_read_from_replica_localize,
+};
+
+static const struct constant_table ceph_param_read_from_replica[] = {
+	{"no",		Opt_read_from_replica_no},
+	{"balance",	Opt_read_from_replica_balance},
+	{"localize",	Opt_read_from_replica_localize},
+	{}
+};
+
 static const struct fs_parameter_spec ceph_parameters[] = {
 	fsparam_flag	("abort_on_full",		Opt_abort_on_full),
 	fsparam_flag_no ("cephx_require_signatures",	Opt_cephx_require_signatures),
 	fsparam_flag_no ("cephx_sign_messages",		Opt_cephx_sign_messages),
 	fsparam_flag_no ("crc",				Opt_crc),
+	fsparam_string	("crush_location",		Opt_crush_location),
 	fsparam_string	("fsid",			Opt_fsid),
 	fsparam_string	("ip",				Opt_ip),
 	fsparam_string	("key",				Opt_key),
@@ -283,6 +303,8 @@ static const struct fs_parameter_spec ceph_parameters[] = {
 	fsparam_u32	("osdkeepalive",		Opt_osdkeepalivetimeout),
 	__fsparam	(fs_param_is_s32, "osdtimeout", Opt_osdtimeout,
 			 fs_param_deprecated, NULL),
+	fsparam_enum	("read_from_replica",		Opt_read_from_replica,
+			 ceph_param_read_from_replica),
 	fsparam_string	("secret",			Opt_secret),
 	fsparam_flag_no ("share",			Opt_share),
 	fsparam_flag_no ("tcp_nodelay",			Opt_tcp_nodelay),
@@ -297,6 +319,7 @@ struct ceph_options *ceph_alloc_options(void)
 	if (!opt)
 		return NULL;
 
+	opt->crush_locs = RB_ROOT;
 	opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
 				GFP_KERNEL);
 	if (!opt->mon_addr) {
@@ -319,6 +342,7 @@ void ceph_destroy_options(struct ceph_options *opt)
 	if (!opt)
 		return;
 
+	ceph_clear_crush_locs(&opt->crush_locs);
 	kfree(opt->name);
 	if (opt->key) {
 		ceph_crypto_key_destroy(opt->key);
@@ -453,6 +477,34 @@ int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
 		if (!opt->key)
 			return -ENOMEM;
 		return get_secret(opt->key, param->string, &log);
+	case Opt_crush_location:
+		ceph_clear_crush_locs(&opt->crush_locs);
+		err = ceph_parse_crush_location(param->string,
+						&opt->crush_locs);
+		if (err) {
+			error_plog(&log, "Failed to parse CRUSH location: %d",
+				   err);
+			return err;
+		}
+		break;
+	case Opt_read_from_replica:
+		switch (result.uint_32) {
+		case Opt_read_from_replica_no:
+			opt->osd_req_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+						CEPH_OSD_FLAG_LOCALIZE_READS);
+			break;
+		case Opt_read_from_replica_balance:
+			opt->osd_req_flags |= CEPH_OSD_FLAG_BALANCE_READS;
+			opt->osd_req_flags &= ~CEPH_OSD_FLAG_LOCALIZE_READS;
+			break;
+		case Opt_read_from_replica_localize:
+			opt->osd_req_flags |= CEPH_OSD_FLAG_LOCALIZE_READS;
+			opt->osd_req_flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+			break;
+		default:
+			BUG();
+		}
+		break;
 
 	case Opt_osdtimeout:
 		warn_plog(&log, "Ignoring osdtimeout");
@@ -535,6 +587,7 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
 {
 	struct ceph_options *opt = client->options;
 	size_t pos = m->count;
+	struct rb_node *n;
 
 	if (opt->name) {
 		seq_puts(m, "name=");
@@ -544,6 +597,28 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
 	if (opt->key)
 		seq_puts(m, "secret=<hidden>,");
 
+	if (!RB_EMPTY_ROOT(&opt->crush_locs)) {
+		seq_puts(m, "crush_location=");
+		for (n = rb_first(&opt->crush_locs); ; ) {
+			struct crush_loc_node *loc =
+			    rb_entry(n, struct crush_loc_node, cl_node);
+
+			seq_printf(m, "%s:%s", loc->cl_loc.cl_type_name,
+				   loc->cl_loc.cl_name);
+			n = rb_next(n);
+			if (!n)
+				break;
+
+			seq_putc(m, '|');
+		}
+		seq_putc(m, ',');
+	}
+	if (opt->osd_req_flags & CEPH_OSD_FLAG_BALANCE_READS) {
+		seq_puts(m, "read_from_replica=balance,");
+	} else if (opt->osd_req_flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
+		seq_puts(m, "read_from_replica=localize,");
+	}
+
 	if (opt->flags & CEPH_OPT_FSID)
 		seq_printf(m, "fsid=%pU,", &opt->fsid);
 	if (opt->flags & CEPH_OPT_NOSHARE)
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 3d70244bc1b6..254ded0b05f6 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -2,7 +2,6 @@
 #ifdef __KERNEL__
 # include <linux/slab.h>
 # include <linux/crush/crush.h>
-void clear_choose_args(struct crush_map *c);
 #else
 # include "crush_compat.h"
 # include "crush.h"
@@ -130,6 +129,8 @@ void crush_destroy(struct crush_map *map)
 #ifndef __KERNEL__
 	kfree(map->choose_tries);
 #else
+	clear_crush_names(&map->type_names);
+	clear_crush_names(&map->names);
 	clear_choose_args(map);
 #endif
 	kfree(map);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 1344f232ecc5..409d505ff320 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -81,11 +81,13 @@ static int osdmap_show(struct seq_file *s, void *p)
 		u32 state = map->osd_state[i];
 		char sb[64];
 
-		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
+		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\t%2d\n",
 			   i, ceph_pr_addr(addr),
 			   ((map->osd_weight[i]*100) >> 16),
 			   ceph_osdmap_state_str(sb, sizeof(sb), state),
-			   ((ceph_get_primary_affinity(map, i)*100) >> 16));
+			   ((ceph_get_primary_affinity(map, i)*100) >> 16),
+			   ceph_get_crush_locality(map, i,
+					   &client->options->crush_locs));
 	}
 	for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
 		struct ceph_pg_mapping *pg =
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1d4973f8cd7a..4fea3c33af2a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -932,10 +932,14 @@ static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
 	op->watch.gen = 0;
 }
 
+/*
+ * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
+ */
 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				unsigned int which,
 				u64 expected_object_size,
-				u64 expected_write_size)
+				u64 expected_write_size,
+				u32 flags)
 {
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 						      CEPH_OSD_OP_SETALLOCHINT,
@@ -943,6 +947,7 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 
 	op->alloc_hint.expected_object_size = expected_object_size;
 	op->alloc_hint.expected_write_size = expected_write_size;
+	op->alloc_hint.flags = flags;
 
 	/*
 	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
@@ -1018,6 +1023,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 		    cpu_to_le64(src->alloc_hint.expected_object_size);
 		dst->alloc_hint.expected_write_size =
 		    cpu_to_le64(src->alloc_hint.expected_write_size);
+		dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
 		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
@@ -1497,6 +1503,45 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
 	       (osdc->osdmap->epoch < osdc->epoch_barrier);
 }
 
+static int pick_random_replica(const struct ceph_osds *acting)
+{
+	int i = prandom_u32() % acting->size;
+
+	dout("%s picked osd%d, primary osd%d\n", __func__,
+	     acting->osds[i], acting->primary);
+	return i;
+}
+
+/*
+ * Picks the closest replica based on client's location given by
+ * crush_location option.  Prefers the primary if the locality is
+ * the same.
+ */
+static int pick_closest_replica(struct ceph_osd_client *osdc,
+				const struct ceph_osds *acting)
+{
+	struct ceph_options *opt = osdc->client->options;
+	int best_i, best_locality;
+	int i = 0, locality;
+
+	do {
+		locality = ceph_get_crush_locality(osdc->osdmap,
+						   acting->osds[i],
+						   &opt->crush_locs);
+		if (i == 0 ||
+		    (locality >= 0 && best_locality < 0) ||
+		    (locality >= 0 && best_locality >= 0 &&
+		     locality < best_locality)) {
+			best_i = i;
+			best_locality = locality;
+		}
+	} while (++i < acting->size);
+
+	dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
+	     acting->osds[best_i], best_locality, acting->primary);
+	return best_i;
+}
+
 enum calc_target_result {
 	CALC_TARGET_NO_ACTION = 0,
 	CALC_TARGET_NEED_RESEND,
@@ -1510,6 +1555,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	struct ceph_pg_pool_info *pi;
 	struct ceph_pg pgid, last_pgid;
 	struct ceph_osds up, acting;
+	bool is_read = t->flags & CEPH_OSD_FLAG_READ;
+	bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
 	bool force_resend = false;
 	bool unpaused = false;
 	bool legacy_change = false;
@@ -1540,9 +1587,9 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	ceph_oid_copy(&t->target_oid, &t->base_oid);
 	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
 	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
-		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
+		if (is_read && pi->read_tier >= 0)
 			t->target_oloc.pool = pi->read_tier;
-		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
+		if (is_write && pi->write_tier >= 0)
 			t->target_oloc.pool = pi->write_tier;
 
 		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
@@ -1581,7 +1628,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 		unpaused = true;
 	}
 	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
-			ceph_osds_changed(&t->acting, &acting, any_change);
+			ceph_osds_changed(&t->acting, &acting,
+					  t->used_replica || any_change);
 	if (t->pg_num)
 		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
 
@@ -1597,7 +1645,24 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 		t->sort_bitwise = sort_bitwise;
 		t->recovery_deletes = recovery_deletes;
 
-		t->osd = acting.primary;
+		if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
+				 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+		    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
+		    acting.size > 1) {
+			int pos;
+
+			WARN_ON(!is_read || acting.osds[0] != acting.primary);
+			if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
+				pos = pick_random_replica(&acting);
+			} else {
+				pos = pick_closest_replica(osdc, &acting);
+			}
+			t->osd = acting.osds[pos];
+			t->used_replica = pos > 0;
+		} else {
+			t->osd = acting.primary;
+			t->used_replica = false;
+		}
 	}
 
 	if (unpaused || legacy_change || force_resend || split)
@@ -2366,13 +2431,17 @@ promote:
 
 static void account_request(struct ceph_osd_request *req)
 {
+	struct ceph_osd_client *osdc = req->r_osdc;
+
 	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
 	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
 
 	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
-	atomic_inc(&req->r_osdc->num_requests);
+	req->r_flags |= osdc->client->options->osd_req_flags;
+	atomic_inc(&osdc->num_requests);
 
 	req->r_start_stamp = jiffies;
+	req->r_start_latency = ktime_get();
 }
 
 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -2389,6 +2458,8 @@ static void finish_request(struct ceph_osd_request *req)
 	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 
+	req->r_end_latency = ktime_get();
+
 	if (req->r_osd)
 		unlink_request(req->r_osd, req);
 	atomic_dec(&osdc->num_requests);
@@ -3657,6 +3728,26 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 		goto out_unlock_osdc;
 	}
 
+	if (m.result == -EAGAIN) {
+		dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
+		unlink_request(osd, req);
+		mutex_unlock(&osd->lock);
+
+		/*
+		 * The object is missing on the replica or not (yet)
+		 * readable.  Clear pgid to force a resend to the primary
+		 * via legacy_change.
+		 */
+		req->r_t.pgid.pool = 0;
+		req->r_t.pgid.seed = 0;
+		WARN_ON(!req->r_t.used_replica);
+		req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+				  CEPH_OSD_FLAG_LOCALIZE_READS);
+		req->r_tid = 0;
+		__submit_request(req, false);
+		goto out_unlock_osdc;
+	}
+
 	if (m.num_ops != req->r_num_ops) {
 		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
 		       req->r_num_ops, req->r_tid);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 2a6e63a8edbe..96c25f5e064a 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -138,6 +138,79 @@ bad:
 	return -EINVAL;
 }
 
+struct crush_name_node {
+	struct rb_node cn_node;
+	int cn_id;
+	char cn_name[];
+};
+
+static struct crush_name_node *alloc_crush_name(size_t name_len)
+{
+	struct crush_name_node *cn;
+
+	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
+	if (!cn)
+		return NULL;
+
+	RB_CLEAR_NODE(&cn->cn_node);
+	return cn;
+}
+
+static void free_crush_name(struct crush_name_node *cn)
+{
+	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
+
+	kfree(cn);
+}
+
+DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
+
+static int decode_crush_names(void **p, void *end, struct rb_root *root)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct crush_name_node *cn;
+		int id;
+		u32 name_len;
+
+		ceph_decode_32_safe(p, end, id, e_inval);
+		ceph_decode_32_safe(p, end, name_len, e_inval);
+		ceph_decode_need(p, end, name_len, e_inval);
+
+		cn = alloc_crush_name(name_len);
+		if (!cn)
+			return -ENOMEM;
+
+		cn->cn_id = id;
+		memcpy(cn->cn_name, *p, name_len);
+		cn->cn_name[name_len] = '\0';
+		*p += name_len;
+
+		if (!__insert_crush_name(root, cn)) {
+			free_crush_name(cn);
+			return -EEXIST;
+		}
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+void clear_crush_names(struct rb_root *root)
+{
+	while (!RB_EMPTY_ROOT(root)) {
+		struct crush_name_node *cn =
+		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
+
+		erase_crush_name(root, cn);
+		free_crush_name(cn);
+	}
+}
+
 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
 {
 	struct crush_choose_arg_map *arg_map;
@@ -354,6 +427,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	if (c == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	c->type_names = RB_ROOT;
+	c->names = RB_ROOT;
 	c->choose_args = RB_ROOT;
 
         /* set tunables to default values */
@@ -510,8 +585,14 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 		}
 	}
 
-	ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
-	ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+	err = decode_crush_names(p, end, &c->type_names);
+	if (err)
+		goto fail;
+
+	err = decode_crush_names(p, end, &c->names);
+	if (err)
+		goto fail;
+
 	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
 
         /* tunables */
@@ -636,48 +717,11 @@ DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
 /*
  * rbtree of pg pool info
  */
-static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
-{
-	struct rb_node **p = &root->rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_pg_pool_info *pi = NULL;
-
-	while (*p) {
-		parent = *p;
-		pi = rb_entry(parent, struct ceph_pg_pool_info, node);
-		if (new->id < pi->id)
-			p = &(*p)->rb_left;
-		else if (new->id > pi->id)
-			p = &(*p)->rb_right;
-		else
-			return -EEXIST;
-	}
-
-	rb_link_node(&new->node, parent, p);
-	rb_insert_color(&new->node, root);
-	return 0;
-}
-
-static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
-{
-	struct ceph_pg_pool_info *pi;
-	struct rb_node *n = root->rb_node;
-
-	while (n) {
-		pi = rb_entry(n, struct ceph_pg_pool_info, node);
-		if (id < pi->id)
-			n = n->rb_left;
-		else if (id > pi->id)
-			n = n->rb_right;
-		else
-			return pi;
-	}
-	return NULL;
-}
+DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
 
 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
 {
-	return __lookup_pg_pool(&map->pg_pools, id);
+	return lookup_pg_pool(&map->pg_pools, id);
 }
 
 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
@@ -690,8 +734,7 @@ const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
 		return NULL;
 
-	pi = __lookup_pg_pool(&map->pg_pools, (int) id);
-
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->name : NULL;
 }
 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
@@ -714,14 +757,14 @@ u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
 {
 	struct ceph_pg_pool_info *pi;
 
-	pi = __lookup_pg_pool(&map->pg_pools, id);
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->flags : 0;
 }
 EXPORT_SYMBOL(ceph_pg_pool_flags);
 
 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 {
-	rb_erase(&pi->node, root);
+	erase_pg_pool(root, pi);
 	kfree(pi->name);
 	kfree(pi);
 }
@@ -903,7 +946,7 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 		ceph_decode_32_safe(p, end, len, bad);
 		dout("  pool %llu len %d\n", pool, len);
 		ceph_decode_need(p, end, len, bad);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (pi) {
 			char *name = kstrndup(*p, len, GFP_NOFS);
 
@@ -1154,18 +1197,18 @@ static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
 
 		ceph_decode_64_safe(p, end, pool, e_inval);
 
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (!incremental || !pi) {
 			pi = kzalloc(sizeof(*pi), GFP_NOFS);
 			if (!pi)
 				return -ENOMEM;
 
+			RB_CLEAR_NODE(&pi->node);
 			pi->id = pool;
 
-			ret = __insert_pg_pool(&map->pg_pools, pi);
-			if (ret) {
+			if (!__insert_pg_pool(&map->pg_pools, pi)) {
 				kfree(pi);
-				return ret;
+				return -EEXIST;
 			}
 		}
 
@@ -1829,7 +1872,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 		struct ceph_pg_pool_info *pi;
 
 		ceph_decode_64_safe(p, end, pool, e_inval);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (pi)
 			__remove_pg_pool(&map->pg_pools, pi);
 	}
@@ -2672,3 +2715,221 @@ int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
 	return acting.primary;
 }
 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
+
+static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
+					      size_t name_len)
+{
+	struct crush_loc_node *loc;
+
+	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
+	if (!loc)
+		return NULL;
+
+	RB_CLEAR_NODE(&loc->cl_node);
+	return loc;
+}
+
+static void free_crush_loc(struct crush_loc_node *loc)
+{
+	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
+
+	kfree(loc);
+}
+
+static int crush_loc_compare(const struct crush_loc *loc1,
+			     const struct crush_loc *loc2)
+{
+	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
+	       strcmp(loc1->cl_name, loc2->cl_name);
+}
+
+DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
+		 RB_BYPTR, const struct crush_loc *, cl_node)
+
+/*
+ * Parses a set of <bucket type name>':'<bucket name> pairs separated
+ * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
+ *
+ * Note that @crush_location is modified by strsep().
+ */
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
+{
+	struct crush_loc_node *loc;
+	const char *type_name, *name, *colon;
+	size_t type_name_len, name_len;
+
+	dout("%s '%s'\n", __func__, crush_location);
+	while ((type_name = strsep(&crush_location, "|"))) {
+		colon = strchr(type_name, ':');
+		if (!colon)
+			return -EINVAL;
+
+		type_name_len = colon - type_name;
+		if (type_name_len == 0)
+			return -EINVAL;
+
+		name = colon + 1;
+		name_len = strlen(name);
+		if (name_len == 0)
+			return -EINVAL;
+
+		loc = alloc_crush_loc(type_name_len, name_len);
+		if (!loc)
+			return -ENOMEM;
+
+		loc->cl_loc.cl_type_name = loc->cl_data;
+		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
+		loc->cl_loc.cl_type_name[type_name_len] = '\0';
+
+		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
+		memcpy(loc->cl_loc.cl_name, name, name_len);
+		loc->cl_loc.cl_name[name_len] = '\0';
+
+		if (!__insert_crush_loc(locs, loc)) {
+			free_crush_loc(loc);
+			return -EEXIST;
+		}
+
+		dout("%s type_name '%s' name '%s'\n", __func__,
+		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
+	}
+
+	return 0;
+}
+
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
+{
+	struct rb_node *n1 = rb_first(locs1);
+	struct rb_node *n2 = rb_first(locs2);
+	int ret;
+
+	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
+		struct crush_loc_node *loc1 =
+		    rb_entry(n1, struct crush_loc_node, cl_node);
+		struct crush_loc_node *loc2 =
+		    rb_entry(n2, struct crush_loc_node, cl_node);
+
+		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
+		if (ret)
+			return ret;
+	}
+
+	if (!n1 && n2)
+		return -1;
+	if (n1 && !n2)
+		return 1;
+	return 0;
+}
+
+void ceph_clear_crush_locs(struct rb_root *locs)
+{
+	while (!RB_EMPTY_ROOT(locs)) {
+		struct crush_loc_node *loc =
+		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
+
+		erase_crush_loc(locs, loc);
+		free_crush_loc(loc);
+	}
+}
+
+/*
+ * [a-zA-Z0-9-_.]+
+ */
+static bool is_valid_crush_name(const char *name)
+{
+	do {
+		if (!('a' <= *name && *name <= 'z') &&
+		    !('A' <= *name && *name <= 'Z') &&
+		    !('0' <= *name && *name <= '9') &&
+		    *name != '-' && *name != '_' && *name != '.')
+			return false;
+	} while (*++name != '\0');
+
+	return true;
+}
+
+/*
+ * Gets the parent of an item.  Returns its id (<0 because the
+ * parent is always a bucket), type id (>0 for the same reason,
+ * via @parent_type_id) and location (via @parent_loc).  If no
+ * parent, returns 0.
+ *
+ * Does a linear search, as there are no parent pointers of any
+ * kind.  Note that the result is ambigous for items that occur
+ * multiple times in the map.
+ */
+static int get_immediate_parent(struct crush_map *c, int id,
+				u16 *parent_type_id,
+				struct crush_loc *parent_loc)
+{
+	struct crush_bucket *b;
+	struct crush_name_node *type_cn, *cn;
+	int i, j;
+
+	for (i = 0; i < c->max_buckets; i++) {
+		b = c->buckets[i];
+		if (!b)
+			continue;
+
+		/* ignore per-class shadow hierarchy */
+		cn = lookup_crush_name(&c->names, b->id);
+		if (!cn || !is_valid_crush_name(cn->cn_name))
+			continue;
+
+		for (j = 0; j < b->size; j++) {
+			if (b->items[j] != id)
+				continue;
+
+			*parent_type_id = b->type;
+			type_cn = lookup_crush_name(&c->type_names, b->type);
+			parent_loc->cl_type_name = type_cn->cn_name;
+			parent_loc->cl_name = cn->cn_name;
+			return b->id;
+		}
+	}
+
+	return 0;  /* no parent */
+}
+
+/*
+ * Calculates the locality/distance from an item to a client
+ * location expressed in terms of CRUSH hierarchy as a set of
+ * (bucket type name, bucket name) pairs.  Specifically, looks
+ * for the lowest-valued bucket type for which the location of
+ * @id matches one of the locations in @locs, so for standard
+ * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
+ * a matching host is closer than a matching rack and a matching
+ * data center is closer than a matching zone.
+ *
+ * Specifying multiple locations (a "multipath" location) such
+ * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
+ * is a multimap.  The locality will be:
+ *
+ * - 3 for OSDs in racks foo1 and foo2
+ * - 8 for OSDs in data center bar
+ * - -1 for all other OSDs
+ *
+ * The lowest possible bucket type is 1, so the best locality
+ * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
+ * the OSD itself.
+ */
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
+			    struct rb_root *locs)
+{
+	struct crush_loc loc;
+	u16 type_id;
+
+	/*
+	 * Instead of repeated get_immediate_parent() calls,
+	 * the location of @id could be obtained with a single
+	 * depth-first traversal.
+	 */
+	for (;;) {
+		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
+		if (id >= 0)
+			return -1;  /* not local */
+
+		if (lookup_crush_loc(locs, &loc))
+			return type_id;
+	}
+}