A Log Collection and Intelligent Alerting Platform Built on Kafka + ELK + Ollama + OpenClaw

张开发
2026/4/10 15:25:35 · 15 min read


| Item | Detail |
|---|---|
| Operating system | CentOS Stream 10 |
| Core components | Kafka 3.6.1 (KRaft), Nginx 1.26.3, Filebeat 9.3.2, MySQL 8.4.8, Elasticsearch 7.17.18, Kibana 7.17.18, Ollama (Qwen3.5:2b), OpenClaw |
| Cluster size | 10 servers |

## 1. Overall Architecture

The platform runs on 10 CentOS Stream 10 servers. IP addresses and roles:

| IP address | Hostname | Components | Role |
|---|---|---|---|
| 192.168.117.135 | app1 | Flask (app.py) | Backend business service |
| 192.168.117.136 | app2 | Flask (app.py) | Backend business service (load-balanced peer) |
| 192.168.117.137 | nginx | Nginx, Filebeat | Reverse proxy, log collection |
| 192.168.117.138 | kafka1 | Kafka | KRaft node 1 |
| 192.168.117.139 | kafka2 | Kafka | KRaft node 2 |
| 192.168.117.140 | kafka3 | Kafka | KRaft node 3 |
| 192.168.117.141 | mysql | MySQL 8.4.8 | Persistent storage |
| 192.168.117.142 | elk | Elasticsearch, Kibana | Real-time analysis, visualization |
| 192.168.117.143 | ollama | Ollama, monitor.py | AI judgment, log monitoring |
| 192.168.117.158 | openclaw | OpenClaw | Automated operations diagnostics |

## 2. Environment Preparation (all nodes)

### 2.1 Install base packages

```shell
dnf install wget vim java-21-openjdk.x86_64 -y
```

### 2.2 Configure a static IP

Edit `/etc/NetworkManager/system-connections/ens33.nmconnection`, adjusting the address per machine. Example for kafka1 (192.168.117.138):

```ini
[ipv4]
method=manual
address1=192.168.117.138/24,192.168.117.2
dns=114.114.114.114
```

Restart the network:

```shell
nmcli connection reload
nmcli device up ens33
```

### 2.3 Set hostnames

```shell
# Run the matching line on each node
hostnamectl set-hostname app1      # node 135
hostnamectl set-hostname app2      # node 136
hostnamectl set-hostname nginx     # node 137
hostnamectl set-hostname kafka1    # node 138
hostnamectl set-hostname kafka2    # node 139
hostnamectl set-hostname kafka3    # node 140
hostnamectl set-hostname mysql     # node 141
hostnamectl set-hostname elk       # node 142
hostnamectl set-hostname ollama    # node 143
hostnamectl set-hostname openclaw  # node 158
```

### 2.4 Hosts mapping

Append to `/etc/hosts`:

```
192.168.117.135 app1
192.168.117.136 app2
192.168.117.137 nginx
192.168.117.138 kafka1
192.168.117.139 kafka2
192.168.117.140 kafka3
192.168.117.141 mysql
192.168.117.142 elk
192.168.117.143 ollama
192.168.117.158 openclaw
```

### 2.5 Disable the firewall and SELinux

```shell
# Disable the firewall
iptables -F
systemctl stop firewalld
systemctl disable firewalld

# Disable SELinux: set SELINUX=disabled in /etc/selinux/config
vim /etc/selinux/config

# Reboot to apply
reboot
```

## 3. Kafka 3.6.1 Cluster (KRaft mode)

Deployed on kafka1 (138), kafka2 (139), and kafka3 (140).

### 3.1 Download and extract

```shell
cd /opt
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar xf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
```
### 3.2 Modify the configuration

Edit `/opt/kafka_2.13-3.6.1/config/kraft/server.properties` on each node. Key per-node settings:

| Setting | kafka1 (138) | kafka2 (139) | kafka3 (140) |
|---|---|---|---|
| `node.id` | 1 | 2 | 3 |
| `listeners` | `PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093` | same | same |
| `advertised.listeners` | `PLAINTEXT://kafka1:9092` | `PLAINTEXT://kafka2:9092` | `PLAINTEXT://kafka3:9092` |

Leave the remaining settings at their defaults.

### 3.3 Initialize the cluster

Generate the cluster UUID on kafka1 only:

```shell
cd /opt/kafka_2.13-3.6.1
bin/kafka-storage.sh random-uuid > tmp_random
cat tmp_random
```

Format the storage on all nodes:

```shell
bin/kafka-storage.sh format -t <cluster-UUID> -c /opt/kafka_2.13-3.6.1/config/kraft/server.properties
```

### 3.4 Configure a systemd service

Edit `/usr/lib/systemd/system/kafka.service`:

```ini
[Unit]
Description=Apache Kafka server (KRaft mode)
Documentation=http://kafka.apache.org/documentation.html
After=network.target

[Service]
Type=forking
User=root
Group=root
Environment=PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/lib/jvm/java-21-openjdk/bin/
ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
ExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

```shell
systemctl daemon-reload
systemctl start kafka
systemctl enable kafka
```

### 3.5 Verify the cluster

```shell
# Create a topic with 3 replicas and 3 partitions
bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic nginxlog

# List topics
bin/kafka-topics.sh --list --bootstrap-server kafka3:9092

# Test producing and consuming
bin/kafka-console-producer.sh --broker-list kafka3:9092 --topic nginxlog
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic nginxlog --from-beginning
```

## 4. Backend Flask Service

Deployed on app1 (135) and app2 (136).

### 4.1 Install the environment

```shell
dnf install epel-release -y
dnf install python3 python3-pip -y
python3 -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
python3 -m pip install flask -i https://pypi.tuna.tsinghua.edu.cn/simple
```
### 4.2 Deploy the application

`/root/app.py`:

```python
from flask import Flask

app = Flask(__name__)

@app.route("/")
def index():
    return "this is flask web kafka2"

app.run(host="0.0.0.0")
```

### 4.3 Configure a systemd service

Edit `/etc/systemd/system/app.service`:

```ini
[Unit]
Description=Flask App Service
After=network.target

[Service]
User=root
WorkingDirectory=/root
ExecStart=/usr/bin/python3 /root/app.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```

```shell
systemctl daemon-reload
systemctl start app
systemctl enable app
```

## 5. Nginx 1.26.3 + Filebeat 9.3.2

Deployed on nginx (137).

### 5.1 Install Nginx

```shell
yum install epel-release -y
yum install nginx -y
```

### 5.2 Configure the reverse proxy

Proxy requests to the two Flask backends. Edit `/etc/nginx/conf.d/app.conf`:

```nginx
upstream app {
    server 192.168.117.135:5000;
    server 192.168.117.136:5000;
}
server {
    listen 80;
    location / {
        proxy_pass http://app;
        proxy_set_header Host $host;
    }
}
```

```shell
systemctl start nginx
systemctl enable nginx
```

### 5.3 Install Filebeat

```shell
cd /opt
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-9.3.2-linux-x86_64.tar.gz
tar -zxf filebeat-9.3.2-linux-x86_64.tar.gz
```

### 5.4 Configure Filebeat

Edit `/opt/filebeat-9.3.2-linux-x86_64/filebeat.yml`:

```yaml
# Filebeat inputs
filebeat.inputs:
- type: filestream
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/access.log
    - /var/log/nginx/error.log
  # Close file handles as soon as they go inactive
  close_inactive: 1s
  # Poll for new data every second
  scan_frequency: 1s
  # Reopen handles on timeout
  close_timeout: 1s
  # Start reading from the end of each file
  tail_files: true

# ------------------------------ Kafka output ------------------------------
output.kafka:
  hosts: ["192.168.117.138:9092", "192.168.117.139:9092", "192.168.117.140:9092"]
  topic: nginxlog
  keep_alive: 10s
  # 1. Disable compression to reduce latency
  compression: none
  # 2. Force a flush within 100 ms
  flush_interval: 100ms
  bulk_max_size: 1

# Filebeat modules
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
  # Set to true to enable config reloading
  reload.enabled: false
  # Period on which files under path should be checked for changes
  #reload.period: 10s

# Processors
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
```
### 5.5 Configure a systemd service

Edit `/etc/systemd/system/filebeat.service`:

```ini
[Unit]
Description=Filebeat
After=syslog.target network.target

[Service]
Type=simple
User=root
ExecStart=/opt/filebeat-9.3.2-linux-x86_64/filebeat -e -c /opt/filebeat-9.3.2-linux-x86_64/filebeat.yml
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
```

```shell
systemctl daemon-reload
systemctl start filebeat
systemctl enable filebeat
```

## 6. MySQL 8.4.8

Deployed on mysql (141).

### 6.1 Install

```shell
dnf install mysql8.4-server -y
```

### 6.2 Configure

Edit `/etc/my.cnf.d/mysql-server.cnf` and add:

```ini
bind-address=0.0.0.0
port=3306
```

```shell
systemctl start mysqld
systemctl enable mysqld
```

### 6.3 Create a remote account

```sql
-- Create a root user that may log in from any IP
CREATE USER 'root'@'%' IDENTIFIED BY '123456';
-- Grant all privileges
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
-- Flush privileges
FLUSH PRIVILEGES;
```

## 7. The ELK Stack

Deployed on elk (142).

### 7.1 System tuning

Append to `/etc/security/limits.conf`:

```
* soft nofile 65535
* hard nofile 65535
* soft nproc 4096
* hard nproc 4096
```

Append to `/etc/sysctl.conf` and apply with `sysctl -p`:

```
vm.max_map_count=655300
fs.file-max=655350
```

### 7.2 Create a dedicated user

```shell
groupadd es
useradd -g es es
```

### 7.3 Elasticsearch 7.17.18

#### 7.3.1 Install

```shell
cd /usr/local/src
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.18-linux-x86_64.tar.gz
tar -zxvf elasticsearch-7.17.18-linux-x86_64.tar.gz
mv elasticsearch-7.17.18 /usr/local/elasticsearch
chown -R es:es /usr/local/elasticsearch
```

#### 7.3.2 Edit the configuration

Edit `/usr/local/elasticsearch/config/elasticsearch.yml`:

```yaml
# Cluster name: default is fine single-node; must match on all cluster nodes
cluster.name: my-elasticsearch
# Node name: default is fine single-node; must be unique in a cluster
node.name: node-1
# Listen on all interfaces (required for remote access)
network.host: 0.0.0.0
# Default ES port
http.port: 9200
# Single-node discovery mode
discovery.type: single-node
# Allow cross-origin requests (needed for Kibana/Head integration)
http.cors.enabled: true
http.cors.allow-origin: "*"
```
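The cleaning script described in section 8.1 writes batches into this Elasticsearch node. Its `_bulk` endpoint expects newline-delimited JSON: one action line plus one document line per record, with a mandatory trailing newline. A minimal builder sketch (the `nginxlog-cleaned` index name is an assumption):

```python
import json

def to_bulk_ndjson(index: str, docs: list[dict]) -> str:
    """Build an Elasticsearch _bulk body: one action line plus one
    source line per document, terminated by the newline the API requires."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc, ensure_ascii=False))
    return "\n".join(lines) + "\n"

payload = to_bulk_ndjson("nginxlog-cleaned", [
    {"ip": "192.168.117.1", "status": 200, "url": "/"},
    {"ip": "192.168.117.1", "status": 502, "url": "/api"},
])
```

The payload would then be POSTed to `http://192.168.117.142:9200/_bulk` with `Content-Type: application/x-ndjson`.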
#### 7.3.3 Install the IK analyzer

```shell
# Switch to the es user
su es
# Enter the ES plugin directory
cd /usr/local/elasticsearch/plugins
# Create the ik directory
mkdir ik
cd ik
# Download the matching IK analyzer version
wget https://release.infinilabs.com/analysis-ik/stable/elasticsearch-analysis-ik-7.17.18.zip
# Unzip (install unzip first if missing: yum install unzip -y)
unzip elasticsearch-analysis-ik-7.17.18.zip
# Remove the archive
rm -rf elasticsearch-analysis-ik-7.17.18.zip
# Restart ES to load the plugin
ps -ef | grep elasticsearch | grep -v grep | awk '{print $2}' | xargs kill -9
/usr/local/elasticsearch/bin/elasticsearch -d
```

#### 7.3.4 Configure a systemd service

Edit `/etc/systemd/system/elasticsearch.service`:

```ini
[Unit]
Description=Elasticsearch
Documentation=https://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
Type=forking
User=es
Group=es
# Must match your actual install path
Environment=ES_HOME=/usr/local/elasticsearch
Environment=ES_PATH_CONF=/usr/local/elasticsearch/config
Environment=ES_PID_DIR=/usr/local/elasticsearch
Environment=ES_START_SCRIPT=/usr/local/elasticsearch/bin/elasticsearch
ExecStart=/usr/local/elasticsearch/bin/elasticsearch -d
ExecStop=/usr/local/elasticsearch/bin/elasticsearch-stop
Restart=on-failure
RestartSec=5s
# Standard limits
LimitNOFILE=65535
LimitNPROC=4096

[Install]
WantedBy=multi-user.target
```

```shell
systemctl daemon-reload
systemctl start elasticsearch
systemctl enable elasticsearch
```

#### 7.3.5 Verify

```shell
curl http://192.168.117.142:9200
```

### 7.4 Kibana 7.17.18

#### 7.4.1 Install

```shell
cd /usr/local/src
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.18-linux-x86_64.tar.gz
tar -zxvf kibana-7.17.18-linux-x86_64.tar.gz
mv kibana-7.17.18-linux-x86_64 /usr/local/kibana
chown -R es:es /usr/local/kibana
```

#### 7.4.2 Edit the configuration

Edit `/usr/local/kibana/config/kibana.yml`:

```yaml
server.host: "0.0.0.0"
server.port: 5601
elasticsearch.hosts: ["http://192.168.117.142:9200"]
elasticsearch.username: "es"
i18n.locale: "zh-CN"
```

#### 7.4.3 Configure a systemd service

Edit `/etc/systemd/system/kibana.service`:

```ini
[Unit]
Description=Kibana
Documentation=https://www.elastic.co/
After=elasticsearch.service

[Service]
User=es
Group=es
WorkingDirectory=/usr/local/kibana
ExecStart=/usr/local/kibana/bin/kibana
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
```

```shell
systemctl daemon-reload
systemctl start kibana
systemctl enable kibana
```

#### 7.4.4 Verify

Open http://192.168.117.142:5601 in a browser.
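Both Kibana and the monitor.py sweep described in section 9.2 read recent documents from this Elasticsearch instance. The one-minute window reduces to a range query with a terms aggregation over the status field; the index pattern and field names below are assumptions about how the cleaning script indexes its documents:

```python
import json

# Query body for the last-minute window; Elasticsearch resolves the
# "now-1m"/"now" date math on the server side.
query = {
    "size": 0,  # only the aggregation buckets are needed
    "query": {"range": {"@timestamp": {"gte": "now-1m", "lte": "now"}}},
    "aggs": {"by_status": {"terms": {"field": "status"}}},
}
body = json.dumps(query)
```

monitor.py would POST this to something like `http://192.168.117.142:9200/nginxlog-*/_search` and read the per-status counts from `aggregations.by_status.buckets`.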
## 8. Python Data-Processing Layer

### 8.1 Log-cleaning script

Deployed on kafka1/kafka2/kafka3. What it does:

- Runs as a Kafka consumer subscribed to the `nginxlog` topic and consumes the raw Nginx logs.
- Parses each line into structured fields: client IP, request time, request method, URL, HTTP status code, province/region, ISP, and so on.
- Cleans the data: normalizes timestamps, drops invalid records, converts field types.
- Fans out the cleaned records to both Elasticsearch (192.168.117.142) for real-time indexing and MySQL (192.168.117.141) for persistence.

It runs continuously in the background; systemd supervision is recommended.

### 8.2 Data-ingest script

Deployed on the same nodes as the cleaning script. It receives the cleaning script's output, maintains a connection pool to MySQL (192.168.117.141), and batch-INSERTs the structured logs into the target table.

## 9. Intelligent Monitoring and the AIOps Loop

### 9.1 Deploy Ollama

Deployed on ollama (143).

#### 9.1.1 Install Ollama

```shell
curl -fsSL https://ollama.com/install.sh | sh
```

#### 9.1.2 Pull the model

```shell
ollama pull qwen3.5:2b
```

### 9.2 Log-monitoring script (monitor.py)

Deployed on ollama (143). What it does:

- Scheduled sweep: every minute, queries Elasticsearch (192.168.117.142) for the last minute of log data.
- Aggregation: computes the HTTP status-code distribution for the window: 4xx count, 5xx count, request volume (QPS), average response time, and error-rate percentage.
- AI decision: sends the aggregated metrics to Ollama (qwen3.5:2b) via its local API; the prompt asks the model to judge whether an anomaly exists and return a boolean.
- Branching: on `false`, the system is healthy, the sweep is logged, and the loop continues. On `true`, the anomaly-handling flow is triggered: build an anomaly context (type, time range, key metrics), call the OpenClaw service (192.168.117.158) over HTTP with that context, then merge OpenClaw's diagnosis into an alert email sent via SMTP to QQ Mail, closing the alerting loop.

### 9.3 OpenClaw automated diagnostics

Deployed on openclaw (158); for installation, see the "OpenClaw source install" post on CSDN. What it does:

- Listens on HTTP/gRPC and accepts anomaly notifications from monitor.py.
- Runs a predefined diagnostic command set according to the anomaly type: checks backend Flask service status (`systemctl status` / `ps`), tails the last 100 lines of the Nginx (192.168.117.137) error log, probes backend port 5000 connectivity (`nc` / `curl`), and verifies that the service processes are alive.
- Returns the diagnostic output to monitor.py as formatted JSON.

## 10. Project Structure

*(architecture diagram omitted)*

## 11. Summary

This article built a complete enterprise-grade platform for log collection, storage, analysis, monitoring, and intelligent alerting:

- Log collection: Filebeat tails the Nginx logs in real time with low resource overhead.
- Buffering and decoupling: a 3-node Kafka KRaft cluster, with no ZooKeeper required.
- Dual-write storage: Elasticsearch for real-time analysis, MySQL for durable archiving.
- Visualization: log search and dashboards in Kibana.
- Intelligent monitoring: a local LLM served by Ollama (Qwen3.5:2b) judges anomalies.
- Automated triage: OpenClaw diagnoses service status, ports, and logs.
- Alerting loop closed by email notification.

The whole stack runs on 10 CentOS Stream 10 servers, the component versions have been verified for compatibility, and every step is reproducible, making this a practical template for an enterprise operations platform.
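As a concrete illustration of the section 9.2 loop, the aggregation and prompt-building steps can be sketched as pure functions. The thresholds, prompt wording, and the endpoint mentioned in the final comment are assumptions, not the deployed script:

```python
def aggregate(status_codes: list[int], window_s: int = 60) -> dict:
    """Condense one window of HTTP status codes into the metrics
    monitor.py sends to the model."""
    total = len(status_codes)
    err_4xx = sum(1 for s in status_codes if 400 <= s < 500)
    err_5xx = sum(1 for s in status_codes if s >= 500)
    errors = err_4xx + err_5xx
    return {
        "qps": round(total / window_s, 2),
        "err_4xx": err_4xx,
        "err_5xx": err_5xx,
        "error_rate_pct": round(100 * errors / total, 2) if total else 0.0,
    }

def build_prompt(metrics: dict) -> str:
    """Ask the model for a strict boolean verdict so the reply is parseable."""
    return (
        "Given these one-minute nginx metrics, answer only 'true' "
        f"(anomalous) or 'false' (normal): {metrics}"
    )

# A window of 570 OKs, 10 client errors, and 20 gateway errors.
metrics = aggregate([200] * 570 + [404] * 10 + [502] * 20)
prompt = build_prompt(metrics)
# The verdict would come from Ollama's local HTTP API, e.g. a POST to
# http://192.168.117.143:11434/api/generate with model "qwen3.5:2b".
```

Keeping the decision prompt constrained to a single token makes the `true`/`false` branch in section 9.2 trivial to parse from the model's reply.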
