Null and integer values in kafka offsets for CB-kafka connectors

We have Kafka connectors to fetch data from Couchbase buckets. We are getting the data in JSON format, but in the offsets we are getting null values, and keys with "_seq" appended that hold integer values, which is breaking our application functionality because we don't want null/integer values while reading the data. Any suggestions, please, on what can be added to remove them?

"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
"couchbase.seed.nodes": "",
"tasks.max": "2",
"couchbase.topic": "tp_poc_optima",
"couchbase.bucket": "optima",
"couchbase.username": "",
"couchbase.password": "",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"couchbase.persistence.polling.interval": "100ms",
"couchbase.flow.control.buffer": "16m",
"couchbase.stream.from": "BEGINNING"

Sample: in the offsets, for role_seq we have the value 14358.

Hi @venkata,

Thanks for using the Kafka connector.

I’m afraid I don’t understand the issue. Can you describe it in more detail, please? What kind of “offset” are you talking about? Can you share examples of a Couchbase document, an actual Kafka record key and value, and the expected Kafka record key and value?

Thanks,
David

Hi @david.nault ,

Sorry for the late reply. These are Kafka offsets where the messages from Couchbase are being stored in JSON format. Some additional records are being added with integer data, or it is coming directly from the source itself, and we need to remove or avoid such data.

Sample same as before: let's say offset number 664889 has the key teamId_seq and the value 100, which is not required as data in the offsets.

Please suggest how to proceed.

For the null values, I resolved this by adding the below to the connector config. Please let me know whether this is also correct.

"transforms": "ignoreDeletes",
"transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue"

Is the requirement that you remove some fields from a document before publishing it to a Kafka topic?

For example, you have a document in Couchbase that looks something like this:

{
    "greeting": "hello",
    "teamId_seq": 100
}

and you want the Kafka record to look like this:

{
    "greeting": "hello"
}

If that’s correct, I would recommend using a Single Message Transform (SMT) to remove the unwanted fields. If the standard transforms don’t do what you need, you can write your own.
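As a sketch of what that could look like in the connector config, the stock ReplaceField transform can drop named fields. One caveat: standard SMTs like ReplaceField operate on structured record values (a Struct or Map), and with RawJsonSourceHandler plus ByteArrayConverter the value is raw JSON bytes, so this assumes a handler/converter combination that produces structured values. The transform alias dropSeq and the field name teamId_seq below are illustrative:

```json
"transforms": "dropSeq",
"transforms.dropSeq.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropSeq.exclude": "teamId_seq"
```

(In Kafka versions before 3.0, the "exclude" property was named "blacklist".)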

Thanks,
David

For the null values, I resolved this by adding the below to the connector config. Please let me know whether this is also correct.

"transforms": "ignoreDeletes",
"transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue"

That transform means, “If a record has no value, then don’t publish it to the Kafka topic.” If that’s what you want, then yes, it’s good 🙂

Ok, Thank you for the confirmation

Hi David,

Yes, it looks like some documents contain only values like this:

{
    "teamId_seq": 100
}

We need to avoid such documents.

Thanks for clarifying. A custom Single Message Transform (SMT) could look at the document, and either remove fields or prevent the message from being published.
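As a rough sketch of the core check such a custom SMT might perform, written as plain Java with no Connect dependencies (the class and method names here are hypothetical): a real transform would implement org.apache.kafka.connect.transforms.Transformation and return null from apply() to drop the record.

```java
import java.util.Map;

public class SeqFieldFilter {

    // Returns true when every top-level field name ends with "_seq",
    // i.e. the document carries only sequence bookkeeping and no payload.
    // A custom SMT would run this check in apply() and return null to
    // drop the record when it matches.
    public static boolean shouldDrop(Map<String, Object> doc) {
        if (doc.isEmpty()) {
            return true;
        }
        return doc.keySet().stream().allMatch(k -> k.endsWith("_seq"));
    }

    public static void main(String[] args) {
        // A document that is only a _seq field should be dropped.
        System.out.println(shouldDrop(Map.of("teamId_seq", 100)));

        // A document with real payload fields should be kept.
        System.out.println(shouldDrop(Map.of("greeting", "hello",
                                             "teamId_seq", 100)));
    }
}
```

The same predicate could instead strip the "_seq" entries from a mixed document rather than dropping it, depending on which behavior you need.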
