Sunday, 20 September 2020

Decaton - High throughput asynchronous task processing based on Apache Kafka

 Introduction 

Decaton은 LINE에서 open source화 한 Apache Kafka 기반의 streaming task processing framework이다. Decaton은 하나의 partition에서 소비되는 record의 동시처리를 가능하게 하도록 설계되었다. Kafka는 기본적으로 consumer 당 하나의 thread로 동작하는데 Decaton은 여기서부터 오는 한계점을 개선하고자 한 framework이다. Decaton은 아래 세가지 성질을 만족한다. 

- 하나의 partition으로부터 오는 record를 동시처리(multi-thread or asynchronous) 
- record key 별 처리 순서 보장
- record들이 처리되는 순서와 상관 없이 at-least-once semantic을 보장  

아래 두가지는 Kafka consumer에서 기본적으로 보장해주는 성질인데 일관성 유지 및 계산 결과 보장 등 데이터 정합성을 위해 아주 중요하다. 동시처리 model에서는 이것이 깨질 수가 있는데 Decaton에서는 동시처리를 하면서도 높은 수준의 데이터 정합성을 지킨다. 

Why Need Concurrent Processing in Consumer

Kafka topic은 partition이라는 분산처리의 단위로 나누어져서 저장된다. 어떤 topic에 저장할 record는 아래의 그림과 같이 여러 partition에 나누어 저장되며 각 partition 내에서는 순서대로 read/write가 되는 것을 보장한다. 


 consumer는 하나의 topic으로 produce되는 message를 모두 처리하는데, 하나의 consumer가 처리할 수 있는 message의 양보다 producer가 생상하는 message의 양이 더 많으면 (= producer의 throughput이 더 높으면) 어떤 message들은 처리되지 못하고 계속 broker에 남아있게 된다. 더 많은 message를 소비할 수 있도록 consumer instance를 추가한다. 하지만 Kafka에서 하나의 partition을 여러 consumer에서 처리하는 것은 불가능하므로 consumer의 최대 concurrency는 partition 개수로 한정된다. 


Kafka를 사용하는 여러 사례들에서 이 model로도 충분히 우수한 성능을 보였다. 하지만 사용 중인 Kafka에서 concurrency를 높여야 하는 상황일 때 다음의 두가지 문제점이 있을 수 있다. 첫째, consumer instance를 추가하면서 (horizontal scaling) 비용이 추가로 발생하며, consumer group coordinator의 workload가 증가하고 network traffic이 더 발생할 수 있다. 둘째, consumer에서 처리하는 작업이 CPU intensive한 작업이 아니라 대기 시간을 요하는 I/O intensive 작업이라면 instance를 추가하는 것으로부터 얻는 효용이 높지 않다. consumer의 CPU 사용률이 높지 않은데도 instance를 추가하려는 상황이라면 기존 instance의 사용률을 높일 방안을 생각해보아야 한다.

Multi-Threaded Message Consumption with Kafka Consumer


Kafka에 대한 enterprise solution을 제공하는 Confluent의 웹페이지에서 동일한 문제를 다루고 있다. 마찬가지로 thread per consumer model에서부터 오는 한계점을 지적하고 있으며 multi-threaded model을 도입했을 때의 side effect 및 개선 방안을 제시한다.

요약하면 다음과 같다.

suggestion.
한 consumer 내에서 동시처리를 위해 고정된 개수의 thread pool을 사용한다. partition 별로 thread를 할당하여 비동기로 처리하도록 한다.

improvements.
CPU 사용률을 제한하기 위해 고정된 개수의 thread pool을 사용하였다. 반면 비동기로 처리하므로 consumer는 별도의 대기 없이 poll을 통해서 record를 무한정 가져올 수가 있는데, 그러면 thread pool의 처리량을 넘게 되고 서로다른 thread에서 동일한 partition의 message를 처리하게 된다. 여러 thread에서 한 partition의 message를 처리하는 것은 곧 Kafka에서 제공하는 데이터 정합성을 깨뜨린다는 의미이다. 이를 방지하기 위해서 consumer는 poll 전에 각 partition에 할당된 thread의 상태를 확인한 다음 추가 record를 가져와도 된다면 fetch 해오고 아니면 일시 정지시키며 (KafkaConsumer.pause(), KafkaConsumer.resume()) 한 partition에 두개 이상의 thread 할당되지 않도록 한다. 또 automatic offset commit option을 off 시켰다. 개별 thread에 할당된 각 record가 처리되었는지 main thread는 모르기 때문이다. 마찬가지로 consumer는 poll 전에 작업 완료한 thread 별로 offset을 얻어와 commit 한다. 
consumer 내에서 여러 partition에 대한 동시처리를 하고 있을 때 consumer group rebalancing이 발생할 수 있다. 각 partition 별로 할당된 thread들은 이에 대한 처리를 해주어야 한다. 이를 위해서 subscription 시 ConsumerRebalanceListener.onPartitionsRevoke() callback을 등록하여 callback 내부에서 revoked partition들에 대한 처리를 멈추고 작업 완료된 마지막 offset를 commit하게 하였다.

Decaton

위 Confluent 글을 읽으면서 Decaton이 해결하고자 하는 문제를 조금 더 쉽게 이해할 수 있었다. 요약하자면 consumer 내 동시처리를 위해서 offset commit coordination이 필요한데 Decaton은 이를 대용량 처리 환경에서도 적절하게 처리할 수 있도록 한다. Decaton에서는 sliding offset range를 방식으로 동작하며 TCP의 sliding window protocol과 유사하다고 이해하면 쉽다. 위의 방식과 성능 비교는 당장에 할 수 없지만 개별 stream 당 1초에 100만건 이상의 I/O task를 처리하는 LINE 서버 시스템에서 검증되었으므로 그 우수한 성능을 이미 입증했다고 볼 수 있다.

Decaton의 동작 방식을 이해하기 위해서 다음의 class들을 중점적으로 살펴보는 것을 추천한다.
```
ProcessorSubscription
PartitionContext
PartitionContexts
ProcessPipeline, Processors 
PartitionProcessor
TaskRequest
```
하나의 topic으로부터 message를 받아오는 consumer의 동작은 ProcessorSubscription class가 담당하며 전체적인 흐름은 아래와 같다. PartitionContext는 subscription instance에 할당된 한 partition에 대한 모든 exeuction context를 제공한다. 한 partition마다 가지는 여러 thread들은 ProcessorUnit에 대응된다. 


ProcessorSubscription

1. pollOnce()하여 records들을 받아옴.
1-1. record마다 각 partition에 대응되는 context에 offset 등록 및 task 제출. 
1-2. 제출된 task는 scheduleThenProcess() 이후 complete()를 호출하여 lcocally commited 된다. 각 context 들은 가장 먼저 등록되었던 task가 locally commited 되었으면 highWatermark를 해당 offset으로 갱신한다.
2. commitIntervalMillis 만큼의 시간이 지났으면 commitCompletedOffsets() 하여 각 partition 마다 offset을 갱신한다.
2-1. consumer.commitSync()를 호출하여 ConsumerCoordinator를 통해 offset을 commit 한다.
2-2. local의 lastCommittedOffset을 갱신한다. 
```
public class ProcessorSubscription extends Thread implements AsyncShutdownable {
    public void run() {
        try {
            consumer.subscribe(subscribeTopics(), new ConsumerRebalanceListener(),...
 
            long lastCommittedMillis = System.currentTimeMillis();
            while (!terminated.get()) {
                pollOnce(consumer);
                if (System.currentTimeMillis() - lastCommittedMillis >= commitIntervalMillis.value()) {
                    try {
                        commitCompletedOffsets(consumer);
                    catch (CommitFailedException | TimeoutException e) {
                        logger.warn("Offset commit failed, but continuing to consume", e);
                        // Continue processing, assuming commit will be handled successfully in next attempt.
                    }
                    lastCommittedMillis = System.currentTimeMillis();
                }
            }
        }
...
    }
 
    private void pollOnce(Consumer<String, byte[]> consumer) {
        ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT_MILLIS);
 
        records.forEach(record -> {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            PartitionContext context = contexts.get(tp);
            DeferredCompletion completion = context.registerOffset(record.offset());
 
            if (blacklistedKeysFilter.shouldTake(record)) {
                TaskRequest taskRequest =
                        new TaskRequest(tp, record.offset(), completion, record.key(), record.value());
                context.addRequest(taskRequest);
            else {
                completion.complete();
            }
        });
        contexts.updateHighWatermarks();
        contexts.maybeHandlePropertyReload();
        pausePartitions(consumer);
        resumePartitions(consumer);
    }
 
    private void commitCompletedOffsets(Consumer<?, ?> consumer) {
        // fetch offsets to be committed 
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = contexts.commitOffsets();
        if (commitOffsets.isEmpty()) {
            logger.debug("No new offsets to commit, skipping commit");
            return;
        }
        logger.debug("Committing offsets: {}", commitOffsets);
        consumer.commitSync(commitOffsets);
        contexts.updateCommittedOffsets(commitOffsets);
    }
}
```

PartitionContext

```
public class PartitionContext {
    public OptionalLong offsetWaitingCommit() {
        long readyOffset = commitControl.commitReadyOffset();
        if (readyOffset > lastCommittedOffset) {
            return OptionalLong.of(readyOffset);
        }
        return OptionalLong.empty();
    }
 
    public void updateHighWatermark() {
        commitControl.updateHighWatermark();
        metrics.tasksPending.set(commitControl.pendingOffsetsCount());
    }
 
    public DeferredCompletion registerOffset(long offset) {
        return commitControl.reportFetchedOffset(offset);
    }
}
```

PartitionProcessor

```
/**
 * This class is responsible for following portions:
 * - Create and manage configured number of {@link ProcessorUnit}s to parallel-ize processing of tasks received
 *   from single partition.
 * - Route fed task appropriately to one of belonging {@link ProcessorUnit}s, respecting task's key for keeping
 *   process locally and ordering.
 * - Manage lifecycle of {@link DecatonProcessor}s for each {@link ProcessorUnit}s.
 */
public class PartitionProcessor implements AsyncShutdownable {
    private final PartitionScope scope;
    private final Processors<?> processors;
 
    private final List<ProcessorUnit> units; // unit of parallelism 
 
...
}
```


OutOfOrderCommitControl

Decaton의 offset commit은 OutOfOrderCommitControl class에서 이루어진다. OutOfOrderCommitControl class 내에서 각 task에 대한 상태(offset , committed)는 deque에서 저장하여 sliding window 방식으로 broker에 commit 한다. 

```
public class OutOfOrderCommitControl {
 
    static class OffsetState {
        private final long offset;
        private volatile boolean committed;
  }
 
    private final Deque<OffsetState> states;
    ... 
     
    public synchronized DeferredCompletion reportFetchedOffset(long offset) {
...
        OffsetState state = new OffsetState(offset, false);
        states.addLast(state);
        latest = state.offset;
 
        return () -> complete(state);
    }   
 
    public synchronized void updateHighWatermark() {
...
        long lastHighWatermark = highWatermark;
 
        OffsetState state;
        while ((state = states.peekFirst()) != null) {
            earliest = state.offset;
            if (state.committed) {
                highWatermark = state.offset;
                states.pollFirst();
            else {
                break;
            }
        }
 
        if (highWatermark != lastHighWatermark) {
            logger.debug("High watermark updated for {}: {} => {}",
                         topicPartition, lastHighWatermark, highWatermark);
        }
    }
}
```

Internal Queueing and Workers

> Once the offset commit coordination works, we can process records from single partition in parallel using multiple threads.
Decaton manages internal queues and associated threads to process records for each partition. It relaxes ordering guarantee from partition-based to key-based and route each record into internal queue which will be consumed and processed by associated worker with logic that you supply.




Decaton은 동시처리에 단위인 thread 마다 internal queue가 있어 각 partition으로부터의 record를 처리한다. 이는 partition-based에서 key-based로 완화된 guarantee를 제공하며 key에 따라 internal queue에 routing 한다. 본래 producer에서 partition으로의 분배가 key에 따라 결정되는데 decaton에서는 이 key에 대해 추가적으로 routing 해주기 위해서 prefix를 첨부하여 mumur2 hash 처리를 한다. 

Offset Commit Strategy 


Decaton의 offset commit은 OutOfOrderCommitControl class에서 이루어진다. OutOfOrderCommitControl class 내에서 각 task에 대한 상태(offset , committed)는 deque에서 저장하여 sliding window 방식으로 broker에 commit 한다. 

References

- https://github.com/line/decaton
- https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/
- https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/
- https://kafka.apache.org
- https://dzone.com/articles/kafka-topic-architecture-replication-failover-and


No comments:

Post a Comment