2.1 - Structured Streaming Error

Hi,

I am trying to integrate version 2.1 of the Couchbase Spark connector (on Databricks) with a continuous set of document inserts (Couchbase v4.6) as a structured stream. I've followed the documentation (although my code below differs slightly from it), but have run into the issue below. My transformation is pretty simple, just counting the different types of events, yet I get a NullPointerException when starting the query (full stack trace at the end).

Any help appreciated!

java.lang.NullPointerException
	at com.couchbase.client.dcp.conductor.Conductor.numberOfPartitions(Conductor.java:161)

Code below (note the code defining the 'schema' variable is not included, but it does exist and is used as the stream's schema).
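
For reference, the schema is shaped roughly like the following (a minimal sketch rather than my actual definition; only the nested event.eventName field is assumed, since the groupBy further down references it):

import org.apache.spark.sql.types._

// Hypothetical sketch of the 'schema' variable referenced above; the
// real definition is not included in this post. Only event.eventName
// is assumed from the groupBy below; other fields are placeholders.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("event", StructType(Seq(
    StructField("eventName", StringType)
  )))
))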

val eventsInputStreamDF = 
  spark
    .readStream
      .format("com.couchbase.spark.sql")
      .schema(schema)
      .option("idField","id")
      .option("spark.couchbase.bucket.events","*****")
      .load()

val streamingEventNameCountsDF =
  eventsInputStreamDF
    .groupBy("event.eventName")
    .count()

spark.conf.set("spark.sql.shuffle.partitions", "1") 
// Note: I set the above because the error mentions com.couchbase.client.dcp.conductor.Conductor.numberOfPartitions,
// and one of the Databricks Structured Streaming guides suggests setting it. It made no difference, which makes
// sense: the DCP partition count in the stack trace is unrelated to Spark's shuffle partitions.

val streamingOutputQuery =
  streamingEventNameCountsDF
    .writeStream
    .format("memory")
    .queryName("counts")
    .outputMode("complete")
    .start()

Output:

streamingOutputQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@64698f1a

ERROR: Some streams terminated before this command could finish!
java.lang.NullPointerException
	at com.couchbase.client.dcp.conductor.Conductor.numberOfPartitions(Conductor.java:161)
	at com.couchbase.client.dcp.Client.numPartitions(Client.java:512)
	at com.couchbase.spark.sql.streaming.CouchbaseSource.initialize(CouchbaseSource.scala:119)
	at com.couchbase.spark.sql.streaming.CouchbaseSource.<init>(CouchbaseSource.scala:60)
	at com.couchbase.spark.sql.DefaultSource.createSource(DefaultSource.scala:120)
	at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:235)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:145)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:141)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
	at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:141)
	at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:136)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
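
For completeness, this is how I intended to inspect the results once the stream runs (a sketch; the memory sink should register an in-memory table under the queryName above, "counts"):

// Query the in-memory table registered by the memory sink under
// queryName ("counts") once the stream is running.
spark.sql("SELECT * FROM counts").show()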

Hi,

thanks for reporting this, and sorry for the slightly slow response. This does indeed look like a bug rather than a user issue on your side. I need to dig in and will report back as soon as I know what's going on. Thanks!

Hi, thanks,

If you contact me directly at Kurt.maile@xiatech.co.uk, I can send you the Databricks notebook if that helps.

Also, I was hoping to demo streaming this week; I'm guessing a fix could take longer?

Appreciate the response.

Cheers
Kurt

Hi,

I’ll drop you an email there - thanks!

I've just encountered this issue with the latest connector. The problem is actually quite new; the same Spark pipeline had been running without issues. I did not find out what triggered it.

It turned out I had switched to another Java 8 update version, from 112 to 13X. After switching back to 112, there is no error.
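
If it helps anyone else narrow this down, the JVM version a Databricks cluster is actually running can be checked from a notebook cell (a quick sketch, assuming sc is the notebook's SparkContext; the executors may run a different JVM than the driver):

// Print the JVM version on the driver.
println(System.getProperty("java.version"))

// Print each distinct JVM version seen on the executors.
sc.parallelize(1 to 100)
  .map(_ => System.getProperty("java.version"))
  .distinct()
  .collect()
  .foreach(println)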

Hiya - has this been fixed now, so that the newer Java version is compatible?