I'm trying to create a string sampler using an RDD of strings as a dictionary and the class RandomDataGenerator from the org.apache.spark.mllib.random package.
import org.apache.spark.mllib.random.RandomDataGenerator
import org.apache.spark.rdd.RDD
import scala.util.Random

class StringSampler(var dic: RDD[String], var seed: Long = System.nanoTime) extends RandomDataGenerator[String] {
  require(dic != null, "Dictionary cannot be null")
  require(!dic.isEmpty, "Dictionary must contain lines (words)")
  Random.setSeed(seed)
  // 1.0 rather than 1: with Long operands the division truncates to 0 for any dictionary of more than one line
  var fraction: Double = 1.0 / dic.count()

  // return a random line from the dictionary
  override def nextValue(): String = dic.sample(withReplacement = true, fraction).take(1)(0)

  override def setSeed(newSeed: Long): Unit = Random.setSeed(newSeed)

  // keep the seed when copying
  override def copy(): StringSampler = new StringSampler(dic, seed)

  def setDictionary(newDic: RDD[String]): Unit = {
    require(newDic != null, "Dictionary cannot be null")
    require(!newDic.isEmpty, "Dictionary must contain lines (words)")
    dic = newDic
    fraction = 1.0 / dic.count()
  }
}
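import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.random.RandomRDDs

// excerpt from the enclosing class; context, size and numPartitions are defined elsewhere
val dictionaryName: String
val dictionaries: Broadcast[Map[String, RDD[String]]]
val dictionary: RDD[String] = dictionaries.value(dictionaryName) // dictionary.cache()
val sampler = new StringSampler(dictionary)
RandomRDDs.randomRDD(context, sampler, size, numPartitions)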
But when I try to generate a random RDD of strings, I get a SparkException saying that the dictionary RDD lacks a SparkContext. It seems that Spark loses the dictionary RDD's context when the sampler is copied to the cluster nodes, and I don't know how to fix it.
I tried caching the dictionary before passing it to the StringSampler, but it didn't change anything. I also thought about linking it back to the original SparkContext, but I don't know whether that is even possible. Does anyone have an idea?
Caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
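Case (1) of the message matches this setup: RandomRDDs.randomRDD serializes the generator into its partitions and calls nextValue() on the executors, so the dic.sample(...).take(1) inside nextValue() is an RDD action invoked from inside another job. A minimal sketch that respects that rule, assuming the dictionary is small enough to be collected to the driver, keeps a plain local Array[String] in the generator instead of an RDD. The class name LocalStringSampler is hypothetical, not part of Spark.

```scala
import org.apache.spark.mllib.random.RandomDataGenerator
import scala.util.Random

// Hypothetical generator: holds a local, serializable Array instead of an RDD,
// so it can be shipped to executors and sampled there without a SparkContext.
class LocalStringSampler(words: Array[String], seed: Long = System.nanoTime)
    extends RandomDataGenerator[String] {
  require(words != null && words.nonEmpty, "Dictionary must contain lines (words)")

  // Per-instance generator instead of the global scala.util.Random object
  private val rng = new Random(seed)

  // Uniformly pick one word from the local array
  override def nextValue(): String = words(rng.nextInt(words.length))

  override def setSeed(newSeed: Long): Unit = rng.setSeed(newSeed)

  override def copy(): LocalStringSampler = new LocalStringSampler(words, seed)
}
```

On the driver it could be wired up as RandomRDDs.randomRDD(context, new LocalStringSampler(dictionary.collect()), size, numPartitions), reusing context, size and numPartitions from the snippet above. The collect() happens once on the driver, so only the array travels to the executors; if the dictionary is too large for driver memory, this approach does not apply.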