From 184d7c4acb0527bbe91d29f8fde6f6b0e8d22f23 Mon Sep 17 00:00:00 2001 From: sgf Date: Wed, 2 Nov 2022 19:26:39 +0300 Subject: [PATCH] fix(balancer): Update worker index in heap during Swap() call. --- concurrency-is-not-parallelism/balancer.go | 6 +++++- .../balancer_v2/balancer/balancer.go | 13 +++++++++---- .../balancer_v2/balancer/worker/worker.go | 3 ++- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/concurrency-is-not-parallelism/balancer.go b/concurrency-is-not-parallelism/balancer.go index 2b5df4a..6642b18 100644 --- a/concurrency-is-not-parallelism/balancer.go +++ b/concurrency-is-not-parallelism/balancer.go @@ -88,7 +88,11 @@ func (w *Worker) work(id string, done chan<- *Done) { type Pool []*Worker func (p Pool) Len() int { return len(p) } -func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p Pool) Swap(i, j int) { + p[i], p[j] = p[j], p[i] + p[i].index = i + p[j].index = j +} // FIXME: Does it works correctly? Or i need p[i] > p[j] ? The highest-pri // worker should be the last. func (p Pool) Less(i, j int) bool { diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go index a09a776..3182b6e 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go @@ -46,7 +46,7 @@ type worker struct { } func (w worker) String() string { - return fmt.Sprintf("W %.2d (%v)", w.num, w.pending) + return fmt.Sprintf("W %.2d (%v [%v])", w.num, w.pending, w.index) } // Done message is sent by work runner to balancer, when request is completed. @@ -59,7 +59,11 @@ type Done struct { type pool []*worker func (p pool) Len() int { return len(p) } -func (p pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p pool) Swap(i, j int) { + p[i], p[j] = p[j], p[i] + p[i].index = i + p[j].index = j +} func (p pool) Less(i, j int) bool { return p[i].pending < p[j].pending } @@ -119,11 +123,11 @@ out: // dispatch sends requests to least loaded worker. func (b *Balancer) dispatch(req Requester) { w := heap.Pop(&b.pool).(*worker) - fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w) w.pending++ heap.Push(&b.pool, w) + fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w) w.runner.Send(req) - fmt.Printf("dispatch() '%v': sent\n", req) + fmt.Printf("dispatch() '%v': sent to %v\n", req, w) } // completed re-inserts worker, which finished request, into proper place in the pool. @@ -133,6 +137,7 @@ func (b *Balancer) completed(done *Done) { w.pending-- heap.Remove(&b.pool, w.index) heap.Push(&b.pool, w) + fmt.Println("pool", b.pool) } // InitBalancer initializes balancer with provided workers. diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go b/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go index e79bfbd..5ce0293 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go @@ -32,8 +32,9 @@ func (w *Runner) Work(id string, done chan<- *balancer.Done) { case req := <- w.Requests: fmt.Printf(" %v [%v]: start computing '%v'\n", w, id, req) req.Send(req.Fn()) - done <- &balancer.Done{w.id, req} fmt.Printf(" %v [%v]: finished '%v'\n", w, id, req) + done <- &balancer.Done{w.id, req} + fmt.Printf(" %v [%v]: done '%v'\n", w, id, req) case <-timeout: fmt.Printf(" %v [%v]: i'm still here..\n", w.id, id) timeout = time.After(time.Second) -- 2.20.1