How do I handle BackpressureException with couchbase-spark-connector

Hello, CB users.

My new version of application with couchbase-spark-connector sends tons of requests to Couchbase Server. As a result, it gives me BackpressureException. (The following is my code)

ids.foreach(id => {

  val sKey = JsonArray.from(id, start_date)
  val eKey = JsonArray.from(id, end_date)

  val couchbaseView = spark
    .couchbaseView(ViewQuery.from("my_view", "all").startKey(sKey).endKey(eKey).inclusiveEnd(false).reduce(false))

  val myView = couchbaseView

  if (myView.toInt > 0) {
    val rows = couchbaseView

    val rowCount = rows.count()

    if (rowCount > 0) {
      val result = rows
        .map(doc => {
          // Do something with doc
          (result1, result2, result3)
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))

It seems like Couchbase JAVA SDK has delay and retry option (here) for such issue, but I can’t use the same solution with couchbase-spark-connector.
Does anyone have encounter the same problem? or has any good idea to solve it?


@Dynamicscope error handling with spark semantics is a little hard, because there are not many facilities available. I’m currently thinking to add a generic “retry amount” to the config, but this would be very generic.

Do you have any experience what and how I can make your life easier when using spark?

1 Like

I’ve created a ticket to track it - feel free to add your comments there!

1 Like