Spark da StackOverflowError cuando entrena usando ALS

Al intentar entrenar un modelo de aprendizaje automático usando ALS en Spark’s MLLib, seguí recibiendo un StackoverflowError. Aquí hay una pequeña muestra del rastro de la stack:

Traceback (most recent call last): File "/Users/user/Spark/imf.py", line 31, in  model = ALS.train(rdd, rank, numIterations) File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/recommendation.py", line 140, in train lambda_, blocks, nonnegative, seed) File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/common.py", line 120, in callMLlibFunc return callJavaFunc(sc, api, *args) File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/common.py", line 113, in callJavaFunc return _java2py(sc, func(*args)) File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o35.trainALSModel. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 35, localhost): java.lang.StackOverflowError at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2296) at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2589) 

Este error también aparecerá cuando intente ejecutar .mean () para calcular el error cuadrado medio. Apareció tanto en la versión 1.3.1_1 como en la versión 1.4.1 de Spark. Estaba usando PySpark, y boost la memoria disponible no ayudó.

La solución fue agregar puntos de control, lo que evita que la recursión utilizada por la base de código genere un desbordamiento. Primero, cree un nuevo directorio para almacenar los puntos de control. Luego, haga que su SparkContext use ese directorio para el punto de control. Aquí está el ejemplo en Python:

 sc.setCheckpointDir('checkpoint/') 

También es posible que necesite agregar puntos de control a la ALS también, pero no he podido determinar si eso hace la diferencia. Para agregar un punto de control allí (probablemente no sea necesario), simplemente hazlo:

 ALS.checkpointInterval = 2