--- /dev/null
+
+package main
+
+import (
+ "fmt"
+ "time"
+ "math/rand"
+ balancer "./balancer_v2"
+)
+
+// FIXME: Add buffers to channels. What changes?
+
+type request struct {
+ reqFn func() int // what to compute for this particular request.
+ num int // Used for request identification.
+ c chan int // channel for obtaining results back from worker.
+}
+
+func (r request) String() string {
+ return fmt.Sprintf("R %.2d", r.num)
+}
+
+var _ balancer.Requester = (*request)(nil)
+
+func (r *request) Fn() interface{} {
+ fmt.Printf("%v: Fn()..\n", r)
+ return interface{}(r.reqFn())
+}
+
+func (r *request) Send(res interface{}) {
+ r.c <- res.(int)
+}
+
+func furtherProcess(r *request, result int) {
+ fmt.Printf("%v: further processing result %v..\n", r, result)
+}
+
+func requester(work chan<- balancer.Requester) {
+ c := make(chan int)
+ reqNum := 0
+ for {
+ t := rand.Intn(5)
+ fmt.Printf("requester(): sleeping for %v secs\n", t)
+ time.Sleep(time.Duration(t) * time.Second)
+
+ f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum }
+ req := &request{reqFn: f, num: reqNum, c: c}
+ fmt.Printf("requester() '%v': sending\n", req)
+ work <- req
+ result := <-c
+ fmt.Printf("requester() '%v': received result %v\n", req, result)
+ furtherProcess(req, result)
+ reqNum++
+ }
+}
+
+
+type workRun struct {
+ requests chan balancer.Requester
+ id balancer.WorkerId
+}
+
+func (w workRun) String() string {
+ return fmt.Sprintf("%v", w.id)
+}
+var _ balancer.WorkRunner = (*workRun)(nil)
+
+func (w *workRun) InitRunner(wid balancer.WorkerId) {
+ w.id = wid
+}
+
+func (w *workRun) Work(id string, done chan<- *balancer.Done) {
+ t := rand.Intn(5)
+ fmt.Printf(" %v [%v]: sleeping for %v secs before begin..\n", w, id, t)
+ time.Sleep(time.Duration(t) * time.Second)
+ timeout := time.After(time.Second)
+ for {
+ select {
+ case req := <- w.requests:
+ fmt.Printf(" %v [%v]: start computing '%v'\n", w, id, req)
+ req.Send(req.Fn())
+ done <- &balancer.Done{w.id, req}
+ fmt.Printf(" %v [%v]: finished '%v'\n", w, id, req)
+ case <-timeout:
+ fmt.Printf(" %v [%v]: i'm still here..\n", w.id, id)
+ timeout = time.After(time.Second)
+ }
+ }
+}
+
+func (w *workRun) Send(req balancer.Requester) {
+ w.requests <- req
+}
+
+var nWorker int = 4
+
+func main() {
+ work := make(chan balancer.Requester)
+
+ workers := make([]balancer.WorkRunner, nWorker)
+ for i := 0; i < nWorker; i++ {
+ c := make(chan balancer.Requester)
+ workers[i] = &workRun{requests: c}
+ }
+ b := balancer.InitBalancer(workers)
+ go requester(work)
+ b.Balance(work)
+}
+
--- /dev/null
+// Balancer v2 with requester and work runner interfaces.
+package balancer_v2
+
+import (
+ "fmt"
+ "time"
+ "container/heap"
+)
+
+// Requester is an interface to requester.
+// It's used by WorkRunner's implementation, not by balancer itself.
+type Requester interface {
+ Fn() interface{}
+ Send(interface{}) // Send result of Fn() back to requester.
+}
+
+// WorkerId is identification of a particular runner.
+// It's needed to abstract '*worker' pointer in Done channel.
+type WorkerId *worker
+
+// WorkRunner is an interface, which every work runner should conform to.
+// This is an actual implementation running Requester's Fn() function and
+// returning result back to Requester.
+type WorkRunner interface {
+ InitRunner(WorkerId) // Initialize runner with id inside balancer.
+ Work(string, chan <- *Done) // Start runner.
+ Send(Requester) // Send request to WorkerRunner.
+}
+
+// worker as seen by balancer.
+// WorkRunner implementation does not important for balancer. It only needs a
+// way to sort workers by the number of pending tasks.
+type worker struct {
+ runner WorkRunner
+ num int // Used for worker identification.
+
+ pending int // Used by sort for pool.
+ index int // Used by heap for pool.
+}
+
+func (w worker) String() string {
+ return fmt.Sprintf("W %.2d (%v)", w.num, w.pending)
+}
+
+// Done message is sent by work runner to balancer, when request is completed.
+type Done struct {
+ Worker WorkerId
+ Req Requester
+}
+
+// Pool of workers, which balancer manages.
+type pool []*worker
+
+func (p pool) Len() int { return len(p) }
+func (p pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p pool) Less(i, j int) bool {
+ return p[i].pending < p[j].pending
+}
+
+func (p *pool) Push(i interface{}) {
+ w := i.(*worker)
+ w.index = len(*p)
+ *p = append(*p, w)
+}
+
+func (p *pool) Pop() interface{} {
+ old := *p
+ n := len(old)
+ w := old[n-1]
+ w.index = -1
+ old[n-1] = nil
+ *p = old[:n-1]
+ return w
+}
+
+// Balancer itself.
+// It consists from pool of workers and done channel, using which work runners
+// will report finished requests back to balancer.
+type Balancer struct {
+ pool pool
+ done chan *Done
+}
+
+func (b *Balancer) Balance(work <-chan Requester) {
+ fmt.Println("balance(): Starting workers..")
+ for _, w := range b.pool {
+ go w.runner.Work(time.Now().Format("05.000"), b.done)
+ time.Sleep(time.Second)
+ }
+
+ timeout := time.After(20 * time.Second)
+out:
+ for {
+ fmt.Println()
+ fmt.Println("balance(): Starting loop..")
+ select {
+ case <-timeout:
+ fmt.Println("balance(): your time is out.")
+ break out
+ default:
+ select {
+ case req := <-work:
+ fmt.Printf("balance(): received '%v'\n", req)
+ b.dispatch(req)
+ case done := <- b.done:
+ b.completed(done)
+ }
+ }
+ }
+}
+
+// dispatch sends requests to least loaded worker.
+func (b *Balancer) dispatch(req Requester) {
+ w := heap.Pop(&b.pool).(*worker)
+ fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
+ w.pending++
+ heap.Push(&b.pool, w)
+ w.runner.Send(req)
+ fmt.Printf("dispatch() '%v': sent\n", req)
+}
+
+// completed re-inserts worker, which finished request, into proper place in the pool.
+func (b *Balancer) completed(done *Done) {
+ fmt.Printf("completed() '%v': finished '%v'\n", done.Worker, done.Req)
+ w := (*worker)(done.Worker)
+ w.pending--
+ heap.Remove(&b.pool, w.index)
+ heap.Push(&b.pool, w)
+}
+
+// InitBalancer initializes balancer with provided workers.
+func InitBalancer(runners []WorkRunner) *Balancer {
+ n := len(runners)
+ pool := make(pool, n)
+ for i := 0; i < n; i++ {
+ w := worker{num: i, pending: 0, index: i}
+ runners[i].InitRunner(&w)
+ w.runner = runners[i]
+ pool[i] = &w
+ }
+ heap.Init(&pool)
+
+ done := make(chan *Done)
+ return &Balancer {pool: pool, done: done}
+}
+