[HADOOP] snappy 압축 파일 읽기.
ITWeb/Hadoop일반 2014. 7. 15. 12:55
테스트를 하다 보면 HDFS에 snappy로 압축된 파일을 읽어서 내용을 검증하는 기능을 넣어야 할 때가 있습니다.
ㅎㅎ 제가 구현 하던 모듈에 필요해서 공유 합니다.
기본 구조는 아래 링크 참고 해서 작성 하였습니다.
[참조소스]
https://code.google.com/p/hadoop-snappy/source/browse/trunk/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCodec.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.snappy;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * Round-trip test for {@link SnappyCodec}: compresses a classpath resource into a
 * file under the test data directory, decompresses that file again and compares
 * the result with the original resource line by line.
 */
public class TestSnappyCodec extends TestCase {
    /** Directory that receives the compressed file ("test.build.data", default "target"). */
    private String inputDir;

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        inputDir = System.getProperty("test.build.data", "target");
    }

    /** Runs the compress/decompress round trip on the bundled "test.txt" resource. */
    public void testFile() throws Exception {
        run("test.txt");
    }

    /**
     * Compresses the classpath resource {@code filename} with Snappy and verifies
     * that decompressing the produced file yields the same lines.
     *
     * @param filename name of the classpath resource; also the base name of the compressed file
     * @throws FileNotFoundException if the resource is missing or the output file cannot be opened
     * @throws IOException if compressing or decompressing fails; propagated so the test fails
     *         instead of passing silently
     */
    private void run(String filename) throws FileNotFoundException, IOException {
        File snappyFile = new File(inputDir, filename + new SnappyCodec().getDefaultExtension());
        if (snappyFile.exists()) {
            snappyFile.delete();
        }
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(SnappyCodec.class, conf);

        compress(codec, filename, snappyFile);
        verify(codec, filename, snappyFile);
    }

    /** Opens the named classpath resource, failing with a clear error when it does not exist. */
    private InputStream openResource(String filename) throws FileNotFoundException {
        InputStream in = this.getClass().getClassLoader().getResourceAsStream(filename);
        if (in == null) {
            throw new FileNotFoundException("Resource not found on classpath: " + filename);
        }
        return in;
    }

    /** Copies the resource through the codec's output stream into {@code snappyFile}. */
    private void compress(CompressionCodec codec, String filename, File snappyFile) throws IOException {
        // Compress
        InputStream is = openResource(filename);
        try {
            FileOutputStream os = new FileOutputStream(snappyFile);
            try {
                CompressionOutputStream cos = codec.createOutputStream(os);
                try {
                    byte[] buffer = new byte[8192];
                    int bytesRead;
                    while ((bytesRead = is.read(buffer)) != -1) {
                        cos.write(buffer, 0, bytesRead);
                    }
                } finally {
                    cos.close();
                }
            } finally {
                // Also covers the case where createOutputStream itself throws.
                os.close();
            }
        } finally {
            is.close();
        }
    }

    /** Decompresses {@code snappyFile} and asserts it matches the resource line by line. */
    private void verify(CompressionCodec codec, String filename, File snappyFile) throws IOException {
        // Decompress
        InputStream is = openResource(filename);
        try {
            FileInputStream is2 = new FileInputStream(snappyFile);
            try {
                CompressionInputStream cis = codec.createInputStream(is2);
                try {
                    BufferedReader r = new BufferedReader(new InputStreamReader(is));
                    BufferedReader cr = new BufferedReader(new InputStreamReader(cis));
                    String line;
                    int lineNum = 0;
                    while ((line = r.readLine()) != null) {
                        lineNum++;
                        // rline is null when the decompressed data ends early, so compare from line.
                        String rline = cr.readLine();
                        if (!line.equals(rline)) {
                            System.err.println("Decompress error at line " + lineNum + " of file " + filename);
                            System.err.println("Original: [" + line + "]");
                            System.err.println("Decompressed: [" + rline + "]");
                        }
                        assertEquals("line " + lineNum + " of file " + filename, line, rline);
                    }
                    // The decompressed stream must not contain extra trailing lines.
                    assertNull(cr.readLine());
                } finally {
                    cis.close();
                }
            } finally {
                is2.close();
            }
        } finally {
            is.close();
        }
    }
}
[구현소스]
- 위 예제는 로컬 테스트용으로 작성되어 있기 때문에 HDFS에서 직접 읽어 오는 형태로 약간 수정했습니다.
/**
 * Prints every line of each Snappy-compressed file found directly under
 * {@code location} to standard output.
 *
 * <p>Entries reported as directories are skipped; sub-directories are not descended into.
 *
 * @param location path whose entries are listed and read
 * @param conf Hadoop configuration used to create the codec and obtain the file system
 * @throws Exception if listing or reading fails; I/O errors are propagated to the
 *         caller instead of being silently discarded
 */
public static void readLines(Path location, Configuration conf) throws Exception {
    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(SnappyCodec.class, conf);
    FileSystem hdfs = FileSystem.get(conf);
    FileStatus[] fileStatus = hdfs.listStatus(location);
    for (FileStatus file : fileStatus) {
        if (file.isDirectory()) {
            continue;
        }
        CompressionInputStream cis = codec.createInputStream(hdfs.open(file.getPath()));
        BufferedReader br = new BufferedReader(new InputStreamReader(cis));
        try {
            String line;
            // Read-then-test in the loop condition so the first line is printed
            // and the terminating null is not.
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            br.close();
        }
    }
}