Background
Abstract
MapReduce는 large data set을 process하고 generate하기 위한 programming model과 그에 따른 implementation이다. programmer는 key/value pair를 처리하여 intermediate key/value pairs를 생성하는 map function과 같은 intermediate key들을 merge하는 reduce function을 통해 MapReduce program을 구현한다. paper에서 보여주듯 수많은 real world task들이 map reduce를 통해 표현 가능하다.
함수형으로 작성된 program은 자동으로 병렬처리 되어 상용 machine으로 이루어진 대량의 cluster에서 실행된다. MapReduce의 runtime system은 분산 시스템 환경에서 필요한 여러 실행 세부사항들인 partitioning input data, scheduling the program execution across a set of machines, handling machine failures, and managing the required inter-machine communication들을 수행한다. 이러한 abstraction은 병렬, 분산 시스템의 지식이 없는 programmer도 분산 시스템의 resource를 쉽게 활용할 수 있도록 한다.
1. Introduction
Google에서는 crawled documents, web request logs 등 대량의 raw data로부터 inverted indices, graph structe of web documents 등 derive data로 처리하기 위한 special-purpose computations들이 작성되었다. 하지만 input data의 크기가 매우 커짐으로써 computation은 필연적으로 수천대의 machine에 분산처리 될 필요가 있었다. 하지만 분산처리 환경에서의 다음과 같은 문제들은 원래 처리해야할 단순한 computation program의 본질을 흐려 complexity를 높였다.
parallelize a computation
distribtue data across a set of machines
handle machine failures (fault-tolerant)
이러한 complexity를 해결하고자, 분산 시스템에서의 처리를 위한 위의 복잡한 detail은 숨기고 단순한 computation model만을 구현하는 새로운 abstraction을 design 하였다. 이 abstraction은 LISP 및 여러 함수형 언어의 map, reduce primitive로부터 영감을 얻었다. Google은 그들이 기존에 사용하였던 대부분의 computations 들이 map, reduce를 적용하는 것을 포함하는 것을 깨달았다. 또한 이러한 functional model의 사용은 병렬처리와 fault tolerance에 대한 primary mechanism으로서 re-execution을 쉽게 하였다.
a simple and powerful interface, that enables parallization and distribution of large-scale computations
an implementation of this interface that achieves high performance on large clusters of commodity PCs.
2. Programming Model
MapReduce는 input key/value pairs를 받아서 output key/value pairs를 생성한다. MapReduce library의 사용자는 다음의 두 function을 표현한다.
Map.
input pair를 받아서 intermediate key/value pairs를 생성한다. MapReduce library는 같은 intermediate key를 가진 intermediate value들을 group 지어 Reduce function으로 보낸다.
Reduce.
intermediate key와 해당 key를 가진 value의 set들을 input으로 받는다. 실제 program 상에는 value의 iterator가 주어져서 memory에 저장하는 것 이상의 value들에 접근할 수 있도록 한다. 주어진 input을 merge 등 추가 처리하여 최종 output을 만들어낸다.
2.1 Example
2.2 Types
2.3 More Examples
다음은 MapReduce computation model로 쉽게 표현될 수 있는 여러 흥미로운 program들이다. 기대보다 더 단순한 형태로 해당 program이 작성될 수 있음에 놀랄 것이다.
MapReduce operation의 overall flow는 Figure 1에서 나타내고 있다.
MapReduce library는 input files를 M pieces (보통 16MB~64MB. GFS의 기본 unit 단위 크기)만큼 분할한다. 그다음 cluster 내 machine들에 구현한 computation program을 실행한다.
이 program들 중 하나는 별도의 기능을 하며 master라고 한다. 다른 program들은 worker라고 부른다. 총 M개의 map task, R개의 reduce task가 있으며 master는 idle worker를 선택하여 task를 할당한다.
map task를 할당 받은 workder는 input split으로부터 data를 읽는다. input data로부터 key/value pair를 parse하여 Map function으로 전달한다. Map function에 의해 생성된 intermediate key/value pair는 memory에 buffer 된다.
buffer된 key/value pair는 주기적으로 local disk에 저장된다. 이때 partitioning function에 의해 R개의 region으로 분할되어 저장된다. local disk에 저장된 각 buffered pair의 location은 master에 전해지며 master는 reduce task를 수행하는 worker에 이를 전달한다.
reduce worker가 master로부터 partitioned intermediate key/value pair에 대한 location 정보를 받으면, RPC를 통해서 map worker에 있는 buffered data를 읽는다. data를 모두 읽으면 intermediate key에 대해 sort를 수행하여 같은 key를 가지고 있는 data는 group 지어지도록 한다. 이는 반드시 필요한 과정인데, 서로다른 key들이 같은 reduce task에 전달될 수 있기 때문이다. sorting 중 data가 memory에 담기 너무 크면 external sort가 수행된다.
reduce worker는 sort 된 intermediate data를 iterate 하면서 key와 그에 대응하는 value set을 Reduce function으로 전달한다. Reduce function의 output은 해당 reduce partition의 최종 output file이 된다.
모든 map task, reduce task가 완료되면 master는 user program을 깨운다.
실행을 완료하면 주어진 MapReduce execution의 output은 R개의 output file로 존재한다. 종종 우리는 이 R개의 output file을 그대로 두는데 이는 또다른 MapReduce program의 input data로 바로 활용하거나 다른 distributed application에 사용된다.
3.2 Master Data Structures
3.3 Fault Tolerance
MapReduce library는 수백, 수천대의 machine에서 대량의 data를 처리하기 위해 발명되었으므로 machine failure에 graceful하게 대처하여야 한다.
machine failures in a distributed system?
분산 시스템에서 machine failure는 드문 현상이 아니다. single machine에서 hardware failure가 일어날 가능성은 낮지만 이를 수천대 운용하는 cluster에서 하루에 하나의 machine 이상에서 failure가 날 가능성은 비교적 높다.
Worker Faliure
master는 모든 worker에 주기적으로 ping을 날린다. worker가 응답하지 않으면 master는 해당 node를 failed 처리하고 할당된 map task 혹은 reduce task를 완료한 worker에 실패한 task를 re-scheduling하여 재 실행될 수 있도록 한다. 만약 어떤 map task가 worker A에서 수행되다가 worker A가 실패하여 workder B에서 최종 수행되었다면, 이 결과를 read할 reduce task는 master로부터 재실행에 관해 알려지고 worker B로부터 read 해간다.
MapReduce는 다수의 workder failure에도 resilient하다. 만약 network maintenance 등으로 수분동안 수십대의 machine이 사용 불가능하더라도 master는 단순히 re-scheduling 후 재실행하여 'eventually complete' 될 수 있도록 한다.
Master Failure
master data strcuture에 대해 주기적인 checkpoint를 write하여 master가 실패했을 때 last checkpoint로부터 재수행하는 것이 가능하다. 하지만 single master에서 실패는 unlikely 하므로 현재의 구현은 MapReduce computation을 abort 시킨다. client는 이를 확인하여 언제든지 재시도 할 수 있다.
Semantics in the Presence of Failures
user가 구현한 map function, reduce function이 deterministic하다면 MapReduce의 분산 실행은 single machine에서의 non-faulting sequential execution과 같은 결과를 생성한다.
Google의 MapReduce는 이러한 성질을 위해 map, reduce task의 atomic commit에 의존한다. 각 in-progress task는 output을 private temporary file에 쓴다. map task는 R개의 file을, reduce task는 1개의 file을 생성한다. map task가 완료되면 worker는 master에게 R개의 temporary file의 이름을 알려주고, 만약 같은 이름의 file이 담긴 message를 이미 받았으면 무시하고 아니면 master data structure에다가 write 한다. reduce task가 완료되면 reduce worker는 temporary output file에 filnal output file를 atomic하게 rename한다. 이 atomic rename operation은 최종적으로 파일 시스템에 해당 reduce task가 올바르게 한번 수행될 수 있도록 한다.
3.4 Locality
Google의 MapReduce의 구현에서 performance 상 매우 중요한 attribute이다.
computing environment에서 network bandwidth는 매우 희소한 자원이므로, input data가 computing cluster를 이루는 machine들의 local disk에 저장된 것을 활용하였다. 즉, MapReduce는 각 input file들의 location 정보를 고려하여 특정 map task가 input data의 replica가 있는 machine 상에서 실행될 수 있도록 하였다. 이것이 실패하더라도 replica의 주변 (ex. 같은 network switch)에서 수행되도록 하였다. 결과적으로 대부분의 input data가 local read 하였고 network bandwidth를 소모하지 않았다.
3.5 Task Granularity
위에서 map phase를 M 조각으로, reduce phase를 R 조각으로 세분하였다. 이상적으로 M, R은 worker machine 수보다 아주 크게 잡아야 한다. 각 worker가 여러 다른 task를 수행하게 하는 것은 dynamic load balancing에 유리하고 worker가 실패했을 때 recovery 속도도 높인다.
M, R을 얼마나 크게 잡아야할지에 대한 practical bound는 존재한다. master는 O(M+R)의 scheduling을 정해야 하고 위에서 언급했듯이 O(M*R)만큼의 state, location 정보를 유지해야 한다. (memory에 저장되는 절대적 크기는 크지 않다)
R은 R개의 output file을 생성하는 것을 의미하므로 구현하는 사용자에 의해서 정해진다. M은 실용적으로 input data의 크기가 16MB~64MB가 되도록 정하여 locality를 극대화하도록 한다. Google에서 수행한 실험에서는 2000대의 machine 환경에서 M = 200K, R = 5K로 종종 수행하였다.
3.6 Backup Tasks
이러한 straggler로 인한 문제를 완화하기 위해서 general mechanism을 도입하였다. MapReduce operation이 수행 완료에 가까워지면 master는 남아있는 in-progress task에 대한 backup execution을 schedule 한다. 그러면 그 task는 primary 혹은 backup execution에서 완료하게 된다. 논문에서는 이 mechanism을 tune 하여 수 percent의 추가 자원을 사용하면서 수행 완료시간을 상당히 줄이는 결과를 얻었다.
4. Refinements
5. Performance
5.1 Cluster Configuration
5.2 Grep
grep program은 10^10 100-byte records를 scan하며 비교적 rare한 세글자의 pattern을 search한다. input split은 약 64MB로 분할되었고 (M = 15K) ouput은 하나의 file에 저장된다 (R = 1).
Figure 2는 시간에 따른 computation의 progress를 나타낸다. 점점 더 많은 machine에서 MapReduce computation을 할당 받을 수록 rate는 점진적으로 증가하다가 1764 workers가 할당 받았을 때 peak를 찍는다. map task가 완료되면 rate는 줄어들기 시작한다. 전체 수행시간은 약 150초가 소요되었다. 이 수행시간에는 startup overhead가 존재한다. overhead에는 propagation of the program to all worker machines, delays interacting with GPS to open the set of 1000 input files, get the inforamation needed for the locality optimization이 포함된다.
5.3 Sort
sort program은 10^10 100-byte records (=~ 1TB)를 sort 한다. 이 program은 TeraSort benchmark를 따라서 구현되었다.
sorting program은 50 line이 안되는 user code로 이루어져 있으며 3 line의 Map function은 textline으로부터 10-byte의 sorting key를 추출하여 key와 value를 intermediate key/value pair로 보낸다. Reduce function은 built-in Identity function을 사용하며 intermediate key/value pair를 그대로 output key/value pair로 보낸다. 최종 output은 2-way replicated GFS file로 저장된다.
input data는 64MB 크기로 (M = 15000) 분할된다. sorted output은 4000개의 file에 (R = 4000) 저장된다. partitioning function은 R piece중 하나로 분리하기 위해 key의 초기 몇 byte를 사용한다.
Figure 3은 sort program의 execution에 대한 progress를 나타낸다. 위의 graph는 input이 read되는 input rate를 나타낸다. 중간의 graph는 shuffle rate, data가 map task로부터 reduce task로 network 상에서 이동하는 rate를 나타낸다. 아래의 graph는 reduce task에서 sorted data가 최종 output file로 write 되는 rate를 나타낸다.
주목해야 할 몇가지가 있는데, input rate가 shuffle rate와 output rate 보다 비약적으로 높다는 점인데 이는 locality optimization에 따른 결과이다. 제한된 network bandwidth 환경에서 local disk에서 대부분의 data read가 일어났기 때문이다. shuffle rate는 output rate보다 높은데 이는 output phase는 sorted data의 2개의 copy를 write 하기 때문이다. GFS에서 data의 availability, reliability를 위해 서로 다른 machine에 data를 저장하기 때문에 network 상의 data transfer가 발생하여 network bandwidth를 상당 부분 차지한다.
5.4 Effects of Backup Tasks
5.5 Machine Failures
Figure 3의 (c)은 의도적으로 1746 workers 중 200개를 죽이는 program의 execution을 나타낸다. cluster의 scheduler는 즉시 새로운 process를 수행한다. worker의 kill은 일시적으로 input rate를 음의 값에 이르게 하지만 map task의 re-execution은 매우 신속하게 일어나 전체 computation은 993 secs만에 끝나 총 5%의 지연만을 발생시켰다.
6. Experience
MapReduce library의 초기 버전은 2003년 2월에 작성되었고 locality optimization, dynamic load balancing 등의 비약적인 개선은 2003년 8월에 이루어졌다. 당시 MapReduce library가 Google에서 다루는 아주 다양한 문제들에 적용될 수 있음을 확인하였다.
large-scale machine learning problems,
clustering problems for the Google News and Froogle products,
extraction of data used to produce reports of popular queries (e.g. Google Zeitgeist),
extractionofpropertiesofwebpagesfornewexper- iments and products (e.g. extraction of geographi- cal locations from a large corpus of web pages for localized search), and
large-scale graph computations.
7. Related Work
아래 MapReduce의 key idea나 implementation은 다양한 논문으로부터 왔다. 자세한 것은 논문을 참조하기를 바란다.
a programming model to abstractize parallelization
locality optimization
backup task mechanism
in-house cluster management system
a programming model where processes communicate with each other by sending data over distributed queues
8. Conclusion
MapReduce programming model은 Google에서 여러 목적을 위해 성공적으로 사용되었다. 연구는 다음과 같은 이유로 성공에 기여하였다.
model is easy to use, even for programmers without experience with parallel and distributed systems
a large variety of problems are easily expressible
an implementation of MapReduce that scales to large clusters of machines
그리고 연구로부터 다음을 배웠다.
restricting the programming modle makes it easy to parallelize and distribute computations and to make such computations fault-tolerant.
network bandwidth is a scarce resource. the locality optimization and writing a single copy of an intermediate data to local disk saves network bandwidth.
redundant execution can be used to reduce the impact of slow machines, and to handle machine failures and data loss.
No comments:
Post a Comment