Will Dx

Though worn by a lifetime of the world's frost and snow, one returns still a youth.

Django + Nginx Logs + User Behavior Analysis

Posted April 07, 2017

1. Current State Analysis

1.1 Current setup

Raw
1. Nginx logs are stored as plain text
2. Filebeat reads the log files into a queue
3. Logstash parses the queued entries, converts them to JSON (grok regex matching), and stores them in ELK

1.2 Shortcomings

Raw
1. Logstash cannot parse every log entry (see the sketch below)
2. Adding session_id or any other extra field means editing Logstash's formatting rules
  • Idea: have Nginx write the logs as JSON instead
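
To see why point 1 bites, here is a rough Python sketch of what regex-parsing a plain-text combined access log line involves (the pattern and sample line below are illustrative only, not the production grok); any line that deviates from the pattern simply fails to parse, which is exactly what JSON logging avoids.

Python
import re

# Hypothetical pattern for the default "combined" access log format;
# unusual requests, embedded quotes, or extra fields easily break it.
LINE_RE = re.compile(
    r'(?P<addr>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<bytes>\d+) '
    r'"(?P<referer>[^"]*)" "(?P<agent>[^"]*)"'
)

line = '1.2.3.4 - - [07/Apr/2017:12:00:00 +0800] "GET /api/ping HTTP/1.1" 200 17 "-" "curl/7.47.0"'
match = LINE_RE.match(line)
print(match.groupdict() if match else "parse failed")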

1.3 Target state:

Raw
1. Nginx writes JSON logs and Filebeat ships them to Redis (PS: this also makes it easy to add a page-source field from the app later)
2. Write our own Python consumer that converts session_id into user_id, adds basic user info, and pushes the formatted data into a new queue
3. Filebeat or Logstash consumes the new queue and writes it into ELK

2. Redesign Plan

2.1 Switch Nginx logs to JSON

# A non-exhaustive set of Nginx log variables

Python
log_format xingzhe_json '{"session_id":"$cookie_sessionid", "upstream_response_time":"$upstream_response_time", "addr":"$remote_addr", "user":"$remote_user", "local_time":"$time_local","status":"$status", "body_bytes_sent":"$body_bytes_sent", "http_referer":"$http_referer", "http_user_agent":"$http_user_agent", "request_time":"$request_time", "request": "$request", "request_method":"$request_method", "uri":"$uri", "args":"$args"}';

access_log  /var/log/nginx/access.log xingzhe_json;   # use the JSON log format defined above
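
With the JSON format in place, every access log line should parse with a plain json.loads, no grok needed. A quick standalone sanity check (illustrative snippet only; the path is the one configured above, and values containing quotes may need Nginx's escape=json option, available since 1.11.8, to stay valid JSON):

Python
import json

# Read the JSON access log and confirm each line parses cleanly.
with open("/var/log/nginx/access.log") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        entry = json.loads(line)  # raises ValueError on a malformed line
        print("{addr} {status} {request}".format(**entry))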

2.2 Update the Filebeat config to ship the JSON logs into Redis

Python
# vim /docker_data/elk_stack_server/filebat/filebeat_nginx_access_sx1.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /home/jacky007wang/logs/xingzhe/access.log # change to the access log path
  encoding: utf-8
  document_type: nginx_access
  scan_frequency: 10s
  harvester_buffer_size: 16384
  #multiline:
  #  pattern: '^\d'
  #  match: after

tags: ["xz-web-node1"] # change the tags; currently used to mark the data source
ignore_older: 31

output.redis:
  enabled: true
  hosts: ["xxx.xxx.xx.xx:6379"]  # replace with the Redis address
  port: 6379
  key: nginx_access
  #password:
  db: 0
  datatype: list
  worker: 1
  loadbalance: true # load balance across the listed hosts
  timeout: 5s
  max_retries: 3
  bulk_max_size: 2048

  #ssl.enabled: true
  #ssl.verification_mode: full
  #ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
  #ssl.certificate: "/etc/pki/client/cert.pem"
  #ssl.key: "/etc/pki/client/cert.key"
  #ssl.key_passphrase: ''
  #ssl.cipher_suites: []
  #ssl.curve_types: []

logging.to_files: true
logging.files:
  path: /docker_data/elk_data/filebeat
  name: filebeat
  rotateeverybytes: 80485760
  keepfiles: 7
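
Before writing the consumer, it helps to confirm that Filebeat is actually pushing events into the Redis list. A minimal peek with redis-py (the host is the placeholder from the config above, the key is the nginx_access list; Filebeat usually wraps the original log line inside each event under the message field):

Python
import json
import redis

# Peek at what Filebeat pushed without consuming anything.
r = redis.StrictRedis(host="xxx.xxx.xx.xx", port=6379, db=0)
print("queued events: {}".format(r.llen("nginx_access")))

raw = r.lindex("nginx_access", 0)  # inspect the oldest queued event
if raw:
    event = json.loads(raw)
    # The original Nginx JSON line normally sits in the "message" field.
    print("{} {}".format(event.get("tags"), event.get("message")))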

2.3 Write our own consumer: convert sessionid to user_id and push into another Redis queue

  • Note: godbox is an in-house library, so the source below probably will not run for you as-is, but the logic is straightforward
  • Note: the session decoding comes from reading the Django source, located at: /usr/local/lib/python2.7/dist-packages/django/contrib/sessions
Python
# vim web/management/commands/elk_nginx_log_format.py

# -*- coding: utf-8 -*-
"""
File Name: elk_nginx_log_format.py
Author: WillDX
mail: daixiang1992@gmail.com
Created Time: Tue 11 Oct 2016 02:43:15 PM CST
"""

import sys
import time
import json
import traceback
from django.core.management.base import BaseCommand, CommandError
from godbox.utils.wechat import wechat_msg
from godbox.utils.queue import QueueHelper
from godbox.utils.queue import thread_receive_messages, receive_message
from importlib import import_module
from django.conf import settings

# force UTF-8 as the default encoding (Python 2)
reload(sys)
sys.setdefaultencoding('utf8')


class Command(BaseCommand):
    # session decoding
    engine = import_module(settings.SESSION_ENGINE)
    SessionStore = engine.SessionStore

    # redis
    queue_source = dict(
        class_name="RedisQueue",
        queue_name="nginx_access",
        host="10.117.44.44",
        port=6379,
        db=0,
        password=None)

    queue_distination = dict(
        class_name="RedisQueue",
        queue_name="nginx_access_format",
        host="10.117.44.44",
        port=6379,
        db=0,
        password=None)
    source = QueueHelper(queue_source)
    distination = QueueHelper(queue_distination)

    def session_to_uid(self, session_key):
        """Convert the sessionid from the request into Django's user id."""
        session = self.SessionStore(session_key)
        uid = session.get('_auth_user_id')
        return uid

    def receive_message_func(self, queue_info):
        """Callback for multi-threaded consumption."""
        receive_msg = receive_message(queue_info)
        receive_msg = json.loads(receive_msg)
        session_id = receive_msg.get("session_id", "-")
        if session_id and session_id != '-':
            user_id = self.session_to_uid(session_id)
        else:
            user_id = '-'
        receive_msg["user_id"] = user_id
        self.distination.send_message(receive_msg)
        return True

    def handle(self, *args, **options):
        """
        Format the Nginx access logs:

        1. Read the Nginx log entries that carry a sessionid
        2. Resolve the sessionid to a userid
        3. Write them to a new Redis queue (Logstash later ships them into ELK)
        """
        print("start")
        try:
            # while 1:
            #     # consume the data
            #     #source = QueueHelper(queue_source)
            #     receive_msg = self.source.receive_message()
            #     print("receive_msg type:{}".format(type(receive_msg)))
            #     receive_msg = json.loads(receive_msg)
            #     session_id = receive_msg.get("session_id", "-")
            #     if session_id and session_id != '-':
            #         user_id = session_to_uid(session_id)
            #     else:
            #         user_id = '-'
            #     #del receive_msg['session_id']
            #     receive_msg["user_id"] = user_id
            #
            #     # produce the data
            #     #distination = QueueHelper(queue_distination)
            #     self.distination.send_message(receive_msg)

            thread_receive_messages(self.receive_message_func, self.queue_source, num=5, exit=0)
        except Exception as e:
            traceback.print_exc()
            today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            file_name = "elk_nginx_log_format.py"
            wc_user = "dx1426409836"
            wechat_msg("[ERROR]:[{}]@{}, script: {}, task failed!".format(today, wc_user, file_name), wc_user)
            sys.exit(1)
        print("end")
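
Since godbox is an in-house library, here is a rough standalone equivalent of the same loop using plain redis-py, as a sketch rather than the production code. The queue names and Redis host mirror the settings above, and it has to run with the project's Django settings loaded (e.g. from a management command) so SESSION_ENGINE resolves:

Python
import json
import redis
from importlib import import_module
from django.conf import settings

r = redis.StrictRedis(host="10.117.44.44", port=6379, db=0)
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore

def session_to_uid(session_key):
    # Same trick as above: load the session and read Django's cached auth user id.
    return SessionStore(session_key).get('_auth_user_id') or '-'

while True:
    # Block until Filebeat pushes the next event onto the source list.
    _, raw = r.blpop("nginx_access")
    event = json.loads(raw)
    session_id = event.get("session_id", "-")
    event["user_id"] = session_to_uid(session_id) if session_id and session_id != "-" else "-"
    # Hand the enriched event to the queue Logstash reads from.
    r.rpush("nginx_access_format", json.dumps(event))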

2.4 Update the Logstash config to move the Redis data into ELK

Python
input {
  redis {
    host =>"xxxxx"
    type =>"nginx_access_format"
    data_type =>"list"
    key =>"nginx_access_format"
  }
}

filter {
  if [type] == "nginx_access_format" {
    #grok {
    #  match =>{
    #    "message" =>["%{BASE16FLOAT:upstream_response_time} \| %{COMBINEDAPACHELOG} %{GREEDYDATA:request_time}", "%{NGINXCOMBINEDAPACHELOG}"]
    #  }
    #  patterns_dir =>["/etc/logstash/patterns"]
    #  remove_field =>["message"]
    #}

    #mutate {
    #  convert =>["response", "integer"]
    #  convert =>["bytes", "integer"]
    #  convert =>["responsetime", "float"]
    #  convert =>["upstream_response_time", "float"]
    #  convert =>["request_time", "float"]
    #  gsub =>["referrer", "[\"]", ""]
    #  gsub =>["agent", "[\"]", ""]
    #}

    json {
      source => "message"
      target => "nginx_access_format"
      remove_field => ["message"]
    }

    geoip {
      source =>"[nginx_access_format][addr]"
      target =>"geoip"
    }

    useragent {
      source =>"[nginx_access_format][http_user_agent]"
    }

    date {
      match =>["[nginx_access_format][local_time]", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
  }
}

output {
  elasticsearch {
    hosts =>["xxxx:9200", "xxx:9200"]
    index =>"format-nginx-access-%{+YYYY.MM}"
  }
  #stdout {
  #  codec =>rubydebug
  #}
}

3. Final Result

  • In the end, user_id shows up in Kibana, so from here on we can track every action a user takes in the APP
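
With user_id in the index, a quick check outside Kibana is to query Elasticsearch directly for one user's recent requests. A sketch using the requests library (the host, the user_id value, and the exact field path are placeholders; depending on the ES version and mapping, the field may need a .keyword suffix):

Python
import json
import requests

# Pull the 10 most recent requests for one user from the new index.
query = {
    "size": 10,
    "sort": [{"@timestamp": {"order": "desc"}}],
    "query": {"term": {"nginx_access_format.user_id": "12345"}},
}
resp = requests.post(
    "http://xxxx:9200/format-nginx-access-*/_search",
    headers={"Content-Type": "application/json"},
    data=json.dumps(query),
)
for hit in resp.json()["hits"]["hits"]:
    doc = hit["_source"].get("nginx_access_format", {})
    print("{} {} {}".format(doc.get("local_time"), doc.get("status"), doc.get("request")))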