Importing nginx logs into Elasticsearch

Nginx logs are collected by Filebeat and forwarded to Logstash, which processes them and writes them into Elasticsearch. Filebeat is responsible only for collection; Logstash handles parsing and formatting the logs, replacing and splitting field values, and choosing the index each event is written to in Elasticsearch.

1. Configure the nginx log format

log_format  main        '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
                        '$status $body_bytes_sent $http_referer ' 
                        '"$http_user_agent" '
                        '"$connection" '
                        '"$http_cookie" '
                        '$request_time '
                        '$upstream_response_time';
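
With this format, one access log line looks roughly like the following (all values are made up for illustration; note that $request expands to the method, URI and protocol):

192.168.15.1 113.57.20.10, 10.10.8.1 [20/May/2015:21:05:56 +0800] blog.cnfol.com GET /index.html HTTP/1.1 200 5316 https://blog.cnfol.com/ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" "123456" "uid=abc; sid=xyz" 0.010 0.008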

2. Install and configure Filebeat, enable the nginx module

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/local
cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat

Enable the nginx module
./filebeat modules enable nginx
List the available modules
./filebeat modules list
Create the configuration file

vim /usr/local/filebeat/blog_module_logstash.yml
filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
  #error:
  #  enabled: true
  #  var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]


output.logstash:
  hosts: ["192.168.15.91:5044"]

Start Filebeat
./filebeat -c blog_module_logstash.yml -e

3. Configure Logstash

tar -zxvf logstash-6.2.4.tar.gz -C /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash
Create a pipeline file for the nginx logs
cd /usr/local/logstash

Logstash's built-in grok pattern directory
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
Edit grok-patterns and add a pattern that matches a comma-separated list of IPs (FORWORD is referenced by the pipeline below)
FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}

Official grok pattern reference

http://grokdebug.herokuapp.com/patterns#
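Before wiring up Beats, the custom FORWORD pattern (or the full match expression) can be exercised with a minimal throwaway pipeline that reads sample lines from stdin; a sketch, where the file name test_grok.conf is arbitrary:

input { stdin {} }
filter {
	grok { match => { "message" => "%{FORWORD:http_x_forwarded_for}" } }
}
output { stdout { codec => rubydebug } }

Run it with bin/logstash -f test_grok.conf and paste a log line to inspect the parsed fields.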

Create the Logstash pipeline configuration file

#input {
#	stdin {}
#}
# receive events from Filebeat
input {
	beats {
		port => 5044
		host => "0.0.0.0"
	}
}

filter {
	# add a debug switch
	mutate{add_field => {"[@metadata][debug]"=>true}}
	grok {
		# parse the nginx access log
		#match => { "message" => "%{NGINXACCESS_TEST2}" }
		#match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
		#match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
        match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
	}
	# copy the default @timestamp (the time Beats collected the event) into a new field, @read_timestamp
	ruby { 
		#code => "event.set('@read_timestamp',event.get('@timestamp'))"
		# change the timezone to UTC+8
		code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
	}
	# parse the timestamp recorded in the nginx log
	# e.g. 20/May/2015:21:05:56 +0000
	date {
		locale => "en"
		match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"]
	}
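	# for example, this turns a [@metadata][webtime] value into the event's
	# @timestamp (the date filter's default target):
	#   [@metadata][webtime]  =>  "20/May/2015:21:05:56 +0000"
	#   @timestamp            =>  2015-05-20T21:05:56.000Z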
	# convert the bytes field from string to integer
	mutate {
		convert => {"bytes" => "integer"}
	}
	# parse the cookie field into JSON
	#mutate {
	#	gsub => ["cookies",'\;',',']
	#}	
	# behind a CDN, http_x_forwarded_for holds multiple IPs; the first is the client's real IP
	if [http_x_forwarded_for] =~ ", " {
		ruby {
			code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
		}
	}
	# resolve the IP to a geographic location
	geoip {
		source => "http_x_forwarded_for"
		# only keep the location (lon/lat), country, city and region
		fields => ["location","country_name","city_name","region_name"]
	}
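	# when the lookup succeeds, the event gains a geoip object restricted to
	# those four fields, roughly like this (illustrative values):
	#   "geoip" => {
	#       "location"     => { "lon" => 114.27, "lat" => 30.58 },
	#       "country_name" => "China",
	#       "region_name"  => "Hubei",
	#       "city_name"    => "Wuhan"
	#   }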
	# parse the agent field to extract the browser, OS version and other details
	useragent {
		source => "agent"
		target => "useragent"
	}
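	# as a rough example, a Chrome-on-Windows agent string expands into a
	# useragent object along these lines (illustrative values):
	#   "useragent" => {
	#       "name"  => "Chrome",
	#       "major" => "73",
	#       "minor" => "0",
	#       "os"    => "Windows"
	#   }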
	# specify fields to delete
	#mutate{remove_field=>["message"]}
	# use the log file name as the index name prefix
	ruby {
		code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
	}	
	# format @timestamp as e.g. 2019.04.23
	ruby {
		code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
	}
	# set the default index name for the output
	mutate {
		add_field => {
			#"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
			"[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
		}
	}
	# parse the cookies field into JSON
#	mutate {
#		gsub => [
#			"cookies", ";", ",",
#			"cookies", "=", ":"
#		]
#		#split => {"cookies" => ","}
#	}
#	json_encode {
#		source => "cookies"
#		target => "cookies_json"
#	}
#	mutate {
#		gsub => [
#			"cookies_json", ',', '","',
#			"cookies_json", ':', '":"'
#		]
#	}
#	json {
#		source => "cookies_json"
#		target => "cookies2"
#	}
	# if grok parsing failed, write the event to a separate failure index
	if "_grokparsefailure" in [tags] {
	#if "_dateparsefailure" in [tags] {
		mutate {
			replace => {
				#"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
				"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
			}
		}
	# if there was no error, drop the raw message field
	}else{
		mutate{remove_field=>["message"]}
	}
}

output {
	if [@metadata][debug] {
		# print to stdout with the rubydebug codec, including metadata
		stdout{codec => rubydebug{metadata => true}}
	}else{
		# condense stdout to a "." per event
		stdout{codec => dots}	
		# write events to the specified Elasticsearch
		elasticsearch {
			hosts => ["192.168.15.160:9200"]
			index => "%{[@metadata][index]}"
			document_type => "doc"
		}	
	}
}

Start Logstash
nohup bin/logstash -f test_pipline2.conf &
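
With the debug switch in the filter turned off so events actually reach Elasticsearch, confirm the daily indices (and any *_failure indices) were created:

curl -s 'http://192.168.15.160:9200/_cat/indices?v'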

 
