Spark Structured Streaming Connector DCP Channel exception

Hello All,

We are running Couchbase Server 7.1.1 with the Couchbase Spark Connector 3.2.2 on Databricks. Streaming documents and changes into Azure storage generally works, but the streaming job intermittently fails with the exception below. Any recommendations for getting past this?
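
For context, the job is conceptually along these lines. This is a simplified sketch with placeholder hostnames, credentials, and storage paths rather than our exact code; the config keys and the `couchbase.kv` source name are as I understand them from the 3.x connector documentation.

```scala
import org.apache.spark.sql.SparkSession

// Connector 3.x reads the Couchbase connection details from Spark config.
// All values below are placeholders, not our real environment.
val spark = SparkSession.builder()
  .appName("couchbase-dcp-stream")
  .config("spark.couchbase.connectionString", "couchbases://servername.domain.com")
  .config("spark.couchbase.username", "<username>")
  .config("spark.couchbase.password", "<password>")
  .config("spark.couchbase.implicitBucket", "<bucket>")
  .getOrCreate()

// "couchbase.kv" is the structured-streaming (DCP-based) source exposed by the connector.
val changes = spark.readStream
  .format("couchbase.kv")
  .load()

// Write the change feed to Azure storage as Delta, with a checkpoint for restartability.
val query = changes.writeStream
  .format("delta")
  .option("checkpointLocation", "abfss://<container>@<account>.dfs.core.windows.net/checkpoints/couchbase")
  .start("abfss://<container>@<account>.dfs.core.windows.net/raw/couchbase")

query.awaitTermination()
```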

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 74 in stage 3367.0 failed 4 times, most recent failure: Lost task 74.3 in stage 3367.0 (TID 186229) (10.56.67.143 executor 0): java.lang.IllegalStateException: Tried to add duplicate channel: DcpChannel{address=servername.domain.com:11207}
	at com.couchbase.client.dcp.conductor.Conductor.add(Conductor.java:298)
	at java.util.ArrayList.forEach(ArrayList.java:1259)



**Detailed error below:**

org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:606)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:360)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$8(TransactionalWriteEdge.scala:427)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$1(TransactionalWriteEdge.scala:362)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:156)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:143)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.withOperationTypeTag(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperation$6(DeltaLogging.scala:121)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:120)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:330)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:424)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:444)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:20)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:20)
	at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:419)
	at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:339)
	at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:20)
	at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:330)
	at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:302)
	at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:20)
	at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:57)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:137)
	at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:73)
	at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:60)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
	at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:431)
	at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:410)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:119)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:104)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$recordWriteFilesOperation$1(TransactionalWriteEdge.scala:210)
	at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:1585)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.recordWriteFilesOperation(TransactionalWriteEdge.scala:209)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles(TransactionalWriteEdge.scala:231)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles$(TransactionalWriteEdge.scala:225)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles(TransactionalWriteEdge.scala:475)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles$(TransactionalWriteEdge.scala:466)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:215)
	at com.databricks.sql.transaction.tahoe.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:212)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:104)
	at com.databricks.sql.transaction.tahoe.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:105)
	at com.databricks.sql.transaction.tahoe.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:58)
	at com.databricks.sql.transaction.tahoe.DeltaLog.withNewTransaction(DeltaLog.scala:235)
	at com.databricks.sql.transaction.tahoe.sources.DeltaSink.addBatch(DeltaSink.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:719)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:717)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:301)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:299)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:74)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:717)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$5(MicroBatchExecution.scala:283)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:816)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:280)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:301)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:299)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:74)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:239)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:233)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:360)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:324)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:251)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:428)
	at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.awaitShuffleMapStage$1(DeltaOptimizedWriterExec.scala:189)
	at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.getShuffleStats(DeltaOptimizedWriterExec.scala:194)
	at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.computeBins(DeltaOptimizedWriterExec.scala:136)
	at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.doExecute(DeltaOptimizedWriterExec.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:224)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SortExec.doExecute(SortExec.scala:121)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:224)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:311)
	... 91 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 74 in stage 3367.0 failed 4 times, most recent failure: Lost task 74.3 in stage 3367.0 (TID 186229) (10.56.67.143 executor 0): java.lang.IllegalStateException: Tried to add duplicate channel: DcpChannel{address=servername.domain.com:11207}
	at com.couchbase.client.dcp.conductor.Conductor.add(Conductor.java:298)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at com.couchbase.client.dcp.conductor.Conductor.connect(Conductor.java:115)
	at com.couchbase.client.dcp.Client.connect(Client.java:460)
	at com.couchbase.spark.kv.KeyValuePartitionReader.attachAndConnectDcpListener(KeyValuePartitionReader.scala:97)
	at com.couchbase.spark.kv.KeyValuePartitionReader.<init>(KeyValuePartitionReader.scala:76)
	at com.couchbase.spark.kv.KeyValueMicroBatchStream.$anonfun$createReaderFactory$1(KeyValueMicroBatchStream.scala:72)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:64)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.IllegalStateException: Tried to add duplicate channel: DcpChannel{address=servername.domain.com:11207}
	at com.couchbase.client.dcp.conductor.Conductor.add(Conductor.java:298)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at com.couchbase.client.dcp.conductor.Conductor.connect(Conductor.java:115)
	at com.couchbase.client.dcp.Client.connect(Client.java:460)
	at com.couchbase.spark.kv.KeyValuePartitionReader.attachAndConnectDcpListener(KeyValuePartitionReader.scala:97)
	at com.couchbase.spark.kv.KeyValuePartitionReader.<init>(KeyValuePartitionReader.scala:76)
	at com.couchbase.spark.kv.KeyValueMicroBatchStream.$anonfun$createReaderFactory$1(KeyValueMicroBatchStream.scala:72)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:64)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Sorry for the late reply on this one; it slipped through the cracks. For anyone Googling this problem: the presence of “Tried to add duplicate channel” in the stack trace indicates an issue that is resolved by upgrading to Spark Connector 3.3.0 or above.
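
On Databricks this usually just means attaching the newer connector artifact to the cluster in place of 3.2.2. A rough sketch of the dependency bump, assuming an sbt-style build and the Scala 2.12 artifact (the equivalent Maven coordinate to attach as a cluster library would be `com.couchbase.client:spark-connector_2.12:3.3.x`):

```scala
// build.sbt -- bump the Couchbase Spark Connector to 3.3.0 or later.
// The version shown is illustrative; pick the latest 3.3.x+ release.
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "3.3.0"
```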