From 3cdf66bedf9a0ea83b1d07b4cd728846f0708142 Mon Sep 17 00:00:00 2001 From: sgf Date: Wed, 2 Nov 2022 19:54:57 +0300 Subject: [PATCH] new(balancer): Start several Generate() goroutines and generate more requests, than workers available. --- .../balancer_v2/balancer/balancer.go | 11 ++++++-- .../balancer/requester/requester.go | 26 ++++++++++++------- .../balancer_v2/balancer_v2.go | 9 +++++-- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go index 3182b6e..46ea317 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go @@ -95,7 +95,7 @@ type Balancer struct { func (b *Balancer) Balance(work <-chan Requester) { fmt.Println("balance(): Starting workers..") for _, w := range b.pool { - go w.runner.Work(time.Now().Format("05.000"), b.done) + go w.runner.Work(time.Now().Format("05.0"), b.done) time.Sleep(time.Second) } @@ -112,7 +112,13 @@ out: select { case req := <-work: fmt.Printf("balance(): received '%v'\n", req) - b.dispatch(req) + // FIXME: Starting dispatch() as goroutine will fix deadlock, + // when there're now free worker. But now Balance() will spawn + // infinite number of dispatch() gophers, which will block + // until there will be free worker. This is probably wrong: + // requester should block, if request can't be processed, but + // not infinite number of dispatch() gophers are spawned. + go b.dispatch(req) case done := <- b.done: b.completed(done) } @@ -128,6 +134,7 @@ func (b *Balancer) dispatch(req Requester) { fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w) w.runner.Send(req) fmt.Printf("dispatch() '%v': sent to %v\n", req, w) + fmt.Println("pool", b.pool) } // completed re-inserts worker, which finished request, into proper place in the pool. diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go index 53128e3..745a77f 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go @@ -20,12 +20,13 @@ func (r reqResult) String() string { type request struct { reqFn func() int // what to compute for this particular request. + id string // id of Generate(), which produced this request num int // Used for request identification. c chan reqResult // channel for obtaining results back from worker. } func (r request) String() string { - return fmt.Sprintf("R %.2d", r.num) + return fmt.Sprintf("R %.2d [%v]", r.num, r.id) } var _ balancer.Requester = (*request)(nil) @@ -43,27 +44,32 @@ func furtherProcess(r reqResult) { fmt.Printf("%v: further processing result %v..\n", r.request, r.int) } -func Generate(work chan<- balancer.Requester) { +func Generate(id string, work chan<- balancer.Requester) { c := make(chan reqResult) reqNum := 0 - t := rand.Intn(5) - fmt.Printf("Generate(): sleeping for %v secs\n", t) + t := rand.Intn(3) + fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t) timeout := time.After(time.Duration(t) * time.Second) for { select { case <- timeout: - req := &request{num: reqNum, c: c} - f := func () int { fmt.Printf("... computing %v\n", req); return req.num } + req := &request{num: reqNum, id: id, c: c} + f := func () int { + t := rand.Intn(10) + fmt.Printf("... computing %v for %v secs\n", req, t) + time.Sleep(time.Duration(t) * time.Second) + return req.num + } req.reqFn = f - fmt.Printf("Generate(): sending '%v'\n", req) + fmt.Printf("Generate() [%v]: sending '%v'\n", id, req) work <- req - t := rand.Intn(5) - fmt.Printf("Generate(): sleeping for %v secs\n", t) + t := rand.Intn(3) + fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t) timeout = time.After(time.Duration(t) * time.Second) reqNum++ case result := <-c: - fmt.Printf("Generate(): received '%v' result %v\n", result.request, result.int) + fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int) furtherProcess(result) } } diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go b/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go index d92d138..368e02c 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go @@ -2,6 +2,7 @@ package main import ( + "time" "balancer" "balancer/worker" "balancer/requester" @@ -9,7 +10,8 @@ import ( // FIXME: Add buffers to channels. What changes? -var nWorker int = 4 +var nWorker int = 4 // How many workers to start. +var nGenerator int = 2 // How many request generators to start. func main() { work := make(chan balancer.Requester) @@ -20,7 +22,10 @@ func main() { workers[i] = &worker.Runner{Requests: c} } b := balancer.InitBalancer(workers) - go requester.Generate(work) + for i := 0; i < nGenerator; i++ { + go requester.Generate(time.Now().Format("05.0"), work) + time.Sleep(time.Second) + } b.Balance(work) } -- 2.20.1