• (function(){ var bp = document.createElement('script'); var curProtocol = window.location.protocol.split(':')[0]; if (curProtocol === 'https') { bp.src = 'https://zz.bdstatic.com/linksubmit/push.js'; } else { bp.src = 'http://push.zhanzhang.baidu.com/push.js'; } var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(bp, s); })();

    Logstash使用介紹

    Logstash介紹

    Logstash是一個數據收集處理轉發系統,是 Java開源項目。 它只做三件事:

    • 數據輸入
    • 數據加工(不是必須的):如過濾,改寫等
    • 數據輸出

     

    下載安裝

    logstash是基于Java的服務,各操作系統安裝Java環境均可使用。 

    Java 
    https://www.java.com/zh_CN/ 
    安裝后配置好java環境變量。
    logstash 
    最新版 https://www.elastic.co/downloads/logstash 
    2.3.4版 https://www.elastic.co/downloads/past-releases/logstash-2-3-4

     

    配置結構

    #輸入
    Input{
       Jdbc{}
    }
    #加工過濾
    Filter{
       Json{}
    }
    #輸出
    Output{
       elasticsearch{}
    }
     

     

    支持的插件

    Input: elasticsearch,file,http_poller,jdbc,log4j,rss,rabbitmq,redis,syslog,tcp,udp…等

    Filter: grok,json,mutate,split …等

    Output: email,elasticsearch,file,http,mongodb,rabbitmq,redis,stdout,tcp,udp …等

    配置說明地址: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

     

    應用示例

    #jdbc_demo
    input {
      jdbc {
            #數據庫鏈接設置
            jdbc_driver_library => "k:\k\k\sqljdbc_6.0\chs\sqljdbc42.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://192.168.0.1;user=sa;password=sa;DatabaseName=EStatistic;"
            jdbc_user => "sa"
            jdbc_password => "sa"
        
    	statement_filepath => "../config-demo/sqlscript/jdbc_demo.sql"         #sql腳本
    	schedule => "* * * * *"                                                #執行計劃
    
    	record_last_run => true                                                #記錄最后運行值
    	use_column_value => true                                               #
    	tracking_column => id                                                  #要記錄的字段
    	last_run_metadata_path => "../config-demo/log/jdbc_demo"               #記錄的位置
    
    	lowercase_column_names => true                                         #將字段名全部改為小寫
    	clean_run => false
    	
            jdbc_fetch_size => 1000                                                #分頁設置
    	jdbc_page_size => 1000
    	jdbc_paging_enabled => true
      }
    }
    filter {
    	mutate {
            #重命名,可以將字段名改掉
            rename => {
                    "id" => "Id"
                    "bid" => "BId"
                    "saleconsultantid" => "SaleConsultantId"
                    "avgreplyduration" => "AvgReplyDuration"
                    "avgreplyratio" => "AvgReplyRatio"
                    "avgonlineduration" => "AvgOnlineDuration"
                    "stattime" => "StatTime"
    			}
        }
    }
    output { 
    	elasticsearch { 
    		hosts => ["192.168.0.1:9200"]                            #es 服務地址
    		index => "jdbc_demo"                                       #索引的名字
    		document_type => "demoinfo"                                #類型的名字
    		workers => 1                                               #輸出時進程數
    		document_id=>"%{Id}"                                       #文檔的唯一ID
    		template => "../config-demo/templates/jdbc_demo.json"      #模板的路徑
    		template_name => "jdbc_demo"                               #模板的名字
    		template_overwrite => false                                ##是否覆蓋已存在的模板
    	} 
        #  stdout{
        #     codec => rubydebug 
        # }
    }

     

    索引模板

    {
        "order": 1,
        "template": "jdbc_demo",                  //模板匹配規則,已經索引名匹配(eg:jdbc_demo-*,可以匹配到,jdbc_demo-1,jdbc_demo-14...)
        "settings": {
            "index.number_of_shards": 4,          //分片數
            "number_of_replicas": 1               //每個分配備份數
        },
        "mappings": {
            "_default_": {
                "_source": {
                    "enabled": true
                }
            },
            "demoinfo": {           //動態模板 如果映射后,有新的字段添加進來,并且在字段區域沒有映射會按該動態模板匹配映射
                "dynamic_templates": [
                    {
                        "string_field": {
                            "match": "*",
                            "match_mapping_type": "string", //匹配到所有字符串 設置為不分詞
                            "mapping": {
                                "index": "not_analyzed",
                                "type": "string"
                            }
                        }
                    }
                ],                      //類型名
                "_source": {
                    "enabled": true
                },
                "_all": {
                    "enabled": false
                },
                "properties": {                 //字段區域
                    "Id": {
                        "type": "long"
                    },
                    "BId": {
                        "type": "integer"
                    },
                    "SaleConsultantId": {
                        "type": "integer"
                    },
                    "AvgReplyDuration": {
                        "type": "integer"
                    },
                    "AvgReplyRatio": {
                        "type": "double"
                    },
                    "AvgOnlineDuration": {
                        "type": "integer"
                    },
                    "StatTime": {
                        "format": "strict_date_optional_time||epoch_millis",
                        "type": "date"
                    }
                }
            }
        },
        "aliases": {
            "logstashdemo": {}       //別名,匹配到該模板的會設置一個別名
        }
    }

     

    Sqlserver 查詢語句

    SELECT  [Id]
          ,[BId]
          ,[SaleConsultantId]
          ,[AvgReplyDuration]
          ,[AvgReplyRatio]
          ,[AvgOnlineDuration]
          ,[StatTime]
      FROM [EStatistic].[dbo].[StatSessionBy7Day] 
      WHERE Id>:sql_last_value
      --:sql_last_value是記錄的最后的一個值

     

    5.x模板String字段

     {"properties": {                 //字段區域
                     "NewField": {
                        "type": "keyword",       // keyword 不分詞
                       "index": false            //不建立索引
                       },
                     "NewFieldText": {
                      "type": "text",           // text分詞 
                       "index": true            // 建立索引
                     }
                }
                }

     

     

    注意事項

    • Jdbc(input)拉取數據使用分頁功能時無法查詢text、ntext 和 image字段。
    • jdbc(input)使用分頁時會將字段全部轉換為大寫。
    • elasticsearch(output)的模板中匹配符,一定要能夠匹配到索引名稱才能生效,并且要避免讓其他索引匹配到,以免影響其它新索引。
    • elasticsearch(output)定義索引名稱必須全小寫(es限制)。
    • 以時間字段進行跟蹤時sql查詢語句不能使用Top x限制每次查詢條數,這會導致最后的記錄值并非是最大的值,所拉取的數據可能會出現數據重復拉取(但是在es中不會體現為多條重復數據,只是version字段會>1)或數據丟失。
    • 跟蹤主鍵ID時需顯示設置 order by id asc
    • 無檢索文本內容需設置“index”: “not_analyzed”
     

    資料

    posted @ 2017-09-28 10:30 Mr. Hu 閱讀(...) 評論(...) 編輯 收藏
    Map
    凯发娱乐