ElasticSearch创建索引
创建ES索引
PUT logstash_demo
{
"mappings" : {
"properties" : {
"email" : {
"type" : "text"
},
"first_name" : {
"type" : "text"
},
"last_name" : {
"type" : "text"
},
"uid" : {
"type" : "long"
}
}
}
}
创建好后查询索引数据
GET logstash_demo/_search
{
"query": {
"match_all": {}
}
}
这时查询熟的数据为空。
MySql创建表
创建表并插入数据
create table contacts (
uid serial,
email VARCHAR(80) not null,
first_name VARCHAR(80) NOT NULL,
last_name VARCHAR(80) NOT NULL
);
INSERT INTO contacts(email, first_name, last_name) VALUES('jim@example.com', 'Jim', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('john@example.com', 'John', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('carol@example.com', 'Carol', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('sam@example.com', 'Sam', Smith);
准备LogStash
安装LogStash
去官网下载LogStash安装包,并解压。
安装jdbc插件
解压后进入logstash解压包的bin目录,执行命令
./logstash-plugin install logstash-input-jdbc
如何是在win上,则执行
./logstash-plugin.bat install logstash-input-jdbc
如果这个命令安装不了,可以去这里下载 mysql-connector-java jar包,我这里下载的是mysql-connector-java-8.0.18.jar。
然后把jar包放在LogStash文件夹下的vendor下,创建/jar/jdbc ,放入jar。
编写LogStash脚本
demo1
简单的脚本,把数据库中数据打印出来
input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/demo"
# 用户名密码
jdbc_user => "root"
jdbc_password => "123456"
# jar包的位置
jdbc_driver_library => "../vendor/jar/jdbc/mysql-connector-java-8.0.18.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
# our query
statement => "SELECT * FROM contacts"
}
}
output {
stdout { codec => json_lines }
}
demo2
把MySql中数据割进ElasticSearch中
input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/demo"
# 用户名密码
jdbc_user => "root"
jdbc_password => "123456"
# jar包的位置
jdbc_driver_library => "../vendor/jar/jdbc/mysql-connector-java-8.0.18.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
# our query
statement => "SELECT * FROM contacts"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
mutate {
remove_field => "@version"
remove_field => "@timestamp"
}
}
output {
elasticsearch {
index => "contact1"
action => "create"
document_type => "_doc"
document_id => "%{uid}"
hosts => "127.0.0.1:9200"
}
}
参考
comments powered by Disqus