How to make Airflow connect to Couchbase? (Receiving couchbase.exceptions.UnAmbiguousTimeoutException)

I am able to connect to my local Couchbase DB from a standalone Python script, as given below:


    from couchbase.cluster import Cluster
    from couchbase.auth import PasswordAuthenticator

    # options for a cluster and SQL++ (N1QL) queries
    from couchbase.options import ClusterOptions, QueryOptions

    cluster = Cluster.connect( 'couchbase://localhost', ClusterOptions(
        PasswordAuthenticator( 'Administrator', 'password123' ) ) )
    cb = cluster.bucket( 'beer-sample' )
    coll = cb.default_collection()
    result = coll.get( '21st_amendment_brewery_cafe' )
    print( result.content_as[dict] )

This worked, and I was able to fetch the document:

/Users/shubhendud/PycharmProjects/scripts/venv/bin/python "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py" --multiproc --qt-support=auto --client 127.0.0.1 --port 56011 --file /Users/shubhendud/PycharmProjects/scripts/conn.py
Connected to pydev debugger (build 203.6682.179)
{'name': '21st Amendment Brewery Cafe', 'city': 'San Francisco', 'state': 'California', 'code': '94107', 'country': 'United States', 'phone': '1-415-369-0900', 'website': 'http://www.21st-amendment.com/', 'type': 'brewery', 'updated': '2010-10-24 13:54:07', 'description': 'The 21st Amendment Brewery offers a variety of award winning house made brews and American grilled cuisine in a comfortable loft like setting. Join us before and after Giants baseball games in our outdoor beer garden. A great location for functions and parties in our semi-private Brewers Loft. See you soon at the 21A!', 'address': ['563 Second Street'], 'geo': {'accuracy': 'ROOFTOP', 'lat': 37.7825, 'lon': -122.393}}

Process finished with exit code 0

Then I tried doing the same thing in an Airflow DAG.

I am running Couchbase on Docker (container screenshot not included).

I have defined a load task for the DAG as a PythonOperator:

    load_task = PythonOperator(
        task_id = 'load',
        python_callable = load,
    )

The load function itself is:

    def load():
        cluster = Cluster.connect( 'couchbase://localhost', ClusterOptions(
            PasswordAuthenticator( 'Administrator', 'password123' ) ) )
        cb = cluster.bucket( 'beer-sample' )
        coll = cb.default_collection()
        result = coll.get( '21st_amendment_brewery_cafe' )
        print( result.content_as[dict] )
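
For reference, the surrounding DAG file is laid out roughly like this (a minimal sketch: the dag_id test_etl_mysqlhook and the file name mysql_dags.py are taken from the log below, while the start date and schedule are placeholders):

    # mysql_dags.py -- sketch of the overall file structure
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    from couchbase.cluster import Cluster
    from couchbase.auth import PasswordAuthenticator
    from couchbase.options import ClusterOptions


    def load():
        # same connection logic as the standalone script above
        cluster = Cluster.connect( 'couchbase://localhost', ClusterOptions(
            PasswordAuthenticator( 'Administrator', 'password123' ) ) )
        coll = cluster.bucket( 'beer-sample' ).default_collection()
        print( coll.get( '21st_amendment_brewery_cafe' ).content_as[dict] )


    with DAG(
        dag_id='test_etl_mysqlhook',
        start_date=datetime(2022, 10, 1),  # placeholder
        schedule_interval=None,            # placeholder
        catchup=False,
    ) as dag:
        load_task = PythonOperator(
            task_id='load',
            python_callable=load,
        )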

When the task runs, instead of the document I get this exception:

7777c5f82916
*** Reading local file: /opt/airflow/logs/dag_id=test_etl_mysqlhook/run_id=manual__2022-10-31T16:08:10.366621+00:00/task_id=load/attempt=1.log
[2022-10-31 16:08:16,934] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: test_etl_mysqlhook.load manual__2022-10-31T16:08:10.366621+00:00 [queued]>
[2022-10-31 16:08:16,947] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: test_etl_mysqlhook.load manual__2022-10-31T16:08:10.366621+00:00 [queued]>
[2022-10-31 16:08:16,948] {taskinstance.py:1356} INFO - 
--------------------------------------------------------------------------------
[2022-10-31 16:08:16,948] {taskinstance.py:1357} INFO - Starting attempt 1 of 1
[2022-10-31 16:08:16,949] {taskinstance.py:1358} INFO - 
--------------------------------------------------------------------------------
[2022-10-31 16:08:16,966] {taskinstance.py:1377} INFO - Executing <Task(PythonOperator): load> on 2022-10-31 16:08:10.366621+00:00
[2022-10-31 16:08:16,971] {standard_task_runner.py:52} INFO - Started process 2923 to run task
[2022-10-31 16:08:16,975] {standard_task_runner.py:79} INFO - Running: ['***', 'tasks', 'run', 'test_etl_mysqlhook', 'load', 'manual__2022-10-31T16:08:10.366621+00:00', '--job-id', '65', '--raw', '--subdir', 'DAGS_FOLDER/mysql_dags.py', '--cfg-path', '/tmp/tmp9fqrnl4b', '--error-file', '/tmp/tmp_7me_lzj']
[2022-10-31 16:08:16,977] {standard_task_runner.py:80} INFO - Job 65: Subtask load
[2022-10-31 16:08:17,188] {task_command.py:369} INFO - Running <TaskInstance: test_etl_mysqlhook.load manual__2022-10-31T16:08:10.366621+00:00 [running]> on host 7777c5f82916
[2022-10-31 16:08:17,270] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=test_etl_mysqlhook
AIRFLOW_CTX_TASK_ID=load
AIRFLOW_CTX_EXECUTION_DATE=2022-10-31T16:08:10.366621+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-31T16:08:10.366621+00:00
[2022-10-31 16:08:27,280] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/mysql_dags.py", line 69, in load
    PasswordAuthenticator( 'Administrator', 'password123' ) ) )
  File "/home/airflow/.local/lib/python3.7/site-packages/couchbase/cluster.py", line 711, in connect
    cluster = Cluster(connstr, *options, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/couchbase/cluster.py", line 99, in __init__
    self._connect()
  File "/home/airflow/.local/lib/python3.7/site-packages/couchbase/logic/wrappers.py", line 101, in wrapped_fn
    raise e
  File "/home/airflow/.local/lib/python3.7/site-packages/couchbase/logic/wrappers.py", line 85, in wrapped_fn
    ret = fn(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/couchbase/cluster.py", line 105, in _connect
    raise ErrorMapper.build_exception(ret)
couchbase.exceptions.UnAmbiguousTimeoutException: <ec=14, category=couchbase.common, message=unambiguous_timeout (14), C Source=/home/ec2-user/workspace/python/sdk/python-manylinux-wheel-pipeline/couchbase-python-client/src/connection.cxx:199>
[2022-10-31 16:08:27,305] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=test_etl_mysqlhook, task_id=load, execution_date=20221031T160810, start_date=20221031T160816, end_date=20221031T160827
[2022-10-31 16:08:27,321] {standard_task_runner.py:97} ERROR - Failed to execute job 65 for task load (<ec=14, category=couchbase.common, message=unambiguous_timeout (14), C Source=/home/ec2-user/workspace/python/sdk/python-manylinux-wheel-pipeline/couchbase-python-client/src/connection.cxx:199>; 2923)
[2022-10-31 16:08:27,360] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-10-31 16:08:27,403] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check


How do I connect to the Couchbase DB from Airflow to fetch and write documents?

I am able to read MySQL records and have built DataFrames from them. I want to write those to Couchbase.
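
Once the connection works, the write I have in mind is roughly the following (a sketch only: write_df_to_couchbase, its key_column argument, and the target bucket are hypothetical placeholders for my real DataFrames):

    import pandas as pd

    from couchbase.cluster import Cluster
    from couchbase.auth import PasswordAuthenticator
    from couchbase.options import ClusterOptions


    def write_df_to_couchbase( df: pd.DataFrame, key_column: str ):
        # hypothetical helper: upsert every DataFrame row as a JSON document,
        # keyed by one of its columns; 'beer-sample' stands in for the real bucket
        cluster = Cluster.connect( 'couchbase://localhost', ClusterOptions(  # connection string still failing, see below
            PasswordAuthenticator( 'Administrator', 'password123' ) ) )
        coll = cluster.bucket( 'beer-sample' ).default_collection()
        for row in df.to_dict( orient='records' ):
            coll.upsert( str( row[key_column] ), row )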

EDIT: I am running both Airflow and Couchbase on Docker.

Sounds like the issue might be in your docker port mappings.
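
If that's the case, one possible fix is sketched below. Inside the Airflow container, localhost refers to the Airflow container itself, not to the Couchbase container, so the SDK times out waiting for a node that isn't there. The sketch assumes either that both containers share a user-defined Docker network with the Couchbase container named couchbase, or that Couchbase's ports 8091-8096 and 11210 are published to the host; both are assumptions about a setup that isn't shown in the question.

    from couchbase.cluster import Cluster
    from couchbase.auth import PasswordAuthenticator
    from couchbase.options import ClusterOptions


    def load():
        # Assumption 1: the Airflow and Couchbase containers are on the same
        # user-defined Docker network and the Couchbase container is named
        # 'couchbase', so the container name is resolvable as a hostname.
        # Assumption 2 (alternative, Docker Desktop on macOS): Couchbase ports
        # 8091-8096 and 11210 are published to the host, in which case
        # 'couchbase://host.docker.internal' should work instead.
        cluster = Cluster.connect( 'couchbase://couchbase', ClusterOptions(
            PasswordAuthenticator( 'Administrator', 'password123' ) ) )
        cb = cluster.bucket( 'beer-sample' )
        coll = cb.default_collection()
        result = coll.get( '21st_amendment_brewery_cafe' )
        print( result.content_as[dict] )

Which of the two applies depends on how the containers were started; running docker network inspect on the networks involved shows which containers are attached and under what names.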