Process huge log files
Recently, I was faced with a situation where I had to process huge (on the order of several gigabytes) log files.
The log files were either in CSV format or in the AWS ALB access log format.
Here, processing means going through each row to either filter it or store the valuable information from it for later analysis.
First, I attempted to parse the files with Python, using some simple logic:
import csv
import os

home = os.path.expanduser("~")
file_name = home + '/Downloads/huge_file.csv'
fields = ["client_ip", "target_processing_time", "target_status_code", "received_bytes",
          "request_url", "request_creation_time", "ssl_protocol", "trace_id"]
filtered_rows = []
with open('./filtered.csv', 'w', newline='') as csvfile:
    dict_writer = csv.DictWriter(csvfile, fields)
    with open(file_name, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            # Do some processing; as an example, keep only the columns of interest
            obj = {key: row[key] for key in fields}
            filtered_rows.append(obj)
            dict_writer.writerow(obj)
The above program reads the CSV line by line and optionally writes a few columns of each row to another file.
Running it on a 12 GB CSV file took 4-5 hours.
I was not satisfied with that, as I had to do multiple iterations over multiple such files. There had to be a better way.
Turned to Go due to its concurrency features
There is an encoding/csv package in Go which can help read CSV files. However, it is single-threaded and does not make use of multiple goroutines.
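For reference, a minimal single-threaded read with encoding/csv looks roughly like the sketch below; the file path and the per-row processing are placeholders.

// A minimal sketch of reading a CSV file with the standard encoding/csv reader.
package main

import (
    "encoding/csv"
    "io"
    "log"
    "os"
)

func main() {
    f, err := os.Open("huge_file.csv") // placeholder path
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    r := csv.NewReader(f)
    for {
        row, err := r.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        _ = row // process / filter the row here
    }
}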
I came across another package, github.com/actforgood/bigcsvreader, which can read big CSV files using multiple goroutines.
I quickly wrote a program that reads a bunch of huge log files from a folder and applies the processing logic to each of them; the sample snippet is shown below.
The program reads the folder of log files and processes them one after another. Since the reading is done with multiple goroutines, it can process a 12 GB ALB log file in a matter of seconds. The log files in my case were ALB access log files from AWS. Note that ALB log files are space-separated.
Also, if the files are huge, we need to increase the buffer size in the library so that it does not run out of heap memory:
bigCSV.ColumnsDelimiter = ' '
bigCSV.BufferSize = 81920
The processRow() function receives each row and, based on the process / filtering criteria, publishes data of type ALBLog on the resultChan channel.
There is another inline goroutine which reads all the data from the resultChan channel and writes it to filtered.csv.
package main

import (
    "context"
    "encoding/csv"
    "fmt"
    "log"
    "net/url"
    "os"
    "strings"
    "sync"
    "sync/atomic"

    "github.com/actforgood/bigcsvreader"
)
const noOfColumns = 29

// ALBLog holds the fields of interest from a single ALB access log row.
type ALBLog struct {
    ClientIP             string
    TargetProcessingTime string
    TargetStatusCode     string
    ReceivedBytes        string
    RequestUrl           string
    RequestCreationTime  string
    SSLProtocol          string
    TraceID              string
    RawRow               []string
}

// count32 is an atomic counter shared by the row workers.
type count32 int32

func (c *count32) inc() int32 {
    return atomic.AddInt32((*int32)(c), 1)
}

func (c *count32) get() int32 {
    return atomic.LoadInt32((*int32)(c))
}

func (c *count32) reset() {
    atomic.StoreInt32((*int32)(c), 0)
}

var counter count32
func main() {
    homedir, _ := os.UserHomeDir()
    csvLocation := homedir + "/logs/"
    entries, err := os.ReadDir(csvLocation)
    if err != nil {
        log.Fatal(err)
    }
    var TotalProcessRows int32
    var FilteredProcessedRows int
    // Create the output file and write the header row.
    f, err := os.Create("filtered.csv")
    if err != nil {
        log.Fatal(err)
    }
    w := csv.NewWriter(f)
    w.Write([]string{"request_url", "target_processing_time"})
    w.Flush()
    f.Close()
    for _, e := range entries {
        if strings.HasSuffix(e.Name(), ".log") {
            totalRows, filteredRows := processFile(csvLocation, e.Name())
            TotalProcessRows = TotalProcessRows + totalRows
            FilteredProcessedRows = FilteredProcessedRows + filteredRows
        }
    }
    fmt.Fprintf(os.Stdout, "Total Rows: %d Filtered Rows: %d\n", TotalProcessRows, FilteredProcessedRows)
}
func processFile(csvLocation string, fileName string) (int32, int) {
    counter.reset()
    // initialize the big csv reader
    bigCSV := bigcsvreader.New()
    bigCSV.SetFilePath(csvLocation + fileName)
    bigCSV.ColumnsCount = noOfColumns
    bigCSV.MaxGoroutinesNo = 16
    bigCSV.BufferSize = 81920
    bigCSV.FileHasHeader = true
    bigCSV.ColumnsDelimiter = ' '
    ctx, cancelCtx := context.WithCancel(context.Background())
    defer cancelCtx()
    var wg sync.WaitGroup
    var filteredRows int
    // start reading with multiple goroutines
    rowsChans, errsChan := bigCSV.Read(ctx)
    resultChan := make(chan ALBLog)
    for i := 0; i < len(rowsChans); i++ {
        wg.Add(1)
        go rowWorker(rowsChans[i], &wg, resultChan)
    }
    wg.Add(1)
    go errWorker(errsChan, &wg)
    // Collect the filtered rows and append them to filtered.csv.
    var writeCSVWaitGroup sync.WaitGroup
    writeCSVWaitGroup.Add(1)
    go func() {
        defer writeCSVWaitGroup.Done()
        var result []ALBLog
        for filteredLog := range resultChan {
            result = append(result, filteredLog)
        }
        fmt.Fprintf(os.Stdout, "%s Filtered rows %d\n", fileName, len(result))
        filteredRows = len(result)
        f, err := os.OpenFile("filtered.csv", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
        if err != nil {
            log.Fatalln("failed to open file", err)
        }
        defer f.Close()
        w := csv.NewWriter(f)
        defer w.Flush()
        for _, row := range result {
            w.Write(row.RawRow)
        }
    }()
    wg.Wait()
    close(resultChan)
    writeCSVWaitGroup.Wait()
    fmt.Fprintf(os.Stdout, " %s Total rows: %d\n", fileName, counter.get())
    return counter.get(), filteredRows
}
func rowWorker(rowsChan bigcsvreader.RowsChan, waitGr *sync.WaitGroup, resultChan chan<- ALBLog) {
    for row := range rowsChan {
        processRow(row, resultChan)
    }
    waitGr.Done()
}

func errWorker(errsChan bigcsvreader.ErrsChan, waitGr *sync.WaitGroup) {
    for err := range errsChan {
        handleError(err)
    }
    waitGr.Done()
}
// processRow can be used to implement business logic
// like validation / converting to a struct / persisting a row into storage.
// The indices follow the AWS ALB access log format: client:port is field 3,
// target_processing_time is 6, target_status_code is 9, received_bytes is 10,
// request is 12 and request_creation_time is 21.
func processRow(row []string, resultChan chan<- ALBLog) {
    counter.inc()
    clientIP := strings.Split(row[3], ":")[0]
    // The request field looks like "GET https://host:443/path HTTP/2.0",
    // so the URL is the second space-separated token.
    requestUrl := strings.Split(row[12], " ")[1]
    urlObject, err := url.Parse(requestUrl)
    if err != nil {
        handleError(err)
        return
    }
    RawRow := []string{urlObject.Path, row[6]}
    alblog := ALBLog{
        ClientIP:             clientIP,
        TargetProcessingTime: row[6],
        TargetStatusCode:     row[9],
        ReceivedBytes:        row[10],
        RequestUrl:           row[12],
        RequestCreationTime:  row[21],
        RawRow:               RawRow,
    }
    if doSomeFiltering() {
        resultChan <- alblog
    }
}

// doSomeFiltering is a placeholder for the actual filtering criteria,
// e.g. keeping only slow requests or specific status codes.
func doSomeFiltering() bool {
    return true
}

// handleError handles the error.
// Errors can be fatal, like the file does not exist, or row related, like a given row could not be parsed.
func handleError(err error) {
    fmt.Println("error", err)
}