Is using function in transformation causing Not Serializable exceptions?


Is using function in transformation causing Not Serializable exceptions?



I have a Breeze DenseMatrix, i find mean per row and mean of squares per row and put them in another DenseMatrix, one per column. But i get Task Not Serializable exception. I know that sc is not Serializable but i think that the exception is because i call functions in a transformation in Safe Zones.


Breeze DenseMatrix


mean


mean


DenseMatrix


Task Not Serializable


sc


Serializable



Am i right? And how could be a possible way to be done without any functions? Any help would be great!



Code:


object MotitorDetection {
case class MonDetect() extends Serializable {

var sc: SparkContext = _
var machines: Int=0
var counters: Int=0
var GlobalVec= BDM.zeros[Double](counters, 2)

def findMean(a: BDM[Double]): BDV[Double] = {
var c = mean(a(*, ::))
c}

def toMatrix(x: BDV[Double], y: BDV[Double], C: Int): BDM[Double]={
val m = BDM.zeros[Double](C,2)
m(::, 0) := x
m(::, 1) := y
m}

def SafeZones(stream: DStream[(Int, BDM[Double])]){

stream.foreachRDD { (rdd: RDD[(Int, BDM[Double])], _) =>
if (isEmpty(rdd) == false) {

val InputVec = rdd.map(x=> (x._1, toMatrix(findMean(x._2), findMean(pow(x._2, 2)), counters)))
GlobalMeanVector(InputVec)
}}}



Exception:


org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1.apply(MotitorDetection.scala:85)
at ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1.apply(MotitorDetection.scala:82)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6eee7027)
- field (class: ScalaApps.MotitorDetection$MonDetect, name: sc, type: class org.apache.spark.SparkContext)
- object (class ScalaApps.MotitorDetection$MonDetect, MonDetect())
- field (class: ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1, name: $outer, type: class ScalaApps.MotitorDetection$MonDetect)
- object (class ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1, <function2>)
- field (class: ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1$$anonfun$2, name: $outer, type: class ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1)
- object (class ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 28 more




1 Answer
1



The findMean method is a method of the object MotitorDetection. The object MotitorDetection has a SparkContext on-board, which is not serializable. Thus, the task used in rdd.map is not serializable.


findMean


MotitorDetection


MotitorDetection


SparkContext


rdd.map



Move all the matrix-related functions into a separate serializable object, MatrixUtils, say:


MatrixUtils


object MatrixUtils {
def findMean(a: BDM[Double]): BDV[Double] = {
var c = mean(a(*, ::))
c
}

def toMatrix(x: BDV[Double], y: BDV[Double], C: Int): BDM[Double]={
val m = BDM.zeros[Double](C,2)
m(::, 0) := x
m(::, 1) := y
m
}

...
}



and then use only those methods from rdd.map(...):


rdd.map(...)


object MotitorDetection {
val sc = ...

def SafeZones(stream: DStream[(Int, BDM[Double])]){
import MatrixUtils._

... = rdd.map( ... )

}
}





It didn't work but i am wondering if doing the same computations with just transformations would result in an exception too?
– mkey
Jun 29 at 22:18





@mkey You also got rid of counters, and you made sure that all those objects are not enclosed in some (synthetically generated) object (repl?)?.
– Andrey Tyukin
Jun 29 at 23:15



counters






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Comments

Popular posts from this blog

paramiko-expect timeout is happening after executing the command

Opening a url is failing in Swift

Export result set on Dbeaver to CSV