001/* 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 */ 014package org.apache.commons.io.input; 015 016import static org.apache.commons.io.IOUtils.EOF; 017 018// import javax.annotation.concurrent.GuardedBy; 019import java.io.EOFException; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InterruptedIOException; 023import java.nio.ByteBuffer; 024import java.util.Objects; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.locks.Condition; 030import java.util.concurrent.locks.ReentrantLock; 031 032/** 033 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount 034 * of data has been read from the current buffer. It does so by maintaining two buffers: an active buffer and a read 035 * ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read ahead 036 * buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, 037 * we flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O. 038 * <p> 039 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19. 040 * </p> 041 * 042 * @since 2.9.0 043 */ 044public class ReadAheadInputStream extends InputStream { 045 046 private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); 047 048 /** 049 * Creates a new daemon executor service. 050 * 051 * @return a new daemon executor service. 052 */ 053 private static ExecutorService newExecutorService() { 054 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread); 055 } 056 057 /** 058 * Creates a new daemon thread. 059 * 060 * @param r the thread's runnable. 061 * @return a new daemon thread. 062 */ 063 private static Thread newThread(final Runnable r) { 064 final Thread thread = new Thread(r, "commons-io-read-ahead"); 065 thread.setDaemon(true); 066 return thread; 067 } 068 069 private final ReentrantLock stateChangeLock = new ReentrantLock(); 070 071 // @GuardedBy("stateChangeLock") 072 private ByteBuffer activeBuffer; 073 074 // @GuardedBy("stateChangeLock") 075 private ByteBuffer readAheadBuffer; 076 077 // @GuardedBy("stateChangeLock") 078 private boolean endOfStream; 079 080 // @GuardedBy("stateChangeLock") 081 // true if async read is in progress 082 private boolean readInProgress; 083 084 // @GuardedBy("stateChangeLock") 085 // true if read is aborted due to an exception in reading from underlying input stream. 086 private boolean readAborted; 087 088 // @GuardedBy("stateChangeLock") 089 private Throwable readException; 090 091 // @GuardedBy("stateChangeLock") 092 // whether the close method is called. 093 private boolean isClosed; 094 095 // @GuardedBy("stateChangeLock") 096 // true when the close method will close the underlying input stream. This is valid only if 097 // `isClosed` is true. 098 private boolean isUnderlyingInputStreamBeingClosed; 099 100 // @GuardedBy("stateChangeLock") 101 // whether there is a read ahead task running, 102 private boolean isReading; 103 104 // Whether there is a reader waiting for data. 105 private final AtomicBoolean isWaiting = new AtomicBoolean(false); 106 107 private final InputStream underlyingInputStream; 108 109 private final ExecutorService executorService; 110 111 private final boolean shutdownExecutorService; 112 113 private final Condition asyncReadComplete = stateChangeLock.newCondition(); 114 115 /** 116 * Creates an instance with the specified buffer size and read-ahead threshold 117 * 118 * @param inputStream The underlying input stream. 119 * @param bufferSizeInBytes The buffer size. 120 */ 121 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) { 122 this(inputStream, bufferSizeInBytes, newExecutorService(), true); 123 } 124 125 /** 126 * Creates an instance with the specified buffer size and read-ahead threshold 127 * 128 * @param inputStream The underlying input stream. 129 * @param bufferSizeInBytes The buffer size. 130 * @param executorService An executor service for the read-ahead thread. 131 */ 132 public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, 133 final ExecutorService executorService) { 134 this(inputStream, bufferSizeInBytes, executorService, false); 135 } 136 137 /** 138 * Creates an instance with the specified buffer size and read-ahead threshold 139 * 140 * @param inputStream The underlying input stream. 141 * @param bufferSizeInBytes The buffer size. 142 * @param executorService An executor service for the read-ahead thread. 143 * @param shutdownExecutorService Whether or not to shutdown the given ExecutorService on close. 144 */ 145 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, 146 final ExecutorService executorService, final boolean shutdownExecutorService) { 147 if (bufferSizeInBytes <= 0) { 148 throw new IllegalArgumentException( 149 "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); 150 } 151 this.executorService = Objects.requireNonNull(executorService, "executorService"); 152 this.underlyingInputStream = Objects.requireNonNull(inputStream, "inputStream"); 153 this.shutdownExecutorService = shutdownExecutorService; 154 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); 155 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); 156 this.activeBuffer.flip(); 157 this.readAheadBuffer.flip(); 158 } 159 160 @Override 161 public int available() throws IOException { 162 stateChangeLock.lock(); 163 // Make sure we have no integer overflow. 164 try { 165 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining()); 166 } finally { 167 stateChangeLock.unlock(); 168 } 169 } 170 171 private void checkReadException() throws IOException { 172 if (readAborted) { 173 if (readException instanceof IOException) { 174 throw (IOException) readException; 175 } 176 throw new IOException(readException); 177 } 178 } 179 180 @Override 181 public void close() throws IOException { 182 boolean isSafeToCloseUnderlyingInputStream = false; 183 stateChangeLock.lock(); 184 try { 185 if (isClosed) { 186 return; 187 } 188 isClosed = true; 189 if (!isReading) { 190 // Nobody is reading, so we can close the underlying input stream in this method. 191 isSafeToCloseUnderlyingInputStream = true; 192 // Flip this to make sure the read ahead task will not close the underlying input stream. 193 isUnderlyingInputStreamBeingClosed = true; 194 } 195 } finally { 196 stateChangeLock.unlock(); 197 } 198 199 if (shutdownExecutorService) { 200 try { 201 executorService.shutdownNow(); 202 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 203 } catch (final InterruptedException e) { 204 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 205 iio.initCause(e); 206 throw iio; 207 } finally { 208 if (isSafeToCloseUnderlyingInputStream) { 209 underlyingInputStream.close(); 210 } 211 } 212 } 213 } 214 215 private void closeUnderlyingInputStreamIfNecessary() { 216 boolean needToCloseUnderlyingInputStream = false; 217 stateChangeLock.lock(); 218 try { 219 isReading = false; 220 if (isClosed && !isUnderlyingInputStreamBeingClosed) { 221 // close method cannot close underlyingInputStream because we were reading. 222 needToCloseUnderlyingInputStream = true; 223 } 224 } finally { 225 stateChangeLock.unlock(); 226 } 227 if (needToCloseUnderlyingInputStream) { 228 try { 229 underlyingInputStream.close(); 230 } catch (final IOException e) { 231 // TODO ? 232 } 233 } 234 } 235 236 private boolean isEndOfStream() { 237 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream; 238 } 239 240 @Override 241 public int read() throws IOException { 242 if (activeBuffer.hasRemaining()) { 243 // short path - just get one byte. 244 return activeBuffer.get() & 0xFF; 245 } 246 final byte[] oneByteArray = oneByte.get(); 247 return read(oneByteArray, 0, 1) == EOF ? -1 : oneByteArray[0] & 0xFF; 248 } 249 250 @Override 251 public int read(final byte[] b, final int offset, int len) throws IOException { 252 if (offset < 0 || len < 0 || len > b.length - offset) { 253 throw new IndexOutOfBoundsException(); 254 } 255 if (len == 0) { 256 return 0; 257 } 258 259 if (!activeBuffer.hasRemaining()) { 260 // No remaining in active buffer - lock and switch to write ahead buffer. 261 stateChangeLock.lock(); 262 try { 263 waitForAsyncReadComplete(); 264 if (!readAheadBuffer.hasRemaining()) { 265 // The first read. 266 readAsync(); 267 waitForAsyncReadComplete(); 268 if (isEndOfStream()) { 269 return EOF; 270 } 271 } 272 // Swap the newly read read ahead buffer in place of empty active buffer. 273 swapBuffers(); 274 // After swapping buffers, trigger another async read for read ahead buffer. 275 readAsync(); 276 } finally { 277 stateChangeLock.unlock(); 278 } 279 } 280 len = Math.min(len, activeBuffer.remaining()); 281 activeBuffer.get(b, offset, len); 282 283 return len; 284 } 285 286 /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ 287 private void readAsync() throws IOException { 288 stateChangeLock.lock(); 289 final byte[] arr; 290 try { 291 arr = readAheadBuffer.array(); 292 if (endOfStream || readInProgress) { 293 return; 294 } 295 checkReadException(); 296 readAheadBuffer.position(0); 297 readAheadBuffer.flip(); 298 readInProgress = true; 299 } finally { 300 stateChangeLock.unlock(); 301 } 302 executorService.execute(() -> { 303 stateChangeLock.lock(); 304 try { 305 if (isClosed) { 306 readInProgress = false; 307 return; 308 } 309 // Flip this so that the close method will not close the underlying input stream when we 310 // are reading. 311 isReading = true; 312 } finally { 313 stateChangeLock.unlock(); 314 } 315 316 // Please note that it is safe to release the lock and read into the read ahead buffer 317 // because either of following two conditions will hold: 318 // 319 // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer. 320 // 321 // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits 322 // for this async read to complete. 323 // 324 // So there is no race condition in both the situations. 325 int read = 0; 326 int off = 0, len = arr.length; 327 Throwable exception = null; 328 try { 329 // try to fill the read ahead buffer. 330 // if a reader is waiting, possibly return early. 331 do { 332 read = underlyingInputStream.read(arr, off, len); 333 if (read <= 0) { 334 break; 335 } 336 off += read; 337 len -= read; 338 } while (len > 0 && !isWaiting.get()); 339 } catch (final Throwable ex) { 340 exception = ex; 341 if (ex instanceof Error) { 342 // `readException` may not be reported to the user. Rethrow Error to make sure at least 343 // The user can see Error in UncaughtExceptionHandler. 344 throw (Error) ex; 345 } 346 } finally { 347 stateChangeLock.lock(); 348 try { 349 readAheadBuffer.limit(off); 350 if (read < 0 || (exception instanceof EOFException)) { 351 endOfStream = true; 352 } else if (exception != null) { 353 readAborted = true; 354 readException = exception; 355 } 356 readInProgress = false; 357 signalAsyncReadComplete(); 358 } finally { 359 stateChangeLock.unlock(); 360 } 361 closeUnderlyingInputStreamIfNecessary(); 362 } 363 }); 364 } 365 366 private void signalAsyncReadComplete() { 367 stateChangeLock.lock(); 368 try { 369 asyncReadComplete.signalAll(); 370 } finally { 371 stateChangeLock.unlock(); 372 } 373 } 374 375 @Override 376 public long skip(final long n) throws IOException { 377 if (n <= 0L) { 378 return 0L; 379 } 380 if (n <= activeBuffer.remaining()) { 381 // Only skipping from active buffer is sufficient 382 activeBuffer.position((int) n + activeBuffer.position()); 383 return n; 384 } 385 stateChangeLock.lock(); 386 long skipped; 387 try { 388 skipped = skipInternal(n); 389 } finally { 390 stateChangeLock.unlock(); 391 } 392 return skipped; 393 } 394 395 /** 396 * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is 397 * already acquired in the caller before calling this function. 398 * 399 * @param n the number of bytes to be skipped. 400 * @return the actual number of bytes skipped. 401 */ 402 private long skipInternal(final long n) throws IOException { 403 assert stateChangeLock.isLocked(); 404 waitForAsyncReadComplete(); 405 if (isEndOfStream()) { 406 return 0; 407 } 408 if (available() >= n) { 409 // we can skip from the internal buffers 410 int toSkip = (int) n; 411 // We need to skip from both active buffer and read ahead buffer 412 toSkip -= activeBuffer.remaining(); 413 assert toSkip > 0; // skipping from activeBuffer already handled. 414 activeBuffer.position(0); 415 activeBuffer.flip(); 416 readAheadBuffer.position(toSkip + readAheadBuffer.position()); 417 swapBuffers(); 418 // Trigger async read to emptied read ahead buffer. 419 readAsync(); 420 return n; 421 } 422 final int skippedBytes = available(); 423 final long toSkip = n - skippedBytes; 424 activeBuffer.position(0); 425 activeBuffer.flip(); 426 readAheadBuffer.position(0); 427 readAheadBuffer.flip(); 428 final long skippedFromInputStream = underlyingInputStream.skip(toSkip); 429 readAsync(); 430 return skippedBytes + skippedFromInputStream; 431 } 432 433 /** 434 * Flips the active and read ahead buffer 435 */ 436 private void swapBuffers() { 437 final ByteBuffer temp = activeBuffer; 438 activeBuffer = readAheadBuffer; 439 readAheadBuffer = temp; 440 } 441 442 private void waitForAsyncReadComplete() throws IOException { 443 stateChangeLock.lock(); 444 try { 445 isWaiting.set(true); 446 // There is only one reader, and one writer, so the writer should signal only once, 447 // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. 448 while (readInProgress) { 449 asyncReadComplete.await(); 450 } 451 } catch (final InterruptedException e) { 452 final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); 453 iio.initCause(e); 454 throw iio; 455 } finally { 456 isWaiting.set(false); 457 stateChangeLock.unlock(); 458 } 459 checkReadException(); 460 } 461}