chg(balancer): Add verbose print for tracking execution.
authorsgf <sgf.dma@gmail.com>
Thu, 6 Oct 2022 12:53:45 +0000 (15:53 +0300)
committersgf <sgf.dma@gmail.com>
Thu, 6 Oct 2022 12:53:45 +0000 (15:53 +0300)
concurrency-is-not-parallelism/balancer.go

index 809ef59..f7ad161 100644 (file)
@@ -10,76 +10,69 @@ import (
 
 type Request struct {
     fn func() int
+    num int     // Used for request identification.
     c chan int
 }
 
-var nWorker int = 4
+func (r Request) String() string {
+    return fmt.Sprintf("R %.2d", r.num)
+}
 
-func workFn(t int) int {
-    fmt.Println("workFn", t)
-    return t
+func workFn(r Request) int {
+    fmt.Printf("%v: computing (workFn).. (%p)\n", r, r.fn)
+    return r.num
 }
 
-func furtherProcess(result int) {
-    fmt.Println("furtherProcess", result)
+func furtherProcess(r Request, result int) {
+    fmt.Printf("%v: further processing result %v..\n", r, result)
 }
 
 func requester(work chan<- Request) {
     c := make(chan int)
+    reqNum := 0
     for {
         t := rand.Intn(5)
-        fmt.Println("requester", t)
+        fmt.Printf("requester(): sleeping for %v secs\n", t)
         time.Sleep(time.Duration(t) * time.Second)
-        work <- Request{func () int { return workFn(t) }, c}
-        fmt.Println("requester", t, ": request sent, waiting for reply")
+
+        req := &Request{num: reqNum, c: c}
+        fn := func () int { return workFn(*req) }
+        req.fn = fn
+        work <- *req
+        fmt.Printf("requester() '%v': sending\n", req)
         result := <-c
-        furtherProcess(result)
+        fmt.Printf("requester() '%v': received result %v\n", req, result)
+        furtherProcess(*req, result)
+        reqNum++
     }
 }
 
 type Worker struct {
     requests chan Request
     pending int
-    index int
+    num int     // Used for worker identification.
+    index int   // Used by heap.
 }
 
-func (w *Worker) work(done chan *Worker) {
-    fmt.Println("work started")
+func (w Worker) String() string {
+    return fmt.Sprintf("W %.2d", w.num)
+}
+
+func (w *Worker) work(done chan *Done) {
+    t := rand.Intn(5)
+    fmt.Printf("%v: sleeping for %v secs before begin..\n", w, t)
+    time.Sleep(time.Duration(t) * time.Second)
     for {
         req := <- w.requests
+        fmt.Printf("%v: start computing '%v'\n", w, req)
         req.c <- req.fn()
-        done <- w
+        done <- &Done{w, req}
+        fmt.Printf("%v: finished '%v'\n", w, req)
     }
 }
 
 type Pool []*Worker
 
-type Balancer struct {
-    pool Pool
-    done chan *Worker
-}
-
-func (b *Balancer) balance(work chan Request) {
-    timeout := time.After(10 * time.Second)
-out:
-    for {
-        fmt.Println("\nbalance LOOP")
-        select {
-        case <-timeout:
-            fmt.Println("Balancer time is out")
-            break out
-        default:
-            select {
-            case req := <-work:
-                fmt.Println("balance received request")
-                b.dispatch(req)
-            case w := <- b.done:
-                b.completed(w)
-            }
-        }
-    }
-}
-
 func (p Pool) Len() int { return len(p) }
 func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
 func (p Pool) Less(i, j int) bool {
@@ -102,35 +95,71 @@ func (p *Pool) Pop() interface{} {
     return w
 }
 
+type Done struct {
+    w *Worker
+    r Request
+}
+
+type Balancer struct {
+    pool Pool
+    done chan *Done
+}
+
+func (b *Balancer) balance(work chan Request) {
+    timeout := time.After(20 * time.Second)
+out:
+    for {
+        fmt.Println()
+        fmt.Println("balance(): Starting loop..")
+        select {
+        case <-timeout:
+            fmt.Println("balance(): your time is out.")
+            break out
+        default:
+            select {
+            case req := <-work:
+                fmt.Printf("balance(): received '%v'\n", req)
+                b.dispatch(req)
+            case done := <- b.done:
+                b.completed(done)
+            }
+        }
+    }
+}
+
 func (b *Balancer) dispatch(req Request) {
     w := heap.Pop(&b.pool).(*Worker)
-    fmt.Println("dispatch to", w)
+    fmt.Printf("dispatch() '%v': sending '%v'\n", req, w)
     go w.work(b.done)
-    w.requests <- req
     w.pending++
     heap.Push(&b.pool, w)
+    w.requests <- req
+    fmt.Printf("dispatch() '%v': sent\n", req)
 }
 
-func (b *Balancer) completed(w *Worker) {
-    w.pending--
-    heap.Remove(&b.pool, w.index)
-    heap.Push(&b.pool, w)
+func (b *Balancer) completed(done *Done) {
+    fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r)
+    done.w.pending--
+    heap.Remove(&b.pool, done.w.index)
+    heap.Push(&b.pool, done.w)
 }
 
+var nWorker int = 4
+
 func main() {
     work := make(chan Request)
 
     pool := make(Pool, nWorker)
     for i := 0; i < nWorker; i++ {
         requests := make(chan Request)
-        pool[i] = &Worker{requests, 0, i}
+        pool[i] = &Worker{requests, 0, i, i}
     }
     heap.Init(&pool)
     //for i, val := range pool {
     //    fmt.Println(i, val.pending, val.index)
     //}
 
-    done := make(chan *Worker)
+    done := make(chan *Done)
     b := Balancer {pool, done}
     go requester(work)
     b.balance(work)