How to use a Kafka SMT to modify the topic key created by the Couchbase source connector

I’m trying to use SMTs to replace the UUID message key the Couchbase source connector generates with a field from within the message. This is what I’ve placed in the properties file for the connector:
```
"transforms": "ValueToKey, extractKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "UserID",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "UserId"
```
but so far this seems to have no effect. I tried to pattern the above on this doc: ExtractField | Confluent Documentation


Hi Larry,

> UUID message key the couchbase source connector generates

The source connector uses the Couchbase document ID as the Kafka record key. (If you’re seeing some other behavior, please let me know so we can fix the bug). One way forward might be to make the Couchbase document ID the same as the user ID. But let’s assume that’s not what you want, and address the SMT issue.
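(For completeness, if you did want the document-ID route, a minimal sketch with the Couchbase Java SDK might look like the following. The connection details, bucket name, and field values are all hypothetical.)

```java
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;

public class UpsertWithUserIdAsKey {
    public static void main(String[] args) {
        // Hypothetical connection details -- substitute your own.
        Cluster cluster = Cluster.connect("couchbase://localhost", "Administrator", "password");
        Collection users = cluster.bucket("users").defaultCollection();

        JsonObject doc = JsonObject.create()
                .put("UserID", "u-12345")           // hypothetical field values
                .put("Email", "larry@example.com");

        // Using the user ID as the document ID means the source connector's
        // default Kafka record key is already the user ID, so no SMTs are needed.
        users.upsert("u-12345", doc);

        cluster.disconnect();
    }
}
```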

There’s a section buried in the connector documentation about Using RawJsonSourceHandler with SMTs:

> As a performance optimization, RawJsonSourceHandler and its cousin RawJsonWithMetadataSourceHandler create Kafka Connect records whose values are byte arrays. If you wish to use these handlers together with transforms that modify document content, the record value must be converted from a byte array to a compatible format. To do this, include the DeserializeJson transform as the first in the chain and set value.converter to JsonConverter instead of ByteArrayConverter, like so:

```
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

transforms=deserializeJson,someOtherTransform
transforms.deserializeJson.type=com.couchbase.connect.kafka.transform.DeserializeJson
transforms.someOtherTransform.type=...
```

One other important detail: when a document is deleted in Couchbase, the source connector publishes a record with a null value. The ValueToKey transform requires a non-null record value, and will blow up if it sees a null. To prevent that explosion, you can add a transform that drops (prevents publication of) records that have null values, like the sketch below.
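For intuition, here is roughly what such a transform boils down to (this is a hypothetical sketch, not the connector’s actual DropIfNullValue source): an SMT that returns null tells Connect to discard the record entirely.

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Sketch of a "drop tombstones" SMT; the real DropIfNullValue class
// ships with the Couchbase connector. This is just the shape of the idea.
public class DropIfNullValueSketch<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Returning null from an SMT tells Connect to discard the record,
        // so downstream transforms like ValueToKey never see a null value.
        return record.value() == null ? null : record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }
}
```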

Putting this together with your existing transform definitions, you’d end up with something like:

```
{
  "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "transforms": "ignoreDeletes,deserializeJson,valueToKey,extractKey",
  "transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue",
  "transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
  "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.valueToKey.fields": "UserID",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "UserID"
  // plus the rest of your connector properties
}
```
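To make the key-building chain concrete, here’s how a hypothetical document would flow through the transforms (field values invented for illustration):

```
key before valueToKey:                 the Couchbase document ID
value after deserializeJson:           {"UserID": "u-123", "Email": "larry@example.com"}
key after valueToKey (fields=UserID):  {"UserID": "u-123"}
key after extractKey (field=UserID):   "u-123"
```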

Thanks,
David

We’re making some progress, but I’ve not yet reached the goal.

Two strange things are happening:

  • the value of the message key is now getting set to “null”
  • the Email field in the message is getting set to “PLACEHOLDER:78f342ea-b00d-4725-ad61-684327d4b4d9”

Once I’ve completed my transformation, do I need to convert the value and/or key to a byte array?

This is what my connector properties transform section looks like now:
```
couchbase.source.handler = com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler

value.converter = org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable = false

transforms = ignoreDeletes,deserializeJson,valueToKey,extractKey
transforms.ignoreDeletes.type = com.couchbase.connect.kafka.transform.DropIfNullValue
transforms.deserializeJson.type = com.couchbase.connect.kafka.transform.DeserializeJson
transforms.valueToKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.valueToKey.fields = UserID
transforms.extractKey.type = org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field = UserId
```
When I tried adding this below to the end of the transformation commands, it throws an exception and stops.

```
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```

Hi Larry,

> the value of the message key is now getting set to “null”

This happens if the field is not found. Please check the field name, and note that it’s case-sensitive. In one place your configuration refers to it as “UserID” and in another as “UserId”. EDIT: SORRY, THIS IS MY FAULT, since I screwed up the capitalization in my previous post. I have edited it to fix the mismatch.

> the Email field in the message is getting set to “PLACEHOLDER:78f342ea-b00d-4725-ad61-684327d4b4d9”

That’s very strange. Can you inspect the document in Couchbase to verify the original content is what you expect? What do you see if you comment out all the transforms except ignoreDeletes and deserializeJson? What do you see if you spy on the contents of the topic?
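If you don’t have a console consumer handy, a minimal Java consumer along these lines can show the keys and values (the broker address, group ID, and topic name below are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicSpy {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "topic-spy");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("your-couchbase-topic")); // hypothetical topic name
            while (true) {
                // Print each record's key alongside its value so you can see
                // exactly what the transform chain produced.
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("key=%s value=%s%n", rec.key(), rec.value());
                }
            }
        }
    }
}
```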

> Once I’ve completed my transformation, do I need to convert the value and/or key to a byte array?

No, I don’t believe that’s necessary.

> When I tried adding this below to the end of the transformation commands, it throws an exception and stops.
>
> value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Indeed. There can be only one value converter. Since the DeserializeJson transform was applied, JsonConverter is the one to use.

Thanks,
David

I changed the capitalization of UserID, and now it works — and I even understand why it works.

And of course, now that I’m trying to reproduce the problem with the email getting changed, I can’t get it to happen again. Interestingly, there are several records in the database that exhibit this “placeholder” change to the email field, from the records I was testing with over the last few days.

Thank you for your help. It’s on to the next part of my data stream!
