Friday, July 28, 2017

SparkException: This RDD lacks a SparkContext

I'm trying to build a string sampler that uses an RDD of strings as a dictionary, based on the RandomDataGenerator trait from the org.apache.spark.mllib.random package.

import org.apache.spark.mllib.random.RandomDataGenerator
import org.apache.spark.rdd.RDD
import scala.util.Random

class StringSampler(var dic: RDD[String], var seed: Long = System.nanoTime) extends RandomDataGenerator[String] {
    require(dic != null, "Dictionary cannot be null")
    require(!dic.isEmpty, "Dictionary must contain lines (words)")
    Random.setSeed(seed)

    // note: 1 / dic.count() is integer division and truncates to 0 here,
    // and sample() expects a Double fraction anyway
    var fraction: Double = 1.0 / dic.count()

    // return a random line from the dictionary
    // (sample() is probabilistic, so take(1) can occasionally come back empty)
    override def nextValue(): String = dic.sample(withReplacement = true, fraction).take(1)(0)

    override def setSeed(newSeed: Long): Unit = Random.setSeed(newSeed)

    // propagate the seed so the copy behaves identically
    override def copy(): StringSampler = new StringSampler(dic, seed)

    def setDictionary(newDic: RDD[String]): Unit = {
        require(newDic != null, "Dictionary cannot be null")
        require(!newDic.isEmpty, "Dictionary must contain lines (words)")
        dic = newDic
        fraction = 1 / dic.count()
    }
}


import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.random.RandomRDDs

// defined elsewhere in the job:
//   val dictionaryName: String
//   val dictionaries: Broadcast[Map[String, RDD[String]]]
val dictionary: RDD[String] = dictionaries.value(dictionaryName) // dictionary.cache()
val sampler = new StringSampler(dictionary)
RandomRDDs.randomRDD(context, sampler, size, numPartitions)

But when I try to generate the random RDD of strings, I get a SparkException saying that the dictionary lacks a SparkContext. It seems Spark loses the dictionary RDD's context when serializing it out to the cluster nodes, and I don't know how to fix it.

I tried caching the dictionary before passing it to the StringSampler, but it didn't change anything... I was thinking about somehow linking it back to the original SparkContext, but I don't even know if that's possible. Does anyone have an idea?

Caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
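As far as I can tell, this is case (1) of SPARK-5063 at work: an RDD's reference to its SparkContext is not serialized, so once the sampler (and the broadcast map holding the RDD) is shipped to the executors, the deserialized dic no longer has a SparkContext, and the dic.sample call inside nextValue() cannot run there. One possible way around it is to not carry an RDD inside the generator at all and sample from a plain collected array instead. A minimal sketch, assuming the dictionary fits in memory (LocalStringSampler is a hypothetical name, not part of the original code):

import java.util.Random
import org.apache.spark.mllib.random.RandomDataGenerator

// Hold the dictionary as a plain Array[String] so nothing RDD-related
// is captured by the generator when it is shipped to the executors.
class LocalStringSampler(lines: Array[String], seed: Long = System.nanoTime)
        extends RandomDataGenerator[String] {
    require(lines != null && lines.nonEmpty, "Dictionary must contain lines (words)")

    private val rng = new Random(seed) // java.util.Random is Serializable

    // pick a uniformly random line from the local copy of the dictionary
    override def nextValue(): String = lines(rng.nextInt(lines.length))

    override def setSeed(newSeed: Long): Unit = rng.setSeed(newSeed)

    override def copy(): LocalStringSampler = new LocalStringSampler(lines, seed)
}

// usage, reusing the names from the question:
// val sampler = new LocalStringSampler(dictionary.collect())
// RandomRDDs.randomRDD(context, sampler, size, numPartitions)

If the dictionary is too big to copy into every task, the same idea should work with sc.broadcast(dictionary.collect()) and indexing into the broadcast value inside nextValue(): broadcasting plain strings is fine, it is referencing RDDs on the executors that breaks.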



