
hadoop map reduce working flow.

ITWeb/Hadoop General 2012. 3. 7. 14:09
[Reference sites]
http://architects.dzone.com/articles/how-hadoop-mapreduce-works
http://en.wikipedia.org/wiki/MapReduce
http://www.jaso.co.kr/265
http://nadayyh.springnote.com/pages/6064899?print=1
http://hadoop.apache.org/common/docs/r0.20.2/cluster_setup.html

I found a document that explains this in an easy-to-understand way.
Whatever you do, the basics matter.
Let's not skip over things just because we half-understand them.
I'm still looking things up and learning as questions come up, too.. haha

[Quoted article]

In my previous post, I talked about the methodology of transforming a sequential algorithm into a parallel one. After that, we can implement the parallel algorithm; one of the popular frameworks we can use is the Apache open-source Hadoop Map/Reduce framework.

Functional Programming

 

Multithreading is one of the popular ways of doing parallel programming, but the major complexity of multithreaded programming is coordinating each thread's access to shared data. We need things like semaphores and locks, and we must use them with great care, otherwise deadlocks will result.

If we can eliminate the shared state completely, then the complexity of coordination disappears.

This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values, and can only be changed by the function that is active at that moment. Imagine functions connected to each other in a directed acyclic graph (DAG). Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyzing the parallelism is much easier when there is no hidden dependency coming from shared state.

User-defined Map/Reduce functions

 

Map/Reduce is a special form of such a DAG that is applicable to a wide range of use cases. It is organized around a “map” function that transforms a piece of data into some number of key/value pairs. Each of these elements is then sorted by its key and sent to the same node, where a “reduce” function is used to merge the values (of the same key) into a single result.

map(input_record) {
  ...
  emit(k1, v1)
  ...
  emit(k2, v2)
  ...
}

reduce(key, values) {
  aggregate = initialize()
  while (values.has_next) {
    aggregate = merge(values.next)
  }
  collect(key, aggregate)
}
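To make the pseudocode concrete, here is a minimal word-count-style sketch against Hadoop's old org.apache.hadoop.mapred Java API; the class names are made up for illustration:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapReduce {

  // map(input_record): emit (word, 1) for every word in the line
  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, ONE);              // emit(k, v)
      }
    }
  }

  // reduce(key, values): merge all counts emitted for the same word
  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;                               // aggregate = initialize()
      while (values.hasNext()) {
        sum += values.next().get();              // aggregate = merge(values.next)
      }
      output.collect(key, new IntWritable(sum)); // collect(key, aggregate)
    }
  }
}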

 

The Map/Reduce DAG is organized in this way.


A parallel algorithm is usually structured as multiple rounds of Map/Reduce.


HDFS

 

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operations. Each file is broken into chunks and stored across multiple data nodes as local OS files.



There is a master “NameNode” that keeps track of the overall file directory structure and the placement of chunks. This NameNode is the central control point and may redistribute replicas as needed. A DataNode reports all of its chunks to the NameNode at boot-up. Each chunk has a version number, which is increased on every update. Therefore, the NameNode knows if any of the chunks on a DataNode are stale (e.g. when the DataNode has crashed for some period of time). Those stale chunks will be garbage-collected at a later time.
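Since the NameNode owns this placement information, a client can ask for it through the FileSystem API. A minimal sketch, assuming the default file system is configured to point at HDFS (the class name is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: asks the NameNode where the blocks ("chunks") of a file live.
public class ShowBlockLocations {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   // picks up fs.default.name from core-site.xml
    FileSystem fs = FileSystem.get(conf);

    FileStatus status = fs.getFileStatus(new Path(args[0]));
    BlockLocation[] blocks =
        fs.getFileBlockLocations(status, 0, status.getLen());

    for (BlockLocation block : blocks) {
      System.out.println("offset=" + block.getOffset()
          + " length=" + block.getLength()
          + " hosts=" + java.util.Arrays.toString(block.getHosts()));
    }
    fs.close();
  }
}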

To read a file, the client API calculates the chunk index based on the offset of the file pointer and makes a request to the NameNode. The NameNode replies with which DataNodes have a copy of that chunk. From this point, the client contacts the DataNode directly without going through the NameNode.

To write a file, the client API first contacts the NameNode, which designates one of the replicas as the primary (by granting it a lease). The NameNode's response says who the primary is and who the secondary replicas are. The client then pushes its changes to all DataNodes in any order, but each change is only stored in a buffer at each DataNode. After the changes are buffered at all DataNodes, the client sends a “commit” request to the primary, which determines an update order and pushes this order to all the secondaries. After all secondaries complete the commit, the primary responds to the client with the result. All changes of chunk distribution and all metadata changes are written to an operation log file at the NameNode. This log file maintains an ordered list of operations, which is important for the NameNode to recover its view after a crash. The NameNode also maintains its persistent state by regularly checkpointing to a file. In case the NameNode crashes, a new NameNode takes over after restoring the state from the last checkpoint file and replaying the operation log.
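From a client program, all of this machinery is hidden behind the HDFS FileSystem API; a minimal read/write sketch, again assuming the default file system is HDFS (class name and path are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadWrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Write: the client streams data; block placement, replication and the
    // primary/secondary commit protocol described above are handled by HDFS.
    Path path = new Path("/tmp/hello.txt");     // made-up example path
    FSDataOutputStream out = fs.create(path, true);  // true = overwrite if it exists
    out.writeUTF("hello hdfs");
    out.close();

    // Read: the client asks the NameNode which DataNodes hold each block,
    // then reads the block data directly from those DataNodes.
    FSDataInputStream in = fs.open(path);
    System.out.println(in.readUTF());
    in.close();

    fs.close();
  }
}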

 

MapRed

 

Job execution starts when the client program submits a job configuration to the JobTracker; the configuration specifies the map, combine and reduce functions, as well as the input and output paths of the data.


The JobTracker first determines the number of splits (each split is configurable, ~16-64MB) from the input path and selects some TaskTrackers based on their network proximity to the data sources; the JobTracker then sends the task requests to those selected TaskTrackers.
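As an illustration of such a job configuration, here is a minimal driver sketch against the old org.apache.hadoop.mapred API, reusing the hypothetical WordCountMapReduce classes sketched earlier (the driver class name is also made up):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");

    // map, combine and reduce functions (the reducer doubles as a combiner
    // here because summing counts is associative)
    conf.setMapperClass(WordCountMapReduce.Map.class);
    conf.setCombinerClass(WordCountMapReduce.Reduce.class);
    conf.setReducerClass(WordCountMapReduce.Reduce.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // input and output paths of the data
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // blocks until the JobTracker reports the job as finished
    JobClient.runJob(conf);
  }
}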

Each TaskTracker starts the map phase by extracting the input data from its splits. For each record parsed by the “InputFormat”, it invokes the user-provided “map” function, which emits a number of key/value pairs into an in-memory buffer. A periodic wakeup process sorts the memory buffer into the different reducer partitions, invoking the “combine” function along the way. The key/value pairs are sorted into one of the R local files (assuming there are R reducer nodes).
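Which of the R region files a key/value pair lands in is decided by the partitioner. Hadoop's default is hash-based, and an equivalent written by hand against the old API would look roughly like this (the class name is made up; it would be registered with conf.setPartitionerClass(...)):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Decides which of the numReduceTasks "region" files a (key, value) pair goes to.
public class WordPartitioner implements Partitioner<Text, IntWritable> {
  public void configure(JobConf job) {
    // no per-job setup needed for a plain hash partitioner
  }

  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    // Same rule as Hadoop's default HashPartitioner: mask off the sign bit,
    // then take the remainder modulo the number of reducers.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}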

When a map task completes (all splits are done), the TaskTracker notifies the JobTracker. When all the TaskTrackers are done, the JobTracker notifies the selected TaskTrackers to start the reduce phase.

Each TaskTracker reads the region files remotely. It sorts the key/value pairs and, for each key, invokes the “reduce” function, which collects the key/aggregatedValue into the output file (one per reducer node).

The Map/Reduce framework is resilient to crashes of any component. The JobTracker keeps track of the progress of each phase and periodically pings the TaskTrackers for their health status. When any map-phase TaskTracker crashes, the JobTracker reassigns the map task to a different TaskTracker node, which reruns all of the assigned splits. If a reduce-phase TaskTracker crashes, the JobTracker reruns the reduce on a different TaskTracker.

After both phases complete, the JobTracker unblocks the client program.



[Private Thinking+Reference]
[How is the cluster composed?]
 - [Master : Namenode : JobTracker] : Single Namenode Cluster
 - [Slave : Datanode : TaskTracker] : 1-N Datanode Cluster
 - [Client : Run Job] : a server(?) for launching jobs
 - Looking at the cluster setup document on the Apache Hadoop site, it reads as follows:
Typically you choose one machine in the cluster to act as the NameNode and one machine as to act as the JobTracker, exclusively. The rest of the machines act as both a DataNode and TaskTracker and are referred to as slaves.


[How does a job get run through the JobTracker?]
 - There are of course many ways to trigger a run, but judging from the basic examples..
 - You run it in response to a client request, or
 - You register it with cron or a scheduler and have it run periodically.

[Then where should the MapReduce program live?]
 - It seemed like the Master (NameNode) would do.
 - The line above is wrong, so scratch it; it looks like you need at least 3 machines.
 - Actually, I think a separate Client node would also be needed in the setup.
 - In any case you bundle it into a jar and deploy it, and as the WordCount example shows, the basic way to run it is..
bin/hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output
 - It seems enough to have this command executed at request time, or by a scheduler.
 - To understand this, refer to the explanation below of how the JobTracker and TaskTracker operate.


When the user's main() method runs and calls JobClient's runJob(), the JobClient performs the following steps (a small sketch after the list shows the getSplits() call used in step 3):
 1. Builds job.xml from the settings in the JobConf and stores it in HDFS
 2. Packages the user's Job class, or the jar containing the Job class, as job.jar and stores it in HDFS
 3. Calls InputFormat's getSplits() method and stores the returned values in HDFS as job.split
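For step 3, the split calculation is just a call on the InputFormat. A minimal sketch using the old API that prints the splits JobClient would record in job.split (the class name and the input-path argument are made up for illustration):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical helper: performs the same getSplits() call JobClient uses for job.split.
public class ShowSplits {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(ShowSplits.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(conf);                 // sets up its compression codec factory

    // numSplits is only a hint; block size and minimum split size decide the real count
    InputSplit[] splits = inputFormat.getSplits(conf, 1);
    for (InputSplit split : splits) {
      System.out.println(split);                 // e.g. path:start+length for a FileSplit
    }
  }
}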

