go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

news/2024/9/22 21:09:52/

前言

在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile模块进行重构就成了我们必须要做的事情了。

TailFiile模块的重构流程

储存数据结构体的重构

在上一篇博文中我们定义了collectEntry来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile模块也需要使用,所以这里我们再创建一个common模块来专门储存这个数据:

type CollectEntry struct {Path  string `json:"path"`Topic string `json:"topic"`
}

tailFile模块中也需要一个结构体来储存需要的信息:

type tailTask struct{path stringtopic stringTailObj *tail.Tail
}

tail初始化模块的重构

由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:

type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}return
}

之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go中,但是现在会创建多个tailTask来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:

package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}

修改模块的全部代码

  • main.go
package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}
  • common.go
package commontype CollectEntry struct {Path  string `json:"path"`Topic string `json:"topic"`
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}

运行结果

在这里插入图片描述
当你对不同日志文件修改都有反馈时就代表运行成功啦!


http://www.ppmy.cn/news/1445702.html

相关文章

Django框架模板位置(默认自定义)

1、默认模板位置 如果当前项目名称为learning_log 在该项目下&#xff0c;有一个应用&#xff0c;名称为learning_logs 那么Django默认的‘模板位置’是&#xff1a; .\learning_log\learning_logs\templates\learning_logs 2、自定义模板位置 修改项目learning_log的sett…

【目标检测】Yolov7 的 ELAN 和 E-ELAN 模块演进(涉及到分组卷积,cardinality,梯度路径)

感觉从 YOLOv6 开始&#xff0c;YOLOv6 系列感觉优化点都着重于推理速度上面&#xff0c;YOLOv6 的 RepBlock 重参数化&#xff0c;给我的感觉就是算子融合进行加速。而 YOLOv7&#xff0c;为了在各种架构的边缘设备上获得极致的推理速度。 YOLOv7 的工作&#xff1a; 新的 b…

写一个函数,求两个整数之和,要求在函数体内不得使用 +、-、×、÷ 四则运算符号。

class Solution { public: int add(int num1, int num2){ int res 0; int Cin 0; int tmp 1; for(int i 0;i<32;i){ int a num1 & tmp;//取得num1和num2的第i位的值 int b num2 & tmp; …

SpringSecurity授权流程(自己做笔记用的)

目录 一、RABC表的设计 二、查询权限并添加Security中 三、通过注解进行授权 四、授权进行前端访问 一、RABC表的设计 基本概念就是五个表&#xff1a; 用户表&#xff1a;users 角色表&#xff1a;role 权限表&#xff1a;permission 还需要两种关系表&#xff0c;才能通过…

Python数组类+AI插件

目录 规划实现初始化插入删除查找 AI插件单测注释调优建议 小结 规划 先想清楚都写哪些&#xff0c;然后再动手操作 用Python写了一个简单数组类&#xff0c;首先思考下都写哪些功能&#xff1a; 插入删除查找用插件做单元测试和写注释 目的只是实现一个简单的数组类&#x…

移动应用安全

移动应用安全 移动应用安全主要关注Android、iOS、Windows Phone等平台上移动应用软件安全状态。它涉及应用程序在其设计运行的平台上下文中的安全问题、它们使用的框架以及预期的用户集。所有主流的移动平台都提供大量可选的安全控制&#xff0c;旨在帮助软件开发人员构建安全…

GIT入门到实战

文章目录 版本控制常见的版本控制工具版本控制分类Git与SVN的主要区别 Git基本理论&#xff08;重要&#xff09;三个区域工作流程 GIT文件操作文件的四种状态查看文件状态忽略文件 GIT 常见问题 版本控制 版本控制&#xff08;Revision control&#xff09;是一种在开发的过程…

Spring - 3 ( 12000 字 Spring 入门级教程 )

一&#xff1a;Spring Web MVC入门 1.1 响应 在我们前⾯的代码例子中&#xff0c;都已经设置了响应数据, Http 响应结果可以是数据, 也可以是静态页面&#xff0c;也可以针对响应设置状态码, Header 信息等. 1.2 返回静态页面 创建前端页面 index.html(注意路径) html代码 …