[Elasticsearch] JDBC River 알아보기.

Elastic/Elasticsearch 2014. 6. 13. 18:25
금요일 퇴근 시간이 얼마 안남은 관계로 짧게 쓰겠습니다.

원문 : https://github.com/jprante/elasticsearch-river-jdbc

[설치방법]

# installation

# river install

https://github.com/jprante/elasticsearch-river-jdbc

bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.2.1.1/elasticsearch-river-jdbc-1.2.1.1-plugin.zip


# mysql jdbc driver install

curl http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.31.tar.gz

curl -o mysql-connector-java-5.1.31.zip -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.31.zip/from/http://cdn.mysql.com/'

# plugins/jdbc/ 아래로 mysql connector jar 파일 저장.

mv ./mysql-connector-java-5.1.31/mysql-connector-java-5.1.31-bin.jar .


제가 작성한건 약식이니까 원문 참고하셔서 설치 하시기 바랍니다.

그리고 저는 mysql  로 테스트 하였습니다.

(뭐 다른 DBMS도 크게 다르지는 않습니다.)


[River 등록]

# river register

curl -XPUT 'localhost:9200/_river/jdbc_mysql/_meta' -d '{

    "type" : "jdbc",

    "jdbc" : {

        "url" : "jdbc:mysql://localhost:3306/test",

        "user" : "root",

        "password" : "",

        "sql" : "SELECT item_id, item_code, item_name, price, regdate FROM TBL_ITEM_INCREMEN WHERE ts BETWEEN NOW() - 30 AND NOW() ANd fetch_flag = 'F'",

        "index" : "river_jdbc_mysql", # default jdbc

        "type" : "tbl_item_increment", # default jdbc

        "strategy" : "simple",

        "schedule" : null,

        "cronpoolsize" : 4,

        "rounding" : null,

        "scale" : 2,

        "ignore_null" : false,

        "autocommit" : false,

        "fetchsize" : 10, /* Integer.MIN for MySQL */

        "max_rows" : 0,

        "max_retries" : 3,

        "max_retries_wait" : "30s",

        "locale" : Locale.getDefault().toLanguageTag(),

        "timezone" : TimeZone.getDefault(),

        "index_settings" : null,

        "type_mapping" : null,

        "maxbulkactions" : 1000,

        "maxconcurrentbulkactions" : 4 * available CPU cores,

    }

}'


등록방법에는 원문에 나와 있는 default value 들을 보기 위해 몽땅 다 넣어 두었습니다.
그렇기 때문에 이건 목적에 맞게 수정해서 사용하시기 바랍니다.

[River 등록 - fetch/update]
curl -XPUT 'localhost:9200/_river/jdbc_mysql_fetch/_meta' -d '{

    "type" : "jdbc",

    "jdbc" : {

        "url" : "jdbc:mysql://localhost:3306/test",

        "user" : "root",

        "password" : "",

        "sql" : [

        {

        "statement" : "SELECT item_id, item_code, item_name, price, regdate FROM TBL_ITEM_INCREMENT WHERE fetch_flag = ?",

        "parameter" : ["F"],

        "callable" : false

        },

        {

        "statement" : "UPDATE TBL_ITEM_INCREMENT SET fetch_flag = ? WHERE ts < NOW() - ? ANd fetch_flag = ?",

        "parameter" : ["T", 180, "F"],

        "callable" : false

         }

        ],

        "index" : "river_jdbc_mysql",

        "type" : "tbl_item_increment",

        "schedule" : "0 * * ? * *",

        "maxconcurrentbulkactions" : 32

    }

}'


저는 테스트로 색인해야할 데이터를 fetch  하는 쿼리와 색인 작업이 완료된 데이터를 다시 fetch해 오지 않도록 flag 를 update 하는 쿼리를 두 개 등록해서 테스트 하였습니다.

[느낀점]
- 음... 대용량 데이터에 대한 처리 시 과연 요구하는 성능이 나올까????
- 그냥 특성에 맞는 서비스에 적용하면 아주 편리 할 것 같기는 하다.

[테스트 환경]
- elasticsearch 1.2.1 : standalone, cluster
- elasticsearch-river-jdbc 1.2.1.1
- mysql 5.6.11
- mysql connector 5.1.31


: