금요일 퇴근 시간이 얼마 안남은 관계로 짧게 쓰겠습니다.
원문 : https://github.com/jprante/elasticsearch-river-jdbc
[설치방법]
# installation
# river install
https://github.com/jprante/elasticsearch-river-jdbc
bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.2.1.1/elasticsearch-river-jdbc-1.2.1.1-plugin.zip
# mysql jdbc driver install
curl http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.31.tar.gz
curl -o mysql-connector-java-5.1.31.zip -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.31.zip/from/http://cdn.mysql.com/'
# plugins/jdbc/ 아래로 mysql connector jar 파일 저장.
mv ./mysql-connector-java-5.1.31/mysql-connector-java-5.1.31-bin.jar .
제가 작성한건 약식이니까 원문 참고하셔서 설치 하시기 바랍니다.
그리고 저는 mysql 로 테스트 하였습니다.
(뭐 다른 DBMS도 크게 다르지는 않습니다.)
[River 등록]
# river register
curl -XPUT 'localhost:9200/_river/jdbc_mysql/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "",
"sql" : "SELECT item_id, item_code, item_name, price, regdate FROM TBL_ITEM_INCREMEN WHERE ts BETWEEN NOW() - 30 AND NOW() ANd fetch_flag = 'F'",
"index" : "river_jdbc_mysql", # default jdbc
"type" : "tbl_item_increment", # default jdbc
"strategy" : "simple",
"schedule" : null,
"cronpoolsize" : 4,
"rounding" : null,
"scale" : 2,
"ignore_null" : false,
"autocommit" : false,
"fetchsize" : 10, /* Integer.MIN for MySQL */
"max_rows" : 0,
"max_retries" : 3,
"max_retries_wait" : "30s",
"locale" : Locale.getDefault().toLanguageTag(),
"timezone" : TimeZone.getDefault(),
"index_settings" : null,
"type_mapping" : null,
"maxbulkactions" : 1000,
"maxconcurrentbulkactions" : 4 * available CPU cores,
}
}'
등록방법에는 원문에 나와 있는 default value 들을 보기 위해 몽땅 다 넣어 두었습니다.
그렇기 때문에 이건 목적에 맞게 수정해서 사용하시기 바랍니다.
[River 등록 - fetch/update]
curl -XPUT 'localhost:9200/_river/jdbc_mysql_fetch/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "",
"sql" : [
{
"statement" : "SELECT item_id, item_code, item_name, price, regdate FROM TBL_ITEM_INCREMENT WHERE fetch_flag = ?",
"parameter" : ["F"],
"callable" : false
},
{
"statement" : "UPDATE TBL_ITEM_INCREMENT SET fetch_flag = ? WHERE ts < NOW() - ? ANd fetch_flag = ?",
"parameter" : ["T", 180, "F"],
"callable" : false
}
],
"index" : "river_jdbc_mysql",
"type" : "tbl_item_increment",
"schedule" : "0 * * ? * *",
"maxconcurrentbulkactions" : 32
}
}'
저는 테스트로 색인해야할 데이터를 fetch 하는 쿼리와 색인 작업이 완료된 데이터를 다시 fetch해 오지 않도록 flag 를 update 하는 쿼리를 두 개 등록해서 테스트 하였습니다.
[느낀점]
- 음... 대용량 데이터에 대한 처리 시 과연 요구하는 성능이 나올까????
- 그냥 특성에 맞는 서비스에 적용하면 아주 편리 할 것 같기는 하다.
[테스트 환경]
- elasticsearch 1.2.1 : standalone, cluster
- elasticsearch-river-jdbc 1.2.1.1
- mysql 5.6.11
- mysql connector 5.1.31