Decode message with Kafka Couchbase connector

I just developed a Java program that uses the Kafka Couchbase connector to consume data from Couchbase, and I also wrote a producer that writes the consumed data to a file.

To do this, I based my code on the following GitHub repository.

Everything works, except that I'm unable to decode the JSON that I send from the producer.

    public byte[] toBytes(final DCPEvent dcpEvent) {
        if (dcpEvent.message() instanceof MutationMessage) {
            MutationMessage message = (MutationMessage) dcpEvent.message();
            return message.key().getBytes();
        } else {
            return dcpEvent.message().toString().getBytes();
        }
    }

How exactly do I decode the JSON data in the consumer? For string data this code works fine, but we are not able to read the dcpEvent data. Please help me address this issue. The consumer code follows:

public void handleMessage(long offset, byte[] bytes) {
    String message = new String(bytes);
    DCPEvent dcpEvent = new DCPEvent();
    System.out.println(String.valueOf(offset) + ": " + new String(bytes));
}

You can use the Jackson ObjectMapper, for example:

ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(bytes);

and then use JsonNode APIs to access properties.

Thanks @avsej, but it is not working. I don't think the problem is in the consumer code: when I type a string message in the producer console, I can see that string in the consumer console. Is there any other way to resolve this issue?

Consumer Console:

Consumer code:
System.out.println(String.valueOf(offset) + ": " +new String(bytes)+": "+bytes);

Do you mean it is just a string, not JSON encoded into a string, on the consumer side?

Look at the line

                return message.key().getBytes();

in your producer: you are writing only the keys into Kafka and skipping the bodies.

@avsej, Thanks for the solution. I changed my code to

public byte[] toBytes(final DCPEvent dcpEvent) {
    if (dcpEvent.message() instanceof MutationMessage) {
        MutationMessage message = (MutationMessage) dcpEvent.message();
        return message.toString().getBytes();
    } else {
        return dcpEvent.message().toString().getBytes();
    }
}

Now the consumer receives the key, content, etc.
But my concern is that in the producer I'm encoding the JSON documents. Is there any way to decode the bytes I get in the consumer?

In the producer, you are encoding Couchbase structures into JSON strings. Those strings are sent as bytes to the Kafka topic.

The Kafka consumer should just parse that JSON using any of the available parsers.

BTW, your new solution uses message.toString().getBytes(), which probably does not do what you want, because you end up with the string representation of the buffer. You can take a look at the default JSON encoder implementation.
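
To illustrate the toString() pitfall, here is a minimal sketch. It uses a plain java.nio.ByteBuffer as a stand-in for the connector's buffer type, which is an assumption; the class name BufferToStringDemo and the helper contentAsString are hypothetical and not part of the connector. The point is only that toString() on a buffer typically describes the buffer object, while the bytes have to be copied out and decoded explicitly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; ByteBuffer stands in for the connector's buffer type.
class BufferToStringDemo {

    // Copies the remaining bytes out of the buffer and decodes them as UTF-8.
    static String contentAsString(ByteBuffer buffer) {
        byte[] raw = new byte[buffer.remaining()];
        // duplicate() shares the data but has its own position,
        // so the caller's buffer is left untouched.
        buffer.duplicate().get(raw);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] json = "{\"message\":\"hello world message\"}".getBytes(StandardCharsets.UTF_8);
        ByteBuffer body = ByteBuffer.wrap(json);

        // Prints a description of the buffer object, not the JSON inside it.
        System.out.println(body.toString());

        // Prints the JSON document that the buffer holds.
        System.out.println(contentAsString(body));
    }
}
```

Naming the charset explicitly on both the producer and the consumer side also avoids surprises from differing platform defaults.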

It really helped me a lot. Thanks a lot for your help, @avsej.

Hey guys,

I have the Kafka connector for Couchbase set up, and my command-line Kafka consumer sees messages.
But the problem is that the consumer sees messages in this format: ("content":"eyJtZXNzYWdlIjoiaGVsbG8gd29ybGQgbWVzc2FnZSJ9"). My ultimate goal is to forward these messages from Kafka to Storm, but I don't know why Kafka is sending the messages encoded. How can I decode a message back to my original JSON structure, which looks something like this: "1234", {"message": "hello world message"}?

Appreciate your help.

It is Base64 encoding:

$ echo 'eyJtZXNzYWdlIjoiaGVsbG8gd29ybGQgbWVzc2FnZSJ9' | base64 -d
{"message":"hello world message"}