Sunday, 27 September 2020

[Paper Review] Algorithms Behind Modern Storage Systems

 Introduction 

어플리케이션으로부터 생성되는 데이터는 점점 증가하여 storage를 확장하는 것은 더욱 도전적인 문제가 되었다. 각 데이터베이스 시스템은 고유의 tradeoff가 있기 때문에 그들의 원리를 잘 이해하고 사용하는 것은 중요하다.
각 어플리케이션은 read/write 접근 패턴, 요구되는 일관성 수준, latency 등이 상이하기 때문에 이들의 특성을 잘 이해하고 가장 최적화된 데이터베이스를 선택해야 할 것이다.
이번 글에서는 대부분의 현대 데이터베이스 시스템에서 사용되고 있는 두가지 큰 storage system 디자인인 B-Tree (read optimized), LSM-Tree (write optimized)와 각각의 use case, tradeoff를 알아본다.

B-Tree

B-Tree는 read optimized 자료구조로서 binary tree의 일반화된 형태이다. 수많은 variation이 있으며 여러 데이터베이스 (MySQL InnoDB, PostgreSQL)와 파일 시스템(HFS+, HTrees in ext4)에서도 사용되고 있다.
B-Tree를 쉽게 이해하기 위해서 다음의 자료를 참조하는 것을 추천한다.
https://www.cs.princeton.edu/courses/archive/fall06/cos226/lectures/balanced.pdf


위 자료에서는 binary tree부터 시작해서 balanced tree인 red-black tree, 그리고 점점 더 일반화를 거쳐서 2-3-4 tree, B-Tree를 다룬다.

B-Tree의 성질은 다음과 같다.

  • Sorted

  • Self-balancing. insertion 혹은 deletion 시에 overflow 나 일정수준의 occupancy가 떨어지는 것을 확인하여 node 분할 혹은 합병을 한다.

  • Guarantee of logarithmic lookup time

  • Mutable


LSM-Tree

LSM-Tree (log-structured merge tree)는 write optimized 된 immutable, disk-resident 한 자료구조이다. read 보다 write가 더 빈번한 시스템에서 유용하다. LSM-Tree가 더 인기를 얻는 것은 disk performance를 저하시키는 random insert, update, delete를 없애기 때문이다.


Anatomy of LSM-Tree

sequential write를 허용하기 위해서 LSM-Tree는 write, update를 memory-resident table에 batch 형태로 저장한다. 여기에 사용하는 자료구조는 binary search tree나 skip list 등을 사용한다. 크기가 다 차면 이때 한번에 disk로 저장(flush)한다.

데이터를 retrieve 하는 것은 모든 disk-resident 부분과 in-memory table을 찾아보아야 하며 결과를 돌려주기 전에 merge를 수행한다.


SSTables (Sorted String Table)

여러 현대 LSM-Tree의 구현은 disk-resident table로서 SSTables을 사용한다. 이유로서 단순함 (read/write, search가 쉽다) 병합 성질 (병합 중 SSTable scan과 병합된 결과의 write가 sequential 하다)이 있다.

SSTable은 disk-resisdent ordered immutable 자료구조이다. 구조적으로 data block과 index block으로 나누어져 있어 주로 sparse index 형태의 index block에 먼저 접근하여 data block으로 접근한다. data block의 모든 value는 insert, update, delete가 수행된 시점의 timestamp를 가진다.



SSTable은 다음과 같은 성질을 가진다.

  • point query는 primary index를 찾음으로써 매우 빠르게 수행된다.

  • scan은 data block으로부터 key/value가 순차적으로 read되기 때문에 효율적으로 이루어질 수 있다.

SSTable은 memory-resident table이 flush에 의해 disk에 쓰이기 전 일종의 snapshot이다.


Lookups

데이터를 retrieve 하기 위해서 다음 과정이 수행된다.

  • search all SSTables on disk

  • check the memory-resident table

  • merge their contents together before running the result

검색된 data는 여러 SSTable에 있을 수 있기 때문에 read 중에 merge step이 요구된다.

merge step은 update, delete에 대한 결과를 보장하기 위해서도 필요하다. LSM-Tree에서 delete는 tombstone이라 불리는 placeholder를 insert 하는 것이고 insert는 더 큰 timestamp의 record이다. read 동안 record는 delete에 의해 shadow 되어 return 되지 않거나 더 큰 timestamp로 update 된 record를 return 한다. 아래는 merge step이 서로 다른 SSTable의 데이터를 통합(reconcile)하는 과정을 보여준다.



Bloom Filter


read 시에 검색 대상이 되는 SSTable의 개수를 줄이고 모든 SSTable에 대해서 주어진 key를 가지고 있는지 확인하는 것을 피하기 위해 여러 storage system은 Bloom filter를 사용한다. Bloom filter는 주어진 element가 set에 속하는지 아닌지 판단하기 위해 사용하는 probabilistic data structure 이다. 다음의 명제 두가지를 제공한다.

  • might be in an SSTable (probabily produce false positive)

  • is definitely not in an SSTable (definitely not produce false negative)

따라서 LSM-Tree에서는 이 Bloom filter가 제공하는 정보를 기반으로 search 하지 않아도 되는 SSTable은 skip 할 수 있다. Bloom filter는 어떤 hash function을 몇개 사용하는지, filter는 몇 bit 인지, 총 몇개의 element가 insert 되는지를 참고한다. 더 큰 filter를 사용할수록 false positive의 확률은 줄어들지만 space complexity가 증가하는 tradeoff가 있다.


LSM-Tree Maintenance


SSTable은 immutable이기 때문에 in-place modification을 위한 여유 공간 없이 sequentally write 된다. 이것은 insert, update, delete 모든 operation이 전체 파일에 대해 re-write 되어야 한다는 것을 의미한다. 데이터베이스의 state 를 변경하는 작업은 memory-resident table 에서 batch로 이루어지며, 시간이 지날수록 disk-resident table들은 그 크기가 계속해서 커진다.

증가하는 read cost를 줄이고, shadow record에 의한 공간을 일치시키고, disk-resident table의 개수를 줄이기 위해 LSM-Tree는 compaction process를 요구한다. 이 process에서 disk의 모든 SSTable을 읽어 병합한다.

각 SSTable은 key를 기준으로 sort 되어있기 때문에 이 과정은 merge sort 처럼 동작하여 매우 효율적이다.

  • 여러 SSTable로부터 sequential 하게 read 된다.

  • 병합 결과 SSTable 또한 sequential 하게 write 된다.

  • merge sort는 memory에 모두 적재할 수 없는 큰 파일에서도 잘 동작한다.

  • stable sort 이므로 기존 record의 순서를 보존한다.

compaction이 완료되면 기존의 SSTable은 버리고 새로운 SSTabl로 대체한다. 일부 데이터베이스 시스템에서는 같은 크기의 table들을 같은 level 로 group 지어서 각 level 별로 충분한 table이 생성되면 그 상위 level의 table로 compaction이 이루어지도록 한다.


Atomicity and Durability

B-Tree와 LSM-Tree에서 모두 I/O operation의 수를 줄이고 sequential하게 이루어지도록 하기 위해 실제 update를 하기 전 memory에 operation을 batch 한다. 이는 시스템에 장애가 발생했을 때 data integrity가 보장되지 않을 수 있으며 atomicity, durability 를 확신할 수 없다는 것을 암시한다.

이 문제를 해결하기 위해서 대부분의 현대 데이터베이스 시스템에서는 WAL (write-ahead log)를 사용한다. WAL의 핵심 아이디어는 모든 state modification들을 disk 상의 append-only log로 남기는 것이다. 장애 상황에서도 WAL을 replay 하여 이전의 상황을 재현 가능하다.

B-Tree에서는 WAL은 반드시 로그를 남긴 뒤에만 데이터 파일에 변경 사항을 적용하는 것으로 사용된다. 비교적 작은 크기의 WAL을 남기며 data page에 적용되지 않은 변경 사항을 WAL을 통해 재현 가능하다.

LSM-Tree에서는 WAL은 memtable에는 적용되었으나 disk에 완전히 flush 되지 않은 변경사항을 저장하기 위해 사용된다. memtable이 flush 되어 새로운 read가 새로 생성된 SSTable에서 이루어지는 시점부터 해당 segment는 제거된다.


Summarizing 

B-Tree와 LSM-Tree의 가장 큰 차이는 read/write 중 어떤 것에 최적화되었고 이 최적화에 의한 암시가 무엇인지다. 이 둘의 성질을 비교해보자.

B-Tree는 다음의 성질을 가진다.

  • mutable

  • read-optimized

  • write는 연쇄적인 (cascaded) node split을 발생시킬 수 있으며 이는 write operation을 더 무겁게 한다.

  • byte adressing이 불가능한 page environment (block storage)에 최적화 되어있다.

  • 빈번한 update에 의한 fragmentation이 발생할 수 있으며 추가적인 maintenance와 block rewrite를 요구한다. 하지만 LSM-Tree의 maintenance 보다 가볍다.

  • concurrent access는 reader/writer isolation을 요구하며 lock, latch의 chain을 포함한다.

LSM-Tree는 다음의 성질을 가진다.

  • immutable. disk에 한번 write 되면 절대 update 되지 않는다. immutable에서 오는 장점 중 하나는 flush 된 table에 대해서 concurrent access가 가능하다는 점이다.

  • write-optimized.

  • read는 여러 source (SSTable)을 거쳐야 하며 merge process를 거칠 수도 있다.

  • buffered write가 disk에 flush 되어야 하므로 maintenance/compaction이 요구된다.


References

- Alex Petrov, 2018. Algorithms Behind Modern Storage Systems: Different uses for read-optimized B-trees and write-optimized LSM-trees. Queue; 
https://dl.acm.org/doi/10.1145/3212477.3220266
B-Tree

Sunday, 20 September 2020

Decaton - High throughput asynchronous task processing based on Apache Kafka

 Introduction 

Decaton은 LINE에서 open source화 한 Apache Kafka 기반의 streaming task processing framework이다. Decaton은 하나의 partition에서 소비되는 record의 동시처리를 가능하게 하도록 설계되었다. Kafka는 기본적으로 consumer 당 하나의 thread로 동작하는데 Decaton은 여기서부터 오는 한계점을 개선하고자 한 framework이다. Decaton은 아래 세가지 성질을 만족한다. 

- 하나의 partition으로부터 오는 record를 동시처리(multi-thread or asynchronous) 
- record key 별 처리 순서 보장
- record들이 처리되는 순서와 상관 없이 at-least-once semantic을 보장  

아래 두가지는 Kafka consumer에서 기본적으로 보장해주는 성질인데 일관성 유지 및 계산 결과 보장 등 데이터 정합성을 위해 아주 중요하다. 동시처리 model에서는 이것이 깨질 수가 있는데 Decaton에서는 동시처리를 하면서도 높은 수준의 데이터 정합성을 지킨다. 

Why Need Concurrent Processing in Consumer

Kafka topic은 partition이라는 분산처리의 단위로 나누어져서 저장된다. 어떤 topic에 저장할 record는 아래의 그림과 같이 여러 partition에 나누어 저장되며 각 partition 내에서는 순서대로 read/write가 되는 것을 보장한다. 


 consumer는 하나의 topic으로 produce되는 message를 모두 처리하는데, 하나의 consumer가 처리할 수 있는 message의 양보다 producer가 생상하는 message의 양이 더 많으면 (= producer의 throughput이 더 높으면) 어떤 message들은 처리되지 못하고 계속 broker에 남아있게 된다. 더 많은 message를 소비할 수 있도록 consumer instance를 추가한다. 하지만 Kafka에서 하나의 partition을 여러 consumer에서 처리하는 것은 불가능하므로 consumer의 최대 concurrency는 partition 개수로 한정된다. 


Kafka를 사용하는 여러 사례들에서 이 model로도 충분히 우수한 성능을 보였다. 하지만 사용 중인 Kafka에서 concurrency를 높여야 하는 상황일 때 다음의 두가지 문제점이 있을 수 있다. 첫째, consumer instance를 추가하면서 (horizontal scaling) 비용이 추가로 발생하며, consumer group coordinator의 workload가 증가하고 network traffic이 더 발생할 수 있다. 둘째, consumer에서 처리하는 작업이 CPU intensive한 작업이 아니라 대기 시간을 요하는 I/O intensive 작업이라면 instance를 추가하는 것으로부터 얻는 효용이 높지 않다. consumer의 CPU 사용률이 높지 않은데도 instance를 추가하려는 상황이라면 기존 instance의 사용률을 높일 방안을 생각해보아야 한다.

Multi-Threaded Message Consumption with Kafka Consumer


Kafka에 대한 enterprise solution을 제공하는 Confluent의 웹페이지에서 동일한 문제를 다루고 있다. 마찬가지로 thread per consumer model에서부터 오는 한계점을 지적하고 있으며 multi-threaded model을 도입했을 때의 side effect 및 개선 방안을 제시한다.

요약하면 다음과 같다.

suggestion.
한 consumer 내에서 동시처리를 위해 고정된 개수의 thread pool을 사용한다. partition 별로 thread를 할당하여 비동기로 처리하도록 한다.

improvements.
CPU 사용률을 제한하기 위해 고정된 개수의 thread pool을 사용하였다. 반면 비동기로 처리하므로 consumer는 별도의 대기 없이 poll을 통해서 record를 무한정 가져올 수가 있는데, 그러면 thread pool의 처리량을 넘게 되고 서로다른 thread에서 동일한 partition의 message를 처리하게 된다. 여러 thread에서 한 partition의 message를 처리하는 것은 곧 Kafka에서 제공하는 데이터 정합성을 깨뜨린다는 의미이다. 이를 방지하기 위해서 consumer는 poll 전에 각 partition에 할당된 thread의 상태를 확인한 다음 추가 record를 가져와도 된다면 fetch 해오고 아니면 일시 정지시키며 (KafkaConsumer.pause(), KafkaConsumer.resume()) 한 partition에 두개 이상의 thread 할당되지 않도록 한다. 또 automatic offset commit option을 off 시켰다. 개별 thread에 할당된 각 record가 처리되었는지 main thread는 모르기 때문이다. 마찬가지로 consumer는 poll 전에 작업 완료한 thread 별로 offset을 얻어와 commit 한다. 
consumer 내에서 여러 partition에 대한 동시처리를 하고 있을 때 consumer group rebalancing이 발생할 수 있다. 각 partition 별로 할당된 thread들은 이에 대한 처리를 해주어야 한다. 이를 위해서 subscription 시 ConsumerRebalanceListener.onPartitionsRevoke() callback을 등록하여 callback 내부에서 revoked partition들에 대한 처리를 멈추고 작업 완료된 마지막 offset를 commit하게 하였다.

Decaton

위 Confluent 글을 읽으면서 Decaton이 해결하고자 하는 문제를 조금 더 쉽게 이해할 수 있었다. 요약하자면 consumer 내 동시처리를 위해서 offset commit coordination이 필요한데 Decaton은 이를 대용량 처리 환경에서도 적절하게 처리할 수 있도록 한다. Decaton에서는 sliding offset range를 방식으로 동작하며 TCP의 sliding window protocol과 유사하다고 이해하면 쉽다. 위의 방식과 성능 비교는 당장에 할 수 없지만 개별 stream 당 1초에 100만건 이상의 I/O task를 처리하는 LINE 서버 시스템에서 검증되었으므로 그 우수한 성능을 이미 입증했다고 볼 수 있다.

Decaton의 동작 방식을 이해하기 위해서 다음의 class들을 중점적으로 살펴보는 것을 추천한다.
```
ProcessorSubscription
PartitionContext
PartitionContexts
ProcessPipeline, Processors 
PartitionProcessor
TaskRequest
```
하나의 topic으로부터 message를 받아오는 consumer의 동작은 ProcessorSubscription class가 담당하며 전체적인 흐름은 아래와 같다. PartitionContext는 subscription instance에 할당된 한 partition에 대한 모든 exeuction context를 제공한다. 한 partition마다 가지는 여러 thread들은 ProcessorUnit에 대응된다. 


ProcessorSubscription

1. pollOnce()하여 records들을 받아옴.
1-1. record마다 각 partition에 대응되는 context에 offset 등록 및 task 제출. 
1-2. 제출된 task는 scheduleThenProcess() 이후 complete()를 호출하여 lcocally commited 된다. 각 context 들은 가장 먼저 등록되었던 task가 locally commited 되었으면 highWatermark를 해당 offset으로 갱신한다.
2. commitIntervalMillis 만큼의 시간이 지났으면 commitCompletedOffsets() 하여 각 partition 마다 offset을 갱신한다.
2-1. consumer.commitSync()를 호출하여 ConsumerCoordinator를 통해 offset을 commit 한다.
2-2. local의 lastCommittedOffset을 갱신한다. 
```
public class ProcessorSubscription extends Thread implements AsyncShutdownable {
    public void run() {
        try {
            consumer.subscribe(subscribeTopics(), new ConsumerRebalanceListener(),...
 
            long lastCommittedMillis = System.currentTimeMillis();
            while (!terminated.get()) {
                pollOnce(consumer);
                if (System.currentTimeMillis() - lastCommittedMillis >= commitIntervalMillis.value()) {
                    try {
                        commitCompletedOffsets(consumer);
                    catch (CommitFailedException | TimeoutException e) {
                        logger.warn("Offset commit failed, but continuing to consume", e);
                        // Continue processing, assuming commit will be handled successfully in next attempt.
                    }
                    lastCommittedMillis = System.currentTimeMillis();
                }
            }
        }
...
    }
 
    private void pollOnce(Consumer<String, byte[]> consumer) {
        ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT_MILLIS);
 
        records.forEach(record -> {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            PartitionContext context = contexts.get(tp);
            DeferredCompletion completion = context.registerOffset(record.offset());
 
            if (blacklistedKeysFilter.shouldTake(record)) {
                TaskRequest taskRequest =
                        new TaskRequest(tp, record.offset(), completion, record.key(), record.value());
                context.addRequest(taskRequest);
            else {
                completion.complete();
            }
        });
        contexts.updateHighWatermarks();
        contexts.maybeHandlePropertyReload();
        pausePartitions(consumer);
        resumePartitions(consumer);
    }
 
    private void commitCompletedOffsets(Consumer<?, ?> consumer) {
        // fetch offsets to be committed 
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = contexts.commitOffsets();
        if (commitOffsets.isEmpty()) {
            logger.debug("No new offsets to commit, skipping commit");
            return;
        }
        logger.debug("Committing offsets: {}", commitOffsets);
        consumer.commitSync(commitOffsets);
        contexts.updateCommittedOffsets(commitOffsets);
    }
}
```

PartitionContext

```
public class PartitionContext {
    public OptionalLong offsetWaitingCommit() {
        long readyOffset = commitControl.commitReadyOffset();
        if (readyOffset > lastCommittedOffset) {
            return OptionalLong.of(readyOffset);
        }
        return OptionalLong.empty();
    }
 
    public void updateHighWatermark() {
        commitControl.updateHighWatermark();
        metrics.tasksPending.set(commitControl.pendingOffsetsCount());
    }
 
    public DeferredCompletion registerOffset(long offset) {
        return commitControl.reportFetchedOffset(offset);
    }
}
```

PartitionProcessor

```
/**
 * This class is responsible for following portions:
 * - Create and manage configured number of {@link ProcessorUnit}s to parallel-ize processing of tasks received
 *   from single partition.
 * - Route fed task appropriately to one of belonging {@link ProcessorUnit}s, respecting task's key for keeping
 *   process locally and ordering.
 * - Manage lifecycle of {@link DecatonProcessor}s for each {@link ProcessorUnit}s.
 */
public class PartitionProcessor implements AsyncShutdownable {
    private final PartitionScope scope;
    private final Processors<?> processors;
 
    private final List<ProcessorUnit> units; // unit of parallelism 
 
...
}
```


OutOfOrderCommitControl

Decaton의 offset commit은 OutOfOrderCommitControl class에서 이루어진다. OutOfOrderCommitControl class 내에서 각 task에 대한 상태(offset , committed)는 deque에서 저장하여 sliding window 방식으로 broker에 commit 한다. 

```
public class OutOfOrderCommitControl {
 
    static class OffsetState {
        private final long offset;
        private volatile boolean committed;
  }
 
    private final Deque<OffsetState> states;
    ... 
     
    public synchronized DeferredCompletion reportFetchedOffset(long offset) {
...
        OffsetState state = new OffsetState(offset, false);
        states.addLast(state);
        latest = state.offset;
 
        return () -> complete(state);
    }   
 
    public synchronized void updateHighWatermark() {
...
        long lastHighWatermark = highWatermark;
 
        OffsetState state;
        while ((state = states.peekFirst()) != null) {
            earliest = state.offset;
            if (state.committed) {
                highWatermark = state.offset;
                states.pollFirst();
            else {
                break;
            }
        }
 
        if (highWatermark != lastHighWatermark) {
            logger.debug("High watermark updated for {}: {} => {}",
                         topicPartition, lastHighWatermark, highWatermark);
        }
    }
}
```

Internal Queueing and Workers

> Once the offset commit coordination works, we can process records from single partition in parallel using multiple threads.
Decaton manages internal queues and associated threads to process records for each partition. It relaxes ordering guarantee from partition-based to key-based and route each record into internal queue which will be consumed and processed by associated worker with logic that you supply.




Decaton은 동시처리에 단위인 thread 마다 internal queue가 있어 각 partition으로부터의 record를 처리한다. 이는 partition-based에서 key-based로 완화된 guarantee를 제공하며 key에 따라 internal queue에 routing 한다. 본래 producer에서 partition으로의 분배가 key에 따라 결정되는데 decaton에서는 이 key에 대해 추가적으로 routing 해주기 위해서 prefix를 첨부하여 mumur2 hash 처리를 한다. 

Offset Commit Strategy 


Decaton의 offset commit은 OutOfOrderCommitControl class에서 이루어진다. OutOfOrderCommitControl class 내에서 각 task에 대한 상태(offset , committed)는 deque에서 저장하여 sliding window 방식으로 broker에 commit 한다. 

References

- https://github.com/line/decaton
- https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/
- https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/
- https://kafka.apache.org
- https://dzone.com/articles/kafka-topic-architecture-replication-failover-and