--- /dev/null
+
+package main
+
+import (
+ "fmt"
+ "time"
+ "math/rand"
+ "container/heap"
+)
+
+type Request struct {
+ fn func() int
+ c chan int
+}
+
+var nWorker int = 4
+
+func workFn(t int) int {
+ fmt.Println("workFn", t)
+ return t
+}
+
+func furtherProcess(result int) {
+ fmt.Println("furtherProcess", result)
+}
+
+func requester(work chan<- Request) {
+ c := make(chan int)
+ for {
+ t := rand.Intn(5)
+ fmt.Println("requester", t)
+ time.Sleep(time.Duration(t) * time.Second)
+ work <- Request{func () int { return workFn(t) }, c}
+ fmt.Println("requester", t, ": request sent, waiting for reply")
+ result := <-c
+ furtherProcess(result)
+ }
+}
+
+type Worker struct {
+ requests chan Request
+ pending int
+ index int
+}
+
+func (w *Worker) work(done chan *Worker) {
+ fmt.Println("work started")
+ for {
+ req := <- w.requests
+ req.c <- req.fn()
+ done <- w
+ }
+}
+
+type Pool []*Worker
+
+type Balancer struct {
+ pool Pool
+ done chan *Worker
+}
+
+func (b *Balancer) balance(work chan Request) {
+ timeout := time.After(10 * time.Second)
+out:
+ for {
+ fmt.Println("\nbalance LOOP")
+ select {
+ case <-timeout:
+ fmt.Println("Balancer time is out")
+ break out
+ default:
+ select {
+ case req := <-work:
+ fmt.Println("balance received request")
+ b.dispatch(req)
+ case w := <- b.done:
+ b.completed(w)
+ }
+ }
+ }
+}
+
+func (p Pool) Len() int { return len(p) }
+func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p Pool) Less(i, j int) bool {
+ return p[i].pending < p[j].pending
+}
+
+func (p *Pool) Push(i interface{}) {
+ w := i.(*Worker)
+ w.index = len(*p)
+ *p = append(*p, w)
+}
+
+func (p *Pool) Pop() interface{} {
+ old := *p
+ n := len(old)
+ w := old[n-1]
+ w.index = -1
+ old[n-1] = nil
+ *p = old[:n-1]
+ return w
+}
+
+func (b *Balancer) dispatch(req Request) {
+ w := heap.Pop(&b.pool).(*Worker)
+ fmt.Println("dispatch to", w)
+ go w.work(b.done)
+ w.requests <- req
+ w.pending++
+ heap.Push(&b.pool, w)
+}
+
+func (b *Balancer) completed(w *Worker) {
+ w.pending--
+ heap.Remove(&b.pool, w.index)
+ heap.Push(&b.pool, w)
+}
+
+func main() {
+ work := make(chan Request)
+
+ pool := make(Pool, nWorker)
+ for i := 0; i < nWorker; i++ {
+ requests := make(chan Request)
+ pool[i] = &Worker{requests, 0, i}
+ }
+ heap.Init(&pool)
+ //for i, val := range pool {
+ // fmt.Println(i, val.pending, val.index)
+ //}
+
+ done := make(chan *Worker)
+ b := Balancer {pool, done}
+ go requester(work)
+ b.balance(work)
+}