new(balancer): Balancer v2 abstracting communication with interfaces.
authorsgf <sgf.dma@gmail.com>
Thu, 27 Oct 2022 16:40:57 +0000 (19:40 +0300)
committersgf <sgf.dma@gmail.com>
Mon, 31 Oct 2022 17:38:46 +0000 (20:38 +0300)
Abstract worker-requester and balancer-worker communication with
interfaces, so i may substitute different requesters and workers.

concurrency-is-not-parallelism/balancer_v2.go [new file with mode: 0644]
concurrency-is-not-parallelism/balancer_v2/balancer.go [new file with mode: 0644]
concurrency-is-not-parallelism/balancer_v2/go.mod [new file with mode: 0644]

diff --git a/concurrency-is-not-parallelism/balancer_v2.go b/concurrency-is-not-parallelism/balancer_v2.go
new file mode 100644 (file)
index 0000000..d341899
--- /dev/null
@@ -0,0 +1,109 @@
+
+package main
+
+import (
+    "fmt"
+    "time"
+    "math/rand"
+    balancer "./balancer_v2"
+)
+
+// FIXME: Add buffers to channels. What changes?
+
+type request struct {
+    reqFn func() int    // what to compute for this particular request.
+    num int             // Used for request identification.
+    c chan int          // channel for obtaining results back from worker.
+}
+
+func (r request) String() string {
+    return fmt.Sprintf("R %.2d", r.num)
+}
+
+var _ balancer.Requester = (*request)(nil)
+
+func (r *request) Fn() interface{} {
+    fmt.Printf("%v: Fn()..\n", r)
+    return interface{}(r.reqFn())
+}
+
+func (r *request) Send(res interface{}) {
+    r.c <- res.(int)
+}
+
+func furtherProcess(r *request, result int) {
+    fmt.Printf("%v: further processing result %v..\n", r, result)
+}
+
+func requester(work chan<- balancer.Requester) {
+    c := make(chan int)
+    reqNum := 0
+    for {
+        t := rand.Intn(5)
+        fmt.Printf("requester(): sleeping for %v secs\n", t)
+        time.Sleep(time.Duration(t) * time.Second)
+
+        f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum }
+        req := &request{reqFn: f, num: reqNum, c: c}
+        fmt.Printf("requester() '%v': sending\n", req)
+        work <- req
+        result := <-c
+        fmt.Printf("requester() '%v': received result %v\n", req, result)
+        furtherProcess(req, result)
+        reqNum++
+    }
+}
+
+
+type workRun struct {
+    requests chan balancer.Requester
+    id balancer.WorkerId
+}
+
+func (w workRun) String() string {
+    return fmt.Sprintf("%v", w.id)
+}
+var _ balancer.WorkRunner = (*workRun)(nil)
+
+func (w *workRun) InitRunner(wid balancer.WorkerId) {
+    w.id = wid
+}
+
+func (w *workRun) Work(id string, done chan<- *balancer.Done) {
+    t := rand.Intn(5)
+    fmt.Printf("  %v [%v]: sleeping for %v secs before begin..\n", w, id, t)
+    time.Sleep(time.Duration(t) * time.Second)
+    timeout := time.After(time.Second)
+    for {
+        select {
+        case req := <- w.requests:
+            fmt.Printf("  %v [%v]: start computing '%v'\n", w, id, req)
+            req.Send(req.Fn())
+            done <- &balancer.Done{w.id, req}
+            fmt.Printf("  %v [%v]: finished '%v'\n", w, id, req)
+        case <-timeout:
+            fmt.Printf("  %v [%v]: i'm still here..\n", w.id, id)
+            timeout = time.After(time.Second)
+        }
+    }
+}
+
+func (w *workRun) Send(req balancer.Requester) {
+    w.requests <- req
+}
+
+var nWorker int = 4
+
+func main() {
+    work := make(chan balancer.Requester)
+
+    workers := make([]balancer.WorkRunner, nWorker)
+    for i := 0; i < nWorker; i++ {
+        c := make(chan balancer.Requester)
+        workers[i] = &workRun{requests: c}
+    }
+    b := balancer.InitBalancer(workers)
+    go requester(work)
+    b.Balance(work)
+}
+
diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer.go b/concurrency-is-not-parallelism/balancer_v2/balancer.go
new file mode 100644 (file)
index 0000000..c62989c
--- /dev/null
@@ -0,0 +1,146 @@
+// Balancer v2 with requester and work runner interfaces.
+package balancer_v2
+
+import (
+    "fmt"
+    "time"
+    "container/heap"
+)
+
+// Requester is an interface to requester.
+// It's used by WorkRunner's implementation, not by balancer itself.
+type Requester interface {
+    Fn() interface{}
+    Send(interface{}) // Send result of Fn() back to requester.
+}
+
+// WorkerId is identification of a particular runner.
+// It's needed to abstract '*worker' pointer in Done channel.
+type WorkerId *worker
+
+// WorkRunner is an interface, which every work runner should conform to.
+// This is an actual implementation running Requester's Fn() function and
+// returning result back to Requester.
+type WorkRunner interface {
+    InitRunner(WorkerId)     // Initialize runner with id inside balancer.
+    Work(string, chan <- *Done) // Start runner.
+    Send(Requester)             // Send request to WorkerRunner.
+}
+
+// worker as seen by balancer.
+// WorkRunner implementation does not important for balancer. It only needs a
+// way to sort workers by the number of pending tasks.
+type worker struct {
+    runner WorkRunner
+    num int     // Used for worker identification.
+
+    pending int // Used by sort for pool.
+    index int   // Used by heap for pool.
+}
+
+func (w worker) String() string {
+    return fmt.Sprintf("W %.2d (%v)", w.num, w.pending)
+}
+
+// Done message is sent by work runner to balancer, when request is completed.
+type Done struct {
+    Worker WorkerId
+    Req Requester
+}
+
+// Pool of workers, which balancer manages.
+type pool []*worker
+
+func (p pool) Len() int { return len(p) }
+func (p pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p pool) Less(i, j int) bool {
+    return p[i].pending < p[j].pending
+}
+
+func (p *pool) Push(i interface{}) {
+    w := i.(*worker)
+    w.index = len(*p)
+    *p = append(*p, w)
+}
+
+func (p *pool) Pop() interface{} {
+    old := *p
+    n := len(old)
+    w := old[n-1]
+    w.index = -1
+    old[n-1] = nil
+    *p = old[:n-1]
+    return w
+}
+
+// Balancer itself.
+// It consists from pool of workers and done channel, using which work runners
+// will report finished requests back to balancer.
+type Balancer struct {
+    pool pool
+    done chan *Done
+}
+
+func (b *Balancer) Balance(work <-chan Requester) {
+    fmt.Println("balance(): Starting workers..")
+    for _, w := range b.pool {
+        go w.runner.Work(time.Now().Format("05.000"), b.done)
+        time.Sleep(time.Second)
+    }
+
+    timeout := time.After(20 * time.Second)
+out:
+    for {
+        fmt.Println()
+        fmt.Println("balance(): Starting loop..")
+        select {
+        case <-timeout:
+            fmt.Println("balance(): your time is out.")
+            break out
+        default:
+            select {
+            case req := <-work:
+                fmt.Printf("balance(): received '%v'\n", req)
+                b.dispatch(req)
+            case done := <- b.done:
+                b.completed(done)
+            }
+        }
+    }
+}
+
+// dispatch sends requests to least loaded worker.
+func (b *Balancer) dispatch(req Requester) {
+    w := heap.Pop(&b.pool).(*worker)
+    fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
+    w.pending++
+    heap.Push(&b.pool, w)
+    w.runner.Send(req)
+    fmt.Printf("dispatch() '%v': sent\n", req)
+}
+
+// completed re-inserts worker, which finished request, into proper place in the pool.
+func (b *Balancer) completed(done *Done) {
+    fmt.Printf("completed() '%v': finished '%v'\n", done.Worker, done.Req)
+    w := (*worker)(done.Worker)
+    w.pending--
+    heap.Remove(&b.pool, w.index)
+    heap.Push(&b.pool, w)
+}
+
+// InitBalancer initializes balancer with provided workers.
+func InitBalancer(runners []WorkRunner) *Balancer {
+    n := len(runners)
+    pool := make(pool, n)
+    for i := 0; i < n; i++ {
+        w := worker{num: i, pending: 0, index: i}
+        runners[i].InitRunner(&w)
+        w.runner = runners[i]
+        pool[i] = &w
+    }
+    heap.Init(&pool)
+
+    done := make(chan *Done)
+    return &Balancer {pool: pool, done: done}
+}
+
diff --git a/concurrency-is-not-parallelism/balancer_v2/go.mod b/concurrency-is-not-parallelism/balancer_v2/go.mod
new file mode 100644 (file)
index 0000000..e35bd31
--- /dev/null
@@ -0,0 +1,3 @@
+module balancer_v2
+
+go 1.15