logstash使用
安装Logstash
Logstash版本和elasticsearch版本一定要对应上,否则报错会摸不着头脑
参考https://www.elastic.co/cn/support/matrix#matrix_compatibility
参考 部署-linux-centOs-大数据基础配置
#logstash-8.12.2
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.12.2-linux-x86_64.tar.gz
tar -zxvf logstash-8.12.2-linux-x86_64.tar.gz -C /opt/
mv logstash-8.12.2 logstash
vim config/logstash.yml
#添加到末尾,作用是为了跨域请求
http.host: "0.0.0.0"
#启动测试
./bin/logstash -e 'input {stdin {}} output{stdout{}}'
#检查配置文件
./bin/logstash -f myconf.conf -t
#以配置文件启动
./bin/logstash -f myconf.conf
#访问url
http://192.168.1.74:9600/
访问成功后输出以下信息
初步使用Logstash
我们可以使用Logstash来收集日志,然后进行解析。在进行解析时,我们需要配置输入,输出和过滤器。 这是一个完整的带输入输出过滤器的配置文件.从文件输入,输出到elasticsearch
但logstash功能不止于此.
#配置文件
vim myconf.conf
input {
file {
path => "/opt/logs/*.log"
start_position => "beginning" # 从文件头开始读取(默认为end)
sincedb_path => "/path/to/sincedb_file" # 自动记录每个文件最后读取的位置
ignore_older => 0 # 忽略比当前时间更旧的文件,默认为0表示不忽略任何文件
discover_interval => 15 # 发现新文件的时间间隔(秒),可选配置项
}
}
filter {
if [message] =~ "AXBBackWordReq" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp},%{NUMBER} - %{WORD:action}\(sessionid=%{DATA:sessionid}, callerNumber=%{DATA:callerNumber}, calledNumber=%{DATA:calledNumber}, extNumber=%{DATA:extNumber}, dialTime=%{NUMBER:dialTime}\)" }
}
}
else if [message] =~ "AXBBackWordResp" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp},%{NUMBER} - %{WORD:action}\(sessionid=%{DATA:sessionid}, code=%{NUMBER:code}, calledNumber=%{DATA:calledNumber}, userData=%{DATA:userData}, displayNumber=%{DATA:displayNumber}\)" }
}
}
}
output{
elasticsearch {
hosts => ["http://192.168.1.74:9200"]
user => "elastic"
password => "nsQywcgUoC3ZljMJJ0p1"
index => "my_index-%{+YYYY.MM.dd}" # 包含日期的动态索引名称
manage_template => false # 禁用Logstash自动生成模板
template => "myTemplate.json" # 指定模板文件路径
}
}
vim myTemplate.json
#json中不能存在注释,需要原样复制
{
"index_patterns": ["axblogstash-*"],
"mappings": {
"properties": {
"timestamp": { "type": "date" },
"action": { "type": "keyword" },
"sessionid": { "type": "keyword" },
"callerNumber": { "type": "keyword" },
"calledNumber": { "type": "keyword" },
"extNumber": { "type": "keyword" },
"dialTime": { "type": "long" },
"code": { "type": "integer" },
"userData": { "type": "text", "analyzer": "standard" },
"displayNumber": { "type": "keyword" }
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}
输入
先从数据输入源入手
Logstash提供了一个非常长的数据来源支持列表,这里我们举例几个常用的输入配置,要寻找自己不常用的数据源,请搜索官网 https://www.elastic.co/guide/en/logstash/current/input-plugins.html
#最简单的输入配置
input {
stdin {}
syslog {}
}
文件输入
#从文件输入
input {
file {
# 日志文件地址
path => ["/var/log/*.log", "/var/log/message"]
# 默认一小时
close_older => 3600
# 设置新行分隔符,默认为" \ n"。
delimiter => "\n"
# 此参数配合stat_interval,此值用来发现新文件,最终频率为discover_interval × stat_interval。默认15
discover_interval =>
# 默认为1秒
stat_interval => "1 second"
# 忽略压缩包
exclude => exclude => "*.gz"
# 默认为false
exit_after_read => false
# 默认为4611686018427387903,是为了保证在读取下一个文件前保证当前文件已经读取完毕
file_chunk_count => 4611686018427387903
# 默认为32kb
file_chunk_size => 32768
# 默认值为delete,可选值:delete,log,log_and_delete
file_completed_action => delete
# 将完全读取的文件路径附加到哪个文件,此内容没有默认值
file_completed_log_path => "/usr/local/log2/completed.log"
# 默认值last_modified,可设置内容:last_modified, path
file_sort_by => last_modified
# 默认值asc,可设置的值asc, desc
file_sort_direction =>
# 设置了忽略1000秒之前修改的文件,此内容没有默认值
ignore_older => 1000
# 设置最多打开文件量,此值没有默认值,但是存在一个内部限制4095
max_open_files => 4095
# 设置了输入模式为tail
# tail模式下,start_position 和close_older参数将被忽略。start_position始终从头开始读取文件,close_older到达EOF时文件自动关闭
# read模式下需要设置ignore_older 、file_completed_action 、file_completed_log_path 参数
mode => tail
# 默认值2周
sincedb_clean_after => "2 weeks"
# 此为默认值,此值为文件路径而不是目录路径
sincedb_path => path.data>/plugins/inputs/file
# 默认值15秒
sincedb_write_interval => "15 seconds"
# 默认值"end",可选值beginning,end。如果启动logstash的时候需要读取旧数据需要设置为beginning
start_position => "end"
# 下面是公共配置
# 设置了type为system
type => "system"
# 默认line
codec => "json"
# 默认值为true
enable_metric => false
# 指定此数据输入id为input1
id => input1
# 添加了键位key值为value的数据到时间
add_field => {
"key" => "value"
}
}
}
参数 | 作用 | 参数类型 |
---|---|---|
close_older | 文件输入将关闭最近一次在指定持续时间(如果指定了数字,则为秒)之前读取的所有文件 | number或者string_duration |
delimiter | 设置新行分隔符,默认为" \ n"。 | string |
discover_interval | 每隔多久去检查一次被监听的 path 下是否有新文件。 | number |
exclude | 排除例外文件 | array |
exit_after_read | 可以在read模式下使用此选项,以在读取文件时强制关闭所有观察程序。可以用于文件内容为静态且在执行期间不会更改的情况。 | boolean |
file_chunk_count | 在移至下一个活动文件之前从每个文件读取多少条数据 | number |
file_chunk_size | 每条数据读取的大小 | number |
file_completed_action | 在read模式下,完成文件后应执行什么操作。如果指定了删除,则文件将被删除。如果指定了日志,则文件的完整路径将记录到file_completed_log_path设置中指定的文件中 。如果log_and_delete指定,则以上两个动作都会发生。 | string 可选参["delete", "log", "log_and_delete"]数 |
file_completed_log_path | 完全读取的文件路径应附加到哪个文件。只有当指定文件这条道路file_completed_action是日志或log_and_delete是使用 | string |
file_sort_by | 应该使用"监视"文件的哪个属性对其进行排序。文件可以按修改日期或全路径字母排序。 | string 可选参数["last_modified", "path"] |
file_sort_direction | 排序"监视"的文件时,在升序和降序之间进行选择 | string 可选参数["asc", "desc"] |
ignore_older | 当文件输入发现在指定持续时间(如果指定了数字,则为秒)之前最后修改的文件时,将忽略该文件。 | number或者string_duration |
max_open_files | 此输入一次一次消耗的file_handles的最大数量是多少。当打开的文件数量超过指定数量则会关闭一些文件 | number |
mode | 您希望文件输入以哪种模式操作。 | string 可选参数 ["tail", "read"] |
path | 输入的文件的路径。 | array |
sincedb_clean_after | 如果在过去N天内未在跟踪文件中检测到任何更改,则它的sincedb跟踪记录将过期,并且不会保留。 | number或者string_duration |
sincedb_path | 定义 sincedb 文件的位置 | string |
sincedb_write_interval | 每隔多久写一次 sincedb 文件 | number或者string_duration |
start_position | 从什么位置开始读取文件数据。支持beginning或者end。end的时候从结束位置开始读取数据,而beginning则是从开头开始读取数 | string 可选参数["beginning", "end"] |
stat_interval | 我们统计文件的频率(以秒为单位),以查看它们是否已被修改。 | number或者string_duration |
TCP输入
#最基础的Socket通信
input {
tcp {
# 主机地址
host => "192.168.0.2"
# 此时需要监听客户端
mode => "server"
# 要监听的端口
port => 8081
# 默认值为false
tcp_keep_alive => false
# 默认值为true
dns_reverse_lookup_enabled => true
# 下面是公共配置
# 设置了type为system
type => "system"
# 默认line
codec => "json"
# 默认值为true
enable_metric => false
# 指定此数据输入id为input1
id => input1
# 添加了键位key值为value的数据到时间
add_field => {
"key" => "value"
}
}
}
参数 | 作用 | 参数类型 |
---|---|---|
host | 监听的地址(mode=server),连接的地址(mode=client) | string |
mode | 运行模式。server指侦听客户端连接, client指连接到服务器。 | string 可选参数["server", "client"] |
port | 当mode为时server,要监听的端口。当mode为时client,要连接的端口 | number |
proxy_protocol | 代理协议 | boolean |
ssl_cert | PEM格式的证书路径。 | |
ssl_certificate_authorities | 根据这些权限验证客户证书或证书链。 | array |
ssl_enable | 启用SSL | boolean |
ssl_extra_chain_certs | 额外的X509证书的路径数组。 | array |
ssl_key | 指定证书(PEM格式)的私钥的路径。 | |
ssl_key_passphrase | 私钥的SSL密钥密码。 | password |
ssl_verify | 根据CA验证SSL连接另一端的身份 | boolean |
tcp_keep_alive | 指示套接字使用TCP保持活动。 | boolean |
dns_reverse_lookup_enabled | 通过禁用此设置可以避免DNS反向查找。 | boolean |
rabbitmq输入
input {
rabbitmq {
# 队列的主机
host => "192.168.1.2"
# 默认为guest
password => "guest"
# 消息服务器端口,默认为5672
port => 5672
# 默认为""
queue => ""
# 默认值为true
ack => true
# 默认值为{}
arguments => { "x-ha-policy" => "all" }
# 默认值为false
auto_delete => false
# 默认值为true
automatic_recovery => true
# 默认值为1秒
connect_retry_interval => 1
# 没有默认值,超时时间为无限
connection_timeout => 1000
# 默认值为false
durable => false
# 队列的交换器信息
exchange => "log.exchange"
# 队列的交换器信息
exchange_type => "direct"
# 默认值为false
exclusive => false
# 没有默认值,但是不指定的时候未60秒,秒为单位
heartbeat => 60
# 默认值为logstash,路由键
key => logstash
# 默认值为false,启动此功能保存元数据会影响性能
metadata_enabled => false
# 默认值为false,当设置true的时候表明为被动队列,则在消息服务器上,此队列已经存在
passive => false
# 默认为256
prefetch_count => 256
# 下面是公共配置
# 设置了type为system
type => "system"
# 默认line
codec => "json"
# 默认值为true
enable_metric => false
# 指定此数据输入id为input1
id => input1
# 添加了键位key值为value的数据到时间
add_field => {
"key" => "value"
}
}
}
参数 | 作用 | 参数类型 |
---|---|---|
ack | 启用消息确认 | boolean |
arguments | 可选队列参数 | array |
auto_delete | 当最后一个使用者断开连接时,是否应该在代理上删除队列 | boolean |
automatic_recovery | 将此设置为自动从断开的连接中恢复 | boolean |
connect_retry_interval | 重试连接之前等待的时间 | number |
connection_timeout | 默认连接超时(以毫秒为单位) | number |
durable | 是否持久队列 | boolean |
exchange | 绑定队列的交换的名称 | string |
exchange_type | 要绑定的交换类型 | string |
exclusive | 队列是否排他 | boolean |
heartbeat | 心跳超时(以秒为单位 | number |
host | Rabbitmq输入/输出RabbitMQ服务器地址主机的通用功能可以是单个主机,也可以是主机列表,即主机⇒" localhost"或主机⇒[" host01"," host02] | string |
key | 将队列绑定到交换机时要使用的路由密钥。 | string |
metadata_enabled | 在中启用消息标头和属性的存储@metadata | boolean |
passive | 如果为true,将被动声明队列,这意味着它必须已经存在于服务器上。 | boolean |
password | RabbitMQ密码 | password |
port | RabbitMQ端口进行连接 | number |
prefetch_count | 预取计数。如果使用该ack 选项启用了确认,则指定允许的未完成的未确认消息的数量。 | number |
queue | 从每条消息中提取并存储在@metadata字段中的属性。 | string |
ssl | 启用或禁用SSL。请注意,默认情况下,远程证书验证处于关闭状态。 | boolean |
ssl_certificate_password | ssl_certificate_path中指定的加密PKCS12(.p12)证书文件的密码 | string |
ssl_certificate_path | PKCS12(.p12)格式的SSL证书路径,用于验证远程主机 | |
ssl_version | 要使用的SSL协议版本。 | string |
subscription_retry_interval_seconds | 订阅请求失败后,重试之前要等待的时间(秒)。 | number |
Redis输入
input {
redis {
# 默认值为 125
batch_count => 125
# 没有默认值,但其可选内容list,channel,pattern_channel
data_type => list
# 默认值为 0
db => 0
# 默认值为 "127.0.0.1"
host => "127.0.0.1"
# 指定channel,没有默认值
key => "channel"
# redis的用户密码
password => "password"
# redis服务器端口,默认值为 6379
port => 6379
# 默认不开启SSL
ssl => false
# 初始超时为1秒
timeout => 1
}
}
参数 | 作用 | 参数类型 |
---|---|---|
batch_count | 使用EVAL从Redis返回的事件数 | number |
data_type | 指定列表或频道,可选内容为:list,channel,pattern_channel,如果data_type为list,将对密钥进行BLPOP锁定。如果data_type为channel,将订阅该密钥。如果data_type为pattern_channel,将订阅该密钥。 | string 可选参数["list", "channel", "pattern_channel"] |
db | Redis数据库号 | number |
host | Redis服务器的主机名 | string |
path | Redis服务器的unix套接字路径 | string |
key | Redis列表或通道的名称 | string |
password | 用于验证的密码。默认情况下没有身份验证 | password |
port | 要连接的端口。 | number |
ssl | 启用SSL支持。 | boolean |
threads | 启动的线程 | number |
timeout | 初始连接超时(以秒为单位) | number |
command_map | 以"旧名称"⇒"新名称"的形式配置重命名的redis命令。 | hash |
JDBC输入
#每一分钟执行一遍SELECT * from songs where artist = Beethoven语句获取结果
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
jdbc_password => "root"
parameters => { "favorite_artist" => "Beethoven" }
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
}
#预编译来进行数据查询
input {
jdbc {
statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > ? AND _sequence_key < ? + ? ORDER BY _sequence_key ASC"
prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 4]
prepared_statement_name => "foobar"
use_prepared_statements => true
use_column_value => true
tracking_column_type => "numeric"
tracking_column => "_sequence_key"
last_run_metadata_path => "/elastic/tmp/testing/confs/test-jdbc-int-sql_last_value.yml"
}
}
#编辑好的SQL文件来进行数据查询
input {
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "logstash\sql\mysql\jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
参数 | 作用 | 参数类型 |
---|---|---|
clean_run | 是否应保留先前的运行状态 | boolean |
columns_charset | 特定列的字符编码 | hash |
connection_retry_attempts | 尝试连接数据库的最大次数 | number |
connection_retry_attempts_wait_time | 两次尝试之间休眠的秒数 | number |
jdbc_connection_string | JDBC连接字符串 | string |
jdbc_default_timezone | 时区转换。 | string |
jdbc_driver_class | JDBC驱动程序类 | string |
jdbc_driver_library | 第三方驱动程序库的JDBC驱动程序库路径。 | string |
jdbc_fetch_size | JDBC提取大小。 | number |
jdbc_page_size | JDBC页大小 | number |
jdbc_paging_enabled | JDBC启用分页 | boolean |
jdbc_password | JDBC密码 | password |
jdbc_password_filepath | JDBC密码文件名 | |
jdbc_pool_timeout | PoolTimeoutError之前等待获取连接的秒数 | number |
jdbc_user | JDBC用户 | string |
jdbc_validate_connection | 使用前验证连接。 | boolean |
jdbc_validation_timeout | 验证连接的频率(以秒为单位) | number |
last_run_metadata_path | 上次运行时间的文件路径 | string |
lowercase_column_names | 是否强制使用标识符字段的小写 | boolean |
parameters | 查询参数的哈希,例如 | hash |
plugin_timezone | 将时间戳偏移到UTC以外的时区,则可以将此设置设置为local,插件将使用OS时区进行偏移调整。 | string 可选参数 ["local", "utc"] |
prepared_statement_bind_values | 准备好的语句的绑定值数组。 | array |
prepared_statement_name | 准备好的语句的名称 | string |
record_last_run | 是否保存状态 | boolean |
schedule | 定期运行语句的时间表,例如Cron格式:" **** *"(每分钟,每分钟执行一次查询) | string |
sequel_opts | 连接池的最大连接数 | hash |
sql_log_level | 记录SQL查询的日志级别 | string 可选参数["fatal", "error", "warn", "info", "debug"] |
statement | 执行的语句的内容 | string |
statement_filepath | 执行的语句的文件的路径 | |
tracking_column | 要跟踪的列use_column_value | string |
tracking_column_type | 跟踪列的类型 | |
use_column_value | 设置为时true,将定义的 tracking_column值用作:sql_last_value。设置为时false,:sql_last_value反映上一次执行查询的时间。 | string 可选参数["numeric", "timestamp"] |
use_prepared_statements | 设置为时true,启用prepare语句用法 | boolean |
创建测试数据(Generator)
在上线之前,可以使用此功能在实际环境中,测试 Logstash 和 Elasticsearch 的性能状况。对于极大数据情况下生产环境的评估有很重要意义
input {
generator {
# 默认值是0 具体根据需要测试数据量
count => 100000
# 此时消息将会顺序发出,需要注意的是,此配置和message冲突
lines => [
"line 1",
"line 2",
"line 3"
]
# 默认值是"Hello world!"
message => '{"key1":"value1","key2":[1,2],"key3":{"subkey1":"subvalue1"}}'
# 下面是公共配置
# 设置了type为system
type => "system"
# 默认line
codec => "json"
# 默认值为true
enable_metric => false
# 指定此数据输入id为input1
id => input1
# 添加了键位key值为value的数据到时间
add_field => {
"key" => "value"
}
}
}
参数 | 作用 | 参数类型 |
---|---|---|
count | 生成消息数量 | number |
lines | 顺序消息生成 | array |
message | 生成的消息 | string |
threads | 启动的线程 | number |
公共配置
参数 | 作用 | 参数类型 |
---|---|---|
add_field | 向事件添加字段 | hash |
codec | 输入数据的编解码器 | codec(解编码) |
enable_metric | 默认情况下,我们会记录我们能记录的所有度量,但是你可以禁用特定插件的度量集合 | boolean |
id | 设置数据输入的ID,如果未指定ID,Logstash将生成一个。 | string |
tags | 为你的数据事件设置标签,这样在数据分析的时候可以通过这个标签实现更多的分析结果 | array |
type | type向此输入处理的所有事件添加一个字段。此参数可以在数据过滤环节实现更多逻辑 | string |
过滤器filter
输入数据后我们一般会对原始数据进行基本的处理再送达目的地
grok正则表达式
grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。 他是目前logstash 中解析非结构化日志数据最好的方式
#实验数据
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
#示例filter
input {
stdin {
}
}
filter{
grok{
match => {"message" => "%{IPV4:ip}"}
}
}
output {
stdout {
}
}
启动一下并在命令行中输入示例数据,得到输出
#输出结果
{
"message" => "172.16.213.132 [07/Feb/2018:16:24:19 +0800]\"GET /HTTP/1.1\" 403 5039",
#IP字段就是我们进行grok过滤出的想要值
"ip" => "172.16.213.132",
"@version" => "1",
"host" => "ip-172-31-22-29.ec2.internal",
"@timestamp" => 2019-01-22T09:48:15.354Z
}
#过滤时间试试?
filter{
grok{
match => {"message" => "%{IPV4:ip}\ \[%{HTTPDATE:timestamp}\]"}
}
}
#输出结果
{
"@version" => "1",
"timestamp" => "07/Feb/2018:16:24:19 +0800",
"@timestamp" => 2019-01-22T10:16:14.205Z,
"message" => "172.16.213.132 [07/Feb/2018:16:24:19 +0800]\"GET /HTTP/1.1\" 403 5039",
"ip" => "172.16.213.132",
"host" => "ip-172-31-22-29.ec2.internal"
}
如果我们换个输入,额外多了一些字符,那么需要重新修改我们的过滤器
172.16.213.132 - - [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
#输入多了两个- -
#需要注意的是:正则中,匹配空格和中括号要加上转义符
filter{
grok{
match => {"message" => "%{IPV4:ip}\ -\ -\ \[%{HTTPDATE:timestamp}\]"}
}
}
#也能得到正确的输出
常用的正则
正则匹配稍显复杂,如果纯手工书写的话,会非常麻烦,所以logstash提供了一种更简单的方式来写正则,官网提供了全部示例
https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/ecs-v1/
或者在安装目录下也能找到所有示例 vendor/bundle/jruby/3.1.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/
#IPV4 格式校验
^(?:(?:\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.){3}(?:\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5])$
#IPv4 (私网)格式校验
^(127\.0\.0\.1)|(localhost)|(10\.\d{1,3}\.\d{1,3}\.\d{1,3})|(172\.((1[6-9])|(2\d)|(3[01]))\.\d{1,3}\.\d{1,3})|(192\.168\.\d{1,3}\.\d{1,3})$
#IPv4(公网)格式校验
^(((?!(127\\.0\\.0\\.1)|(localhost)|(10\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})|(172\\.((1[6-9])|(2\\d)|(3[01]))\\.\\d{1,3}\\.\\d{1,3})|(192\\.168\\.\\d{1,3}\\.\\d{1,3})).)*)(?:(?:\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.){3}(?:\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5])$
而在logstash中只需要 %{IPV4:ip}
#常用的正则 数据 正则 输出
#获取IP
172.16.213.132
%{IPV4:ip}
"ip" => "172.16.213.132"
#获取时间
07/Feb/2018:16:24:19 +0800
%{HTTPDATE:timestamp}
"timestamp" => "07/Feb/2018:16:24:19 +0800"
#获取时间另一种格式
2024-03-18 07:00:03
%{TIMESTAMP_ISO8601:timestamp}
timestamp => "2024-03-18 07:00:03"
#获取文本
asgcvxrw
%{WORD:action}
action => "asgcvxrw"
#获取数字
123456789
%{NUMBER:number}
number => "123456789"
#获取数据
id=123
%{Data:id}
id => "123"
#复杂的正则
2024-03-18 07:00:03,414 - AXBBackWordReq(sessionid=isbc7jkmNI4fzVR_kDEF6-qJKN2SlPkQ-d9d9K12ceca-sbcseq.10.167c4.fd53400a, callerNumber=13456481626, calledNumber=13389442749, extNumber=null, dialTime=1710716400338)
2024-03-18 07:00:03,414 - AXBBackWordResp(sessionid=isbc7jkmNI4fzVR_kDEF6-qJKN2SlPkQ-d9d9K12ceca-sbcseq.10.167c4.fd53400a, code=50001, calledNumber=, userData=, displayNumber=)
#在filtre中添加if判断
filter {
if [message] =~ "AXBBackWordReq" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp},%{NUMBER} - %{WORD:action}\(sessionid=%{DATA:sessionid}, callerNumber=%{DATA:callerNumber}, calledNumber=%{DATA:calledNumber}, extNumber=%{DATA:extNumber}, dialTime=%{NUMBER:dialTime}\)" }
}
}
else if [message] =~ "AXBBackWordResp" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp},%{NUMBER} - %{WORD:action}\(sessionid=%{DATA:sessionid}, code=%{NUMBER:code}, calledNumber=%{DATA:calledNumber}, userData=%{DATA:userData}, displayNumber=%{DATA:displayNumber}\)" }
}
}
}
#解析结果 AXBBackWordReq
"timestamp": "2024-03-18 07:00:03"
"action": "AXBBackWordReq"
"sessionid": "isbc7jkmNI4fzVR_kDEF6-qJKN2SlPkQ-d9d9K12ceca-sbcseq.10.167c4.fd53400a"
"callerNumber": "13456481626"
"calledNumber": "13389442749"
"extNumber": null
"dialTime": 1710716400338
#解析结果 AXBBackWordResp
"timestamp": "2024-03-18 07:00:03"
"action": "AXBBackWordResp"
"sessionid": "isbc7jkmNI4fzVR_kDEF6-qJKN2SlPkQ-d9d9K12ceca-sbcseq.10.167c4.fd53400a"
"code": 50001
"calledNumber": ""
"userData": ""
"displayNumber": ""
#%{NUMBER}用于占位符
date过滤器
用于解析事件中的时间数据
#输入
2020-05-07 23:59:59
#过滤
filter {
date {
match => [ "message", "yyyy-MM-dd HH:mm:ss" ]
locale => "Asia/Shanghai"
timezone => "Europe/Paris"
target => "messageDate"
}
}
#输出
{
"@timestamp" => 2020-05-12T13:47:26.094Z,
"type" => "date",
"tags" => [[0] "_jsonparsefailure"],
"@version" => "1",
"message" => "2020-05-07 23:59:59",
"messageDate" => 2020-05-07T21:59:59.000Z
}
参数 | 作用 | 参数类型 |
---|---|---|
locale | 指定用于日期解析的区域设置 | string |
match | 如何匹配时间格式 | array |
tag_on_failure | 匹配失败后追加的内容 | array |
target | 匹配成功后的内容需要设置的目标字段 | string |
timezone | 指定用于日期解析的时区规范ID | string |
extractnumbers过滤器
注意默认情况下次过滤器为捆绑需要执行bin/logstash-plugin install logstash-filter-extractnumbers操作安装插件
此过滤器自动提取字符串中找到的所有数字
#输入
zhangSan5,age16 + 0.5 456 789
#过滤
filter {
extractnumbers {
source => "message"
target => "message2"
}
}
#输出
#部分和字母结合的字符串结构并没有被提取出来,而那些被字母和其他符号被包裹起来的数字没有被完整的提取出来
{
"float1" => 0.5,
"int2" => 789,
"int1" => 456,
"@timestamp" => 2020-05-17T03:51:23.695Z,
"@version" => "1",
"type" => "extractnumbers",
"message" => "zhangSan5,age16 + 0.5 456 789",
"tags" => [[0] "_jsonparsefailure"]
}
输出
先回到我们最初的简单示例示例
原样输出
./bin/logstash -e 'input {stdin {}} output{stdout{}}'
#输入
123
#输出
123
标准的控制台输出
#
output {
stdout {
codec => rubydebug
}
}
#输入
hello
#输出
{
"@timestamp" => 2020-05-17T03:51:23.695Z,
"@version" => "1",
"host" => "localhost",
"message" => "hello"
}
文件输出
#根据年月日小时分割日志
output {
file {
path => "/logs/%{+YYYY-MM-dd}-%{host}.log"
codec => line {
format => "%{message}"
}
gzip => true
}
}
elasticsearch输出
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
flush_size => 20000. #做批次执行,当数据量达到20000条以后,才会落地到es中
idle_flush_time => 10 #当时间达到10S ,我们就落地到es中
user => elastic
password => changeme
}
}
redis输出
output {
redis {
host => "hadoop01"
data_type => "list"
port => "6379"
db => 6
key => "logstash-redis-%{+yyyy.MM.dd}"
batch=> true. #开启批处理
batch_events=> 10000 #批处理的大小
batch_timeout=> 3 #批处理的超时时间
#防止OOM---》list
congestion_interval => 3
congestion_threhold => 100
}
}
参考
https://blog.csdn.net/u014646662/article/details/104938156https://www.elastic.co/guide/en/logstash/current/index.html
到这一步你已经初步学会使用Logstash和自己书写简单的配置文件了!记得多问问ChatGPT!