Couchbase Spark connector 3.1.x - issues reading data from a medium sized collection

Running into issues when trying to read data from a Couchbase Capella collection using Spark connector.

To illustrate the problem, here is what we are trying to do:
Write data to Capella using Spark Connector 3.1.x

  • Create a new bucket in Capella and allocated 3 GB RAM to that bucket.
  • Create a new scope say scope_dw and a new collection, say collection_dw . We also created a primary index on the new collection.
  • Use Couchbase Spark 3.1.x connector to write contents of a parquet file (around 60 MB in size, 10 column wide dataset with approximately 2.304 million rows).
  • Data successfully written to Couchbase Capella into collection_dw . Couchbase bucket level metrics show 2.11 GB of MEMORY USED and 2.8 GB of DISK USED.

** Read data that was written to Capella using Spark Connector 3.1.x in previous step **

  • Read contents of data written into ***collection_dw *** using 5 Spark executors (each having 3GB RAM) and Spark Driver with 3GB memory allocation. Please note that we want to read all data in the collection for our use case. Sample java code added below
    SparkSession sparkSession = get underlying Spark session
  •  Dataset<Row> filterDataFrame = sparkSession.read()*
    
  •   .format("couchbase.query").option(QueryOptions.Bucket(), "bucket_dw")*
    
  •           .option(QueryOptions.Scope(), "scope_dw")*
    
  •           .option(QueryOptions.Collection(), "collection_dw").load();* 
    
  • The code above tries to read contents of collection and throws an out of memory error after approximately 6 minutes.
  • Details about the error being thrown at Spark Connector level are available at the bottom of this post (for some strange reason, I can not upload documents).

Few interesting observations made while encountering this problem:

  • Although the Capella instance is appropriately sized
    at Bucket level with enough disk and RAM allocated, CPU usage for Capella went up to 100% when reading 2.3 million rows of data.

Would really appreciate it if you folks could look at the issue and suggest ways to fix it. If you need any additional information, please let me know and I am happy to share.


                                                     Logs containing error 

[2022-04-02 00:58:21,847] {spark_submit.py:526} INFO - ########### INITIAL SCHEMA JSON FOR DATASET ########### account_tel_test
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - root
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- __META_ID: string (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- co: double (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- cpu_usage: double (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- device_id: string (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- light: string (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- lpg: double (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- motion: string (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- sfdc_account_id: string (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- smoke: double (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- temp: double (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO - |-- timestamp: string (nullable = true)
[2022-04-02 00:58:21,850] {spark_submit.py:526} INFO -
[2022-04-02 00:58:22,075] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO V2ScanRelationPushDown:
[2022-04-02 00:58:22,075] {spark_submit.py:526} INFO - Pushing operators to bucket_dworkz:dw_scope:account_tel_test
[2022-04-02 00:58:22,075] {spark_submit.py:526} INFO - Pushed Filters:
[2022-04-02 00:58:22,075] {spark_submit.py:526} INFO - Post-Scan Filters:
[2022-04-02 00:58:22,075] {spark_submit.py:526} INFO - Output: __META_ID#27, co#28, cpu_usage#29, device_id#30, light#31, lpg#32, motion#33, sfdc_account_id#34, smoke#35, temp#36, timestamp#37
[2022-04-02 00:58:22,075] {spark_submit.py:526} INFO -
[2022-04-02 00:58:22,212] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO CodeGenerator: Code generated in 22.190073 ms
[2022-04-02 00:58:22,237] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO SparkContext: Starting job: count at Dashboard.java:908
[2022-04-02 00:58:22,238] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Got job 1 (count at Dashboard.java:908) with 1 output partitions
[2022-04-02 00:58:22,239] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Final stage: ResultStage 1 (count at Dashboard.java:908)
[2022-04-02 00:58:22,239] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Parents of final stage: List()
[2022-04-02 00:58:22,239] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Missing parents: List()
[2022-04-02 00:58:22,240] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[13] at count at Dashboard.java:908), which has no missing parents
[2022-04-02 00:58:22,246] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 10.4 KiB, free 1007.8 MiB)
[2022-04-02 00:58:22,248] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.9 KiB, free 1007.8 MiB)
[2022-04-02 00:58:22,248] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on dworkz-airflow-worker-0.dworkz-airflow-worker.dataworkz.svc.cluster.local:33843 (size: 4.9 KiB, free: 1007.8 MiB)
[2022-04-02 00:58:22,249] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1383
[2022-04-02 00:58:22,249] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[13] at count at Dashboard.java:908) (first 15 tasks are for partitions Vector(0))
[2022-04-02 00:58:22,250] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
[2022-04-02 00:58:22,255] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 14) (192.168.3.34, executor 5, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 00:58:22,274] {spark_submit.py:526} INFO - 22/04/02 00:58:22 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.3.34:45479 (size: 4.9 KiB, free: 912.3 MiB)
[2022-04-02 01:01:09,968] {spark_submit.py:526} INFO - 22/04/02 01:01:09 ERROR TaskSchedulerImpl: Lost executor 5 on 192.168.3.34: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[2022-04-02 01:01:09,970] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/5 is now EXITED (Command exited with code 52)
[2022-04-02 01:01:09,971] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO StandaloneSchedulerBackend: Executor app-20220402005813-0010/5 removed: Command exited with code 52
[2022-04-02 01:01:09,971] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220402005813-0010/7 on worker-20220402001755-192.168.3.34-32923 (192.168.3.34:32923) with 2 core(s)
[2022-04-02 01:01:09,971] {spark_submit.py:526} INFO - 22/04/02 01:01:09 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14) (192.168.3.34 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[2022-04-02 01:01:09,971] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO StandaloneSchedulerBackend: Granted executor ID app-20220402005813-0010/7 on hostPort 192.168.3.34:32923 with 2 core(s), 2.0 GiB RAM
[2022-04-02 01:01:09,974] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO DAGScheduler: Executor lost: 5 (epoch 0)
[2022-04-02 01:01:09,974] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO BlockManagerMaster: Removal of executor 5 requested
[2022-04-02 01:01:09,974] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 5
[2022-04-02 01:01:09,975] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 15) (192.168.2.220, executor 2, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 01:01:09,978] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
[2022-04-02 01:01:09,979] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(5, 192.168.3.34, 45479, None)
[2022-04-02 01:01:09,979] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
[2022-04-02 01:01:09,979] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
[2022-04-02 01:01:09,980] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO DAGScheduler: Shuffle files lost for executor: 5 (epoch 0)
[2022-04-02 01:01:09,995] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/7 is now RUNNING
[2022-04-02 01:01:09,997] {spark_submit.py:526} INFO - 22/04/02 01:01:09 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:40099 (size: 4.9 KiB, free: 912.3 MiB)
[2022-04-02 01:01:11,848] {spark_submit.py:526} INFO - 22/04/02 01:01:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.3.34:56514) with ID 7, ResourceProfileId 0
[2022-04-02 01:01:11,936] {spark_submit.py:526} INFO - 22/04/02 01:01:11 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.3.34:42095 with 912.3 MiB RAM, BlockManagerId(7, 192.168.3.34, 42095, None)
[2022-04-02 01:02:42,066] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on dworkz-airflow-worker-0.dworkz-airflow-worker.dataworkz.svc.cluster.local:33843 in memory (size: 6.4 KiB, free: 1007.8 MiB)
[2022-04-02 01:02:42,072] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.3.162:45523 in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,073] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.8:34885 in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,073] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.3.105:40951 in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,073] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.185:35579 in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,074] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.2.220:40099 in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,074] {spark_submit.py:526} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.216:37521 in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:04:02,133] {spark_submit.py:526} INFO - 22/04/02 01:04:02 WARN TaskSetManager: Lost task 0.1 in stage 1.0 (TID 15) (192.168.2.220 executor 2): java.lang.OutOfMemoryError: GC overhead limit exceeded
[2022-04-02 01:04:02,133] {spark_submit.py:526} INFO -
[2022-04-02 01:04:02,134] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 16) (192.168.1.8, executor 6, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 01:04:02,137] {spark_submit.py:526} INFO - 22/04/02 01:04:02 ERROR TaskSchedulerImpl: Lost executor 2 on 192.168.2.220: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[2022-04-02 01:04:02,137] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO DAGScheduler: Executor lost: 2 (epoch 1)
[2022-04-02 01:04:02,137] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
[2022-04-02 01:04:02,137] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 192.168.2.220, 40099, None)
[2022-04-02 01:04:02,138] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
[2022-04-02 01:04:02,138] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 1)
[2022-04-02 01:04:02,153] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.8:34885 (size: 4.9 KiB, free: 912.3 MiB)
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/2 is now EXITED (Command exited with code 52)
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO StandaloneSchedulerBackend: Executor app-20220402005813-0010/2 removed: Command exited with code 52
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220402005813-0010/8 on worker-20220402001756-192.168.2.220-41245 (192.168.2.220:41245) with 2 core(s)
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO BlockManagerMaster: Removal of executor 2 requested
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO StandaloneSchedulerBackend: Granted executor ID app-20220402005813-0010/8 on hostPort 192.168.2.220:41245 with 2 core(s), 2.0 GiB RAM
[2022-04-02 01:04:02,319] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 2
[2022-04-02 01:04:02,343] {spark_submit.py:526} INFO - 22/04/02 01:04:02 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/8 is now RUNNING
[2022-04-02 01:04:04,271] {spark_submit.py:526} INFO - 22/04/02 01:04:04 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.2.220:38904) with ID 8, ResourceProfileId 0
[2022-04-02 01:04:04,369] {spark_submit.py:526} INFO - 22/04/02 01:04:04 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.220:41267 with 912.3 MiB RAM, BlockManagerId(8, 192.168.2.220, 41267, None)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - 22/04/02 01:06:28 WARN TaskSetManager: Lost task 0.2 in stage 1.0 (TID 16) (192.168.1.8 executor 6): java.lang.OutOfMemoryError: GC overhead limit exceeded
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at sun.nio.cs.UTF_8.newDecoder(UTF_8.java:68)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at java.lang.StringCoding.decode(StringCoding.java:213)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at java.lang.String.(String.java:463)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at java.lang.String.(String.java:515)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.client.scala.codec.JsonDeserializer$Passthrough$StringConvert$.deserialize(JsonDeserializer.scala:73)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.client.scala.query.QueryResult.$anonfun$rowsAs$1(QueryResult.scala:54)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.client.scala.query.QueryResult$$Lambda$1390/1525300356.apply(Unknown Source)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.client.scala.util.RowTraversalUtil$.traverse(RowTraversalUtil.scala:12)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.client.scala.query.QueryResult.rowsAs(QueryResult.scala:53)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader.$anonfun$rows$1(QueryPartitionReader.scala:50)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader$$Lambda$1389/1436792842.apply(Unknown Source)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at scala.util.Success.flatMap(Try.scala:251)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader.rows$lzycompute(QueryPartitionReader.scala:50)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader.rows(QueryPartitionReader.scala:49)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader.rowIterator$lzycompute(QueryPartitionReader.scala:61)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader.rowIterator(QueryPartitionReader.scala:61)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at com.couchbase.spark.query.QueryPartitionReader.next(QueryPartitionReader.scala:100)
[2022-04-02 01:06:28,997] {spark_submit.py:526} INFO - at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.sql.execution.SparkPlan$$Lambda$787/1698108475.apply(Unknown Source)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - at org.apache.spark.rdd.RDD$$Lambda$788/805824280.apply(Unknown Source)
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO -
[2022-04-02 01:06:28,998] {spark_submit.py:526} INFO - 22/04/02 01:06:28 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 17) (192.168.1.216, executor 1, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 01:06:29,011] {spark_submit.py:526} INFO - 22/04/02 01:06:29 ERROR TaskSchedulerImpl: Lost executor 6 on 192.168.1.8: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

The way this works with query is that it will actually only ever run on one executor, since we can’t split up the workload for a n1ql query. You can either think about splitting up the query on your own if you know the split points (so it will get distributed to different executors by spark) OR actually in your case you might be interested in using our streaming support I’m working on so you can stream the whole collection into a DataFrame and don’t have to use query.

I am more than happy to test the streaming support for reading the whole collection into a dataframe. Could you please provide some details regarding how to get access to the latest version of the Couchbase Spark connector that supports streaming?