fmt.Printf("%v: further processing result %v..\n", r, result)
}
-func requester(work chan<- Request) {
+func requester(work chan<- *Request) {
c := make(chan int)
reqNum := 0
for {
fn := func () int { return workFn(req) }
req.fn = fn
fmt.Printf("requester() '%v': sending\n", req)
- work <- *req
+ work <- req
result := <-c
fmt.Printf("requester() '%v': received result %v\n", req, result)
furtherProcess(req, result)
// FIXME: Add interfaces to abstreact from buffered/unbuffered channels and be
// able to switch implementation immediately.
type Worker struct {
- requests chan Request
+ requests chan *Request
pending int // Used by sort.
num int // Used for worker identification.
index int // Used by heap.
req := <- w.requests
fmt.Printf("%v: start computing '%v'\n", w, req)
req.c <- req.fn()
- done <- &Done{w, &req}
+ done <- &Done{w, req}
fmt.Printf("%v: finished '%v'\n", w, req)
}
}
done chan *Done
}
-func (b *Balancer) balance(work <-chan Request) {
+func (b *Balancer) balance(work <-chan *Request) {
timeout := time.After(20 * time.Second)
out:
for {
}
}
-func (b *Balancer) dispatch(req Request) {
+func (b *Balancer) dispatch(req *Request) {
w := heap.Pop(&b.pool).(*Worker)
fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
go w.work(b.done)
var nWorker int = 4
func main() {
- work := make(chan Request)
+ work := make(chan *Request)
pool := make(Pool, nWorker)
for i := 0; i < nWorker; i++ {
- requests := make(chan Request)
+ requests := make(chan *Request)
pool[i] = &Worker{requests, 0, i, i}
}
heap.Init(&pool)