알베르토 마르케티는 풀스택 개발자이자 다음과 같은 저서를 집필했습니다. "렌더스크립트: 안드로이드에서의 병렬 컴퓨팅, 쉬운 방법." 그는 현대 언어와 기술의 발견에 끊임없이 뛰어들며 항상 최전선에서 살아가고 있습니다.

시간 제한 작업 카우치베이스 및 Go 사용
이 글에서는 Couchbase 인덱싱 시스템을 활용하여 시간 제한 작업 분산 시스템을 만드는 방법을 보여드리겠습니다. 프로젝트의 예제 코드와 실행 지침은 다음 링크에서 찾을 수 있습니다. https://github.com/cmaster11/cb-blog-timed-tasks.
고지 사항: 주제가 복잡하기 때문에 이 페이지에는 관련 코드 발췌본만 게시되어 있습니다.
컨셉
이러한 시스템의 요구 사항을 정의해 보겠습니다:
- 시간 제한 작업 시스템의 주요 기능은 특정 작업이 제시간에 실행될 시간을 지정할 수 있다는 것입니다. 이는 원하는 실행 시간(유닉스 시간, 밀리초 기준).
- 최신 소프트웨어의 요구 사항은 다중 노드 환경을 지원해야 한다는 것인데, 이는 분산 시스템이어야 한다는 것을 의미합니다. 그런 다음 여러 작업자가 동일한 작업을 처리하지 않도록 해야 합니다! 여기서 멋진 Couchbase 기능을 사용할 수 있습니다, 비관적 잠금를 사용하면 작업자가 문서를 가져와서 다른 작업자가 처리할 수 없도록 잠글 수 있습니다.
다음은 작업을 표현할 수 있는 가능한 구조입니다:
|
1 2 3 4 5 6 7 8 9 |
type Task struct { Id string // The desired task execution time ExecuteAt int64 // Task-specific content Content string } |
카우치베이스 기능
먼저, 우리가 사용할 Couchbase 기능에 대한 개요를 소개합니다:
META()
Couchbase 버킷의 모든 문서에는 다음과 같은 문서 엔티티별 정보가 포함된 연결된 META()-문서가 있습니다:
- ID - 버킷 내부의 문서 키입니다.
- cas - 카우치베이스에서 다음을 방지하기 위해 사용하는 int64 번호입니다. 레이스 조건 문서를 편집하는 동안
- 만료 - 문서가 만료될 예정인 경우, 만료되지 않을 경우 0입니다.
힌트: 이러한 필드(예: META().cas) 색인화 가능 (카우치베이스 5.0부터 시작).
CAS(확인 및 설정)
문서를 가져올 때 CAS 값도 반환되며, 이후 문서 변경을 위한 호출에서 이 값을 지정하여 원하는 버전의 문서를 편집할 수 있습니다.
예시:
- 클라이언트 A가 문서를 가져오는데 현재 CAS 값이 1234입니다.
- 클라이언트 B가 문서를 편집하여 CAS 값을 5678로 변경합니다.
- A가 CAS 값을 제공하지 않고 문서를 편집하려고 하면 편집은 성공하지만 B가 변경한 내용은 손실됩니다.
- A가 CAS 값(1234)을 제공하는 문서를 편집하려고 하면 현재 값(5678)이 다르므로 오류가 반환됩니다. 그러면 클라이언트 A는 문서를 다시 가져와서 프로세스를 다시 실행해야 합니다.
CAS 값은 문서의 잘못된/새로운 버전을 교체하거나 변경하여 변경 내용을 잃어버리지 않도록 하는 데 매우 유용한 도구입니다.
비관적 잠금
Couchbase를 사용하면 한 번에 한 클라이언트만 읽고 쓸 수 있도록 문서를 '잠금'할 수 있습니다. gocb.GetAndLock Go SDK 기능.
|
1 2 3 4 5 6 |
// Lock the document lockTime := 10 // seconds lockedCAS, err := bucket.GetAndLock(documentKey, lockTime, &outStruct) // Unlock it _, err = bucket.Unlock(documentKey, lockedCAS) |
문서가 잠겨 있으면 올바른 CAS 값을 사용하지 않는 한 문서를 잠금/변경/잠금 해제하는 다른 모든 요청에서 오류가 발생합니다(단순히 문서를 가져오는 것은 여전히 가능).
참고: 문서의 최대 잠금 시간은 15초이며, lockTime 값을 0으로 설정하면 최대 시간이 설정됩니다. 이렇게 하면 잠금 시간 초과로 인해 자동으로 사용 가능한 상태로 표시되기 전에 작업을 실행할 수 있는 시간이 제한됩니다.
힌트: 문서가 잠겨 있는 동안 반환되는 CAS 값은 -1입니다.
인덱싱 및 쿼리
두 가지 힌트를 종합해 보면 문서가 잠겨 있으면 -1로 변하는 필드(META().cas)를 색인할 수 있다는 것을 알 수 있습니다. 이 조건에 따라 문서를 쿼리할 수 있다는 뜻이기도 합니다!
쿼리
요구 사항에 맞는 쿼리를 정의해 보겠습니다:
- 나중에 문서를 가져오고 잠그는 데 사용할 수 있는 작업 ID를 얻고 싶습니다: SELECT Id.
- 작업이 이미 잠겨 있으면 안 됩니다: WHERE META().cas -1.
- 작업을 지금 실행해야 합니다: WHERE ExecuteAt <= NOW_MILLIS() (NOW_MILLIS 는 현재 유닉스 시간을 밀리초 단위로 반환합니다.)
- 시간이 가장 가까운 작업을 가져와야 하므로 실행 시간을 기준으로 작업을 정렬하려고 합니다: ORDER BY ExecuteAt ASC.
- 지금은(!!!) 작업자가 한 번에 하나의 작업만 처리하고 싶다고 가정해 보겠습니다: LIMIT 1.
결과는 이 쿼리와 유사해야 합니다:
|
1 2 3 4 5 6 |
SELECT `Id` FROM `timed_tasks` // Our bucket WHERE META().`cas` <> -1 AND `ExecuteAt` <= NOW_MILLIS() ORDER BY `ExecuteAt` ASC LIMIT 1 |
이 함수를 실행하면 다음과 유사한 배열이 반환됩니다:
|
1 2 3 |
[{ "Id": "task_id_goes_here" }] |
색인
이제 방금 생각한 쿼리 실행에 최적화된 쿼리별 인덱스를 계획할 수 있습니다. 쿼리별 인덱스는 NoSQL 데이터베이스 쿼리 성능을 개선하기 위한 필수 요소입니다.
- 이 쿼리는 문서가 현재 잠겨 있지 않은지 확인하는 것입니다:
1WHERE META().cas <> -1. - 또한 실행 시간을 과거로 직접 요청하고 있습니다. 그런 다음 ExecuteAt 필드를 인덱싱해야 합니다.
그러면 인덱스 쿼리는 다음과 같을 수 있습니다:
|
1 2 3 4 |
CREATE INDEX `idx_timed_task` ON `timed_tasks` (`ExecuteAt` ASC) WHERE META().`cas` <> -1 |
쿼리 최적화
이제 쿼리를 더욱 최적화할 수 있습니다:
- 쿼리에 인덱스를 사용하도록 지시할 수 있습니다. 힌트 를 추가합니다: USE INDEX(GSI를 사용하는 idx_timed_task).
- 쿼리를 실행하기 전에 인덱스가 최신 상태가 될 때까지(일반적으로 인덱싱은 비동기 프로세스입니다) 기다리도록 Couchbase에 요청하여 결과에 잠금 해제된 작업이 확실히 포함되도록 할 수 있습니다. 일관성 SDK 수준에서의 요구 사항: query.Consistency(gocb.RequestPlus).
흐름
시간 제한 작업 소비자 워커 루프에 대해 가능한 흐름은 다음과 같습니다:
- 사용 가능한 작업 ID를 쿼리합니다.
- 작업을 가져와 잠급니다.
- 작업을 처리합니다.
- 작업을 삭제합니다.
여러 노드
멀티노드 설정이 이 흐름을 어떻게 바꿀 수 있는지 잠시 생각해 보겠습니다.
여러 작업자가 동시에 사용 가능한 작업을 쿼리하는 경우, 동일한 작업을 찾으면 그 중 한 작업자만 성공적으로 처리할 수 있고 다른 작업자는 새 작업을 얻기 위해 루프를 반복(새 쿼리 실행)해야 할 수 있습니다.
그런 다음 다른 접근 방식을 구현할 수 있습니다:
- 사용 가능한 작업 ID를 쿼리하여 ID의 양을 작업자 수로 제한합니다.
- 각 작업 ID에 대해 작업 잠금을 시도합니다. 처음 잠그기에 성공하면 4로 이동합니다.
- 성공적으로 잠긴 작업이 없으면 반복을 반복합니다.
- 작업을 처리합니다.
- 작업을 삭제합니다.
최상의 경우, 모든 작업자는 첫 번째 시도에서 하나의 작업을 성공적으로 잠글 수 있습니다. 최악의 경우, 작업자는 여러 문서를 잠그는 데 실패해야 합니다. 평균적으로 작업자는 몇 개의 작업을 잠그지 못한 후에야 작업을 성공적으로 잠그는 것을 볼 수 있습니다.
데이터베이스 쿼리 빈도와 지원 가능한 잠금 시도 실패 횟수 사이에서 타협점을 찾아야 합니다. 일반적으로 문서 잠금을 시도하는 것이 N1QL 쿼리를 실행하는 것보다 훨씬 빠릅니다.
코드
몇 가지 관련 코드 예시를 살펴보겠습니다:
프로듀서
작업의 생성은 이 함수에서 요약할 수 있습니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func NewTask(executeAt time.Time, content string) (*Task, error) { if executeAt.IsZero() { return nil, errors.New("executeAt must not be a zero time") } taskUUID, err := uuid.NewV1() // github.com/satori/go.uuid if err != nil { return nil, err } // Convert time.Time to int64 milliseconds executeAtMillis := executeAt.UnixNano() / int64(time.Millisecond) task := Task{ Id: taskUUID.String(), ExecuteAt: executeAtMillis, Content: content, } return &task, nil } |
유효한 작업 개체를 생성한 후에는 버킷에 삽입하기만 하면 됩니다:
|
1 |
_, err := controller.bucket.Insert(task.Id, task, 0) |
소비자
이 코드를 사용하여 ID로 문서를 가져와 잠글 수 있습니다:
|
1 2 3 |
// Using zero values for lock time will set the maximum time available. task := new(Task) lockedCAS, err := controller.bucket.GetAndLock(taskId, 0, &task) |
이 코드를 사용하여 작업을 제거할 수 있습니다:
|
1 |
_, err := controller.bucket.Remove(taskId, lockedCAS) |
주요 소비자 코드는 다음 스니펫으로 요약할 수 있습니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
taskIds, err := couchbaseController.QueryNextTaskIds(consumersCount) ... if len(taskIds) == 0 { ... // No tasks have been found, restart the loop } var taskId string var task *internal.Task var lockedCAS gocb.Cas for _, taskId = range taskIds { // Lock and get the task, so that only this consumer will process it task, lockedCAS, err = couchbaseController.GetAndLockTask(taskId) if err != nil { ... // Error getting the task, proceed to next one in list continue } // Successfully locked task! // Move out to process it break } if task == nil { ... // No tasks could be locked, restart loop } // Actual processing of the task // Improvement: could also return an error, which would let the task be // processed by another worker later. processTask(task) /* Remove the task from Couchbase. The task will be currently locked, which means we need to provide the current CAS value, so that the producer is authorized to remove it. */ err = couchbaseController.RemoveTask(taskId, lockedCAS) ... |
결론
이 게시물에서는 Couchbase와 Go를 사용하여 안정적인 분산 시간 제한 작업 시스템을 만드는 방법을 살펴봤습니다.
이 시스템은 다음과 같이 더 발전할 수 있습니다:
- 오류 처리 지원.
- 재시도 기능 구현(처리에 실패하면 나중에 작업 일정 다시 잡기).
- 잠금 로직을 다음과 같이 개선합니다:
- 기본 작업자 수 대신 반환되는 작업 ID의 최대 수를 조정합니다.
- 15초 이상의 작업 처리 시간(Couchbase에서 문서의 최대 잠금 시간)을 지원합니다.
시간 내주셔서 감사드리며 행복한 개발이 되시길 바랍니다!
이 게시물은 커뮤니티 글쓰기 프로그램