new(balancer): Working version of balancer from "Concurrency is not Parallelism ...
authorsgf <sgf.dma@gmail.com>
Wed, 5 Oct 2022 17:43:51 +0000 (20:43 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 5 Oct 2022 17:43:51 +0000 (20:43 +0300)
concurrency-is-not-parallelism/balancer.go [new file with mode: 0644]

diff --git a/concurrency-is-not-parallelism/balancer.go b/concurrency-is-not-parallelism/balancer.go
new file mode 100644 (file)
index 0000000..809ef59
--- /dev/null
@@ -0,0 +1,137 @@
+
+package main
+
+import (
+    "fmt"
+    "time"
+    "math/rand"
+    "container/heap"
+)
+
+type Request struct {
+    fn func() int
+    c chan int
+}
+
+var nWorker int = 4
+
+func workFn(t int) int {
+    fmt.Println("workFn", t)
+    return t
+}
+
+func furtherProcess(result int) {
+    fmt.Println("furtherProcess", result)
+}
+
+func requester(work chan<- Request) {
+    c := make(chan int)
+    for {
+        t := rand.Intn(5)
+        fmt.Println("requester", t)
+        time.Sleep(time.Duration(t) * time.Second)
+        work <- Request{func () int { return workFn(t) }, c}
+        fmt.Println("requester", t, ": request sent, waiting for reply")
+        result := <-c
+        furtherProcess(result)
+    }
+}
+
+type Worker struct {
+    requests chan Request
+    pending int
+    index int
+}
+
+func (w *Worker) work(done chan *Worker) {
+    fmt.Println("work started")
+    for {
+        req := <- w.requests
+        req.c <- req.fn()
+        done <- w
+    }
+}
+
+type Pool []*Worker
+
+type Balancer struct {
+    pool Pool
+    done chan *Worker
+}
+
+func (b *Balancer) balance(work chan Request) {
+    timeout := time.After(10 * time.Second)
+out:
+    for {
+        fmt.Println("\nbalance LOOP")
+        select {
+        case <-timeout:
+            fmt.Println("Balancer time is out")
+            break out
+        default:
+            select {
+            case req := <-work:
+                fmt.Println("balance received request")
+                b.dispatch(req)
+            case w := <- b.done:
+                b.completed(w)
+            }
+        }
+    }
+}
+
+func (p Pool) Len() int { return len(p) }
+func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p Pool) Less(i, j int) bool {
+    return p[i].pending < p[j].pending
+}
+
+func (p *Pool) Push(i interface{}) {
+    w := i.(*Worker)
+    w.index = len(*p)
+    *p = append(*p, w)
+}
+
+func (p *Pool) Pop() interface{} {
+    old := *p
+    n := len(old)
+    w := old[n-1]
+    w.index = -1
+    old[n-1] = nil
+    *p = old[:n-1]
+    return w
+}
+
+func (b *Balancer) dispatch(req Request) {
+    w := heap.Pop(&b.pool).(*Worker)
+    fmt.Println("dispatch to", w)
+    go w.work(b.done)
+    w.requests <- req
+    w.pending++
+    heap.Push(&b.pool, w)
+}
+
+func (b *Balancer) completed(w *Worker) {
+    w.pending--
+    heap.Remove(&b.pool, w.index)
+    heap.Push(&b.pool, w)
+}
+
+func main() {
+    work := make(chan Request)
+
+    pool := make(Pool, nWorker)
+    for i := 0; i < nWorker; i++ {
+        requests := make(chan Request)
+        pool[i] = &Worker{requests, 0, i}
+    }
+    heap.Init(&pool)
+    //for i, val := range pool {
+    //    fmt.Println(i, val.pending, val.index)
+    //}
+
+    done := make(chan *Worker)
+    b := Balancer {pool, done}
+    go requester(work)
+    b.balance(work)
+}