chg(balancer): More strict channel types.
authorsgf <sgf.dma@gmail.com>
Wed, 26 Oct 2022 17:29:19 +0000 (20:29 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 26 Oct 2022 17:29:19 +0000 (20:29 +0300)
concurrency-is-not-parallelism/balancer.go

index f7ad161..2cae58c 100644 (file)
@@ -38,8 +38,8 @@ func requester(work chan<- Request) {
         req := &Request{num: reqNum, c: c}
         fn := func () int { return workFn(*req) }
         req.fn = fn
-        work <- *req
         fmt.Printf("requester() '%v': sending\n", req)
+        work <- *req
         result := <-c
         fmt.Printf("requester() '%v': received result %v\n", req, result)
         furtherProcess(*req, result)
@@ -47,18 +47,26 @@ func requester(work chan<- Request) {
     }
 }
 
+// FIXME: Add buffers to channels. What changes?
+// FIXME: Add interfaces to abstreact from buffered/unbuffered channels and be
+// able to switch implementation immediately.
 type Worker struct {
     requests chan Request
-    pending int
+    pending int // Used by sort.
     num int     // Used for worker identification.
     index int   // Used by heap.
 }
 
+type Done struct {
+    w *Worker
+    r Request
+}
+
 func (w Worker) String() string {
-    return fmt.Sprintf("W %.2d", w.num)
+    return fmt.Sprintf("W %.2d (%v)", w.num, w.pending)
 }
 
-func (w *Worker) work(done chan *Done) {
+func (w *Worker) work(done chan<- *Done) {
     t := rand.Intn(5)
     fmt.Printf("%v: sleeping for %v secs before begin..\n", w, t)
     time.Sleep(time.Duration(t) * time.Second)
@@ -75,6 +83,8 @@ type Pool []*Worker
 
 func (p Pool) Len() int { return len(p) }
 func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+// FIXME: Does it works correctly? Or i need p[i] > p[j] ? The highest-pri
+// worker should be the last.
 func (p Pool) Less(i, j int) bool {
     return p[i].pending < p[j].pending
 }
@@ -95,17 +105,12 @@ func (p *Pool) Pop() interface{} {
     return w
 }
 
-type Done struct {
-    w *Worker
-    r Request
-}
-
 type Balancer struct {
     pool Pool
     done chan *Done
 }
 
-func (b *Balancer) balance(work chan Request) {
+func (b *Balancer) balance(work <-chan Request) {
     timeout := time.After(20 * time.Second)
 out:
     for {
@@ -129,7 +134,7 @@ out:
 
 func (b *Balancer) dispatch(req Request) {
     w := heap.Pop(&b.pool).(*Worker)
-    fmt.Printf("dispatch() '%v': sending '%v'\n", req, w)
+    fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
     go w.work(b.done)
     w.pending++
     heap.Push(&b.pool, w)
@@ -138,8 +143,8 @@ func (b *Balancer) dispatch(req Request) {
 }
 
 func (b *Balancer) completed(done *Done) {
-    fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r)
     done.w.pending--
+    fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r)
     heap.Remove(&b.pool, done.w.index)
     heap.Push(&b.pool, done.w)
 }