알베르토 마르케티는 풀스택 개발자이자 다음과 같은 저서를 집필했습니다. "렌더스크립트: 안드로이드에서의 병렬 컴퓨팅, 쉬운 방법." 그는 현대 언어와 기술의 발견에 끊임없이 뛰어들며 항상 최전선에서 살아가고 있습니다.

시간 제한 작업 카우치베이스 및 Go 사용
이 글에서는 Couchbase 인덱싱 시스템을 활용하여 시간 제한 작업 분산 시스템을 만드는 방법을 보여드리겠습니다. 프로젝트의 예제 코드와 실행 지침은 다음 링크에서 찾을 수 있습니다. https://github.com/cmaster11/cb-blog-timed-tasks.
고지 사항: 주제가 복잡하기 때문에 이 페이지에는 관련 코드 발췌본만 게시되어 있습니다.
컨셉
이러한 시스템의 요구 사항을 정의해 보겠습니다:
- 시간 제한 작업 시스템의 주요 기능은 특정 작업이 제시간에 실행될 시간을 지정할 수 있다는 것입니다. 이는 원하는 실행 시간(유닉스 시간, 밀리초 기준).
- 최신 소프트웨어의 요구 사항은 다중 노드 환경을 지원해야 한다는 것인데, 이는 분산 시스템이어야 한다는 것을 의미합니다. 그런 다음 여러 작업자가 동일한 작업을 처리하지 않도록 해야 합니다! 여기서 멋진 Couchbase 기능을 사용할 수 있습니다, 비관적 잠금를 사용하면 작업자가 문서를 가져와서 다른 작업자가 처리할 수 없도록 잠글 수 있습니다.
다음은 작업을 표현할 수 있는 가능한 구조입니다:
|
1 2 3 4 5 6 7 8 9 |
유형 작업 구조체 { Id 문자열 // 원하는 작업 실행 시간 ExecuteAt int64 // 작업별 콘텐츠 콘텐츠 문자열 } |
카우치베이스 기능
먼저, 우리가 사용할 Couchbase 기능에 대한 개요를 소개합니다:
META()
Couchbase 버킷의 모든 문서에는 다음과 같은 문서 엔티티별 정보가 포함된 연결된 META()-문서가 있습니다:
- ID - 버킷 내부의 문서 키입니다.
- cas - 카우치베이스에서 다음을 방지하기 위해 사용하는 int64 번호입니다. 레이스 조건 문서를 편집하는 동안
- 만료 - 문서가 만료될 예정인 경우, 만료되지 않을 경우 0입니다.
힌트: 이러한 필드(예: META().cas) 색인화 가능 (카우치베이스 5.0부터 시작).
CAS(확인 및 설정)
문서를 가져올 때 CAS 값도 반환되며, 이후 문서 변경을 위한 호출에서 이 값을 지정하여 원하는 버전의 문서를 편집할 수 있습니다.
예시:
- 클라이언트 A가 문서를 가져오는데 현재 CAS 값이 1234입니다.
- 클라이언트 B가 문서를 편집하여 CAS 값을 5678로 변경합니다.
- A가 CAS 값을 제공하지 않고 문서를 편집하려고 하면 편집은 성공하지만 B가 변경한 내용은 손실됩니다.
- A가 CAS 값(1234)을 제공하는 문서를 편집하려고 하면 현재 값(5678)이 다르므로 오류가 반환됩니다. 그러면 클라이언트 A는 문서를 다시 가져와서 프로세스를 다시 실행해야 합니다.
CAS 값은 문서의 잘못된/새로운 버전을 교체하거나 변경하여 변경 내용을 잃어버리지 않도록 하는 데 매우 유용한 도구입니다.
비관적 잠금
Couchbase를 사용하면 한 번에 한 클라이언트만 읽고 쓸 수 있도록 문서를 '잠금'할 수 있습니다. gocb.GetAndLock Go SDK 기능.
|
1 2 3 4 5 6 |
// 문서 잠그기 lockTime := 10 // 초 잠긴CAS, err := 버킷.GetAndLock(문서 키, lockTime, &outStruct) // 잠금 해제 _, err = 버킷.잠금 해제(문서 키, 잠긴CAS) |
문서가 잠겨 있으면 올바른 CAS 값을 사용하지 않는 한 문서를 잠금/변경/잠금 해제하는 다른 모든 요청에서 오류가 발생합니다(단순히 문서를 가져오는 것은 여전히 가능).
참고: 문서의 최대 잠금 시간은 15초이며, lockTime 값을 0으로 설정하면 최대 시간이 설정됩니다. 이렇게 하면 잠금 시간 초과로 인해 자동으로 사용 가능한 상태로 표시되기 전에 작업을 실행할 수 있는 시간이 제한됩니다.
힌트: 문서가 잠겨 있는 동안 반환되는 CAS 값은 -1입니다.
인덱싱 및 쿼리
두 가지 힌트를 종합해 보면 문서가 잠겨 있으면 -1로 변하는 필드(META().cas)를 색인할 수 있다는 것을 알 수 있습니다. 이 조건에 따라 문서를 쿼리할 수 있다는 뜻이기도 합니다!
쿼리
요구 사항에 맞는 쿼리를 정의해 보겠습니다:
- 나중에 문서를 가져오고 잠그는 데 사용할 수 있는 작업 ID를 얻고 싶습니다: SELECT Id.
- 작업이 이미 잠겨 있으면 안 됩니다: WHERE META().cas -1.
- 작업을 지금 실행해야 합니다: WHERE ExecuteAt <= NOW_MILLIS() (NOW_MILLIS 는 현재 유닉스 시간을 밀리초 단위로 반환합니다.)
- 시간이 가장 가까운 작업을 가져와야 하므로 실행 시간을 기준으로 작업을 정렬하려고 합니다: ORDER BY ExecuteAt ASC.
- 지금은(!!!) 작업자가 한 번에 하나의 작업만 처리하고 싶다고 가정해 보겠습니다: LIMIT 1.
결과는 이 쿼리와 유사해야 합니다:
|
1 2 3 4 5 6 |
선택 `Id` FROM `timed_tasks` // 버킷 어디 메타().`cas` <> -1 AND `ExecuteAt` <= NOW_MILLIS() 주문 BY `ExecuteAt` ASC LIMIT 1 |
이 함수를 실행하면 다음과 유사한 배열이 반환됩니다:
|
1 2 3 |
[{ "Id": "task_id_goes_here" }] |
색인
이제 방금 생각한 쿼리 실행에 최적화된 쿼리별 인덱스를 계획할 수 있습니다. 쿼리별 인덱스는 NoSQL 데이터베이스 쿼리 성능을 개선하기 위한 필수 요소입니다.
- 이 쿼리는 문서가 현재 잠겨 있지 않은지 확인하는 것입니다:
1어디 메타().cas <> -1. - 또한 실행 시간을 과거로 직접 요청하고 있습니다. 그런 다음 ExecuteAt 필드를 인덱싱해야 합니다.
그러면 인덱스 쿼리는 다음과 같을 수 있습니다:
|
1 2 3 4 |
만들기 INDEX `IDX_timed_task` 켜기 `timed_tasks` (`ExecuteAt` ASC) 어디 메타().`cas` <> -1 |
쿼리 최적화
이제 쿼리를 더욱 최적화할 수 있습니다:
- 쿼리에 인덱스를 사용하도록 지시할 수 있습니다. 힌트 를 추가합니다: USE INDEX(GSI를 사용하는 idx_timed_task).
- 쿼리를 실행하기 전에 인덱스가 최신 상태가 될 때까지(일반적으로 인덱싱은 비동기 프로세스입니다) 기다리도록 Couchbase에 요청하여 결과에 잠금 해제된 작업이 확실히 포함되도록 할 수 있습니다. 일관성 SDK 수준에서의 요구 사항: query.Consistency(gocb.RequestPlus).
흐름
시간 제한 작업 소비자 워커 루프에 대해 가능한 흐름은 다음과 같습니다:
- 사용 가능한 작업 ID를 쿼리합니다.
- 작업을 가져와 잠급니다.
- 작업을 처리합니다.
- 작업을 삭제합니다.
여러 노드
멀티노드 설정이 이 흐름을 어떻게 바꿀 수 있는지 잠시 생각해 보겠습니다.
여러 작업자가 동시에 사용 가능한 작업을 쿼리하는 경우, 동일한 작업을 찾으면 그 중 한 작업자만 성공적으로 처리할 수 있고 다른 작업자는 새 작업을 얻기 위해 루프를 반복(새 쿼리 실행)해야 할 수 있습니다.
그런 다음 다른 접근 방식을 구현할 수 있습니다:
- 사용 가능한 작업 ID를 쿼리하여 ID의 양을 작업자 수로 제한합니다.
- 각 작업 ID에 대해 작업 잠금을 시도합니다. 처음 잠그기에 성공하면 4로 이동합니다.
- 성공적으로 잠긴 작업이 없으면 반복을 반복합니다.
- 작업을 처리합니다.
- 작업을 삭제합니다.
최상의 경우, 모든 작업자는 첫 번째 시도에서 하나의 작업을 성공적으로 잠글 수 있습니다. 최악의 경우, 작업자는 여러 문서를 잠그는 데 실패해야 합니다. 평균적으로 작업자는 몇 개의 작업을 잠그지 못한 후에야 작업을 성공적으로 잠그는 것을 볼 수 있습니다.
데이터베이스 쿼리 빈도와 지원 가능한 잠금 시도 실패 횟수 사이에서 타협점을 찾아야 합니다. 일반적으로 문서 잠금을 시도하는 것이 N1QL 쿼리를 실행하는 것보다 훨씬 빠릅니다.
코드
몇 가지 관련 코드 예시를 살펴보겠습니다:
프로듀서
작업의 생성은 이 함수에서 요약할 수 있습니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func 새로운 작업(executeAt 시간.시간, 콘텐츠 문자열) (*작업, 오류) { 만약 executeAt.IsZero() { 반환 nil, 오류.신규("실행 시간이 0이 아니어야 함") } taskUUID, err := uuid.NewV1() // github.com/satori/go.uuid 만약 err != nil { 반환 nil, err } // time.Time을 int64 밀리초로 변환합니다. executeAtMillis := executeAt.유닉스 나노() / int64(시간.밀리초) 작업 := 작업{ Id: taskUUID.문자열(), ExecuteAt: executeAtMillis, 콘텐츠: 콘텐츠, } 반환 &작업, nil } |
유효한 작업 개체를 생성한 후에는 버킷에 삽입하기만 하면 됩니다:
|
1 |
_, err := 컨트롤러.버킷.삽입(작업.Id, 작업, 0) |
소비자
이 코드를 사용하여 ID로 문서를 가져와 잠글 수 있습니다:
|
1 2 3 |
// 잠금 시간에 0 값을 사용하면 사용 가능한 최대 시간이 설정됩니다. 작업 := new(작업) 잠긴CAS, err := 컨트롤러.버킷.GetAndLock(taskId, 0, &작업) |
이 코드를 사용하여 작업을 제거할 수 있습니다:
|
1 |
_, err := 컨트롤러.버킷.제거(taskId, 잠긴CAS) |
주요 소비자 코드는 다음 스니펫으로 요약할 수 있습니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
taskIds, err := 카우치베이스 컨트롤러.쿼리 다음 작업 ID(소비자 수) ... 만약 len(taskIds) == 0 { ... // 작업을 찾을 수 없습니다, 루프를 다시 시작하세요. } var taskId 문자열 var 작업 *내부.작업 var 잠긴CAS gocb.Cas 에 대한 _, taskId = 범위 taskIds { // 이 소비자만 처리하도록 작업을 잠그고 가져옵니다. 작업, 잠긴CAS, err = 카우치베이스 컨트롤러.GetAndLockTask(taskId) 만약 err != nil { ... // 작업 가져오기 오류, 목록의 다음 작업으로 진행합니다. 계속 } // 작업을 성공적으로 잠궜습니다! // 처리하기 위해 이동 break } 만약 작업 == nil { ... // 작업을 잠글 수 없음, 루프 다시 시작 } // 작업의 실제 처리 // 개선: 오류를 반환할 수도 있습니다. // 나중에 다른 작업자가 처리합니다. 프로세스 작업(작업) /* Couchbase에서 작업을 제거합니다. 이 작업은 현재 잠겨 있으므로 현재 CAS 값으로 설정하여 제작자가 이를 제거할 수 있는 권한을 부여합니다. */ err = 카우치베이스 컨트롤러.RemoveTask(taskId, 잠긴CAS) ... |
결론
이 게시물에서는 Couchbase와 Go를 사용하여 안정적인 분산 시간 제한 작업 시스템을 만드는 방법을 살펴봤습니다.
이 시스템은 다음과 같이 더 발전할 수 있습니다:
- 오류 처리 지원.
- 재시도 기능 구현(처리에 실패하면 나중에 작업 일정 다시 잡기).
- 잠금 로직을 다음과 같이 개선합니다:
- 기본 작업자 수 대신 반환되는 작업 ID의 최대 수를 조정합니다.
- 15초 이상의 작업 처리 시간(Couchbase에서 문서의 최대 잠금 시간)을 지원합니다.
시간 내주셔서 감사드리며 행복한 개발이 되시길 바랍니다!
이 게시물은 커뮤니티 글쓰기 프로그램