InitRunner(WorkerId) // Initialize runner with id inside balancer.
Work(string, chan <- *Done) // Start runner.
Send(Requester) // Send request to WorkerRunner.
+ MaxRequests() int
}
-// worker as seen by balancer.
+// worker is a balancer's view of WorkRunner.
// WorkRunner implementation does not important for balancer. It only needs a
// way to sort workers by the number of pending tasks.
type worker struct {
case <-timeout:
fmt.Println("balance(): your time is out.")
break out
- default:
- select {
- case req := <-work:
- fmt.Printf("balance(): received '%v'\n", req)
- // FIXME: Starting dispatch() as goroutine will fix deadlock,
- // when there're now free worker. But now Balance() will spawn
- // infinite number of dispatch() gophers, which will block
- // until there will be free worker. This is probably wrong:
- // requester should block, if request can't be processed, but
- // not infinite number of dispatch() gophers are spawned.
- go b.dispatch(req)
- case done := <- b.done:
- b.completed(done)
+ case req := <-work:
+ fmt.Printf("balance(): received '%v'\n", req)
+ if !b.hasFreeWorkers() {
+ fmt.Printf("balance(): No free workers, waiting for one to finish..\n")
+ b.completed(<-b.done)
}
+ b.dispatch(req)
+ case done := <- b.done:
+ b.completed(done)
}
}
}
fmt.Println("pool", b.pool)
}
+func (b *Balancer) hasFreeWorkers() bool {
+ for _, w := range b.pool {
+ if w.runner.MaxRequests() > w.pending{
+ return true
+ }
+ }
+ return false
+}
+
// InitBalancer initializes balancer with provided workers.
func InitBalancer(runners []WorkRunner) *Balancer {
n := len(runners)
pool := make(pool, n)
for i := 0; i < n; i++ {
- w := worker{num: i, pending: 0, index: i}
- runners[i].InitRunner(WorkerId{&w})
- w.runner = runners[i]
+ w := worker{runner: runners[i], num: i, pending: 0, index: i}
+ w.runner.InitRunner(WorkerId{&w})
pool[i] = &w
}
heap.Init(&pool)
type request struct {
reqFn func() int // what to compute for this particular request.
- id string // id of Generate(), which produced this request
+ origin string // id of Generate(), which produced this request
num int // Used for request identification.
c chan reqResult // channel for obtaining results back from worker.
}
func (r request) String() string {
- return fmt.Sprintf("R %.2d [%v]", r.num, r.id)
+ return fmt.Sprintf("R %.2d [%v]", r.num, r.origin)
}
var _ balancer.Requester = (*request)(nil)
func Generate(id string, work chan<- balancer.Requester) {
c := make(chan reqResult)
- reqNum := 0
+
+ genReq := func (n int) balancer.Requester {
+ req := &request{num: n, origin: id, c: c}
+ f := func () int {
+ t := rand.Intn(10)
+ fmt.Printf("... computing %v within %v secs\n", req, t)
+ time.Sleep(time.Duration(t) * time.Second)
+ fmt.Printf("... computing %v done\n", req)
+ return req.num
+ }
+ req.reqFn = f
+ return req
+ }
+
t := rand.Intn(3)
- fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
timeout := time.After(time.Duration(t) * time.Second)
+ fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
+
+ reqNum := 0
+ req := genReq(reqNum)
+ ping := time.After(time.Second)
for {
select {
- case <- timeout:
- req := &request{num: reqNum, id: id, c: c}
- f := func () int {
- t := rand.Intn(10)
- fmt.Printf("... computing %v for %v secs\n", req, t)
- time.Sleep(time.Duration(t) * time.Second)
- return req.num
+ // FIXME: Use fanIn to merge timeout with work channels.
+ case <-timeout:
+ select {
+ case work <-req:
+
+ fmt.Printf("Generate() [%v]: sent '%v'\n", id, req)
+ reqNum++
+ req = genReq(reqNum)
+
+ t := rand.Intn(3)
+ timeout = time.After(time.Duration(t) * time.Second)
+ fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req)
+
+ case result := <-c:
+ fmt.Printf("Generate() [%v]: received '%v' result %v while waiting at '%v'\n", id, result.request, result.int, req)
+ furtherProcess(result)
+ case <-ping:
+ fmt.Printf("Generate() [%v]: pong while waiting at '%v'..\n", id, req)
+ ping = time.After(time.Second)
}
- req.reqFn = f
- fmt.Printf("Generate() [%v]: sending '%v'\n", id, req)
- work <- req
-
- t := rand.Intn(3)
- fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
- timeout = time.After(time.Duration(t) * time.Second)
- reqNum++
+
case result := <-c:
fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int)
furtherProcess(result)
+ case <-ping:
+ fmt.Printf("Generate() [%v]: pong..\n", id)
+ ping = time.After(time.Second)
}
}
}
t := rand.Intn(5)
fmt.Printf(" %v [%v]: sleeping for %v secs before begin..\n", w, id, t)
time.Sleep(time.Duration(t) * time.Second)
- timeout := time.After(time.Second)
+ ping := time.After(time.Second)
for {
select {
case req := <- w.Requests:
fmt.Printf(" %v [%v]: finished '%v'\n", w, id, req)
done <- &balancer.Done{w.id, req}
fmt.Printf(" %v [%v]: done '%v'\n", w, id, req)
- case <-timeout:
+ case <-ping:
fmt.Printf(" %v [%v]: i'm still here..\n", w.id, id)
- timeout = time.After(time.Second)
+ ping = time.After(time.Second)
}
}
}
w.Requests <- req
}
+// Unbuffered channel has cap 0.
+func (w *Runner) MaxRequests() int {
+ if cap(w.Requests) == 0 {
+ return 1
+ }
+ return cap(w.Requests)
+}