)
// Requester is an interface to requester.
-// It's used by WorkRunner's implementation, not by balancer itself.
+// It's used by WorkRunner's implementation in 'balancer/worker' package, not
+// by balancer itself. But i can't move this definition to 'balancer/worker',
+// because that'll create cyclic package dependencies: balancer needs
+// 'Requester' interface in various places, thus needs to import
+// 'balancer/worker', but worker needs 'WorkRunner' interface for implementing
+// it, thus needs to import 'balancer'.
type Requester interface {
Fn() interface{}
Send(interface{}) // Send result of Fn() back to requester.
--- /dev/null
+module balancer/requester
+
+go 1.15
+
+replace balancer => ../../balancer
+
+require balancer v0.0.0-00010101000000-000000000000
--- /dev/null
+
+package requester
+
+import (
+ "fmt"
+ "time"
+ "math/rand"
+ "balancer"
+)
+
+type request struct {
+ reqFn func() int // what to compute for this particular request.
+ num int // Used for request identification.
+ c chan int // channel for obtaining results back from worker.
+}
+
+func (r request) String() string {
+ return fmt.Sprintf("R %.2d", r.num)
+}
+
+var _ balancer.Requester = (*request)(nil)
+
+func (r *request) Fn() interface{} {
+ fmt.Printf("%v: Fn()..\n", r)
+ return interface{}(r.reqFn())
+}
+
+func (r *request) Send(res interface{}) {
+ r.c <- res.(int)
+}
+
+func furtherProcess(r *request, result int) {
+ fmt.Printf("%v: further processing result %v..\n", r, result)
+}
+
+func Generate(work chan<- balancer.Requester) {
+ c := make(chan int)
+ reqNum := 0
+ for {
+ t := rand.Intn(5)
+ fmt.Printf("Generate(): sleeping for %v secs\n", t)
+ time.Sleep(time.Duration(t) * time.Second)
+
+ f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum }
+ req := &request{reqFn: f, num: reqNum, c: c}
+ fmt.Printf("Generate() '%v': sending\n", req)
+ work <- req
+ result := <-c
+ fmt.Printf("Generate() '%v': received result %v\n", req, result)
+ furtherProcess(req, result)
+ reqNum++
+ }
+}
+
package main
import (
- "fmt"
- "time"
- "math/rand"
"balancer"
"balancer/worker"
+ "balancer/requester"
)
// FIXME: Add buffers to channels. What changes?
-type request struct {
- reqFn func() int // what to compute for this particular request.
- num int // Used for request identification.
- c chan int // channel for obtaining results back from worker.
-}
-
-func (r request) String() string {
- return fmt.Sprintf("R %.2d", r.num)
-}
-
-var _ balancer.Requester = (*request)(nil)
-
-func (r *request) Fn() interface{} {
- fmt.Printf("%v: Fn()..\n", r)
- return interface{}(r.reqFn())
-}
-
-func (r *request) Send(res interface{}) {
- r.c <- res.(int)
-}
-
-func furtherProcess(r *request, result int) {
- fmt.Printf("%v: further processing result %v..\n", r, result)
-}
-
-func requester(work chan<- balancer.Requester) {
- c := make(chan int)
- reqNum := 0
- for {
- t := rand.Intn(5)
- fmt.Printf("requester(): sleeping for %v secs\n", t)
- time.Sleep(time.Duration(t) * time.Second)
-
- f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum }
- req := &request{reqFn: f, num: reqNum, c: c}
- fmt.Printf("requester() '%v': sending\n", req)
- work <- req
- result := <-c
- fmt.Printf("requester() '%v': received result %v\n", req, result)
- furtherProcess(req, result)
- reqNum++
- }
-}
-
var nWorker int = 4
func main() {
workers[i] = &worker.Runner{Requests: c}
}
b := balancer.InitBalancer(workers)
- go requester(work)
+ go requester.Generate(work)
b.Balance(work)
}
require (
balancer v0.0.0-00010101000000-000000000000
+ balancer/requester v0.0.0-00010101000000-000000000000
balancer/worker v0.0.0-00010101000000-000000000000
)
+
+replace balancer/requester => ./balancer/requester