Esta postagem ilustra o uso do Couchbase Analytics com o Couchbase Go SDK. O Couchbase Analytics é um novo serviço disponível no Couchbase Server 6.0. Você pode ler mais em https://docs.couchbase.com/server/6.0/analytics/introduction.html.
Nesta postagem, usaremos um conjunto de dados do mundo real que é grande o suficiente para não caber na memória (pelo menos na minha máquina local). Usaremos a função Exportação CSV do Conjunto de dados de viagens de táxi verde de Nova York de 2016. Esse é um conjunto de dados de aproximadamente 16,4 milhões de registros com 23 campos por registro. Você pode acompanhar e experimentar o aplicativo clonando o projeto aquiou executar
ir obter github.com/chvck/gocb-táxi-análises . Você também precisará executar
ir obter ./... se você tiver clonado o projeto pelo git.
Como esses dados são um arquivo CSV, a primeira tarefa é importá-los. Infelizmente, esse conjunto de dados usa formatos de data e hora não padronizados, portanto, precisamos usar um pequeno script para convertê-los em algo mais utilizável. Se você tiver clonado o o projeto então você pode fazer isso com
1 |
ir executar principal.ir --reformatar --csv /caminho/para/táxis.csv |
Isso criará um
2016_Green_Taxi_Trip_Data.csv no diretório do projeto. Também aproveitei a oportunidade para alterar os cabeçalhos do CSV para torná-los mais amigáveis ao JSON e também para adicionar um campo de tipo sempre definido como verde (caso mais tarde quiséssemos adicionar também o conjunto de dados de táxi amarelo). Durante a conversão, também poderíamos ter importado os dados, mas já temos uma ótima ferramenta em cbimport que podemos usar. Crie um bucket chamado táxis com o despejo completo ativado (em Advanced bucket settings - não estaremos executando operações de k/v, portanto, o desempenho de k/v não é tão importante nesse caso) e, em seguida, execute:
1 |
cbimport csv --agrupamento couchbase://localhost -u user -p password -b taxis --infer-types -omit-empty -d file:///path/to/2016_Green_Taxi_Trip_Data.csv -l import.log -g green::%vendorID%::#MONO_INCR# |
Cada documento terá um ID exclusivo, como
verde::1::1000 . Normalmente, essas duas etapas não seriam necessárias, pois nossos dados já estariam armazenados no Couchbase.
Antes de poder trabalhar com o Couchbase Analytics, é necessário preparar um conjunto de dados para permitir que você consulte os dados:
1 |
CRIAR CONJUNTO DE DADOS alltaxis ON táxis; |
Esse conjunto de dados requer alguns recursos. Se quiser experimentar um conjunto de dados um pouco menor em um laptop ocupado, você pode criar um conjunto de dados filtrado que rastreará apenas um subconjunto dos documentos em seu bucket:
1 |
CRIAR CONJUNTO DE DADOS alltaxis ON táxis ONDE `vendorID` = 1; |
Isso lhe dará um conjunto de dados com um pouco mais de 3 milhões de documentos.
Depois de criar um dos conjuntos de dados, você precisa inicializá-lo ativando o processamento do conjunto de dados com:
1 |
CONECTAR LINK Local; |
Isso começará a preencher o conjunto de dados que você acabou de criar. Você pode ver o progresso na interface do usuário na coluna Datasets à direita, abaixo do nome do conjunto de dados. Você pode continuar a trabalhar com o conjunto de dados enquanto ele estiver sendo criado, mas verá resultados diferentes cada vez que executar uma consulta e a execução poderá ser um pouco mais lenta.
Como vamos fazer uma análise de dados, vale a pena investigar algumas coisas. Acho que um bom ponto de partida seria saber o número de viagens de táxi ao longo do ano e poder aplicar vários filtros para ver coisas como gorjetas em relação às tarifas.
A base que usaremos para nossas consultas é:
1 |
SELECIONAR DATE_PART_STR(pickupDate, "mês") AS período, CONTAGEM(*) como contagem DE alltaxis GRUPO BY DATE_PART_STR(pickupDate, "mês") ORDEM BY período; |
Essa consulta está extraindo o mês como um número (1-12) do arquivo pickupDate e exibindo o número de viagens em relação ao mês. O que podemos ver ao executar essa consulta é que março tem o maior número de viagens e que novembro tem o menor número de viagens. Há também uma tendência de queda ao longo do ano. Eu esperava que o verão tivesse menos viagens do que o resto do ano, então já aprendi alguma coisa!
Essa consulta leva cerca de 24s para o conjunto de dados completo em meu computador. A execução da mesma consulta no serviço de consulta operacional (muitas vezes chamado apenas de N1QL, mas essa é a linguagem) com apenas um índice primário leva tempo limite no console de consulta (600s). Podemos ver que, para consultas ad hoc em grandes conjuntos de dados, o Couchbase Analytics é uma boa opção, complementando o serviço de consulta N1QL operacional.
Consulta a partir de um aplicativo Golang
Agora que configuramos e testamos o funcionamento do nosso conjunto de dados do Analytics, podemos usá-lo por meio do Go SDK. Na seção runServer função que temos:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
var erro erro agrupamento, erro = gocb.Conectar(cbConnStr) se erro != nulo { pânico("Erro ao se conectar ao cluster:" + erro.Erro()) } agrupamento.Autenticar(gocb.PasswordAuthenticator{ Nome de usuário: cbUsername, Senha: cbPassword, }) _, erro = agrupamento.OpenBucket("táxis", "") se erro != nulo { registro.Fatal(erro) } parar := fazer(chan os.Sinal, 1) // Parar o servidor em caso de interrupção sinal.Notificar(parar, os.Interrupção) srv, erro := executar() se erro != nulo { registro.Fatal(erro) } fmt.Println("Servidor em execução em", srv.Endereço) <-parar registro.Println("Parando o servidor") srv.Desligamento(nulo) |
Isso cria uma conexão com o Couchbase Server e faz a autenticação usando o nome de usuário e a senha (essas propriedades podem ser personalizadas modificando as propriedades na parte superior do main.go). Em seguida, abrimos uma conexão com o nosso bucket. O restante da função está lidando com o servidor Web. Criamos um canal que escuta o sinal de interrupção e, quando ele é acionado, desligamos o servidor http.
É difícil visualizar e filtrar esses dados na linha de comando, portanto, na base de código vinculada, adicionamos uma interface gráfica simples. O servidor da Web fornece a página de índice e expõe um único ponto de extremidade para recuperar dados dinâmicos. Mais uma vez, para executar isso, use
ir executar principal.ir e você pode acessar o front-end em
http://localhost:8010 .
Nosso manipulador para o endpoint de dados dinâmicos tem a seguinte aparência:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func manipulador de pedidos(w http.Escritor de respostas, r *http.Solicitação) { opções, erro := processQueryString(r.URL.Consulta()) se erro != nulo { http.Erro(w, erro.Erro(), http.StatusInternalServerError) retorno } q := `selecionar DATE_PART_STR(pickupDate, "%s") AS período, %s como agregado DE alltaxis` q += ` %s GRUPO BY DATE_PART_STR(pickupDate, "%s") ORDEM BY período;` q = fmt.Sprintf(q, opções.Período, opções.Agregado, opções.Onde, opções.Período) consulta := gocb.NewAnalyticsQuery(q) resultados, erro := agrupamento.ExecuteAnalyticsQuery(consulta, opções.Parâmetros) se erro != nulo { http.Erro(w, erro.Erro(), http.StatusInternalServerError) retorno } dados, erro := processResults(resultados) se erro != nulo { http.Erro(w, erro.Erro(), http.StatusInternalServerError) retorno } dados.Onde = fmt.Sprintf("%s, %s, %v", opções.Agregado, opções.Onde, opções.Parâmetros) dados.Consulta = fmt.Sprintf("query = %s, params = %v", q, opções.Parâmetros) dados.TimeTaken = resultados.Métricas().Tempo de execução.Nanossegundos() js, erro := json.Marechal(*dados) se erro != nulo { http.Erro(w, erro.Erro(), http.StatusInternalServerError) retorno } w.WriteHeader(200) w.Cabeçalho().Conjunto("Content-Type", "application/json") w.Escrever(js) } |
O que podemos ver aqui é que processamos a string de consulta para extrair o where (mais parâmetros, mais sobre isso abaixo), o agregado e o período de tempo. Criamos nossa consulta como uma string, incorporando essas propriedades e, em seguida, usamos NewAnalyticsQuery para criar um AnalyticsQuery . Para executar a consulta, ela é passada para agrupamento.ExecuteAnalyticsQuery . Os resultados são então processados por processResults antes de enviar a resposta http. Onde , TimeTaken e Consulta também são adicionadas à resposta para que possamos exibir o que foi consultado no frontend.
Vamos dar uma olhada em cada uma dessas partes com um pouco mais de detalhes. Os parâmetros where e aggregate são passados do frontend já formatados corretamente. A string de consulta poderia ser algo como
?período=hora&mês=5&dia=14&agregado=contagem(*)&onde=fareAmount,>,15&onde=dica,<,1
O que está acontecendo aqui é que o período determina a granularidade da consulta: um dia, um mês ou o ano inteiro. No caso de estarmos analisando um dia, precisamos saber qual dia e qual mês também, os parâmetros de mês e dia estarão presentes ou não, dependendo do período. O agregado é a operação e o campo ao qual aplicar a operação. Em vez de contagem(*) pode ser SUM(dicas) ou AVG(tarifa) etc...Os parâmetros where são as cláusulas where individuais a serem aplicadas - elas são enviadas como matrizes do formulário [field, operator, value]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func whereTimePeriod(período string, consulta url.Valores) string { onde := "" se período == "dia" { mês := consulta["mês"][0] se mês == "1" { onde = `pickupDate <= "2016-01-31 23:59:59"` } mais se mês == "12" { onde = `pickupDate >= "2016-12-01 00:00:00"` } mais { monthInt, _ := strconv.ParseFloat(mês, 64) onde = fmt.Sprintf(`pickupDate >= "2016-%02g-01T00:00:00" E pickupDate <= "2016-%02g-31T23:59:59"`, monthInt, monthInt) } } mais se período == "hora" { mês := consulta["mês"][0] monthInt, _ := strconv.ParseFloat(mês, 64) dia := consulta["dia"][0] dayInt, _ := strconv.ParseFloat(dia, 64) onde = fmt.Sprintf(`pickupDate > "2016-%02g-%02gT00:00:00" E pickupDate <= "2016-%02g-%02gT23:59:59"`, monthInt, dayInt, monthInt, dayInt) } retorno onde } |
O whereTimePeriod gera a parte que delimita o tempo da cláusula where extraindo o período da string de consulta. Dependendo do valor do parâmetro period, uma lógica diferente é aplicada para criar a cláusula where; se for necessário o ano inteiro, será retornada uma cláusula where vazia.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
func processQueryString(queryString url.Valores) (*queryOptions, erro) { agregado := queryString["agregado"][0] período := "mês" onde := "" numParams := 0 var parâmetros []interface{} se len(queryString["período"]) > 0 { período = queryString["período"][0] onde = whereTimePeriod(período, queryString) } para _, cond := alcance queryString["onde"] { numParams++ condParts := cadeias de caracteres.Dividir(cond, ",") se len(onde) > 0 { onde = fmt.Sprintf("%s E %s %s $%d", onde, condParts[0], condParts[1], numParams) } mais { onde = fmt.Sprintf("%s %s $%d", condParts[0], condParts[1], numParams) } val, erro := strconv.Atoi(condParts[2]) se erro != nulo { retorno nulo, erro } parâmetros = anexar(parâmetros, val) } se len(onde) > 0 { onde = fmt.Sprintf("WHERE %s", onde) } retorno &queryOptions{ Agregado: agregado, Onde: onde, Período: período, Parâmetros: parâmetros, }, nulo } |
Depois que a parte limitada pelo tempo for criada, cada um dos parâmetros where fornecidos pelo frontend poderá ser adicionado. Você pode ver que, em vez de incluir os valores where usando formatação de cadeia de caracteres, estamos usando parâmetros de consulta. Essa é uma prática recomendada para evitar injeção de SQL.
Depois que a consulta tiver sido executada, o processResults é executada, com a seguinte aparência:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func processResults(resultados gocb.Resultados da análise) (*Dados do calendário, erro) { var fila mapa[string]interface{} var dateParts []flutuante64 var agregados []flutuante64 para resultados.Próximo(&fila) { se datePart, ok := fila["período"]; ok { dateParts = anexar(dateParts, datePart.(flutuante64)) agregados = anexar(agregados, fila["agregado"].(flutuante64)) } } se erro := resultados.Fechar(); erro != nulo { retorno nulo, erro } retorno &Dados do calendário{ DateParts: dateParts, Agregado: agregados, }, nulo } |
Ele itera sobre os resultados usando resultados.Próximo(&fila) e, para cada resultado, extrai o período de tempo para o qual o resultado se refere como um número, ou seja, a hora (0-23), o dia (1-31) ou o mês (1-12). Ele também extrai o valor agregado que corresponde a esse período de tempo. No final, há uma chamada para r resultados.Fechar() que verifica se há erros para garantir que todos os dados tenham sido lidos corretamente.
Usando o front-end, podemos facilmente experimentar diferentes agregados para diferentes campos, aplicar cláusulas where e detalhar os dados para obter uma visão mais granular das coisas. Por exemplo, provavelmente queremos saber em que mês os táxis geraram mais dinheiro:
Vamos nos aprofundar nisso clicando no ponto correspondente a maio:
Parece que a maior parte do dinheiro é gerada nos finais de semana, o que faz sentido. Como o mês de maio é agradável e os finais de semana são os mais populares, talvez a maioria das viagens seja de vários passageiros indo juntos ver os pontos turísticos?
Não parece ser o caso! Há muitas outras comparações que poderíamos fazer, como tarifas versus gorjetas ou o número de viagens versus o número de viagens sem gorjetas. Esse conjunto de dados também tem dados de localização para que possamos fazer coisas como criar mapas de calor dos locais de retirada.
Conclusão
Nesse exemplo, mostramos como uma consulta simples, que pode até ser ad-hoc, pode ser usada para analisar rapidamente um conjunto de dados com uma variedade de métricas, tudo sem exigir a criação de índices. O Couchbase Analytics adicionará esse grande recurso à plataforma Couchbase quando for disponibilizado de forma geral. Os desenvolvedores da Golang têm acesso agora por meio da versão beta 6.0.
Gostaríamos muito de receber seu feedback! Por favor download Couchbase Server 6.0 beta hoje e experimente a versão atualizada do Análise do Couchbase. Estaremos atentos aos seus comentários em www.couchbase.com/forums/ sobre qualquer coisa, desde análises até o Go SDK.