From: sgf Date: Wed, 5 Oct 2022 17:43:51 +0000 (+0300) Subject: new(balancer): Working version of balancer from "Concurrency is not Parallelism ... X-Git-Url: https://gitweb.sgf-dma.tk/?a=commitdiff_plain;h=bbcf947ae27a12f1b4e89d5e8eec9ee0f7617a73;p=go.git new(balancer): Working version of balancer from "Concurrency is not Parallelism " talk. --- diff --git a/concurrency-is-not-parallelism/balancer.go b/concurrency-is-not-parallelism/balancer.go new file mode 100644 index 0000000..809ef59 --- /dev/null +++ b/concurrency-is-not-parallelism/balancer.go @@ -0,0 +1,137 @@ + +package main + +import ( + "fmt" + "time" + "math/rand" + "container/heap" +) + +type Request struct { + fn func() int + c chan int +} + +var nWorker int = 4 + +func workFn(t int) int { + fmt.Println("workFn", t) + return t +} + +func furtherProcess(result int) { + fmt.Println("furtherProcess", result) +} + +func requester(work chan<- Request) { + c := make(chan int) + for { + t := rand.Intn(5) + fmt.Println("requester", t) + time.Sleep(time.Duration(t) * time.Second) + work <- Request{func () int { return workFn(t) }, c} + fmt.Println("requester", t, ": request sent, waiting for reply") + result := <-c + furtherProcess(result) + } +} + +type Worker struct { + requests chan Request + pending int + index int +} + +func (w *Worker) work(done chan *Worker) { + fmt.Println("work started") + for { + req := <- w.requests + req.c <- req.fn() + done <- w + } +} + +type Pool []*Worker + +type Balancer struct { + pool Pool + done chan *Worker +} + +func (b *Balancer) balance(work chan Request) { + timeout := time.After(10 * time.Second) +out: + for { + fmt.Println("\nbalance LOOP") + select { + case <-timeout: + fmt.Println("Balancer time is out") + break out + default: + select { + case req := <-work: + fmt.Println("balance received request") + b.dispatch(req) + case w := <- b.done: + b.completed(w) + } + } + } +} + +func (p Pool) Len() int { return len(p) } +func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p Pool) Less(i, j int) bool { + return p[i].pending < p[j].pending +} + +func (p *Pool) Push(i interface{}) { + w := i.(*Worker) + w.index = len(*p) + *p = append(*p, w) +} + +func (p *Pool) Pop() interface{} { + old := *p + n := len(old) + w := old[n-1] + w.index = -1 + old[n-1] = nil + *p = old[:n-1] + return w +} + +func (b *Balancer) dispatch(req Request) { + w := heap.Pop(&b.pool).(*Worker) + fmt.Println("dispatch to", w) + go w.work(b.done) + w.requests <- req + w.pending++ + heap.Push(&b.pool, w) +} + +func (b *Balancer) completed(w *Worker) { + w.pending-- + heap.Remove(&b.pool, w.index) + heap.Push(&b.pool, w) +} + +func main() { + work := make(chan Request) + + pool := make(Pool, nWorker) + for i := 0; i < nWorker; i++ { + requests := make(chan Request) + pool[i] = &Worker{requests, 0, i} + } + heap.Init(&pool) + //for i, val := range pool { + // fmt.Println(i, val.pending, val.index) + //} + + done := make(chan *Worker) + b := Balancer {pool, done} + go requester(work) + b.balance(work) +}