Couchbase Elasticsearch connector: multiple indexes, pipeline problems

Hello all,
1-
We want incoming documents from one bucket to be indexed into more than one index, each for use by a separate department.
What solution do you recommend?
As far as I know, it is not possible to specify a list of indexes in the .toml config.
Can we connect the connector to Logstash?
Or should I bring up two connectors per bucket?

2-
On the Elasticsearch side, we have defined some ingest pipelines to add new fields to incoming documents. This works well; however, when we change the pipeline to remove some fields, the document is moved to cbes-rejects.
The pipelines are implemented in Painless.
Could you please help us understand why this happens and how we can solve it?

Thank you and best regards,

Hi Ehsan,

> we aim to have incoming documents from a bucket to be indexed into more than one index

I don’t have experience with Logstash, so I don’t know if that would work. The typical recommendation is to bring up two connectors.
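As a rough sketch of what "two connectors" could look like, the fragment below shows only the parts of the first connector's config that would differ from the second. The file names, group names, bucket name, and index names are all made up for illustration. The main assumption is that each connector needs its own group name so that the two keep their replication progress separately.

```toml
# connector-dept-a.toml (hypothetical file name)

[group]
  name = 'dept-a-group'        # hypothetical; must be different for each connector

[couchbase]
  bucket = 'payments'          # hypothetical; both connectors read the same bucket

[[elasticsearch.type]]
  prefix = ''                  # an empty prefix matches every document ID
  index = 'dept-a-payments'    # hypothetical index for the first department

# connector-dept-b.toml (hypothetical) would repeat the same sections with:
#   [group] name = 'dept-b-group'
#   [[elasticsearch.type]] index = 'dept-b-payments'
```

Each connector can then also name its own ingest pipeline in its type rule if the departments need different processing.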

> when we change the pipeline to remove some fields, the document is moved to cbes-rejects.

In general, pipelines with Painless scripts should work. If a document ends up in the cbes-rejects index, it’s because Elasticsearch reported a non-transient error when the connector tried to index the document.

The first place I’d look would be the error field of the cbes-rejects entry associated with the failed document. The next place I’d look would be the Elasticsearch log.

As for possible causes…

Is it possible that the rejected documents do not have the field the Painless script is attempting to remove?

See if you can index one of the rejected documents manually using the Elasticsearch REST API (don’t forget the pipeline query parameter :slight_smile: ). This will help verify the pipeline script is working correctly.
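If you would rather not write anything to a real index, the ingest simulate API is another way to run the same check. The sketch below is only an illustration: my-pipeline is a placeholder name, the field names and values are invented, and it assumes the connector wraps the Couchbase content in a doc field next to a meta field. The second test document deliberately has no doc field, to show how the script behaves when a field it expects is absent.

```console
# Placeholder pipeline name and invented field values; replace with your own.
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "meta": { "id": "example::1" },
        "doc": { "class": "example", "fieldToRemove": "some value" }
      }
    },
    {
      "_source": {
        "meta": { "id": "example::2" }
      }
    }
  ]
}
```

The response reports, per document, either the transformed source or the error the pipeline raised, which makes it easy to compare against the error field in cbes-rejects.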

Thanks,
David

Hello David,
Thank you for your response, and sorry for the late reply.

For the first issue, I followed your recommendation and brought up another connector, and it works like a charm! Thank you! :slight_smile:

For the second issue, you are exactly right! The reason recorded in cbes-rejects is null_pointer_exception.
I followed your advice. When I index a single document into Elasticsearch manually, the pipeline runs without a null_pointer_exception. But when documents come from the Couchbase connector, I get this error.
Below is how I test my pipeline in Kibana, followed by my pipeline script. I wonder if you could take a look and tell me what I am missing. Thank you in advance! :slight_smile:

Test method:

PUT test/_doc/1?pipeline=payment_time_correction_test
{
    "doc": {
        ...
    }
}

My pipeline script:

PUT _ingest/pipeline/payment_time_correction
{
  "processors": [
    {
      "drop": {
        "if": """
        if (ctx.doc.class != null)
        {
          if ((ctx.doc.class == 'payment.card') ||
          (ctx.doc.class == 'payment.tempId') ||
          (ctx.doc.class == 'payment.card.temp') ||
          (ctx.doc.class == 'payment.ddebit.contract')) {
            return true;
          }
          return false;
        }
        return false;
        """
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
        ZoneId Dushanbe = ZoneId.of('Asia/Dushanbe');
        if (ctx.doc.createdts != null)
        {
            ZonedDateTime createdts = ZonedDateTime.parse(ctx.doc.createdts);
            ctx.doc.localCreatedTime = createdts.withZoneSameLocal(Dushanbe);
        }
        if (ctx.doc.updatedts != null)
        {
            ZonedDateTime updatedts = ZonedDateTime.parse(ctx.doc.updatedts);
            ctx.doc.localUpdatedTime = updatedts.withZoneSameLocal(Dushanbe);
        }
        if (ctx.doc.expireTime != null)
        {
            ZonedDateTime expireTime = ZonedDateTime.parse(ctx.doc.expireTime);
            ctx.doc.localExpireTime = expireTime.withZoneSameLocal(Dushanbe);
        }
        if (ctx.doc.class != null)
        {
          if (ctx.doc.class == 'payment.transaction.card')
          {
            if (ctx.doc.cardInfo != null)
            {
              ctx.doc.remove("cardInfo");
            }
            if (ctx.doc.info != null)
            {
              ctx.doc.remove("info");
            }
            if (ctx.doc.nationalCode != null)
            {
              ctx.doc.remove("nationalCode");
            }
          }
          if (ctx.doc.class == 'payment.transaction.ddebit')
          {
            if (ctx.doc.info != null)
            {
              ctx.doc.remove("info");
            }
            if (ctx.doc.nationalCode != null)
            {
              ctx.doc.remove("nationalCode");
            }
            if (ctx.doc.ip != null)
            {
              ctx.doc.remove("ip");
            }
          }
          if (ctx.doc.class == 'payment.transaction.ipg')
          {
            if (ctx.doc.info != null)
            {
              ctx.doc.remove("info");
            }
            if (ctx.doc.nationalCode != null)
            {
              ctx.doc.remove("nationalCode");
            }
            if (ctx.doc.ip != null)
            {
              ctx.doc.remove("ip");
            }
          }
          if (ctx.doc.class == 'payment.transaction.bill')
          {
            if (ctx.doc.info != null)
            {
              ctx.doc.remove("info");
            }
            if (ctx.doc.nationalCode != null)
            {
              ctx.doc.remove("nationalCode");
            }
            if (ctx.doc.ip != null)
            {
              ctx.doc.remove("ip");
            }
          }
          if (ctx.doc.class == 'payment.transaction.qr')
          {
            if (ctx.doc.info != null)
            {
              ctx.doc.remove("info");
            }
            if (ctx.doc.nationalCode != null)
            {
              ctx.doc.remove("nationalCode");
            }
            if (ctx.doc.ip != null)
            {
              ctx.doc.remove("ip");
            }
          }
        }
        """
      }
    }
  ]
}

Hi Ehsan,

I wish I knew enough about Elasticsearch scripting to offer specific advice. Perhaps the Elasticsearch logs contain a stack trace for the Null Pointer Exception that would pinpoint the line of the script where the problem occurs.

It might also be useful to dump the contents of the document coming from the Couchbase connector so you can see if it’s different from what you expect, perhaps using Painless’s Debug.explain?
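Something along these lines might do it. This is an untested sketch: debug_dump_doc is a made-up pipeline name, and the idea is to attach it temporarily to a throwaway index or test connector, never to production traffic, because Debug.explain works by throwing an error that describes the object it was given.

```console
# Hypothetical throwaway pipeline; every document sent through it is expected to fail.
PUT _ingest/pipeline/debug_dump_doc
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "Debug.explain(ctx);"
      }
    }
  ]
}
```

The resulting error should include the class and string form of ctx, which would show whether the fields the script reads are where it expects them. I have not checked how much of that error text the connector copies into cbes-rejects, so the Elasticsearch log may be the better place to read it.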

Thanks,
David

Hi Ehsan,

Another possibility: maybe the connector is transforming the document in ways you don’t expect, due to the document structure configuration. If you want the ES document to exactly match the Couchbase document, you can use:

[elasticsearch.docStructure]
  documentContentAtTopLevel = true
  metadataFieldName = ''
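
To make the difference concrete, here is roughly how the same Couchbase document would look in Elasticsearch under the two settings. This assumes the defaults are metadataFieldName = 'meta' and documentContentAtTopLevel = false; the field values are invented and the contents of meta are abbreviated.

```text
# documentContentAtTopLevel = false, metadataFieldName = 'meta' (assumed defaults)
{
  "meta": { "id": "payment::1" },
  "doc": { "class": "payment.transaction.card", "cardInfo": "example" }
}

# documentContentAtTopLevel = true, metadataFieldName = ''
{
  "class": "payment.transaction.card",
  "cardInfo": "example"
}
```

Note that with the content at top level there is no doc wrapper, so a pipeline script would have to refer to ctx.class rather than ctx.doc.class.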

Thanks,
David

Thank you very much!