From d3f0bfaf9331ff70df7ba9f1662d100faa6a4c89 Mon Sep 17 00:00:00 2001 From: sgf Date: Wed, 26 Oct 2022 20:29:19 +0300 Subject: [PATCH] chg(balancer): More strict channel types. --- concurrency-is-not-parallelism/balancer.go | 29 +++++++++++++--------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/concurrency-is-not-parallelism/balancer.go b/concurrency-is-not-parallelism/balancer.go index f7ad161..2cae58c 100644 --- a/concurrency-is-not-parallelism/balancer.go +++ b/concurrency-is-not-parallelism/balancer.go @@ -38,8 +38,8 @@ func requester(work chan<- Request) { req := &Request{num: reqNum, c: c} fn := func () int { return workFn(*req) } req.fn = fn - work <- *req fmt.Printf("requester() '%v': sending\n", req) + work <- *req result := <-c fmt.Printf("requester() '%v': received result %v\n", req, result) furtherProcess(*req, result) @@ -47,18 +47,26 @@ func requester(work chan<- Request) { } } +// FIXME: Add buffers to channels. What changes? +// FIXME: Add interfaces to abstreact from buffered/unbuffered channels and be +// able to switch implementation immediately. type Worker struct { requests chan Request - pending int + pending int // Used by sort. num int // Used for worker identification. index int // Used by heap. } +type Done struct { + w *Worker + r Request +} + func (w Worker) String() string { - return fmt.Sprintf("W %.2d", w.num) + return fmt.Sprintf("W %.2d (%v)", w.num, w.pending) } -func (w *Worker) work(done chan *Done) { +func (w *Worker) work(done chan<- *Done) { t := rand.Intn(5) fmt.Printf("%v: sleeping for %v secs before begin..\n", w, t) time.Sleep(time.Duration(t) * time.Second) @@ -75,6 +83,8 @@ type Pool []*Worker func (p Pool) Len() int { return len(p) } func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +// FIXME: Does it works correctly? Or i need p[i] > p[j] ? The highest-pri +// worker should be the last. func (p Pool) Less(i, j int) bool { return p[i].pending < p[j].pending } @@ -95,17 +105,12 @@ func (p *Pool) Pop() interface{} { return w } -type Done struct { - w *Worker - r Request -} - type Balancer struct { pool Pool done chan *Done } -func (b *Balancer) balance(work chan Request) { +func (b *Balancer) balance(work <-chan Request) { timeout := time.After(20 * time.Second) out: for { @@ -129,7 +134,7 @@ out: func (b *Balancer) dispatch(req Request) { w := heap.Pop(&b.pool).(*Worker) - fmt.Printf("dispatch() '%v': sending '%v'\n", req, w) + fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w) go w.work(b.done) w.pending++ heap.Push(&b.pool, w) @@ -138,8 +143,8 @@ func (b *Balancer) dispatch(req Request) { } func (b *Balancer) completed(done *Done) { - fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r) done.w.pending-- + fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r) heap.Remove(&b.pool, done.w.index) heap.Push(&b.pool, done.w) } -- 2.20.1