From: sgf Date: Tue, 1 Nov 2022 16:11:24 +0000 (+0300) Subject: chg(balancer): Move Requester implementation to separate package too. X-Git-Url: https://gitweb.sgf-dma.tk/?a=commitdiff_plain;h=e5fcbefbbd565815468f7fffab8438cb1e8d4ea8;p=go.git chg(balancer): Move Requester implementation to separate package too. --- diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go index cc295d9..a09a776 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/balancer.go @@ -8,7 +8,12 @@ import ( ) // Requester is an interface to requester. -// It's used by WorkRunner's implementation, not by balancer itself. +// It's used by WorkRunner's implementation in 'balancer/worker' package, not +// by balancer itself. But i can't move this definition to 'balancer/worker', +// because that'll create cyclic package dependencies: balancer needs +// 'Requester' interface in various places, thus needs to import +// 'balancer/worker', but worker needs 'WorkRunner' interface for implementing +// it, thus needs to import 'balancer'. type Requester interface { Fn() interface{} Send(interface{}) // Send result of Fn() back to requester. diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/go.mod b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/go.mod new file mode 100644 index 0000000..398bbc2 --- /dev/null +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/go.mod @@ -0,0 +1,7 @@ +module balancer/requester + +go 1.15 + +replace balancer => ../../balancer + +require balancer v0.0.0-00010101000000-000000000000 diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go new file mode 100644 index 0000000..593b98a --- /dev/null +++ b/concurrency-is-not-parallelism/balancer_v2/balancer/requester/requester.go @@ -0,0 +1,54 @@ + +package requester + +import ( + "fmt" + "time" + "math/rand" + "balancer" +) + +type request struct { + reqFn func() int // what to compute for this particular request. + num int // Used for request identification. + c chan int // channel for obtaining results back from worker. +} + +func (r request) String() string { + return fmt.Sprintf("R %.2d", r.num) +} + +var _ balancer.Requester = (*request)(nil) + +func (r *request) Fn() interface{} { + fmt.Printf("%v: Fn()..\n", r) + return interface{}(r.reqFn()) +} + +func (r *request) Send(res interface{}) { + r.c <- res.(int) +} + +func furtherProcess(r *request, result int) { + fmt.Printf("%v: further processing result %v..\n", r, result) +} + +func Generate(work chan<- balancer.Requester) { + c := make(chan int) + reqNum := 0 + for { + t := rand.Intn(5) + fmt.Printf("Generate(): sleeping for %v secs\n", t) + time.Sleep(time.Duration(t) * time.Second) + + f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum } + req := &request{reqFn: f, num: reqNum, c: c} + fmt.Printf("Generate() '%v': sending\n", req) + work <- req + result := <-c + fmt.Printf("Generate() '%v': received result %v\n", req, result) + furtherProcess(req, result) + reqNum++ + } +} + diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go b/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go index a2f3ecd..d92d138 100644 --- a/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go +++ b/concurrency-is-not-parallelism/balancer_v2/balancer_v2.go @@ -2,59 +2,13 @@ package main import ( - "fmt" - "time" - "math/rand" "balancer" "balancer/worker" + "balancer/requester" ) // FIXME: Add buffers to channels. What changes? -type request struct { - reqFn func() int // what to compute for this particular request. - num int // Used for request identification. - c chan int // channel for obtaining results back from worker. -} - -func (r request) String() string { - return fmt.Sprintf("R %.2d", r.num) -} - -var _ balancer.Requester = (*request)(nil) - -func (r *request) Fn() interface{} { - fmt.Printf("%v: Fn()..\n", r) - return interface{}(r.reqFn()) -} - -func (r *request) Send(res interface{}) { - r.c <- res.(int) -} - -func furtherProcess(r *request, result int) { - fmt.Printf("%v: further processing result %v..\n", r, result) -} - -func requester(work chan<- balancer.Requester) { - c := make(chan int) - reqNum := 0 - for { - t := rand.Intn(5) - fmt.Printf("requester(): sleeping for %v secs\n", t) - time.Sleep(time.Duration(t) * time.Second) - - f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum } - req := &request{reqFn: f, num: reqNum, c: c} - fmt.Printf("requester() '%v': sending\n", req) - work <- req - result := <-c - fmt.Printf("requester() '%v': received result %v\n", req, result) - furtherProcess(req, result) - reqNum++ - } -} - var nWorker int = 4 func main() { @@ -66,7 +20,7 @@ func main() { workers[i] = &worker.Runner{Requests: c} } b := balancer.InitBalancer(workers) - go requester(work) + go requester.Generate(work) b.Balance(work) } diff --git a/concurrency-is-not-parallelism/balancer_v2/go.mod b/concurrency-is-not-parallelism/balancer_v2/go.mod index 9a4e6e1..efdfb32 100644 --- a/concurrency-is-not-parallelism/balancer_v2/go.mod +++ b/concurrency-is-not-parallelism/balancer_v2/go.mod @@ -8,5 +8,8 @@ replace balancer/worker => ./balancer/worker require ( balancer v0.0.0-00010101000000-000000000000 + balancer/requester v0.0.0-00010101000000-000000000000 balancer/worker v0.0.0-00010101000000-000000000000 ) + +replace balancer/requester => ./balancer/requester