Scala Spark: ListBuffer is empty

In this snippet, the list length printed at comment 1 is correct, but the code at comment 2 never seems to run. Why does this happen?

    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)

    var wktReader: WKTReader = new WKTReader();
    val dataSet = sc.textFile("dataSet.txt")
    val items = new ListBuffer[String]()

    dataSet.foreach { e =>
      items += e
      println("len = " + items.length) // 1. here the length is ok
    }

    println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

    items.foreach { x => print(x) } // 2. this code does not seem to be executed

Here are the logs:

    16/11/20 01:16:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    16/11/20 01:16:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
    16/11/20 01:16:53 INFO Executor: Starting executor ID driver on host localhost
    16/11/20 01:16:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58608.
    16/11/20 01:16:53 INFO NettyBlockTransferService: Server created on 192.168.56.1:58608
    16/11/20 01:16:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
    16/11/20 01:16:53 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:58608 with 347.1 MB RAM, BlockManagerId(driver, 192.168.56.1, 58608)
    16/11/20 01:16:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
    Starting app
    16/11/20 01:16:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 139.6 KB, free 347.0 MB)
    16/11/20 01:16:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 346.9 MB)
    16/11/20 01:16:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.1:58608 (size: 15.9 KB, free: 347.1 MB)
    16/11/20 01:16:58 INFO SparkContext: Created broadcast 0 from textFile at main.scala:25
    16/11/20 01:16:58 INFO FileInputFormat: Total input paths to process : 1
    16/11/20 01:16:58 INFO SparkContext: Starting job: foreach at main.scala:28
    16/11/20 01:16:58 INFO DAGScheduler: Got job 0 (foreach at main.scala:28) with 1 output partitions
    16/11/20 01:16:58 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at main.scala:28)
    16/11/20 01:16:58 INFO DAGScheduler: Parents of final stage: List()
    16/11/20 01:16:58 INFO DAGScheduler: Missing parents: List()
    16/11/20 01:16:58 INFO DAGScheduler: Submitting ResultStage 0 (dataSet.txt MapPartitionsRDD[1] at textFile at main.scala:25), which has no missing parents
    16/11/20 01:16:58 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 346.9 MB)
    16/11/20 01:16:58 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2034.0 B, free 346.9 MB)
    16/11/20 01:16:58 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.56.1:58608 (size: 2034.0 B, free: 347.1 MB)
    16/11/20 01:16:58 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
    16/11/20 01:16:59 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (dataSet.txt MapPartitionsRDD[1] at textFile at main.scala:25)
    16/11/20 01:16:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    16/11/20 01:16:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5427 bytes)
    16/11/20 01:16:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    16/11/20 01:16:59 INFO HadoopRDD: Input split: file:/D:/dataSet.txt:0+291
    16/11/20 01:16:59 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
    16/11/20 01:16:59 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    16/11/20 01:16:59 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
    16/11/20 01:16:59 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
    16/11/20 01:16:59 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
    len = 1
    len = 2
    len = 3
    len = 4
    len = 5
    len = 6
    len = 7
    16/11/20 01:16:59 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
    16/11/20 01:16:59 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 417 ms on localhost (1/1)
    16/11/20 01:16:59 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    16/11/20 01:16:59 INFO DAGScheduler: ResultStage 0 (foreach at main.scala:28) finished in 0,456 s
    16/11/20 01:16:59 INFO DAGScheduler: Job 0 finished: foreach at main.scala:28, took 0,795126 s
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    16/11/20 01:16:59 INFO SparkContext: Invoking stop() from shutdown hook
    16/11/20 01:16:59 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
    16/11/20 01:16:59 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    16/11/20 01:16:59 INFO MemoryStore: MemoryStore cleared
    16/11/20 01:16:59 INFO BlockManager: BlockManager stopped
    16/11/20 01:16:59 INFO BlockManagerMaster: BlockManagerMaster stopped
    16/11/20 01:16:59 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    16/11/20 01:16:59 INFO SparkContext: Successfully stopped SparkContext
    16/11/20 01:16:59 INFO ShutdownHookManager: Shutdown hook called
    16/11/20 01:16:59 INFO ShutdownHookManager: Deleting directory

Here:

    dataSet.foreach { e =>
      items += e
      println("len = " + items.length) // 1. here the length is ok
    }

you are modifying a copy of items that is local to the executor. The original items list defined on the driver is never modified. As a result, this:

    items.foreach { x => print(x) }

does execute, but there is nothing to print.

Please see the "Understanding closures" section of the Spark programming guide.
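To illustrate the point without Spark, here is a minimal sketch that mimics what happens to a captured variable: the buffer is serialized and deserialized with plain Java serialization, and the appends go to the resulting copy. The names `ClosureCopyDemo`, `roundTrip` and `appendToCopy` are made up for this example, and the round trip is only an approximation of how Spark ships a closure to a task, not Spark's actual code path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ListBuffer

object ClosureCopyDemo {
  // Serialize a value to bytes and read it back, producing an independent copy.
  // (Hypothetical helper: a stand-in for shipping captured state to a task.)
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Appends the given elements to a deserialized copy of an empty buffer and
  // returns (length of the copy, length of the original).
  def appendToCopy(elements: Seq[String]): (Int, Int) = {
    val items = new ListBuffer[String]()   // lives on the "driver"
    val executorCopy = roundTrip(items)    // what the "task" would see
    elements.foreach(e => executorCopy += e)
    (executorCopy.length, items.length)
  }

  def main(args: Array[String]): Unit = {
    val (copyLen, originalLen) = appendToCopy(Seq("a", "b", "c"))
    println("copy length = " + copyLen)         // prints "copy length = 3"
    println("original length = " + originalLen) // prints "original length = 0"
  }
}
```

The copy grows while the original stays empty, which matches the logs in the question: the `len = ...` lines count up inside the task, yet the driver-side `items` has nothing to print.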

Spark runs the work on executors and returns the results to the driver, so the code above does not work as intended. If you need to add the elements from a foreach, you must first collect the data on the driver and append it to the list there. Keep in mind that collecting is a bad idea when the data set is large, because all of it has to fit in the driver's memory.

    val items = new ListBuffer[String]()

    val rdd = spark.sparkContext.parallelize(1 to 10, 4)
    // collect() brings the elements to the driver, so this foreach runs locally
    rdd.collect().foreach(data => items += data.toString())
    println(items)

Output:

 ListBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
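If you would rather keep the `foreach` on the RDD, one possible alternative is a collection accumulator, which Spark merges back on the driver. The sketch below assumes Spark 2.0 or later (where `SparkContext.collectionAccumulator` is available) and a Scala version that still ships `scala.collection.JavaConverters`; the object name `AccumulatorSketch`, the helper `gather` and the sample data are made up for illustration. The order of the gathered elements is not guaranteed across partitions, and like `collect()` this still pulls everything into driver memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

object AccumulatorSketch {
  // Gathers the elements of an RDD on the driver through an accumulator.
  // (Hypothetical helper for this example.)
  def gather(sc: SparkContext, rdd: RDD[String]): List[String] = {
    val acc = sc.collectionAccumulator[String]("items")
    // Runs on the executors; each task adds to its own copy of the accumulator,
    // and Spark merges the per-task values when the tasks complete.
    rdd.foreach(e => acc.add(e))
    // Read on the driver after the action has finished.
    acc.value.asScala.toList
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Stand-in data; in the question this would be sc.textFile("dataSet.txt").
      val dataSet = sc.parallelize(Seq("a", "b", "c"))
      val items = gather(sc, dataSet)
      items.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

For small results, `rdd.collect()` is simpler and keeps the element order; the accumulator form mainly helps when the `foreach` has other side effects you want to keep.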