Este post ilustra el uso de Couchbase Analytics con el SDK de Couchbase Go. Couchbase Analytics es un nuevo servicio disponible en Couchbase Server 6.0, puedes leer más en https://docs.couchbase.com/server/6.0/analytics/introduction.html.
En este post vamos a utilizar un conjunto de datos del mundo real que es lo suficientemente grande como para no caber en la memoria (al menos en mi máquina local). Vamos a utilizar el Exportación CSV de la Conjunto de datos de trayectos en taxi ecológico en NYC en 2016. Se trata de un conjunto de datos de ~16,4 millones de registros con 23 campos por registro. Puede seguir el proceso y probar la aplicación clonando el proyecto aquío ejecute
ir consiga github.com/chvck/gocb-taxi-análisis . También tendrá que ejecutar
ir consiga ./... si has clonado el proyecto con git.
Como se trata de un archivo CSV, lo primero que hay que hacer es importarlo. Desafortunadamente, este conjunto de datos utiliza formatos de fecha y hora no estándar, por lo que necesitamos utilizar un pequeño script para convertirlos en algo más utilizable. Si has clonado el proyecto puede hacerlo con
1 |
ir ejecute principal.ir --reformatear --csv /ruta/a/taxis.csv |
Esto creará un
2016_Green_Taxi_Trip_Data.csv en el directorio del proyecto. También he aprovechado esta oportunidad para cambiar las cabeceras CSV para hacerlas más amigables con JSON y también añadir un campo de tipo siempre establecido en verde (en caso de que más tarde quisiéramos añadir también el conjunto de datos de taxis amarillos). Durante la conversión también podríamos haber importado los datos pero ya tenemos una gran herramienta en cbimport que podemos utilizar. Cree un cubo llamado taxis con la evicción completa activada (en Advanced bucket settings - no estaremos ejecutando operaciones k/v por lo que el rendimiento k/v no importa tanto en este caso) y luego ejecutar:
1 |
cbimport csv --grupo couchbase://localhost -u usuario -p contraseña -b taxis --infer-tipos -omit-empty -d file:///path/to/2016_Green_Taxi_Trip_Data.csv -l import.log -g green::%vendorID%::#MONO_INCR# |
Cada documento tendrá un identificador único como
verde::1::1000 . Normalmente, estos dos pasos no serían necesarios, ya que nuestros datos ya estarían almacenados en Couchbase.
Antes de poder trabajar con Couchbase Analytics tienes que preparar un conjunto de datos que te permita consultarlos:
1 |
CREAR DATASET alltaxis EN taxis; |
Este conjunto de datos requiere algunos recursos. Si quieres experimentar con un conjunto de datos ligeramente más pequeño en un portátil con pocos recursos, puedes crear un conjunto de datos filtrado que solo rastreará un subconjunto de los documentos de tu cubo:
1 |
CREAR DATASET alltaxis EN taxis DONDE `vendorID` = 1; |
Así obtendrá un conjunto de datos de algo más de 3 millones de documentos.
Una vez que haya creado uno de los conjuntos de datos, deberá inicializarlo activando el procesamiento de conjuntos de datos con:
1 |
CONECTAR ENLACE Local; |
Esto comenzará a rellenar el conjunto de datos que acaba de crear. Puede ver el progreso en la interfaz de usuario, en la columna de conjuntos de datos de la derecha, debajo del nombre del conjunto de datos. Puede seguir trabajando con el conjunto de datos mientras se construye, pero verá resultados diferentes cada vez que ejecute una consulta y la ejecución puede ser un poco más lenta.
Como vamos a hacer análisis de datos, merece la pena investigar un par de cosas. Creo que un buen punto de partida sería conocer el número de viajes en taxi a lo largo del año y poder aplicar varios filtros para ver cosas como las propinas frente a las tarifas.
La base que utilizaremos para nuestras consultas es:
1 |
SELECCIONE DATE_PART_STR(pickupDate, "mes") AS periodo, CONTAR(*) como cuente DESDE alltaxis GRUPO POR DATE_PART_STR(pickupDate, "mes") PEDIR POR periodo; |
Esta consulta extrae el mes como un número (1-12) del archivo pickupDate y mostrando el número de desplazamientos en función del mes. La consulta muestra que marzo es el mes con más desplazamientos y noviembre el que menos. También se observa una tendencia a la baja a lo largo del año. Esperaba que el verano tuviera menos desplazamientos que el resto del año, así que ya he aprendido algo.
En mi máquina, esta consulta tarda unos 24 s en ejecutar el conjunto de datos completo. Ejecutando lo mismo contra el servicio de consulta operacional (a menudo referido simplemente como N1QL, pero ese es el lenguaje) con sólo un índice primario los tiempos se agotan desde la consola de consulta (600s). Podemos ver que para consultas ad hoc en grandes conjuntos de datos Couchbase Analytics es una buena opción, complementando el servicio de consulta operacional N1QL .
Consultas desde una aplicación Golang
Ahora que hemos configurado y comprobado que nuestro conjunto de datos de Analytics funciona, podemos utilizarlo a través del SDK de Go. En la ventana runServer función que tenemos:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
var err error grupo, err = gocb.Conectar(cbConnStr) si err != nil { pánico("Error al conectar con el cluster:" + err.Error()) } grupo.Autentificar(gocb.PasswordAuthenticator{ Nombre de usuario: cbUsername, Contraseña: cbContraseña, }) _, err = grupo.OpenBucket("taxis", "") si err != nil { registro.Fatal(err) } stop := escriba a(chan os.Señal, 1) // Detener el servidor en caso de interrupción señal.Notificar a(stop, os.Interrumpir) srv, err := ejecute() si err != nil { registro.Fatal(err) } fmt.Imprimir("Servidor funcionando en", srv.Dirección) <-stop registro.Imprimir("Parando servidor") srv.Cierre(nil) |
Esto crea una conexión con Couchbase Server y autentica usando el nombre de usuario y la contraseña (estas propiedades pueden ser personalizadas modificando las propiedades en la parte superior del main.go). A continuación abrimos una conexión a nuestro bucket. El resto de la función es el manejo del servidor web. Creamos un canal a la escucha de la señal de interrupción y cuando se dispara, cerramos el servidor http.
Es difícil visualizar y filtrar estos datos en la línea de comandos, por lo que en el código base vinculado hemos añadido una sencilla interfaz gráfica de usuario. El servidor web sirve la página de índice y expone un único punto final para recuperar datos dinámicos. Una vez más, para ejecutar esto utilice
ir ejecute principal.ir y puede acceder al frontend desde
http://localhost:8010 .
Nuestro manejador para el punto final de datos dinámicos tiene este aspecto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func requestHandler(w http.EscritorRespuesta, r *http.Solicitar) { opta por, err := processQueryString(r.URL.Consulta()) si err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) devolver } q := `seleccione DATE_PART_STR(pickupDate, "%s") AS periodo, %s como agregado DESDE alltaxis` q += ` %s GRUPO POR DATE_PART_STR(pickupDate, "%s") PEDIR POR periodo;` q = fmt.Sprintf(q, opta por.Periodo, opta por.Agregado, opta por.Dónde, opta por.Periodo) consulta := gocb.NewAnalyticsQuery(q) resultados, err := grupo.EjecutarAnalyticsQuery(consulta, opta por.Parámetros) si err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) devolver } datos, err := procesarResultados(resultados) si err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) devolver } datos.Dónde = fmt.Sprintf("%s %s, %v", opta por.Agregado, opta por.Dónde, opta por.Parámetros) datos.Consulta = fmt.Sprintf("query = %s, params = %v", q, opta por.Parámetros) datos.TiempoTomado = resultados.Métricas().Tiempo de ejecución.Nanosegundos() js, err := json.Mariscal(*datos) si err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) devolver } w.WriteHeader(200) w.Cabecera().Establecer("Tipo de contenido", "application/json") w.Escriba a(js) } |
Lo que podemos ver aquí es que procesamos la cadena de consulta para extraer el dónde (más parámetros, más sobre esto más adelante), el agregado y el periodo de tiempo. Creamos nuestra consulta como una cadena, incorporando estas propiedades y luego usamos NewAnalyticsQuery para crear un AnalyticsQuery . Para ejecutar la consulta se pasa a grupo.EjecutarAnalyticsQuery . A continuación, los resultados son procesados por procesarResultados antes de enviar la respuesta http. Dónde , TiempoTomado y Consulta también se añaden a la respuesta para que podamos mostrar lo que se ha consultado en el frontend.
Veamos cada una de estas partes con un poco más de detalle. Los parámetros where y aggregate se pasan desde el frontend ya formateados correctamente. La cadena de consulta podría ser algo como
?periodo=hora&mes=5&día=14&agregado=cuente(*)&donde=fareAmount,>,15&donde=consejo,<,1
Lo que ocurre aquí es que el periodo dicta la granularidad de la consulta: un día, un mes o el año entero. Para el caso en que estemos consultando un día necesitamos saber también qué día y qué mes, los parámetros mes y día estarán presentes o no dependiendo del periodo. El agregado es la operación y el campo al que aplicar la operación. En lugar de cuente(*) podría ser SUM(consejos) o AVG(tarifa) etc... Los parámetros where son las cláusulas where individuales a aplicar - se envían como matrices de la forma [campo, operador, valor].
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func whereTimePeriod(periodo cadena, consulta url.Valores) cadena { donde := "" si periodo == "día" { mes := consulta["mes"][0] si mes == "1" { donde = `pickupDate <= "2016-01-31 23:59:59"` } si no si mes == "12" { donde = `pickupDate >= "2016-12-01 00:00:00"` } si no { mesInt, _ := strconv.ParseFloat(mes, 64) donde = fmt.Sprintf(`pickupDate >= "2016-%02g-01T00:00:00" Y pickupDate <= "2016-%02g-31T23:59:59"`, mesInt, mesInt) } } si no si periodo == "hora" { mes := consulta["mes"][0] mesInt, _ := strconv.ParseFloat(mes, 64) día := consulta["día"][0] dayInt, _ := strconv.ParseFloat(día, 64) donde = fmt.Sprintf(`pickupDate > "2016-%02g-%02gT00:00:00" Y pickupDate <= "2016-%02g-%02gT23:59:59"`, mesInt, dayInt, mesInt, dayInt) } devolver donde } |
En whereTimePeriod genera la parte temporal de la cláusula where extrayendo el periodo de la cadena de consulta. Dependiendo del valor del parámetro periodo, se aplica una lógica diferente para construir la cláusula where, si se requiere el año completo, entonces se devuelve una cláusula where vacía.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
func processQueryString(queryString url.Valores) (*queryOptions, error) { agregado := queryString["agregado"][0] periodo := "mes" donde := "" numParams := 0 var parámetros []interfaz{} si len(queryString["período"]) > 0 { periodo = queryString["período"][0] donde = whereTimePeriod(periodo, queryString) } para _, cond := gama queryString["donde"] { numParams++ condParts := cadenas.Dividir(cond, ",") si len(donde) > 0 { donde = fmt.Sprintf("%s Y %s %s $%d", donde, condParts[0], condParts[1], numParams) } si no { donde = fmt.Sprintf("%s %s $%d", condParts[0], condParts[1], numParams) } val, err := strconv.Atoi(condParts[2]) si err != nil { devolver nil, err } parámetros = añadir(parámetros, val) } si len(donde) > 0 { donde = fmt.Sprintf("DONDE %s", donde) } devolver &queryOptions{ Agregado: agregado, Dónde: donde, Periodo: periodo, Parámetros: parámetros, }, nil } |
Una vez que la parte limitada en el tiempo se construye entonces cada uno de los parámetros where que el frontend ha proporcionado puede ser añadido. Puedes ver que en lugar de incluir los valores where usando formato de cadena estamos usando parámetros de consulta. Esta es una buena práctica para evitar inyecciones SQL.
Una vez ejecutada la consulta, el procesarResultados se ejecuta la función
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func procesarResultados(resultados gocb.AnálisisResultados) (*calendarData, error) { var fila mapa[cadena]interfaz{} var fechaPartes []float64 var agregados []float64 para resultados.Siguiente(&fila) { si fechaParte, ok := fila["período"]; ok { fechaPartes = añadir(fechaPartes, fechaParte.(float64)) agregados = añadir(agregados, fila["agregado"].(float64)) } } si err := resultados.Cerrar(); err != nil { devolver nil, err } devolver &calendarData{ FechaPartes: fechaPartes, Agregado: agregados, }, nil } |
Itera sobre los resultados utilizando resultados.Siguiente(&fila) y, para cada resultado, extrae el periodo de tiempo al que corresponde el resultado en forma de número, es decir, la hora (0-23), el día (1-31) o el mes (1-12). También extrae el valor agregado que corresponde a ese periodo de tiempo. Al final hay una llamada a r esultados.Cerrar() que comprueba si hay errores para asegurarse de que todos los datos se han leído correctamente.
Utilizando el frontend podemos probar fácilmente diferentes agregados para diferentes campos, aplicar cláusulas where y profundizar en los datos para obtener una visión más granular de las cosas. Por ejemplo, probablemente queramos saber en qué mes generaron más dinero los taxis:
Profundicemos en ello haciendo clic en el punto correspondiente a mayo:
Parece que la mayor parte del dinero se genera los fines de semana, eso tiene sentido. Como hace buen tiempo en mayo y los fines de semana son los más populares, ¿quizá la mayoría de los viajes son de varios pasajeros que van juntos a ver los monumentos?
No lo parece. Podríamos hacer muchas otras comparaciones, como las tarifas frente a las propinas o el número de viajes frente al número de viajes sin propinas. Este conjunto de datos también contiene datos de localización, por lo que podríamos hacer cosas como crear mapas de calor de los lugares de recogida.
Conclusión
En este ejemplo, vimos cómo una simple consulta, que incluso podría ser ad-hoc, se puede utilizar para analizar rápidamente un conjunto de datos con una variedad de métricas, todo ello sin necesidad de la creación de índices. Couchbase Analytics añadirá esta gran capacidad a la plataforma Couchbase cuando esté disponible de forma general. Los desarrolladores Golang tienen acceso ahora a través de la beta 6.0.
Nos encantaría conocer su opinión. Por favor, descargar Couchbase Server 6.0 beta hoy y pruebe la versión actualizada de Análisis de Couchbase. Estaremos atentos a sus comentarios en www.couchbase.com/forums/ sobre cualquier tema, desde análisis hasta el SDK de Go.