From 5fe02c63fbb9a380820cea6c098f8c9476deec77 Mon Sep 17 00:00:00 2001 From: sgf Date: Fri, 25 Nov 2022 17:42:12 +0300 Subject: [PATCH] new(pipe): Save some code from "Pipelines and cancellation" blog post. --- pipelines-and-cancellation/pipe.go | 96 ++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 pipelines-and-cancellation/pipe.go diff --git a/pipelines-and-cancellation/pipe.go b/pipelines-and-cancellation/pipe.go new file mode 100644 index 0000000..e64cd4e --- /dev/null +++ b/pipelines-and-cancellation/pipe.go @@ -0,0 +1,96 @@ + +package main + +import ( + "fmt" + "sync" +) + +func gen(nums ...int) <-chan int { + out := make(chan int) + go func () { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} + +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func () { + for n := range in { + out <- n*n + } + close(out) + }() + return out +} + +func main1() { + c := gen(2, 3) + out := sq(c) + + fmt.Println(<-out) + fmt.Println(<-out) +} + +func main2() { + for n := range sq(sq(gen(2, 3))) { + fmt.Println(n) + } +} + +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + +/* + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } +*/ + + wg.Add(len(cs)) + for _, c := range cs { + // I need to pass channel as an argument to _copy_ channel value from + // particular cycle iteration. If i just use c inside closure, it + // won't work, because all closures created here reference _the same_ + // c, and this c will have the value from _the last_ cycle iteration. + go func (ch <-chan int) { + // If i call wg.Add() here it won't work, because this go routine may + // have no chance to run until wg.Wait() will be called. And + // then function will terminate immediately + for n := range ch { + out<- n + } + wg.Done() + }(c) + } + + go func () { + wg.Wait() + close(out) + }() + + return out +} + +func main() { + in := gen(2, 3) + c1 := sq(in) + c2 := sq(in) + + for n := range merge(c1, c2) { + fmt.Println(n) + } +} + -- 2.20.1