Sunday, 9 August 2020

[Paper Review] MapReduce: Simplified Data Processing on Large Clusters

Background

computation은 아마도 컴퓨터가 존재하는 목적 그 자체일 것이다. 컴퓨터가 탄생한 시기와는 달리 현대에는 대량의 데이터를 수집, 저장하는 것이 가능해졌고 이를 적절한 수행시간 내에 처리하기 위해서 여러 컴퓨터에 데이터를 분산시켜서 (parallelism, distributed) 처리하는 방법이 제안되었다. 이중 MapReduce는 2000년대 초 Google에서 개발한 프로그래밍 모델 및 구현으로 수천대의 상용 컴퓨터로 이루어진 cluster에서 대량의 데이터가 병렬, 분산처리 될 수 있도록 하였다. 논문에서도 보여주듯 이 분산처리 computation model은 초기 Google에서 성공적으로 사용되었고 현재에도 대량의 데이터를 처리하는 모든 곳에서 널리 사용되고 있다. MapReduce 이후 Spark와 같은 다양한 분산처리 library들이 특성에 맞게 사용되고 있지만 분산처리 computation model을 위한 MapReduce를 잘 이해하는 것은 필연적으로 도움이 될 것이다. 본 review에서는 논문의 번역 및 요약과 더불어 중간중간 부연 설명을 추가하였다. 세부사항은 생략하였으니 논문을 참조하기를 부탁드린다.

2018년 처음 이 논문을 읽기를 시도했었는데 너무 어려워서 읽지 못하고 포기했던 기억이 있다. 하지만 그동안 알게된 분산 시스템에 대한 기초 지식과 Hadoop MapReduce를 직접 사용해본 경험을 가지고 논문을 다시 읽으니 이전보다 훨씬 더 논문이 잘 읽혔다. (짝짝짝!!!)

Abstract 

MapReduce는 large data set을 process하고 generate하기 위한 programming model과 그에 따른 implementation이다. programmer는 key/value pair를 처리하여 intermediate key/value pairs를 생성하는 map function과 같은 intermediate key들을 merge하는 reduce function을 통해 MapReduce program을 구현한다. paper에서 보여주듯 수많은 real world task들이 map reduce를 통해 표현 가능하다.

함수형으로 작성된 program은 자동으로 병렬처리 되어 상용 machine으로 이루어진 대량의 cluster에서 실행된다. MapReduce의 runtime system은 분산 시스템 환경에서 필요한 여러 실행 세부사항들인 partitioning input data, scheduling the program execution across a set of machines, handling machine failures, and managing the required inter-machine communication들을 수행한다. 이러한 abstraction은 병렬, 분산 시스템의 지식이 없는 programmer도 분산 시스템의 resource를 쉽게 활용할 수 있도록 한다.

1. Introduction

Google에서는 crawled documents, web request logs 등 대량의 raw data로부터 inverted indices, graph structe of web documents 등 derive data로 처리하기 위한 special-purpose computations들이 작성되었다. 하지만 input data의 크기가 매우 커짐으로써 computation은 필연적으로 수천대의 machine에 분산처리 될 필요가 있었다. 하지만 분산처리 환경에서의 다음과 같은 문제들은 원래 처리해야할 단순한 computation program의 본질을 흐려 complexity를 높였다.

  • parallelize a computation

  • distribtue data across a set of machines

  • handle machine failures (fault-tolerant)

이러한 complexity를 해결하고자, 분산 시스템에서의 처리를 위한 위의 복잡한 detail은 숨기고 단순한 computation model만을 구현하는 새로운 abstraction을 design 하였다. 이 abstraction은 LISP 및 여러 함수형 언어의 map, reduce primitive로부터 영감을 얻었다. Google은 그들이 기존에 사용하였던 대부분의 computations 들이 map, reduce를 적용하는 것을 포함하는 것을 깨달았다. 또한 이러한 functional model의 사용은 병렬처리와 fault tolerance에 대한 primary mechanism으로서 re-execution을 쉽게 하였다.

논문에서 주요 contribution은 두가지다.

  • a simple and powerful interface, that enables parallization and distribution of large-scale computations

  • an implementation of this interface that achieves high performance on large clusters of commodity PCs.


2. Programming Model 

MapReduce는 input key/value pairs를 받아서 output key/value pairs를 생성한다. MapReduce library의 사용자는 다음의 두 function을 표현한다.

Map.

input pair를 받아서 intermediate key/value pairs를 생성한다. MapReduce library는 같은 intermediate key를 가진 intermediate value들을 group 지어 Reduce function으로 보낸다.

Reduce.

intermediate key와 해당 key를 가진 value의 set들을 input으로 받는다. 실제 program 상에는 value의 iterator가 주어져서 memory에 저장하는 것 이상의 value들에 접근할 수 있도록 한다. 주어진 input을 merge 등 추가 처리하여 최종 output을 만들어낸다.

2.1 Example

다음은 대표적인 computation model인 word count의 pseudo code이다.
```
map(String key, String value): 
// key: document name
// value: document contents 
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values): 
// key: a word
// values: a list of counts
int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
```

2.2 Types

위 pseudo code에서는 strinng input, output으로 표현되었지만 개념적으로는 아래와 같은 type으로 표현된다.
```
map (k1,v1) → list(k2,v2) 
reduce (k2,list(v2)) → list(v2)
```

2.3 More Examples 

다음은 MapReduce computation model로 쉽게 표현될 수 있는 여러 흥미로운 program들이다. 기대보다 더 단순한 형태로 해당 program이 작성될 수 있음에 놀랄 것이다.

Distributed Grep, Count of URL Access, Reverse Web-Link Graph, Term-Vector per Host, Inverted Index, Distribued Sort


3. Implementation

MapReduce interface의 구현은 여러 방안으로 가능하며 가장 적합한 것은 실행 환경에 달려있다. 소규모의 shared-memory machine이나 NUMA multi-processor, 혹은 networked machine으로 이루어진 대량의 cluster 등 다양한 환경에서 구현이 이루어질 수 있다. 논문에서는 Google에서 널리 사용되고 있는 switched Ethernet 내 상용 machine으로 이루어진 cluster를 대상으로한 구현을 다룬다. 환경은 아래와 같다. 처음 논문이 공개된 시기가 2004년이므로 현재 2020년도 보다 hardware 성능이 떨어지지만 서로 다른 성능의 상용 machine을 그대로 활용한다는 점, hardware (CPU, network) 성능이 선형적으로 증가했다는 점에서 크게 다르지 않다. 따라서 여전히 network bandwidth가 귀중한 자원이며 분산 시스템에서의 잠재적 위험성은 유요하다.

1) Machines are typically dual-processor x86 processors running Linux, with 2-4 GB of memory per machine.

2) Commodity networking hardware is used – typically either 100 megabits/second or 1 gigabit/second at the machine level, but averaging considerably less in over- all bisection bandwidth.

3) A cluster consists of hundreds or thousands of ma- chines, and therefore machine failures are common.

4) Storage is provided by inexpensive IDE disks at- tached directly to individual machines. A distributed file system [8] developed in-house is used to manage the data stored on these disks. The file system uses replication to provide availability and reliability on top of unreliable hardware.

5) Users submit jobs to a scheduling system. Each job consists of a set of tasks, and is mapped by the scheduler to a set of available machines within a cluster.

3.1 Execution Overview


Map function은 여러 machine에 걸쳐 일어나며 input data는 M splits paritioning 만큼 분산된다. 각 input splits은 서로 다른 machine들에서 병렬로 처리된다. Reduce function은 주어진 intermediate key space가 R개로 partitioning 되어 실행된다. 이때 partitioning function은 'hash(key) mod R' 등의 function이 사용된다.

MapReduce operation의 overall flow는 Figure 1에서 나타내고 있다.

  1. MapReduce library는 input files를 M pieces (보통 16MB~64MB. GFS의 기본 unit 단위 크기)만큼 분할한다. 그다음 cluster 내 machine들에 구현한 computation program을 실행한다.

  2. 이 program들 중 하나는 별도의 기능을 하며 master라고 한다. 다른 program들은 worker라고 부른다. 총 M개의 map task, R개의 reduce task가 있으며 master는 idle worker를 선택하여 task를 할당한다.

  3. map task를 할당 받은 workder는 input split으로부터 data를 읽는다. input data로부터 key/value pair를 parse하여 Map function으로 전달한다. Map function에 의해 생성된 intermediate key/value pair는 memory에 buffer 된다.

  4. buffer된 key/value pair는 주기적으로 local disk에 저장된다. 이때 partitioning function에 의해 R개의 region으로 분할되어 저장된다. local disk에 저장된 각 buffered pair의 location은 master에 전해지며 master는 reduce task를 수행하는 worker에 이를 전달한다.

  5. reduce worker가 master로부터 partitioned intermediate key/value pair에 대한 location 정보를 받으면, RPC를 통해서 map worker에 있는 buffered data를 읽는다. data를 모두 읽으면 intermediate key에 대해 sort를 수행하여 같은 key를 가지고 있는 data는 group 지어지도록 한다. 이는 반드시 필요한 과정인데, 서로다른 key들이 같은 reduce task에 전달될 수 있기 때문이다. sorting 중 data가 memory에 담기 너무 크면 external sort가 수행된다.

  6. reduce worker는 sort 된 intermediate data를 iterate 하면서 key와 그에 대응하는 value set을 Reduce function으로 전달한다. Reduce function의 output은 해당 reduce partition의 최종 output file이 된다.

  7. 모든 map task, reduce task가 완료되면 master는 user program을 깨운다.

실행을 완료하면 주어진 MapReduce execution의 output은 R개의 output file로 존재한다. 종종 우리는 이 R개의 output file을 그대로 두는데 이는 또다른 MapReduce program의 input data로 바로 활용하거나 다른 distributed application에 사용된다.

3.2 Master Data Structures

master는 여러 data structure를 유지한다. 개별 map task, reduce task에 대한 state (idle, in-progress, completed)와 non-idle worker machine의 identity를 저장한다. 또한 master는 각 map task에서 reduce task로 전달되는 intermediate file region에 대한 location 정보(O(M*R))를 저장하는 수로와도 같다. map task가 완료될 때마다 location과 size 정보는 update되며 in-progress 상태의 reduce task에 점진적으로(incrementally) 전달된다.

3.3 Fault Tolerance 

MapReduce library는 수백, 수천대의 machine에서 대량의 data를 처리하기 위해 발명되었으므로 machine failure에 graceful하게 대처하여야 한다.

machine failures in a distributed system?

분산 시스템에서 machine failure는 드문 현상이 아니다. single machine에서 hardware failure가 일어날 가능성은 낮지만 이를 수천대 운용하는 cluster에서 하루에 하나의 machine 이상에서 failure가 날 가능성은 비교적 높다.

Worker Faliure

master는 모든 worker에 주기적으로 ping을 날린다. worker가 응답하지 않으면 master는 해당 node를 failed 처리하고 할당된 map task 혹은 reduce task를 완료한 worker에 실패한 task를 re-scheduling하여 재 실행될 수 있도록 한다. 만약 어떤 map task가 worker A에서 수행되다가 worker A가 실패하여 workder B에서 최종 수행되었다면, 이 결과를 read할 reduce task는 master로부터 재실행에 관해 알려지고 worker B로부터 read 해간다.

MapReduce는 다수의 workder failure에도 resilient하다. 만약 network maintenance 등으로 수분동안 수십대의 machine이 사용 불가능하더라도 master는 단순히 re-scheduling 후 재실행하여 'eventually complete' 될 수 있도록 한다.

Master Failure

master data strcuture에 대해 주기적인 checkpoint를 write하여 master가 실패했을 때 last checkpoint로부터 재수행하는 것이 가능하다. 하지만 single master에서 실패는 unlikely 하므로 현재의 구현은 MapReduce computation을 abort 시킨다. client는 이를 확인하여 언제든지 재시도 할 수 있다.

Semantics in the Presence of Failures

user가 구현한 map function, reduce function이 deterministic하다면 MapReduce의 분산 실행은 single machine에서의 non-faulting sequential execution과 같은 결과를 생성한다.

Google의 MapReduce는 이러한 성질을 위해 map, reduce task의 atomic commit에 의존한다. 각 in-progress task는 output을 private temporary file에 쓴다. map task는 R개의 file을, reduce task는 1개의 file을 생성한다. map task가 완료되면 worker는 master에게 R개의 temporary file의 이름을 알려주고, 만약 같은 이름의 file이 담긴 message를 이미 받았으면 무시하고 아니면 master data structure에다가 write 한다. reduce task가 완료되면 reduce worker는 temporary output file에 filnal output file를 atomic하게 rename한다. 이 atomic rename operation은 최종적으로 파일 시스템에 해당 reduce task가 올바르게 한번 수행될 수 있도록 한다.

만약 map, reduce task가 non-determinitic 하더라도 우리는 weaker, but still reasonable한 결과를 보장할 것이다. 하지만 map task M, reduce task R1, R2가 있는 상황을 가정하자. e(Ri)를 commit된 (exactly executed once) execution으로 정의한다면 여기서 weaker semantics이 발생하는데, e(R1)과 e(R2) 각각은 map task M의 서로 다른 non-deterministic한 결과를 읽기 때문이다.

3.4 Locality 

Google의 MapReduce의 구현에서 performance 상 매우 중요한 attribute이다.

computing environment에서 network bandwidth는 매우 희소한 자원이므로, input data가 computing cluster를 이루는 machine들의 local disk에 저장된 것을 활용하였다. 즉, MapReduce는 각 input file들의 location 정보를 고려하여 특정 map task가 input data의 replica가 있는 machine 상에서 실행될 수 있도록 하였다. 이것이 실패하더라도 replica의 주변 (ex. 같은 network switch)에서 수행되도록 하였다. 결과적으로 대부분의 input data가 local read 하였고 network bandwidth를 소모하지 않았다.

3.5 Task Granularity

위에서 map phase를 M 조각으로, reduce phase를 R 조각으로 세분하였다. 이상적으로 M, R은 worker machine 수보다 아주 크게 잡아야 한다. 각 worker가 여러 다른 task를 수행하게 하는 것은 dynamic load balancing에 유리하고 worker가 실패했을 때 recovery 속도도 높인다.

M, R을 얼마나 크게 잡아야할지에 대한 practical bound는 존재한다. master는 O(M+R)의 scheduling을 정해야 하고 위에서 언급했듯이 O(M*R)만큼의 state, location 정보를 유지해야 한다. (memory에 저장되는 절대적 크기는 크지 않다)

R은 R개의 output file을 생성하는 것을 의미하므로 구현하는 사용자에 의해서 정해진다. M은 실용적으로 input data의 크기가 16MB~64MB가 되도록 정하여 locality를 극대화하도록 한다. Google에서 수행한 실험에서는 2000대의 machine 환경에서 M = 200K, R = 5K로 종종 수행하였다.

3.6 Backup Tasks

 MapReduce의 수행시간을 길게하는 원인 중 하나로 straggler가 있다. straggler는 소수의 map task 혹은 reduce task를 수행하는 데 비정상적으로 긴 시간을 소요하는 machine을 의미한다. straggler는 여러 이유에서 발생한다. bad disk, scheduling으로 인한 CPU, memory, disk, network bandwidth 등 자원의 경합이 될 수 있다. 최근에 발견한 bug로는 processor cache를 disable 시키는 initialization code도 있었다.

이러한 straggler로 인한 문제를 완화하기 위해서 general mechanism을 도입하였다. MapReduce operation이 수행 완료에 가까워지면 master는 남아있는 in-progress task에 대한 backup execution을 schedule 한다. 그러면 그 task는 primary 혹은 backup execution에서 완료하게 된다. 논문에서는 이 mechanism을 tune 하여 수 percent의 추가 자원을 사용하면서 수행 완료시간을 상당히 줄이는 결과를 얻었다.


4. Refinements 

다소 세부적인 내용이므로 생략하였다.

5. Performance 

논문에서는 두 computation model, grep과 sort에 대해 MapReduce를 수행하여 performance를 측정하였다.

5.1 Cluster Configuration 

약 1800개 machine으로 이루어진 cluster에서 MapReduce program을 수행하였다. 각 machine은 2GHz Intel Xeon Processor (with hyper treadin enabled)를 가지며 4GB memory, 2개의 160GB IDE disk, gigabit Ethernet link로 구성되었다. machine들은 2-level tree-shaped switched network에 arrange 되어 root로부터 도합 약 100-200Gbps의 bandwidth를 가진다. 모든 machine은 같은 hosting facility (IDC)에 있어 임의의 machine 간 RTT는 1ms 이내이다.

5.2 Grep


grep program은 10^10 100-byte records를 scan하며 비교적 rare한 세글자의 pattern을 search한다. input split은 약 64MB로 분할되었고 (M = 15K) ouput은 하나의 file에 저장된다 (R = 1).

Figure 2는 시간에 따른 computation의 progress를 나타낸다. 점점 더 많은 machine에서 MapReduce computation을 할당 받을 수록 rate는 점진적으로 증가하다가 1764 workers가 할당 받았을 때 peak를 찍는다. map task가 완료되면 rate는 줄어들기 시작한다. 전체 수행시간은 약 150초가 소요되었다. 이 수행시간에는 startup overhead가 존재한다. overhead에는 propagation of the program to all worker machines, delays interacting with GPS to open the set of 1000 input files, get the inforamation needed for the locality optimization이 포함된다.

5.3 Sort 

sort program은 10^10 100-byte records (=~ 1TB)를 sort 한다. 이 program은 TeraSort benchmark를 따라서 구현되었다.

sorting program은 50 line이 안되는 user code로 이루어져 있으며 3 line의 Map function은 textline으로부터 10-byte의 sorting key를 추출하여 key와 value를 intermediate key/value pair로 보낸다. Reduce function은 built-in Identity function을 사용하며 intermediate key/value pair를 그대로 output key/value pair로 보낸다. 최종 output은 2-way replicated GFS file로 저장된다.

input data는 64MB 크기로 (M = 15000) 분할된다. sorted output은 4000개의 file에 (R = 4000) 저장된다. partitioning function은 R piece중 하나로 분리하기 위해 key의 초기 몇 byte를 사용한다.

Figure 3은 sort program의 execution에 대한 progress를 나타낸다. 위의 graph는 input이 read되는 input rate를 나타낸다. 중간의 graph는 shuffle rate, data가 map task로부터 reduce task로 network 상에서 이동하는 rate를 나타낸다. 아래의 graph는 reduce task에서 sorted data가 최종 output file로 write 되는 rate를 나타낸다.

주목해야 할 몇가지가 있는데, input rate가 shuffle rate와 output rate 보다 비약적으로 높다는 점인데 이는 locality optimization에 따른 결과이다. 제한된 network bandwidth 환경에서 local disk에서 대부분의 data read가 일어났기 때문이다. shuffle rate는 output rate보다 높은데 이는 output phase는 sorted data의 2개의 copy를 write 하기 때문이다. GFS에서 data의 availability, reliability를 위해 서로 다른 machine에 data를 저장하기 때문에 network 상의 data transfer가 발생하여 network bandwidth를 상당 부분 차지한다.

5.4 Effects of Backup Tasks

Figure 3의 (b)에서 backup task가 disable 된 program의 execution을 보여준다. execution flow는 (a)와 거의 같지만, 소수의 task에서 지연이 발생하여 전체 수행시간은 1283 secs가 되어 44%의 지연이 발생하였다.

5.5 Machine Failures 

Figure 3의 (c)은 의도적으로 1746 workers 중 200개를 죽이는 program의 execution을 나타낸다. cluster의 scheduler는 즉시 새로운 process를 수행한다. worker의 kill은 일시적으로 input rate를 음의 값에 이르게 하지만 map task의 re-execution은 매우 신속하게 일어나 전체 computation은 993 secs만에 끝나 총 5%의 지연만을 발생시켰다.

6. Experience

MapReduce library의 초기 버전은 2003년 2월에 작성되었고 locality optimization, dynamic load balancing 등의 비약적인 개선은 2003년 8월에 이루어졌다. 당시 MapReduce library가 Google에서 다루는 아주 다양한 문제들에 적용될 수 있음을 확인하였다.

  • large-scale machine learning problems,

  • clustering problems for the Google News and Froogle products,

  • extraction of data used to produce reports of popular queries (e.g. Google Zeitgeist),

  • extractionofpropertiesofwebpagesfornewexper- iments and products (e.g. extraction of geographi- cal locations from a large corpus of web pages for localized search), and

  • large-scale graph computations.


7. Related Work 

아래 MapReduce의 key idea나 implementation은 다양한 논문으로부터 왔다. 자세한 것은 논문을 참조하기를 바란다.

  • a programming model to abstractize parallelization

  • locality optimization

  • backup task mechanism

  • in-house cluster management system

  • a programming model where processes communicate with each other by sending data over distributed queues


8. Conclusion 

MapReduce programming model은 Google에서 여러 목적을 위해 성공적으로 사용되었다. 연구는 다음과 같은 이유로 성공에 기여하였다.

  • model is easy to use, even for programmers without experience with parallel and distributed systems

  • a large variety of problems are easily expressible

  • an implementation of MapReduce that scales to large clusters of machines

그리고 연구로부터 다음을 배웠다.

  • restricting the programming modle makes it easy to parallelize and distribute computations and to make such computations fault-tolerant.

  • network bandwidth is a scarce resource. the locality optimization and writing a single copy of an intermediate data to local disk saves network bandwidth.

  • redundant execution can be used to reduce the impact of slow machines, and to handle machine failures and data loss.


References

- Jeffrey Dean, & Sanjay Ghemawat (2004). MapReduce: Simplified Data Processing on Large Clusters. In OSDI'04: Sixth Symposium on Operating System Design and Implementation (pp. 137–150).

- Designing Data-Intensive Applications - Martin Kleppmann


Sunday, 26 July 2020

Intro to Docker - 1

Background

최근 프로젝트를 진행하면서 backend system에서 필요한 다양한 component를 직접 설치해보고 있다. 서버의 지표들을 수집하고 시각화된 dashboard를 제공하는 Prometheus, Grafana, 로그를 수집하여 parsing 및 indexing을 하여 로그에 대한 분석을 제공하는 ELK (ElasticSearch, LogStash, Kibana), 그리고 지속적인 통합과 전달을 제공하는 Jenkins까지 하나씩 설치했다. 각 component의 official site 혹은 개인 blog에서 제공하는 문서를 따라 진행하는데 대부분 guide에서 Docker container 상에서 실행하는 것을 권장하여서 자연스레 Docker를 사용하게 되었다. 이전에 Python 관련 프로젝트를 진행하면서 간단하게 사용한 적은 있었지만 Docker가 성능적인 측면에서 거의 손실이 없다는 점, 분산 시스템에서 부하를 처리하기 위한 전략으로서 탄력적인 scale out을 제공하는 기반 기술이 된다는 점 이 두 측면에 대한 이해도가 이전보다 증가했다고 판단하여 이번 기회에 알아보기로 했다. 

Why Docker? 


Snowflake servers

다수의 서버를 manual하게 운영한다면 다음과 같은 문제들이 발생할 수 있다. application을 실행하는 서버 instance의 환경이 각기 달라서 이후 실행 환경 차이에 의해 application이 제대로 동작하지 않을 수 있다. 겪어본 사람은 알겠지만 서버 환경에 대한 문제를 해결하기 위해서는 어떤 software engineer든 어려움을 겪을 수 있다. 최근에는 cloud 환경에서 개발이 이루어지면서 서버 환경에 대한 차이는 어느 정도 극복하였다. 하지만 application의 상황에 맞추어 복수의 instance에 대해 여러 번 할당/해제한다면 각 instance 간 초기 실행 시점, 중간 종료 시점 등 운영 기록이 달라지게 된다. 즉 instance에 설치한 application의 동작이 다행히 같을지라도 서로 다른 snapshot을 가지게 되는 것이다. 서버 운영 기록을 일종의 암묵지여서 서로 간 공유가 쉽지 않고 개별적으로 관리하여 이 암묵지가 쌓인다면 잠재적 장애의 원인이 될 수도 있다. 각기 다른 서버들 간 눈송이처럼 다른 형상을 띠고 있는 현상을 들어 snowflake server라고 한다.

Infrastructure as a code

위 문제를 해결하기 위해서 infrastrcuture를 code화 하여 관리하기 위한 여러 tool이 도입되었다. Vagrant, Ansible 등이 바로 그것이다. 이 tool들을 활용하면 관리자 역할을 node에서 여러 node에 대해 일관된 서버 운영 (provisioning, configuration management, deployment) 명령을 내릴 수 있게 되고 통합 code로 관리할 수 있게 된다. 하지만 이 방법은 위에서 언급한 snapshot의 차이를 극복하지 못한다.

Docker



> Docker provides the ability to package and run an application in a loosely isolated environment called a container. The isolation and security allow you to run many containers simultaneously on a given host.

Docker는 container의 기반이 되는 기술 (namespace, cgroups)을 활용하여 process에 loosely isolated된 실행 환경을 제공해준다. 아래에서 언급하겠지만 Docker의 실행 단위인 container는 주어진 instance의 환경과 상관없이 Docker image를 기반으로만 실행 환경이 구성되기 때문에 위에서 여러 container들에게 같은 실행 시점을 가지게 할 수 있다. 따라서 software engineer는 Docker를 통해 각기 다른 서버 instance들에 대해서도 완전히 같은 실행 환경 속에서 application을 실행할 수 있다. 또한 기존의 VM과 다르게 host OS와 Docker 실행 환경 사이에 hypervisor, guest OS가 존재하는 것이 아니기 때문에 Linux kernel의 성능을 온전히 활용하기 때문에 성능적인 측면에서도 모자람이 없다.

https://docs.docker.com/get-started/overview/#the-underlying-technology


Image, Container


Image

An image is a read-only template with instructions for creating a Docker container. Often, an image is based on another image, with some additional customization. For example, you may build an image which is based on the ubuntu image, but installs the Apache web server and your application, as well as the configuration details needed to make your application run.

Docker image는 Docker container를 생성하기 위한 instruction (실행 환경, application code 등)을 담고있는 template이다. Dockerfile을 build 하여 생성할 수 있으며 read-only이기 때문에 고정된 실행 환경을 제공해줄 수 있다. 한 image는 다른 image를 생성하기 위한 base가 될 수 있으며 다른 image는 base가 되는 image, 새로이 추가될 binary 및 library, 실행해 필요한 metadata 및 file, 환경 변수 등으로 구성이 되어 새로운 image로 생성이 된다.

Container 

> A container is a runnable instance of an image. You can create, start, stop, move, or delete a container using the Docker API or CLI. You can connect a container to one or more networks, attach storage to it, or even create a new image based on its current state.


Docker container는 image의 실행 가능한 instance이며 host OS에서 돌아가는 하나의 격리된 process이다. 위 그림이 나타내듯이 container는 read-only의 image layers과 read/write가 가능한 container layer로 구성되어 있다. 자체 network interface가 있어 host의 network interface와 연결되어 외부랑 통신하며 persistent data를 저장해야 하는 경우 host의 filesystem 위에서 volume 이나 bind mount을 통해서 저장할 수 있다.


Future Actions 

Docker가 제공하는 강점은 container orchestration 도구와 결합되었을 때 극대화된다. 집단의 container들을 대상으로 service 단위로 구분하여 관리하거나 주어진 traffic에 맞추어 container를 추가 투입하거나 회수하여 효율적으로 resource를 관리할 수 있다. Swarm Classic, Swarm Mode 등이 기존에 사용되었으나 최근에는 Google에서 개발한 Kubernetes가 사실상 orchestration의 표준으로 사용되어 AWS, GCP 등 여러 군데서 사용되고 있다. 다음 시간에는 orchestration의 개발 배경과 개념에 대해 알아보고 관련 tool들도 알아보도록 하겠다. 


References 

Sunday, 12 July 2020

Intro to Java cglib

Introduction 


cglib is a powerful, high performance and quality Code Generation Library. It is used to extend JAVA classes and implements interfaces at runtime. See samples and API documentation to learn more about features.
Byte Code Generation Library is high level API to generate and transform JAVA byte code. It is used by AOP, testing, data access frameworks to generate dynamic proxy objects and intercept field access.

Java로 programming을 하는 당신은 적어도 한번은 cglib library를 사용했을 것이다! cglib은 Java bytecode generation library로서 compile time 이후에 class를 조작하거나 생성할 수 있게 한다. bytecode를 추가적으로 장착한다는 맥락에서 bytecode instrumentation library라고도 불린다. cglib을 통해서 대상 class에 대해 dynamic proxy, method intercept가 가능하게 한다. 우리가 널리 사용하는 Spring AOP, Hibernate, Mockito가 cglib을 활용하여 구현되었다. 먼저 간단하게 cglib의 concept를 알아보고 용처에 대해서도 알아보도록 하자. 

Concepts


public class SimpleService {
    
    public String hello(String name) {
        return "Hello~ " + name;    }
    public Integer length(String name) {
        return name.length();    }
}

cglib을 적용할 class는 위와 같다. 대상 class를 subclassing 하기 위해서 cglib의 Enhancer class가 필요하다. Enhancer는 method intercept를 가능하게 하는 dynamic proxy를 생성한다. 

public static SimpleService createFixedValueProxy() {
    Enhancer enhancer = new Enhancer();    enhancer.setSuperclass(SimpleService.class);    enhancer.setCallback((FixedValue) () -> "Hello~ Fixed!");
    return (SimpleService) enhancer.create();}

@Testpublic void testFixedValueProxy() {
    SimpleService proxy = SimpleService.createFixedValueProxy();    assertEquals("Hello~ Fixed!", proxy.hello("abc"));    assertEquals("Hello~ Fixed!", proxy.hello("def"));    assertEquals("Hello~ Fixed!", proxy.hello(anyString()));}

FixedValue는 callback interface로서 proxied method에 대해 고정된 값을 return한다. intercept에 대한 어떤 정보도 제공받지 않는다. 

public static SimpleService createAroundAdviceProxy() {
    Enhancer enhancer = new Enhancer();    enhancer.setSuperclass(SimpleService.class);    enhancer.setCallback((MethodInterceptor) (obj, method, args, proxy) -> {
        if (method.getDeclaringClass() == Object.class) {
            throw new Exception("declaring class is not a SimpleService");        }
        switch (Method.find(method.getName())) {
            case HELLO:
                return "Hello~ Proxy";            case LENGTH:
                return proxy.invokeSuper(obj, args);            default:
                throw new IllegalArgumentException("cannot find "+ method.getName());        }
    });    return (SimpleService) enhancer.create();}

@Testpublic void testAroundAdviceProxy() {
    SimpleService proxy = SimpleService.createAroundAdviceProxy();    assertEquals("Hello~ Proxy", proxy.hello("hi"));    assertEquals("Hello~ Proxy", proxy.hello("hello"));    assertEquals("Hello~ Proxy", proxy.hello(anyString()));    assertEquals((Integer) 2, proxy.length("hi"));    assertEquals((Integer) 5, proxy.length("hello"));}

cglib을 활용해서 조금 더 동적인 기능을 제공하려면 MethodIntercepter interface를 사용한다. MethodIntercepter는 method, args 등의 정보를 제공 받아서 다양한 처리가 가능하다. 위 code처럼 method signature 별로 처리를 할 수 있다. 

Case Study

Spring AOP

@Before("execution(* com.xyz.myapp.dao.*.*(..))")
    public void doAccessCheck() {
        // ...
    }

@AfterReturning("com.xyz.myapp.SystemArchitecture.dataAccessOperation()")
    public void doAccessCheck() {
        // ...
    }

@After("com.xyz.myapp.SystemArchitecture.dataAccessOperation()")
    public void doReleaseLock() {
        // ...
    }

@AfterThrowing("com.xyz.myapp.SystemArchitecture.dataAccessOperation()")
    public void doRecoveryActions() {
        // ...
    }

Spring AOP에서 제공하는 annotation을 통해서 method의 실행 전, 후, exception 처리 후 시점에 대해 추가적인 처리가 가능하다. Spring AOP에서는 AOP 구현을 위해 JDK dynamic proxy 혹은 cglib을 사용한다. JDK dynamic proxy의 사용이 기본적으로 권장되지만 대상 object가 interface를 가지지 않은 경우 cglib이 사용된다. 이에 대한 내용와 code는 아래 link에서 확인할 수 있다. 

https://docs.spring.io/spring/docs/3.0.0.M3/reference/html/ch08s06.html
https://github.com/spring-projects/spring-framework/blob/v5.1.4.RELEASE/spring-aop/src/main/java/org/springframework/aop/framework/DefaultAopProxyFactory.java

@Transactional 


@Transactional
public void businessLogic() {
... use entity manager inside a transaction ...
}
UserTransaction utx = entityManager.getTransaction(); 

try { 
    utx.begin(); 
    businessLogic();
    utx.commit(); 
} catch(Exception ex) { 
    utx.rollback(); 
    throw ex; 
} 

Spring에서 persistent layer에 대한 transaction 처리를 위해 @Transactional annotation을 사용하는데 이때 cglib이 활용된다. code 상의 @Transactional이 아래 code처럼 transaction begin, commit or rollback이 될 수 있도록 bytecode가 generate 된다. 

Mockito

https://proandroiddev.com/mockito-2-x-over-powermock-migration-tips-and-tricks-top-ten-118c52abd1d8

Mockito 2+ 에서는 더이상 cglib으로 구현이 되어있지 않지만, 이전 버전의 Mockito는 cglib을 통해서 구현되었다. Mockito는 대상 class의 method를 아무런 동작을 하지 않는 empty implementation 형태로 확장하였다. 

Conclusion

Java cglib에 대해서 간단히 알아보았다! Java의 bytecode instrumentation 기법 혹은 dynamic proxy이 cglib을 통해서만 구현되어 있는 것은 아니지만 cglib에서 Enhancer class를 통해 subclassing을 하는 기본 원리는 비슷할 것이다 예상한다. 앞으로 마주칠 다양한 AOP 기법이나 testing framework에 대한 이해를 한층 더 높일 수 있을 것으로 기대해본다. 

https://github.com/taehyeok-jang/cglib-example
위 code는 해당 repository에서 확인할 수 있다. 

References 

- https://github.com/cglib/cglib
- https://www.baeldung.com/cglib
- http://cglib.sourceforge.net/apidocs/net/sf/cglib/Enhancer.html
- https://dzone.com/articles/how-does-spring-transactional
- https://docs.spring.io/spring/docs/3.0.0.M3/reference/html/ch08s06.html
- https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#aop-advice

Sunday, 5 July 2020

Intro to Java Profiling - 1

Background

규모가 있는 Java application에서는 OS, JVM의 resource를 최대치까지 사용하거나 다수의 thread들 간 race condition이 발생하는 등 시스템에 영향을 주는 일이 발생한다. 이와 같은 일이 발생하기 이전에, 혹은 이전과 동일한 에러로 인해 시스템에서 장애가 발생하지 않도록 시스템 지표들을 분석하고 이를 바탕으로 application을 개선해야 한다. 

JMX


Java에서는 어떻게 성능을 측정하고 그 지표들을 제공해줄까? JMX (Java Management Extension)를 통해 가능하다. JMX는 Java 표준 중 일부로서 application, system objects, devices, networks를 manage하고 monitoring하기 위한 tool을 제공한다. 또한 JVM에 대한 monitoring도 가능하다. 
위 그림이 나타내는 것처럼 JMX는 크게 instrumentation layer, agent layer, distributed service layer 이렇게 three layer로 구성되어 있다. instrumentation layer는 POJO를 wrapping하여 JMX에서 관리하는 MBeans (managed beans) 형태로 만드는 역할을 한다. 이렇게 만들어진 bean은 agent layer에서 creation, registration, deletion 전체 bean lifecycle에 대해서 관리가 된다. agent layer는 MBeans에 대한 control을 하는 부분과 distributed service layer에 대한 API를 제공하는 역할을 한다. Java application 내부에서는 MBeanServer를 실행시킴으로써 동작하며 target MBeans을 이 MBeanServer에등록(registration)할 수 있다. 마지막으로 distributed service layer에서는 local 혹은 remote에 대한 API를 제공하여 MBeans에 대한 operation이나 local cache evict 등의 제어를 할 수 있게 하며 성능 지표를 제공한다. 

https://github.com/taehyeok-jang/jmx-sample

위 링크는 JMX를 사용하는 간단한 code이다. JVM마다 JMX access에 관한 default 설정이 다르므로 실행 환경의 JVM을 살펴볼 필요가 있지만, JVM 실행 시 '-Dcom.sun.management.jmxremote'으로 설정하면 enable하여 JMX access를 가능하게 한다. 다른 설정들로 SSL을 통한 access만 가능하게 하거나, id/password를 사용한 접근만 허용하거나 target application에서 사용하는 listening port와 다른 port를 설정할 수 있게 한다. port를 다르게 하는 것은 특히 deployment 환경에서 중요한데 system 성능 지표를 노출하는 port가 외부로 열려있는 것은 위험하므로, 통상적으로 JMX에 대한 port는 inbound로만 접근이 가능하도록 한다. 

```
java -jar ./build/libs/jmx-sample.jar 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.port=1617 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
```


JConsole을 통해서 위 application의 JMX에 접근을 하면 MBean에 대한 정보를 확인할 수 있고 method를 실행할 수도 있다. 



JConsole, VisualVM 

JMX에서 제공하는 monitoring 기능을 활용하여 Java 성능 profiling을 제공해주는 여러 tool이 있다. 그중에서 대표적인 것이 Java 표준에 속한 JConsole과 OSS로 관리되는 VisualVM이다. 
shell에서 jconsole 명령어를 입력하면 위와 같은 JConsole GUI application이 실행된다. JConsole에서는 JVM에대한 overview, memory, threads, classes 등 시스템에 관한 전반적인 지표를 보여준다. 하지만 다소 지엽적인 형태로 제공해주고만 있는데 조금 더 의미있는 정보를 제공해주는 다른 tool을 살펴보자. 


VisualVM은 본래 Java 표준에 있던 기술이었지만 현재는 OSS로서 계속 개발되고 있으며, JConsole보다 더 다양한 성능 profiling을 제공한다. 



위 그림에서와 같이 시스템 성능 지표를 제공해줄뿐만 아니라 thread dump, heap dump가 가능하며 sampling, profiling을 제공하여 자세한 분석이 가능하다. 아직 sampling, profiling은 제대로 사용하는 방법을 몰라서 소개가 어려운데 아래 영상을 통해서 어떤 기능인지 개괄적으로 살펴볼 수 있다. 


TDA, ThreadLogic 

thread는 Java application의 핵심적인 자원을 하나로서 동작을 항상 주의깊게 살펴보아야 한다. 필요한 최소 개수의 thread로 최대한의 성능을 이끌어내는 것은 중요하다. 실행 중인 application의 thread 동작을 알기 위해서는 thread dump를 떠야하는데 이를 위한 가장 기본적인 방법으로 JDK에 포함된 jstack이 있다. 

```
jstack <target process id> > threaddump



"http-nio-8081-exec-5" #289 daemon prio=5 os_prio=31 cpu=0.10ms elapsed=528.26s tid=0x00007fda88bb0800 nid=0x15303 waiting on condition  [0x0000700012684000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.7/Native Method)                                       - parking to wait for  <0x000000070897dfc0>
... 
 "http-nio-8081-exec-6" #290 daemon prio=5 os_prio=31 cpu=0.16ms elapsed=528.26s tid=0x00007fda88bb1800 nid=0x15103 waiting on condition  [0x0000700012787000]
   java.lang.Thread.State: WAITING (parking)
```
shell에다가 위와 같이 입력하면 thread dump 정보가 담긴 파일을 얻을 수 있으며 내용을 보면 각 thread (application에서 사용하는 thread, GC thread 등의 system thread, web application의 경우 통신과 관련된 thread 등이 있다)에 대한 정보와 현재 상태 (RUNNABLE, WATING, BLOCKING, ...)를 확인할 수 있다. 그런데 enterprise level의 Java application에서는 수백, 수천개의 thread가 생성될 수도 있는데 이들 각각을 모두 살펴보고 thread들 간의 관계를 파악하는 것은 매우 어렵다. 이를 위한 tool로서 TDA (thread dump analyzer), ThreadLogic 등이 있다. 


위 그림은 ThreadLogic의 실행을 나타낸다. 각 thread에 대한 상태를 간결하게 확인할 수 있으며 공유 자원 (Java monitor lock)등에 접근하는 thread 집합이 어떻게 되는지, 그리고 그 thread들의 자원에 대한 점유율을 확인하여 효율성에 대해서도 분석한다. 위 application은 하나의 동기화된 (synchonized) 공유 자원에 접근하는 10개의 thread를 실행하고 있는데 10개 thread들 중 특정 시점에 해당 자원을 점유할 수 있는 thread는 하나임으로 그 효율이 매우 낮음을 WARN으로 알려주고 있다. 이 정보를 바탕으로 더 나은 Java application 설계가 가능하다.

GCViewer

Java의 메모리 관리는 garbage collector를 통해서 이루어지므로 이에 대한 지표가 필수적이다. Java application을 실행할 때 JVM option으로서 gc log를 남길 수가 있는데 이를 graphical하게 살펴볼 수 있는 tool이 GCViewer이다. GCViewer는 다음에 살펴보도록 하겠다. 


Resources