From 64eae0303e2c80f78d7fba755015b57197228406 Mon Sep 17 00:00:00 2001 From: sgf Date: Mon, 21 Nov 2022 16:22:25 +0300 Subject: [PATCH] fix(balancer): Do not spawn dispatch gophers indefinitely. --- .../balancer_v2/balancer/balancer.go | 38 ++++++----- .../balancer/requester/requester.go | 63 +++++++++++++------ .../balancer_v2/balancer/worker/worker.go | 13 +++- 3 files changed, 75 insertions(+), 39 deletions(-) diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go index 46ea317..7e70326 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go @@ -32,9 +32,10 @@ type WorkRunner interface { InitRunner(WorkerId) // Initialize runner with id inside balancer. Work(string, chan <- *Done) // Start runner. Send(Requester) // Send request to WorkerRunner. + MaxRequests() int } -// worker as seen by balancer. +// worker is a balancer's view of WorkRunner. // WorkRunner implementation does not important for balancer. It only needs a // way to sort workers by the number of pending tasks. type worker struct { @@ -108,20 +109,15 @@ out: case <-timeout: fmt.Println("balance(): your time is out.") break out - default: - select { - case req := <-work: - fmt.Printf("balance(): received '%v'\n", req) - // FIXME: Starting dispatch() as goroutine will fix deadlock, - // when there're now free worker. But now Balance() will spawn - // infinite number of dispatch() gophers, which will block - // until there will be free worker. This is probably wrong: - // requester should block, if request can't be processed, but - // not infinite number of dispatch() gophers are spawned. - go b.dispatch(req) - case done := <- b.done: - b.completed(done) + case req := <-work: + fmt.Printf("balance(): received '%v'\n", req) + if !b.hasFreeWorkers() { + fmt.Printf("balance(): No free workers, waiting for one to finish..\n") + b.completed(<-b.done) } + b.dispatch(req) + case done := <- b.done: + b.completed(done) } } } @@ -147,14 +143,22 @@ func (b *Balancer) completed(done *Done) { fmt.Println("pool", b.pool) } +func (b *Balancer) hasFreeWorkers() bool { + for _, w := range b.pool { + if w.runner.MaxRequests() > w.pending{ + return true + } + } + return false +} + // InitBalancer initializes balancer with provided workers. func InitBalancer(runners []WorkRunner) *Balancer { n := len(runners) pool := make(pool, n) for i := 0; i < n; i++ { - w := worker{num: i, pending: 0, index: i} - runners[i].InitRunner(WorkerId{&w}) - w.runner = runners[i] + w := worker{runner: runners[i], num: i, pending: 0, index: i} + w.runner.InitRunner(WorkerId{&w}) pool[i] = &w } heap.Init(&pool) diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go index 745a77f..bed1346 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go @@ -20,13 +20,13 @@ func (r reqResult) String() string { type request struct { reqFn func() int // what to compute for this particular request. - id string // id of Generate(), which produced this request + origin string // id of Generate(), which produced this request num int // Used for request identification. c chan reqResult // channel for obtaining results back from worker. } func (r request) String() string { - return fmt.Sprintf("R %.2d [%v]", r.num, r.id) + return fmt.Sprintf("R %.2d [%v]", r.num, r.origin) } var _ balancer.Requester = (*request)(nil) @@ -46,31 +46,56 @@ func furtherProcess(r reqResult) { func Generate(id string, work chan<- balancer.Requester) { c := make(chan reqResult) - reqNum := 0 + + genReq := func (n int) balancer.Requester { + req := &request{num: n, origin: id, c: c} + f := func () int { + t := rand.Intn(10) + fmt.Printf("... computing %v within %v secs\n", req, t) + time.Sleep(time.Duration(t) * time.Second) + fmt.Printf("... computing %v done\n", req) + return req.num + } + req.reqFn = f + return req + } + t := rand.Intn(3) - fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t) timeout := time.After(time.Duration(t) * time.Second) + fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t) + + reqNum := 0 + req := genReq(reqNum) + ping := time.After(time.Second) for { select { - case <- timeout: - req := &request{num: reqNum, id: id, c: c} - f := func () int { - t := rand.Intn(10) - fmt.Printf("... computing %v for %v secs\n", req, t) - time.Sleep(time.Duration(t) * time.Second) - return req.num + // FIXME: Use fanIn to merge timeout with work channels. + case <-timeout: + select { + case work <-req: + + fmt.Printf("Generate() [%v]: sent '%v'\n", id, req) + reqNum++ + req = genReq(reqNum) + + t := rand.Intn(3) + timeout = time.After(time.Duration(t) * time.Second) + fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req) + + case result := <-c: + fmt.Printf("Generate() [%v]: received '%v' result %v while waiting at '%v'\n", id, result.request, result.int, req) + furtherProcess(result) + case <-ping: + fmt.Printf("Generate() [%v]: pong while waiting at '%v'..\n", id, req) + ping = time.After(time.Second) } - req.reqFn = f - fmt.Printf("Generate() [%v]: sending '%v'\n", id, req) - work <- req - - t := rand.Intn(3) - fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t) - timeout = time.After(time.Duration(t) * time.Second) - reqNum++ + case result := <-c: fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int) furtherProcess(result) + case <-ping: + fmt.Printf("Generate() [%v]: pong..\n", id) + ping = time.After(time.Second) } } } diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go b/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go index 5ce0293..437a558 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go @@ -26,7 +26,7 @@ func (w *Runner) Work(id string, done chan<- *balancer.Done) { t := rand.Intn(5) fmt.Printf(" %v [%v]: sleeping for %v secs before begin..\n", w, id, t) time.Sleep(time.Duration(t) * time.Second) - timeout := time.After(time.Second) + ping := time.After(time.Second) for { select { case req := <- w.Requests: @@ -35,9 +35,9 @@ func (w *Runner) Work(id string, done chan<- *balancer.Done) { fmt.Printf(" %v [%v]: finished '%v'\n", w, id, req) done <- &balancer.Done{w.id, req} fmt.Printf(" %v [%v]: done '%v'\n", w, id, req) - case <-timeout: + case <-ping: fmt.Printf(" %v [%v]: i'm still here..\n", w.id, id) - timeout = time.After(time.Second) + ping = time.After(time.Second) } } } @@ -46,3 +46,10 @@ func (w *Runner) Send(req balancer.Requester) { w.Requests <- req } +// Unbuffered channel has cap 0. +func (w *Runner) MaxRequests() int { + if cap(w.Requests) == 0 { + return 1 + } + return cap(w.Requests) +} -- 2.20.1