From: sgf Date: Thu, 27 Oct 2022 16:40:57 +0000 (+0300) Subject: new(balancer): Balancer v2 abstracting communication with interfaces. X-Git-Url: https://gitweb.sgf-dma.tk/?a=commitdiff_plain;h=179c7dbbee0263994cc64c79573abdf8405d367e;p=go.git new(balancer): Balancer v2 abstracting communication with interfaces. Abstract worker-requester and balancer-worker communication with interfaces, so i may substitute different requesters and workers. --- diff --git a/concurrency-is-not-parallelism/balancer_v2.go b/concurrency-is-not-parallelism/balancer_v2.go new file mode 100644 index 0000000..d341899 --- /dev/null +++ b/concurrency-is-not-parallelism/balancer_v2.go @@ -0,0 +1,109 @@ + +package main + +import ( + "fmt" + "time" + "math/rand" + balancer "./balancer_v2" +) + +// FIXME: Add buffers to channels. What changes? + +type request struct { + reqFn func() int // what to compute for this particular request. + num int // Used for request identification. + c chan int // channel for obtaining results back from worker. +} + +func (r request) String() string { + return fmt.Sprintf("R %.2d", r.num) +} + +var _ balancer.Requester = (*request)(nil) + +func (r *request) Fn() interface{} { + fmt.Printf("%v: Fn()..\n", r) + return interface{}(r.reqFn()) +} + +func (r *request) Send(res interface{}) { + r.c <- res.(int) +} + +func furtherProcess(r *request, result int) { + fmt.Printf("%v: further processing result %v..\n", r, result) +} + +func requester(work chan<- balancer.Requester) { + c := make(chan int) + reqNum := 0 + for { + t := rand.Intn(5) + fmt.Printf("requester(): sleeping for %v secs\n", t) + time.Sleep(time.Duration(t) * time.Second) + + f := func () int { fmt.Printf("... computing req num %v\n", reqNum); return reqNum } + req := &request{reqFn: f, num: reqNum, c: c} + fmt.Printf("requester() '%v': sending\n", req) + work <- req + result := <-c + fmt.Printf("requester() '%v': received result %v\n", req, result) + furtherProcess(req, result) + reqNum++ + } +} + + +type workRun struct { + requests chan balancer.Requester + id balancer.WorkerId +} + +func (w workRun) String() string { + return fmt.Sprintf("%v", w.id) +} +var _ balancer.WorkRunner = (*workRun)(nil) + +func (w *workRun) InitRunner(wid balancer.WorkerId) { + w.id = wid +} + +func (w *workRun) Work(id string, done chan<- *balancer.Done) { + t := rand.Intn(5) + fmt.Printf(" %v [%v]: sleeping for %v secs before begin..\n", w, id, t) + time.Sleep(time.Duration(t) * time.Second) + timeout := time.After(time.Second) + for { + select { + case req := <- w.requests: + fmt.Printf(" %v [%v]: start computing '%v'\n", w, id, req) + req.Send(req.Fn()) + done <- &balancer.Done{w.id, req} + fmt.Printf(" %v [%v]: finished '%v'\n", w, id, req) + case <-timeout: + fmt.Printf(" %v [%v]: i'm still here..\n", w.id, id) + timeout = time.After(time.Second) + } + } +} + +func (w *workRun) Send(req balancer.Requester) { + w.requests <- req +} + +var nWorker int = 4 + +func main() { + work := make(chan balancer.Requester) + + workers := make([]balancer.WorkRunner, nWorker) + for i := 0; i < nWorker; i++ { + c := make(chan balancer.Requester) + workers[i] = &workRun{requests: c} + } + b := balancer.InitBalancer(workers) + go requester(work) + b.Balance(work) +} + diff --git a/concurrency-is-not-parallelism/balancer_v2/balancer.go b/concurrency-is-not-parallelism/balancer_v2/balancer.go new file mode 100644 index 0000000..c62989c --- /dev/null +++ b/concurrency-is-not-parallelism/balancer_v2/balancer.go @@ -0,0 +1,146 @@ +// Balancer v2 with requester and work runner interfaces. +package balancer_v2 + +import ( + "fmt" + "time" + "container/heap" +) + +// Requester is an interface to requester. +// It's used by WorkRunner's implementation, not by balancer itself. +type Requester interface { + Fn() interface{} + Send(interface{}) // Send result of Fn() back to requester. +} + +// WorkerId is identification of a particular runner. +// It's needed to abstract '*worker' pointer in Done channel. +type WorkerId *worker + +// WorkRunner is an interface, which every work runner should conform to. +// This is an actual implementation running Requester's Fn() function and +// returning result back to Requester. +type WorkRunner interface { + InitRunner(WorkerId) // Initialize runner with id inside balancer. + Work(string, chan <- *Done) // Start runner. + Send(Requester) // Send request to WorkerRunner. +} + +// worker as seen by balancer. +// WorkRunner implementation does not important for balancer. It only needs a +// way to sort workers by the number of pending tasks. +type worker struct { + runner WorkRunner + num int // Used for worker identification. + + pending int // Used by sort for pool. + index int // Used by heap for pool. +} + +func (w worker) String() string { + return fmt.Sprintf("W %.2d (%v)", w.num, w.pending) +} + +// Done message is sent by work runner to balancer, when request is completed. +type Done struct { + Worker WorkerId + Req Requester +} + +// Pool of workers, which balancer manages. +type pool []*worker + +func (p pool) Len() int { return len(p) } +func (p pool) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p pool) Less(i, j int) bool { + return p[i].pending < p[j].pending +} + +func (p *pool) Push(i interface{}) { + w := i.(*worker) + w.index = len(*p) + *p = append(*p, w) +} + +func (p *pool) Pop() interface{} { + old := *p + n := len(old) + w := old[n-1] + w.index = -1 + old[n-1] = nil + *p = old[:n-1] + return w +} + +// Balancer itself. +// It consists from pool of workers and done channel, using which work runners +// will report finished requests back to balancer. +type Balancer struct { + pool pool + done chan *Done +} + +func (b *Balancer) Balance(work <-chan Requester) { + fmt.Println("balance(): Starting workers..") + for _, w := range b.pool { + go w.runner.Work(time.Now().Format("05.000"), b.done) + time.Sleep(time.Second) + } + + timeout := time.After(20 * time.Second) +out: + for { + fmt.Println() + fmt.Println("balance(): Starting loop..") + select { + case <-timeout: + fmt.Println("balance(): your time is out.") + break out + default: + select { + case req := <-work: + fmt.Printf("balance(): received '%v'\n", req) + b.dispatch(req) + case done := <- b.done: + b.completed(done) + } + } + } +} + +// dispatch sends requests to least loaded worker. +func (b *Balancer) dispatch(req Requester) { + w := heap.Pop(&b.pool).(*worker) + fmt.Printf("dispatch() '%v': sending to '%v'\n", req, w) + w.pending++ + heap.Push(&b.pool, w) + w.runner.Send(req) + fmt.Printf("dispatch() '%v': sent\n", req) +} + +// completed re-inserts worker, which finished request, into proper place in the pool. +func (b *Balancer) completed(done *Done) { + fmt.Printf("completed() '%v': finished '%v'\n", done.Worker, done.Req) + w := (*worker)(done.Worker) + w.pending-- + heap.Remove(&b.pool, w.index) + heap.Push(&b.pool, w) +} + +// InitBalancer initializes balancer with provided workers. +func InitBalancer(runners []WorkRunner) *Balancer { + n := len(runners) + pool := make(pool, n) + for i := 0; i < n; i++ { + w := worker{num: i, pending: 0, index: i} + runners[i].InitRunner(&w) + w.runner = runners[i] + pool[i] = &w + } + heap.Init(&pool) + + done := make(chan *Done) + return &Balancer {pool: pool, done: done} +} + diff --git a/concurrency-is-not-parallelism/balancer_v2/go.mod b/concurrency-is-not-parallelism/balancer_v2/go.mod new file mode 100644 index 0000000..e35bd31 --- /dev/null +++ b/concurrency-is-not-parallelism/balancer_v2/go.mod @@ -0,0 +1,3 @@ +module balancer_v2 + +go 1.15