Large documents are sometimes not saving as expected

I'm trying to save a large document using async operations in Python 3.9.
Almost 4 out of 10 attempts fail to replace the document with the fresh one, and no error is logged. Is there an issue with the code?

The code below works perfectly for small documents, but with documents larger than 1 MB it fails to update the doc while still reporting success.

import asyncio
from datetime import timedelta

from acouchbase.cluster import AsyncCluster
from backend.config import config
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from couchbase.options import ClusterTimeoutOptions


async def create_cluster():
    _authenticator = PasswordAuthenticator(config.db_uid, config.db_pwd)
    _timeout_options = ClusterTimeoutOptions(
        query_timeout=timedelta(seconds=36),
        kv_timeout=timedelta(seconds=36),
        views_timeout=timedelta(seconds=36),
    )

    try:
        cluster = await AsyncCluster.connect(
            cnx_str,
            options=ClusterOptions(
                authenticator=_authenticator, timeout_options=_timeout_options
            ),
            authenticator=_authenticator,
        )
        await cluster.on_connect()
    except Exception as e:
        print("Sentry Logger", e)

async def cnx_bucket():
  cluster = await create_cluster()
  bkt = cluster.bucket("bo")
  await bkt.on_connect()
  scope = bkt.scope("scope")
  collection = scope.collection("data")
  return collection

async def job(docid, doc_data):
  await create_cluster()
  collection = await cnx_bucket()
  collection.replace(docid, doc_data)


if __name__ == "__main__":
  asyncio.run(job("test", {"test": "test"}))

I think the problem was that the “await” on collection.replace() was missing.
The create_cluster() didn't return the cluster - I suspect this was just a typo.
The create_cluster() call in job() isn’t necessary.

import asyncio
from datetime import timedelta
from acouchbase.cluster import AsyncCluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from couchbase.options import ClusterTimeoutOptions

async def create_cluster():
    _authenticator = PasswordAuthenticator("Administrator", "password")
    _timeout_options = ClusterTimeoutOptions(
        query_timeout=timedelta(seconds=36),
        kv_timeout=timedelta(seconds=3),
        views_timeout=timedelta(seconds=3),
    )

#    try:
    cluster = await AsyncCluster.connect(
        "localhost",
        options=ClusterOptions(
            authenticator=_authenticator #, timeout_options=_timeout_options
        ),
        authenticator=_authenticator,
    )
    await cluster.on_connect()
    return cluster
#   except Exception as e:
#       print("Sentry Logger", e)

async def cnx_bucket():
  cluster = await create_cluster()
  print(cluster)
  bkt = cluster.bucket("travel-sample")
  await bkt.on_connect()
  scope = bkt.scope("inventory")
  collection = scope.collection("route")
  return collection

async def job(docid, doc_data):
  #await create_cluster()
  collection = await cnx_bucket()
  print(collection)
  result = await collection.replace(docid, doc_data)
  print(result)

if __name__ == "__main__":
  asyncio.run(job("test", {"test": "test"}))

Hey, sorry, the actual production code has

await collection.replace(docid, doc_data)

Are there any other possibilities for this scenario?

That should work with no issue. How are you checking whether the replace took place? Getting the doc with the KV API should show it. Getting it with the Query API would not show changes to indexed properties until they were indexed.
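
For illustration, here is a minimal sketch of both checks, using the bucket/scope/collection names from the posted code and assuming an index exists for the query path:

from couchbase.options import QueryOptions

async def check_doc(cluster, collection, docid):
    # KV read by id: reflects the mutation immediately.
    kv_result = await collection.get(docid)
    print("kv:", kv_result.content_as[dict])

    # Query read: served from the index, so it can lag behind the mutation.
    result = cluster.query(
        "SELECT d.* FROM `bo`.`scope`.`data` d WHERE META(d).id = $docid",
        QueryOptions(named_parameters={"docid": docid}),
    )
    async for row in result:
        print("query:", row)  # may still show pre-replace values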

It's an API that mutates the data, followed by a GET call to fetch the updated data.

We rarely use N1QL; it's always KV ops for all the data operations. Also, some of the time the update does genuinely succeed.
Is there a way to know whether there is an error while updating? I don't see any exceptions being caught.

How would you know if the replace failed vs replacing with the exact same document?
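
One way to tell the two apart (my sketch, not from the thread): compare the CAS before and after the call. Couchbase assigns a new CAS on every successful mutation, even when the new body is byte-for-byte identical to the old one:

async def replace_and_verify(collection, docid, doc_data):
    # Fetch the current CAS, mutate, and confirm the CAS actually changed.
    before = await collection.get(docid)
    result = await collection.replace(docid, doc_data)
    assert result.cas != before.cas, "replace did not produce a new CAS"
    return result.cas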

I update some field values in the document with a mutation request, and when I query the same document it shows the old data; this happens at least 7 out of 10 times. Since the exception is never caught, we didn't notice the problem until very late.

Query will show the indexed data. So until the new data is indexed, you'll see the old indexed data. If you retrieve the document by id with the KV API (or the Buckets tab in the web console), it will show the current data.

If the replace() fails there will be an exception and you will see it unless you specifically catch it and ignore it.
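
If you want failures to be loud rather than silent, something like this sketch logs the specific error class before re-raising (the exception types are from couchbase.exceptions; treat the structure as a suggestion, not the thread's fix):

from couchbase.exceptions import (
    CouchbaseException,
    DocumentNotFoundException,
    TimeoutException,
)

async def loud_replace(collection, docid, doc_data):
    try:
        # A successful replace returns a MutationResult with a fresh CAS.
        return await collection.replace(docid, doc_data)
    except DocumentNotFoundException:
        # replace() cannot create a missing key; only insert()/upsert() can.
        raise
    except TimeoutException as e:
        # Large documents spend longer on the wire, so timeouts are the
        # first thing to rule out for >1 MB payloads.
        print("Sentry Logger", "replace timed out:", e)
        raise
    except CouchbaseException as e:
        print("Sentry Logger", "replace failed:", e)
        raise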

collection = await cnx_bucket()
collection.replace(docid, doc_data)

Don't you need to await collection.replace()?

Hi, sorry for the delay. As I said, the production code already has the await; I have pasted updated code below that more closely resembles what is running on the server.

This code never throws an exception or error, but 7 out of 10 times it fails to update/replace docs that are larger than 3 MB.

Let me know your opinions on this…

import asyncio
from datetime import timedelta

from acouchbase.cluster import AsyncCluster
from backend.config import config
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions
from couchbase.options import ClusterTimeoutOptions

CLUSTER = None


async def create_cluster():
    global CLUSTER
    if CLUSTER:
       return CLUSTER

    _authenticator = PasswordAuthenticator(config.db_uid, config.db_pwd)
    _timeout_options = ClusterTimeoutOptions(
        query_timeout=timedelta(seconds=36),
        kv_timeout=timedelta(seconds=36),
        views_timeout=timedelta(seconds=36),
    )

    try:
        cluster = await AsyncCluster.connect(
            cnx_str,
            options=ClusterOptions(
                authenticator=_authenticator, timeout_options=_timeout_options
            ),
            authenticator=_authenticator,
        )
        await cluster.on_connect()
        CLUSTER = cluster
    except Exception as e:
        print("Sentry Logger", e)
    return CLUSTER


async def cnx_bucket():
  cluster = await create_cluster()
  bkt = cluster.bucket("bo")
  await bkt.on_connect()
  scope = bkt.scope("scope")
  collection = scope.collection("data")
  return collection


async def job(docid, doc_data):
  collection = await cnx_bucket()
  for _ in range(3):
    try:
      await collection.replace(docid, doc_data)
      break
    except CouchbaseException:
      continue
    except Exception as e:
      print("Sentry Logger", e)
      break


if __name__ == "__main__":
  asyncio.run(job("test", {"test": "test"}))

This code never throws an exception

Your code ignores the exception.

except CouchbaseException:
  continue
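
A variant that keeps the retry but records every failure and re-raises once the attempts run out might look like this (a sketch based on the posted job(), not a drop-in fix):

async def job(docid, doc_data):
  collection = await cnx_bucket()
  last_exc = None
  for attempt in range(3):
    try:
      await collection.replace(docid, doc_data)
      return
    except CouchbaseException as e:
      # Log every failure instead of silently continuing.
      last_exc = e
      print("Sentry Logger", f"attempt {attempt + 1} failed:", e)
  raise last_exc  # all retries exhausted; don't swallow the error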

As a test: insert 1000 documents of 3 MB each, filled with '0000…', '1111…', etc., and fetch and compare the result.
Then replace those 1000 documents of 3 MB each with '1111…', '2222…', etc., and fetch and compare the result.

Replace "localhost" with your Couchbase server.
Replace "company" with your bucket name.
Replace "password" with your password.
Replace “my_scope” with your scope.
Replace “my_collection” with your collection.
Run once.
Replace insert=True with insert=False.
Run again.


import asyncio
from datetime import timedelta

from acouchbase.cluster import AsyncCluster
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions
from couchbase.options import ClusterTimeoutOptions

CLUSTER = None


async def create_cluster():
    global CLUSTER
    if CLUSTER:
       return CLUSTER

    _authenticator = PasswordAuthenticator("Administrator", "password")
    _timeout_options = ClusterTimeoutOptions(
        query_timeout=timedelta(seconds=36),
        kv_timeout=timedelta(seconds=36),
        views_timeout=timedelta(seconds=36),
    )

    try:
        cluster = await AsyncCluster.connect(
            "couchbase://localhost",
            options=ClusterOptions(
                authenticator=_authenticator, timeout_options=_timeout_options
            ),
            authenticator=_authenticator,
        )
        await cluster.on_connect()
        CLUSTER = cluster
    except Exception as e:
        print("Sentry Logger", e)
    return CLUSTER


async def cnx_bucket():
  cluster = await create_cluster()
  bkt = cluster.bucket("company")
  await bkt.on_connect()
  scope = bkt.scope("my_scope")
  collection = scope.collection("my_collection")
  return collection


async def job(insert):
  collection = await cnx_bucket()
  # The fill digit shifts by one between the insert run and the replace run,
  # so a document that silently kept its old contents is easy to spot.
  offset = 0 if insert else 1
  for i in range(1000):
    doc_data = {"test": ''.ljust(3 * 1024 * 1024, str((i + offset) % 10))}
    if insert:
      await collection.upsert(str(i), doc_data)
    else:
      await collection.replace(str(i), doc_data)
    result = await collection.get(str(i))
    if result.value.get('test') != doc_data.get('test'):
      print(doc_data.get('test') + ' ' + str(result.value.get('test')))

if __name__ == "__main__":
  asyncio.run(job(insert=True))

Thanks for all the suggestions, we were able to identify and resolve the issue.

Hi @nandu - What did the issue turn out to be? I’m collecting information to improve the error messages, diagnostics, documentation.

  • Mike

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.