I am writing a Scala 2.10.4 application that uses the Spark connector (v1.2) for Couchbase (v4.1) to insert documents into a specific bucket.
Here is my code so far:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.couchbase.spark.sql._

// Defined outside main() so Spark can derive the DataFrame schema;
// "uid" matches the idField option used in the write below.
case class Person(uid: String, name: String, age: Int)

object LoadDocsToCB {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Load Docs to CB")
      //.setMaster("local[*]")
      .set("com.couchbase.nodes", "00.00.00.000")
      .set("com.couchbase.bucket.mybucket", "password")
    println("##### Finished Configuration")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // needed for toDF()
    println("##### Finished setting up Spark Context and Sql Context")

    val people = sc.parallelize(Seq(
      Person("user::michael", "Michael", 27),
      Person("user::tom", "Tom", 33)
    )).toDF()

    people.registerTempTable("people")
    people.write.couchbase(Map("idField" -> "uid"))
    println("##### Finished writing to Couchbase")
  }
}
This code successfully writes to Couchbase.
However, I now need to insert new data whose keys may or may not already exist in Couchbase. Please note that this bucket is shared with another team, so I cannot simply flush it. I also do not want to check whether each key exists and then upsert, because keys that no longer appear in the new data would stay behind, and over time the bucket would accumulate unmodified/dead documents.
Therefore, instead of upserting, I would like to delete all documents whose keys carry a certain prefix and then insert the new documents fresh.
In summary:
1. Delete all documents that have “user::” as the prefix in their keys.
2. Insert the new data (this part I have pretty much figured out).
How can I implement #1 in Scala? This is my first shot at Scala and Couchbase as well. Any help, directions or suggestions would be greatly appreciated.
Thanks in advance!
I came across this repo with some Spark connector examples for Couchbase: https://github.com/couchbaselabs/couchbase-spark-samples/tree/master/src/main/scala.
However, it does not contain any example of deleting documents from a bucket with the Spark connector.
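The closest I have come is the rough sketch below, which reuses the sc from my main method above. The idea is to fetch the matching keys with a N1QL query through the connector and then remove the documents with the Couchbase Java SDK inside foreachPartition. This is untested and based only on my reading of the docs; it assumes a primary index exists on the bucket (otherwise the META().id query will not run), and the node address, bucket name and password are the same placeholders as in my configuration above.

import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._

// 1. Fetch the ids of every document whose key starts with "user::".
//    Assumes a primary index on the bucket, e.g. CREATE PRIMARY INDEX ON `mybucket`;
val statement = "SELECT META(b).id AS id FROM `mybucket` b WHERE META(b).id LIKE 'user::%'"
val keys = sc.couchbaseQuery(N1qlQuery.simple(statement))
  .map(row => row.value.getString("id"))

// 2. Remove the matching documents; each partition opens its own SDK connection
//    on the executor so nothing non-serializable is shipped from the driver.
keys.foreachPartition { ids =>
  val cluster = CouchbaseCluster.create("00.00.00.000")
  val bucket = cluster.openBucket("mybucket", "password")
  ids.foreach(id => bucket.remove(id))
  cluster.disconnect()
}

I do not know whether this is the idiomatic way to do it, or whether the connector itself already exposes a remove/delete operation that I am missing.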
Still stuck!