Logstash Data Collection
Prerequisites
First, install Logstash and Elasticsearch (es) and be familiar with their basic usage.
References:
部署-linux-centOs-大数据基础配置
大数据-示例集合-logstash
大数据-示例集合-elasticsearch
大数据-项目-日志数据处理中心
Task Requirements
We have log data like the following, with fields separated by commas; empty columns should simply stay empty. The data needs to be parsed with Logstash and written into es. The log files are named YYYY-MM-DD.log, and each day's file must be imported automatically every night.
Data Import
First, write a Logstash configuration file that parses the log file and prints the events to the console so we can confirm the parsing is correct.
Data Preprocessing
#File location: /opt/logs/2024-07-26.log
#File format: 呼入被叫 (called), 呼出主叫 (calling), 起始时间 (start), 终止时间 (end), 通话时长 (duration), 终止原因 (reason), 挂断方 (hangUp), 接续时长 (continue), 接通延迟 (latencies)
#File content:
26165884476,,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,主叫前缀受限,服务器,0.000,未接通
202751235100,,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,主叫前缀受限,服务器,0.001,未接通
44413507803336,19975766763,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,RequestTerminated(487),被叫,22.982,17.406
718398185567,17310493424,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,主叫挂断,主叫,5.474,1.524
213864381335,,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,外部黑名单,服务器,0.004,未接通
218512328787,,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,外部黑名单,服务器,0.003,未接通
215023908999,19946900749,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,Forbidden(403),被叫,2.124,未接通
72217306963359,0070217368413047,2024-07-26 14:11:00.0,2024-07-26 14:11:00.0,0,主叫挂断,主叫,10.721,2.006
We read the log file with the csv filter, splitting on commas.
#Create the Logstash configuration file big_data.conf
input {
    file {
        path => "/opt/logs/2024-07-26.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => plain { charset => "UTF-8" }
    }
}
filter {
    csv {
        separator => ","
        columns => ["呼入被叫", "呼出主叫", "起始时间", "终止时间", "通话时长", "终止原因", "挂断方", "接续时长", "接通延迟"]
        skip_header => true
    }
}
output {
    stdout {
        codec => rubydebug
    }
}
#Check the configuration: ./bin/logstash -f big_data.conf -t
#Run it: ./bin/logstash -f big_data.conf
The output shows many fields that Logstash adds on its own. We need to remove those extra fields and give the data fields English names.
#Rename the fields to English, remove the unneeded fields, then clean up the called/calling fields
input {
    file {
        path => "/opt/logs/2024-07-26.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => plain { charset => "UTF-8" }
    }
}
filter {
    csv {
        separator => ","
        #columns => ["呼入被叫", "呼出主叫", "起始时间", "终止时间", "通话时长", "终止原因", "挂断方", "接续时长", "接通延迟"]
        columns => ["called", "calling", "start", "end", "duration", "reason", "hangUp", "continue", "latencies"]
        skip_header => true
    }
    mutate {
        # Remove the fields we do not need
        remove_field => ["host", "message", "log", "event"]
        # Clean the called/calling fields: strip everything that is not a digit
        gsub => [
            "called", "[^0-9]", "",
            "calling", "[^0-9]", ""
        ]
    }
    # Process the called field: keep the last 11 digits, otherwise blank it
    ruby {
        code => "
            if event.get('called').to_s.length >= 11
                event.set('called', event.get('called')[-11, 11])
            else
                event.set('called', '')
            end
        "
    }
    # Process the calling field in the same way
    ruby {
        code => "
            if event.get('calling').to_s.length >= 11
                event.set('calling', event.get('calling')[-11, 11])
            else
                event.set('calling', '')
            end
        "
    }
    # Parse the time fields; without this step, indexing into es will fail
    date {
        match => ["start", "yyyy-MM-dd HH:mm:ss.SSS"]
        target => "start"
    }
    date {
        match => ["end", "yyyy-MM-dd HH:mm:ss.SSS"]
        target => "end"
    }
}
output {
    stdout {
        codec => rubydebug
    }
}
#Sample output:
{
    "hangUp" => "服务器",
    "@timestamp" => 2024-07-29T09:46:05.601644065Z,
    "latencies" => "未接通",
    "@version" => "1",
    "calling" => "",
    "end" => 2024-07-26T06:11:00.000Z,
    "called" => "26165884476",
    "duration" => "0",
    "start" => 2024-07-26T06:11:00.000Z,
    "reason" => "主叫前缀受限",
    "continue" => "0.000"
}
Scheduled Data Collection
The data formatting is done. According to the task requirements, collection must run automatically every night. Logstash has no built-in scheduler for this, so we implement it with a shell script plus the configuration file.
move_log.sh
#!/bin/bash
# Move today's log file into the directory that Logstash watches
mv /opt/logs/$(date +"%Y-%m-%d").log /opt/logs/big_data_logs/
#Remember to make it executable:
chmod +x move_log.sh
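Optionally, a slightly more defensive variant of move_log.sh can be used (a sketch under the same path assumptions; the existence check is an addition, not part of the original script) so that the move is skipped when the day's file does not exist yet:
#!/bin/bash
# hypothetical defensive variant of move_log.sh
LOG="/opt/logs/$(date +%Y-%m-%d).log"
if [ -f "$LOG" ]; then
    mv "$LOG" /opt/logs/big_data_logs/
else
    echo "no log file found for today: $LOG" >&2
fi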
#Input configuration
input {
    file {
        path => "/opt/logs/big_data_logs/*.log"
        start_position => "beginning"
        sincedb_path => "/opt/logs/reading.log"
        # Use a real file path to record the read position
        codec => plain { charset => "UTF-8" }
    }
}
#System cron job
# Edit the crontab
crontab -e
# Add the following entry to run the script at 23:59 every night (at exactly midnight the date would already have rolled over to the next day)
59 23 * * * /opt/logs/move_log.sh
Let's run the script manually once to check that everything works.
First start Logstash, then run the script: ./move_log.sh
Outputting Data to es
According to the task requirements, the data must be written to es, with a separate index for each day.
output {
    #Console output is only for observing the run; it is not needed in production
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://192.168.1.74:9200"]
        user => "elastic"
        password => "nsQywcgUoC3ZljMJJ0p1"
        index => "big_data_%{+YYYY.MM.dd}"
        #This will need to be changed later to:
        #index => "big_data_%{+YYYY_MM_dd}"
    }
}
When running the script manually again, remember to empty /opt/logs/reading.log first and move the log file back to its original location, otherwise the test cannot be repeated.
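A minimal reset sketch, assuming the paths and the example file name used in this article:
# reset the test state before re-running move_log.sh
truncate -s 0 /opt/logs/reading.log                   # clear the recorded read position
mv /opt/logs/big_data_logs/2024-07-26.log /opt/logs/  # move the example log back to its original location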
We can see that the index was created automatically and the data was written to es. There is one small problem, though: in the index name big_data_2024.07.29, the dots should be replaced with underscores. Next, let's look at the field types that were created.
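The mapping can be retrieved, for example, with a request like this (host and credentials as configured above; the index name is the one just created):
curl -u elastic:nsQywcgUoC3ZljMJJ0p1 "http://192.168.1.74:9200/big_data_2024.07.29/_mapping?pretty"
The relevant part of the returned mapping looks like this: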
{
    "called": {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
        }
    },
    "calling": {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
        }
    },
    "continue": {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
        }
    },
    "duration": {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
        }
    },
    "end": {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
        }
    }
}
All the fields came through as text. We need to give each field an explicit type. This can be done with a custom index template in es, or configured through Logstash; here we use the Logstash approach.
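For reference, the template could also be registered directly in es via the index template API (a hypothetical command reusing the host, credentials, and template file from this article); below, however, we let Logstash upload it instead:
curl -u elastic:nsQywcgUoC3ZljMJJ0p1 -X PUT -H "Content-Type: application/json" \
  "http://192.168.1.74:9200/_index_template/big_data" \
  -d @big_data_template.json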
output {
    #Console output is only for observing the run; it is not needed in production
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://192.168.1.74:9200"]
        user => "elastic"
        password => "nsQywcgUoC3ZljMJJ0p1"
        index => "big_data_%{+YYYY_MM_dd}"
        # Path to the index template file
        template => "big_data_template.json"
        template_name => "big_data"
    }
}
Now write big_data_template.json. Note: this template is plain JSON, so it must not contain # comments.
text type: for full-text search; the content is analyzed and split into tokens. For example, finding documents whose content contains a given word.
keyword type: for exact-match search; the content is not tokenized. For example, matching the exact value of the whole field.
Pay close attention to this difference when designing the es data types.
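As a rough illustration against the dynamically mapped index from the previous step (hypothetical queries; host and credentials as above), a match query on the analyzed text field matches individual tokens, while a term query on the .keyword subfield matches only the exact value:
# full-text match on the analyzed text field
curl -u elastic:nsQywcgUoC3ZljMJJ0p1 -H "Content-Type: application/json" \
  "http://192.168.1.74:9200/big_data_2024.07.29/_search?pretty" \
  -d '{"query": {"match": {"reason": "主叫挂断"}}}'
# exact match on the keyword subfield
curl -u elastic:nsQywcgUoC3ZljMJJ0p1 -H "Content-Type: application/json" \
  "http://192.168.1.74:9200/big_data_2024.07.29/_search?pretty" \
  -d '{"query": {"term": {"reason.keyword": "主叫挂断"}}}'
With that difference in mind, the template below assigns an explicit type to each field: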
{
    "index_patterns": ["big_data_*"],
    "template": {
        "settings": {
            "number_of_shards": 1
        },
        "mappings": {
            "properties": {
                "calling": {
                    "type": "long"
                },
                "called": {
                    "type": "long"
                },
                "@timestamp": {
                    "type": "date"
                },
                "start": {
                    "type": "date"
                },
                "end": {
                    "type": "date"
                },
                "duration": {
                    "type": "float"
                },
                "continue": {
                    "type": "float"
                },
                "hangUp": {
                    "type": "keyword"
                },
                "reason": {
                    "type": "keyword"
                },
                "@version": {
                    "type": "integer"
                },
                "latencies": {
                    "type": "keyword"
                }
            }
        }
    }
}
Clear the state again and re-run. We can see that the types are now defined as expected and the data is written to es correctly.
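As a final check (hypothetical commands, using the index name produced on 2024-07-29), the mapping and the document count of the new index can be inspected:
curl -u elastic:nsQywcgUoC3ZljMJJ0p1 "http://192.168.1.74:9200/big_data_2024_07_29/_mapping?pretty"
curl -u elastic:nsQywcgUoC3ZljMJJ0p1 "http://192.168.1.74:9200/big_data_2024_07_29/_count?pretty"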