fix(balancer): Send requests with given interval properly.
authorsgf <sgf.dma@gmail.com>
Mon, 21 Nov 2022 17:20:23 +0000 (20:20 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 23 Nov 2022 14:23:47 +0000 (17:23 +0300)
concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go
concurrency-is-not-parallelism/balancer_v2/balancer_v2.go

index bed1346..7f6629f 100644 (file)
@@ -44,7 +44,26 @@ func furtherProcess(r reqResult) {
     fmt.Printf("%v: further processing result %v..\n", r.request, r.int)
 }
 
-func Generate(id string, work chan<- balancer.Requester) {
+type tuple struct {
+    a interface{}
+    b interface{}
+}
+
+func sendDelay(waitBefore func (balancer.Requester) <-chan time.Time, work chan<- balancer.Requester) chan<- balancer.Requester {
+    c := make(chan balancer.Requester)
+
+    go func() {
+        for {
+            req := <-c
+            <-waitBefore(req)
+            work<- req
+        }
+    }()
+
+    return c
+}
+
+func Generate(id string, wCh chan<- balancer.Requester) {
     c := make(chan reqResult)
 
     genReq := func (n int) balancer.Requester {
@@ -60,39 +79,27 @@ func Generate(id string, work chan<- balancer.Requester) {
         return req
     }
 
-    t := rand.Intn(3)
-    timeout := time.After(time.Duration(t) * time.Second)
-    fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
+    waitBefore := func (req balancer.Requester) <-chan time.Time {
+        t := rand.Intn(3)
+        fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req)
+        return time.After(time.Duration(t) * time.Second)
+    }
 
     reqNum := 0
     req := genReq(reqNum)
+    work := sendDelay(waitBefore, wCh)
     ping := time.After(time.Second)
     for {
         select {
-        // FIXME: Use fanIn to merge timeout with work channels.
-        case <-timeout:
-            select {
-            case work <-req:
-
-                fmt.Printf("Generate() [%v]: sent '%v'\n", id, req)
-                reqNum++
-                req = genReq(reqNum)
-
-                t := rand.Intn(3)
-                timeout = time.After(time.Duration(t) * time.Second)
-                fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req)
-
-            case result := <-c:
-                fmt.Printf("Generate() [%v]: received '%v' result %v while waiting at '%v'\n", id, result.request, result.int, req)
-                furtherProcess(result)
-            case <-ping:
-                fmt.Printf("Generate() [%v]: pong while waiting at '%v'..\n", id, req)
-                ping = time.After(time.Second)
-            }
+        case work<- req:
+            fmt.Printf("Generate() [%v]: sent '%v'\n", id, req)
+            reqNum++
+            req = genReq(reqNum)
 
         case result := <-c:
             fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int)
             furtherProcess(result)
+
         case <-ping:
             fmt.Printf("Generate() [%v]: pong..\n", id)
             ping = time.After(time.Second)
index 368e02c..5c404ae 100644 (file)
@@ -8,17 +8,15 @@ import (
     "balancer/requester"
 )
 
-// FIXME: Add buffers to channels. What changes?
-
 var nWorker int = 4     // How many workers to start.
 var nGenerator int = 2  // How many request generators to start.
 
 func main() {
-    work := make(chan balancer.Requester)
+    work := make(chan balancer.Requester, 3)
 
     workers := make([]balancer.WorkRunner, nWorker)
     for i := 0; i < nWorker; i++ {
-        c := make(chan balancer.Requester)
+        c := make(chan balancer.Requester, 2)
         workers[i] = &worker.Runner{Requests: c}
     }
     b := balancer.InitBalancer(workers)