func (b *Balancer) Balance(work <-chan Requester) {
fmt.Println("balance(): Starting workers..")
for _, w := range b.pool {
- go w.runner.Work(time.Now().Format("05.000"), b.done)
+ go w.runner.Work(time.Now().Format("05.0"), b.done)
time.Sleep(time.Second)
}
select {
case req := <-work:
fmt.Printf("balance(): received '%v'\n", req)
- b.dispatch(req)
+ // FIXME: Starting dispatch() as goroutine will fix deadlock,
+ // when there're now free worker. But now Balance() will spawn
+ // infinite number of dispatch() gophers, which will block
+ // until there will be free worker. This is probably wrong:
+ // requester should block, if request can't be processed, but
+ // not infinite number of dispatch() gophers are spawned.
+ go b.dispatch(req)
case done := <- b.done:
b.completed(done)
}
fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w)
w.runner.Send(req)
fmt.Printf("dispatch() '%v': sent to %v\n", req, w)
+ fmt.Println("pool", b.pool)
}
// completed re-inserts worker, which finished request, into proper place in the pool.
type request struct {
reqFn func() int // what to compute for this particular request.
+ id string // id of Generate(), which produced this request
num int // Used for request identification.
c chan reqResult // channel for obtaining results back from worker.
}
func (r request) String() string {
- return fmt.Sprintf("R %.2d", r.num)
+ return fmt.Sprintf("R %.2d [%v]", r.num, r.id)
}
var _ balancer.Requester = (*request)(nil)
fmt.Printf("%v: further processing result %v..\n", r.request, r.int)
}
-func Generate(work chan<- balancer.Requester) {
+func Generate(id string, work chan<- balancer.Requester) {
c := make(chan reqResult)
reqNum := 0
- t := rand.Intn(5)
- fmt.Printf("Generate(): sleeping for %v secs\n", t)
+ t := rand.Intn(3)
+ fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
timeout := time.After(time.Duration(t) * time.Second)
for {
select {
case <- timeout:
- req := &request{num: reqNum, c: c}
- f := func () int { fmt.Printf("... computing %v\n", req); return req.num }
+ req := &request{num: reqNum, id: id, c: c}
+ f := func () int {
+ t := rand.Intn(10)
+ fmt.Printf("... computing %v for %v secs\n", req, t)
+ time.Sleep(time.Duration(t) * time.Second)
+ return req.num
+ }
req.reqFn = f
- fmt.Printf("Generate(): sending '%v'\n", req)
+ fmt.Printf("Generate() [%v]: sending '%v'\n", id, req)
work <- req
- t := rand.Intn(5)
- fmt.Printf("Generate(): sleeping for %v secs\n", t)
+ t := rand.Intn(3)
+ fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
timeout = time.After(time.Duration(t) * time.Second)
reqNum++
case result := <-c:
- fmt.Printf("Generate(): received '%v' result %v\n", result.request, result.int)
+ fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int)
furtherProcess(result)
}
}
package main
import (
+ "time"
"balancer"
"balancer/worker"
"balancer/requester"
// FIXME: Add buffers to channels. What changes?
-var nWorker int = 4
+var nWorker int = 4 // How many workers to start.
+var nGenerator int = 2 // How many request generators to start.
func main() {
work := make(chan balancer.Requester)
workers[i] = &worker.Runner{Requests: c}
}
b := balancer.InitBalancer(workers)
- go requester.Generate(work)
+ for i := 0; i < nGenerator; i++ {
+ go requester.Generate(time.Now().Format("05.0"), work)
+ time.Sleep(time.Second)
+ }
b.Balance(work)
}