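I came across a document that explains this in an easy-to-follow way.
The basics matter in everything.
Let's not just move on because we half-understand something.
I am still looking things up and learning about the parts I have questions on, too.. haha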
In my previous post, I talked about the methodology of transforming a
sequential algorithm into a parallel one. Once that is done, we can
implement the parallel algorithm; one popular framework for this is the
open source Apache Hadoop Map/Reduce framework.
Functional Programming
Multithreading is one of the popular ways of doing parallel programming,
but the major complexity of multi-threaded programming is coordinating
each thread's access to the shared data. We need things like semaphores
and locks, and we must use them with great care, otherwise deadlocks
will result.
If we can eliminate the shared state completely, then the complexity of co-ordination will disappear.
This is the fundamental concept of functional programming. Data is
explicitly passed between functions as parameters or return values, which
can only be changed by the active function at that moment. Imagine
functions connected to each other in a directed acyclic graph (DAG).
Since there is no hidden dependency (via shared state), functions in the
DAG can run anywhere in parallel as long as one is not an ancestor of
the other. In other words, analyzing the parallelism is much easier when
there is no hidden dependency from shared state.
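To make the DAG idea concrete, here is a minimal sketch in Python. The function names (square, total, run_dag) and the thread pool are illustrative assumptions, not part of Hadoop. The square() calls share no state and none is an ancestor of another, so they can run in parallel; total() depends on all of them and has to run last.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Pure function: the result depends only on the argument, no shared state.
    return x * x


def total(values):
    # This node depends on the output of every square() call,
    # so it can only run after all of them have finished.
    return sum(values)


def run_dag(inputs):
    # The square() calls are independent of one another, so the pool
    # is free to run them in any order or at the same time.
    with ThreadPoolExecutor(max_workers=4) as pool:
        squares = list(pool.map(square, inputs))
    return total(squares)


if __name__ == "__main__":
    print(run_dag([1, 2, 3, 4]))  # prints 30
```

No locks or semaphores appear anywhere, because data only moves through parameters and return values.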
User defined Map/Reduce functions
Map/Reduce is a special form of such a DAG which is applicable in a wide
range of use cases. It is organized as a “map” function which transforms
a piece of data into some number of key/value pairs. These pairs are
then sorted by key so that all pairs with the same key reach the same
node, where a “reduce” function is used to merge the values (of the same
key) into a single result.
map(input_record) {
    ...
    emit(k1, v1)
    ...
    emit(k2, v2)
    ...
}
reduce(key, values) {
    aggregate = initialize()
    while (values.has_next) {
        // fold the next value into the running aggregate
        aggregate = merge(aggregate, values.next)
    }
    collect(key, aggregate)
}
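As a concrete instance of the map and reduce pseudocode, here is a small in-memory word count in Python. This is only a sketch under stated assumptions: map_fn, reduce_fn and map_reduce are hypothetical names, and a dictionary stands in for the sort-by-key step that the real framework performs across nodes.

```python
from collections import defaultdict


def map_fn(record):
    # Emit one (word, 1) pair per word in the input record.
    for word in record.split():
        yield word.lower(), 1


def reduce_fn(key, values):
    # Merge all values that share a key into a single result.
    aggregate = 0
    for value in values:
        aggregate += value
    return key, aggregate


def map_reduce(records, map_fn, reduce_fn):
    # Group every emitted value under its key (stand-in for the shuffle),
    # then call reduce once per key in sorted key order.
    groups = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            groups[key].append(value)
    return dict(reduce_fn(key, groups[key]) for key in sorted(groups))


if __name__ == "__main__":
    print(map_reduce(["a b a", "b c"], map_fn, reduce_fn))
```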
The Map/Reduce DAG is organized in this way.
A parallel algorithm is usually structured as multiple rounds of Map/Reduce.
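A small sketch of what "multiple rounds" can look like, again in plain Python with hypothetical names (run_round, words_by_frequency). Round 1 counts words; round 2 takes the output of round 1 as its input and groups the words by their count.

```python
from collections import defaultdict


def run_round(records, map_fn, reduce_fn):
    # One Map/Reduce round: map, group by key, reduce in sorted key order.
    groups = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            groups[key].append(value)
    return [reduce_fn(key, groups[key]) for key in sorted(groups)]


def count_map(line):
    return [(word, 1) for word in line.split()]


def count_reduce(word, ones):
    return (word, sum(ones))


def invert_map(pair):
    # Input is a (word, count) pair produced by round 1.
    word, count = pair
    return [(count, word)]


def invert_reduce(count, words):
    return (count, sorted(words))


def words_by_frequency(lines):
    counts = run_round(lines, count_map, count_reduce)   # round 1
    return run_round(counts, invert_map, invert_reduce)  # round 2


if __name__ == "__main__":
    print(words_by_frequency(["a b a", "b c"]))
```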
HDFS
The distributed file system is designed to
handle large files (multi-GB) with sequential read/write operations.
Each file is broken into chunks, which are stored across multiple
DataNodes as local OS files.
There is a master “NameNode” that keeps track of the overall file
directory structure and the placement of chunks. This NameNode is the
central control point and may redistribute replicas as needed.
Each DataNode reports all its chunks to the NameNode at bootup. Each
chunk has a version number which is increased on every update. Therefore,
the NameNode knows if any of the chunks of a DataNode are stale (e.g.
when the DataNode has been down for some period of time). Those stale
chunks will be garbage collected at a later time.
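The version check can be sketched as a simple comparison. The function find_stale_chunks and both dictionaries are hypothetical, chosen only to illustrate the idea; this is not the actual NameNode code.

```python
def find_stale_chunks(namenode_versions, datanode_report):
    """Return the chunk ids whose reported version is behind the NameNode's.

    namenode_versions: chunk id -> latest version known to the NameNode
    datanode_report:   chunk id -> version the DataNode reports at bootup
    """
    stale = []
    for chunk_id, reported in datanode_report.items():
        latest = namenode_versions.get(chunk_id)
        # A chunk is stale when the DataNode missed at least one update.
        if latest is not None and reported < latest:
            stale.append(chunk_id)
    return sorted(stale)


if __name__ == "__main__":
    known = {"c1": 3, "c2": 5}
    report = {"c1": 3, "c2": 4}  # this DataNode was down during one update of c2
    print(find_stale_chunks(known, report))  # prints ['c2']
```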
To read a file, the client API calculates the chunk index
based on the offset of the file pointer and makes a request to the
NameNode. The NameNode replies with the DataNodes that have a copy of
that chunk. From this point on, the client contacts the DataNode directly
without going through the NameNode.
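The read path boils down to integer arithmetic plus a lookup. Below is a toy sketch: ToyNameNode, locate and the 64MB CHUNK_SIZE are assumptions made for illustration, not the real client API.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # assumed chunk size, for illustration only


class ToyNameNode:
    """Hypothetical in-memory stand-in for the NameNode's chunk placement map."""

    def __init__(self):
        self.locations = {}  # (path, chunk_index) -> list of DataNode names

    def add_chunk(self, path, chunk_index, datanodes):
        self.locations[(path, chunk_index)] = list(datanodes)

    def lookup(self, path, chunk_index):
        return self.locations[(path, chunk_index)]


def locate(namenode, path, offset, chunk_size=CHUNK_SIZE):
    # The client derives the chunk index from the file offset,
    # then asks the NameNode which DataNodes hold that chunk.
    chunk_index = offset // chunk_size
    offset_in_chunk = offset % chunk_size
    return chunk_index, offset_in_chunk, namenode.lookup(path, chunk_index)


if __name__ == "__main__":
    nn = ToyNameNode()
    nn.add_chunk("/data/log", 0, ["dn1"])
    nn.add_chunk("/data/log", 1, ["dn2", "dn3"])
    print(locate(nn, "/data/log", CHUNK_SIZE + 10))
```

After this one lookup the client would talk to dn2 or dn3 directly, which keeps the NameNode out of the data path.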
To write a file, the client API first contacts the NameNode, which
designates one of the replicas as the primary (by granting it a lease).
The NameNode's response identifies the primary and the secondary
replicas. The client then pushes its changes to all DataNodes in any
order, and each DataNode stores the change in a buffer. After the
changes are buffered at all DataNodes, the client sends a “commit”
request to the primary, which determines an update order and pushes this
order to all the secondaries. After all secondaries complete the commit,
the primary responds to the client to report success.
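The two steps of the write path (buffer everywhere first, then commit in an order chosen by the primary) can be sketched with a toy model. Everything here is hypothetical: ToyDataNode, write, commit, and in particular the choice to order changes by their id, which is just one simple way for the primary to fix an order.

```python
class ToyDataNode:
    """Hypothetical replica: buffers changes, applies them only on commit."""

    def __init__(self, name):
        self.name = name
        self.buffer = {}  # change id -> data, pushed but not yet applied
        self.chunk = []   # applied data, in commit order

    def push(self, change_id, data):
        self.buffer[change_id] = data

    def apply(self, order):
        for change_id in order:
            self.chunk.append(self.buffer.pop(change_id))


def commit(primary, secondaries):
    # The primary alone decides the order (assumption: sort by change id),
    # applies it, and pushes the same order to every secondary.
    order = sorted(primary.buffer)
    primary.apply(order)
    for node in secondaries:
        node.apply(order)
    return True  # reported back to the client as success


def write(primary, secondaries, changes):
    # Step 1: push the changes to all replicas, in any order.
    for node in reversed([primary] + secondaries):
        for change_id, data in changes.items():
            node.push(change_id, data)
    # Step 2: ask the primary to commit.
    return commit(primary, secondaries)


if __name__ == "__main__":
    p, s1, s2 = ToyDataNode("p"), ToyDataNode("s1"), ToyDataNode("s2")
    write(p, [s1, s2], {2: "b", 1: "a"})
    print(p.chunk, s1.chunk, s2.chunk)
```

Because only the primary picks the order, every replica ends up applying the changes in the same sequence no matter how the pushes arrived.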
All changes to chunk distribution and metadata are written
to an operation log file at the NameNode. This log file maintains an
ordered list of operations, which is important for the NameNode to
recover its view after a crash. The NameNode also maintains its
persistent state by regularly checkpointing to a file. If the NameNode
crashes, a new NameNode takes over after restoring the state from the
last checkpoint file and replaying the operation log.
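Checkpoint plus log replay can be shown in a few lines. This is a sketch with made-up names (apply_op, ToyNameNode) and a JSON string standing in for the checkpoint file; the operation format ("add" / "remove") is also an assumption.

```python
import json


def apply_op(state, op):
    # op is (kind, chunk_id, datanode); state maps chunk id -> holders.
    kind, chunk_id, datanode = op
    holders = state.setdefault(chunk_id, [])
    if kind == "add" and datanode not in holders:
        holders.append(datanode)
    elif kind == "remove" and datanode in holders:
        holders.remove(datanode)


class ToyNameNode:
    def __init__(self, checkpoint="{}", log=()):
        # Recovery: restore the last checkpoint, then replay the log in order.
        self.state = json.loads(checkpoint)
        self.log = []
        for op in log:
            apply_op(self.state, op)
            self.log.append(op)

    def record(self, op):
        # Append to the operation log first, then update the in-memory view.
        self.log.append(op)
        apply_op(self.state, op)

    def take_checkpoint(self):
        # Persist the full state; operations before this point
        # no longer need to be replayed.
        snapshot = json.dumps(self.state)
        self.log = []
        return snapshot


if __name__ == "__main__":
    nn = ToyNameNode()
    nn.record(("add", "c1", "dn1"))
    cp = nn.take_checkpoint()
    nn.record(("add", "c1", "dn2"))
    nn.record(("remove", "c1", "dn1"))
    recovered = ToyNameNode(cp, nn.log)  # a new NameNode taking over
    print(recovered.state == nn.state)
```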
MapRed
The job execution starts when the
client program submits to the JobTracker a job configuration that
specifies the map, combine and reduce functions, as well as the input
and output paths of the data.
The JobTracker first determines the number of splits (the split size
is configurable, ~16-64MB) from the input path and selects some
TaskTrackers based on their network proximity to the data sources. The
JobTracker then sends the task requests to those selected TaskTrackers.
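The split count is a ceiling division, and the proximity rule can be approximated by preferring a tracker on a host that already holds the data. Both functions below are hypothetical simplifications; in particular, pick_tracker reduces "network proximity" to a same-host check.

```python
MB = 1024 * 1024


def count_splits(input_size, split_size):
    # Ceiling division: a partial last split still needs its own map task.
    return (input_size + split_size - 1) // split_size


def pick_tracker(split_hosts, trackers):
    # Prefer a TaskTracker running on a host that stores the split;
    # otherwise fall back to the first available tracker.
    for tracker in trackers:
        if tracker in split_hosts:
            return tracker
    return trackers[0]


if __name__ == "__main__":
    print(count_splits(200 * MB, 64 * MB))          # prints 4
    print(pick_tracker({"h2", "h3"}, ["h1", "h2"]))  # prints h2
```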
Each TaskTracker starts the map phase processing by extracting the
input data from the splits. For each record parsed by the “InputFormat”,
it invokes the user-provided “map” function, which emits a number of
key/value pairs into the memory buffer.
A periodic wakeup process sorts the memory buffer by destination
reducer node and invokes the “combine” function. The key/value pairs are
sorted into one of R local files (supposing there are R reducer
nodes).
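A sketch of that step: each buffered pair is routed to one of R regions by hashing its key, and the combine function pre-merges values locally. The names partition and spill are hypothetical, and CRC32 is used only because it gives the same result on every run; it is not a claim about Hadoop's own partitioner.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    # The same key always maps to the same reducer region.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def spill(buffer, num_reducers, combine):
    # Route each buffered pair to its region, grouping values by key.
    regions = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in buffer:
        regions[partition(key, num_reducers)][key].append(value)
    # Combine locally and keep each region sorted by key.
    return [
        sorted((key, combine(values)) for key, values in region.items())
        for region in regions
    ]


if __name__ == "__main__":
    buffered = [("a", 1), ("b", 1), ("a", 1)]
    for index, region in enumerate(spill(buffered, 2, sum)):
        print(index, region)
```

Combining before the data leaves the map node means ("a", 2) is sent over the network once instead of ("a", 1) twice.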
When the map task completes (all splits are done), the TaskTracker will notify the JobTracker.
When all the TaskTrackers are done, the JobTracker will notify the selected TaskTrackers for the reduce phase.
Each TaskTracker reads the region files remotely. It sorts the
key/value pairs and, for each key, invokes the “reduce” function, which
collects the key/aggregatedValue into the output file (one per reducer
node).
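On the reduce side, the region files fetched from the map nodes are merged, sorted by key, and handed to reduce one key at a time. A minimal sketch, where reduce_phase is a hypothetical name and plain lists stand in for the region files:

```python
from itertools import groupby
from operator import itemgetter


def reduce_phase(region_files, reduce_fn):
    # Gather the pairs from every map node's region file and sort by key,
    # so that all values for one key sit next to each other.
    pairs = sorted(
        (pair for region in region_files for pair in region),
        key=itemgetter(0),
    )
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        values = [value for _, value in group]
        output.append((key, reduce_fn(key, values)))
    return output


if __name__ == "__main__":
    from_map_1 = [("a", 2), ("b", 1)]
    from_map_2 = [("a", 1)]
    print(reduce_phase([from_map_1, from_map_2], lambda key, values: sum(values)))
```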
The Map/Reduce framework is resilient to the crash of any component.
The JobTracker keeps track of the progress of each phase and
periodically pings the TaskTrackers for their health status. When a
map phase TaskTracker crashes, the JobTracker reassigns the map
task to a different TaskTracker node, which reruns all the assigned
splits. If a reduce phase TaskTracker crashes, the JobTracker
reruns the reduce at a different TaskTracker.
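The reassignment logic can be sketched as follows. The function reassign, the task/tracker names, and the round-robin choice of a replacement are all assumptions for illustration; the real JobTracker's scheduling is more involved.

```python
def reassign(assignments, alive):
    """Move tasks away from crashed trackers.

    assignments: task id -> tracker currently responsible for it
    alive:       set of trackers that answered the last health ping
    """
    live = sorted(alive)
    if not live:
        raise RuntimeError("no TaskTracker available")
    result = {}
    moved = 0
    for task in sorted(assignments):
        tracker = assignments[task]
        if tracker in alive:
            result[task] = tracker
        else:
            # The whole task is rerun elsewhere (round-robin over live trackers).
            result[task] = live[moved % len(live)]
            moved += 1
    return result


if __name__ == "__main__":
    tasks = {"m1": "t1", "m2": "t2", "m3": "t2"}
    print(reassign(tasks, {"t1", "t3"}))  # t2 has crashed
```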
After both phases complete, the JobTracker will unblock the client program.
[How is it set up?]
- [Master : Namenode : JobTracker] : Single Namenode Cluster
- [Slave : Datanode : TaskTracker] : 1-N Datanode Cluster
- [Client : Run Job] : a server for launching jobs (?)
- The cluster setup document on the Apache Hadoop site describes it as follows.
Typically you choose one machine in the cluster to act as the
NameNode and one machine to act as the
JobTracker, exclusively. The rest of the machines act as
both a DataNode and TaskTracker and are
referred to as slaves.
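[How do you run a job on the JobTracker?]
- There are many ways to run one, but going by the basic examples..
- Run it when a request comes in from a Client, or
- Register it with cron or a scheduler and run it periodically
[So where does the MapReduce program need to live?]
- It seems it would be fine to have it on the Master (Namenode).
- The line above is wrong, so disregard it. It looks like you need at least 3 machines.
- That said, I think an additional Client Node role is needed.
- In any case, you package it as a jar and deploy it, and the basic way to run it is what the WordCount example shows..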
bin/hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output
- It seems this command can be run at request time or by a scheduler.
- To understand this, see the following description of how the JobTracker and TaskTracker work.
When the user's main() method runs and calls runJob() of the JobClient class, the JobClient performs the following steps.
1. Builds job.xml from the settings in jobConf and stores it in HDFS
2. Packages the user's Job class, or the jar file containing the Job class, as job.jar and stores it in HDFS
3. Calls the InputFormat's getSplits() method and uses the returned value to store the job.split file in HDFS