fix(balancer): Do not spawn dispatch gophers indefinitely.
authorsgf <sgf.dma@gmail.com>
Mon, 21 Nov 2022 13:22:25 +0000 (16:22 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 23 Nov 2022 14:16:04 +0000 (17:16 +0300)
concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go
concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go
concurrency-is-not-parallelism/balancer_v2/balancer/worker/worker.go

index 46ea317..7e70326 100644 (file)
@@ -32,9 +32,10 @@ type WorkRunner interface {
     InitRunner(WorkerId)     // Initialize runner with id inside balancer.
     Work(string, chan <- *Done) // Start runner.
     Send(Requester)             // Send request to WorkerRunner.
+    MaxRequests() int
 }
 
-// worker as seen by balancer.
+// worker is a balancer's view of WorkRunner.
 // WorkRunner implementation does not important for balancer. It only needs a
 // way to sort workers by the number of pending tasks.
 type worker struct {
@@ -108,20 +109,15 @@ out:
         case <-timeout:
             fmt.Println("balance(): your time is out.")
             break out
-        default:
-            select {
-            case req := <-work:
-                fmt.Printf("balance(): received '%v'\n", req)
-                // FIXME: Starting dispatch() as goroutine will fix deadlock,
-                // when there're now free worker. But now Balance() will spawn
-                // infinite number of dispatch() gophers, which will block
-                // until there will be free worker. This is probably wrong:
-                // requester should block, if request can't be processed, but
-                // not infinite number of dispatch() gophers are spawned.
-                go b.dispatch(req)
-            case done := <- b.done:
-                b.completed(done)
+        case req := <-work:
+            fmt.Printf("balance(): received '%v'\n", req)
+            if !b.hasFreeWorkers() {
+                fmt.Printf("balance(): No free workers, waiting for one to finish..\n")
+                b.completed(<-b.done)
             }
+            b.dispatch(req)
+        case done := <- b.done:
+            b.completed(done)
         }
     }
 }
@@ -147,14 +143,22 @@ func (b *Balancer) completed(done *Done) {
     fmt.Println("pool", b.pool)
 }
 
+func (b *Balancer) hasFreeWorkers() bool {
+    for _, w := range b.pool {
+        if w.runner.MaxRequests() > w.pending{
+            return true
+        }
+    }
+    return false
+}
+
 // InitBalancer initializes balancer with provided workers.
 func InitBalancer(runners []WorkRunner) *Balancer {
     n := len(runners)
     pool := make(pool, n)
     for i := 0; i < n; i++ {
-        w := worker{num: i, pending: 0, index: i}
-        runners[i].InitRunner(WorkerId{&w})
-        w.runner = runners[i]
+        w := worker{runner: runners[i], num: i, pending: 0, index: i}
+        w.runner.InitRunner(WorkerId{&w})
         pool[i] = &w
     }
     heap.Init(&pool)
index 745a77f..bed1346 100644 (file)
@@ -20,13 +20,13 @@ func (r reqResult) String() string {
 
 type request struct {
     reqFn func() int    // what to compute for this particular request.
-    id string           // id of Generate(), which produced this request
+    origin string       // id of Generate(), which produced this request
     num int             // Used for request identification.
     c chan reqResult    // channel for obtaining results back from worker.
 }
 
 func (r request) String() string {
-    return fmt.Sprintf("R %.2d [%v]", r.num, r.id)
+    return fmt.Sprintf("R %.2d [%v]", r.num, r.origin)
 }
 
 var _ balancer.Requester = (*request)(nil)
@@ -46,31 +46,56 @@ func furtherProcess(r reqResult) {
 
 func Generate(id string, work chan<- balancer.Requester) {
     c := make(chan reqResult)
-    reqNum := 0
+
+    genReq := func (n int) balancer.Requester {
+        req := &request{num: n, origin: id, c: c}
+        f := func () int {
+            t := rand.Intn(10)
+            fmt.Printf("... computing %v within %v secs\n", req, t)
+            time.Sleep(time.Duration(t) * time.Second)
+            fmt.Printf("... computing %v done\n", req)
+            return req.num
+        }
+        req.reqFn = f
+        return req
+    }
+
     t := rand.Intn(3)
-    fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
     timeout := time.After(time.Duration(t) * time.Second)
+    fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
+
+    reqNum := 0
+    req := genReq(reqNum)
+    ping := time.After(time.Second)
     for {
         select {
-        case <- timeout:
-            req := &request{num: reqNum, id: id, c: c}
-            f := func () int {
-                t := rand.Intn(10)
-                fmt.Printf("... computing %v for %v secs\n", req, t)
-                time.Sleep(time.Duration(t) * time.Second)
-                return req.num
+        // FIXME: Use fanIn to merge timeout with work channels.
+        case <-timeout:
+            select {
+            case work <-req:
+
+                fmt.Printf("Generate() [%v]: sent '%v'\n", id, req)
+                reqNum++
+                req = genReq(reqNum)
+
+                t := rand.Intn(3)
+                timeout = time.After(time.Duration(t) * time.Second)
+                fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req)
+
+            case result := <-c:
+                fmt.Printf("Generate() [%v]: received '%v' result %v while waiting at '%v'\n", id, result.request, result.int, req)
+                furtherProcess(result)
+            case <-ping:
+                fmt.Printf("Generate() [%v]: pong while waiting at '%v'..\n", id, req)
+                ping = time.After(time.Second)
             }
-            req.reqFn = f
-            fmt.Printf("Generate() [%v]: sending '%v'\n", id, req)
-            work <- req
-
-            t := rand.Intn(3)
-            fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
-            timeout = time.After(time.Duration(t) * time.Second)
-            reqNum++
+
         case result := <-c:
             fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int)
             furtherProcess(result)
+        case <-ping:
+            fmt.Printf("Generate() [%v]: pong..\n", id)
+            ping = time.After(time.Second)
         }
     }
 }
index 5ce0293..437a558 100644 (file)
@@ -26,7 +26,7 @@ func (w *Runner) Work(id string, done chan<- *balancer.Done) {
     t := rand.Intn(5)
     fmt.Printf("  %v [%v]: sleeping for %v secs before begin..\n", w, id, t)
     time.Sleep(time.Duration(t) * time.Second)
-    timeout := time.After(time.Second)
+    ping := time.After(time.Second)
     for {
         select {
         case req := <- w.Requests:
@@ -35,9 +35,9 @@ func (w *Runner) Work(id string, done chan<- *balancer.Done) {
             fmt.Printf("  %v [%v]: finished '%v'\n", w, id, req)
             done <- &balancer.Done{w.id, req}
             fmt.Printf("  %v [%v]: done '%v'\n", w, id, req)
-        case <-timeout:
+        case <-ping:
             fmt.Printf("  %v [%v]: i'm still here..\n", w.id, id)
-            timeout = time.After(time.Second)
+            ping = time.After(time.Second)
         }
     }
 }
@@ -46,3 +46,10 @@ func (w *Runner) Send(req balancer.Requester) {
     w.Requests <- req
 }
 
+// Unbuffered channel has cap 0.
+func (w *Runner) MaxRequests() int {
+    if cap(w.Requests) == 0 {
+        return 1
+    }
+    return cap(w.Requests)
+}