From 4e0a8c1c0664f044aa83eea204b86454a8796095 Mon Sep 17 00:00:00 2001 From: sgf Date: Mon, 28 Nov 2022 20:10:04 +0300 Subject: [PATCH] new(pipe): Read further. --- pipelines-and-cancellation/pipe.go | 156 ++++++++++++++++++++++++++++- 1 file changed, 155 insertions(+), 1 deletion(-) diff --git a/pipelines-and-cancellation/pipe.go b/pipelines-and-cancellation/pipe.go index e64cd4e..c6a21c5 100644 --- a/pipelines-and-cancellation/pipe.go +++ b/pipelines-and-cancellation/pipe.go @@ -59,6 +59,7 @@ func merge(cs ...<-chan int) <-chan int { } */ + // wg.Add() must be called in main thread, see below why. wg.Add(len(cs)) for _, c := range cs { // I need to pass channel as an argument to _copy_ channel value from @@ -84,7 +85,7 @@ func merge(cs ...<-chan int) <-chan int { return out } -func main() { +func main3() { in := gen(2, 3) c1 := sq(in) c2 := sq(in) @@ -94,3 +95,156 @@ func main() { } } +func merge4(end chan<- int, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + +/* + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } +*/ + + // wg.Add() must be called in main thread, see below why. + wg.Add(len(cs)) + for _, c := range cs { + // I need to pass channel as an argument to _copy_ channel value from + // particular cycle iteration. If i just use c inside closure, it + // won't work, because all closures created here reference _the same_ + // c, and this c will have the value from _the last_ cycle iteration. + go func (ch <-chan int) { + // If i call wg.Add() here it won't work, because this go routine may + // have no chance to run until wg.Wait() will be called. And + // then function will terminate immediately + for n := range ch { + out<- n + } + wg.Done() + }(c) + } + + go func () { + wg.Wait() + close(out) + end<- 0 + }() + + return out +} + +func main4() { + in := gen(2, 3) + c1 := sq(in) + c2 := sq(in) + end := make(chan int) + + out := merge4(end, c1, c2) + /* + for n := range out { + fmt.Println(n) + }*/ + fmt.Println(<-out) // 4 or 9 + <-end +} + +func gen5(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out<- n + } + close(out) + return out +} + +func main5() { + c := gen5(2, 3) + out := sq(c) + + fmt.Println(<-out) + fmt.Println(<-out) +} + +func gen6(done <-chan struct{}, nums ...int) <-chan int { + out := make(chan int, len(nums)) + + go func () { + // I should close the channel, when this function returns, not when + // gen6() itself returns. + defer close(out) + for _, n := range nums { + select { + case out<- n: + case <-done: + return + } + } + }() + return out +} + +func sq6(done <-chan struct{}, in <-chan int) <-chan int { + out := make(chan int) + + go func () { + defer close(out) + for n := range in { + select { + case out <- n*n: + case <-done: + return + } + } + }() + return out +} + +func merge6(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + output := func(c <-chan int) { + defer wg.Done() + for n := range c { + select { + case out <- n: + case <-done: + return + } + } + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + go func () { + wg.Wait() + close(out) + }() + + return out +} + +func main6() { + done := make(chan struct{}) + defer close(done) + in := gen6(done, 2, 3, 4, 5) + c1 := sq6(done, in) + c2 := sq6(done, in) + + out := merge6(done, c1, c2) + fmt.Println(<-out) // 4 or 9 + fmt.Println(<-out) // 4 or 9 + fmt.Println(<-out) // 4 or 9 +} + +func main() { + main6() +} -- 2.20.1