go读写文件总结

news/2024/11/24 7:08:50/

别人的经验:

如今任何计算机系统每天都会产生大量的日志或数据。随着系统的增长,将调试数据存储到数据库中是不可行的,因为它们是不可变的,主要用于分析和解决故障的目的。因此,企业倾向于将其存储在文件中,并保存在本地磁盘中。

我们将使用Golang从大小为16 GB的.txt或.log文件中提取日志,该文件有数百万行。

直接上代码,首先打开文件,将使用标准Go os.File来读取文件IO:

f, err := os.Open(fileName)if err != nil {fmt.Println("cannot able to read the file", err)return}
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

一旦打开文件后,我们有以下两个选择进行下一步处理。
1、逐行读取文件,这有助于减少系统内存压力,但会在IO中花费更多时间。
2、一次性将整个文件都读入内存并处理该文件,这会消耗很大内存,但会节约时间。
由于文件太大例如16GB,无法将整个文件加载到内存中。但是第一个选择也不可行,因为我们希望几秒钟内处理该文件。

但是你猜怎么着,还有第三种选择。不是加载整个文件到内存,而是使用bufio.NewReader()以块的形式加载文件。

r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into bufferbuf = buf[:n]
if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}
}

一旦我们读取到文件块,我们将fork一个线程,也就是Go协程,来同时处理每个块。上面的代码将改为如下:

//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {lines := make([]byte, 500*1024)return lines
}}
stringPool := sync.Pool{New: func() interface{} {lines := ""return lines
}}
slicePool := sync.Pool{New: func() interface{} {lines := make([]string, 100)return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {buf := linesPool.Get().([]byte)n, err := r.Read(buf)buf = buf[:n]
if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}
nextUntillNewline, err := r.ReadBytes('\n')//read entire lineif err != io.EOF {buf = append(buf, nextUntillNewline...)}wg.Add(1)go func() { //process each chunk concurrently//start -> log start time, end -> log end timeProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
wg.Done()}()
}
wg.Wait()
}

上面的代码引入了两个新的优化:
1、sync.Pool是一个强大的实例池,可以重用实例来减少垃圾收集器的压力。我们将重用分配的内存片。这将减少内存消耗,使代码优化更高效。
2、Go协程并发处理缓冲区块,大大提高了处理速度。
下面来实现ProcessChunk函数,处理如下格式的日志信息:

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志:

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logslogsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow noOfThread++}
length := len(logsSlice)
//traverse the chunkfor i := 0; i < length; i += chunkSize {wg2.Add(1)
//process each chunk in saperate chunkgo func(s int, e int) {for i:= s; i<e;i++{text := logsSlice[i]
if len(text) == 0 {continue}logParts := strings.SplitN(text, ",", 2)logCreationTimeString := logParts[0]logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)return}
// check if log's timestamp is inbetween our desired periodif logCreationTime.After(start) && logCreationTime.Before(end) {fmt.Println(text)}}textSlice = nilwg2.Done()}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))//passing the indexes for processing
}  wg2.Wait() //wait for a chunk to finishlogsSlice = nil
}

上面的代码使用16GB的日志文件进行基础测试,提取日志所花费的时间约25秒。
下面是完成代码:

func main() {s := time.Now()args := os.Args[1:]if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"fmt.Println("Please give proper command line arguments")return}startTimeArg := args[1]finishTimeArg := args[3]fileName := args[5]file, err := os.Open(fileName)if err != nil {fmt.Println("cannot able to read the file", err)return}defer file.Close() //close after checking errqueryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)if err != nil {fmt.Println("Could not able to parse the start time", startTimeArg)return}queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)if err != nil {fmt.Println("Could not able to parse the finish time", finishTimeArg)return}filestat, err := file.Stat()if err != nil {fmt.Println("Could not able to get the file stat")return}fileSize := filestat.Size()offset := fileSize - 1lastLineSize := 0for {b := make([]byte, 1)n, err := file.ReadAt(b, offset)if err != nil {fmt.Println("Error reading file ", err)break}char := string(b[0])if char == "\n" {break}offset--lastLineSize += n}lastLine := make([]byte, lastLineSize)_, err = file.ReadAt(lastLine, offset+1)if err != nil {fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)return}logSlice := strings.SplitN(string(lastLine), ",", 2)logCreationTimeString := logSlice[0]lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)if err != nil {fmt.Println("can not able to parse time : ", err)}if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {Process(file, queryStartTime, queryFinishTime)}fmt.Println("\nTime taken - ", time.Since(s))
}func Process(f *os.File, start time.Time, end time.Time) error {linesPool := sync.Pool{New: func() interface{} {lines := make([]byte, 250*1024)return lines}}stringPool := sync.Pool{New: func() interface{} {lines := ""return lines}}r := bufio.NewReader(f)var wg sync.WaitGroupfor {buf := linesPool.Get().([]byte)n, err := r.Read(buf)buf = buf[:n]if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}nextUntillNewline, err := r.ReadBytes('\n')if err != io.EOF {buf = append(buf, nextUntillNewline...)}wg.Add(1)go func() {ProcessChunk(buf, &linesPool, &stringPool, start, end)wg.Done()}()}wg.Wait()return nil
}func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {var wg2 sync.WaitGrouplogs := stringPool.Get().(string)logs = string(chunk)linesPool.Put(chunk)logsSlice := strings.Split(logs, "\n")stringPool.Put(logs)chunkSize := 300n := len(logsSlice)noOfThread := n / chunkSizeif n%chunkSize != 0 {noOfThread++}for i := 0; i < (noOfThread); i++ {wg2.Add(1)go func(s int, e int) {defer wg2.Done() //to avaoid deadlocksfor i := s; i < e; i++ {text := logsSlice[i]if len(text) == 0 {continue}logSlice := strings.SplitN(text, ",", 2)logCreationTimeString := logSlice[0]logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)if err != nil {fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)return}if logCreationTime.After(start) && logCreationTime.Before(end) {//fmt.Println(text)}}}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))}wg2.Wait()logsSlice = nil
}

自己实践

server/server.go

package mainimport ("bufio""fmt""net""os""sync"//"time"
)func SendFile(con net.Conn,fileName string){file, err := os.Open(fileName)if err != nil {fmt.Println("cannot able to read the file", err)return}defer file.Close() filestat, err := file.Stat()if err != nil {fmt.Println("Could not able to get the file stat")return}fileSize := filestat.Size()bufPool := sync.Pool{New: func() interface{} {buf := make([]byte, 100*1024*1024) //100M per block to memreturn buf}}r := bufio.NewReader(file)var wg sync.WaitGroupblock := 0var sendSize int = 0for {buf := bufPool.Get().([]byte)n, err := r.Read(buf)buf = buf[:n]if err != nil {fmt.Println(err)break}// nextUntillNewline, err := r.ReadBytes('\n')// if err != io.EOF {//     buf = append(buf, nextUntillNewline...)// }wg.Add(1) //just test wg not multi goroutinego func() {defer wg.Done()n1, err := con.Write(buf)if err != nil{fmt.Println(err)}else if n1 != n{fmt.Printf("send len n1 =%d != n %d\n",n1,n)}return}()wg.Wait()block++sendSize += nfmt.Printf("send block num = %d\n",block)}fmt.Printf("file size = %d ,send size = %d\n",fileSize,sendSize)}//处理客户端连接请求
func process(coon net.Conn) {defer coon.Close()//定义接收信息的字节数组var buf [1024]byte//读取数据n, err := coon.Read(buf[:])if err != nil {fmt.Println("获取信息失败,err:", err)return}filename := string(buf[:n])fmt.Printf("对方获取文件是:%s", filename)SendFile(coon, filename)fmt.Printf("发送文件文件完毕:%s", filename)
}//TCP服务端配置
func main() {//1:启用监听listener, err := net.Listen("tcp", "127.0.0.1:20000")//连接失败处理if err != nil {fmt.Println("启动服务失败,err:", err)return}//程序退出时释放端口defer listener.Close()for {conn, err := listener.Accept() //2.建立连接if err != nil {fmt.Println("接收客户连接失败,err:", err)continue}//3.启动一个人goroutine处理客户端连接go process(conn)}
}

client/client.go

package mainimport ("fmt""net""os""bufio""sync""time"
)//TCP客户端
func main() {args := os.Args[1:]if len(args) != 1 { // for format  clien fileNamefmt.Println("Please give proper command line arguments")return}fileName := args[0]f, err := os.Create(fileName) //创建文件if err != nil{fmt.Println("create file fail" + fileName)}defer f.Close()//1:拨号方式建立与服务端连接conn, err := net.Dial("tcp", "127.0.0.1:20000")if err != nil {fmt.Println("连接服务端失败,err:", err)return}//注意:关闭连接位置,不能写在连接失败判断上面defer conn.Close()//2:向服务器发送信息_, err = conn.Write([]byte(fileName))if err != nil {fmt.Println("发送信息失败,err:", err)return}s := time.Now()bufPool := sync.Pool{New: func() interface{} {buf := make([]byte, 100*1024*1024) //100M per block to memreturn buf}}w := bufio.NewWriterSize(f,100*1024*1024)block := 0var reveiveSize int = 0for {buf := bufPool.Get().([]byte)n, err := conn.Read(buf)if err != nil {fmt.Println("获取结束", err)break}buf = buf[:n]wn , err := w.Write(buf)if err != nil{fmt.Println(err)break}else if wn != n{fmt.Printf("write len wn =%d != n %d\n",wn,n)break}block++reveiveSize += nfmt.Printf("reveive block num = %d,size = %d\n",block,reveiveSize)}w.Flush()spend := time.Since(s)fmt.Printf("receive end size = %d \n",reveiveSize)fmt.Println("time spend ",spend)
}

测试2.4G本地go tcp 发送接收花费17秒

 


http://www.ppmy.cn/news/669576.html

相关文章

红米note2能刷机没显示无服务器,红米Note2怎么线刷刷机 红米Note2手机线刷机教程图解...

红米Note2怎么线刷刷机呢&#xff1f;红米Note2线刷机可以有效解决红米Note2变砖及出现死机、黑屏、频繁重启等多种问题&#xff0c;下面小编就带着大家的问题&#xff0c;来为大家整理介绍红米Note2线刷刷机教程及线刷包下载&#xff0c;供大家学习使用&#xff0c;还不会红米…

吐槽红米2

2月3日在小米官网上抢了红米2&#xff0c;过了两天到货&#xff0c;发现通话时对方听不到声音。当时还在淘宝上买了一个&#xff0c;那个正常。尝试了更新系统、恢复出厂设置、输入#检测都不行&#xff0c;百度一查&#xff0c;发现小米很多型号手机都存在这个问题&#xff0c;…

红米2 mobian(debian12)

流畅度可以与debian11不是一个级别 骁龙410 主线6.2内核 主频1Ghz 只有摄像不工作和声音有时候有点问题 第一次需开机手动扩容 resize2fs /dev/mmcblk0p50 resize2fs /dev/mmcblk1p50 果你的手机在插卡之后检测到卡但是没有信号&#xff0c;则你的手机型号是wt86047。 使…

STM32 UDS Bootloader开发-下位机篇-App软件

文章目录 前言链接文件APP软件检查预编程条件停止DTC设置禁止无关通讯定义APP_INFO调试输出hex处理 总结 前言 在之前的文章中&#xff0c;介绍了STM32 UDS Bootloader开发需求和bootloader软件开发的修改点。本文继续介绍APP软件关于UDS的部分。APP主要实现预编程阶段的部分内…

100vh

100vh 是一个 CSS 单位&#xff0c;表示视口&#xff08;浏览器窗口&#xff09;的高度。具体来说&#xff0c;它表示相对于视口高度的百分比&#xff0c;其中 vh 表示视口高度的1%。 举个例子&#xff0c;如果浏览器窗口的高度为 800 像素&#xff0c;那么 100vh 就等于 800 …

RK3568-SARADC

RK3568-SARADC RK3568有8路SARADC配置设备树 内核常用接口 获取adc值: iio_read_channel_raw() 获取电压: iio_read_channel_processed()ADC常用接口 可以通过用户态接口获取adc值&#xff0c;其中*表示adc第多少通道: cat /sys/bus/iio/devices/iio\:device0/in_voltage*…

rk3399 注册drm 驱动

rk3399 基于component 框架&#xff0c;在probe阶段解析dts中各个设备的信息&#xff0c;加到componet match 列表中&#xff0c;等所有的设备加载完毕后&#xff0c;就会引发master设备的bind。 1.vop probe static int vop_probe(struct platform_device *pdev) {struct de…

MSP430F5529输出PWM

msp430f5529产生4路PWM波&#xff0c;改变占空比从而改变电机(直流减速电机)的速度。驱动电机必然会用到驱动模块&#xff0c;接下来就介绍一下驱动模块——L298N L298N驱动 若要对直流电机进行PWM调速&#xff0c;需设置IN1和IN2,即确定电机的转动方向&#xff1b;然后对使能…