fix(balancer): Update worker index in heap during Swap() call.
authorsgf <sgf.dma@gmail.com>
Wed, 2 Nov 2022 16:26:39 +0000 (19:26 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 2 Nov 2022 16:29:21 +0000 (19:29 +0300)
concurrency-is-not-parallelism/balancer.go
concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go
concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go

index 2b5df4a..6642b18 100644 (file)
@@ -88,7 +88,11 @@ func (w *Worker) work(id string, done chan<- *Done) {
 type Pool []*Worker
 
 func (p Pool) Len() int { return len(p) }
-func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p Pool) Swap(i, j int) {
+    p[i], p[j] = p[j], p[i]
+    p[i].index = i
+    p[j].index = j
+}
 // FIXME: Does it works correctly? Or i need p[i] > p[j] ? The highest-pri
 // worker should be the last.
 func (p Pool) Less(i, j int) bool {
index a09a776..3182b6e 100644 (file)
@@ -46,7 +46,7 @@ type worker struct {
 }
 
 func (w worker) String() string {
-    return fmt.Sprintf("W %.2d (%v)", w.num, w.pending)
+    return fmt.Sprintf("W %.2d (%v [%v])", w.num, w.pending, w.index)
 }
 
 // Done message is sent by work runner to balancer, when request is completed.
@@ -59,7 +59,11 @@ type Done struct {
 type pool []*worker
 
 func (p pool) Len() int { return len(p) }
-func (p pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p pool) Swap(i, j int) {
+    p[i], p[j] = p[j], p[i]
+    p[i].index = i
+    p[j].index = j
+}
 func (p pool) Less(i, j int) bool {
     return p[i].pending < p[j].pending
 }
@@ -119,11 +123,11 @@ out:
 // dispatch sends requests to least loaded worker.
 func (b *Balancer) dispatch(req Requester) {
     w := heap.Pop(&b.pool).(*worker)
-    fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
     w.pending++
     heap.Push(&b.pool, w)
+    fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
     w.runner.Send(req)
-    fmt.Printf("dispatch() '%v': sent\n", req)
+    fmt.Printf("dispatch() '%v': sent to %v\n", req, w)
 }
 
 // completed re-inserts worker, which finished request, into proper place in the pool.
@@ -133,6 +137,7 @@ func (b *Balancer) completed(done *Done) {
     w.pending--
     heap.Remove(&b.pool, w.index)
     heap.Push(&b.pool, w)
+    fmt.Println("pool", b.pool)
 }
 
 // InitBalancer initializes balancer with provided workers.
index e79bfbd..5ce0293 100644 (file)
@@ -32,8 +32,9 @@ func (w *Runner) Work(id string, done chan<- *balancer.Done) {
         case req := <- w.Requests:
             fmt.Printf("  %v [%v]: start computing '%v'\n", w, id, req)
             req.Send(req.Fn())
-            done <- &balancer.Done{w.id, req}
             fmt.Printf("  %v [%v]: finished '%v'\n", w, id, req)
+            done <- &balancer.Done{w.id, req}
+            fmt.Printf("  %v [%v]: done '%v'\n", w, id, req)
         case <-timeout:
             fmt.Printf("  %v [%v]: i'm still here..\n", w.id, id)
             timeout = time.After(time.Second)