How to generate message schema when sourcing a topic

Is there some special trick to getting the couchbase source connector to generate and add the schema for JSON messages? I have the following value settings in the connector property file:
‘’’
value.converter = org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable = true
key.converter.schemas.enable = true

transforms = ignoreDeletes,deserializeJson
transforms.ignoreDeletes.type = com.couchbase.connect.kafka.transform.DropIfNullValue
transforms.deserializeJson.type = com.couchbase.connect.kafka.transform.DeserializeJson
‘’’

but that’s not cutting it, It;s working for the KEY object but not the topic message VALUE.

Hi Larry,

Since Couchbase doesn’t know anything about the schema, there’s not much the connector can do on its own. There’s a DefaultSchemaSourceHandler, but its schema is just for document metadata, and the document itself is passed as a blob. One option would be to use it as a template for a custom SourceHandler that defines a schema tailored to your document content.

Another option would be to use KSQL to apply the schema.

Thanks,
David

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “localhost:9092”);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
“org.apache.kafka.common.serialization.StringSerializer”);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
“io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer”);
props.put(“schema.registry.url”, “http://127.0.0.1:8081”);

Producer<String, User> producer = new KafkaProducer<String, User>(props);

String topic = “testjsonschema”;
String key = “testkey”;
User user = new User(“John”, “Doe”, 33);

ProducerRecord<String, User> record
= new ProducerRecord<String, User>(topic, key, user);
producer.send(record).get();
producer.close();