Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (working copy)
@@ -118,5 +118,15 @@
       = DFSUtil.string2Bytes(DOT_SNAPSHOT_DIR);
 
   public static final String SEPARATOR_DOT_SNAPSHOT_DIR
-      = Path.SEPARATOR + DOT_SNAPSHOT_DIR;
+      = Path.SEPARATOR + DOT_SNAPSHOT_DIR;
+
+  // quorum read properties
+  public static final String DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS =
+      "dfs.dfsclient.quorum.read.threshold.millis";
+  public static final long DEFAULT_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS =
+      500;
+
+  public static final String DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE =
+      "dfs.dfsclient.quorum.read.threadpool.size";
+  public static final int DEFAULT_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE = 0;
 }
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (working copy)
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs;
-import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,4 +45,6 @@
   public boolean failPacket() {
     return false;
   }
+
+  public void startFetchFromDatanode() {}
 }
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSQuorumReadMetrics.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSQuorumReadMetrics.java (revision 0)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSQuorumReadMetrics.java (working copy)
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class DFSQuorumReadMetrics {
+  public AtomicLong readOpsParallel = new AtomicLong();
+  public AtomicLong readOpsParallelWin = new AtomicLong();
+  public AtomicLong readOpsParallelInCurThread = new AtomicLong();
+
+  public void incParallelReadOps() {
+    readOpsParallel.incrementAndGet();
+  }
+
+  public void incParallelReadOpsInCurThread() {
+    readOpsParallelInCurThread.incrementAndGet();
+  }
+
+  public void incParallelReadWins() {
+    readOpsParallelWin.incrementAndGet();
+  }
+
+  public long getParallelReadOps() {
+    return readOpsParallel.longValue();
+  }
+
+  public long getParallelReadOpsInCurThread() {
+    return readOpsParallelInCurThread.longValue();
+  }
+
+  public long getParallelReadWins() {
+    return readOpsParallelWin.longValue();
+  }
+}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (working copy)
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,7 +33,12 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -861,13 +867,17 @@
       corruptedBlockMap.put(blk, dnSet);
     }
   }
-
-  private DNAddrPair chooseDataNode(LocatedBlock block)
-      throws IOException {
+
+  private DNAddrPair chooseDataNode(LocatedBlock block) throws IOException {
+    return chooseDataNode(block, null);
+  }
+
+  private DNAddrPair chooseDataNode(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
       DatanodeInfo[] nodes = block.getLocations();
       try {
-        DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
+        DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
         final String dnAddr =
             chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
         if (DFSClient.LOG.isDebugEnabled()) {
@@ -876,17 +886,20 @@
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
         return new DNAddrPair(chosenNode, targetAddr);
       } catch (IOException ie) {
+        String errMsg = getBestNodeErrorString(nodes, deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
         if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
-          throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
-              block.getStartOffset());
+          DFSClient.LOG.warn("Could not obtain block " + block + errMsg +
+              ". Throwing a BlockMissingException");
+          throw new BlockMissingException(src, "Could not obtain block: " +
+              blockInfo, block.getStartOffset());
         }
 
         if (nodes == null || nodes.length == 0) {
          DFSClient.LOG.info("No node available for " + blockInfo);
        }
        DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + ie
+            + " from any node: " + ie + errMsg
            + ". Will get new block locations from namenode and retry...");
        try {
          // Introducing a random factor to the wait time before another retry.
@@ -918,29 +931,60 @@
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    //
-    // Connect to best DataNode for desired Block, with potential offset
-    //
+    block = getBlockAt(block.getStartOffset(), false);
+    while (true) {
+      DNAddrPair retval = chooseDataNode(block);
+      try {
+        actualGetFromOneDataNode(retval, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return;
+      } catch (IOException e) {
+        // Ignore. Already processed inside the function.
+        // Loop through to try the next node.
+      }
+    }
+  }
+
+  private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+      final LocatedBlock block, final long start, final long end,
+      final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    return new Callable<ByteBuffer>() {
+      @Override
+      public ByteBuffer call() throws Exception {
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return bb;
+      }
+    };
+  }
+
+  private void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long start, final long end, byte[] buf,
+      int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
-    
+
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
-      // or fetchBlockAt(). Always get the latest list of locations at the 
+      // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
       CachingStrategy curCachingStrategy;
       synchronized (this) {
         block = getBlockAt(block.getStartOffset(), false);
         curCachingStrategy = cachingStrategy;
       }
-      DNAddrPair retval = chooseDataNode(block);
-      DatanodeInfo chosenNode = retval.info;
-      InetSocketAddress targetAddr = retval.addr;
+      DatanodeInfo chosenNode = datanode.info;
+      InetSocketAddress targetAddr = datanode.addr;
       BlockReader reader = null;
-      
+
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-        
+
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
             blockToken, start, len, buffersize, verifyChecksum,
@@ -952,11 +996,14 @@
         }
         return;
       } catch (ChecksumException e) {
-        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
-            src + " at " + block.getBlock() + ":" +
-            e.getPos() + " from " + chosenNode);
+        String msg = "fetchBlockByteRange(). Got a checksum exception for " +
+            src + " at " + block.getBlock() + ":" + e.getPos() + " from " +
+            chosenNode;
+        DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+        addToDeadNodes(chosenNode);
+        throw new IOException(msg);
       } catch (AccessControlException ex) {
         DFSClient.LOG.warn("Short circuit access failed " + ex);
         dfsClient.disableLegacyBlockReaderLocal();
@@ -975,22 +1022,118 @@
           fetchBlockAt(block.getStartOffset());
           continue;
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr +
-              " for file " + src + " for block " + block.getBlock() + ":" + e);
+          String msg = "Failed to connect to " + targetAddr + " for file " +
+              src + " for block " + block.getBlock() + ":" + e;
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Connection failure ", e);
+            DFSClient.LOG.debug("Connection failure: " + msg, e);
           }
+          addToDeadNodes(chosenNode);
+          throw new IOException(msg);
         }
       } finally {
         if (reader != null) {
          reader.close();
        }
      }
-      // Put chosen node into dead list, continue
-      addToDeadNodes(chosenNode);
     }
   }
 
+  private void fetchBlockByteRangeSpeculative(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    ArrayList<Future<ByteBuffer>> futures = null;
+    ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+    ByteBuffer bb = null;
+    int len = (int) (end - start + 1);
+    block = getBlockAt(block.getStartOffset(), false);
+    while (true) {
+      DNAddrPair retval = chooseDataNode(block, ignored);
+      DatanodeInfo chosenNode = retval.info;
+      // futures is null if there is no request already executing.
+      if (futures == null) {
+        bb = ByteBuffer.wrap(buf, offset, len);
+      } else {
+        bb = ByteBuffer.allocate(len);
+      }
+      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(retval,
+          block, start, end, bb, corruptedBlockMap);
+      Future<ByteBuffer> future = dfsClient.getParallelReadsThreadPool()
+          .submit(getFromDataNodeCallable);
+      if (futures == null) {
+        try {
+          // wait for the configured quorum read timeout (500 ms by default).
+          future.get(dfsClient.getQuorumReadTimeout(), TimeUnit.MILLISECONDS);
+          return;
+        } catch (TimeoutException e) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Waited for "
+                + dfsClient.getQuorumReadTimeout()
+                + "ms. Still not complete. Spawning new task");
+          }
+          ignored.add(chosenNode);
+          dfsClient.getQuorumReadMetrics().incParallelReadOps();
+          futures = new ArrayList<Future<ByteBuffer>>();
+          futures.add(future);
+          continue; // no need to refresh block locations
+        } catch (InterruptedException e) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // Ignore; already logged in the call.
+        }
+      } else {
+        futures.add(future);
+        // If the first attempt has not succeeded, submit a callable for each
+        // remaining datanode in turn, polling every 50 ms, and take the
+        // result from whichever future finishes first.
+        try {
+          ByteBuffer result = getFirst(futures, 50);
+          // cancel the rest.
+          cancelAll(futures);
+          if (result.array() != buf) { // compare the array pointers
+            dfsClient.getQuorumReadMetrics().incParallelReadWins();
+            System.arraycopy(result.array(), result.position(), buf, offset,
+                len);
+          }
+          return;
+        } catch (InterruptedException ie) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // Exception already handled in the call method. getFirst will
+          // remove the failing future from the list; nothing more to do.
+        }
+        ignored.add(chosenNode);
+      }
+      // executed if we get an error from a data node
+      block = getBlockAt(block.getStartOffset(), false);
+    }
+  }
+
+  private ByteBuffer getFirst(ArrayList<Future<ByteBuffer>> futures,
+      long timeout) throws ExecutionException, InterruptedException {
+    while (true) {
+      for (Future<ByteBuffer> future : futures) {
+        if (future.isDone()) {
+          try {
+            return future.get();
+          } catch (ExecutionException e) {
+            // already logged in the Callable
+            futures.remove(future);
+            throw e;
+          }
+        }
+      }
+      Thread.sleep(timeout);
+    }
+  }
+
+  private void cancelAll(List<Future<ByteBuffer>> futures) {
+    for (Future<ByteBuffer> future : futures) {
+      future.cancel(true);
+    }
+  }
+
   /**
    * Should the block access token be refetched on an exception
    *
@@ -1245,8 +1388,14 @@
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
-        fetchBlockByteRange(blk, targetStart,
-            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+        if (dfsClient.allowParallelReads
+            && dfsClient.getParallelReadsThreadPool() != null) {
+          fetchBlockByteRangeSpeculative(blk, targetStart, targetStart
+              + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+        } else {
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
+        }
       } finally {
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
@@ -1432,12 +1581,13 @@
    * Pick the best node from which to stream the data.
    * Entries in nodes are already in the priority order
    */
-  static DatanodeInfo bestNode(DatanodeInfo nodes[],
-      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
-      throws IOException {
-    if (nodes != null) {
+  static DatanodeInfo bestNode(DatanodeInfo nodes[],
+      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])) {
+        if (!deadNodes.containsKey(nodes[i])
+            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           return nodes[i];
         }
       }
     }
     throw new IOException("No live nodes contain current block");
   }
 
+  private static String getBestNodeErrorString(DatanodeInfo nodes[],
+      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+      Collection<DatanodeInfo> ignoredNodes) {
+    StringBuilder errMsgr = new StringBuilder(
+        "No live nodes contain current block ");
+    errMsgr.append("Block locations:");
+    for (DatanodeInfo datanode : nodes) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    errMsgr.append(" Dead nodes: ");
+    for (DatanodeInfo datanode : deadNodes.keySet()) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    if (ignoredNodes != null) {
+      errMsgr.append(" Ignored nodes: ");
+      for (DatanodeInfo datanode : ignoredNodes) {
+        errMsgr.append(" ");
+        errMsgr.append(datanode.toString());
+      }
+    }
+    return errMsgr.toString();
+  }
+
   /** Utility class to encapsulate data node info and its address. */
   static class DNAddrPair {
     DatanodeInfo info;
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (working copy)
@@ -76,6 +76,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
 
@@ -167,6 +170,7 @@
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
@@ -216,6 +220,10 @@
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
   private ClientMmapManager mmapManager;
+  public volatile boolean allowParallelReads = false;
+  private volatile long quorumReadThresholdMillis;
+  private DFSQuorumReadMetrics quorumReadMetrics;
+  private ThreadPoolExecutor parallelReadsThreadPool = null;
 
   private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
       new ClientMmapManagerFactory();
@@ -588,6 +596,18 @@
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
     this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
+
+    this.quorumReadThresholdMillis = conf.getLong(
+        HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS,
+        HdfsConstants.DEFAULT_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS);
+    int numThreads = conf.getInt(
+        HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE,
+        HdfsConstants.DEFAULT_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE);
+    if (numThreads > 0) {
+      this.enableParallelReads();
+      this.setNumParallelThreadsForReads(numThreads);
+    }
+    this.quorumReadMetrics = new DFSQuorumReadMetrics();
   }
 
   /**
@@ -2638,4 +2658,49 @@
   public ClientMmapManager getMmapManager() {
     return mmapManager;
   }
+
+  public void setQuorumReadTimeout(long timeoutMillis) {
+    this.quorumReadThresholdMillis = timeoutMillis;
+  }
+
+  public long getQuorumReadTimeout() {
+    return this.quorumReadThresholdMillis;
+  }
+
+  public void enableParallelReads() {
+    allowParallelReads = true;
+  }
+
+  public void disableParallelReads() {
+    allowParallelReads = false;
+  }
+
+  public void setNumParallelThreadsForReads(int num) {
+    if (this.parallelReadsThreadPool == null) {
+      this.parallelReadsThreadPool = new ThreadPoolExecutor(1, num, 60,
+          TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          new Daemon.DaemonFactory(),
+          new ThreadPoolExecutor.CallerRunsPolicy() {
+            public void rejectedExecution(Runnable runnable,
+                ThreadPoolExecutor e) {
+              LOG.info("Execution rejected, executing in the current thread");
+              // increment metrics
+              quorumReadMetrics.incParallelReadOpsInCurThread();
+              // will run in the current thread
+              super.rejectedExecution(runnable, e);
+            }
+          });
+      parallelReadsThreadPool.allowCoreThreadTimeOut(true);
+    } else {
+      this.parallelReadsThreadPool.setMaximumPoolSize(num);
+    }
+  }
+
+  public ThreadPoolExecutor getParallelReadsThreadPool() {
+    return parallelReadsThreadPool;
+  }
+
+  public DFSQuorumReadMetrics getQuorumReadMetrics() {
+    return quorumReadMetrics;
+  }
 }
Index: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (revision 1558704)
+++ hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (working copy)
@@ -20,19 +20,28 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * This class tests the DFS positional read functionality in a single node
 *
@@ -44,9 +53,10 @@
   boolean simulatedStorage = false;
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
+    int replication = 3; // we need > 1 replica to exercise the quorum reads.
     // test empty file open and read
-    DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
-        blockSize, (short) 1, seed);
+    DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0, blockSize,
+        (short) replication, seed);
     FSDataInputStream in = fileSys.open(name);
     byte[] buffer = new byte[12 * blockSize];
     in.readFully(0, buffer, 0, 0);
@@ -64,7 +74,7 @@
     // now create the real file
     DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
-        blockSize, (short) 1, seed);
+        blockSize, (short) replication, seed);
   }
 
   private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
@@ -191,26 +201,133 @@
     assertTrue(fileSys.delete(name, true));
     assertTrue(!fileSys.exists(name));
   }
-
+
+  private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+      final Path file) {
+    return new Callable<Void>() {
+      public Void call() throws IOException {
+        pReadFile(fileSys, file);
+        return null;
+      }
+    };
+  }
+
   /**
    * Tests positional read in DFS.
    */
   @Test
   public void testPreadDFS() throws IOException {
-    dfsPreadTest(false, true); //normal pread
-    dfsPreadTest(true, true); //trigger read code path without transferTo.
+    Configuration conf = new Configuration();
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
   }
 
   @Test
   public void testPreadDFSNoChecksum() throws IOException {
+    Configuration conf = new Configuration();
     ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
-    dfsPreadTest(false, false);
-    dfsPreadTest(true, false);
+    dfsPreadTest(conf, false, false);
+    dfsPreadTest(conf, true, false);
   }
 
-  private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+  /**
+   * Tests positional read in DFS, with quorum reads enabled.
+   */
+  @Test
+  public void testQuorumPreadDFSBasic() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS, 100);
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
+  }
+
+  @Test
+  public void testMaxOutQuorumPool() throws IOException, InterruptedException,
+      ExecutionException {
+    Configuration conf = new Configuration();
+    int numQuorumPoolThreads = 5;
+    conf.setBoolean("dfs.client.metrics.enable", true);
+    conf.setInt(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE,
+        numQuorumPoolThreads);
+    conf.setLong(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS, 100);
+
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    // make preads sleep for 60ms
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(60);
+        return null;
+      }
+    }).when(injector).startFetchFromDatanode();
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    DistributedFileSystem fileSys = (DistributedFileSystem) cluster
+        .getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSQuorumReadMetrics metrics = dfsClient.getQuorumReadMetrics();
+
+    try {
+      Path file1 = new Path("quorumReadMaxOut.dat");
+      writeFile(fileSys, file1);
+      // time the pReadFile test
+      long t0, t1;
+      // Basic test. Reads complete within timeout. Assert that there were no
+      // quorum reads.
+      t0 = System.currentTimeMillis();
+      pReadFile(fileSys, file1);
+      t1 = System.currentTimeMillis();
+      long pReadTestTime = t1 - t0;
+      // assert that there were no quorum reads. 60ms + delta < 100ms
+      assertTrue(metrics.getParallelReadOps() == 0);
+      assertTrue(metrics.getParallelReadOpsInCurThread() == 0);
+      /*
+       * Multiple threads reading. Reads take longer than timeout. Assert that
+       * there were quorum reads. And that reads had to run in the current
+       * thread.
+       */
+      int factor = 10;
+      int numParallelReads = numQuorumPoolThreads * factor;
+      long initialReadOpsValue = metrics.getParallelReadOps();
+      ExecutorService executor = Executors.newFixedThreadPool(numParallelReads);
+      ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+      for (int i = 0; i < numParallelReads; i++) {
+        futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+      }
+      for (int i = 0; i < numParallelReads; i++) {
+        futures.get(i).get();
+      }
+      assertTrue(metrics.getParallelReadOps() > initialReadOpsValue);
+      assertTrue(metrics.getParallelReadOpsInCurThread() > 0);
+      cleanupFile(fileSys, file1);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
+  private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
       throws IOException {
-    Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
     if (simulatedStorage) {