Logstash 技术实现数据同步快速入门

广告位

9 Logstash 数据同步 一.概述 Logstash是一个开源的服务器端数据处理管道,可以同时从多个数据…

9

Logstash 数据同步

一.概述

Logstash是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到Elasticsearch中。

Logstash 技术实现数据同步快速入门

  • Logstash 采用可插拔框架,拥有 200 多个插件。您可以将不同的输入选择、过滤器和输出选择混合搭配、精心安排,让它们在管道中和谐地运行。

二.​​​​​​​下载

Logstash 技术实现数据同步快速入门

  • 可自行选择版本下载

Logstash 技术实现数据同步快速入门

三.​​​​​​​安装

解压即可

Logstash 技术实现数据同步快速入门

四.​​​​​​​数据同步

Logstash的工作是从MySQL中读取数据,向ES中创建索引,这里需要提前创建mapping的模板文件以便logstash使用。

  • 步骤一:拷贝数据库驱动

Logstash 技术实现数据同步快速入门

  • 步骤二:创建  config/xc_course_mysql.conf ,用于描述从哪个数据库读取数据

  • 创建的文件必须是UTF-8编码,且无BOM

Logstash 技术实现数据同步快速入门

Logstash 技术实现数据同步快速入门

input {   stdin {   }   jdbc { 	#serverTimezone=UTC 	#serverTimezone=Asia/Shanghai     jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=GMT%2B8"     # the user we wish to excute our statement as     jdbc_user => "root"     jdbc_password => "1234"     # the path to our downloaded jdbc driver       jdbc_driver_library => "../lib/mysql-connector-java-5.1.47.jar"     # the name of the driver class for mysql     jdbc_driver_class => "com.mysql.jdbc.Driver"     #分页设置     jdbc_paging_enabled => "true"     jdbc_page_size => "50000"     jdbc_fetch_size => 100     #要执行的sql文件     #statement_filepath => "/conf/course.sql"     #statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"     statement => "select * from course_pub where timestamp > :sql_last_value"     #定时配置     schedule => "* * * * *"     record_last_run => true     last_run_metadata_path => "../config/logstash_metadata"   } }   output {   elasticsearch {     #ES的ip地址和端口     hosts => "localhost:9200"     #hosts => ["localhost:9200","localhost:9202","localhost:9203"]     #ES索引库名称     index => "xc_course"     document_id => "%{id}"     document_type => "CoursePub"     template =>"../config/xc_course_template.json"     template_name =>"xc_course"     template_overwrite =>"true"   }   stdout {    #日志输出     codec => json_lines   } }
  • 步骤三:创建 config/xc_course_template.json 用于配置logstash向elasticsearch写入数据的模板

Logstash 技术实现数据同步快速入门

{ 	"template" : "xc_course", 	"mappings" : { 		"CoursePub": { 			"properties": { 			  "charge": { 				"type": "keyword" 			  }, 			  "description": { 				"type": "text", 				"analyzer": "ik_max_word", 				"search_analyzer": "ik_smart" 			  }, 			  "end_time": { 				"type": "date", 				"format": "yyyy-MM-dd HH:mm:ss" 			  }, 			  "expires": { 				"type": "date", 				"format": "yyyy-MM-dd HH:mm:ss" 			  }, 			  "grade": { 				"type": "keyword" 			  }, 			  "id": { 				"type": "keyword" 			  }, 			  "mt": { 				"type": "keyword" 			  }, 			  "name": { 				"type": "text", 				"analyzer": "ik_max_word", 				"search_analyzer": "ik_smart" 			  }, 			  "pic": { 				"type": "keyword", 				"index": false 			  }, 			  "price": { 				"type": "float" 			  }, 			  "price_old": { 				"type": "float" 			  }, 			  "pub_time": { 				"type": "date", 				"format": "yyyy-MM-dd HH:mm:ss" 			  }, 			  "qq": { 				"type": "keyword", 				"index": false 			  }, 			  "st": { 				"type": "keyword" 			  }, 			  "start_time": { 				"type": "date", 				"format": "yyyy-MM-dd HH:mm:ss" 			  }, 			  "status": { 				"type": "keyword" 			  }, 			  "studymodel": { 				"type": "keyword" 			  }, 			  "teachmode": { 				"type": "keyword" 			  }, 			  "teachplan": { 				"type": "text", 				"analyzer": "ik_max_word", 				"search_analyzer": "ik_smart" 			  }, 			  "users": { 				"type": "text", 				"index": false 			  }, 			  "valid": { 				"type": "keyword" 			  } 			} 		} 	} }
  • 步骤四:启动logstash

logstash.bat -f ..configxc_course_mysql.conf

Logstash 技术实现数据同步快速入门

  • 启动成功:

Logstash 技术实现数据同步快速入门

  • 访问:

Logstash 技术实现数据同步快速入门

看完恭喜你,又知道了一点点!!!

你知道的越多,不知道的越多! 

~感谢志同道合的你阅读,  你的支持是我学习的最大动力 ! 加油 ,陌生人一起努力,共勉!!

注: 如果本篇有需要改进的地方或错误,欢迎大神们指定一二~~

月明星稀

关于作者: 月明星稀

为您推荐