From 9b60e375daa8d61395e6a41e0ec40085a32f59c0 Mon Sep 17 00:00:00 2001 From: sgf Date: Thu, 6 Oct 2022 15:53:45 +0300 Subject: [PATCH] chg(balancer): Add verbose print for tracking execution. --- concurrency-is-not-parallelism/balancer.go | 125 +++++++++++++-------- 1 file changed, 77 insertions(+), 48 deletions(-) diff --git a/concurrency-is-not-parallelism/balancer.go b/concurrency-is-not-parallelism/balancer.go index 809ef59..f7ad161 100644 --- a/concurrency-is-not-parallelism/balancer.go +++ b/concurrency-is-not-parallelism/balancer.go @@ -10,76 +10,69 @@ import ( type Request struct { fn func() int + num int // Used for request identification. c chan int } -var nWorker int = 4 +func (r Request) String() string { + return fmt.Sprintf("R %.2d", r.num) +} -func workFn(t int) int { - fmt.Println("workFn", t) - return t +func workFn(r Request) int { + fmt.Printf("%v: computing (workFn).. (%p)\n", r, r.fn) + return r.num } -func furtherProcess(result int) { - fmt.Println("furtherProcess", result) +func furtherProcess(r Request, result int) { + fmt.Printf("%v: further processing result %v..\n", r, result) } func requester(work chan<- Request) { c := make(chan int) + reqNum := 0 for { t := rand.Intn(5) - fmt.Println("requester", t) + fmt.Printf("requester(): sleeping for %v secs\n", t) time.Sleep(time.Duration(t) * time.Second) - work <- Request{func () int { return workFn(t) }, c} - fmt.Println("requester", t, ": request sent, waiting for reply") + + req := &Request{num: reqNum, c: c} + fn := func () int { return workFn(*req) } + req.fn = fn + work <- *req + fmt.Printf("requester() '%v': sending\n", req) result := <-c - furtherProcess(result) + fmt.Printf("requester() '%v': received result %v\n", req, result) + furtherProcess(*req, result) + reqNum++ } } type Worker struct { requests chan Request pending int - index int + num int // Used for worker identification. + index int // Used by heap. } -func (w *Worker) work(done chan *Worker) { - fmt.Println("work started") +func (w Worker) String() string { + return fmt.Sprintf("W %.2d", w.num) +} + +func (w *Worker) work(done chan *Done) { + t := rand.Intn(5) + fmt.Printf("%v: sleeping for %v secs before begin..\n", w, t) + time.Sleep(time.Duration(t) * time.Second) for { req := <- w.requests + fmt.Printf("%v: start computing '%v'\n", w, req) req.c <- req.fn() - done <- w + done <- &Done{w, req} + fmt.Printf("%v: finished '%v'\n", w, req) } } type Pool []*Worker -type Balancer struct { - pool Pool - done chan *Worker -} - -func (b *Balancer) balance(work chan Request) { - timeout := time.After(10 * time.Second) -out: - for { - fmt.Println("\nbalance LOOP") - select { - case <-timeout: - fmt.Println("Balancer time is out") - break out - default: - select { - case req := <-work: - fmt.Println("balance received request") - b.dispatch(req) - case w := <- b.done: - b.completed(w) - } - } - } -} - func (p Pool) Len() int { return len(p) } func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p Pool) Less(i, j int) bool { @@ -102,35 +95,71 @@ func (p *Pool) Pop() interface{} { return w } +type Done struct { + w *Worker + r Request +} + +type Balancer struct { + pool Pool + done chan *Done +} + +func (b *Balancer) balance(work chan Request) { + timeout := time.After(20 * time.Second) +out: + for { + fmt.Println() + fmt.Println("balance(): Starting loop..") + select { + case <-timeout: + fmt.Println("balance(): your time is out.") + break out + default: + select { + case req := <-work: + fmt.Printf("balance(): received '%v'\n", req) + b.dispatch(req) + case done := <- b.done: + b.completed(done) + } + } + } +} + func (b *Balancer) dispatch(req Request) { w := heap.Pop(&b.pool).(*Worker) - fmt.Println("dispatch to", w) + fmt.Printf("dispatch() '%v': sending '%v'\n", req, w) go w.work(b.done) - w.requests <- req w.pending++ heap.Push(&b.pool, w) + w.requests <- req + fmt.Printf("dispatch() '%v': sent\n", req) } -func (b *Balancer) completed(w *Worker) { - w.pending-- - heap.Remove(&b.pool, w.index) - heap.Push(&b.pool, w) +func (b *Balancer) completed(done *Done) { + fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r) + done.w.pending-- + heap.Remove(&b.pool, done.w.index) + heap.Push(&b.pool, done.w) } +var nWorker int = 4 + func main() { work := make(chan Request) pool := make(Pool, nWorker) for i := 0; i < nWorker; i++ { requests := make(chan Request) - pool[i] = &Worker{requests, 0, i} + pool[i] = &Worker{requests, 0, i, i} } heap.Init(&pool) //for i, val := range pool { // fmt.Println(i, val.pending, val.index) //} - done := make(chan *Worker) + done := make(chan *Done) b := Balancer {pool, done} go requester(work) b.balance(work) -- 2.20.1