1
0
Fork 0
mirror of https://github.com/alice-lg/birdwatcher.git synced 2025-03-09 00:00:05 +01:00

implemented workers to improve full table parsing time

This commit is contained in:
Daniel Czerwonk 2018-01-28 20:41:43 +01:00
parent 37d5eeaa13
commit d6fc5998ee

View file

@ -6,13 +6,15 @@ import (
"regexp"
"strconv"
"strings"
"sync"
)
const workerPoolSize = 8
var (
ParserConf ParserConfig
regex struct {
lineSeperator *regexp.Regexp
status struct {
status struct {
startLine *regexp.Regexp
routerID *regexp.Regexp
currentServer *regexp.Regexp
@ -177,15 +179,107 @@ func parseSymbols(reader io.Reader) Parsed {
return Parsed{"symbols": res}
}
func parseRoutes(reader io.Reader) Parsed {
res := Parsed{}
routes := []Parsed{}
route := Parsed{}
type blockJob struct {
lines []string
position int
}
type blockParsed struct {
items []Parsed
position int
}
func parseRoutes(reader io.Reader) Parsed {
jobs := make(chan blockJob)
out := startRouteWorkers(jobs)
res := startRouteConsumer(out)
defer close(res)
pos := 0
block := []string{}
lines := newLineIterator(reader, true)
for lines.next() {
line := lines.string()
if line[0] != 32 && line[0] != 9 && len(block) > 0 {
jobs <- blockJob{block, pos}
pos++
block = []string{}
}
block = append(block, line)
}
if len(block) > 0 {
jobs <- blockJob{block, pos}
}
close(jobs)
return <-res
}
func startRouteWorkers(jobs chan blockJob) chan blockParsed {
out := make(chan blockParsed)
wg := &sync.WaitGroup{}
wg.Add(workerPoolSize)
go func() {
for i := 0; i < workerPoolSize; i++ {
go workerForRouteBlockParsing(jobs, out, wg)
}
wg.Wait()
close(out)
}()
return out
}
func startRouteConsumer(out <-chan blockParsed) chan Parsed {
res := make(chan Parsed)
go func() {
byBlock := map[int][]Parsed{}
count := 0
for r := range out {
count++
byBlock[r.position] = r.items
}
res <- Parsed{"routes": sortedSliceForRouteBlocks(byBlock, count)}
}()
return res
}
func sortedSliceForRouteBlocks(byBlock map[int][]Parsed, numBlocks int) []Parsed {
res := []Parsed{}
for i := 0; i < numBlocks; i++ {
routes, ok := byBlock[i]
if !ok {
continue
}
res = append(res, routes...)
}
return res
}
func workerForRouteBlockParsing(jobs <-chan blockJob, out chan<- blockParsed, wg *sync.WaitGroup) {
for j := range jobs {
parseRouteLines(j.lines, j.position, out)
}
wg.Done()
}
func parseRouteLines(lines []string, position int, ch chan<- blockParsed) {
route := Parsed{}
routes := []Parsed{}
for _, line := range lines {
if specialLine(line) {
continue
}
@ -233,8 +327,7 @@ func parseRoutes(reader io.Reader) Parsed {
routes = append(routes, route)
}
res["routes"] = routes
return res
ch <- blockParsed{routes, position}
}
func parseMainRouteDetail(groups []string, route Parsed) {