I have a custom class E which has, among other fields, a field word. I have a large es: RDD[E] with several hundred thousand elements and a doc: Seq[String] with typically a few hundred entries. In es, every element's word field value is unique.
My task is to look up the element of es for each of the strings in doc. However, it is not guaranteed that such an element exists. My naive Scala/Spark implementation is this:
def word2E(words: Seq[String]): Seq[E] = {
  words.map(lookupWord(_, es))
    .filter(_.isDefined)
    .map(_.get)
}
The method lookupWord() is defined as follows:
def lookupWord(w: String, es: RDD[E]): Option[E] = {
  val lookup = es.filter(_.word.equals(w))
  if (lookup.isEmpty) None
  else Some(lookup.first)
}
When I look at the Spark stages overview, lookupWord() appears to be the bottleneck. In particular, the isEmpty() calls in lookupWord() take relatively long (up to 2 s) in some cases. I suspect this is because isEmpty() and first() are both actions, so every single word in doc launches one or two separate Spark jobs over es.
I have already persisted the es RDD. Is there any other way to optimize such a task, or is this as good as it gets when operating on a dataset like this?
I have noticed the lookup() method in PairRDDFunctions and have considered constructing a PairRDD in which the word field serves as the key. Might that help?
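Concretely, I imagine something along these lines (just a sketch of what I have in mind; keyedEs and the partition count are made-up names/values):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Key es by word once and hash-partition the result, so that lookup()
// only has to scan the single partition a key hashes to, not all of es.
val keyedEs: RDD[(String, E)] = es
  .keyBy(_.word)
  .partitionBy(new HashPartitioner(es.partitions.length))
  .cache()

def word2E(words: Seq[String]): Seq[E] =
  words.flatMap(w => keyedEs.lookup(w).headOption) // word is unique in es, so at most one match

If I understand correctly, lookup() checks the RDD's partitioner and, when one is set, runs a job only on the partition the key belongs to, which should avoid the repeated full scans.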
Drawing any conclusions experimentally here is quite hard because there are so many factors involved.