Eventing timer function not working after same doc created

Rohat,

First, I am sorry that things still do not work for you. I gave you a complete, fully tested working example that also logs errors. Please read this post and use the files in the ZIP archive I supply at the end of it.

In the future I would appreciate it if you provided as much information as possible with any question, including the Application log (on Linux it is located in the directory /opt/couchbase/var/lib/couchbase/data/@eventing/ ) and the captured output of the python script/webserver. I have no visibility into your system other than what you provide.

In my example you can provide a timer in the “past” and it will indeed be processed. I inserted the following document into my Function’s source bucket “register” with KEY Type::1:

{
"type": "Type",
"id": 1,
"startDate": "2020-06-12T10:32:00",
"endDate":  "2020-06-12T10:33:00"
}

I also don’t know which statistics or document counts you are looking at. Take, for example, the Eventing metadata bucket (for me this is “meta”).

First, for the bucket “meta” under Enterprise Edition 6.5.1 build 6299, I expect to see 2048 documents in an idle or quiescent state (1024 for the Eventing function plus another 1024 because the function uses timers).

Next, for the bucket “meta”, even when the system is idle a scanner runs every 15 seconds to find items (timers) that are ready to fire, so you will see some ops from this. I will show two images of this:


The other bucket involved in my “test” is called “register”, so let’s look at the ops/sec on that bucket. We expect zero or close to zero, as I only have one test document in that bucket (which I inserted and then mutated again).


Some important points:

Okay, back to my test Function from my earlier post. I was making two cURL calls (one in scheduleStartTimer and one in scheduleEndTimer), but this doesn’t really impact anything, nor would it cause the high Ops/Sec you report. I added a bit more logging; here is the new Eventing function:

/*
{
    "type": "Type",
    "id": 1,
    "startDate": "2020-06-12T10:32:00",
    "endDate":  "2020-06-12T10:33:00"
}

*/
function OnUpdate(doc, meta) {
    try {
        // fail fast if doc type is not Type
        if (doc.type !== 'Type') {
            return
        }
        log('OnUpdate meta:', meta);
        log('OnUpdate doc:', doc);

        const correlationId = create_UUID();

        const startTriggerDate = new Date(doc.startDate);
        let s_id = "rand::" + meta.id + '::' + Math.random();
        let s_context = {
            "docId": meta.id,
            "correlationId": correlationId,
            "s_id": s_id
        };
        createTimer(scheduleStartTimer, startTriggerDate, /*null*/ s_id, s_context);
        log("start timer created doc, startDate, triggerDate ", meta.id, doc.startDate, startTriggerDate, s_context);

        const endTriggerDate = new Date(doc.endDate);
        let e_id = "rand::" + meta.id + '::' + Math.random();
        let e_context = {
            "docId": meta.id,
            "correlationId": correlationId,
            "e_id": e_id
        };
        createTimer(scheduleEndTimer, endTriggerDate, /*null*/ e_id, e_context);
        log("end timer created doc, endDate, triggerDate ", meta.id, doc.endDate, endTriggerDate, e_context);

    } catch (e) {
        log("OnUpdate Exception:", e);
    }
}

function scheduleStartTimer(context) {
    log("scheduleStartTimer timer started ", context);
    const doc = source_bucket[context.docId];
    if (!doc) {
        log("scheduleStartTimer timer fired, but no doc found via context.docId ", context.docId);
        return;
    }

    let counter = 0;

    while (counter < 5) {
        try {

            let now = new Date();
            let startDate = new Date(doc.startDate);

            if (now > startDate) {
                log("scheduleStartTimer has fired curl request started now and startDate", now, startDate);

                var request = {
                    path: '',
                    headers: {
                        'What': 'scheduleStartTimer',
                        'context': context,
                        'AppKey': 'something',
                        'AnotherItem': 'someting else'
                    },
                    body: {
                        id: context.docId,
                        value: doc.type,
                        loop: counter
                    }
                };
                //log('scheduleStartTimer request', request);
                var response = curl('POST', apiUrl, request);
                if (response.status != 200 && response.status != 302) {
                    // pass context as its own argument so it is logged as JSON, not "[object Object]"
                    log('scheduleStartTimer cURL try ' + counter + ' failed for context ', context, response);
                } else {
                    log('scheduleStartTimer cURL try ' + counter + ' success for context ', context, response);
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleStartTimer Exception:", e);
        }
    }
}

function scheduleEndTimer(context) {
    log("scheduleEndTimer timer started ", context);
    const doc = source_bucket[context.docId];
    if (!doc) {
        log("scheduleEndTimer timer fired, but no doc found via context.docId ", context.docId);
        return;
    }
    let counter = 0;

    while (counter < 5) {
        try {
            let now = new Date();
            let endDate = new Date(doc.endDate);

            if (now > endDate) {
                log("scheduleEndTimer has fired curl request started now and endDate", now, endDate);

                var request = {
                    path: '',
                    headers: {
                        'What': 'scheduleEndTimer',
                        'context': context,
                        'AppKey': 'something',
                        'AnotherItem': 'someting else'
                    },
                    body: {
                        id: context.docId,
                        value: doc.type,
                        loop: counter
                    }
                };
                //log('scheduleEndTimer request', request);
                var response = curl('POST', apiUrl, request);
                if (response.status != 200 && response.status != 302) {
                    // pass context as its own argument so it is logged as JSON, not "[object Object]"
                    log('scheduleEndTimer cURL try ' + counter + ' failed for context ', context, response);
                } else {
                    log('scheduleEndTimer cURL try ' + counter + ' success for context ', context, response);
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleEndTimer Exception:", e);
        }
    }
}

// Builds a random version-4 style UUID string used as the correlation id.
function create_UUID() {
    let dt = new Date().getTime();
    const uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        let r = (dt + Math.random() * 16) % 16 | 0;
        dt = Math.floor(dt / 16);
        return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
    });
    return uuid;
}
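
One thing to be aware of in the timer callbacks above: the while loop only retries when curl() throws, because the break runs after the first attempt that does not raise, even if the response status was neither 200 nor 302. If you later want to retry on a bad status as well, a minimal sketch could look like the following. The helper name postWithRetry and the stand-in "flaky" endpoint are hypothetical and are not part of Rohat3; inside the Function you would pass something like a wrapper around curl('POST', apiUrl, request). The demo lines at the bottom are only for trying the helper in plain Node.js.

```javascript
// Hypothetical helper: call `post` until it returns status 200 or 302,
// or until maxTries attempts have been used. Exceptions count as failed tries.
function postWithRetry(post, maxTries) {
    let lastResponse = null;
    for (let attempt = 0; attempt < maxTries; attempt++) {
        try {
            lastResponse = post(attempt);
            if (lastResponse.status === 200 || lastResponse.status === 302) {
                return { ok: true, tries: attempt + 1, response: lastResponse };
            }
        } catch (e) {
            // remember the error so the caller can log it
            lastResponse = { status: null, error: String(e) };
        }
    }
    return { ok: false, tries: maxTries, response: lastResponse };
}

// Stand-alone demo (not Eventing code): an endpoint that fails twice, then succeeds.
let calls = 0;
const flaky = () => (++calls < 3 ? { status: 503 } : { status: 200 });
const result = postWithRetry(flaky, 5);
console.log(result.ok, result.tries); // true 3
```

Whether retrying inside a timer callback is appropriate depends on how slow your endpoint is, since every extra attempt keeps the worker thread busy for that long.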

I injected a 20 ms (0.02 sec) delay into the python web server; those lines can be removed or commented out. Below I have commented them out, and I also added some more debugging by printing some of the headers I passed in the POST from Eventing.

# Python 2.7 script (BaseHTTPServer and SocketServer are the Python 2 module names)
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
import threading
from time import sleep

class Handler(BaseHTTPRequestHandler):


    def do_GET(self):
        #sleep(0.02)
        self.send_response(200)
        self.end_headers()
        message =  threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        request_headers = self.headers
        hdr_what = request_headers.getheaders('What')
        hdr_context = request_headers.getheaders('context')
        print('done with GET ' + ', '.join(hdr_what) + ' ' + ', '.join(hdr_context))
        return


    def do_POST(self):
        #sleep(0.02)
        self.send_response(200)
        self.end_headers()
        message =  threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        request_headers = self.headers
        hdr_what = request_headers.getheaders('What')
        hdr_context = request_headers.getheaders('context')
        print('done with POST ' + ', '.join(hdr_what) + ' ' + ', '.join(hdr_context))
        return


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

if __name__ == '__main__':
    server = ThreadedHTTPServer(('localhost', 8080), Handler)
    print 'Starting server, use <Ctrl-C> to stop'
    server.serve_forever()

Okay Rohat, all I ask is that you run my test case verbatim first. Make two buckets: “register” for the source data and “meta” for the Eventing metadata. Both should initially be empty.

Then add the code I supplied above (I called the Function Rohat3). Make sure you specify a bucket binding alias (the code refers to it as source_bucket) and a URL alias (the code refers to it as apiUrl; the test webserver listens on localhost port 8080) as shown below:

Deploy the Function Rohat3.

Open a shell on your Eventing node. I assume you have a single Eventing node for development; if you have more than one, start the python webserver on each node. I made a file Rohat3.py to create a test webserver, so start it as follows:

python Rohat3.py

Now, back in the UI, go to Buckets and select Documents for “register”, then hit “ADD DOCUMENT” and use the key Type::1 (yes, that is an uppercase “T”).


Hit “Save” and add the data below (yes, it is in the past); just cut and paste it:

{
    "type": "Type",
    "id": 1,
    "startDate": "2020-06-12T10:32:00",
    "endDate":  "2020-06-12T10:33:00"
}

Hit “Save”. In about 15 seconds the Event will have completed and both timers will have been processed; the shell you have open will show the POSTs as expected.

linuxbrew@couch01:~/blog_curl$ python Rohat3.py
Starting server, use <Ctrl-C> to stop
127.0.0.1 - - [28/Jun/2020 13:24:09] "POST / HTTP/1.1" 200 -
done with POST scheduleStartTimer {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"}
127.0.0.1 - - [28/Jun/2020 13:24:09] "POST / HTTP/1.1" 200 -
done with POST scheduleEndTimer {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"}

The Application log will also show the processing as expected, e.g.:

root@couch01:~# cat /opt/couchbase/var/lib/couchbase/data/@eventing/Rohat3.log
2020-06-28T13:23:57.312-07:00 [INFO] "OnUpdate meta:" {"cas":1593375837222666200,"id":"Type::1","expiration":0,"flags":33554438,"vb":634,"seq":13}
2020-06-28T13:23:57.312-07:00 [INFO] "OnUpdate doc:" {"type":"Type","id":1,"startDate":"2020-06-12T10:32:00","endDate":"2020-06-12T10:33:00"}
2020-06-28T13:23:57.314-07:00 [INFO] "start timer created doc, startDate, triggerDate " "Type::1" "2020-06-12T10:32:00" "2020-06-12T17:32:00.000Z" {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"}
2020-06-28T13:23:57.315-07:00 [INFO] "end timer created doc, endDate, triggerDate " "Type::1" "2020-06-12T10:33:00" "2020-06-12T17:33:00.000Z" {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"}
2020-06-28T13:24:09.106-07:00 [INFO] "scheduleStartTimer timer started " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"}
2020-06-28T13:24:09.107-07:00 [INFO] "scheduleStartTimer has fired curl request started now and startDate" "2020-06-28T20:24:09.107Z" "2020-06-12T17:32:00.000Z"
2020-06-28T13:24:09.110-07:00 [INFO] "scheduleStartTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:24:09 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}
2020-06-28T13:24:09.111-07:00 [INFO] "scheduleEndTimer timer started " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"}
2020-06-28T13:24:09.111-07:00 [INFO] "scheduleEndTimer has fired curl request started now and endDate" "2020-06-28T20:24:09.111Z" "2020-06-12T17:33:00.000Z"
2020-06-28T13:24:09.113-07:00 [INFO] "scheduleEndTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:24:09 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}
root@couch01:~#
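
Note the triggerDate values in the log above: the stored strings carry no time zone, and the logged UTC value is seven hours later (10:32:00 becomes 17:32:00.000Z), which matches the -07:00 offset in the log timestamps. In other words, new Date() treated the string as local time on the Eventing node. The sketch below reproduces that instant in plain JavaScript with the offset written out explicitly, so it prints the same thing on any machine. The hasExplicitZone helper is a hypothetical check for illustration, not something Rohat3 uses.

```javascript
// Same instant as the first startDate in the log, with the -07:00 offset
// written out so the result does not depend on the machine's time zone.
const startWithOffset = new Date("2020-06-12T10:32:00-07:00");
console.log(startWithOffset.toISOString()); // 2020-06-12T17:32:00.000Z

// Hypothetical helper: does an ISO date-time string end in "Z" or "+hh:mm" / "-hh:mm"?
function hasExplicitZone(isoString) {
    return /(Z|[+-]\d{2}:\d{2})$/.test(isoString);
}

console.log(hasExplicitZone("2020-06-12T10:32:00"));       // false
console.log(hasExplicitZone("2020-06-12T10:32:00-07:00")); // true
console.log(hasExplicitZone("2020-06-12T17:32:00Z"));      // true
```

If your documents are written by an application in a different time zone than the Eventing node, this is worth checking before concluding that a timer fired at the wrong time.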

Now edit your document in the bucket “register” and set the dates about 1 or 2 minutes in the future. It is 28/Jun/2020 13:34:04 for me, so I will apply the following, updating both startDate and endDate to be in the future.

{
  "type": "Type",
  "id": 1,
  "startDate": "2020-06-28T13:35:00",
  "endDate": "2020-06-28T13:36:00"
}
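
If typing the dates by hand is fiddly, a small helper can print a test document with startDate one minute and endDate two minutes from now, in the same zone-less local format used above. This is only a convenience sketch for Node.js (the function name localIsoSeconds is hypothetical), and it assumes the machine you run it on uses the same time zone as your Eventing node.

```javascript
// Hypothetical helper: format a Date as local "YYYY-MM-DDTHH:mm:ss" (no zone suffix).
function localIsoSeconds(date) {
    const pad = (n) => String(n).padStart(2, "0");
    return date.getFullYear() + "-" + pad(date.getMonth() + 1) + "-" + pad(date.getDate()) +
        "T" + pad(date.getHours()) + ":" + pad(date.getMinutes()) + ":" + pad(date.getSeconds());
}

// Build a test document whose timers are due 1 and 2 minutes from now.
const now = new Date();
const testDoc = {
    type: "Type",
    id: 1,
    startDate: localIsoSeconds(new Date(now.getTime() + 60 * 1000)),
    endDate: localIsoSeconds(new Date(now.getTime() + 120 * 1000))
};

// Paste the printed JSON into the document editor for key Type::1.
console.log(JSON.stringify(testDoc, null, 2));
```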

The new output from the python web server on the console is as follows:

127.0.0.1 - - [28/Jun/2020 13:35:07] "POST / HTTP/1.1" 200 -
done with POST scheduleStartTimer {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"}
127.0.0.1 - - [28/Jun/2020 13:36:03] "POST / HTTP/1.1" 200 -
done with POST scheduleEndTimer {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"}

And the new Application log data is as follows:

2020-06-28T13:34:08.311-07:00 [INFO] "OnUpdate meta:" {"cas":1593376448244809700,"id":"Type::1","expiration":0,"flags":33554438,"vb":634,"seq":16}
2020-06-28T13:34:08.311-07:00 [INFO] "OnUpdate doc:" {"type":"Type","id":1,"startDate":"2020-06-28T13:35:00","endDate":"2020-06-28T13:36:00"}
2020-06-28T13:34:08.314-07:00 [INFO] "start timer created doc, startDate, triggerDate " "Type::1" "2020-06-28T13:35:00" "2020-06-28T20:35:00.000Z" {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"}
2020-06-28T13:34:08.315-07:00 [INFO] "end timer created doc, endDate, triggerDate " "Type::1" "2020-06-28T13:36:00" "2020-06-28T20:36:00.000Z" {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"}
2020-06-28T13:35:07.113-07:00 [INFO] "scheduleStartTimer timer started " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"}
2020-06-28T13:35:07.114-07:00 [INFO] "scheduleStartTimer has fired curl request started now and startDate" "2020-06-28T20:35:07.114Z" "2020-06-28T20:35:00.000Z"
2020-06-28T13:35:07.116-07:00 [INFO] "scheduleStartTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:35:07 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}
2020-06-28T13:36:03.118-07:00 [INFO] "scheduleEndTimer timer started " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"}
2020-06-28T13:36:03.119-07:00 [INFO] "scheduleEndTimer has fired curl request started now and endDate" "2020-06-28T20:36:03.118Z" "2020-06-28T20:36:00.000Z"
2020-06-28T13:36:03.120-07:00 [INFO] "scheduleEndTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:36:03 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}

Assuming you have the buckets “register” and “meta”, you can unzip the attached file “Rohat3_WIP.zip”, import the Eventing function “Rohat3.json”, and run the python webserver Rohat3.py on your Eventing node(s). Attachment: Rohat3_WIP.zip (2.2 KB)

So Rohat, once you get the above to work “exactly” as I have shown, then and only then feel free to modify one thing at a time and test until you get things working in your environment (switch to the cURL you want, add your business logic, etc.). At each step test with 1 mutation, then try multiple mutations (or scheduled items in bucket register). When you’re happy, remove or comment out the logging statements.

Back to your high ops/sec question. First, let me know if you see the same issue with high Ops/sec when there is only one test doc and one function; I have shown you above what is expected. Also verify that nothing else is running in your Couchbase system other than the toy test Eventing function we are talking about.

Please note that if your source bucket has 190K items in it, I would indeed expect some delay while the backlog is processed if you deploy your function from “Everything” instead of “From now”. Assume you want to run cURL on all 190K items. If your REST endpoint is slow, say 100 ms (0.1 seconds), the most you can process with the default setting of three workers per function (and two threads per worker) is 60 mutations/sec, because each of the six (6) threads cannot complete a call in less than a tenth of a second. In fact, since we have two timer callbacks and both invoke curl, you will only achieve 30 mutations per second. As you can see, cURL can quickly become a performance bottleneck. So if you want to process faster, you can either raise the number of workers from the default of 3 to, say, 12 in your Function’s settings, or you need a faster REST endpoint in the 1-5 ms range. Note that you can emulate a slow REST endpoint by uncommenting the sleep(0.02) statements in Rohat3.py and adjusting them to your measured endpoint speed.
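
The arithmetic above can be written down as a rough back-of-the-envelope calculation. This is only a sketch of the upper bound under the assumptions stated in the post (every thread spends all of its time waiting on curl, and nothing else limits throughput); the function name maxMutationsPerSec is made up for illustration and is not an Eventing API.

```javascript
// Rough upper bound on mutations/sec when each handler blocks on a REST call.
//   workers              - workers per function (default 3 per the post)
//   threadsPerWorker     - threads per worker (2 per the post)
//   endpointLatencyMs    - time one curl call takes, in milliseconds
//   curlCallsPerMutation - curl calls triggered by one mutation
function maxMutationsPerSec(workers, threadsPerWorker, endpointLatencyMs, curlCallsPerMutation) {
    const threads = workers * threadsPerWorker;
    const callsPerSecPerThread = 1000 / endpointLatencyMs;
    return (threads * callsPerSecPerThread) / curlCallsPerMutation;
}

console.log(maxMutationsPerSec(3, 2, 100, 1));  // 60  (one curl call per mutation)
console.log(maxMutationsPerSec(3, 2, 100, 2));  // 30  (start and end timer both call curl)
console.log(maxMutationsPerSec(12, 2, 100, 2)); // 120 (workers raised from 3 to 12)

// Seconds needed to work through a 190K-item backlog at 30 mutations/sec.
const backlogSeconds = Math.round(190000 / maxMutationsPerSec(3, 2, 100, 2));
console.log(backlogSeconds); // 6333
```

At the default settings and a 100 ms endpoint, that backlog alone would take well over an hour and a half, which is why the endpoint speed matters far more than anything in the Function code itself.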