req := &Request{num: reqNum, c: c}
fn := func () int { return workFn(*req) }
req.fn = fn
- work <- *req
fmt.Printf("requester() '%v': sending\n", req)
+ work <- *req
result := <-c
fmt.Printf("requester() '%v': received result %v\n", req, result)
furtherProcess(*req, result)
}
}
+// FIXME: Add buffers to channels. What changes?
+// FIXME: Add interfaces to abstreact from buffered/unbuffered channels and be
+// able to switch implementation immediately.
type Worker struct {
requests chan Request
- pending int
+ pending int // Used by sort.
num int // Used for worker identification.
index int // Used by heap.
}
+type Done struct {
+ w *Worker
+ r Request
+}
+
func (w Worker) String() string {
- return fmt.Sprintf("W %.2d", w.num)
+ return fmt.Sprintf("W %.2d (%v)", w.num, w.pending)
}
-func (w *Worker) work(done chan *Done) {
+func (w *Worker) work(done chan<- *Done) {
t := rand.Intn(5)
fmt.Printf("%v: sleeping for %v secs before begin..\n", w, t)
time.Sleep(time.Duration(t) * time.Second)
func (p Pool) Len() int { return len(p) }
func (p Pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+// FIXME: Does it works correctly? Or i need p[i] > p[j] ? The highest-pri
+// worker should be the last.
func (p Pool) Less(i, j int) bool {
return p[i].pending < p[j].pending
}
return w
}
-type Done struct {
- w *Worker
- r Request
-}
-
type Balancer struct {
pool Pool
done chan *Done
}
-func (b *Balancer) balance(work chan Request) {
+func (b *Balancer) balance(work <-chan Request) {
timeout := time.After(20 * time.Second)
out:
for {
func (b *Balancer) dispatch(req Request) {
w := heap.Pop(&b.pool).(*Worker)
- fmt.Printf("dispatch() '%v': sending '%v'\n", req, w)
+ fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
go w.work(b.done)
w.pending++
heap.Push(&b.pool, w)
}
func (b *Balancer) completed(done *Done) {
- fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r)
done.w.pending--
+ fmt.Printf("completed() '%v': finished '%v'\n", done.w, done.r)
heap.Remove(&b.pool, done.w.index)
heap.Push(&b.pool, done.w)
}