type Request struct {
fn func() int
+ num int // Used for request identification.
c chan int
}
-var nWorker int = 4
+func (r Request) String() string {
+ return fmt.Sprintf("R %.2d", r.num)
+}
-func workFn(t int) int {
- fmt.Println("workFn", t)
- return t
+func workFn(r Request) int {
+ fmt.Printf("%v: computing (workFn).. (%p)\n", r, r.fn)
+ return r.num
}
-func furtherProcess(result int) {
- fmt.Println("furtherProcess", result)
+func furtherProcess(r Request, result int) {
+ fmt.Printf("%v: further processing result %v..\n", r, result)
}
func requester(work chan<- Request) {
c := make(chan int)
+ reqNum := 0
for {
t := rand.Intn(5)
- fmt.Println("requester", t)
+ fmt.Printf("requester(): sleeping for %v secs\n", t)
time.Sleep(time.Duration(t) * time.Second)
- work <- Request{func () int { return workFn(t) }, c}
- fmt.Println("requester", t, ": request sent, waiting for reply")
+
+ req := &Request{num: reqNum, c: c}
+ fn := func () int { return workFn(*req) }
+ req.fn = fn
+ work <- *req
+ fmt.Printf("requester() '%v': sending\n", req)
result := <-c
- furtherProcess(result)
+ fmt.Printf("requester() '%v': received result %v\n", req, result)
+ furtherProcess(*req, result)
+ reqNum++
}
}
type Worker struct {
requests chan Request
pending int
- index int
+ num int // Used for worker identification.
+ index int // Used by heap.
}
-func (w *Worker) work(done chan *Worker) {
- fmt.Println("work started")
+func (w Worker) String() string {
+ return fmt.Sprintf("W %.2d", w.num)
+}
+
+func (w *Worker) work(done chan *Done) {
+ t := rand.Intn(5)
+ fmt.Printf("%v: sleeping for %v secs before begin..\n", w, t)
+ time.Sleep(time.Duration(t) * time.Second)
for {
req := <- w.requests
+ fmt.Printf("%v: start computing '%v'\n", w, req)
req.c <- req.fn()
- done <- w
+ done <- &Done{w, req}
+ fmt.Printf("%v: finished '%v'\n", w, req)
}
}
type Pool []*Worker
-type Balancer struct {
- pool Pool
- done chan *Worker
-}
-
-func (b *Balancer) balance(work chan Request) {
- timeout := time.After(10 * time.Second)
-out:
- for {
- fmt.Println("\nbalance LOOP")
- select {
- case <-timeout:
- fmt.Println("Balancer time is out")
- break out
- default:
- select {
- case req := <-work:
- fmt.Println("balance received request")
- b.dispatch(req)
- case w := <- b.done:
- b.completed(w)
- }
- }
- }
-}
-
func (p Pool) Len() int { return len(p) }
func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p Pool) Less(i, j int) bool {
return w
}
+type Done struct {
+ w *Worker
+ r Request
+}
+
+type Balancer struct {
+ pool Pool
+ done chan *Done
+}
+
+func (b *Balancer) balance(work chan Request) {
+ timeout := time.After(20 * time.Second)
+out:
+ for {
+ fmt.Println()
+ fmt.Println("balance(): Starting loop..")
+ select {
+ case <-timeout:
+ fmt.Println("balance(): your time is out.")
+ break out
+ default:
+ select {
+ case req := <-work:
+ fmt.Printf("balance(): received '%v'\n", req)
+ b.dispatch(req)
+ case done := <- b.done:
+ b.completed(done)
+ }
+ }
+ }
+}
+
func (b *Balancer) dispatch(req Request) {
w := heap.Pop(&b.pool).(*Worker)
- fmt.Println("dispatch to", w)
+ fmt.Printf("dispatch() '%v': sending '%v'\n", req, w)
go w.work(b.done)
- w.requests <- req
w.pending++
heap.Push(&b.pool, w)
+ w.requests <- req
+ fmt.Printf("dispatch() '%v': sent\n", req)
}
-func (b *Balancer) completed(w *Worker) {
- w.pending--
- heap.Remove(&b.pool, w.index)
- heap.Push(&b.pool, w)
+func (b *Balancer) completed(done *Done) {
+ fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r)
+ done.w.pending--
+ heap.Remove(&b.pool, done.w.index)
+ heap.Push(&b.pool, done.w)
}
+var nWorker int = 4
+
func main() {
work := make(chan Request)
pool := make(Pool, nWorker)
for i := 0; i < nWorker; i++ {
requests := make(chan Request)
- pool[i] = &Worker{requests, 0, i}
+ pool[i] = &Worker{requests, 0, i, i}
}
heap.Init(&pool)
//for i, val := range pool {
// fmt.Println(i, val.pending, val.index)
//}
- done := make(chan *Worker)
+ done := make(chan *Done)
b := Balancer {pool, done}
go requester(work)
b.balance(work)