diff --git a/Documentation/kernel-parameters.txt b/Documentation/kernel-parameters.txt
index 1a855d0c11fa7c07d7e69f9983f478aa26c850e6..eb0a0582d912fdc4dc71f7bf27571a765d10bf15 100644
--- a/Documentation/kernel-parameters.txt
+++ b/Documentation/kernel-parameters.txt
@@ -3877,6 +3877,12 @@ bytes respectively. Such letter suffixes can also be entirely omitted.
 			using these two parameters to set the minimum and
 			maximum port values.
 
+	sunrpc.svc_rpc_per_connection_limit=
+			[NFS,SUNRPC]
+			Limit the number of requests that the server will
+			process in parallel from a single connection.
+			The default value is 0 (no limit).
+
 	sunrpc.pool_mode=
 			[NFS]
 			Control how the NFS server code allocates CPUs to
diff --git a/fs/Kconfig b/fs/Kconfig
index 4524916fa200b1b4e9c8349ff94c78711595be8d..2bc7ad77584287870681fc940995b355322eadc2 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -70,6 +70,12 @@ config FS_POSIX_ACL
 config EXPORTFS
 	tristate
 
+config EXPORTFS_BLOCK_OPS
+	bool "Enable filesystem export operations for block IO"
+	help
+	  This option enables the export operations for a filesystem to support
+	  external block IO.
+
 config FILE_LOCKING
 	bool "Enable POSIX file locking API" if EXPERT
 	default y
diff --git a/fs/nfsd/Kconfig b/fs/nfsd/Kconfig
index c9f583d7bac858c9ac3afc51cf4dd02838b473d5..47febcf9918502a2b55b99a571c50b6b1c824cc3 100644
--- a/fs/nfsd/Kconfig
+++ b/fs/nfsd/Kconfig
@@ -90,6 +90,7 @@ config NFSD_BLOCKLAYOUT
 	bool "NFSv4.1 server support for pNFS block layouts"
 	depends on NFSD_V4 && BLOCK
 	select NFSD_PNFS
+	select EXPORTFS_BLOCK_OPS
 	help
 	  This option enables support for the exporting pNFS block layouts
 	  in the kernel's NFS server. The pNFS block layout enables NFS
@@ -102,6 +103,7 @@ config NFSD_SCSILAYOUT
 	bool "NFSv4.1 server support for pNFS SCSI layouts"
 	depends on NFSD_V4 && BLOCK
 	select NFSD_PNFS
+	select EXPORTFS_BLOCK_OPS
 	help
 	  This option enables support for the exporting pNFS SCSI layouts
 	  in the kernel's NFS server. The pNFS SCSI layout enables NFS
@@ -111,6 +113,23 @@ config NFSD_SCSILAYOUT
 
 	  If unsure, say N.
 
+config NFSD_FLEXFILELAYOUT
+	bool "NFSv4.1 server support for pNFS Flex File layouts"
+	depends on NFSD_V4
+	select NFSD_PNFS
+	help
+	  This option enables support for the exporting pNFS Flex File
+	  layouts in the kernel's NFS server. The pNFS Flex File  layout
+	  enables NFS clients to directly perform I/O to NFSv3 devices
+	  accesible to both the server and the clients.  See
+	  draft-ietf-nfsv4-flex-files for more details.
+
+	  Warning, this server implements the bare minimum functionality
+	  to be a flex file server - it is for testing the client,
+	  not for use in production.
+
+	  If unsure, say N.
+
 config NFSD_V4_SECURITY_LABEL
 	bool "Provide Security Label support for NFSv4 server"
 	depends on NFSD_V4 && SECURITY
diff --git a/fs/nfsd/Makefile b/fs/nfsd/Makefile
index 3ae5f3c77e28b15b532e25668a86403446dc4425..5f5d3a76980c02725018555e7c8f239dfb59f228 100644
--- a/fs/nfsd/Makefile
+++ b/fs/nfsd/Makefile
@@ -20,3 +20,4 @@ nfsd-$(CONFIG_NFSD_V4)	+= nfs4proc.o nfs4xdr.o nfs4state.o nfs4idmap.o \
 nfsd-$(CONFIG_NFSD_PNFS) += nfs4layouts.o
 nfsd-$(CONFIG_NFSD_BLOCKLAYOUT) += blocklayout.o blocklayoutxdr.o
 nfsd-$(CONFIG_NFSD_SCSILAYOUT) += blocklayout.o blocklayoutxdr.o
+nfsd-$(CONFIG_NFSD_FLEXFILELAYOUT) += flexfilelayout.o flexfilelayoutxdr.o
diff --git a/fs/nfsd/blocklayout.c b/fs/nfsd/blocklayout.c
index ad2c05e80a836fcc49002802357f049f61852732..5a17084415103a08b18cfbcb31bc80e9b422de53 100644
--- a/fs/nfsd/blocklayout.c
+++ b/fs/nfsd/blocklayout.c
@@ -163,6 +163,7 @@ nfsd4_block_get_device_info_simple(struct super_block *sb,
 
 static __be32
 nfsd4_block_proc_getdeviceinfo(struct super_block *sb,
+		struct svc_rqst *rqstp,
 		struct nfs4_client *clp,
 		struct nfsd4_getdeviceinfo *gdp)
 {
@@ -355,6 +356,7 @@ nfsd4_block_get_device_info_scsi(struct super_block *sb,
 
 static __be32
 nfsd4_scsi_proc_getdeviceinfo(struct super_block *sb,
+		struct svc_rqst *rqstp,
 		struct nfs4_client *clp,
 		struct nfsd4_getdeviceinfo *gdp)
 {
diff --git a/fs/nfsd/blocklayoutxdr.c b/fs/nfsd/blocklayoutxdr.c
index 4ebaaf4b8d8a425d98418f09f9b445ff9615da32..ac6f54546fdde92b060e10839f631a574fa76cf5 100644
--- a/fs/nfsd/blocklayoutxdr.c
+++ b/fs/nfsd/blocklayoutxdr.c
@@ -44,7 +44,7 @@ nfsd4_block_encode_volume(struct xdr_stream *xdr, struct pnfs_block_volume *b)
 
 	switch (b->type) {
 	case PNFS_BLOCK_VOLUME_SIMPLE:
-		len = 4 + 4 + 8 + 4 + b->simple.sig_len;
+		len = 4 + 4 + 8 + 4 + (XDR_QUADLEN(b->simple.sig_len) << 2);
 		p = xdr_reserve_space(xdr, len);
 		if (!p)
 			return -ETOOSMALL;
@@ -55,7 +55,7 @@ nfsd4_block_encode_volume(struct xdr_stream *xdr, struct pnfs_block_volume *b)
 		p = xdr_encode_opaque(p, b->simple.sig, b->simple.sig_len);
 		break;
 	case PNFS_BLOCK_VOLUME_SCSI:
-		len = 4 + 4 + 4 + 4 + b->scsi.designator_len + 8;
+		len = 4 + 4 + 4 + 4 + (XDR_QUADLEN(b->scsi.designator_len) << 2) + 8;
 		p = xdr_reserve_space(xdr, len);
 		if (!p)
 			return -ETOOSMALL;
diff --git a/fs/nfsd/export.c b/fs/nfsd/export.c
index b4d84b579f20cd5da76866586dfa283d64c6669a..43e109cc0ccc39e8293a7c8926bcb1c105951714 100644
--- a/fs/nfsd/export.c
+++ b/fs/nfsd/export.c
@@ -706,7 +706,7 @@ static void svc_export_init(struct cache_head *cnew, struct cache_head *citem)
 	new->ex_fslocs.locations = NULL;
 	new->ex_fslocs.locations_count = 0;
 	new->ex_fslocs.migrated = 0;
-	new->ex_layout_type = 0;
+	new->ex_layout_types = 0;
 	new->ex_uuid = NULL;
 	new->cd = item->cd;
 }
@@ -731,7 +731,7 @@ static void export_update(struct cache_head *cnew, struct cache_head *citem)
 	item->ex_fslocs.locations_count = 0;
 	new->ex_fslocs.migrated = item->ex_fslocs.migrated;
 	item->ex_fslocs.migrated = 0;
-	new->ex_layout_type = item->ex_layout_type;
+	new->ex_layout_types = item->ex_layout_types;
 	new->ex_nflavors = item->ex_nflavors;
 	for (i = 0; i < MAX_SECINFO_LIST; i++) {
 		new->ex_flavors[i] = item->ex_flavors[i];
@@ -954,6 +954,16 @@ __be32 check_nfsd_access(struct svc_export *exp, struct svc_rqst *rqstp)
 		    rqstp->rq_cred.cr_flavor == RPC_AUTH_UNIX)
 			return 0;
 	}
+
+	/* If the compound op contains a spo_must_allowed op,
+	 * it will be sent with integrity/protection which
+	 * will have to be expressly allowed on mounts that
+	 * don't support it
+	 */
+
+	if (nfsd4_spo_must_allow(rqstp))
+		return 0;
+
 	return nfserr_wrongsec;
 }
 
diff --git a/fs/nfsd/export.h b/fs/nfsd/export.h
index 2e315072bf3fb83e62d0c469a2d43b2d3683392f..730f15eeb7ed561fb50b5485af00f2a5a8aca2f7 100644
--- a/fs/nfsd/export.h
+++ b/fs/nfsd/export.h
@@ -57,7 +57,7 @@ struct svc_export {
 	struct nfsd4_fs_locations ex_fslocs;
 	uint32_t		ex_nflavors;
 	struct exp_flavor_info	ex_flavors[MAX_SECINFO_LIST];
-	enum pnfs_layouttype	ex_layout_type;
+	u32			ex_layout_types;
 	struct nfsd4_deviceid_map *ex_devid_map;
 	struct cache_detail	*cd;
 };
diff --git a/fs/nfsd/flexfilelayout.c b/fs/nfsd/flexfilelayout.c
new file mode 100644
index 0000000000000000000000000000000000000000..df880e9fa71fb60d20fe12f82ebb7ed1011ce19c
--- /dev/null
+++ b/fs/nfsd/flexfilelayout.c
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2016 Tom Haynes <loghyr@primarydata.com>
+ *
+ * The following implements a super-simple flex-file server
+ * where the NFSv4.1 mds is also the ds. And the storage is
+ * the same. I.e., writing to the mds via a NFSv4.1 WRITE
+ * goes to the same location as the NFSv3 WRITE.
+ */
+#include <linux/slab.h>
+
+#include <linux/nfsd/debug.h>
+
+#include <linux/sunrpc/addr.h>
+
+#include "flexfilelayoutxdr.h"
+#include "pnfs.h"
+
+#define NFSDDBG_FACILITY	NFSDDBG_PNFS
+
+static __be32
+nfsd4_ff_proc_layoutget(struct inode *inode, const struct svc_fh *fhp,
+		struct nfsd4_layoutget *args)
+{
+	struct nfsd4_layout_seg *seg = &args->lg_seg;
+	u32 device_generation = 0;
+	int error;
+	uid_t u;
+
+	struct pnfs_ff_layout *fl;
+
+	/*
+	 * The super simple flex file server has 1 mirror, 1 data server,
+	 * and 1 file handle. So instead of 4 allocs, do 1 for now.
+	 * Zero it out for the stateid - don't want junk in there!
+	 */
+	error = -ENOMEM;
+	fl = kzalloc(sizeof(*fl), GFP_KERNEL);
+	if (!fl)
+		goto out_error;
+	args->lg_content = fl;
+
+	/*
+	 * Avoid layout commit, try to force the I/O to the DS,
+	 * and for fun, cause all IOMODE_RW layout segments to
+	 * effectively be WRITE only.
+	 */
+	fl->flags = FF_FLAGS_NO_LAYOUTCOMMIT | FF_FLAGS_NO_IO_THRU_MDS |
+		    FF_FLAGS_NO_READ_IO;
+
+	/* Do not allow a IOMODE_READ segment to have write pemissions */
+	if (seg->iomode == IOMODE_READ) {
+		u = from_kuid(&init_user_ns, inode->i_uid) + 1;
+		fl->uid = make_kuid(&init_user_ns, u);
+	} else
+		fl->uid = inode->i_uid;
+	fl->gid = inode->i_gid;
+
+	error = nfsd4_set_deviceid(&fl->deviceid, fhp, device_generation);
+	if (error)
+		goto out_error;
+
+	fl->fh.size = fhp->fh_handle.fh_size;
+	memcpy(fl->fh.data, &fhp->fh_handle.fh_base, fl->fh.size);
+
+	/* Give whole file layout segments */
+	seg->offset = 0;
+	seg->length = NFS4_MAX_UINT64;
+
+	dprintk("GET: 0x%llx:0x%llx %d\n", seg->offset, seg->length,
+		seg->iomode);
+	return 0;
+
+out_error:
+	seg->length = 0;
+	return nfserrno(error);
+}
+
+static __be32
+nfsd4_ff_proc_getdeviceinfo(struct super_block *sb, struct svc_rqst *rqstp,
+		struct nfs4_client *clp, struct nfsd4_getdeviceinfo *gdp)
+{
+	struct pnfs_ff_device_addr *da;
+
+	u16 port;
+	char addr[INET6_ADDRSTRLEN];
+
+	da = kzalloc(sizeof(struct pnfs_ff_device_addr), GFP_KERNEL);
+	if (!da)
+		return nfserrno(-ENOMEM);
+
+	gdp->gd_device = da;
+
+	da->version = 3;
+	da->minor_version = 0;
+
+	da->rsize = svc_max_payload(rqstp);
+	da->wsize = da->rsize;
+
+	rpc_ntop((struct sockaddr *)&rqstp->rq_daddr,
+		 addr, INET6_ADDRSTRLEN);
+	if (rqstp->rq_daddr.ss_family == AF_INET) {
+		struct sockaddr_in *sin;
+
+		sin = (struct sockaddr_in *)&rqstp->rq_daddr;
+		port = ntohs(sin->sin_port);
+		snprintf(da->netaddr.netid, FF_NETID_LEN + 1, "tcp");
+		da->netaddr.netid_len = 3;
+	} else {
+		struct sockaddr_in6 *sin6;
+
+		sin6 = (struct sockaddr_in6 *)&rqstp->rq_daddr;
+		port = ntohs(sin6->sin6_port);
+		snprintf(da->netaddr.netid, FF_NETID_LEN + 1, "tcp6");
+		da->netaddr.netid_len = 4;
+	}
+
+	da->netaddr.addr_len =
+		snprintf(da->netaddr.addr, FF_ADDR_LEN + 1,
+			 "%s.%hhu.%hhu", addr, port >> 8, port & 0xff);
+
+	da->tightly_coupled = false;
+
+	return 0;
+}
+
+const struct nfsd4_layout_ops ff_layout_ops = {
+	.notify_types		=
+			NOTIFY_DEVICEID4_DELETE | NOTIFY_DEVICEID4_CHANGE,
+	.proc_getdeviceinfo	= nfsd4_ff_proc_getdeviceinfo,
+	.encode_getdeviceinfo	= nfsd4_ff_encode_getdeviceinfo,
+	.proc_layoutget		= nfsd4_ff_proc_layoutget,
+	.encode_layoutget	= nfsd4_ff_encode_layoutget,
+};
diff --git a/fs/nfsd/flexfilelayoutxdr.c b/fs/nfsd/flexfilelayoutxdr.c
new file mode 100644
index 0000000000000000000000000000000000000000..5e3fd7fc1a9fc4bf1aab9ea58b94a70305193f36
--- /dev/null
+++ b/fs/nfsd/flexfilelayoutxdr.c
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2016 Tom Haynes <loghyr@primarydata.com>
+ */
+#include <linux/sunrpc/svc.h>
+#include <linux/nfs4.h>
+
+#include "nfsd.h"
+#include "flexfilelayoutxdr.h"
+
+#define NFSDDBG_FACILITY	NFSDDBG_PNFS
+
+struct ff_idmap {
+	char buf[11];
+	int len;
+};
+
+__be32
+nfsd4_ff_encode_layoutget(struct xdr_stream *xdr,
+		struct nfsd4_layoutget *lgp)
+{
+	struct pnfs_ff_layout *fl = lgp->lg_content;
+	int len, mirror_len, ds_len, fh_len;
+	__be32 *p;
+
+	/*
+	 * Unlike nfsd4_encode_user, we know these will
+	 * always be stringified.
+	 */
+	struct ff_idmap uid;
+	struct ff_idmap gid;
+
+	fh_len = 4 + fl->fh.size;
+
+	uid.len = sprintf(uid.buf, "%u", from_kuid(&init_user_ns, fl->uid));
+	gid.len = sprintf(gid.buf, "%u", from_kgid(&init_user_ns, fl->gid));
+
+	/* 8 + len for recording the length, name, and padding */
+	ds_len = 20 + sizeof(stateid_opaque_t) + 4 + fh_len +
+		 8 + uid.len + 8 + gid.len;
+
+	mirror_len = 4 + ds_len;
+
+	/* The layout segment */
+	len = 20 + mirror_len;
+
+	p = xdr_reserve_space(xdr, sizeof(__be32) + len);
+	if (!p)
+		return nfserr_toosmall;
+
+	*p++ = cpu_to_be32(len);
+	p = xdr_encode_hyper(p, 0);		/* stripe unit of 1 */
+
+	*p++ = cpu_to_be32(1);			/* single mirror */
+	*p++ = cpu_to_be32(1);			/* single data server */
+
+	p = xdr_encode_opaque_fixed(p, &fl->deviceid,
+			sizeof(struct nfsd4_deviceid));
+
+	*p++ = cpu_to_be32(1);			/* efficiency */
+
+	*p++ = cpu_to_be32(fl->stateid.si_generation);
+	p = xdr_encode_opaque_fixed(p, &fl->stateid.si_opaque,
+				    sizeof(stateid_opaque_t));
+
+	*p++ = cpu_to_be32(1);			/* single file handle */
+	p = xdr_encode_opaque(p, fl->fh.data, fl->fh.size);
+
+	p = xdr_encode_opaque(p, uid.buf, uid.len);
+	p = xdr_encode_opaque(p, gid.buf, gid.len);
+
+	*p++ = cpu_to_be32(fl->flags);
+	*p++ = cpu_to_be32(0);			/* No stats collect hint */
+
+	return 0;
+}
+
+__be32
+nfsd4_ff_encode_getdeviceinfo(struct xdr_stream *xdr,
+		struct nfsd4_getdeviceinfo *gdp)
+{
+	struct pnfs_ff_device_addr *da = gdp->gd_device;
+	int len;
+	int ver_len;
+	int addr_len;
+	__be32 *p;
+
+	/* len + padding for two strings */
+	addr_len = 16 + da->netaddr.netid_len + da->netaddr.addr_len;
+	ver_len = 20;
+
+	len = 4 + ver_len + 4 + addr_len;
+
+	p = xdr_reserve_space(xdr, len + sizeof(__be32));
+	if (!p)
+		return nfserr_resource;
+
+	/*
+	 * Fill in the overall length and number of volumes at the beginning
+	 * of the layout.
+	 */
+	*p++ = cpu_to_be32(len);
+	*p++ = cpu_to_be32(1);			/* 1 netaddr */
+	p = xdr_encode_opaque(p, da->netaddr.netid, da->netaddr.netid_len);
+	p = xdr_encode_opaque(p, da->netaddr.addr, da->netaddr.addr_len);
+
+	*p++ = cpu_to_be32(1);			/* 1 versions */
+
+	*p++ = cpu_to_be32(da->version);
+	*p++ = cpu_to_be32(da->minor_version);
+	*p++ = cpu_to_be32(da->rsize);
+	*p++ = cpu_to_be32(da->wsize);
+	*p++ = cpu_to_be32(da->tightly_coupled);
+
+	return 0;
+}
diff --git a/fs/nfsd/flexfilelayoutxdr.h b/fs/nfsd/flexfilelayoutxdr.h
new file mode 100644
index 0000000000000000000000000000000000000000..467defd4e56364188d689c537080ed15be74a602
--- /dev/null
+++ b/fs/nfsd/flexfilelayoutxdr.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2016 Tom Haynes <loghyr@primarydata.com>
+ */
+#ifndef _NFSD_FLEXFILELAYOUTXDR_H
+#define _NFSD_FLEXFILELAYOUTXDR_H 1
+
+#include <linux/inet.h>
+#include "xdr4.h"
+
+#define FF_FLAGS_NO_LAYOUTCOMMIT 1
+#define FF_FLAGS_NO_IO_THRU_MDS  2
+#define FF_FLAGS_NO_READ_IO      4
+
+struct xdr_stream;
+
+#define FF_NETID_LEN		(4)
+#define FF_ADDR_LEN		(INET6_ADDRSTRLEN + 8)
+struct pnfs_ff_netaddr {
+	char				netid[FF_NETID_LEN + 1];
+	char				addr[FF_ADDR_LEN + 1];
+	u32				netid_len;
+	u32				addr_len;
+};
+
+struct pnfs_ff_device_addr {
+	struct pnfs_ff_netaddr		netaddr;
+	u32				version;
+	u32				minor_version;
+	u32				rsize;
+	u32				wsize;
+	bool				tightly_coupled;
+};
+
+struct pnfs_ff_layout {
+	u32				flags;
+	u32				stats_collect_hint;
+	kuid_t				uid;
+	kgid_t				gid;
+	struct nfsd4_deviceid		deviceid;
+	stateid_t			stateid;
+	struct nfs_fh			fh;
+};
+
+__be32 nfsd4_ff_encode_getdeviceinfo(struct xdr_stream *xdr,
+		struct nfsd4_getdeviceinfo *gdp);
+__be32 nfsd4_ff_encode_layoutget(struct xdr_stream *xdr,
+		struct nfsd4_layoutget *lgp);
+
+#endif /* _NFSD_FLEXFILELAYOUTXDR_H */
diff --git a/fs/nfsd/nfs4layouts.c b/fs/nfsd/nfs4layouts.c
index 953c0755cb37e23697a2308800ccaf7bf85232cf..2be9602b0221bd19f7492dad6266f3fd78c4e050 100644
--- a/fs/nfsd/nfs4layouts.c
+++ b/fs/nfsd/nfs4layouts.c
@@ -27,6 +27,9 @@ static const struct nfsd4_callback_ops nfsd4_cb_layout_ops;
 static const struct lock_manager_operations nfsd4_layouts_lm_ops;
 
 const struct nfsd4_layout_ops *nfsd4_layout_ops[LAYOUT_TYPE_MAX] =  {
+#ifdef CONFIG_NFSD_FLEXFILELAYOUT
+	[LAYOUT_FLEX_FILES]	= &ff_layout_ops,
+#endif
 #ifdef CONFIG_NFSD_BLOCKLAYOUT
 	[LAYOUT_BLOCK_VOLUME]	= &bl_layout_ops,
 #endif
@@ -122,28 +125,35 @@ nfsd4_set_deviceid(struct nfsd4_deviceid *id, const struct svc_fh *fhp,
 
 void nfsd4_setup_layout_type(struct svc_export *exp)
 {
+#if defined(CONFIG_NFSD_BLOCKLAYOUT) || defined(CONFIG_NFSD_SCSILAYOUT)
 	struct super_block *sb = exp->ex_path.mnt->mnt_sb;
+#endif
 
 	if (!(exp->ex_flags & NFSEXP_PNFS))
 		return;
 
 	/*
-	 * Check if the file system supports exporting a block-like layout.
+	 * If flex file is configured, use it by default. Otherwise
+	 * check if the file system supports exporting a block-like layout.
 	 * If the block device supports reservations prefer the SCSI layout,
 	 * otherwise advertise the block layout.
 	 */
+#ifdef CONFIG_NFSD_FLEXFILELAYOUT
+	exp->ex_layout_types |= 1 << LAYOUT_FLEX_FILES;
+#endif
 #ifdef CONFIG_NFSD_BLOCKLAYOUT
+	/* overwrite flex file layout selection if needed */
 	if (sb->s_export_op->get_uuid &&
 	    sb->s_export_op->map_blocks &&
 	    sb->s_export_op->commit_blocks)
-		exp->ex_layout_type = LAYOUT_BLOCK_VOLUME;
+		exp->ex_layout_types |= 1 << LAYOUT_BLOCK_VOLUME;
 #endif
 #ifdef CONFIG_NFSD_SCSILAYOUT
 	/* overwrite block layout selection if needed */
 	if (sb->s_export_op->map_blocks &&
 	    sb->s_export_op->commit_blocks &&
 	    sb->s_bdev && sb->s_bdev->bd_disk->fops->pr_ops)
-		exp->ex_layout_type = LAYOUT_SCSI;
+		exp->ex_layout_types |= 1 << LAYOUT_SCSI;
 #endif
 }
 
diff --git a/fs/nfsd/nfs4proc.c b/fs/nfsd/nfs4proc.c
index de1ff1d98bb188a5661893f25e67926b70f7182f..1fb222752b2b154d1c7171b6c1ea766a8472d09e 100644
--- a/fs/nfsd/nfs4proc.c
+++ b/fs/nfsd/nfs4proc.c
@@ -605,8 +605,7 @@ nfsd4_create(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	fh_init(&resfh, NFS4_FHSIZE);
 
-	status = fh_verify(rqstp, &cstate->current_fh, S_IFDIR,
-			   NFSD_MAY_CREATE);
+	status = fh_verify(rqstp, &cstate->current_fh, S_IFDIR, NFSD_MAY_NOP);
 	if (status)
 		return status;
 
@@ -1219,12 +1218,12 @@ nfsd4_verify(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 static const struct nfsd4_layout_ops *
 nfsd4_layout_verify(struct svc_export *exp, unsigned int layout_type)
 {
-	if (!exp->ex_layout_type) {
+	if (!exp->ex_layout_types) {
 		dprintk("%s: export does not support pNFS\n", __func__);
 		return NULL;
 	}
 
-	if (exp->ex_layout_type != layout_type) {
+	if (!(exp->ex_layout_types & (1 << layout_type))) {
 		dprintk("%s: layout type %d not supported\n",
 			__func__, layout_type);
 		return NULL;
@@ -1270,7 +1269,7 @@ nfsd4_getdeviceinfo(struct svc_rqst *rqstp,
 	nfserr = nfs_ok;
 	if (gdp->gd_maxcount != 0) {
 		nfserr = ops->proc_getdeviceinfo(exp->ex_path.mnt->mnt_sb,
-					cstate->session->se_client, gdp);
+				rqstp, cstate->session->se_client, gdp);
 	}
 
 	gdp->gd_notify_types &= ops->notify_types;
@@ -2335,6 +2334,45 @@ static struct nfsd4_operation nfsd4_ops[] = {
 	},
 };
 
+/**
+ * nfsd4_spo_must_allow - Determine if the compound op contains an
+ * operation that is allowed to be sent with machine credentials
+ *
+ * @rqstp: a pointer to the struct svc_rqst
+ *
+ * Checks to see if the compound contains a spo_must_allow op
+ * and confirms that it was sent with the proper machine creds.
+ */
+
+bool nfsd4_spo_must_allow(struct svc_rqst *rqstp)
+{
+	struct nfsd4_compoundres *resp = rqstp->rq_resp;
+	struct nfsd4_compoundargs *argp = rqstp->rq_argp;
+	struct nfsd4_op *this = &argp->ops[resp->opcnt - 1];
+	struct nfsd4_compound_state *cstate = &resp->cstate;
+	struct nfs4_op_map *allow = &cstate->clp->cl_spo_must_allow;
+	u32 opiter;
+
+	if (!cstate->minorversion)
+		return false;
+
+	if (cstate->spo_must_allowed == true)
+		return true;
+
+	opiter = resp->opcnt;
+	while (opiter < argp->opcnt) {
+		this = &argp->ops[opiter++];
+		if (test_bit(this->opnum, allow->u.longs) &&
+			cstate->clp->cl_mach_cred &&
+			nfsd4_mach_creds_match(cstate->clp, rqstp)) {
+			cstate->spo_must_allowed = true;
+			return true;
+		}
+	}
+	cstate->spo_must_allowed = false;
+	return false;
+}
+
 int nfsd4_max_reply(struct svc_rqst *rqstp, struct nfsd4_op *op)
 {
 	struct nfsd4_operation *opdesc;
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index 70d0b9b33031ee88985da10ff2d11c782a2b49f6..8410ca275db1aecf0a1f8a022b92cdc1597ff258 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -1200,27 +1200,6 @@ free_ol_stateid_reaplist(struct list_head *reaplist)
 	}
 }
 
-static void release_lockowner(struct nfs4_lockowner *lo)
-{
-	struct nfs4_client *clp = lo->lo_owner.so_client;
-	struct nfs4_ol_stateid *stp;
-	struct list_head reaplist;
-
-	INIT_LIST_HEAD(&reaplist);
-
-	spin_lock(&clp->cl_lock);
-	unhash_lockowner_locked(lo);
-	while (!list_empty(&lo->lo_owner.so_stateids)) {
-		stp = list_first_entry(&lo->lo_owner.so_stateids,
-				struct nfs4_ol_stateid, st_perstateowner);
-		WARN_ON(!unhash_lock_stateid(stp));
-		put_ol_stateid_locked(stp, &reaplist);
-	}
-	spin_unlock(&clp->cl_lock);
-	free_ol_stateid_reaplist(&reaplist);
-	nfs4_put_stateowner(&lo->lo_owner);
-}
-
 static void release_open_stateid_locks(struct nfs4_ol_stateid *open_stp,
 				       struct list_head *reaplist)
 {
@@ -1972,7 +1951,7 @@ static bool svc_rqst_integrity_protected(struct svc_rqst *rqstp)
 	       service == RPC_GSS_SVC_PRIVACY;
 }
 
-static bool mach_creds_match(struct nfs4_client *cl, struct svc_rqst *rqstp)
+bool nfsd4_mach_creds_match(struct nfs4_client *cl, struct svc_rqst *rqstp)
 {
 	struct svc_cred *cr = &rqstp->rq_cred;
 
@@ -2388,6 +2367,22 @@ nfsd4_exchange_id(struct svc_rqst *rqstp,
 
 	switch (exid->spa_how) {
 	case SP4_MACH_CRED:
+		exid->spo_must_enforce[0] = 0;
+		exid->spo_must_enforce[1] = (
+			1 << (OP_BIND_CONN_TO_SESSION - 32) |
+			1 << (OP_EXCHANGE_ID - 32) |
+			1 << (OP_CREATE_SESSION - 32) |
+			1 << (OP_DESTROY_SESSION - 32) |
+			1 << (OP_DESTROY_CLIENTID - 32));
+
+		exid->spo_must_allow[0] &= (1 << (OP_CLOSE) |
+					1 << (OP_OPEN_DOWNGRADE) |
+					1 << (OP_LOCKU) |
+					1 << (OP_DELEGRETURN));
+
+		exid->spo_must_allow[1] &= (
+					1 << (OP_TEST_STATEID - 32) |
+					1 << (OP_FREE_STATEID - 32));
 		if (!svc_rqst_integrity_protected(rqstp)) {
 			status = nfserr_inval;
 			goto out_nolock;
@@ -2424,7 +2419,7 @@ nfsd4_exchange_id(struct svc_rqst *rqstp,
 				status = nfserr_inval;
 				goto out;
 			}
-			if (!mach_creds_match(conf, rqstp)) {
+			if (!nfsd4_mach_creds_match(conf, rqstp)) {
 				status = nfserr_wrong_cred;
 				goto out;
 			}
@@ -2473,6 +2468,8 @@ nfsd4_exchange_id(struct svc_rqst *rqstp,
 			goto out;
 	}
 	new->cl_minorversion = cstate->minorversion;
+	new->cl_spo_must_allow.u.words[0] = exid->spo_must_allow[0];
+	new->cl_spo_must_allow.u.words[1] = exid->spo_must_allow[1];
 
 	gen_clid(new, nn);
 	add_to_unconfirmed(new);
@@ -2676,7 +2673,7 @@ nfsd4_create_session(struct svc_rqst *rqstp,
 
 	if (conf) {
 		status = nfserr_wrong_cred;
-		if (!mach_creds_match(conf, rqstp))
+		if (!nfsd4_mach_creds_match(conf, rqstp))
 			goto out_free_conn;
 		cs_slot = &conf->cl_cs_slot;
 		status = check_slot_seqid(cr_ses->seqid, cs_slot->sl_seqid, 0);
@@ -2692,7 +2689,7 @@ nfsd4_create_session(struct svc_rqst *rqstp,
 			goto out_free_conn;
 		}
 		status = nfserr_wrong_cred;
-		if (!mach_creds_match(unconf, rqstp))
+		if (!nfsd4_mach_creds_match(unconf, rqstp))
 			goto out_free_conn;
 		cs_slot = &unconf->cl_cs_slot;
 		status = check_slot_seqid(cr_ses->seqid, cs_slot->sl_seqid, 0);
@@ -2801,7 +2798,7 @@ __be32 nfsd4_bind_conn_to_session(struct svc_rqst *rqstp,
 	if (!session)
 		goto out_no_session;
 	status = nfserr_wrong_cred;
-	if (!mach_creds_match(session->se_client, rqstp))
+	if (!nfsd4_mach_creds_match(session->se_client, rqstp))
 		goto out;
 	status = nfsd4_map_bcts_dir(&bcts->dir);
 	if (status)
@@ -2848,7 +2845,7 @@ nfsd4_destroy_session(struct svc_rqst *r,
 	if (!ses)
 		goto out_client_lock;
 	status = nfserr_wrong_cred;
-	if (!mach_creds_match(ses->se_client, r))
+	if (!nfsd4_mach_creds_match(ses->se_client, r))
 		goto out_put_session;
 	status = mark_session_dead_locked(ses, 1 + ref_held_by_me);
 	if (status)
@@ -3087,7 +3084,7 @@ nfsd4_destroy_clientid(struct svc_rqst *rqstp, struct nfsd4_compound_state *csta
 		status = nfserr_stale_clientid;
 		goto out;
 	}
-	if (!mach_creds_match(clp, rqstp)) {
+	if (!nfsd4_mach_creds_match(clp, rqstp)) {
 		clp = NULL;
 		status = nfserr_wrong_cred;
 		goto out;
@@ -3112,7 +3109,7 @@ nfsd4_reclaim_complete(struct svc_rqst *rqstp, struct nfsd4_compound_state *csta
 		 * We don't take advantage of the rca_one_fs case.
 		 * That's OK, it's optional, we can safely ignore it.
 		 */
-		 return nfs_ok;
+		return nfs_ok;
 	}
 
 	status = nfserr_complete_already;
@@ -5945,6 +5942,7 @@ nfsd4_release_lockowner(struct svc_rqst *rqstp,
 	__be32 status;
 	struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);
 	struct nfs4_client *clp;
+	LIST_HEAD (reaplist);
 
 	dprintk("nfsd4_release_lockowner clientid: (%08x/%08x):\n",
 		clid->cl_boot, clid->cl_id);
@@ -5975,9 +5973,23 @@ nfsd4_release_lockowner(struct svc_rqst *rqstp,
 		nfs4_get_stateowner(sop);
 		break;
 	}
+	if (!lo) {
+		spin_unlock(&clp->cl_lock);
+		return status;
+	}
+
+	unhash_lockowner_locked(lo);
+	while (!list_empty(&lo->lo_owner.so_stateids)) {
+		stp = list_first_entry(&lo->lo_owner.so_stateids,
+				       struct nfs4_ol_stateid,
+				       st_perstateowner);
+		WARN_ON(!unhash_lock_stateid(stp));
+		put_ol_stateid_locked(stp, &reaplist);
+	}
 	spin_unlock(&clp->cl_lock);
-	if (lo)
-		release_lockowner(lo);
+	free_ol_stateid_reaplist(&reaplist);
+	nfs4_put_stateowner(&lo->lo_owner);
+
 	return status;
 }
 
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 9df898ba648f73a14b1e42be47dbfddf189ec0a6..0aa0236a142904c6de5123b87ef96c55043c7f7d 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -1299,16 +1299,14 @@ nfsd4_decode_exchange_id(struct nfsd4_compoundargs *argp,
 		break;
 	case SP4_MACH_CRED:
 		/* spo_must_enforce */
-		READ_BUF(4);
-		dummy = be32_to_cpup(p++);
-		READ_BUF(dummy * 4);
-		p += dummy;
-
+		status = nfsd4_decode_bitmap(argp,
+					exid->spo_must_enforce);
+		if (status)
+			goto out;
 		/* spo_must_allow */
-		READ_BUF(4);
-		dummy = be32_to_cpup(p++);
-		READ_BUF(dummy * 4);
-		p += dummy;
+		status = nfsd4_decode_bitmap(argp, exid->spo_must_allow);
+		if (status)
+			goto out;
 		break;
 	case SP4_SSV:
 		/* ssp_ops */
@@ -2164,22 +2162,20 @@ nfsd4_encode_aclname(struct xdr_stream *xdr, struct svc_rqst *rqstp,
 }
 
 static inline __be32
-nfsd4_encode_layout_type(struct xdr_stream *xdr, enum pnfs_layouttype layout_type)
+nfsd4_encode_layout_types(struct xdr_stream *xdr, u32 layout_types)
 {
-	__be32 *p;
+	__be32		*p;
+	unsigned long	i = hweight_long(layout_types);
 
-	if (layout_type) {
-		p = xdr_reserve_space(xdr, 8);
-		if (!p)
-			return nfserr_resource;
-		*p++ = cpu_to_be32(1);
-		*p++ = cpu_to_be32(layout_type);
-	} else {
-		p = xdr_reserve_space(xdr, 4);
-		if (!p)
-			return nfserr_resource;
-		*p++ = cpu_to_be32(0);
-	}
+	p = xdr_reserve_space(xdr, 4 + 4 * i);
+	if (!p)
+		return nfserr_resource;
+
+	*p++ = cpu_to_be32(i);
+
+	for (i = LAYOUT_NFSV4_1_FILES; i < LAYOUT_TYPE_MAX; ++i)
+		if (layout_types & (1 << i))
+			*p++ = cpu_to_be32(i);
 
 	return 0;
 }
@@ -2754,13 +2750,13 @@ nfsd4_encode_fattr(struct xdr_stream *xdr, struct svc_fh *fhp,
 	}
 #ifdef CONFIG_NFSD_PNFS
 	if (bmval1 & FATTR4_WORD1_FS_LAYOUT_TYPES) {
-		status = nfsd4_encode_layout_type(xdr, exp->ex_layout_type);
+		status = nfsd4_encode_layout_types(xdr, exp->ex_layout_types);
 		if (status)
 			goto out;
 	}
 
 	if (bmval2 & FATTR4_WORD2_LAYOUT_TYPES) {
-		status = nfsd4_encode_layout_type(xdr, exp->ex_layout_type);
+		status = nfsd4_encode_layout_types(xdr, exp->ex_layout_types);
 		if (status)
 			goto out;
 	}
@@ -3867,14 +3863,6 @@ nfsd4_encode_write(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd4_w
 	return nfserr;
 }
 
-static const u32 nfs4_minimal_spo_must_enforce[2] = {
-	[1] = 1 << (OP_BIND_CONN_TO_SESSION - 32) |
-	      1 << (OP_EXCHANGE_ID - 32) |
-	      1 << (OP_CREATE_SESSION - 32) |
-	      1 << (OP_DESTROY_SESSION - 32) |
-	      1 << (OP_DESTROY_CLIENTID - 32)
-};
-
 static __be32
 nfsd4_encode_exchange_id(struct nfsd4_compoundres *resp, __be32 nfserr,
 			 struct nfsd4_exchange_id *exid)
@@ -3885,6 +3873,7 @@ nfsd4_encode_exchange_id(struct nfsd4_compoundres *resp, __be32 nfserr,
 	char *server_scope;
 	int major_id_sz;
 	int server_scope_sz;
+	int status = 0;
 	uint64_t minor_id = 0;
 
 	if (nfserr)
@@ -3913,18 +3902,20 @@ nfsd4_encode_exchange_id(struct nfsd4_compoundres *resp, __be32 nfserr,
 	case SP4_NONE:
 		break;
 	case SP4_MACH_CRED:
-		/* spo_must_enforce, spo_must_allow */
-		p = xdr_reserve_space(xdr, 16);
-		if (!p)
-			return nfserr_resource;
-
 		/* spo_must_enforce bitmap: */
-		*p++ = cpu_to_be32(2);
-		*p++ = cpu_to_be32(nfs4_minimal_spo_must_enforce[0]);
-		*p++ = cpu_to_be32(nfs4_minimal_spo_must_enforce[1]);
-		/* empty spo_must_allow bitmap: */
-		*p++ = cpu_to_be32(0);
-
+		status = nfsd4_encode_bitmap(xdr,
+					exid->spo_must_enforce[0],
+					exid->spo_must_enforce[1],
+					exid->spo_must_enforce[2]);
+		if (status)
+			goto out;
+		/* spo_must_allow bitmap: */
+		status = nfsd4_encode_bitmap(xdr,
+					exid->spo_must_allow[0],
+					exid->spo_must_allow[1],
+					exid->spo_must_allow[2]);
+		if (status)
+			goto out;
 		break;
 	default:
 		WARN_ON_ONCE(1);
@@ -3951,6 +3942,8 @@ nfsd4_encode_exchange_id(struct nfsd4_compoundres *resp, __be32 nfserr,
 	/* Implementation id */
 	*p++ = cpu_to_be32(0);	/* zero length nfs_impl_id4 array */
 	return 0;
+out:
+	return status;
 }
 
 static __be32
diff --git a/fs/nfsd/nfsd.h b/fs/nfsd/nfsd.h
index cf980523898b78cc98debc868e2b5a249531ab18..9446849888d52e470d763e75d5dbb5e1f3e60f41 100644
--- a/fs/nfsd/nfsd.h
+++ b/fs/nfsd/nfsd.h
@@ -124,6 +124,7 @@ void nfs4_state_shutdown_net(struct net *net);
 void nfs4_reset_lease(time_t leasetime);
 int nfs4_reset_recoverydir(char *recdir);
 char * nfs4_recoverydir(void);
+bool nfsd4_spo_must_allow(struct svc_rqst *rqstp);
 #else
 static inline int nfsd4_init_slabs(void) { return 0; }
 static inline void nfsd4_free_slabs(void) { }
@@ -134,6 +135,10 @@ static inline void nfs4_state_shutdown_net(struct net *net) { }
 static inline void nfs4_reset_lease(time_t leasetime) { }
 static inline int nfs4_reset_recoverydir(char *recdir) { return 0; }
 static inline char * nfs4_recoverydir(void) {return NULL; }
+static inline bool nfsd4_spo_must_allow(struct svc_rqst *rqstp)
+{
+	return false;
+}
 #endif
 
 /*
diff --git a/fs/nfsd/nfsfh.c b/fs/nfsd/nfsfh.c
index a8919444c46085545ef99bd9b4b72f984745557e..cfe7500d5847baaca753a064d132ed6dd4d58c3b 100644
--- a/fs/nfsd/nfsfh.c
+++ b/fs/nfsd/nfsfh.c
@@ -59,14 +59,20 @@ static int nfsd_acceptable(void *expv, struct dentry *dentry)
  * the write call).
  */
 static inline __be32
-nfsd_mode_check(struct svc_rqst *rqstp, umode_t mode, umode_t requested)
+nfsd_mode_check(struct svc_rqst *rqstp, struct dentry *dentry,
+		umode_t requested)
 {
-	mode &= S_IFMT;
+	umode_t mode = d_inode(dentry)->i_mode & S_IFMT;
 
 	if (requested == 0) /* the caller doesn't care */
 		return nfs_ok;
-	if (mode == requested)
+	if (mode == requested) {
+		if (mode == S_IFDIR && !d_can_lookup(dentry)) {
+			WARN_ON_ONCE(1);
+			return nfserr_notdir;
+		}
 		return nfs_ok;
+	}
 	/*
 	 * v4 has an error more specific than err_notdir which we should
 	 * return in preference to err_notdir:
@@ -298,7 +304,7 @@ static __be32 nfsd_set_fh_dentry(struct svc_rqst *rqstp, struct svc_fh *fhp)
  * that it expects something not of the given type.
  *
  * @access is formed from the NFSD_MAY_* constants defined in
- * include/linux/nfsd/nfsd.h.
+ * fs/nfsd/vfs.h.
  */
 __be32
 fh_verify(struct svc_rqst *rqstp, struct svc_fh *fhp, umode_t type, int access)
@@ -340,7 +346,7 @@ fh_verify(struct svc_rqst *rqstp, struct svc_fh *fhp, umode_t type, int access)
 	if (error)
 		goto out;
 
-	error = nfsd_mode_check(rqstp, d_inode(dentry)->i_mode, type);
+	error = nfsd_mode_check(rqstp, dentry, type);
 	if (error)
 		goto out;
 
@@ -533,7 +539,7 @@ fh_compose(struct svc_fh *fhp, struct svc_export *exp, struct dentry *dentry,
 	 * the reference filehandle (if it is in the same export)
 	 * or the export options.
 	 */
-	 set_version_and_fsid_type(fhp, exp, ref_fh);
+	set_version_and_fsid_type(fhp, exp, ref_fh);
 
 	if (ref_fh == fhp)
 		fh_put(ref_fh);
diff --git a/fs/nfsd/nfsproc.c b/fs/nfsd/nfsproc.c
index 4cd78ef4c95c4def33831856e863a36fc850ff59..e9214768cde90f84138f6279e115a7a8c7350b95 100644
--- a/fs/nfsd/nfsproc.c
+++ b/fs/nfsd/nfsproc.c
@@ -251,9 +251,6 @@ nfsd_proc_create(struct svc_rqst *rqstp, struct nfsd_createargs *argp,
 
 	/* Check for NFSD_MAY_WRITE in nfsd_create if necessary */
 
-	nfserr = nfserr_acces;
-	if (!argp->len)
-		goto done;
 	nfserr = nfserr_exist;
 	if (isdotent(argp->name, argp->len))
 		goto done;
@@ -362,8 +359,8 @@ nfsd_proc_create(struct svc_rqst *rqstp, struct nfsd_createargs *argp,
 	nfserr = 0;
 	if (!inode) {
 		/* File doesn't exist. Create it and set attrs */
-		nfserr = nfsd_create(rqstp, dirfhp, argp->name, argp->len,
-					attr, type, rdev, newfhp);
+		nfserr = nfsd_create_locked(rqstp, dirfhp, argp->name,
+					argp->len, attr, type, rdev, newfhp);
 	} else if (type == S_IFREG) {
 		dprintk("nfsd:   existing %s, valid=%x, size=%ld\n",
 			argp->name, attr->ia_valid, (long) attr->ia_size);
diff --git a/fs/nfsd/nfsxdr.c b/fs/nfsd/nfsxdr.c
index 79d964aa8079f354fcadf50ac2a1d3d216668d4a..41b468a6a90f807fe3f3d2e4ceeaa9f8c7ae0f8c 100644
--- a/fs/nfsd/nfsxdr.c
+++ b/fs/nfsd/nfsxdr.c
@@ -240,7 +240,7 @@ nfssvc_decode_diropargs(struct svc_rqst *rqstp, __be32 *p,
 	 || !(p = decode_filename(p, &args->name, &args->len)))
 		return 0;
 
-	 return xdr_argsize_check(rqstp, p);
+	return xdr_argsize_check(rqstp, p);
 }
 
 int
diff --git a/fs/nfsd/pnfs.h b/fs/nfsd/pnfs.h
index 7d073b9b1553041d32910ef41de7883a90af3b37..0c2a716e87411d6b08258632e966666f34190b82 100644
--- a/fs/nfsd/pnfs.h
+++ b/fs/nfsd/pnfs.h
@@ -21,6 +21,7 @@ struct nfsd4_layout_ops {
 	u32		notify_types;
 
 	__be32 (*proc_getdeviceinfo)(struct super_block *sb,
+			struct svc_rqst *rqstp,
 			struct nfs4_client *clp,
 			struct nfsd4_getdeviceinfo *gdevp);
 	__be32 (*encode_getdeviceinfo)(struct xdr_stream *xdr,
@@ -44,6 +45,9 @@ extern const struct nfsd4_layout_ops bl_layout_ops;
 #ifdef CONFIG_NFSD_SCSILAYOUT
 extern const struct nfsd4_layout_ops scsi_layout_ops;
 #endif
+#ifdef CONFIG_NFSD_FLEXFILELAYOUT
+extern const struct nfsd4_layout_ops ff_layout_ops;
+#endif
 
 __be32 nfsd4_preprocess_layout_stateid(struct svc_rqst *rqstp,
 		struct nfsd4_compound_state *cstate, stateid_t *stateid,
diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
index 64053eadeb818f2a754bd791af7010b992e99d1a..b95adf9a15954b02a9a37f165f3fc8ef331ccfb8 100644
--- a/fs/nfsd/state.h
+++ b/fs/nfsd/state.h
@@ -345,6 +345,7 @@ struct nfs4_client {
 	u32			cl_exchange_flags;
 	/* number of rpc's in progress over an associated session: */
 	atomic_t		cl_refcount;
+	struct nfs4_op_map      cl_spo_must_allow;
 
 	/* for nfs41 callbacks */
 	/* We currently support a single back channel with a single slot */
diff --git a/fs/nfsd/vfs.c b/fs/nfsd/vfs.c
index 6fbd81ecb41080a6e81aa712fc45af77c3a4fde1..ba944123167b92f3a7460d8acc66b02dc7c53575 100644
--- a/fs/nfsd/vfs.c
+++ b/fs/nfsd/vfs.c
@@ -1135,96 +1135,37 @@ nfsd_check_ignore_resizing(struct iattr *iap)
 		iap->ia_valid &= ~ATTR_SIZE;
 }
 
-/*
- * Create a file (regular, directory, device, fifo); UNIX sockets 
- * not yet implemented.
- * If the response fh has been verified, the parent directory should
- * already be locked. Note that the parent directory is left locked.
- *
- * N.B. Every call to nfsd_create needs an fh_put for _both_ fhp and resfhp
- */
+/* The parent directory should already be locked: */
 __be32
-nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
+nfsd_create_locked(struct svc_rqst *rqstp, struct svc_fh *fhp,
 		char *fname, int flen, struct iattr *iap,
 		int type, dev_t rdev, struct svc_fh *resfhp)
 {
-	struct dentry	*dentry, *dchild = NULL;
+	struct dentry	*dentry, *dchild;
 	struct inode	*dirp;
 	__be32		err;
 	__be32		err2;
 	int		host_err;
 
-	err = nfserr_perm;
-	if (!flen)
-		goto out;
-	err = nfserr_exist;
-	if (isdotent(fname, flen))
-		goto out;
-
-	err = fh_verify(rqstp, fhp, S_IFDIR, NFSD_MAY_CREATE);
-	if (err)
-		goto out;
-
 	dentry = fhp->fh_dentry;
 	dirp = d_inode(dentry);
 
-	err = nfserr_notdir;
-	if (!dirp->i_op->lookup)
-		goto out;
-	/*
-	 * Check whether the response file handle has been verified yet.
-	 * If it has, the parent directory should already be locked.
-	 */
-	if (!resfhp->fh_dentry) {
-		host_err = fh_want_write(fhp);
-		if (host_err)
-			goto out_nfserr;
-
-		/* called from nfsd_proc_mkdir, or possibly nfsd3_proc_create */
-		fh_lock_nested(fhp, I_MUTEX_PARENT);
-		dchild = lookup_one_len(fname, dentry, flen);
-		host_err = PTR_ERR(dchild);
-		if (IS_ERR(dchild))
-			goto out_nfserr;
-		err = fh_compose(resfhp, fhp->fh_export, dchild, fhp);
-		if (err)
-			goto out;
-	} else {
-		/* called from nfsd_proc_create */
-		dchild = dget(resfhp->fh_dentry);
-		if (!fhp->fh_locked) {
-			/* not actually possible */
-			printk(KERN_ERR
-				"nfsd_create: parent %pd2 not locked!\n",
+	dchild = dget(resfhp->fh_dentry);
+	if (!fhp->fh_locked) {
+		WARN_ONCE(1, "nfsd_create: parent %pd2 not locked!\n",
 				dentry);
-			err = nfserr_io;
-			goto out;
-		}
-	}
-	/*
-	 * Make sure the child dentry is still negative ...
-	 */
-	err = nfserr_exist;
-	if (d_really_is_positive(dchild)) {
-		dprintk("nfsd_create: dentry %pd/%pd not negative!\n",
-			dentry, dchild);
-		goto out; 
+		err = nfserr_io;
+		goto out;
 	}
 
+	err = nfsd_permission(rqstp, fhp->fh_export, dentry, NFSD_MAY_CREATE);
+	if (err)
+		goto out;
+
 	if (!(iap->ia_valid & ATTR_MODE))
 		iap->ia_mode = 0;
 	iap->ia_mode = (iap->ia_mode & S_IALLUGO) | type;
 
-	err = nfserr_inval;
-	if (!S_ISREG(type) && !S_ISDIR(type) && !special_file(type)) {
-		printk(KERN_WARNING "nfsd: bad file type %o in nfsd_create\n",
-		       type);
-		goto out;
-	}
-
-	/*
-	 * Get the dir op function pointer.
-	 */
 	err = 0;
 	host_err = 0;
 	switch (type) {
@@ -1242,6 +1183,10 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
 	case S_IFSOCK:
 		host_err = vfs_mknod(dirp, dchild, iap->ia_mode, rdev);
 		break;
+	default:
+		printk(KERN_WARNING "nfsd: bad file type %o in nfsd_create\n",
+		       type);
+		host_err = -EINVAL;
 	}
 	if (host_err < 0)
 		goto out_nfserr;
@@ -1251,7 +1196,7 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
 	/*
 	 * nfsd_create_setattr already committed the child.  Transactional
 	 * filesystems had a chance to commit changes for both parent and
-	 * child * simultaneously making the following commit_metadata a
+	 * child simultaneously making the following commit_metadata a
 	 * noop.
 	 */
 	err2 = nfserrno(commit_metadata(fhp));
@@ -1263,8 +1208,7 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
 	if (!err)
 		err = fh_update(resfhp);
 out:
-	if (dchild && !IS_ERR(dchild))
-		dput(dchild);
+	dput(dchild);
 	return err;
 
 out_nfserr:
@@ -1272,6 +1216,50 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
 	goto out;
 }
 
+/*
+ * Create a filesystem object (regular, directory, special).
+ * Note that the parent directory is left locked.
+ *
+ * N.B. Every call to nfsd_create needs an fh_put for _both_ fhp and resfhp
+ */
+__be32
+nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
+		char *fname, int flen, struct iattr *iap,
+		int type, dev_t rdev, struct svc_fh *resfhp)
+{
+	struct dentry	*dentry, *dchild = NULL;
+	struct inode	*dirp;
+	__be32		err;
+	int		host_err;
+
+	if (isdotent(fname, flen))
+		return nfserr_exist;
+
+	err = fh_verify(rqstp, fhp, S_IFDIR, NFSD_MAY_NOP);
+	if (err)
+		return err;
+
+	dentry = fhp->fh_dentry;
+	dirp = d_inode(dentry);
+
+	host_err = fh_want_write(fhp);
+	if (host_err)
+		return nfserrno(host_err);
+
+	fh_lock_nested(fhp, I_MUTEX_PARENT);
+	dchild = lookup_one_len(fname, dentry, flen);
+	host_err = PTR_ERR(dchild);
+	if (IS_ERR(dchild))
+		return nfserrno(host_err);
+	err = fh_compose(resfhp, fhp->fh_export, dchild, fhp);
+	if (err) {
+		dput(dchild);
+		return err;
+	}
+	return nfsd_create_locked(rqstp, fhp, fname, flen, iap, type,
+					rdev, resfhp);
+}
+
 #ifdef CONFIG_NFSD_V3
 
 /*
@@ -1304,12 +1292,6 @@ do_nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
 	dentry = fhp->fh_dentry;
 	dirp = d_inode(dentry);
 
-	/* Get all the sanity checks out of the way before
-	 * we lock the parent. */
-	err = nfserr_notdir;
-	if (!dirp->i_op->lookup)
-		goto out;
-
 	host_err = fh_want_write(fhp);
 	if (host_err)
 		goto out_nfserr;
diff --git a/fs/nfsd/vfs.h b/fs/nfsd/vfs.h
index 2d573ec057f801d6406ccf51d21549eb59009a44..3cbb1b33777b5219aef4116fb6e07fd83183a8a8 100644
--- a/fs/nfsd/vfs.h
+++ b/fs/nfsd/vfs.h
@@ -59,6 +59,9 @@ __be32		nfsd4_vfs_fallocate(struct svc_rqst *, struct svc_fh *,
 __be32		nfsd4_clone_file_range(struct file *, u64, struct file *,
 			u64, u64);
 #endif /* CONFIG_NFSD_V4 */
+__be32		nfsd_create_locked(struct svc_rqst *, struct svc_fh *,
+				char *name, int len, struct iattr *attrs,
+				int type, dev_t rdev, struct svc_fh *res);
 __be32		nfsd_create(struct svc_rqst *, struct svc_fh *,
 				char *name, int len, struct iattr *attrs,
 				int type, dev_t rdev, struct svc_fh *res);
diff --git a/fs/nfsd/xdr4.h b/fs/nfsd/xdr4.h
index d9554813e58afaa15d8e94292e776ea7b7f27f01..beea0c5edc51436cb3525fa0f2cc58463f07a3e0 100644
--- a/fs/nfsd/xdr4.h
+++ b/fs/nfsd/xdr4.h
@@ -59,6 +59,7 @@ struct nfsd4_compound_state {
 	struct nfsd4_session	*session;
 	struct nfsd4_slot	*slot;
 	int			data_offset;
+	bool                    spo_must_allowed;
 	size_t			iovlen;
 	u32			minorversion;
 	__be32			status;
@@ -403,6 +404,8 @@ struct nfsd4_exchange_id {
 	clientid_t	clientid;
 	u32		seqid;
 	int		spa_how;
+	u32             spo_must_enforce[3];
+	u32             spo_must_allow[3];
 };
 
 struct nfsd4_sequence {
@@ -654,6 +657,8 @@ set_change_info(struct nfsd4_change_info *cinfo, struct svc_fh *fhp)
 
 }
 
+
+bool nfsd4_mach_creds_match(struct nfs4_client *cl, struct svc_rqst *rqstp);
 int nfs4svc_encode_voidres(struct svc_rqst *, __be32 *, void *);
 int nfs4svc_decode_compoundargs(struct svc_rqst *, __be32 *,
 		struct nfsd4_compoundargs *);
diff --git a/fs/xfs/Makefile b/fs/xfs/Makefile
index 3542d94fddce5ca4a45e33e08a25fc894b2264eb..52c288514be1ff729a38f7b6d9a8e7232ea60b67 100644
--- a/fs/xfs/Makefile
+++ b/fs/xfs/Makefile
@@ -121,5 +121,4 @@ xfs-$(CONFIG_XFS_RT)		+= xfs_rtalloc.o
 xfs-$(CONFIG_XFS_POSIX_ACL)	+= xfs_acl.o
 xfs-$(CONFIG_SYSCTL)		+= xfs_sysctl.o
 xfs-$(CONFIG_COMPAT)		+= xfs_ioctl32.o
-xfs-$(CONFIG_NFSD_BLOCKLAYOUT)	+= xfs_pnfs.o
-xfs-$(CONFIG_NFSD_SCSILAYOUT)	+= xfs_pnfs.o
+xfs-$(CONFIG_EXPORTFS_BLOCK_OPS)	+= xfs_pnfs.o
diff --git a/fs/xfs/xfs_export.c b/fs/xfs/xfs_export.c
index a1b2dd828b9d0ae447419b304793d03e9ec144f5..fe1bfee35898ea4d9b657c7935f90f586e7ab9c9 100644
--- a/fs/xfs/xfs_export.c
+++ b/fs/xfs/xfs_export.c
@@ -246,7 +246,7 @@ const struct export_operations xfs_export_operations = {
 	.fh_to_parent		= xfs_fs_fh_to_parent,
 	.get_parent		= xfs_fs_get_parent,
 	.commit_metadata	= xfs_fs_nfs_commit_metadata,
-#ifdef CONFIG_NFSD_BLOCKLAYOUT
+#ifdef CONFIG_EXPORTFS_BLOCK_OPS
 	.get_uuid		= xfs_fs_get_uuid,
 	.map_blocks		= xfs_fs_map_blocks,
 	.commit_blocks		= xfs_fs_commit_blocks,
diff --git a/fs/xfs/xfs_pnfs.h b/fs/xfs/xfs_pnfs.h
index 93f74853961b1cce598f5b83e96e622457ae173e..e8339f74966b18b43a929ed29f3d9259aa17142b 100644
--- a/fs/xfs/xfs_pnfs.h
+++ b/fs/xfs/xfs_pnfs.h
@@ -1,7 +1,7 @@
 #ifndef _XFS_PNFS_H
 #define _XFS_PNFS_H 1
 
-#if defined(CONFIG_NFSD_BLOCKLAYOUT) || defined(CONFIG_NFSD_SCSILAYOUT)
+#ifdef CONFIG_EXPORTFS_BLOCK_OPS
 int xfs_fs_get_uuid(struct super_block *sb, u8 *buf, u32 *len, u64 *offset);
 int xfs_fs_map_blocks(struct inode *inode, loff_t offset, u64 length,
 		struct iomap *iomap, bool write, u32 *device_generation);
@@ -15,5 +15,5 @@ xfs_break_layouts(struct inode *inode, uint *iolock, bool with_imutex)
 {
 	return 0;
 }
-#endif /* CONFIG_NFSD_PNFS */
+#endif /* CONFIG_EXPORTFS_BLOCK_OPS */
 #endif /* _XFS_PNFS_H */
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index bfed6b367350cdd34fe4e87598ec5cb94aac60f8..c6564ada9bebb8ea0787e60fc8cea3b46d09433c 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -643,4 +643,15 @@ enum pnfs_update_layout_reason {
 	PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET,
 };
 
+#define NFS4_OP_MAP_NUM_LONGS					\
+	DIV_ROUND_UP(LAST_NFS4_OP, 8 * sizeof(unsigned long))
+#define NFS4_OP_MAP_NUM_WORDS \
+	(NFS4_OP_MAP_NUM_LONGS * sizeof(unsigned long) / sizeof(u32))
+struct nfs4_op_map {
+	union {
+		unsigned long longs[NFS4_OP_MAP_NUM_LONGS];
+		u32 words[NFS4_OP_MAP_NUM_WORDS];
+	} u;
+};
+
 #endif
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 82b81a1c24382740366dfeea40ec11dd611372a6..5bcbbe511be663a91b1f50886e549974ca1e7af3 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1185,17 +1185,6 @@ struct pnfs_ds_commit_info {
 	struct pnfs_commit_bucket *buckets;
 };
 
-#define NFS4_OP_MAP_NUM_LONGS \
-	DIV_ROUND_UP(LAST_NFS4_OP, 8 * sizeof(unsigned long))
-#define NFS4_OP_MAP_NUM_WORDS \
-	(NFS4_OP_MAP_NUM_LONGS * sizeof(unsigned long) / sizeof(u32))
-struct nfs4_op_map {
-	union {
-		unsigned long longs[NFS4_OP_MAP_NUM_LONGS];
-		u32 words[NFS4_OP_MAP_NUM_WORDS];
-	} u;
-};
-
 struct nfs41_state_protection {
 	u32 how;
 	struct nfs4_op_map enforce;
diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
index ed03c9f7f908d7a477ef5349f35439b30b014b85..62a60eeacb0aeaf9907a5247d42706851fec3ffb 100644
--- a/include/linux/sunrpc/cache.h
+++ b/include/linux/sunrpc/cache.h
@@ -78,8 +78,6 @@ struct cache_detail {
 	struct hlist_head *	hash_table;
 	rwlock_t		hash_lock;
 
-	atomic_t		inuse; /* active user-space update or lookup */
-
 	char			*name;
 	void			(*cache_put)(struct kref *);
 
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 7ca44fb5b675d1c078a2a2f121056606282dad58..7321ae933867566013a250623564d722d2800305 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -268,6 +268,7 @@ struct svc_rqst {
 						 * cache pages */
 #define	RQ_VICTIM	(5)			/* about to be shut down */
 #define	RQ_BUSY		(6)			/* request is busy */
+#define	RQ_DATA		(7)			/* request has data */
 	unsigned long		rq_flags;	/* flags field */
 
 	void *			rq_argp;	/* decoded arguments */
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index 79ba50856707b9b9a619be78335b27b383312ed9..ab02a457da1fa8aea378889394730c708b03e89b 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -25,7 +25,6 @@ struct svc_xprt_ops {
 	void		(*xpo_detach)(struct svc_xprt *);
 	void		(*xpo_free)(struct svc_xprt *);
 	int		(*xpo_secure_port)(struct svc_rqst *);
-	void		(*xpo_adjust_wspace)(struct svc_xprt *);
 };
 
 struct svc_xprt_class {
@@ -69,6 +68,7 @@ struct svc_xprt {
 
 	struct svc_serv		*xpt_server;	/* service for transport */
 	atomic_t    	    	xpt_reserved;	/* space on outq that is rsvd */
+	atomic_t		xpt_nr_rqsts;	/* Number of requests */
 	struct mutex		xpt_mutex;	/* to serialize sending data */
 	spinlock_t		xpt_lock;	/* protects sk_deferred
 						 * and xpt_auth_cache */
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index 003dca933803901da37efb168833b67a3a1f06ce..8a707f8a41c3df76b682032cf180eda869a49e1d 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -473,6 +473,39 @@ TRACE_EVENT(svc_recv,
 			show_rqstp_flags(__entry->flags))
 );
 
+DECLARE_EVENT_CLASS(svc_rqst_event,
+
+	TP_PROTO(struct svc_rqst *rqst),
+
+	TP_ARGS(rqst),
+
+	TP_STRUCT__entry(
+		__field(__be32, xid)
+		__field(unsigned long, flags)
+		__dynamic_array(unsigned char, addr, rqst->rq_addrlen)
+	),
+
+	TP_fast_assign(
+		__entry->xid = rqst->rq_xid;
+		__entry->flags = rqst->rq_flags;
+		memcpy(__get_dynamic_array(addr),
+			&rqst->rq_addr, rqst->rq_addrlen);
+	),
+
+	TP_printk("addr=%pIScp rq_xid=0x%x flags=%s",
+		(struct sockaddr *)__get_dynamic_array(addr),
+		be32_to_cpu(__entry->xid),
+		show_rqstp_flags(__entry->flags))
+);
+
+DEFINE_EVENT(svc_rqst_event, svc_defer,
+	TP_PROTO(struct svc_rqst *rqst),
+	TP_ARGS(rqst));
+
+DEFINE_EVENT(svc_rqst_event, svc_drop,
+	TP_PROTO(struct svc_rqst *rqst),
+	TP_ARGS(rqst));
+
 DECLARE_EVENT_CLASS(svc_rqst_status,
 
 	TP_PROTO(struct svc_rqst *rqst, int status),
@@ -529,45 +562,67 @@ TRACE_EVENT(svc_xprt_do_enqueue,
 
 	TP_STRUCT__entry(
 		__field(struct svc_xprt *, xprt)
-		__field_struct(struct sockaddr_storage, ss)
 		__field(int, pid)
 		__field(unsigned long, flags)
+		__dynamic_array(unsigned char, addr, xprt != NULL ?
+			xprt->xpt_remotelen : 0)
 	),
 
 	TP_fast_assign(
 		__entry->xprt = xprt;
-		xprt ? memcpy(&__entry->ss, &xprt->xpt_remote, sizeof(__entry->ss)) : memset(&__entry->ss, 0, sizeof(__entry->ss));
 		__entry->pid = rqst? rqst->rq_task->pid : 0;
-		__entry->flags = xprt ? xprt->xpt_flags : 0;
+		if (xprt) {
+			memcpy(__get_dynamic_array(addr),
+				&xprt->xpt_remote,
+				xprt->xpt_remotelen);
+			__entry->flags = xprt->xpt_flags;
+		} else
+			__entry->flags = 0;
 	),
 
 	TP_printk("xprt=0x%p addr=%pIScp pid=%d flags=%s", __entry->xprt,
-		(struct sockaddr *)&__entry->ss,
+		__get_dynamic_array_len(addr) != 0 ?
+			(struct sockaddr *)__get_dynamic_array(addr) : NULL,
 		__entry->pid, show_svc_xprt_flags(__entry->flags))
 );
 
-TRACE_EVENT(svc_xprt_dequeue,
+DECLARE_EVENT_CLASS(svc_xprt_event,
 	TP_PROTO(struct svc_xprt *xprt),
 
 	TP_ARGS(xprt),
 
 	TP_STRUCT__entry(
 		__field(struct svc_xprt *, xprt)
-		__field_struct(struct sockaddr_storage, ss)
 		__field(unsigned long, flags)
+		__dynamic_array(unsigned char, addr, xprt != NULL ?
+			xprt->xpt_remotelen : 0)
 	),
 
 	TP_fast_assign(
-		__entry->xprt = xprt,
-		xprt ? memcpy(&__entry->ss, &xprt->xpt_remote, sizeof(__entry->ss)) : memset(&__entry->ss, 0, sizeof(__entry->ss));
-		__entry->flags = xprt ? xprt->xpt_flags : 0;
+		__entry->xprt = xprt;
+		if (xprt) {
+			memcpy(__get_dynamic_array(addr),
+					&xprt->xpt_remote,
+					xprt->xpt_remotelen);
+			__entry->flags = xprt->xpt_flags;
+		} else
+			__entry->flags = 0;
 	),
 
 	TP_printk("xprt=0x%p addr=%pIScp flags=%s", __entry->xprt,
-		(struct sockaddr *)&__entry->ss,
+		__get_dynamic_array_len(addr) != 0 ?
+			(struct sockaddr *)__get_dynamic_array(addr) : NULL,
 		show_svc_xprt_flags(__entry->flags))
 );
 
+DEFINE_EVENT(svc_xprt_event, svc_xprt_dequeue,
+	TP_PROTO(struct svc_xprt *xprt),
+	TP_ARGS(xprt));
+
+DEFINE_EVENT(svc_xprt_event, svc_xprt_no_write_space,
+	TP_PROTO(struct svc_xprt *xprt),
+	TP_ARGS(xprt));
+
 TRACE_EVENT(svc_wake_up,
 	TP_PROTO(int pid),
 
@@ -592,21 +647,56 @@ TRACE_EVENT(svc_handle_xprt,
 	TP_STRUCT__entry(
 		__field(struct svc_xprt *, xprt)
 		__field(int, len)
-		__field_struct(struct sockaddr_storage, ss)
 		__field(unsigned long, flags)
+		__dynamic_array(unsigned char, addr, xprt != NULL ?
+			xprt->xpt_remotelen : 0)
 	),
 
 	TP_fast_assign(
 		__entry->xprt = xprt;
-		xprt ? memcpy(&__entry->ss, &xprt->xpt_remote, sizeof(__entry->ss)) : memset(&__entry->ss, 0, sizeof(__entry->ss));
 		__entry->len = len;
-		__entry->flags = xprt ? xprt->xpt_flags : 0;
+		if (xprt) {
+			memcpy(__get_dynamic_array(addr),
+					&xprt->xpt_remote,
+					xprt->xpt_remotelen);
+			__entry->flags = xprt->xpt_flags;
+		} else
+			__entry->flags = 0;
 	),
 
 	TP_printk("xprt=0x%p addr=%pIScp len=%d flags=%s", __entry->xprt,
-		(struct sockaddr *)&__entry->ss,
+		__get_dynamic_array_len(addr) != 0 ?
+			(struct sockaddr *)__get_dynamic_array(addr) : NULL,
 		__entry->len, show_svc_xprt_flags(__entry->flags))
 );
+
+
+DECLARE_EVENT_CLASS(svc_deferred_event,
+	TP_PROTO(struct svc_deferred_req *dr),
+
+	TP_ARGS(dr),
+
+	TP_STRUCT__entry(
+		__field(__be32, xid)
+		__dynamic_array(unsigned char, addr, dr->addrlen)
+	),
+
+	TP_fast_assign(
+		__entry->xid = *(__be32 *)(dr->args + (dr->xprt_hlen>>2));
+		memcpy(__get_dynamic_array(addr), &dr->addr, dr->addrlen);
+	),
+
+	TP_printk("addr=%pIScp xid=0x%x",
+		(struct sockaddr *)__get_dynamic_array(addr),
+		be32_to_cpu(__entry->xid))
+);
+
+DEFINE_EVENT(svc_deferred_event, svc_drop_deferred,
+	TP_PROTO(struct svc_deferred_req *dr),
+	TP_ARGS(dr));
+DEFINE_EVENT(svc_deferred_event, svc_revisit_deferred,
+	TP_PROTO(struct svc_deferred_req *dr),
+	TP_ARGS(dr));
 #endif /* _TRACE_SUNRPC_H */
 
 #include <trace/define_trace.h>
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index e085f5ae1548194603de4af635388624daec6516..1d281816f2bf14e34a71932863e5579e4a5caa35 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1230,8 +1230,9 @@ static int svcauth_gss_proxy_init(struct svc_rqst *rqstp,
 	if (status)
 		goto out;
 
-	dprintk("RPC:       svcauth_gss: gss major status = %d\n",
-			ud.major_status);
+	dprintk("RPC:       svcauth_gss: gss major status = %d "
+			"minor status = %d\n",
+			ud.major_status, ud.minor_status);
 
 	switch (ud.major_status) {
 	case GSS_S_CONTINUE_NEEDED:
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 553bf95f700301a4821d4317e9277f784370bd22..4d8e11f94a35fb12390c291330c56331512f13d5 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -362,7 +362,7 @@ void sunrpc_destroy_cache_detail(struct cache_detail *cd)
 	cache_purge(cd);
 	spin_lock(&cache_list_lock);
 	write_lock(&cd->hash_lock);
-	if (cd->entries || atomic_read(&cd->inuse)) {
+	if (cd->entries) {
 		write_unlock(&cd->hash_lock);
 		spin_unlock(&cache_list_lock);
 		goto out;
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 4f01f63102ee5d1ce936f54fa8feb3cedfc15607..c3f652395a80b8ded540bc60fe235ce504e239f7 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -21,6 +21,10 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static unsigned int svc_rpc_per_connection_limit __read_mostly;
+module_param(svc_rpc_per_connection_limit, uint, 0644);
+
+
 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
 static int svc_deferred_recv(struct svc_rqst *rqstp);
 static struct cache_deferred_req *svc_defer(struct cache_req *req);
@@ -329,12 +333,45 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
+static bool svc_xprt_slots_in_range(struct svc_xprt *xprt)
+{
+	unsigned int limit = svc_rpc_per_connection_limit;
+	int nrqsts = atomic_read(&xprt->xpt_nr_rqsts);
+
+	return limit == 0 || (nrqsts >= 0 && nrqsts < limit);
+}
+
+static bool svc_xprt_reserve_slot(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+	if (!test_bit(RQ_DATA, &rqstp->rq_flags)) {
+		if (!svc_xprt_slots_in_range(xprt))
+			return false;
+		atomic_inc(&xprt->xpt_nr_rqsts);
+		set_bit(RQ_DATA, &rqstp->rq_flags);
+	}
+	return true;
+}
+
+static void svc_xprt_release_slot(struct svc_rqst *rqstp)
+{
+	struct svc_xprt	*xprt = rqstp->rq_xprt;
+	if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
+		atomic_dec(&xprt->xpt_nr_rqsts);
+		svc_xprt_enqueue(xprt);
+	}
+}
+
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
 		return true;
-	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED)))
-		return xprt->xpt_ops->xpo_has_wspace(xprt);
+	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
+		if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
+		    svc_xprt_slots_in_range(xprt))
+			return true;
+		trace_svc_xprt_no_write_space(xprt);
+		return false;
+	}
 	return false;
 }
 
@@ -480,8 +517,6 @@ void svc_reserve(struct svc_rqst *rqstp, int space)
 		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
 		rqstp->rq_reserved = space;
 
-		if (xprt->xpt_ops->xpo_adjust_wspace)
-			xprt->xpt_ops->xpo_adjust_wspace(xprt);
 		svc_xprt_enqueue(xprt);
 	}
 }
@@ -512,8 +547,8 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
 
 	rqstp->rq_res.head[0].iov_len = 0;
 	svc_reserve(rqstp, 0);
+	svc_xprt_release_slot(rqstp);
 	rqstp->rq_xprt = NULL;
-
 	svc_xprt_put(xprt);
 }
 
@@ -781,7 +816,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
 			svc_add_new_temp_xprt(serv, newxpt);
 		else
 			module_put(xprt->xpt_class->xcl_owner);
-	} else {
+	} else if (svc_xprt_reserve_slot(rqstp, xprt)) {
 		/* XPT_DATA|XPT_DEFERRED case: */
 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
 			rqstp, rqstp->rq_pool->sp_id, xprt,
@@ -871,6 +906,7 @@ EXPORT_SYMBOL_GPL(svc_recv);
  */
 void svc_drop(struct svc_rqst *rqstp)
 {
+	trace_svc_drop(rqstp);
 	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
 	svc_xprt_release(rqstp);
 }
@@ -1148,6 +1184,7 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
 		spin_unlock(&xprt->xpt_lock);
 		dprintk("revisit canceled\n");
 		svc_xprt_put(xprt);
+		trace_svc_drop_deferred(dr);
 		kfree(dr);
 		return;
 	}
@@ -1205,6 +1242,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req)
 	set_bit(RQ_DROPME, &rqstp->rq_flags);
 
 	dr->handle.revisit = svc_revisit;
+	trace_svc_defer(rqstp);
 	return &dr->handle;
 }
 
@@ -1245,6 +1283,7 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
 				struct svc_deferred_req,
 				handle.recent);
 		list_del_init(&dr->handle.recent);
+		trace_svc_revisit_deferred(dr);
 	} else
 		clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
 	spin_unlock(&xprt->xpt_lock);
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index dadfec66dbd8abd301fa3ac307431dabe7999ec6..57625f64efd56edaa65fe0940aaa7979fe65ef87 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -60,7 +60,6 @@
 
 static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,
 					 int flags);
-static void		svc_udp_data_ready(struct sock *);
 static int		svc_udp_recvfrom(struct svc_rqst *);
 static int		svc_udp_sendto(struct svc_rqst *);
 static void		svc_sock_detach(struct svc_xprt *);
@@ -398,48 +397,21 @@ static int svc_sock_secure_port(struct svc_rqst *rqstp)
 	return svc_port_is_privileged(svc_addr(rqstp));
 }
 
-static bool sunrpc_waitqueue_active(wait_queue_head_t *wq)
-{
-	if (!wq)
-		return false;
-	/*
-	 * There should normally be a memory * barrier here--see
-	 * wq_has_sleeper().
-	 *
-	 * It appears that isn't currently necessary, though, basically
-	 * because callers all appear to have sufficient memory barriers
-	 * between the time the relevant change is made and the
-	 * time they call these callbacks.
-	 *
-	 * The nfsd code itself doesn't actually explicitly wait on
-	 * these waitqueues, but it may wait on them for example in
-	 * sendpage() or sendmsg() calls.  (And those may be the only
-	 * places, since it it uses nonblocking reads.)
-	 *
-	 * Maybe we should add the memory barriers anyway, but these are
-	 * hot paths so we'd need to be convinced there's no sigificant
-	 * penalty.
-	 */
-	return waitqueue_active(wq);
-}
-
 /*
  * INET callback when data has been received on the socket.
  */
-static void svc_udp_data_ready(struct sock *sk)
+static void svc_data_ready(struct sock *sk)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
-	wait_queue_head_t *wq = sk_sleep(sk);
 
 	if (svsk) {
 		dprintk("svc: socket %p(inet %p), busy=%d\n",
 			svsk, sk,
 			test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
-		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
-		svc_xprt_enqueue(&svsk->sk_xprt);
+		svsk->sk_odata(sk);
+		if (!test_and_set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags))
+			svc_xprt_enqueue(&svsk->sk_xprt);
 	}
-	if (sunrpc_waitqueue_active(wq))
-		wake_up_interruptible(wq);
 }
 
 /*
@@ -448,56 +420,22 @@ static void svc_udp_data_ready(struct sock *sk)
 static void svc_write_space(struct sock *sk)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)(sk->sk_user_data);
-	wait_queue_head_t *wq = sk_sleep(sk);
 
 	if (svsk) {
 		dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
 			svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
+		svsk->sk_owspace(sk);
 		svc_xprt_enqueue(&svsk->sk_xprt);
 	}
-
-	if (sunrpc_waitqueue_active(wq)) {
-		dprintk("RPC svc_write_space: someone sleeping on %p\n",
-		       svsk);
-		wake_up_interruptible(wq);
-	}
 }
 
 static int svc_tcp_has_wspace(struct svc_xprt *xprt)
 {
-	struct svc_sock *svsk =	container_of(xprt, struct svc_sock, sk_xprt);
-	struct svc_serv *serv = svsk->sk_xprt.xpt_server;
-	int required;
+	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
 
 	if (test_bit(XPT_LISTENER, &xprt->xpt_flags))
 		return 1;
-	required = atomic_read(&xprt->xpt_reserved) + serv->sv_max_mesg;
-	if (sk_stream_wspace(svsk->sk_sk) >= required ||
-	    (sk_stream_min_wspace(svsk->sk_sk) == 0 &&
-	     atomic_read(&xprt->xpt_reserved) == 0))
-		return 1;
-	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
-	return 0;
-}
-
-static void svc_tcp_write_space(struct sock *sk)
-{
-	struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
-	struct socket *sock = sk->sk_socket;
-
-	if (!sk_stream_is_writeable(sk) || !sock)
-		return;
-	if (!svsk || svc_tcp_has_wspace(&svsk->sk_xprt))
-		clear_bit(SOCK_NOSPACE, &sock->flags);
-	svc_write_space(sk);
-}
-
-static void svc_tcp_adjust_wspace(struct svc_xprt *xprt)
-{
-	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
-
-	if (svc_tcp_has_wspace(xprt))
-		clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+	return !test_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
 }
 
 /*
@@ -746,7 +684,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 	svc_xprt_init(sock_net(svsk->sk_sock->sk), &svc_udp_class,
 		      &svsk->sk_xprt, serv);
 	clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
-	svsk->sk_sk->sk_data_ready = svc_udp_data_ready;
+	svsk->sk_sk->sk_data_ready = svc_data_ready;
 	svsk->sk_sk->sk_write_space = svc_write_space;
 
 	/* initialise setting must have enough space to
@@ -786,11 +724,12 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 static void svc_tcp_listen_data_ready(struct sock *sk)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
-	wait_queue_head_t *wq;
 
 	dprintk("svc: socket %p TCP (listen) state change %d\n",
 		sk, sk->sk_state);
 
+	if (svsk)
+		svsk->sk_odata(sk);
 	/*
 	 * This callback may called twice when a new connection
 	 * is established as a child socket inherits everything
@@ -808,10 +747,6 @@ static void svc_tcp_listen_data_ready(struct sock *sk)
 		} else
 			printk("svc: socket %p: no user data\n", sk);
 	}
-
-	wq = sk_sleep(sk);
-	if (sunrpc_waitqueue_active(wq))
-		wake_up_interruptible_all(wq);
 }
 
 /*
@@ -820,7 +755,6 @@ static void svc_tcp_listen_data_ready(struct sock *sk)
 static void svc_tcp_state_change(struct sock *sk)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
-	wait_queue_head_t *wq = sk_sleep(sk);
 
 	dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
 		sk, sk->sk_state, sk->sk_user_data);
@@ -828,26 +762,12 @@ static void svc_tcp_state_change(struct sock *sk)
 	if (!svsk)
 		printk("svc: socket %p: no user data\n", sk);
 	else {
-		set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
-		svc_xprt_enqueue(&svsk->sk_xprt);
-	}
-	if (sunrpc_waitqueue_active(wq))
-		wake_up_interruptible_all(wq);
-}
-
-static void svc_tcp_data_ready(struct sock *sk)
-{
-	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
-	wait_queue_head_t *wq = sk_sleep(sk);
-
-	dprintk("svc: socket %p TCP data ready (svsk %p)\n",
-		sk, sk->sk_user_data);
-	if (svsk) {
-		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
-		svc_xprt_enqueue(&svsk->sk_xprt);
+		svsk->sk_ostate(sk);
+		if (sk->sk_state != TCP_ESTABLISHED) {
+			set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+			svc_xprt_enqueue(&svsk->sk_xprt);
+		}
 	}
-	if (sunrpc_waitqueue_active(wq))
-		wake_up_interruptible(wq);
 }
 
 /*
@@ -901,6 +821,11 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
 	dprintk("%s: connect from %s\n", serv->sv_name,
 		__svc_print_addr(sin, buf, sizeof(buf)));
 
+	/* Reset the inherited callbacks before calling svc_setup_socket */
+	newsock->sk->sk_state_change = svsk->sk_ostate;
+	newsock->sk->sk_data_ready = svsk->sk_odata;
+	newsock->sk->sk_write_space = svsk->sk_owspace;
+
 	/* make sure that a write doesn't block forever when
 	 * low on memory
 	 */
@@ -1317,7 +1242,6 @@ static struct svc_xprt_ops svc_tcp_ops = {
 	.xpo_has_wspace = svc_tcp_has_wspace,
 	.xpo_accept = svc_tcp_accept,
 	.xpo_secure_port = svc_sock_secure_port,
-	.xpo_adjust_wspace = svc_tcp_adjust_wspace,
 };
 
 static struct svc_xprt_class svc_tcp_class = {
@@ -1357,8 +1281,8 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
 	} else {
 		dprintk("setting up TCP socket for reading\n");
 		sk->sk_state_change = svc_tcp_state_change;
-		sk->sk_data_ready = svc_tcp_data_ready;
-		sk->sk_write_space = svc_tcp_write_space;
+		sk->sk_data_ready = svc_data_ready;
+		sk->sk_write_space = svc_write_space;
 
 		svsk->sk_reclen = 0;
 		svsk->sk_tcplen = 0;
@@ -1368,8 +1292,13 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
 		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
 
 		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
-		if (sk->sk_state != TCP_ESTABLISHED)
+		switch (sk->sk_state) {
+		case TCP_SYN_RECV:
+		case TCP_ESTABLISHED:
+			break;
+		default:
 			set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+		}
 	}
 }
 
@@ -1428,17 +1357,14 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
 	/* Initialize the socket */
 	if (sock->type == SOCK_DGRAM)
 		svc_udp_init(svsk, serv);
-	else {
-		/* initialise setting must have enough space to
-		 * receive and respond to one request.
-		 */
-		svc_sock_setbufsize(svsk->sk_sock, 4 * serv->sv_max_mesg,
-					4 * serv->sv_max_mesg);
+	else
 		svc_tcp_init(svsk, serv);
-	}
 
-	dprintk("svc: svc_setup_socket created %p (inet %p)\n",
-				svsk, svsk->sk_sk);
+	dprintk("svc: svc_setup_socket created %p (inet %p), "
+			"listen %d close %d\n",
+			svsk, svsk->sk_sk,
+			test_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags),
+			test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
 
 	return svsk;
 }
@@ -1606,18 +1532,16 @@ static void svc_sock_detach(struct svc_xprt *xprt)
 {
 	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
 	struct sock *sk = svsk->sk_sk;
-	wait_queue_head_t *wq;
 
 	dprintk("svc: svc_sock_detach(%p)\n", svsk);
 
 	/* put back the old socket callbacks */
+	lock_sock(sk);
 	sk->sk_state_change = svsk->sk_ostate;
 	sk->sk_data_ready = svsk->sk_odata;
 	sk->sk_write_space = svsk->sk_owspace;
-
-	wq = sk_sleep(sk);
-	if (sunrpc_waitqueue_active(wq))
-		wake_up_interruptible(wq);
+	sk->sk_user_data = NULL;
+	release_sock(sk);
 }
 
 /*