new(pipe): Read further.
authorsgf <sgf.dma@gmail.com>
Mon, 28 Nov 2022 17:10:04 +0000 (20:10 +0300)
committersgf <sgf.dma@gmail.com>
Mon, 28 Nov 2022 17:10:04 +0000 (20:10 +0300)
pipelines-and-cancellation/pipe.go

index e64cd4e..c6a21c5 100644 (file)
@@ -59,6 +59,7 @@ func merge(cs ...<-chan int) <-chan int {
     }
 */
 
+    // wg.Add() must be called in main thread, see below why.
     wg.Add(len(cs))
     for _, c := range cs {
         // I need to pass channel as an argument to _copy_ channel value from
@@ -84,7 +85,7 @@ func merge(cs ...<-chan int) <-chan int {
     return out
 }
 
-func main() {
+func main3() {
     in := gen(2, 3)
     c1 := sq(in)
     c2 := sq(in)
@@ -94,3 +95,156 @@ func main() {
     }
 }
 
+func merge4(end chan<- int, cs ...<-chan int) <-chan int {
+    var wg sync.WaitGroup
+    out := make(chan int)
+
+/*
+    output := func(c <-chan int) {
+        for n := range c {
+            out <- n
+        }
+        wg.Done()
+    }
+    wg.Add(len(cs))
+    for _, c := range cs {
+        go output(c)
+    }
+*/
+
+    // wg.Add() must be called in main thread, see below why.
+    wg.Add(len(cs))
+    for _, c := range cs {
+        // I need to pass channel as an argument to _copy_ channel value from
+        // particular cycle iteration. If i just use c inside closure, it
+        // won't work, because all closures created here reference _the same_
+        // c, and this c will have the value from _the last_ cycle iteration.
+        go func (ch <-chan int) {
+            // If i call wg.Add() here it won't work, because this go routine may
+            // have no chance to run until wg.Wait() will be called. And
+            // then function will terminate immediately
+            for n := range ch {
+                out<- n
+            }
+            wg.Done()
+        }(c)
+    }
+
+    go func () {
+        wg.Wait()
+        close(out)
+        end<- 0
+    }()
+
+    return out
+}
+
+func main4() {
+    in := gen(2, 3)
+    c1 := sq(in)
+    c2 := sq(in)
+    end := make(chan int)
+
+    out := merge4(end, c1, c2)
+    /*
+    for n := range out {
+        fmt.Println(n)
+    }*/
+    fmt.Println(<-out) // 4 or 9
+    <-end
+}
+
+func gen5(nums ...int) <-chan int {
+    out := make(chan int, len(nums))
+    for _, n := range nums {
+        out<- n
+    }
+    close(out)
+    return out
+}
+
+func main5() {
+    c := gen5(2, 3)
+    out := sq(c)
+
+    fmt.Println(<-out)
+    fmt.Println(<-out)
+}
+
+func gen6(done <-chan struct{}, nums ...int) <-chan int {
+    out := make(chan int, len(nums))
+
+    go func () {
+        // I should close the channel, when this function returns, not when
+        // gen6() itself returns.
+        defer close(out)
+        for _, n := range nums {
+            select {
+            case out<- n:
+            case <-done:
+                return
+            }
+        }
+    }()
+    return out
+}
+
+func sq6(done <-chan struct{}, in <-chan int) <-chan int {
+    out := make(chan int)
+
+    go func () {
+        defer close(out)
+        for n := range in {
+            select {
+            case out <- n*n:
+            case <-done:
+                return
+            }
+        }
+    }()
+    return out
+}
+
+func merge6(done <-chan struct{}, cs ...<-chan int) <-chan int {
+    var wg sync.WaitGroup
+    out := make(chan int)
+
+    output := func(c <-chan int) {
+        defer wg.Done()
+        for n := range c {
+            select {
+            case out <- n:
+            case <-done:
+                return
+            }
+        }
+    }
+    wg.Add(len(cs))
+    for _, c := range cs {
+        go output(c)
+    }
+
+    go func () {
+        wg.Wait()
+        close(out)
+    }()
+
+    return out
+}
+
+func main6() {
+    done := make(chan struct{})
+    defer close(done)
+    in := gen6(done, 2, 3, 4, 5)
+    c1 := sq6(done, in)
+    c2 := sq6(done, in)
+
+    out := merge6(done, c1, c2)
+    fmt.Println(<-out) // 4 or 9
+    fmt.Println(<-out) // 4 or 9
+    fmt.Println(<-out) // 4 or 9
+}
+
+func main() {
+    main6()
+}