4 posts tagged 'Hadoop'

  1. 2012.03.07 hadoop map reduce working flow.
  2. 2012.03.06 Hadoop MapReduce WordCount: just following along..
  3. 2012.03.06 Hadoop: just following along and testing...
  4. 2012.02.29 hadoop-1.0.1: a first taste of installation and testing

hadoop map reduce working flow.

ITWeb/Hadoop General 2012. 3. 7. 14:09
[Reference sites]
http://architects.dzone.com/articles/how-hadoop-mapreduce-works
http://en.wikipedia.org/wiki/MapReduce
http://www.jaso.co.kr/265
http://nadayyh.springnote.com/pages/6064899?print=1
http://hadoop.apache.org/common/docs/r0.20.2/cluster_setup.html

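I found a document that explains this in an easy-to-understand way.
The basics matter in everything.
Let's not move on just because we half understand something.
I am still looking up and learning about the things I have questions about, too. :)

[Quoted article]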

In my previous post, I talked about the methodology of transforming a sequential algorithm into a parallel one. Once that is done, we can implement the parallel algorithm, and one popular framework for doing so is the open-source Apache Hadoop Map/Reduce framework.

Functional Programming

 

Multithreading is one popular way of doing parallel programming, but the major complexity of multithreaded programming is coordinating each thread's access to shared data. We need things like semaphores and locks, and we must use them with great care, otherwise deadlocks will result.

If we can eliminate the shared state completely, then the complexity of co-ordination will disappear.

This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values, which can only be changed by the active function at that moment. Imagine functions connected to each other via a directed acyclic graph (DAG). Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyzing the parallelism is much easier when there is no hidden dependency from shared state.
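
To make the point about shared state concrete, here is a minimal plain-Java sketch (not Hadoop code). The class and method names (PureFunctionSketch, wordLength, lengths) are hypothetical and chosen only for illustration. Because wordLength reads and writes no shared state, running it sequentially or on a parallel stream should give the same result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of any Hadoop API.
final class PureFunctionSketch {

    // A pure function: the result depends only on the argument,
    // and no shared state is read or modified.
    static int wordLength(String word) {
        return word.length();
    }

    // Applies the pure function to every word, either sequentially or in parallel.
    // No locks are needed because the calls do not depend on each other.
    static List<Integer> lengths(List<String> words, boolean parallel) {
        return (parallel ? words.parallelStream() : words.stream())
                .map(PureFunctionSketch::wordLength)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "hadoop", "map", "reduce");
        System.out.println(lengths(words, false)); // sequential run
        System.out.println(lengths(words, true));  // parallel run, same result
    }
}
```

Both calls print the same list; the only difference is which threads did the work.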

User defined Map/Reduce functions

 

Map/reduce is a special form of such a DAG that is applicable to a wide range of use cases. It is organized as a “map” function, which transforms a piece of data into some number of key/value pairs. These pairs are then sorted by key so that all pairs with the same key reach the same node, where a “reduce” function is used to merge the values (of the same key) into a single result.

map(input_record) {
  ...
  emit(k1, v1)
  ...
  emit(k2, v2)
  ...
}

reduce(key, values) {
  aggregate = initialize()
  while (values.has_next) {
    aggregate = merge(aggregate, values.next)
  }
  collect(key, aggregate)
}
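
The map and reduce pseudo-code can be tried out in plain Java without a cluster. The following is a minimal single-process sketch of the map, group-by-key, and reduce steps for word counting. It is not the Hadoop API: MapReduceSketch, map, reduce, and run are hypothetical names, and the two input records are just sample data.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; it only simulates the data flow in memory.
final class MapReduceSketch {

    // "map": turn one input record into a list of (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String record) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String word : record.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                emitted.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return emitted;
    }

    // "reduce": merge all values that share one key into a single result.
    static int reduce(String key, List<Integer> values) {
        int aggregate = 0;
        for (int value : values) {
            aggregate += value;
        }
        return aggregate;
    }

    // Runs map on every record, groups the pairs by key (sorted), then reduces each group.
    static Map<String, Integer> run(List<String> records) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String record : records) {
            for (Map.Entry<String, Integer> pair : map(record)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("Hello World Bye World", "Hello Hadoop Goodbye Hadoop");
        // prints {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
        System.out.println(run(records));
    }
}
```

In a real cluster the grouping step is the distributed sort and shuffle; here a sorted map stands in for it.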

 

The Map/Reduce DAG is organized in this way.


A parallel algorithm is usually structured as multiple rounds of Map/Reduce.


HDFS

 

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operations. Each file is broken into chunks, which are stored across multiple data nodes as local OS files.



There is a master “NameNode” that keeps track of the overall file directory structure and the placement of chunks. This NameNode is the central control point and may redistribute replicas as needed. Each DataNode reports all its chunks to the NameNode at bootup. Each chunk has a version number, which is increased on every update. Therefore, the NameNode knows if any of the chunks of a DataNode are stale (e.g. when the DataNode has been down for some period of time). Those stale chunks will be garbage collected at a later time.

To read a file, the client API calculates the chunk index based on the offset of the file pointer and makes a request to the NameNode. The NameNode replies with the DataNodes that have a copy of that chunk. From this point on, the client contacts the DataNode directly without going through the NameNode.
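
The chunk-index calculation in the read path is a simple integer division. Here is a small sketch of it; ChunkIndexSketch and its methods are hypothetical names, and the 64 MB chunk size is only an assumed value for the example, not something the article specifies for this step.

```java
// Hypothetical illustration class; not the HDFS client API.
final class ChunkIndexSketch {

    // Index of the chunk that contains the given byte offset of the file.
    static long chunkIndex(long fileOffset, long chunkSizeBytes) {
        if (fileOffset < 0 || chunkSizeBytes <= 0) {
            throw new IllegalArgumentException("offset must be >= 0 and chunk size must be > 0");
        }
        return fileOffset / chunkSizeBytes;
    }

    // Position of that byte inside its chunk.
    static long offsetWithinChunk(long fileOffset, long chunkSizeBytes) {
        if (fileOffset < 0 || chunkSizeBytes <= 0) {
            throw new IllegalArgumentException("offset must be >= 0 and chunk size must be > 0");
        }
        return fileOffset % chunkSizeBytes;
    }

    public static void main(String[] args) {
        long chunkSize = 64L * 1024 * 1024;   // assumed chunk size: 64 MB
        long offset = 200L * 1024 * 1024;     // file pointer at 200 MB
        System.out.println(chunkIndex(offset, chunkSize));        // prints 3
        System.out.println(offsetWithinChunk(offset, chunkSize)); // prints 8388608
    }
}
```

The client would send the chunk index to the NameNode and then read from the returned DataNode starting at the offset within the chunk.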

To write a file, the client API first contacts the NameNode, which designates one of the replicas as the primary (by granting it a lease). The response of the NameNode says which replica is the primary and which are the secondaries. The client then pushes its changes to all DataNodes in any order, but each DataNode only stores the change in a buffer. After the changes are buffered at all DataNodes, the client sends a “commit” request to the primary, which determines an update order and pushes this order to all the secondaries. After all secondaries complete the commit, the primary reports success to the client.

All changes to chunk distribution and metadata are written to an operation log file at the NameNode. This log file maintains an ordered list of operations, which is important for the NameNode to recover its view after a crash. The NameNode also maintains its persistent state by regularly checkpointing to a file. If the NameNode crashes, a new NameNode takes over after restoring the state from the last checkpoint file and replaying the operation log.
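
The recovery idea (restore the last checkpoint, then replay the operation log in order) can be sketched in a few lines of plain Java. Everything here is a simplifying assumption for illustration: NameNodeRecoverySketch, LogEntry, the sequence numbers, and the chunk-to-DataNode map are hypothetical and do not reflect the real NameNode data structures.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; the real NameNode metadata is far richer.
final class NameNodeRecoverySketch {

    // One logged change: chunkId is now placed on dataNode (null means the chunk was deleted).
    static final class LogEntry {
        final long sequence;
        final String chunkId;
        final String dataNode;

        LogEntry(long sequence, String chunkId, String dataNode) {
            this.sequence = sequence;
            this.chunkId = chunkId;
            this.dataNode = dataNode;
        }
    }

    // Rebuilds the state: start from the checkpoint, then apply, in log order,
    // every entry that is newer than the checkpoint.
    static Map<String, String> recover(Map<String, String> checkpoint,
                                       long checkpointSequence,
                                       List<LogEntry> operationLog) {
        Map<String, String> state = new LinkedHashMap<>(checkpoint);
        for (LogEntry entry : operationLog) {
            if (entry.sequence <= checkpointSequence) {
                continue; // already reflected in the checkpoint
            }
            if (entry.dataNode == null) {
                state.remove(entry.chunkId);
            } else {
                state.put(entry.chunkId, entry.dataNode);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, String> checkpoint = new LinkedHashMap<>();
        checkpoint.put("c1", "dn1"); // state as of log sequence 1

        List<LogEntry> log = new ArrayList<>();
        log.add(new LogEntry(1, "c1", "dn1"));
        log.add(new LogEntry(2, "c2", "dn2"));
        log.add(new LogEntry(3, "c1", "dn3"));
        log.add(new LogEntry(4, "c2", null));

        System.out.println(recover(checkpoint, 1, log)); // prints {c1=dn3}
    }
}
```

The order of the log matters: replaying entries 3 and 4 before entry 2 would leave c2 in the recovered state.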

 

MapRed

 

The job execution starts when the client program submits a job configuration to the JobTracker. The configuration specifies the map, combine and reduce functions, as well as the input and output paths of the data.


The JobTracker first determines the number of splits from the input path (the split size is configurable, ~16-64MB) and selects some TaskTrackers based on their network proximity to the data sources. The JobTracker then sends the task requests to the selected TaskTrackers.
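
As a rough illustration of how the number of splits follows from the input size, here is a sketch that simply divides and rounds up. This is a simplification under stated assumptions: SplitCountSketch is a hypothetical name, the input is treated as one contiguous range of bytes, and the 64 MB split size is just an example from the range mentioned above. The real framework computes splits per file through the InputFormat.

```java
// Hypothetical illustration class; real split calculation is done by the InputFormat.
final class SplitCountSketch {

    // Number of fixed-size splits needed to cover inputBytes (ceiling division).
    static long numberOfSplits(long inputBytes, long splitBytes) {
        if (inputBytes < 0 || splitBytes <= 0) {
            throw new IllegalArgumentException("input size must be >= 0 and split size must be > 0");
        }
        return (inputBytes + splitBytes - 1) / splitBytes;
    }

    public static void main(String[] args) {
        long splitSize = 64L * 1024 * 1024;  // assumed split size: 64 MB
        long oneGigabyte = 1024L * 1024 * 1024;
        System.out.println(numberOfSplits(oneGigabyte, splitSize));     // prints 16
        System.out.println(numberOfSplits(oneGigabyte + 1, splitSize)); // prints 17
    }
}
```

Each split becomes one map task, so a smaller split size means more, shorter map tasks.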

Each TaskTracker starts the map phase by extracting the input data from its splits. For each record parsed by the “InputFormat”, it invokes the user-provided “map” function, which emits a number of key/value pairs into a memory buffer. A periodic wakeup process sorts the memory buffer by destination reducer node, invoking the “combine” function. The key/value pairs are sorted into one of R local files (assuming there are R reducer nodes).
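
Deciding which of the R local files a key/value pair goes into is commonly done by hashing the key. The sketch below shows that idea in plain Java; PartitionSketch and partitionFor are hypothetical names, and hashing modulo R is an assumption about the partitioning scheme rather than something the article states.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; it only demonstrates hash partitioning.
final class PartitionSketch {

    // Maps a key to a partition number in the range [0, reducerCount).
    // Masking with Integer.MAX_VALUE clears the sign bit so negative hash codes still work.
    static int partitionFor(Object key, int reducerCount) {
        if (reducerCount <= 0) {
            throw new IllegalArgumentException("reducerCount must be positive");
        }
        return (key.hashCode() & Integer.MAX_VALUE) % reducerCount;
    }

    public static void main(String[] args) {
        int reducerCount = 4; // R, chosen arbitrarily for the example
        List<String> keys = Arrays.asList("Hello", "World", "Hadoop", "Hello");
        for (String key : keys) {
            // The same key always lands in the same partition, so one reducer sees all its values.
            System.out.println(key + " -> partition " + partitionFor(key, reducerCount));
        }
    }
}
```

The important property is determinism: every map task sends a given key to the same reducer.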

When the map task completes (all splits are done), the TaskTracker notifies the JobTracker. When all the TaskTrackers are done, the JobTracker notifies the TaskTrackers selected for the reduce phase.

Each of these TaskTrackers reads the region files remotely. It sorts the key/value pairs and, for each key, invokes the “reduce” function, which collects the key/aggregatedValue into the output file (one per reducer node).

The Map/Reduce framework is resilient to the crash of any component. The JobTracker keeps track of the progress of each phase and periodically pings the TaskTrackers for their health status. When a map-phase TaskTracker crashes, the JobTracker reassigns the map task to a different TaskTracker node, which reruns all the assigned splits. If a reduce-phase TaskTracker crashes, the JobTracker reruns the reduce on a different TaskTracker.

After both phases complete, the JobTracker unblocks the client program.



[Personal Thoughts + References]
[How is the cluster laid out?]
 - [Master : Namenode : JobTracker] : Single Namenode Cluster
 - [Slave : Datanode : TaskTracker] : 1-N Datanode Cluster
 - [Client : Run Job] : a server for running the Job (?)
 - The cluster setup document on the Apache Hadoop site describes it as follows.
Typically you choose one machine in the cluster to act as the NameNode and one machine to act as the JobTracker, exclusively. The rest of the machines act as both a DataNode and TaskTracker and are referred to as slaves.


[How do you run the JobTracker?]
 - There are surely many ways to run it, but judging from the basic examples..
 - Run it in response to a client request, or
 - Register it with cron or a scheduler and run it periodically

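[So where should the MapReduce program live?]
 - It seems it would be enough to have it on the Master (Namenode).
 - The line above is wrong, so disregard it. It looks like the cluster needs at least three machines.
 - On top of that, I think an additional Client Node is needed.
 - In any case you package the program as a jar and deploy it, and the basic invocation, as the WordCount example shows, is..
bin/hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount input output
 - So it should be enough to run this command at request time or from a scheduler.
 - To understand this, see the description below of how the JobTracker and TaskTracker work.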


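When the user's main() method runs and calls runJob() of the JobClient class, the JobClient performs the following steps.
 1. Build job.xml from the settings in the jobConf and store it in HDFS
 2. Package the user's Job class, or the jar file containing the Job class, as job.jar and store it in HDFS
 3. Call the getSplits() method of the InputFormat and use the returned value to store the job.split file in HDFS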



Hadoop MapReduce WordCount: just following along..

ITWeb/Hadoop General 2012. 3. 6. 16:50
[Reference sites]
http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
http://hadoop.apache.org/common/docs/r0.20.0/hdfs_shell.html


[Before you start]
- I started with hadoop-0.21.0, following the tutorial above.
- And immediately ran into a problem....
There is no hadoop-0.21.0-core.jar file, so compilation keeps failing with errors.
- I assumed it was a classpath problem and tried various settings, but nothing worked.
- So I unpacked all the hadoop*.jar files and simply repacked them into a single hadoop-0.21.0-core.jar.
- With hadoop-0.21.0-core.jar on the classpath, the compile succeeded!!
- In hadoop-0.20.0 and earlier versions, hadoop*core.jar is simply included in the tar.gz file.


[WordCount.java Source Code]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
   
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
     
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
 
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


[Compiling]
cd $HADOOP_HOME
mkdir example
ubuntu:~/app/hadoop$ javac -cp ./hadoop-0.21.0-core.jar:./lib/commons-cli-1.2.jar -d example WordCount.java
ubuntu:~/app/hadoop$ jar cvf wordcount.jar -C example/ .


[Testing WordCount]
- First, create the files to test with.
cd $HADOOP_HOME/example
vi file01
Hello World Bye World

vi file02
Hello Hadoop Goodbye Hadoop

ubuntu:~/app/hadoop/example$ ../bin/hadoop fs -mkdir input
ubuntu:~/app/hadoop/example$ ../bin/hadoop fs -put file01 ./input/
ubuntu:~/app/hadoop/example$ ../bin/hadoop fs -put file02 ./input/
ubuntu:~/app/hadoop/example$ ../bin/hadoop jar ../wordcount.jar org.apache.hadoop.examples.WordCount input output
ubuntu:~/app/hadoop/example$ ../bin/hadoop fs -cat output/*
Bye    1
Goodbye    1
Hadoop    2
Hello    2
World    2

- You can see that it works correctly.
- The important point here: some of you will be annoyed that hadoop-*-core.jar is missing. The workaround mentioned above is written out below for reference.


[Building hadoop-0.21.0-core.jar]
cd $HADOOP_HOME
mkdir hadoop-0.21.0-core
cp *.jar ./hadoop-0.21.0-core/
cd ./hadoop-0.21.0-core
jar xvf hadoop-hdfs-ant-0.21.0.jar          
jar xvf hadoop-mapred-examples-0.21.0.jar
jar xvf hadoop-common-0.21.0.jar       
jar xvf hadoop-hdfs-test-0.21.0-sources.jar 
jar xvf hadoop-mapred-test-0.21.0.jar
jar xvf hadoop-common-test-0.21.0.jar  
jar xvf hadoop-hdfs-test-0.21.0.jar         
jar xvf hadoop-mapred-tools-0.21.0.jar
jar xvf hadoop-hdfs-0.21.0-sources.jar 
jar xvf hadoop-mapred-0.21.0-sources.jar
jar xvf hadoop-hdfs-0.21.0.jar         
jar xvf hadoop-mapred-0.21.0.jar
# Delete every file and folder except the org folder.
cd ..
jar cvf hadoop-0.21.0-core.jar -C hadoop-0.21.0-core/ .
# If you now run ls -al, you can see that hadoop-0.21.0-core.jar has been created.
# This is a pure brute-force method.. treat it as reference only..


Hadoop: just following along and testing...

ITWeb/Hadoop General 2012. 3. 6. 12:01
[Reference documents]
http://apache.mirror.cdnetworks.com//hadoop/common/
http://wiki.apache.org/hadoop/GettingStartedWithHadoop
http://wiki.apache.org/hadoop/HowToConfigure
http://wiki.apache.org/hadoop/QuickStart
http://hadoop.apache.org/common/docs/current/cluster_setup.html
http://hadoop.apache.org/common/docs/current/single_node_setup.html


[Prepare to Start the Hadoop Cluster]
Unpack the downloaded Hadoop distribution. In the distribution, edit the file conf/hadoop-env.sh to define at least JAVA_HOME to be the root of your Java installation.

Try the following command:
$ bin/hadoop
This will display the usage documentation for the hadoop script.


[Standalone Operation]
[hadoop-0.21.0]
cd $HADOOP_HOME
mkdir input
cp conf/*.xml input
bin/hadoop jar hadoop-mapred-examples-0.21.0.jar grep input output 'dfs[a-z.]+'
cat output/*

[hadoop-0.22.0]
ubuntu:~/app/hadoop-0.22.0$ mkdir input
ubuntu:~/app/hadoop-0.22.0$ cp conf/*.xml input
ubuntu:~/app/hadoop-0.22.0$ bin/hadoop jar hadoop-mapred-examples-0.22.0.jar grep input output 'dfs[a-z.]+'
ubuntu:~/app/hadoop-0.22.0$ cat output/*

[hadoop-1.0.1]
ubuntu:~/app/hadoop-1.0.1$ mkdir input
ubuntu:~/app/hadoop-1.0.1$ cp conf/*.xml input
ubuntu:~/app/hadoop-1.0.1$ bin/hadoop jar hadoop-examples-1.0.1.jar grep input output 'dfs[a-z.]+'
ubuntu:~/app/hadoop-1.0.1$ cat output/*
- As you will see if you try it yourself, all versions behave the same way and produce identical results.


[Pseudo-Distributed Operation]
[hadoop-0.21.0]
{conf/core-site.xml}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

{conf/hdfs-site.xml}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

{conf/mapred-site.xml}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
    </property>
</configuration>

{Setup passphraseless ssh}
Now check that you can ssh to the localhost without a passphrase:
$ ssh localhost

If you cannot ssh to localhost without a passphrase, execute the following commands:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
- If you get "ssh: connect to host localhost port 22: Connection refused", first check that ssh is properly installed. If it is, check that the port setting in /etc/ssh/sshd_config is correct, then restart sshd and try again. (For more details, try searching on Google.)

{Execution}
ubuntu:~/app/hadoop-0.21.0$ bin/hadoop namenode -format
ubuntu:~/app/hadoop-0.21.0$ bin/start-all.sh

The hadoop daemon log output is written to the ${HADOOP_LOG_DIR} directory (defaults to ${HADOOP_HOME}/logs).

Browse the web interface for the NameNode and the JobTracker; by default they are available at:

    NameNode - http://localhost:50070/
    JobTracker - http://localhost:50030/

ubuntu:~/app/hadoop-0.21.0$ bin/hadoop fs -put conf input
ubuntu:~/app/hadoop-0.21.0$ bin/hadoop jar hadoop-mapred-examples-0.21.0.jar grep input output 'dfs[a-z.]+'
ubuntu:~/app/hadoop-0.21.0$ bin/hadoop fs -get output output
ubuntu:~/app/hadoop-0.21.0$ cat output/*
or
ubuntu:~/app/hadoop-0.21.0$ bin/hadoop fs -cat output/*


The other versions can be tested in the same way.
Next time I plan to test reading from and writing to HDFS.

hadoop-1.0.1: a first taste of installation and testing

ITWeb/Hadoop General 2012. 2. 29. 15:26
[Reference sites]
http://blog.softwaregeeks.org/archives/category/develop/hadoop 

http://www.ibm.com/developerworks/kr/library/l-hadoop-1/
http://www.ibm.com/developerworks/kr/library/l-hadoop-2/
http://hadoop.apache.org/common/docs/r0.20.2/cluster_setup.html



Let's install and test hadoop-1.0.1.. (This is a single-machine setup.)
The installation and testing procedure is the same as for the earlier 0.20.X versions.

First, download the required files.
- JDK : http://www.oracle.com/technetwork/java/javase/downloads/jdk-6u31-download-1501634.html
- Hadoop : http://mirror.apache-kr.org//hadoop/common/hadoop-1.0.1/
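- Download the files, extract them, and place them according to the folder structure below.

Basic directory structure
- I simply installed under a regular user account.
- Normally you would create a hadoop account and install under that account.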
/home
    /henry
        /app
            /jdk
            /hadoop

Environment setup
- These are bash settings.
- Add them to .bash_profile or .bashrc (for example with vi).
export JAVA_HOME=/home/henry/app/jdk
export HADOOP_HOME=/home/henry/app/hadoop
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH

Running the sample
- As in 0.20.X, an examples jar file is included.
- hadoop-examples-1.0.1.jar
The sequence of commands, run from $HADOOP_HOME:
mkdir temp
cp hadoop-examples-1.0.1.jar ./temp/
cd temp
jar xvf hadoop-examples-1.0.1.jar
rm -rf META-INF
rm -f hadoop-examples-1.0.1.jar
jar cvf ../hadoop-examples-1.0.1.0.jar .
cd ..
vi input.txt # Enter a number of words, since the example is word count
a
aa
b
bb
a
aaa
b
bb
c
cc
ccc
dd
cc
# This is what I entered.
hadoop jar hadoop-examples-1.0.1.0.jar org.apache.hadoop.examples.WordCount input.txt output
# Execution output
henry@ubuntu:~/app/hadoop$ hadoop jar hadoop-examples-1.0.1.0.jar org.apache.hadoop.examples.WordCount input.txt output
Warning: $HADOOP_HOME is deprecated.

12/02/29 15:16:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library
****file:/home/henry/app/hadoop-1.0.1/input.txt
12/02/29 15:16:06 INFO input.FileInputFormat: Total input paths to process : 1
12/02/29 15:16:07 INFO mapred.JobClient: Running job: job_local_0001
12/02/29 15:16:07 INFO util.ProcessTree: setsid exited with exit code 0
12/02/29 15:16:07 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@ae533a
12/02/29 15:16:07 INFO mapred.MapTask: io.sort.mb = 100
12/02/29 15:16:07 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/29 15:16:07 INFO mapred.MapTask: record buffer = 262144/327680
12/02/29 15:16:07 INFO mapred.MapTask: Starting flush of map output
12/02/29 15:16:07 INFO mapred.MapTask: Finished spill 0
12/02/29 15:16:07 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/02/29 15:16:08 INFO mapred.JobClient:  map 0% reduce 0%
12/02/29 15:16:10 INFO mapred.LocalJobRunner:
12/02/29 15:16:10 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/02/29 15:16:10 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@6782a9
12/02/29 15:16:10 INFO mapred.LocalJobRunner:
12/02/29 15:16:10 INFO mapred.Merger: Merging 1 sorted segments
12/02/29 15:16:10 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 82 bytes
12/02/29 15:16:10 INFO mapred.LocalJobRunner:
12/02/29 15:16:10 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/02/29 15:16:10 INFO mapred.LocalJobRunner:
12/02/29 15:16:10 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/02/29 15:16:13 INFO mapred.LocalJobRunner: reduce > reduce
12/02/29 15:16:13 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/02/29 15:16:14 INFO mapred.JobClient:  map 100% reduce 100%
12/02/29 15:16:14 INFO mapred.JobClient: Job complete: job_local_0001
12/02/29 15:16:14 INFO mapred.JobClient: Counters: 20
12/02/29 15:16:14 INFO mapred.JobClient:   File Output Format Counters
12/02/29 15:16:14 INFO mapred.JobClient:     Bytes Written=56
12/02/29 15:16:14 INFO mapred.JobClient:   FileSystemCounters
12/02/29 15:16:14 INFO mapred.JobClient:     FILE_BYTES_READ=288058
12/02/29 15:16:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=354898
12/02/29 15:16:14 INFO mapred.JobClient:   File Input Format Counters
12/02/29 15:16:14 INFO mapred.JobClient:     Bytes Read=36
12/02/29 15:16:14 INFO mapred.JobClient:   Map-Reduce Framework
12/02/29 15:16:14 INFO mapred.JobClient:     Map output materialized bytes=86
12/02/29 15:16:14 INFO mapred.JobClient:     Map input records=13
12/02/29 15:16:14 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/02/29 15:16:14 INFO mapred.JobClient:     Spilled Records=18
12/02/29 15:16:14 INFO mapred.JobClient:     Map output bytes=88
12/02/29 15:16:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=324665344
12/02/29 15:16:14 INFO mapred.JobClient:     CPU time spent (ms)=0
12/02/29 15:16:14 INFO mapred.JobClient:     SPLIT_RAW_BYTES=115
12/02/29 15:16:14 INFO mapred.JobClient:     Combine input records=13
12/02/29 15:16:14 INFO mapred.JobClient:     Reduce input records=9
12/02/29 15:16:14 INFO mapred.JobClient:     Reduce input groups=9
12/02/29 15:16:14 INFO mapred.JobClient:     Combine output records=9
12/02/29 15:16:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
12/02/29 15:16:14 INFO mapred.JobClient:     Reduce output records=9
12/02/29 15:16:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
12/02/29 15:16:14 INFO mapred.JobClient:     Map output records=13


henry@ubuntu:~/app/hadoop$ cat output/*
a    2
aa    1
aaa    1
b    2
bb    2
c    1
cc    2
ccc    1
dd    1


Easy, right? ^^;
Now, shall we look at the Hadoop example code?
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

This is very basic material, so let's study how to put it to use together.. ^^