diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ecab5510a8e7b..36bb656d40374 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -328,6 +328,7 @@ class StreamingContext private[streaming] ( * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -336,8 +337,8 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory) + ] (directory: String, depth: Int = 1): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth) } /** @@ -348,6 +349,7 @@ class StreamingContext private[streaming] ( * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -356,8 +358,13 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + ] ( + directory: String, + filter: Path => Boolean, + 
newFilesOnly: Boolean, + depth: Int = 1 + ): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } /** @@ -367,9 +374,10 @@ class StreamingContext private[streaming] ( * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of directory */ - def textFileStream(directory: String): DStream[String] = { - fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) + def textFileStream(directory: String, depth: Int = 1): DStream[String] = { + fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 5f13fdc5579ed..4f1de65b393b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.dstream -import java.io.{IOException, ObjectInputStream} +import java.io.{FileNotFoundException, IOException, ObjectInputStream} import scala.collection.mutable import scala.reflect.ClassTag -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.{RDD, UnionRDD} @@ -33,8 +33,10 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * This class represents an input stream that monitors a Hadoop-compatible filesystem for new * files and creates a stream out of them. The way it works as follows. 
* - * At each batch interval, the file system is queried for files in the given directory and - * detected new files are selected for that batch. In this case "new" means files that + * At each batch interval, Use `depth` to find files in the directory recursively, + * the file system is queried for files in the given directory and detected new + * files are selected for that batch. If the `depth` is greater than 1, + * it is queried for files in the depth of the recursion, In this case "new" means files that * became visible to readers during that time period. Some extra care is needed to deal * with the fact that files may become visible after they are created. For this purpose, this * class remembers the information about the files selected in past batches for @@ -70,10 +72,12 @@ private[streaming] class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @transient ssc_ : StreamingContext, directory: String, + depth: Int = 1, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { + require(depth >= 1, "nested directories depth must >= 1") // Data to be saved as part of the streaming checkpoints protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData @@ -97,6 +101,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() + // Set of directories that were found from the beginning to the present + @transient private var lastFoundDirs = new mutable.HashSet[Path]() + // Read-through cache of file mod times, used to speed up mod time lookups @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) @@ -143,7 +150,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } /** - * Find new files for the batch of `currentTime`. 
This is done by first calculating the + * Find new files for the batch of `currentTime` in nested directories. + * This is done by first calculating the * ignore threshold for file mod times, and then getting a list of files filtered based on * the current batch time and the ignore threshold. The ignore threshold is the max of * initial ignore threshold and the trailing end of the remember window (that is, which ever * @@ -163,7 +171,48 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas val filter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth() + + // Depth-first search over nested directories to find new files. + def dfs(status: FileStatus): List[FileStatus] = { + val path = status.getPath + val depthFilter = depth + directoryDepth - path.depth() + if (status.isDir) { + if (depthFilter - 1 >= 0) { + if (lastFoundDirs.contains(path)) { + if (status.getModificationTime > modTimeIgnoreThreshold) { + fs.listStatus(path).toList.flatMap(dfs(_)) + } else Nil + } else { + lastFoundDirs += path + fs.listStatus(path).toList.flatMap(dfs(_)) + } + } else Nil + } else { + if (filter.accept(path)) status :: Nil else Nil + } + } + + val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath)) + else { + lastFoundDirs.filter { path => + // Keep only directories whose mod time is newer than the ignore threshold; older directories cannot contain new files. 
+ try { + val status = fs.getFileStatus(path) + if (status != null + && status.getModificationTime > modTimeIgnoreThreshold) true else false + } + catch { + // If the directory is not found, remove it from `lastFoundDirs` + case e: FileNotFoundException => + lastFoundDirs.remove(path) + false + } + } + }.flatMap(fs.listStatus(_)).toSeq + + val newFiles = path.flatMap(dfs(_)).map(_.getPath.toString).toArray + + val timeTaken = System.currentTimeMillis - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) @@ -203,6 +252,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas */ private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { val pathStr = path.toString + if (path.getName().startsWith("_")) { + logDebug(s"startsWith: ${path.getName()}") + return false + } // Reject file if it does not satisfy filter if (!filter(path)) { logDebug(s"$pathStr rejected by filter") @@ -270,6 +323,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]() recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) + lastFoundDirs = new mutable.HashSet[Path]() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 307052a4a9cbb..d9e1363457ff6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -97,14 +97,31 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("file input stream - newFilesOnly = true") { + test("file input stream - newFilesOnly = true and depth = 1") { testFileStream(newFilesOnly = true) } - 
test("file input stream - newFilesOnly = false") { + test("file input stream - newFilesOnly = false and depth = 1") { testFileStream(newFilesOnly = false) } + test("file input stream - newFilesOnly = true and depth = 2") { + testFileStream(newFilesOnly = true, 2) + } + + test("file input stream - newFilesOnly = false and depth = 2") { + testFileStream(newFilesOnly = false, 2) + } + + test("file input stream - newFilesOnly = true and depth = 3") { + testFileStream(newFilesOnly = true, 3) + } + + test("file input stream - newFilesOnly = false and depth = 3") { + testFileStream(newFilesOnly = false, 3) + } + + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 @@ -233,11 +250,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - def testFileStream(newFilesOnly: Boolean) { + def testFileStream(newFilesOnly: Boolean, depth :Int = 1) { var ssc: StreamingContext = null val testDir: File = null try { - val testDir = Utils.createTempDir() + var testDir = Utils.createTempDir() + for (i <- 2 until depth) { + testDir = Utils.createTempDir(testDir.toString) + } val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, Charset.forName("UTF-8")) @@ -247,7 +267,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") ssc = new StreamingContext(newConf, batchDuration) val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( - testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) + testDir.toString, (x: Path) => true, + newFilesOnly = newFilesOnly, depth).map(_._2.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(fileStream, outputBuffer) outputStream.register()