Spark job throws TemporaryFailureException when saving an RDD to Couchbase

I’m trying to ingest a large tab-delimited CSV file (roughly 300,000 rows, 93 columns) into couchbase-server-enterprise_4.0.0 using spark-connector-1.0.0-beta. Here is how I’m uploading the data to Couchbase:

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._

// one JsonDocument per input line, keyed by the "id" column
val rowRDD = sc.textFile(filename).map { row =>
  val obj = JsonObject.create()
  (keys zip row.split("\t")).foreach { case (k, v) => obj.put(k, v) }
  JsonDocument.create(obj.get("id").asInstanceOf[String], obj)
}
rowRDD.saveToCouchbase()

Here is the exception I’m seeing:

15/10/05 09:19:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/10/05 09:19:40 INFO TaskSchedulerImpl: Cancelling stage 0
15/10/05 09:19:40 INFO DAGScheduler: Stage 0 (foreachPartition at DocumentRDDFunctions.scala:39) failed in 2.535 s
15/10/05 09:19:40 INFO DAGScheduler: Job 0 failed: foreachPartition at DocumentRDDFunctions.scala:39, took 2.574053 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): com.couchbase.client.java.error.TemporaryFailureException
    at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:512)
    at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:493)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:234)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
    at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:256)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:104)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:58)
    ... 12 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The exception seems to be caused by the server being too busy. Is there any workaround for inserting a large RDD / lots of documents into Couchbase?
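
In the meantime, would something along these lines be a reasonable stopgap? The idea is to bypass saveToCouchbase() and write each partition directly with the Java SDK, retrying on TemporaryFailureException with a growing pause (just a sketch; the node address, bucket name and retry numbers are placeholders for my setup):

    import com.couchbase.client.java.CouchbaseCluster
    import com.couchbase.client.java.document.JsonDocument
    import com.couchbase.client.java.error.TemporaryFailureException

    rowRDD.foreachPartition { docs =>
      // one connection per partition; "127.0.0.1" and "default" are placeholders
      val cluster = CouchbaseCluster.create("127.0.0.1")
      val bucket = cluster.openBucket("default")

      // retry TemporaryFailureException a few times, doubling the pause each time
      def upsertWithRetry(doc: JsonDocument, attemptsLeft: Int, delayMs: Long): Unit = {
        try {
          bucket.upsert(doc)
        } catch {
          case _: TemporaryFailureException if attemptsLeft > 0 =>
            Thread.sleep(delayMs)
            upsertWithRetry(doc, attemptsLeft - 1, delayMs * 2)
        }
      }

      docs.foreach(doc => upsertWithRetry(doc, attemptsLeft = 5, delayMs = 50))
      cluster.disconnect()
    }

I realise this opens a connection per partition, so it is probably not ideal either.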

Yes, it looks like the server is overwhelmed for some reason. I’m currently investigating better error-handling capabilities for the driver, but Spark’s primitives are very limited in that regard.

You can find the ticket here: https://issues.couchbase.com/browse/SPARKC-36

What behaviour would you like to see? Transparent retries up to N times? Backoff strategies? We have all of that available in the SDK; it’s a question of how to expose it in the connector.
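
For illustration only, here is roughly what that looks like when you use the SDK’s async API directly (a sketch; the bucket reference and the delay/attempt numbers are placeholders): RetryBuilder retries TemporaryFailureException with capped exponential backoff, while everything else still fails fast.

    import java.util.concurrent.TimeUnit

    import com.couchbase.client.core.time.Delay
    import com.couchbase.client.java.Bucket
    import com.couchbase.client.java.document.JsonDocument
    import com.couchbase.client.java.error.TemporaryFailureException
    import com.couchbase.client.java.util.retry.RetryBuilder

    // retry the upsert up to 5 times on TMPFAIL, backing off between 32ms and 4096ms
    def upsertWithBackoff(bucket: Bucket, doc: JsonDocument): JsonDocument =
      bucket.async()
        .upsert(doc)
        .retryWhen(RetryBuilder
          .anyOf(classOf[TemporaryFailureException])
          .delay(Delay.exponential(TimeUnit.MILLISECONDS, 4096, 32))
          .max(5)
          .build())
        .toBlocking()
        .single()

Those are the kinds of knobs (max attempts, delay strategy) that would have to be surfaced through the connector.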

As for the specific error: what is your hardware configuration?

Hi, thanks for answering my question; it has been a while!
Regarding the behaviour, I think a transparent retry would be fine, ideally while keeping the write throughput high.
Regarding the hardware, I was testing on an Amazon EC2 r3 instance with both Couchbase and Spark installed on it.