Introducción
Couchbase Server 8.0 presenta un nuevo controlador de funciones de eventos llamado OnDeploy que permite a los clientes ejecutar la lógica empresarial durante la implementación o reanudación de funciones de eventos sin necesidad de ninguna mutación externa para activarla.
Anteriormente, los clientes con casos de uso que necesitaban ejecutar cualquier lógica antes de que se implementara o reanudara una función de Eventing tenían pocas opciones, como por ejemplo:
- Realizar manualmente la configuración necesaria por su cuenta.
- Automatice la configuración mediante un script externo antes de activar la implementación o reanudación de la función Eventing.
Ambos métodos son engorrosos y dependen de una intervención externa o manual.
Los eventos “implementación” y “reanudación” en el ciclo de vida de la función Eventing marcan los puntos en los que está a punto de comenzar a procesar mutaciones. Esto hace que el OnDeploy Manejador adecuado para inyectar lógica que requiere que se realicen cualquiera de las siguientes actividades:
- Realice comprobaciones previas al vuelo para asegurarse de que el entorno está configurado correctamente.
- Configure cachés (por ejemplo, tablas de consulta) para mejorar la eficiencia.
- Envía, recopila y procesa datos de diferentes servicios de Couchbase y externos.
- “Autoactivar” la función Eventing después de su implementación/reanudación modificando al menos un documento en su espacio de claves de origen.
- Esta mutación activará su
OnUpdatey/oOnDeletemanipulador. - Este es un caso de uso avanzado de
OnDeployporque, tradicionalmente, la ejecución de la función Eventing se había restringido para que solo se activara cuando se producían cambios en su espacio de claves de origen por parte de entidades distintas de la propia función Eventing o por la expiración del temporizador.
- Esta mutación activará su
Limitador de velocidad
En esta publicación, crearemos un limitador de velocidad robusto utilizando el algoritmo de cubeta de fichas y el servicio Eventing de Couchbase. A lo largo del proceso, obtendrá experiencia práctica con el nuevo controlador OnDeploy y descubrirá cómo Eventing simplifica la integración con otros servicios de Couchbase.
Diseño de alto nivel
Escala multidimensional
El clúster de 6 nodos debe tener las siguientes asignaciones de servicios a nodos:
| N.º. | Número de nodo | Servicio(s) |
|---|---|---|
| 1. | 0 | Datos |
| 2. | 1 | Datos |
| 3. | 2 | Datos |
| 4. | 3 | Eventos, consulta |
| 5. | 4 | Eventos, consulta |
| 6. | 5 | Indexación |
Algunos puntos a tener en cuenta sobre la configuración del clúster:
- Utilizamos tres nodos de servicio de datos para garantizar la redundancia mediante la replicación de datos.
- Ejecutamos el servicio Eventing en dos nodos para aumentar el paralelismo de la función Eventing.
- Esto se hace además de contar con múltiples trabajadores para nuestra función Eventing.
- Los servicios que requieren un uso intensivo de la CPU, como Data y Eventing, deben mantenerse en nodos de clúster separados.
- Necesitamos el servicio Query porque ciertas operaciones, como eliminar todos los documentos de un espacio de claves, se pueden realizar cómodamente a través de este servicio.
- Necesitamos el servicio de indexación para crear índices primarios para el bucket efímero.
Espacios de claves
Nuestro clúster debe tener los siguientes espacios de claves:
| N.º. | Nombre del cubo | Tipo cubo | Alcance | Colección | Descripción |
|---|---|---|---|---|---|
| 1 | por defecto | Couchbase | Por defecto | Por defecto |
|
| _sistema | _móvil | – | |||
| _sistema | _query | – | |||
| 2 | limitador de velocidad | Efímero | Por defecto | Por defecto | – |
| _sistema | _móvil | – | |||
| _sistema | _query | – | |||
| mi-llm | límites | Guarde el documento que contiene la asignación de límites por nivel y tasa. | |||
| mi-llm | rastreador | Guarde los documentos del mostrador de la tienda para llevar un registro del uso de cada usuario. | |||
| 3 | mi-llm | Couchbase | Por defecto | Por defecto | – |
| _sistema | _móvil | – | |||
| _sistema | _query | – | |||
| usuarios | cuentas | Almacenar los datos de la cuenta de usuario, incluido su “nivel”. | |||
| usuarios | eventos | Almacena los eventos de usuario que deben tener una limitación de velocidad basada en el “nivel” del usuario. |
Nota:
- En
limitador de velocidadel cubo esEfímeroporque no necesitamos conservar esos datos. Utilizamos los datos de ese depósito para realizar un seguimiento del uso del límite de velocidad por usuario y para almacenar en caché la asignación entre niveles y límites de velocidad.
Puntos finales externos de la API REST
La función Eventing interactúa con puntos finales API externos que proporcionan las siguientes funcionalidades:
- Proporcione la última asignación de niveles a límites de velocidad.
- Aceptar modificaciones en la asignación de niveles a límites de tarifas.
- Acepta las solicitudes entrantes que estén dentro del límite de frecuencia del usuario mediante nuestra función Eventing.
- En este proyecto, nuestro punto final mantendrá un recuento de estas solicitudes entrantes.
Este recuento nos ayudará a verificar si nuestra aplicación limitadora de velocidad funciona como se espera.
- En este proyecto, nuestro punto final mantendrá un recuento de estas solicitudes entrantes.
- Proporcione el número de solicitudes entrantes que nuestra función Eventing ha considerado que están dentro del límite de tasa del usuario.
El enlace a la especificación OpenAPI de los puntos finales de la API anterior se puede encontrar en aquí.
Nota: El programa Go que aloja estos puntos finales de la API REST se encuentra vinculado en el Apéndice.
Diseño de bajo nivel
Configuración de la función de eventos
A continuación se muestra una lista de todos los cambios que debemos realizar en la configuración predeterminada de la función Eventing.
Espacios de claves
| N.º. | Campo | Valor |
|---|---|---|
| 1. | Ámbito de la función | predeterminado._predeterminado |
| 2. | Espacio de claves de origen | mis-usuarios-llm.eventos |
| 3. | Espacio de almacenamiento de eventos | predeterminado._predeterminado._predeterminado |
Ajustes
| N.º. | Campo | Valor |
|---|---|---|
| 1. | Nombre | mi-limitador-de-velocidad-llm |
| 2. | Límite de la fuente de implementación | A partir de ahora |
| 3. | Descripción | Esta función Eventing actúa como un limitador de velocidad. |
| 4. | Trabajadores | 10 |
Fijaciones para cubetas
| N.º. | Cubo Alias |
Espacio clave | Acceda a | ||
|---|---|---|---|---|---|
| Cubo | Alcance | Colección | |||
| 1. | Cuentas de usuario | mi-llm | usuarios | cuentas | Solo lectura |
| 2. | limitador de velocidad | limitador de velocidad | mi-llm | rastreador | Leer y escribir |
| 3. | Límites de nivel | limitador de velocidad | mi-llm | límites | Leer y escribir |
Enlaces URL
| N.º. | Alias de URL | URL | Autenticación | Nombre de usuario | Contraseña |
|---|---|---|---|---|---|
| 1. | llmEndpoint | http://localhost:3054/my-llm | Básico | Eventos | Eventing123 |
| 2. | nivelesPuntoFinal | http://localhost:3054/tiers |
Nota: Las opciones “permitir cookies” y “validar certificado SSL” están desactivadas.
Diagrama completo del flujo de la solicitud

Este diagrama muestra las interacciones entre los controladores de funciones de Eventing, los puntos finales de la API REST externa y los espacios de claves para comportarse como un limitador de velocidad basado en el algoritmo de cubeta de tokens.
En las siguientes secciones, implementaremos el limitador de velocidad paso a paso.
OnDeploy Configurar
Obtener y almacenar los niveles desde el punto final de la API REST externa

Cuando el OnDeploy El controlador comienza a ejecutarse, primero debe obtener la asignación de niveles a límites de velocidad desde el punto final de la API REST externa representado por el nivelesPuntoFinal Vinculación de URL.
La respuesta de la /niveles La API REST externa será un valor JSON que contiene la asignación del nombre del nivel (de tipo Cadena) a un límite de tasa por hora representado por el recuento total de solicitudes permitidas por hora (es decir, total_request_count) (de tipo número).
Almacenamos la asignación de niveles a límites de tasa en el límite de velocidad.mis-llm.límites espacio de claves.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
function OnDeploy(action) { // ... // GET the tiers from the `tiersEndpoint` const response = curl('GET', tiersEndpoint); if (response.status != 200) { throw new Error("Error(Cannot get tiers): " + JSON.stringify(response)); } const tiers = response.body; log("Successfully retrieved the tiers: " + JSON.stringify(tiers)); // Write the tiers to the `tierLimits` keyspace, in the document with ID `limits` tierLimits["limits"] = tiers; // ... // Create a timer to run every 24 hours to refresh the tiers let timeAfter24hours = new Date(); timeAfter24hours.setDate(timeAfter24hours.getDate() + 1); log("Time after 24 hours is: " + timeAfter24hours); createTimer(updateTierCallback, timeAfter24hours, "tier-updater", {}); // ... } // Function to update the user tiers every 24 hours function updateTierCallback(context) { log('From updateTierCallback: timer fired'); // GET the tiers from the `tiersEndpoint` const response = curl('GET', tiersEndpoint); if (response.status != 200) { log("Error(Cannot get tiers): " + JSON.stringify(response)); } else { const tiers = response.body; log("Successfully retrieved the tiers: " + JSON.stringify(tiers)); // Write the tiers to the `tierLimits` keyspace, in the document with ID `limits` tierLimits["limits"] = tiers; } // Create a timer to run every 24 hours to refresh the tiers let timeAfter24hours = new Date(); timeAfter24hours.setDate(timeAfter24hours.getDate() + 1); log("Time after 24 hours is: " + timeAfter24hours); createTimer(updateTierCallback, timeAfter24hours, "tier-updater", {}); } |
Restablecer todos los rastreadores de límite de velocidad al implementar
![]()
En nuestra aplicación, modelamos la desactivación de la función Eventing como un apagado completo; por lo tanto, durante su implementación, eliminamos todos los documentos que registran el uso de los límites de velocidad de los usuarios. Modelamos la pausa como una suspensión temporal de las actividades de limitación de velocidad; por lo tanto, no borramos esos documentos en caso de que se reanude la función Eventing.
¿Te das cuenta de cómo tratamos las operaciones de implementación y reanudación por separado? OnDeploy hace posibles estos casos de uso porque Eventing también pasa un motivo en el campo acción objetar lo OnDeploy handler para especificar si la función Eventing se está implementando o reanudando.
|
1 2 3 4 5 6 7 8 9 10 11 12 |
function OnDeploy(action) { // ... // If we are deploying, then we should delete all the existing document in the keyspace `rateLimiter` if (action.reason === "deploy") { let results = N1QL("DELETE FROM `rate-limiter`.`my-llm`.tracker"); results.close(); log("Deleted all the documents in the `rate-limiter`.`my-llm`.tracker keyspace as we are deploying!"); } // ... } |
Restablecer los límites de velocidad de los usuarios cada hora

Dado que estamos implementando un algoritmo de cubo de fichas, restablecemos los límites de velocidad de los usuarios cada hora utilizando temporizadores, una funcionalidad de Eventing que es fundamental para nuestro caso de uso. Creamos el primer temporizador en el OnDeploy handler para que se active después de una hora. Una vez que se activa la devolución de llamada del temporizador, se creará un nuevo temporizador que se activará después de una hora, y así sucesivamente, creando un temporizador recurrente que se activa cada hora mientras la función Eventing esté implementada.
Observe que este temporizador no requirió ninguna mutación externa para activar la función Eventing y crearlo. Todo esto se hizo durante la implementación/reanudación en el OnDeploy manipulador.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
function OnDeploy(action) { // ... // Create a timer to run every 1 hour to reset user rate limits let timeAfter1Hour = new Date(); timeAfter1Hour.setHours(timeAfter1Hour.getHours() + 1); log("Time after 1 hour is: " + timeAfter1Hour); createTimer(resetRateLimiter, timeAfter1Hour, "rate-limit-resetter", {}); // ... } // Function to reset the rate limits for all users every 1 hour function resetRateLimiter(context) { log('From resetRateLimiter: timer fired'); let results = N1QL("DELETE FROM `rate-limiter`.`my-llm`.tracker"); results.close(); // Create a timer to run every 1 hour to reset user rate limits let timeAfter1Hour = new Date(); timeAfter1Hour.setHours(timeAfter1Hour.getHours() + 1); log("Time after 1 hour is: " + timeAfter1Hour); createTimer(resetRateLimiter, timeAfter1Hour, "rate-limit-resetter", {}); } |
Actualizar los límites de tarifas diarios

Diseñamos nuestra aplicación para permitir cambios en los límites de velocidad cada 24 horas; por lo tanto, nuestra función Eventing debe extraer la última asignación de niveles a límites de velocidad del punto final de la API REST externa cada 24 horas para garantizar que se apliquen los límites de velocidad correctos a nuestros usuarios.
Una vez más, utilizamos temporizadores recurrentes para obtener la última asignación de niveles a límites de velocidad cada 24 horas.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
function OnDeploy(action) { // ... // Create a timer to run every 24 hours to refresh the tiers let timeAfter24hours = new Date(); timeAfter24hours.setDate(timeAfter24hours.getDate() + 1); log("Time after 24 hours is: " + timeAfter24hours); createTimer(updateTierCallback, timeAfter24hours, "tier-updater", {}); // ... } // Function to update the user tiers every 24 hours function updateTierCallback(context) { log('From updateTierCallback: timer fired'); // GET the tiers from the `tiersEndpoint` const response = curl('GET', tiersEndpoint); if (response.status != 200) { log("Error(Cannot get tiers): " + JSON.stringify(response)); } else { const tiers = response.body; log("Successfully retrieved the tiers: " + JSON.stringify(tiers)); // Write the tiers to the `tierLimits` keyspace, in the document with ID `limits` tierLimits["limits"] = tiers; } // Create a timer to run every 24 hours to refresh the tiers let timeAfter24hours = new Date(); timeAfter24hours.setDate(timeAfter24hours.getDate() + 1); log("Time after 24 hours is: " + timeAfter24hours); createTimer(updateTierCallback, timeAfter24hours, "tier-updater", {}); } |
OnUpdate Configurar
Manejo de eventos de usuario

Nuestra aplicación escuchará los documentos de solicitud entrantes desde el mis-usuarios-llm.eventos keyspace. Estos documentos tienen un identificador único y contienen datos en el formato:
|
1 2 3 4 5 6 |
{ "user_id": String, "respond_to": String, "payload": String, "header": String } |
Si la solicitud del usuario se encuentra dentro de su límite de velocidad, todos los datos del documento, excepto el user_id, se enviarán al punto final protegido por nuestro limitador de velocidad.
Lectura del nivel del usuario

Cuando el OnUpdate El controlador se activa por un documento de evento de usuario entrante del paso anterior, debemos extraer el usuario_id campo de él.
Utilización de la usuario_id campo, recuperaremos el documento con los detalles de la cuenta del usuario desde el mis-usuarios.cuentas keyspace. De este documento, extraeremos el valor de nivel campo.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
function OnUpdate(doc, meta, xattrs) { // ... const user_id = doc.user_id; let done = false; while (!done) { // Get the tier of the `user_id` let userAccountsMeta = { "id": user_id }; let userAccountsResult = couchbase.get(userAccounts, userAccountsMeta, { "cache": true }); if (!userAccountsResult.success) { throw new Error("Error(Unable to get the user's details): " + JSON.stringify(userAccountsResult)); } const tier = userAccountsResult.doc.tier; // ... } // ... } |
Lectura de los límites de tarifa del nivel

Obtenemos los límites de velocidad para el nivel del usuario a partir del documento que contiene la asignación de niveles a límites de velocidad, ubicado en el límite de velocidad.mis-llm.límites espacio de claves.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
function OnUpdate(doc, meta, xattrs) { // ... while (!done) { // ... // Get the rate limit for the tier let tierLimitsMeta = { "id": "limits" }; let tierLimitsResult = couchbase.get(tierLimits, tierLimitsMeta, { "cache": true }); if (!tierLimitsResult.success) { throw new Error("Error(Unable to get the tier limits): " + JSON.stringify(tierLimitsResult)); } const rateLimit = tierLimitsResult.doc[tier]; // ... } // ... } |
Decida si desea limitar la velocidad de la solicitud y actualizar el uso del límite de velocidad del usuario.

Dado el límite de velocidad del usuario, ahora verificamos su uso actual para decidir si puede realizar una solicitud. El limitador de velocidad rastrea el uso de cada usuario con un documento contador en el rate-limit.my-llm.tracker keyspace. Creamos este documento contador bajo demanda para cada usuario_id para almacenar el recuento de solicitudes de ese usuario para la ventana actual, antes de que se actualice el límite del token bucket. Si el uso de un usuario alcanza o supera el límite de su nivel, bloqueamos su solicitud. De lo contrario, la reenviamos al punto final protegido. Por último, actualizamos el uso del límite de velocidad del usuario en su documento contador correspondiente en el rate-limit.my-llm.tracker espacio de claves.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
function OnUpdate(doc, meta, xattrs) { // ... while (!done) { // ... // Try to get the rate limit count for the `user_id` const userIDMeta = { "id": user_id }; const result = couchbase.get(rateLimiter, userIDMeta); // If the rate limit count for the `user_id` does not exist. Try to create it. while (!result.success) { couchbase.insert(rateLimiter, userIDMeta, { "count": 0 }); result = couchbase.get(rateLimiter, userIDMeta); } // Assign the counter document's `count` and `meta` to local variables for convenience const counterDocCount = result.doc.count; const counterDocMeta = result.meta; // Check if the counter has hit the rate limit // We use >= instead of == to handle the edge case where the tier limits have reduced // but the tier tracker documents have not yet been deleted. if (counterDocCount >= rateLimit) { log("User with ID '" + user_id + "' hit their rate limit of " + rateLimit + "!"); done = true; continue; } // Update the count in the document let res = couchbase.mutateIn(rateLimiter, counterDocMeta, [ couchbase.MutateInSpec.replace("count", counterDocCount + 1), ]); // ... } // ... } |
Envía la solicitud “dentro del límite” al punto final deseado.

Las solicitudes de los usuarios, dentro de los límites de velocidad de su nivel correspondiente, se envían al punto final de la API REST protegido por nuestro limitador de velocidad.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
function OnUpdate(doc, meta, xattrs) { // ... while (!done) { // ... done = res.success; if (done) { // POST the request to the `llmEndpoint` delete doc.user_id; const response = curl('POST', llmEndpoint, doc); if (response.status != 200) { throw new Error("Error(MyLLM endpoint is not working): " + response.status); } } } // ... } |
Probando nuestra aplicación
Ahora que hemos implementado nuestro limitador de velocidad, podemos crear el entorno para ejecutarlo y probarlo:
- Ejecute el programa Go para cargar un conjunto de muestra de 100 usuarios.
- Ejecute el programa Go para iniciar el servidor HTTP que proporciona las API REST externas con las que interactúa nuestra función Eventing.
- Implemente la función Eventing.
- Para activar la función Eventing, debemos ejecutar el programa Go para cargar los documentos de eventos del usuario en su espacio de claves de origen, es decir,
mis-usuarios-llm.eventos. - Para obtener el número de solicitudes de usuario que llegan al punto final de la API REST externa protegido por nuestro limitador de velocidad, debe enviar un
GETsolicitud al/mi-llmpunto final.
Conclusión
Esta publicación mostró cómo utilizar el nuevo controlador Couchbase Eventing., OnDeploy, para crear un limitador de velocidad de cubeta de fichas, lo que pone de relieve la potencia y flexibilidad de Couchbase Eventing para desarrollar soluciones integradas e independientes.
En términos más generales, esto demuestra un cambio en el desarrollo de aplicaciones: crear aplicaciones a partir de la propia base de datos. Esto permite ofrecer soluciones personalizadas para diversos requisitos, todo ello dentro de la plataforma Couchbase.
Anexo
Código completo de Eventing: Haga clic aquí
Código Go del servidor: Haga clic aquí
Código Go del cliente: Haga clic aquí
Código Go del cargador de usuario: Haga clic aquí