[ANN] Kafka Connector 3.1.2

Hi Everyone

I’m glad to announce a new release of the Kafka Couchbase Connector. This is a maintenance release; it fixes a possible OOM when loading a huge bucket whose internal queue cannot be drained fast enough (KAFKAC-66).

Package: http://packages.couchbase.com/clients/kafka/3.1.2/kafka-connect-couchbase-3.1.2.zip
Source code: https://github.com/couchbase/kafka-connect-couchbase#readme
Issue Tracker: https://issues.couchbase.com/projects/KAFKAC/

It is also available in Maven, in case you want to extend or integrate with the connector.

Hi @avsej.
I have installed Kafka Connector 3.0 to connect my Couchbase with Kafka.
I have followed the steps mentioned in https://developer.couchbase.com/documentation/server/4.6/connectors/kafka-3.0/quickstart.html
I can see my data from the console consumer in Kafka, but the payload.content field arrives in encrypted form.
I need help to decrypt my data, since I have to show the data in Kibana using Logstash.
Here is the JSON I am getting:

{  
   "schema":{  
      "type":"struct",
      "fields":[  
         {  
            "type":"string",
            "optional":false,
            "field":"event"
         },
         {  
            "type":"int16",
            "optional":false,
            "field":"partition"
         },
         {  
            "type":"string",
            "optional":false,
            "field":"key"
         },
         {  
            "type":"int64",
            "optional":false,
            "field":"cas"
         },
         {  
            "type":"int64",
            "optional":false,
            "field":"bySeqno"
         },
         {  
            "type":"int64",
            "optional":false,
            "field":"revSeqno"
         },
         {  
            "type":"int32",
            "optional":true,
            "field":"expiration"
         },
         {  
            "type":"int32",
            "optional":true,
            "field":"flags"
         },
         {  
            "type":"int32",
            "optional":true,
            "field":"lockTime"
         },
         {  
            "type":"bytes",
            "optional":true,
            "field":"content"
         }
      ],
      "optional":false,
      "name":"com.couchbase.DcpMessage"
   },
   "payload":{  
      "event":"mutation",
      "partition":46,
      "key":"Logs::d42f91ce-e0ec-444f-bb20-09398f15efe4",
      "cas":1501735721952542720,
      "bySeqno":12,
      "revSeqno":1,
      "expiration":0,
      "flags":0,
      "lockTime":0,
      "content":"eyJfc3luYyI6eyJyZXYiOiIxLWEwMzQ4NjlhNzYyYzAyZWYxMzJiZjhhMDFhM2IxOTUwIiwic2VxdWVuY2UiOjEyNDkxLCJyZWNlbnRfc2VxdWVuY2VzIjpbMTI0OTFdLCJoaXN0b3J5Ijp7InJldnMiOlsiMS1hMDM0ODY5YTc2MmMwMmVmMTMyYmY4YTAxYTNiMTk1MCJdLCJwYXJlbnRzIjpbLTFdLCJjaGFubmVscyI6W251bGxdfSwidGltZV9zYXZlZCI6IjIwMTctMDgtMDNUMDQ6NDg6MjYuNjM4NzAxOTU5WiJ9LCJjb21wb25lbnROYW1lIjoiVmlyZ2luIFZveWFnZXMiLCJjb21wb25lbnRWZXJzaW9uIjoiMS4yLjUiLCJjb3JlbGF0aW9uSWQiOiIxNTAxNzM1NTMyODk1IiwiZGV2aWNlaW5mbyI6eyJpZCI6IjVmNWJhNThkNjVmMzRiNDUiLCJuYW1lIjoibW90b3JvbGEgWFQxMDY4Iiwib3BlcmF0aW5nU3lzdGVtIjoiQW5kcm9pZCIsIm9zVmVyc2lvbiI6IjUuMC4yIiwidHlwZSI6IlBob25lIn0sImhvc3RuYW1lIjoibW90b3JvbGEgWFQxMDY4IiwibWVzc2FnZSI6bnVsbCwibWVzc2FnZUNvZGUiOiJJTkpFQ1QiLCJtZXNzYWdlRGV0YWlsIjoie1wiaW5mb3JtYXRpb25UeXBlXCI6XCJiZWFjb25EZXRhaWxzXCIsXCJhY3Rpb25EYXRhXCI6W3tcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjE0NDQ2OjYyNjc3XCIsXCJkaXN0YW5jZVwiOjMuMDYzOTI4NjcyNTY0MTU1N30se1wiYmVhY29uX2lkXCI6XCJiOTQwN2YzMC1mNWY4LTQ2NmUtYWZmOS0yNTU1NmI1N2ZlNmQ6MTk3NTU6NTg5OTdcIixcImRpc3RhbmNlXCI6Mi4zNDE5MzU3MDM2MzE3MjE3fSx7XCJiZWFjb25faWRcIjpcImI5NDA3ZjMwLWY1ZjgtNDY2ZS1hZmY5LTI1NTU2YjU3ZmU2ZDoyMDA2Mzo0NTgxNFwiLFwiZGlzdGFuY2VcIjoyNi41MDQwMDkyMDE4OTgwOTZ9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjI4MTM6MTM5OTVcIixcImRpc3RhbmNlXCI6Ny43MzY3Mzg5ODExMTY0Mzh9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjMzNjMxOjU2NjUyXCIsXCJkaXN0YW5jZVwiOjMuNjUxMjc5OTkyNzg3Nzk1fSx7XCJiZWFjb25faWRcIjpcImI5NDA3ZjMwLWY1ZjgtNDY2ZS1hZmY5LTI1NTU2YjU3ZmU2ZDo0MjQyMzozNzE0NVwiLFwiZGlzdGFuY2VcIjoxLjk1MDgzMDY5OTMyOTkwNTJ9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjUxNzcxOjY0ODk4XCIsXCJkaXN0YW5jZVwiOjIuMzQxOTM1NzAzNjMxNzIxN30se1wiYmVhY29uX2lkXCI6XCJiOTQwN2YzMC1mNWY4LTQ2NmUtYWZmOS0yNTU1NmI1N2ZlNmQ6NTc0NzM6MTE1NzFcIixcImRpc3RhbmNlXCI6MTcuMzQ3ODg0NzY2MDM4Mzl9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjYwMzgzOjQ4NDgxXCIsXCJkaXN0YW5jZVwiOjcuMTM5MzExMjg1MDYxMjY0Nn0se1wiYmVhY29uX2lkXCI6XCJiOTQwN2YzMC1mNWY4LTQ2NmUtYWZmOS0yNTU1NmI1N2ZlNmQ6Njk0OToxNDQ4MFwiLFwiZGlzdGFuY2VcIjozLjM0NjAyMDI4ODQyNjIzMn1dfSIsIm9mZnNldCI6IiswNTozMCIsInNldmVyaXR5IjoiRGVidWciLCJzdGFja3RyYWNlIjpudWxsLCJ0aW1lc3RhbXAiOiIyMDE3LTA4LTAzVDEwOjE1OjMyLjg5NSIsInRyYWNlaW5mbyI6eyJkdXJhdGlvbiI6IjAiLCJsb2dnaW5nUG9pbnQiOm51bGwsIm1ldGhvZFRpbWUiOiIyMDE3LTA4LTAzVDEwOjE1OjMyLjg5NSJ9LCJ0eXBlIjoibG9nIiwidXNlcmluZm8iOnsiYXBwSWQiOm51bGwsImRldmljZUlkIjpudWxsLCJpZCI6bnVsbCwidG9rZW4iOm51bGwsInR5cGUiOm51bGx9fQ=="
   }
}

I need help to decrypt the content field in it.

It does not encrypt your data. The Couchbase connector passes the document body to Kafka as a byte sequence, and the JSON converter you have configured represents bytes using Base64, because raw bytes would otherwise break the JSON syntax.

You have several options here.

  1. Implement a custom converter for the Couchbase connector and, instead of writing the body as bytes, write it in the structured form your application needs.
  2. Keep the default Couchbase connector converter, but replace the JSON converter you are using for Kafka; the Avro converter, for example, can handle byte sequences directly.
  3. On the side of your application, use a Base64 decoder to extract the content (see the sketch after this list).
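
For option 3, here is a minimal sketch of decoding the content field on the application side with the standard java.util.Base64 API (the ContentDecoder class name and the round-trip demo are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ContentDecoder {

    // Decodes the Base64 "content" field back into the original document body.
    public static String decodeContent(String encodedContent) {
        byte[] raw = Base64.getDecoder().decode(encodedContent);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Round-trip demo: encode a small document body the same way the
        // JSON converter does, then decode it back.
        String encoded = Base64.getEncoder()
                .encodeToString("{\"type\":\"log\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeContent(encoded)); // prints {"type":"log"}
    }
}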

@avsej I think I can use the second option. But how can I replace the JSON converter with the Avro converter?

@avsej Here is my configuration from connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

Apache Kafka does not ship an Avro converter, but it is available in the Confluent distribution and requires their Schema Registry:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

You can also try to override how the BYTES type is serialized by the default JsonConverter.
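
The snippet for that approach is missing from the post above; as a hedged sketch (the class name and the whole approach are my own interpretation, not the connector's), one way is to subclass JsonConverter and rewrite the content field from bytes to a string before delegating:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class StringContentJsonConverter extends JsonConverter {

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Only rewrite structs that carry a BYTES "content" field.
        if (!(value instanceof Struct) || schema.field("content") == null
                || schema.field("content").schema().type() != Schema.Type.BYTES) {
            return super.fromConnectData(topic, schema, value);
        }
        // Rebuild the schema with "content" declared as an optional string.
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field f : schema.fields()) {
            builder.field(f.name(), "content".equals(f.name())
                    ? Schema.OPTIONAL_STRING_SCHEMA : f.schema());
        }
        Schema newSchema = builder.build();
        // Copy the fields over, decoding the content bytes as UTF-8.
        // (Connect may also deliver BYTES as a ByteBuffer; that case would
        // need similar handling.)
        Struct oldStruct = (Struct) value;
        Struct newStruct = new Struct(newSchema);
        for (Field f : newSchema.fields()) {
            Object v = oldStruct.get(f.name());
            if ("content".equals(f.name()) && v instanceof byte[]) {
                v = new String((byte[]) v, StandardCharsets.UTF_8);
            }
            newStruct.put(f.name(), v);
        }
        return super.fromConnectData(topic, newSchema, newStruct);
    }
}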

@avsej I have already installed Confluent and the Schema Registry. So can I directly use those properties?

Yes, but you should check that the Schema Registry address is correct, unless it really is listening at http://localhost:8081.

Hi @avsej
I have installed my Kafka connector without the Confluent Platform and Schema Registry.
Is there any way to decode the content field when the kafka-connect-couchbase connector is installed with Apache Kafka only?
I want the content field to display data in simple text format only.

Thanks @avsej,
I have changed the code in the connector. Apart from what you suggested, one also needs to make a change in the schema. If you like, I can send you the code for review, in case it is helpful for the community.

Regards
Pankaj Sharma

Hi Pankaj,

Greetings!

I saw your message on the forum regarding converting the content byte buffer to a string.
Could you please share the exact details of the changes that were done to get this working?

We used the Apache Kafka connector to post changes from Couchbase to a Kafka topic, using the Kafka connector library.

We tried changing SchemaConverter.java to replace line 66 with the line below, to get the content in JSON format:
record.put("content", DcpMutationMessage.content(event).toString(CharsetUtil.UTF_8));

We also changed Schemas.java to switch the "content" type from bytes to string, without any luck.

Please provide more insight into the changes that work.

Thank you!

Which version of the connector are you using?

We were able to get the JSON converted to a string by doing the following:
Updated /kafka-connect-couchbase/src/main/java/com/couchbase/connect/kafka/converter/SchemaConverter.java to use bufToString
Updated content to STRING_SCHEMA in /kafka-connect-couchbase/src/main/java/com/couchbase/connect/kafka/util/Schemas.java

We used kafka-connect-couchbase-3.3.2 and also tried kafka-connect-couchbase-3.4.0-beta.2-SNAPSHOT, and both were found to work. However, for deletion events, we see the exception below.

Has anyone experienced this behaviour?

ERROR WorkerSourceTask{id=unotes-test-couchbase-source-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:269)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:586)
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:680)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:567)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:320)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Tomorrow I will send you the code file that we changed.

Hi Pankaj,

Sure, thank you. Let me know when you send it.

Hi Dharan,

// ByteBuf and CharsetUtil come from the Netty that ships (shaded) with the
// Couchbase DCP client; adjust the package prefix to match your version.
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;

public class ConverterUtils {

    // Decodes the readable bytes of the buffer as a UTF-8 string.
    public static String bufToString(ByteBuf buf) {
        return new String(bufToBytes(buf), CharsetUtil.UTF_8);
    }

    // Copies the readable bytes of the buffer into a plain byte array.
    // Note: readBytes advances the reader index, so the buffer can only be
    // consumed once this way.
    public static byte[] bufToBytes(ByteBuf buf) {
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        return bytes;
    }
}

In the DefaultSchemaSourceHandler class, use a code sample like the one below:

if (type == EventType.MUTATION) {
    // docType is derived elsewhere in the handler.
    record.put("event", "couchbaseupdate:" + docType);
    record.put("expiration", DcpMutationMessage.expiry(event));
    record.put("flags", DcpMutationMessage.flags(event));
    record.put("lockTime", DcpMutationMessage.lockTime(event));
    // Store the document body as a plain string instead of bytes.
    record.put("pl", ConverterUtils.bufToString(DcpMutationMessage.content(event)));
}
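
For completeness: deletion and expiration events carry no document body, which is why the content field must be optional in the schema (see the fix further down in this thread). A hedged sketch of the matching branch, assuming the connector's EventType enum also exposes DELETION and EXPIRATION:

} else if (type == EventType.DELETION || type == EventType.EXPIRATION) {
    // No body to decode here; leave the content field unset, which only
    // works if its schema is declared optional.
    record.put("event", "couchbasedelete:" + docType);
}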

FYI @pankaj.sharma

Hi @Dharan

Does this help, or do you need the complete file?

Regards
Pankaj Sharma

Thanks Aditya and Pankaj.
In addition to that, we needed to update Schemas.java, and now it is working:
.field("content", Schema.OPTIONAL_STRING_SCHEMA)

The field was not defined as optional before, which caused the exception. Now we are good.
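
For reference, here is an illustrative reconstruction of the value schema in Schemas.java (field names and types taken from the JSON envelope earlier in the thread; the actual file may differ in naming and layout), with content declared as an optional string:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("com.couchbase.DcpMessage")
        .field("event", Schema.STRING_SCHEMA)
        .field("partition", Schema.INT16_SCHEMA)
        .field("key", Schema.STRING_SCHEMA)
        .field("cas", Schema.INT64_SCHEMA)
        .field("bySeqno", Schema.INT64_SCHEMA)
        .field("revSeqno", Schema.INT64_SCHEMA)
        .field("expiration", Schema.OPTIONAL_INT32_SCHEMA)
        .field("flags", Schema.OPTIONAL_INT32_SCHEMA)
        .field("lockTime", Schema.OPTIONAL_INT32_SCHEMA)
        .field("content", Schema.OPTIONAL_STRING_SCHEMA) // was BYTES, non-optional
        .build();
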
Thanks again for your help and quick response.