new(balancer): Start several Generate() goroutines and generate more requests, than...
authorsgf <sgf.dma@gmail.com>
Wed, 2 Nov 2022 16:54:57 +0000 (19:54 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 2 Nov 2022 16:54:57 +0000 (19:54 +0300)
concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go
concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go
concurrency-is-not-parallelism/balancer_v2/balancer_v2.go

index 3182b6e..46ea317 100644 (file)
@@ -95,7 +95,7 @@ type Balancer struct {
 func (b *Balancer) Balance(work <-chan Requester) {
     fmt.Println("balance(): Starting workers..")
     for _, w := range b.pool {
-        go w.runner.Work(time.Now().Format("05.000"), b.done)
+        go w.runner.Work(time.Now().Format("05.0"), b.done)
         time.Sleep(time.Second)
     }
 
@@ -112,7 +112,13 @@ out:
             select {
             case req := <-work:
                 fmt.Printf("balance(): received '%v'\n", req)
-                b.dispatch(req)
+                // FIXME: Starting dispatch() as goroutine will fix deadlock,
+                // when there're now free worker. But now Balance() will spawn
+                // infinite number of dispatch() gophers, which will block
+                // until there will be free worker. This is probably wrong:
+                // requester should block, if request can't be processed, but
+                // not infinite number of dispatch() gophers are spawned.
+                go b.dispatch(req)
             case done := <- b.done:
                 b.completed(done)
             }
@@ -128,6 +134,7 @@ func (b *Balancer) dispatch(req Requester) {
     fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
     w.runner.Send(req)
     fmt.Printf("dispatch() '%v': sent to %v\n", req, w)
+    fmt.Println("pool", b.pool)
 }
 
 // completed re-inserts worker, which finished request, into proper place in the pool.
index 53128e3..745a77f 100644 (file)
@@ -20,12 +20,13 @@ func (r reqResult) String() string {
 
 type request struct {
     reqFn func() int    // what to compute for this particular request.
+    id string           // id of Generate(), which produced this request
     num int             // Used for request identification.
     c chan reqResult    // channel for obtaining results back from worker.
 }
 
 func (r request) String() string {
-    return fmt.Sprintf("R %.2d", r.num)
+    return fmt.Sprintf("R %.2d [%v]", r.num, r.id)
 }
 
 var _ balancer.Requester = (*request)(nil)
@@ -43,27 +44,32 @@ func furtherProcess(r reqResult) {
     fmt.Printf("%v: further processing result %v..\n", r.request, r.int)
 }
 
-func Generate(work chan<- balancer.Requester) {
+func Generate(id string, work chan<- balancer.Requester) {
     c := make(chan reqResult)
     reqNum := 0
-    t := rand.Intn(5)
-    fmt.Printf("Generate(): sleeping for %v secs\n", t)
+    t := rand.Intn(3)
+    fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
     timeout := time.After(time.Duration(t) * time.Second)
     for {
         select {
         case <- timeout:
-            req := &request{num: reqNum, c: c}
-            f := func () int { fmt.Printf("... computing %v\n", req); return req.num }
+            req := &request{num: reqNum, id: id, c: c}
+            f := func () int {
+                t := rand.Intn(10)
+                fmt.Printf("... computing %v for %v secs\n", req, t)
+                time.Sleep(time.Duration(t) * time.Second)
+                return req.num
+            }
             req.reqFn = f
-            fmt.Printf("Generate(): sending '%v'\n", req)
+            fmt.Printf("Generate() [%v]: sending '%v'\n", id, req)
             work <- req
 
-            t := rand.Intn(5)
-            fmt.Printf("Generate(): sleeping for %v secs\n", t)
+            t := rand.Intn(3)
+            fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
             timeout = time.After(time.Duration(t) * time.Second)
             reqNum++
         case result := <-c:
-            fmt.Printf("Generate(): received '%v' result %v\n", result.request, result.int)
+            fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int)
             furtherProcess(result)
         }
     }
index d92d138..368e02c 100644 (file)
@@ -2,6 +2,7 @@
 package main
 
 import (
+    "time"
     "balancer"
     "balancer/worker"
     "balancer/requester"
@@ -9,7 +10,8 @@ import (
 
 // FIXME: Add buffers to channels. What changes?
 
-var nWorker int = 4
+var nWorker int = 4     // How many workers to start.
+var nGenerator int = 2  // How many request generators to start.
 
 func main() {
     work := make(chan balancer.Requester)
@@ -20,7 +22,10 @@ func main() {
         workers[i] = &worker.Runner{Requests: c}
     }
     b := balancer.InitBalancer(workers)
-    go requester.Generate(work)
+    for i := 0; i < nGenerator; i++ {
+        go requester.Generate(time.Now().Format("05.0"), work)
+        time.Sleep(time.Second)
+    }
     b.Balance(work)
 }