Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (working copy)
@@ -118,5 +118,15 @@
= DFSUtil.string2Bytes(DOT_SNAPSHOT_DIR);
public static final String SEPARATOR_DOT_SNAPSHOT_DIR
- = Path.SEPARATOR + DOT_SNAPSHOT_DIR;
+ = Path.SEPARATOR + DOT_SNAPSHOT_DIR;
+
+ // quorum read properties
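+ // Threshold (in milliseconds) to wait on a read from one datanode before a
+ // speculative read is issued to another replica, and the size of the thread
+ // pool used for those speculative reads (0, the default, disables them).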
+ public static final String DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS =
+ "dfs.dfsclient.quorum.read.threshold.millis";
+ public static final long DEFAULT_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS =
+ 500;
+
+ public static final String DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE =
+ "dfs.dfsclient.quorum.read.threadpool.size";
+ public static final int DEFAULT_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE = 0;
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (working copy)
@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
@@ -46,4 +45,6 @@
public boolean failPacket() {
return false;
}
+
+ public void startFetchFromDatanode() {}
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSQuorumReadMetrics.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSQuorumReadMetrics.java (revision 0)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSQuorumReadMetrics.java (working copy)
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+
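+/**
+ * Counters for the speculative (quorum) read path: how many speculative reads
+ * were issued, how many were won by the speculative copy, and how many had to
+ * run in the calling thread because the read thread pool was saturated.
+ */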
+public class DFSQuorumReadMetrics {
+ public AtomicLong readOpsParallel = new AtomicLong();
+ public AtomicLong readOpsParallelWin = new AtomicLong();
+ public AtomicLong readOpsParallelInCurThread = new AtomicLong();
+
+ public void incParallelReadOps() {
+ readOpsParallel.incrementAndGet();
+ }
+
+ public void incParallelReadOpsInCurThread() {
+ readOpsParallelInCurThread.incrementAndGet();
+ }
+
+ public void incParallelReadWins() {
+ readOpsParallelWin.incrementAndGet();
+ }
+
+ public long getParallelReadOps() {
+ return readOpsParallel.longValue();
+ }
+
+ public long getParallelReadOpsInCurThread() {
+ return readOpsParallelInCurThread.longValue();
+ }
+
+ public long getParallelReadWins() {
+ return readOpsParallelWin.longValue();
+ }
+}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (working copy)
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,7 +33,12 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -861,13 +867,17 @@
corruptedBlockMap.put(blk, dnSet);
}
}
-
- private DNAddrPair chooseDataNode(LocatedBlock block)
- throws IOException {
+
+ private DNAddrPair chooseDataNode(LocatedBlock block) throws IOException {
+ return chooseDataNode(block, null);
+ }
+
+ private DNAddrPair chooseDataNode(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
DatanodeInfo[] nodes = block.getLocations();
try {
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
+ DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
@@ -876,17 +886,20 @@
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) {
+ String errMsg = getBestNodeErrorString(nodes, deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
- throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
- block.getStartOffset());
+ DFSClient.LOG.warn("Could not obtain block " + block + errMsg
+ + ". Throw a BlockMissingException");
+ throw new BlockMissingException(src, "Could not obtain block: "
+ + blockInfo, block.getStartOffset());
}
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + ie
+ + " from any node: " + ie + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
@@ -918,29 +931,60 @@
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- //
- // Connect to best DataNode for desired Block, with potential offset
- //
+ block = getBlockAt(block.getStartOffset(), false);
+ while (true) {
+ DNAddrPair retval = chooseDataNode(block);
+ try {
+ actualGetFromOneDataNode(retval, block, start, end, buf, offset,
+ corruptedBlockMap);
+ return;
+ } catch (IOException e) {
+ // Ignore. Already processed inside the function.
+ // Loop through to try the next node.
+ }
+ }
+ }
+
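+ // Wrap actualGetFromOneDataNode() in a Callable so that a speculative read
+ // can be handed to the parallel read thread pool; the data is returned in
+ // the supplied ByteBuffer.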
+ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+ final LocatedBlock block, final long start, final long end,
+ final ByteBuffer bb,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ return new Callable<ByteBuffer>() {
+ @Override
+ public ByteBuffer call() throws Exception {
+ byte[] buf = bb.array();
+ int offset = bb.position();
+ actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+ corruptedBlockMap);
+ return bb;
+ }
+ };
+ }
+
+ private void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long start, final long end, byte[] buf,
+ int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
+
while (true) {
// cached block locations may have been updated by chooseDataNode()
- // or fetchBlockAt(). Always get the latest list of locations at the
+ // or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
CachingStrategy curCachingStrategy;
synchronized (this) {
block = getBlockAt(block.getStartOffset(), false);
curCachingStrategy = cachingStrategy;
}
- DNAddrPair retval = chooseDataNode(block);
- DatanodeInfo chosenNode = retval.info;
- InetSocketAddress targetAddr = retval.addr;
+ DatanodeInfo chosenNode = datanode.info;
+ InetSocketAddress targetAddr = datanode.addr;
BlockReader reader = null;
-
+
try {
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-
+
int len = (int) (end - start + 1);
reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
blockToken, start, len, buffersize, verifyChecksum,
@@ -952,11 +996,14 @@
}
return;
} catch (ChecksumException e) {
- DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
- src + " at " + block.getBlock() + ":" +
- e.getPos() + " from " + chosenNode);
+ String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+ + chosenNode;
+ DFSClient.LOG.warn(msg);
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+ addToDeadNodes(chosenNode);
+ throw new IOException(msg);
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed " + ex);
dfsClient.disableLegacyBlockReaderLocal();
@@ -975,22 +1022,118 @@
fetchBlockAt(block.getStartOffset());
continue;
} else {
- DFSClient.LOG.warn("Failed to connect to " + targetAddr +
- " for file " + src + " for block " + block.getBlock() + ":" + e);
+ String msg = "Failed to connect to " + targetAddr + " for file "
+ + src + " for block " + block.getBlock() + ":" + e;
if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connection failure ", e);
+ DFSClient.LOG.debug("Connection failure: " + msg, e);
}
+ addToDeadNodes(chosenNode);
+ throw new IOException(msg);
}
} finally {
if (reader != null) {
reader.close();
}
}
- // Put chosen node into dead list, continue
- addToDeadNodes(chosenNode);
}
}
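+
+ // Speculative ("quorum") read path: start a read from the chosen datanode
+ // and, if it does not complete within the configured threshold, issue the
+ // same range read to additional replicas and use whichever copy returns
+ // first.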
+ private void fetchBlockByteRangeSpeculative(LocatedBlock block, long start,
+ long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ ArrayList<Future<ByteBuffer>> futures = null;
+ ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+ ByteBuffer bb = null;
+ int len = (int) (end - start + 1);
+ block = getBlockAt(block.getStartOffset(), false);
+ while (true) {
+ DNAddrPair retval = chooseDataNode(block, ignored);
+ DatanodeInfo chosenNode = retval.info;
+ // futures is null if there is no request already executing.
+ if (futures == null) {
+ bb = ByteBuffer.wrap(buf, offset, len);
+ } else {
+ bb = ByteBuffer.allocate(len);
+ }
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(retval,
+ block, start, end, bb, corruptedBlockMap);
+ Future<ByteBuffer> future = dfsClient.getParallelReadsThreadPool()
+ .submit(getFromDataNodeCallable);
+ if (futures == null) {
+ try {
+ // Wait up to the configured quorum read threshold (500 ms by default).
+ future.get(dfsClient.getQuorumReadTimeout(), TimeUnit.MILLISECONDS);
+ return;
+ } catch (TimeoutException e) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Waited for "
+ + dfsClient.getQuorumReadTimeout()
+ + "ms . Still not complete. Spawning new task");
+ }
+ ignored.add(chosenNode);
+ dfsClient.getQuorumReadMetrics().incParallelReadOps();
+ futures = new ArrayList<Future<ByteBuffer>>();
+ futures.add(future);
+ continue; // no need to refresh block locations
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // Ignore already logged in the call.
+ }
+ } else {
+ futures.add(future);
+ // The first read did not finish in time, so submit a callable for another
+ // datanode on each pass of the loop, poll every 50 ms, and take the result
+ // from whichever read finishes first.
+ try {
+ ByteBuffer result = getFirst(futures, 50);
+ // cancel the rest.
+ cancelAll(futures);
+ if (result.array() != buf) { // compare the array pointers
+ dfsClient.getQuorumReadMetrics().incParallelReadWins();
+ System.arraycopy(result.array(), result.position(), buf, offset,
+ len);
+ }
+ return;
+ } catch (InterruptedException ie) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // The exception was already handled inside the call method; getFirst has
+ // removed the failing future from the list, so there is nothing more to
+ // do here.
+ }
+ ignored.add(chosenNode);
+ }
+ // executed if we get an error from a data node
+ block = getBlockAt(block.getStartOffset(), false);
+ }
+ }
+
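+ // Poll the outstanding futures every 'timeout' ms and return the first one
+ // to complete; a failed future is removed from the list and its exception
+ // rethrown so the caller can try another datanode.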
+ private ByteBuffer getFirst(ArrayList<Future<ByteBuffer>> futures,
+ long timeout) throws ExecutionException, InterruptedException {
+ while (true) {
+ for (Future<ByteBuffer> future : futures) {
+ if (future.isDone()) {
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ // already logged in the Callable
+ futures.remove(future);
+ throw e;
+ }
+ }
+ }
+ Thread.sleep(timeout);
+ }
+ }
+
+ private void cancelAll(List<Future<ByteBuffer>> futures) {
+ for (Future<ByteBuffer> future : futures) {
+ future.cancel(true);
+ }
+ }
+
/**
* Should the block access token be refetched on an exception
*
@@ -1245,8 +1388,14 @@
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
- fetchBlockByteRange(blk, targetStart,
- targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
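+ // Use the speculative read path only when parallel reads are enabled and
+ // the read thread pool has been configured (threadpool size > 0).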
+ if (dfsClient.allowParallelReads
+ && dfsClient.getParallelReadsThreadPool() != null) {
+ fetchBlockByteRangeSpeculative(blk, targetStart, targetStart
+ + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+ } else {
+ fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+ buffer, offset, corruptedBlockMap);
+ }
} finally {
// Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are
@@ -1432,12 +1581,13 @@
* Pick the best node from which to stream the data.
* Entries in nodes are already in the priority order
*/
- static DatanodeInfo bestNode(DatanodeInfo nodes[],
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
- throws IOException {
- if (nodes != null) {
+ static DatanodeInfo bestNode(DatanodeInfo nodes[],
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
- if (!deadNodes.containsKey(nodes[i])) {
+ if (!deadNodes.containsKey(nodes[i])
+ && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
return nodes[i];
}
}
}
throw new IOException("No live nodes contain current block");
}
+
+ private String getBestNodeErrorString(DatanodeInfo[] nodes,
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+ Collection<DatanodeInfo> ignoredNodes) {
+ StringBuilder errMsgr = new StringBuilder(
+ "No live nodes contain current block ");
+ errMsgr.append("Block locations:");
+ for (DatanodeInfo datanode : nodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ errMsgr.append(" Dead nodes: ");
+ for (DatanodeInfo datanode : deadNodes.keySet()) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ if (ignoredNodes != null) {
+ errMsgr.append(" Ignored nodes: ");
+ for (DatanodeInfo datanode : ignoredNodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ }
+ return errMsgr.toString();
+ }
+
/** Utility class to encapsulate data node info and its address. */
static class DNAddrPair {
DatanodeInfo info;
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (working copy)
@@ -76,6 +76,9 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
@@ -167,6 +170,7 @@
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
@@ -216,6 +220,10 @@
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private ClientMmapManager mmapManager;
+ public volatile boolean allowParallelReads = false;
+ private volatile long quorumReadThresholdMillis;
+ private DFSQuorumReadMetrics quorumReadMetrics;
+ private ThreadPoolExecutor parallelReadsThreadPool = null;
private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
new ClientMmapManagerFactory();
@@ -588,6 +596,18 @@
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
+
+ this.quorumReadThresholdMillis = conf.getLong(
+ HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS,
+ HdfsConstants.DEFAULT_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS);
+ int numThreads = conf.getInt(
+ HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE,
+ HdfsConstants.DEFAULT_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE);
+ if (numThreads > 0) {
+ this.enableParallelReads();
+ this.setNumParallelThreadsForReads(numThreads);
+ }
+ this.quorumReadMetrics = new DFSQuorumReadMetrics();
}
/**
@@ -2638,4 +2658,49 @@
public ClientMmapManager getMmapManager() {
return mmapManager;
}
+
+ public void setQuorumReadTimeout(long timeoutMillis) {
+ this.quorumReadThresholdMillis = timeoutMillis;
+ }
+
+ public long getQuorumReadTimeout() {
+ return this.quorumReadThresholdMillis;
+ }
+
+ public void enableParallelReads() {
+ allowParallelReads = true;
+ }
+
+ public void disableParallelReads() {
+ allowParallelReads = false;
+ }
+
+ public void setNumParallelThreadsForReads(int num) {
+ if (this.parallelReadsThreadPool == null) {
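+ // Lazily create the pool: core size 1, max size 'num', 60s idle timeout,
+ // and a SynchronousQueue so each speculative read gets its own thread.
+ // When the pool is saturated, CallerRunsPolicy runs the read in the
+ // calling thread and the metric below records that fallback.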
+ this.parallelReadsThreadPool = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory(),
+ new ThreadPoolExecutor.CallerRunsPolicy() {
+ public void rejectedExecution(Runnable runnable,
+ ThreadPoolExecutor e) {
+ LOG.info("Execution rejected, Executing in current thread");
+ // increment metrics
+ quorumReadMetrics.incParallelReadOpsInCurThread();
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ parallelReadsThreadPool.allowCoreThreadTimeOut(true);
+ } else {
+ this.parallelReadsThreadPool.setMaximumPoolSize(num);
+ }
+ }
+
+ public ThreadPoolExecutor getParallelReadsThreadPool() {
+ return parallelReadsThreadPool;
+ }
+
+ public DFSQuorumReadMetrics getQuorumReadMetrics() {
+ return quorumReadMetrics;
+ }
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (working copy)
@@ -20,19 +20,28 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.log4j.Level;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* This class tests the DFS positional read functionality in a single node
@@ -44,9 +53,10 @@
boolean simulatedStorage = false;
private void writeFile(FileSystem fileSys, Path name) throws IOException {
+ int replication = 3; // We need > 1 replica to exercise the quorum reads.
// test empty file open and read
- DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
- blockSize, (short) 1, seed);
+ DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0, blockSize,
+ (short) replication, seed);
FSDataInputStream in = fileSys.open(name);
byte[] buffer = new byte[12 * blockSize];
in.readFully(0, buffer, 0, 0);
@@ -64,7 +74,7 @@
// now create the real file
DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
- blockSize, (short) 1, seed);
+ blockSize, (short) replication, seed);
}
private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
@@ -191,26 +201,133 @@
assertTrue(fileSys.delete(name, true));
assertTrue(!fileSys.exists(name));
}
-
+
+ private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+ final Path file) {
+ return new Callable<Void>() {
+ public Void call() throws IOException {
+ pReadFile(fileSys, file);
+ return null;
+ }
+ };
+ }
+
/**
* Tests positional read in DFS.
*/
@Test
public void testPreadDFS() throws IOException {
- dfsPreadTest(false, true); //normal pread
- dfsPreadTest(true, true); //trigger read code path without transferTo.
+ Configuration conf = new Configuration();
+ dfsPreadTest(conf, false, true); // normal pread
+ dfsPreadTest(conf, true, true); // trigger read code path without
+ // transferTo.
}
@Test
public void testPreadDFSNoChecksum() throws IOException {
+ Configuration conf = new Configuration();
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
- dfsPreadTest(false, false);
- dfsPreadTest(true, false);
+ dfsPreadTest(conf, false, false);
+ dfsPreadTest(conf, true, false);
}
- private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+ /**
+ * Tests positional read in DFS, with quorum reads enabled.
+ */
+ @Test
+ public void testQuorumPreadDFSBasic() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE, 5);
+ conf.setLong(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS, 100);
+ dfsPreadTest(conf, false, true); // normal pread
+ dfsPreadTest(conf, true, true); // trigger read code path without
+ // transferTo.
+ }
+
+ @Test
+ public void testMaxOutQuorumPool() throws IOException, InterruptedException,
+ ExecutionException {
+ Configuration conf = new Configuration();
+ int numQuorumPoolThreads = 5;
+ conf.setBoolean("dfs.client.metrics.enable", true);
+ conf.setInt(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE,
+ numQuorumPoolThreads);
+ conf.setLong(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS, 100);
+
+ // Set up the InjectionHandler
+ DFSClientFaultInjector.instance = Mockito
+ .mock(DFSClientFaultInjector.class);
+ DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+ // make preads sleep for 60ms
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(60);
+ return null;
+ }
+ }).when(injector).startFetchFromDatanode();
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+ DistributedFileSystem fileSys = (DistributedFileSystem) cluster
+ .getFileSystem();
+ DFSClient dfsClient = fileSys.getClient();
+ DFSQuorumReadMetrics metrics = dfsClient.getQuorumReadMetrics();
+
+ try {
+ Path file1 = new Path("quorumReadMaxOut.dat");
+ writeFile(fileSys, file1);
+ // time the pReadFile test
+ long t0, t1;
+ // Basic test. Reads complete within timeout. Assert that there were no
+ // quorum reads.
+ t0 = System.currentTimeMillis();
+ pReadFile(fileSys, file1);
+ t1 = System.currentTimeMillis();
+ long pReadTestTime = t1 - t0;
+ // assert that there were no quorum reads: 60ms + delta is under the 100ms
+ // quorum read threshold configured above
+ assertTrue(metrics.getParallelReadOps() == 0);
+ assertTrue(metrics.getParallelReadOpsInCurThread() == 0);
+ /*
+ * Multiple threads reading. Reads take longer than timeout. Assert that
+ * there were quorum reads. And that reads had to run in the current
+ * thread.
+ */
+ int factor = 10;
+ int numParallelReads = numQuorumPoolThreads * factor;
+ long initialReadOpsValue = metrics.getParallelReadOps();
+ ExecutorService executor = Executors.newFixedThreadPool(numParallelReads);
+ ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+ for (int i = 0; i < numParallelReads; i++) {
+ futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+ }
+ for (int i = 0; i < numParallelReads; i++) {
+ futures.get(i).get();
+ }
+ assertTrue(metrics.getParallelReadOps() > initialReadOpsValue);
+ assertTrue(metrics.getParallelReadOpsInCurThread() > 0);
+ cleanupFile(fileSys, file1);
+ } finally {
+ fileSys.close();
+ cluster.shutdown();
+ Mockito.reset(injector);
+ }
+ }
+
+ private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
throws IOException {
- Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
if (simulatedStorage) {