fmt.Printf("%v: further processing result %v..\n", r.request, r.int)
}
-func Generate(id string, work chan<- balancer.Requester) {
+type tuple struct {
+ a interface{}
+ b interface{}
+}
+
+func sendDelay(waitBefore func (balancer.Requester) <-chan time.Time, work chan<- balancer.Requester) chan<- balancer.Requester {
+ c := make(chan balancer.Requester)
+
+ go func() {
+ for {
+ req := <-c
+ <-waitBefore(req)
+ work<- req
+ }
+ }()
+
+ return c
+}
+
+func Generate(id string, wCh chan<- balancer.Requester) {
c := make(chan reqResult)
genReq := func (n int) balancer.Requester {
return req
}
- t := rand.Intn(3)
- timeout := time.After(time.Duration(t) * time.Second)
- fmt.Printf("Generate() [%v]: sleeping for %v secs\n", id, t)
+ waitBefore := func (req balancer.Requester) <-chan time.Time {
+ t := rand.Intn(3)
+ fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req)
+ return time.After(time.Duration(t) * time.Second)
+ }
reqNum := 0
req := genReq(reqNum)
+ work := sendDelay(waitBefore, wCh)
ping := time.After(time.Second)
for {
select {
- // FIXME: Use fanIn to merge timeout with work channels.
- case <-timeout:
- select {
- case work <-req:
-
- fmt.Printf("Generate() [%v]: sent '%v'\n", id, req)
- reqNum++
- req = genReq(reqNum)
-
- t := rand.Intn(3)
- timeout = time.After(time.Duration(t) * time.Second)
- fmt.Printf("Generate() [%v]: sleeping for %v secs before '%v'\n", id, t, req)
-
- case result := <-c:
- fmt.Printf("Generate() [%v]: received '%v' result %v while waiting at '%v'\n", id, result.request, result.int, req)
- furtherProcess(result)
- case <-ping:
- fmt.Printf("Generate() [%v]: pong while waiting at '%v'..\n", id, req)
- ping = time.After(time.Second)
- }
+ case work<- req:
+ fmt.Printf("Generate() [%v]: sent '%v'\n", id, req)
+ reqNum++
+ req = genReq(reqNum)
case result := <-c:
fmt.Printf("Generate() [%v]: received '%v' result %v\n", id, result.request, result.int)
furtherProcess(result)
+
case <-ping:
fmt.Printf("Generate() [%v]: pong..\n", id)
ping = time.After(time.Second)
"balancer/requester"
)
-// FIXME: Add buffers to channels. What changes?
-
var nWorker int = 4 // How many workers to start.
var nGenerator int = 2 // How many request generators to start.
func main() {
- work := make(chan balancer.Requester)
+ work := make(chan balancer.Requester, 3)
workers := make([]balancer.WorkRunner, nWorker)
for i := 0; i < nWorker; i++ {
- c := make(chan balancer.Requester)
+ c := make(chan balancer.Requester, 2)
workers[i] = &worker.Runner{Requests: c}
}
b := balancer.InitBalancer(workers)