人工智能

使用Go语言,25秒读取16GB文件

时间:2010-12-5 17:23:32  作者:IT科技类资讯   来源:系统运维  查看:  评论:0
内容摘要:当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,并且只能用于分析和解决故障。所以大部分公司倾向于将日志存储在文件中,而这些

 当今世界的使用任何计算机系统每天都会生成大量的日志或数据。随着系统的秒读发展,将调试数据存储到数据库中是使用不可行的,因为它们是秒读不可变的,并且只能用于分析和解决故障。使用所以大部分公司倾向于将日志存储在文件中,秒读而这些文件通常位于本地磁盘中。使用

我们将使用Go语言,秒读从一个大小为16GB的使用.txt或.log文件中提取日志。

让我们开始编码……

首先,秒读我们打开文件。使用对于任何文件的秒读IO,我们都将使用标准的使用Go os.File。 

f,秒读 err := os.Open(fileName)   if err != nil {      fmt.Println("cannot able to read the file", err)     return   }  // UPDATE: close after checking error  defer file.Close()  //Do not forget to close the file 

打开文件后,我们有以下两个选项可以选择:

逐行读取文件,使用这有助于减少内存紧张,源码下载但需要更多的时间。一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。

由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。

但你猜怎么着,还有第三种选择。瞧……相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。 

r := bufio.NewReader(f)  for {   buf := make([]byte,4*1024) //the chunk size  n, err := r.Read(buf) //loading chunk into buffer     bufbuf = buf[:n]  if n == 0 {           if err != nil {          fmt.Println(err)         break       }       if err == io.EOF {          break       }       return err    }  } 

一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为: 

//sync pools to reuse the memory and decrease the preassure on Garbage Collector  linesPool := sync.Pool{ New: func() interface{ } {           lines := make([]byte, 500*1024)          return lines  }}  stringPool := sync.Pool{ New: func() interface{ } {             lines := ""            return lines  }}  slicePool := sync.Pool{ New: func() interface{ } {              lines := make([]string, 100)             return lines  }}  r := bufio.NewReader(f)  var wg sync.WaitGroup //wait group to keep track off all threads  for {         buf := linesPool.Get().([]byte)       n, err := r.Read(buf)       bufbuf = buf[:n]  if n == 0 {           if err != nil {               fmt.Println(err)              break          }          if err == io.EOF {               break          }          return err       }  nextUntillNewline, err := r.ReadBytes(\n)//read entire line          if err != io.EOF {            buf = append(buf, nextUntillNewline...)       }          wg.Add(1)       go func() {                //process each chunk concurrently          //start -> log start time, end -> log end time           ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end) wg.Done()           }()  }  wg.Wait()  } 

上面的代码,引入了两个优化点:

sync.Pool是亿华云计算一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。

现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。 

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n 

我们将根据命令行提供的时间戳提取日志。 

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {   //another wait group to process every chunk further                                   var wg2 sync.WaitGroup  logs := stringPool.Get().(string)  logs = string(chunk)  linesPool.Put(chunk) //put back the chunk in pool  //split the string by "\n", so that we have slice of logs        logsSlice := strings.Split(logs, "\n")  stringPool.Put(logs) //put back the string pool  chunkSize := 100 //process the bunch of 100 logs in thread  n := len(logsSlice)  noOfThread := n / chunkSize  if n%chunkSize != 0 {  //check for overflow            noOfThread++        }  length := len(logsSlice)  //traverse the chunk       for i := 0; i < length; i += chunkSize {                    wg2.Add(1) //process each chunk in saperate chunk           go func(s int, e int) {               for i:= s; i<e;i++{                  text := logsSlice[i]  if len(text) == 0 {                     continue                 }                        logParts := strings.SplitN(text, ",", 2)              logCreationTimeString := logParts[0]              logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)  if err != nil {                    fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)                   return              }  // check if logs timestamp is inbetween our desired period            if logCreationTime.After(start) && logCreationTime.Before(end) {                        fmt.Println(text)             }          }          textSlice = nil          wg2.Done()           }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))     //passing the indexes for processing  }       wg2.Wait() //wait for a chunk to finish     logsSlice = nil  } 

对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。

完整的代码示例如下: 

func main() {    s := time.Now()   args := os.Args[1:]   if len(args) != 6 {  // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"    fmt.Println("Please give proper command line arguments")    return   }   startTimeArg := args[1]   finishTimeArg := args[3]   fileName := args[5]   file, err := os.Open(fileName)  if err != nil {     fmt.Println("cannot able to read the file", err)    return   }  defer file.Close() //close after checking err  queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)   if err != nil {     fmt.Println("Could not able to parse the start time", startTimeArg)    return   }  queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)   if err != nil {     fmt.Println("Could not able to parse the finish time", finishTimeArg)    return   }   filestat, err := file.Stat()   if err != nil {     fmt.Println("Could not able to get the file stat")    return   }   fileSize := filestat.Size()   offset := fileSize - 1   lastLineSize := 0  for {     b := make([]byte, 1)    n, err := file.ReadAt(b, offset)    if err != nil {      fmt.Println("Error reading file ", err)     break    }    char := string(b[0])    if char == "\n" {      break    }    offset--    lastLineSize += n   }   lastLine := make([]byte, lastLineSize)   _, err = file.ReadAt(lastLine, offset+1)   if err != nil {     fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)    return  }   logSlice := strings.SplitN(string(lastLine), ",", 2)   logCreationTimeString := logSlice[0]   lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)   if err != nil {     fmt.Println("can not able to parse time : ", err)   }   if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {     Process(file, queryStartTime, queryFinishTime)   }   fmt.Println("\nTime taken - ", time.Since(s))  }   func Process(f *os.File, start time.Time, end time.Time) error {    linesPool := sync.Pool{ New: func() interface{ } {     lines := make([]byte, 250*1024)    return lines   }}   stringPool := sync.Pool{ New: func() interface{ } {     lines := ""    return lines   }}   r := bufio.NewReader(f)   var wg sync.WaitGroup   for {     buf := linesPool.Get().([]byte)    n, err := r.Read(buf)    bufbuf = buf[:n]    if n == 0 {      if err != nil {       fmt.Println(err)      break     }     if err == io.EOF {       break     }     return err    }     nextUntillNewline, err := r.ReadBytes(\n)    if err != io.EOF {      buf = append(buf, nextUntillNewline...)    }    wg.Add(1)    go func() {      ProcessChunk(buf, &linesPool, &stringPool, start, end)     wg.Done()    }()    }   wg.Wait()   return nil  }   func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {    var wg2 sync.WaitGroup   logs := stringPool.Get().(string)   logs = string(chunk)    linesPool.Put(chunk)   logsSlice := strings.Split(logs, "\n")   stringPool.Put(logs)   chunkSize := 300   n := len(logsSlice)   noOfThread := n / chunkSize  if n%chunkSize != 0 {     noOfThread++   }   for i := 0; i < (noOfThread); i++ {     wg2.Add(1)    go func(s int, e int) {      defer wg2.Done() //to avaoid deadlocks     for i := s; i < e; i++ {       text := logsSlice[i]      if len(text) == 0 {        continue      }      logSlice := strings.SplitN(text, ",", 2)      logCreationTimeString := logSlice[0]      logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)      if err != nil {        fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)       return      }       if logCreationTime.After(start) && logCreationTime.Before(end)       //fmt.Println(text)      }     }     }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))   }   wg2.Wait()  logsSlice = nil  }  
copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap