Index: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (revision 1560241)
+++ hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (working copy)
@@ -20,9 +20,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@@ -33,6 +38,9 @@
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.log4j.Level;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* This class tests the DFS positional read functionality in a single node
@@ -44,9 +52,10 @@
boolean simulatedStorage = false;
private void writeFile(FileSystem fileSys, Path name) throws IOException {
+ int replication = 3; // We need more than one replica to test out the hedged reads.
// test empty file open and read
- DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
- blockSize, (short) 1, seed);
+ DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0, blockSize,
+ (short) replication, seed);
FSDataInputStream in = fileSys.open(name);
byte[] buffer = new byte[12 * blockSize];
in.readFully(0, buffer, 0, 0);
@@ -64,7 +73,7 @@
// now create the real file
DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
- blockSize, (short) 1, seed);
+ blockSize, (short) replication, seed);
}
private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
@@ -191,26 +200,128 @@
assertTrue(fileSys.delete(name, true));
assertTrue(!fileSys.exists(name));
}
-
+
+ private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+ final Path file) {
+ return new Callable<Void>() {
+ public Void call() throws IOException {
+ pReadFile(fileSys, file);
+ return null;
+ }
+ };
+ }
+
/**
* Tests positional read in DFS.
*/
@Test
public void testPreadDFS() throws IOException {
- dfsPreadTest(false, true); //normal pread
- dfsPreadTest(true, true); //trigger read code path without transferTo.
+ Configuration conf = new Configuration();
+ dfsPreadTest(conf, false, true); // normal pread
+ dfsPreadTest(conf, true, true); // trigger read code path without
+ // transferTo.
}
@Test
public void testPreadDFSNoChecksum() throws IOException {
+ Configuration conf = new Configuration();
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
- dfsPreadTest(false, false);
- dfsPreadTest(true, false);
+ dfsPreadTest(conf, false, false);
+ dfsPreadTest(conf, true, false);
}
- private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+ /**
+ * Tests positional read in DFS, with hedged reads enabled.
+ */
+ @Test
+ public void testHedgedPreadDFSBasic() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+ conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
+ dfsPreadTest(conf, false, true); // normal pread
+ dfsPreadTest(conf, true, true); // trigger read code path without
+ // transferTo.
+ }
+
+ @Test
+ public void testMaxOutHedgedReadPool() throws IOException,
+ InterruptedException, ExecutionException {
+ Configuration conf = new Configuration();
+ int numHedgedReadPoolThreads = 5;
+ final int initialHedgedReadTimeoutMillis = 500;
+ final int fixedSleepIntervalMillis = 50;
+ conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+ numHedgedReadPoolThreads);
+ conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+ initialHedgedReadTimeoutMillis);
+
+ // Set up the InjectionHandler
+ DFSClientFaultInjector.instance = Mockito
+ .mock(DFSClientFaultInjector.class);
+ DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+ // make preads sleep for 50ms
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(fixedSleepIntervalMillis);
+ return null;
+ }
+ }).when(injector).startFetchFromDatanode();
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .format(true).build();
+ DistributedFileSystem fileSys = cluster.getFileSystem();
+ DFSClient dfsClient = fileSys.getClient();
+ DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+
+ try {
+ Path file1 = new Path("hedgedReadMaxOut.dat");
+ writeFile(fileSys, file1);
+ // Basic test. Reads complete within timeout. Assert that there were no
+ // hedged reads.
+ pReadFile(fileSys, file1);
+ // assert that there were no hedged reads. 50ms + delta < 500ms
+ assertTrue(metrics.getHedgedReadOps() == 0);
+ assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+ /*
+ * Multiple threads reading. Reads take longer than timeout. Assert that
+ * there were hedged reads. And that reads had to run in the current
+ * thread.
+ */
+ int factor = 10;
+ int numHedgedReads = numHedgedReadPoolThreads * factor;
+ long initialReadOpsValue = metrics.getHedgedReadOps();
+ ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
+ ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+ for (int i = 0; i < numHedgedReads; i++) {
+ futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+ }
+ for (int i = 0; i < numHedgedReads; i++) {
+ futures.get(i).get();
+ }
+ assertTrue(metrics.getHedgedReadOps() > initialReadOpsValue);
+ assertTrue(metrics.getHedgedReadOpsInCurThread() > 0);
+ cleanupFile(fileSys, file1);
+ executor.shutdown();
+ } finally {
+ fileSys.close();
+ cluster.shutdown();
+ Mockito.reset(injector);
+ }
+ }
+
+ private void dfsPreadTest(Configuration conf, boolean disableTransferTo,
+ boolean verifyChecksum)
throws IOException {
- Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
if (simulatedStorage) {
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (revision 1560241)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (working copy)
@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
@@ -46,4 +45,6 @@
public boolean failPacket() {
return false;
}
+
+ public void startFetchFromDatanode() {}
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (revision 1560241)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (working copy)
@@ -574,4 +574,14 @@
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
+
+ // hedged read properties
+ public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+ "dfs.dfsclient.hedged.read.threshold.millis";
+ public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+ 500;
+
+ public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
+ "dfs.dfsclient.hedged.read.threadpool.size";
+ public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
}
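
The two keys above are all a client has to set to turn the feature on: a pool size greater than zero enables hedged reads, and the threshold controls how long the first read may run before a hedge is spawned. A minimal usage sketch follows; the file path, buffer size, and chosen values are invented for illustration, and only the two property names come from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HedgedPreadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A thread pool size > 0 is what enables hedged reads in DFSClient.
    conf.setInt("dfs.dfsclient.hedged.read.threadpool.size", 10);
    // Hedge a read if the first replica has not answered within 150 ms.
    conf.setLong("dfs.dfsclient.hedged.read.threshold.millis", 150);

    FileSystem fs = FileSystem.get(conf);
    byte[] buf = new byte[4096];
    FSDataInputStream in = fs.open(new Path("/tmp/hedged-demo.dat"));
    try {
      // Positional reads (pread) are the code path covered by hedging.
      int nread = in.read(0L, buf, 0, buf.length);
      System.out.println("read " + nread + " bytes");
    } finally {
      in.close();
    }
  }
}
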
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (revision 1560241)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (working copy)
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,7 +33,13 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -861,13 +868,17 @@
corruptedBlockMap.put(blk, dnSet);
}
}
-
- private DNAddrPair chooseDataNode(LocatedBlock block)
- throws IOException {
+
+ private DNAddrPair chooseDataNode(LocatedBlock block) throws IOException {
+ return chooseDataNode(block, null);
+ }
+
+ private DNAddrPair chooseDataNode(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
DatanodeInfo[] nodes = block.getLocations();
try {
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
+ DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
@@ -876,17 +887,21 @@
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) {
+ String errMsg = getBestNodeErrorString(nodes, deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
- throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
- block.getStartOffset());
+ String description = "Could not obtain block: " + blockInfo;
+ DFSClient.LOG.warn(description + errMsg
+ + ". Throwing a BlockMissingException");
+ throw new BlockMissingException(src, description,
+ block.getStartOffset());
}
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + ie
+ + " from any node: " + ie + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
@@ -918,29 +933,62 @@
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- //
- // Connect to best DataNode for desired Block, with potential offset
- //
+ block = getBlockAt(block.getStartOffset(), false);
+ while (true) {
+ DNAddrPair retval = chooseDataNode(block);
+ try {
+ actualGetFromOneDataNode(retval, block, start, end, buf, offset,
+ corruptedBlockMap);
+ return;
+ } catch (IOException e) {
+ // Ignore. Already processed inside the function.
+ // Loop through to try the next node.
+ }
+ }
+ }
+
+ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+ final LocatedBlock block, final long start, final long end,
+ final ByteBuffer bb,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final CountDownLatch latch) {
+ return new Callable<ByteBuffer>() {
+ @Override
+ public ByteBuffer call() throws Exception {
+ byte[] buf = bb.array();
+ int offset = bb.position();
+ actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+ corruptedBlockMap);
+ latch.countDown();
+ return bb;
+ }
+ };
+ }
+
+ private void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long start, final long end, byte[] buf,
+ int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
+
while (true) {
// cached block locations may have been updated by chooseDataNode()
- // or fetchBlockAt(). Always get the latest list of locations at the
+ // or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
CachingStrategy curCachingStrategy;
synchronized (this) {
block = getBlockAt(block.getStartOffset(), false);
curCachingStrategy = cachingStrategy;
}
- DNAddrPair retval = chooseDataNode(block);
- DatanodeInfo chosenNode = retval.info;
- InetSocketAddress targetAddr = retval.addr;
+ DatanodeInfo chosenNode = datanode.info;
+ InetSocketAddress targetAddr = datanode.addr;
BlockReader reader = null;
-
+
try {
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-
+
int len = (int) (end - start + 1);
reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
blockToken, start, len, buffersize, verifyChecksum,
@@ -952,11 +1000,14 @@
}
return;
} catch (ChecksumException e) {
- DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
- src + " at " + block.getBlock() + ":" +
- e.getPos() + " from " + chosenNode);
+ String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+ + chosenNode;
+ DFSClient.LOG.warn(msg);
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+ addToDeadNodes(chosenNode);
+ throw new IOException(msg);
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed " + ex);
dfsClient.disableLegacyBlockReaderLocal();
@@ -972,25 +1023,122 @@
continue;
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
- fetchBlockAt(block.getStartOffset());
+ try {
+ fetchBlockAt(block.getStartOffset());
+ } catch (IOException fbae) {
+ // ignore IOE, since we can retry it later in a loop
+ }
continue;
} else {
- DFSClient.LOG.warn("Failed to connect to " + targetAddr +
- " for file " + src + " for block " + block.getBlock() + ":" + e);
+ String msg = "Failed to connect to " + targetAddr + " for file "
+ + src + " for block " + block.getBlock() + ":" + e;
if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connection failure ", e);
+ DFSClient.LOG.debug("Connection failure: " + msg, e);
}
+ addToDeadNodes(chosenNode);
+ throw new IOException(msg);
}
} finally {
if (reader != null) {
reader.close();
}
}
- // Put chosen node into dead list, continue
- addToDeadNodes(chosenNode);
}
}
+ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+ long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ ArrayList<Future<ByteBuffer>> futures = null;
+ ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+ ByteBuffer bb = null;
+ int len = (int) (end - start + 1);
+ block = getBlockAt(block.getStartOffset(), false);
+ while (true) {
+ DNAddrPair retval = chooseDataNode(block, ignored);
+ DatanodeInfo chosenNode = retval.info;
+ // futures is null if there is no request already executing.
+ if (futures == null) {
+ bb = ByteBuffer.wrap(buf, offset, len);
+ } else {
+ bb = ByteBuffer.allocate(len);
+ }
+ CountDownLatch hasReceivedResult = new CountDownLatch(1);
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(retval,
+ block, start, end, bb, corruptedBlockMap, hasReceivedResult);
+ Future<ByteBuffer> future = dfsClient.getHedgedReadsThreadPool()
+ .submit(getFromDataNodeCallable);
+ if (futures == null) {
+ try {
+ future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+ return;
+ } catch (TimeoutException e) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Waited for "
+ + dfsClient.getHedgedReadTimeout()
+ + "ms . Still not complete. Spawning new task");
+ }
+ ignored.add(chosenNode);
+ dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+ futures = new ArrayList<Future<ByteBuffer>>();
+ futures.add(future);
+ continue; // no need to refresh block locations
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // Ignore already logged in the call.
+ }
+ } else {
+ futures.add(future);
+ // If the first read did not succeed in time, submit callables for each
+ // datanode in a loop, wait for a fixed interval and get the result from
+ // the fastest one.
+ try {
+ ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+ // cancel the rest.
+ cancelAll(futures);
+ if (result.array() != buf) { // compare the array pointers
+ dfsClient.getHedgedReadMetrics().incHedgedReadWins();
+ System.arraycopy(result.array(), result.position(), buf, offset,
+ len);
+ }
+ return;
+ } catch (InterruptedException ie) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // exception already handled in the call method. getFirstToComplete
+ // will remove the failing future from the list. nothing more to do.
+ }
+ ignored.add(chosenNode);
+ }
+ // executed if we get an error from a data node
+ block = getBlockAt(block.getStartOffset(), false);
+ }
+ }
+
+ private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
+ CountDownLatch latch) throws ExecutionException, InterruptedException {
+ latch.await();
+ for (Future<ByteBuffer> future : futures) {
+ if (future.isDone()) {
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ // already logged in the Callable
+ futures.remove(future);
+ throw e;
+ }
+ }
+ }
+ throw new InterruptedException("should never reach here!");
+ }
+
+ private void cancelAll(List<Future<ByteBuffer>> futures) {
+ for (Future<ByteBuffer> future : futures) {
+ future.cancel(true);
+ }
+ }
+
/**
* Should the block access token be refetched on an exception
*
@@ -1245,8 +1393,14 @@
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
- fetchBlockByteRange(blk, targetStart,
- targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+ if (dfsClient.isHedgedReadsEnabled()
+ && dfsClient.getHedgedReadsThreadPool() != null) {
+ hedgedFetchBlockByteRange(blk, targetStart, targetStart
+ + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+ } else {
+ fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+ buffer, offset, corruptedBlockMap);
+ }
} finally {
// Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are
@@ -1432,12 +1586,13 @@
* Pick the best node from which to stream the data.
* Entries in nodes are already in the priority order
*/
- static DatanodeInfo bestNode(DatanodeInfo nodes[],
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
- throws IOException {
- if (nodes != null) {
+ static DatanodeInfo bestNode(DatanodeInfo nodes[],
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
- if (!deadNodes.containsKey(nodes[i])) {
+ if (!deadNodes.containsKey(nodes[i])
+ && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
return nodes[i];
}
}
@@ -1444,6 +1599,30 @@
}
throw new IOException("No live nodes contain current block");
}
+ private static String getBestNodeErrorString(DatanodeInfo nodes[],
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+ Collection<DatanodeInfo> ignoredNodes) {
+ StringBuilder errMsgr = new StringBuilder(
+ " No live nodes contain current block ");
+ errMsgr.append("Block locations:");
+ for (DatanodeInfo datanode : nodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ errMsgr.append(" Dead nodes: ");
+ for (DatanodeInfo datanode : deadNodes.keySet()) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ if (ignoredNodes != null) {
+ errMsgr.append(" Ignored nodes: ");
+ for (DatanodeInfo datanode : ignoredNodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ }
+ return errMsgr.toString();
+ }
+
/** Utility class to encapsulate data node info and its address. */
static class DNAddrPair {
DatanodeInfo info;
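
Aside from the HDFS plumbing (block locations, dead/ignored node tracking, metrics), the control flow of hedgedFetchBlockByteRange above is: issue the read to the best replica, wait up to the hedge threshold, and if nothing has come back, race another replica and keep whichever finishes first while cancelling the rest. The following stripped-down sketch shows the same pattern with an ExecutorCompletionService; Replica and its read() method are stand-ins invented for the illustration, and error handling (dropping failed futures, refreshing block locations) is deliberately elided.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedReadSketch {
  /** Stand-in for "read this byte range from one datanode". */
  interface Replica {
    byte[] read() throws Exception;
  }

  static byte[] hedgedRead(List<Replica> replicas, ExecutorService pool,
      long thresholdMillis) throws Exception {
    ExecutorCompletionService<byte[]> ecs =
        new ExecutorCompletionService<byte[]>(pool);
    List<Future<byte[]>> outstanding = new ArrayList<Future<byte[]>>();
    try {
      for (Replica replica : replicas) {
        final Replica r = replica;
        // Start (or hedge) a read against the next replica.
        outstanding.add(ecs.submit(new Callable<byte[]>() {
          @Override
          public byte[] call() throws Exception {
            return r.read();
          }
        }));
        // Wait up to the threshold for any in-flight read to finish;
        // if none does, loop around and spawn another hedge.
        Future<byte[]> done = ecs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
        if (done != null) {
          return done.get();
        }
      }
      // Every replica has a request in flight; take the first to complete.
      return ecs.take().get();
    } finally {
      // Cancel the losers (a no-op for a future that already completed).
      for (Future<byte[]> f : outstanding) {
        f.cancel(true);
      }
    }
  }
}
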
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (revision 1560241)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (working copy)
@@ -76,6 +76,9 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
@@ -167,6 +170,7 @@
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
@@ -216,6 +220,10 @@
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private ClientMmapManager mmapManager;
+ private volatile boolean allowHedgedReads;
+ private volatile long hedgedReadThresholdMillis;
+ private DFSHedgedReadMetrics hedgedReadMetrics;
+ private ThreadPoolExecutor hedgedReadsThreadPool;
private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
new ClientMmapManagerFactory();
@@ -588,6 +596,18 @@
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
+
+ this.hedgedReadThresholdMillis = conf.getLong(
+ DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+ DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
+ int numThreads = conf.getInt(
+ DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+ DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
+ if (numThreads > 0) {
+ this.enableHedgedReads();
+ this.setThreadsNumForHedgedReads(numThreads);
+ }
+ this.hedgedReadMetrics = new DFSHedgedReadMetrics();
}
/**
@@ -2638,4 +2658,52 @@
public ClientMmapManager getMmapManager() {
return mmapManager;
}
+
+ public void setHedgedReadTimeout(long timeoutMillis) {
+ this.hedgedReadThresholdMillis = timeoutMillis;
+ }
+
+ public long getHedgedReadTimeout() {
+ return this.hedgedReadThresholdMillis;
+ }
+
+ public void enableHedgedReads() {
+ allowHedgedReads = true;
+ }
+
+ public void disableHedgedReads() {
+ allowHedgedReads = false;
+ }
+
+ public boolean isHedgedReadsEnabled() {
+ return allowHedgedReads;
+ }
+
+ public void setThreadsNumForHedgedReads(int num) {
+ if (this.hedgedReadsThreadPool == null) {
+ this.hedgedReadsThreadPool = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory(),
+ new ThreadPoolExecutor.CallerRunsPolicy() {
+ public void rejectedExecution(Runnable runnable,
+ ThreadPoolExecutor e) {
+ LOG.info("Execution rejected, Executing in current thread");
+ hedgedReadMetrics.incHedgedReadOpsInCurThread();
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ hedgedReadsThreadPool.allowCoreThreadTimeOut(true);
+ } else {
+ this.hedgedReadsThreadPool.setMaximumPoolSize(num);
+ }
+ }
+
+ protected ThreadPoolExecutor getHedgedReadsThreadPool() {
+ return hedgedReadsThreadPool;
+ }
+
+ public DFSHedgedReadMetrics getHedgedReadMetrics() {
+ return hedgedReadMetrics;
+ }
}
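
The pool built in setThreadsNumForHedgedReads is deliberately unbuffered: a SynchronousQueue plus a CallerRunsPolicy override means that once every hedge thread is busy, the extra read simply runs in the submitting thread and is counted through incHedgedReadOpsInCurThread, which is what testMaxOutHedgedReadPool asserts. A self-contained sketch of that mechanism, with illustrative names and sizes, outside HDFS:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CallerRunsDemo {
  public static void main(String[] args) throws Exception {
    final AtomicLong ranInCaller = new AtomicLong();
    // Same shape as the hedged read pool: bounded threads, no queueing,
    // overflow work runs in the submitting thread and is counted.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            ranInCaller.incrementAndGet();
            super.rejectedExecution(r, e); // executes r in the caller thread
          }
        });
    for (int i = 0; i < 10; i++) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          try {
            Thread.sleep(50); // simulate a slow datanode read
          } catch (InterruptedException ignored) {
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println("tasks run in the caller thread: " + ranInCaller.get());
  }
}
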
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java (revision 0)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java (working copy)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Client-side metrics for the hedged read feature.
+ * The counters are public so that client-side code (HBase, for example)
+ * can read them directly.
+ */
+public class DFSHedgedReadMetrics {
+ public AtomicLong hedgedReadOps = new AtomicLong();
+ public AtomicLong hedgedReadOpsWin = new AtomicLong();
+ public AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
+
+ public void incHedgedReadOps() {
+ hedgedReadOps.incrementAndGet();
+ }
+
+ public void incHedgedReadOpsInCurThread() {
+ hedgedReadOpsInCurThread.incrementAndGet();
+ }
+
+ public void incHedgedReadWins() {
+ hedgedReadOpsWin.incrementAndGet();
+ }
+
+ public long getHedgedReadOps() {
+ return hedgedReadOps.longValue();
+ }
+
+ public long getHedgedReadOpsInCurThread() {
+ return hedgedReadOpsInCurThread.longValue();
+ }
+
+ public long getHedgedReadWins() {
+ return hedgedReadOpsWin.longValue();
+ }
+}
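
Because the metrics object hangs off DFSClient, an application holding a DistributedFileSystem can poll these counters directly, which is the HBase-style use case the class comment refers to. A sketch, assuming an HDFS default filesystem; the path and printouts are illustrative, while getClient() and getHedgedReadMetrics() are the accessors used elsewhere in this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class HedgedReadMetricsDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("dfs.dfsclient.hedged.read.threadpool.size", 5);
    conf.setLong("dfs.dfsclient.hedged.read.threshold.millis", 100);

    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(conf);
    byte[] buf = new byte[8192];
    FSDataInputStream in = fs.open(new Path("/tmp/hedged-demo.dat"));
    try {
      in.readFully(0L, buf); // positional read, eligible for hedging
    } finally {
      in.close();
    }

    DFSHedgedReadMetrics metrics = fs.getClient().getHedgedReadMetrics();
    System.out.println("hedged read ops:      " + metrics.getHedgedReadOps());
    System.out.println("hedged read wins:     " + metrics.getHedgedReadWins());
    System.out.println("ran in caller thread: "
        + metrics.getHedgedReadOpsInCurThread());
  }
}
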