[HADOOP] snappy 압축 파일 읽기.

ITWeb/Hadoop일반 2014. 7. 15. 12:55

테스트를 하다 보면 HDFS에 Snappy로 압축된 파일을 읽어서 내용을 검증하는 기능을 넣어야 할 때가 있습니다.

ㅎㅎ 제가 구현하던 모듈에 필요해서 만든 코드를 공유합니다.

기본 구조는 아래 링크를 참고해서 작성하였습니다.


[참조소스]

https://code.google.com/p/hadoop-snappy/source/browse/trunk/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCodec.java

/*

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress.snappy;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class TestSnappyCodec extends TestCase {
  private String inputDir;
  
  @Override
  protected void setUp(throws Exception {
    super.setUp();
    inputDir System.getProperty("test.build.data""target");
  }
  
  public void testFile(throws Exception {
    run("test.txt");
  }
  
  private void run(String filenamethrows FileNotFoundExceptionIOException{
    File snappyFile new File(inputDirfilename new SnappyCodec().getDefaultExtension());
    if (snappyFile.exists(){
      snappyFile.delete();
    }
    
    Configuration conf new Configuration();
    CompressionCodec codec (CompressionCodecReflectionUtils.newInstance(SnappyCodec.classconf);
    
    // Compress
    InputStream is this.getClass().getClassLoader().getResourceAsStream("test.txt");
    FileOutputStream os new FileOutputStream(snappyFile);
    CompressionOutputStream cos codec.createOutputStream(os);
    
    byte buffer[new byte[8192];
    try {
      int bytesRead 0;
      while ((bytesRead is.read(buffer)0{
        cos.write(buffer0bytesRead);
      }
    catch (IOException e{
      System.err.println("Compress Error");
      e.printStackTrace();
    finally {
      is.close();
      cos.close();
      os.close();
    }
    
    // Decompress
    is =  this.getClass().getClassLoader().getResourceAsStream("test.txt");
    FileInputStream is2 new FileInputStream(snappyFile);
    CompressionInputStream cis codec.createInputStream(is2);
    BufferedReader new BufferedReader(new InputStreamReader(is));
    BufferedReader cr new BufferedReader(new InputStreamReader(cis));
    
    
    try {
      String linerline;
      int lineNum 0;
      while ((line r.readLine()!= null{
        lineNum++;
        rline cr.readLine();
        if (!rline.equals(line){
          System.err.println("Decompress error at line " line " of file " filename);
          System.err.println("Original: [" line "]");
          System.err.println("Decompressed: [" rline "]");
        }
        assertEquals(rlineline);
      }
      assertNull(cr.readLine());
    catch (IOException e{
      System.err.println("Decompress Error");
      e.printStackTrace();
    finally {
      cis.close();
      is.close();
      os.close();
    }
  }
}



[구현소스]

- 위 예제는 로컬 테스트용으로 작성되어 있기 때문에, HDFS에서 직접 읽어 오는 형태로 약간 수정했습니다.

  public static void readLines(Path locationConfiguration confthrows Exception {

    CompressionCodec codec (CompressionCodecReflectionUtils.newInstance(SnappyCodec.classconf);
    FileSystem hdfs FileSystem.get(conf);
    
    try{
      FileStatus[fileStatus hdfs.listStatus(location);
      
      for (FileStatus file fileStatus{
        if file.isDirectory({
          continue;
        }
        
        CompressionInputStream cis codec.createInputStream(hdfs.open(file.getPath()));
        BufferedReader br new BufferedReader(new InputStreamReader(cis));

        try {
          String line;
          line=br.readLine();
          
          while (line != null){
            line br.readLine();

                     System.out.println(line);
          }
        finally {
          br.close();
        }
      }
    catch IOException {
    }
  }


: