Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class StreamingContext private[streaming] (
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
 * @param depth Depth to which the directory tree is searched recursively for new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
Expand All @@ -336,8 +337,8 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
] (directory: String, depth: Int = 1): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth)
}

/**
Expand All @@ -348,6 +349,7 @@ class StreamingContext private[streaming] (
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
 * @param depth Depth to which the directory tree is searched recursively for new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
Expand All @@ -356,8 +358,13 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
] (
directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
depth: Int = 1
): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly)
}

/**
Expand All @@ -367,9 +374,10 @@ class StreamingContext private[streaming] (
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
 * @param depth Depth to which the directory tree is searched recursively for new files
*/
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
def textFileStream(directory: String, depth: Int = 1): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.io.{FileNotFoundException, IOException, ObjectInputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.rdd.{RDD, UnionRDD}
Expand All @@ -33,8 +33,10 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
* files and creates a stream out of them. The way it works as follows.
*
* At each batch interval, the file system is queried for files in the given directory and
* detected new files are selected for that batch. In this case "new" means files that
 * At each batch interval, the file system is queried for files in the given directory,
 * searching recursively up to `depth` levels below it when `depth` is greater than 1,
 * and detected new files are selected for that batch. In this case "new" means files that
* became visible to readers during that time period. Some extra care is needed to deal
* with the fact that files may become visible after they are created. For this purpose, this
* class remembers the information about the files selected in past batches for
Expand Down Expand Up @@ -70,10 +72,12 @@ private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@transient ssc_ : StreamingContext,
directory: String,
depth: Int = 1,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {

require(depth >= 1, "nested directories depth must >= 1")
// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

Expand All @@ -97,6 +101,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()

 // Set of all directories discovered so far while monitoring
@transient private var lastFoundDirs = new mutable.HashSet[Path]()

// Read-through cache of file mod times, used to speed up mod time lookups
@transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)

Expand Down Expand Up @@ -143,7 +150,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}

/**
* Find new files for the batch of `currentTime`. This is done by first calculating the
* Find new files for the batch of `currentTime` in nested directories.
* This is done by first calculating the
* ignore threshold for file mod times, and then getting a list of files filtered based on
* the current batch time and the ignore threshold. The ignore threshold is the max of
* initial ignore threshold and the trailing end of the remember window (that is, which ever
Expand All @@ -163,7 +171,48 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
val filter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()

 // Depth-first search of nested directories, collecting files accepted by the filter.
def dfs(status: FileStatus): List[FileStatus] = {
val path = status.getPath
val depthFilter = depth + directoryDepth - path.depth()
if (status.isDir) {
if (depthFilter - 1 >= 0) {
if (lastFoundDirs.contains(path)) {
if (status.getModificationTime > modTimeIgnoreThreshold) {
fs.listStatus(path).toList.flatMap(dfs(_))
} else Nil
} else {
lastFoundDirs += path
fs.listStatus(path).toList.flatMap(dfs(_))
}
} else Nil
} else {
if (filter.accept(path)) status :: Nil else Nil
}
}

val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath))
else {
lastFoundDirs.filter { path =>
 // Keep only directories whose mod time is newer than the ignore threshold;
 // older directories cannot contain new files.
try {
val status = fs.getFileStatus(path)
if (status != null
&& status.getModificationTime > modTimeIgnoreThreshold) true else false
}
catch {
 // If the directory no longer exists, remove it from `lastFoundDirs`.
case e: FileNotFoundException =>
lastFoundDirs.remove(path)
false
}
}
}.flatMap(fs.listStatus(_)).toSeq

val newFiles = path.flatMap(dfs(_)).map(_.getPath.toString).toArray

val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
Expand Down Expand Up @@ -203,6 +252,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
val pathStr = path.toString
if (path.getName().startsWith("_")) {
logDebug(s"startsWith: ${path.getName()}")
return false
}
// Reject file if it does not satisfy filter
if (!filter(path)) {
logDebug(s"$pathStr rejected by filter")
Expand Down Expand Up @@ -270,6 +323,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
lastFoundDirs = new mutable.HashSet[Path]()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,31 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}


test("file input stream - newFilesOnly = true") {
test("file input stream - newFilesOnly = true and depth = 1") {
testFileStream(newFilesOnly = true)
}

test("file input stream - newFilesOnly = false") {
test("file input stream - newFilesOnly = false and depth = 1") {
testFileStream(newFilesOnly = false)
}

test("file input stream - newFilesOnly = true and depth = 2") {
testFileStream(newFilesOnly = true, 2)
}

test("file input stream - newFilesOnly = false and depth = 2") {
testFileStream(newFilesOnly = false, 2)
}

test("file input stream - newFilesOnly = true and depth = 3") {
testFileStream(newFilesOnly = true, 3)
}

test("file input stream - newFilesOnly = false and depth = 3") {
testFileStream(newFilesOnly = false, 3)
}


test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
Expand Down Expand Up @@ -233,11 +250,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}

def testFileStream(newFilesOnly: Boolean) {
def testFileStream(newFilesOnly: Boolean, depth :Int = 1) {
var ssc: StreamingContext = null
val testDir: File = null
try {
val testDir = Utils.createTempDir()
var testDir = Utils.createTempDir()
for (i <- 2 until depth) {
testDir = Utils.createTempDir(testDir.toString)
}
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, Charset.forName("UTF-8"))

Expand All @@ -247,7 +267,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
ssc = new StreamingContext(newConf, batchDuration)
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
testDir.toString, (x: Path) => true,
newFilesOnly = newFilesOnly, depth).map(_._2.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(fileStream, outputBuffer)
outputStream.register()
Expand Down