From 13f2a52fa36194da86f569852b31eeff092d132d Mon Sep 17 00:00:00 2001 From: sgf Date: Mon, 21 Nov 2022 20:20:23 +0300 Subject: [PATCH] fix(balancer): Send requests with given interval properly. --- .../balancer/requester/requester.go | 55 +++++++++++-------- .../balancer_v2/balancer_v2.go | 6 +- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go index bed1346..7f6629f 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go @@ -44,7 +44,26 @@ func furtherProcess(r reqResult) { fmt.Printf("%v: further processing result %v..\n", r.request, r.int) } -func Generate(id string, work chan<- balancer.Requester) { +type tuple struct { + a interface{} + b interface{} +} + +func sendDelay(waitBefore func (balancer.Requester) <-chan time.Time, work chan<- balancer.Requester) chan<- balancer.Requester { + c := make(chan balancer.Requester) + + go func() { + for { + req := <-c + <-waitBefore(req) + work<- req + } + }() + + return c +} + +func Generate(id string, wCh chan<- balancer.Requester) { c := make(chan reqResult) genReq := func (n int) balancer.Requester { @@ -60,39 +79,27 @@ func Generate(id string, work chan<- balancer.Requester) { return req } - t := rand.Intn(3) - timeout := time.After(time.Duration(t) * time.Second) - fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t) + waitBefore := func (req balancer.Requester) <-chan time.Time { + t := rand.Intn(3) + fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req) + return time.After(time.Duration(t) * time.Second) + } reqNum := 0 req := genReq(reqNum) + work := sendDelay(waitBefore, wCh) ping := time.After(time.Second) for { select { - // FIXME: Use fanIn to merge timeout with work channels. - case <-timeout: - select { - case work <-req: - - fmt.Printf("Generate() [%v]: sent '%v'\n", id, req) - reqNum++ - req = genReq(reqNum) - - t := rand.Intn(3) - timeout = time.After(time.Duration(t) * time.Second) - fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req) - - case result := <-c: - fmt.Printf("Generate() [%v]: received '%v' result %v while waiting at '%v'\n", id, result.request, result.int, req) - furtherProcess(result) - case <-ping: - fmt.Printf("Generate() [%v]: pong while waiting at '%v'..\n", id, req) - ping = time.After(time.Second) - } + case work<- req: + fmt.Printf("Generate() [%v]: sent '%v'\n", id, req) + reqNum++ + req = genReq(reqNum) case result := <-c: fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int) furtherProcess(result) + case <-ping: fmt.Printf("Generate() [%v]: pong..\n", id) ping = time.After(time.Second) diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go b/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go index 368e02c..5c404ae 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go @@ -8,17 +8,15 @@ import ( "balancer/requester" ) -// FIXME: Add buffers to channels. What changes? - var nWorker int = 4 // How many workers to start. var nGenerator int = 2 // How many request generators to start. func main() { - work := make(chan balancer.Requester) + work := make(chan balancer.Requester, 3) workers := make([]balancer.WorkRunner, nWorker) for i := 0; i < nWorker; i++ { - c := make(chan balancer.Requester) + c := make(chan balancer.Requester, 2) workers[i] = &worker.Runner{Requests: c} } b := balancer.InitBalancer(workers) -- 2.20.1