new(pipe): Save some code from "Pipelines and cancellation" blog post.
authorsgf <sgf.dma@gmail.com>
Fri, 25 Nov 2022 14:42:12 +0000 (17:42 +0300)
committersgf <sgf.dma@gmail.com>
Fri, 25 Nov 2022 14:42:12 +0000 (17:42 +0300)
pipelines-and-cancellation/pipe.go [new file with mode: 0644]

diff --git a/pipelines-and-cancellation/pipe.go b/pipelines-and-cancellation/pipe.go
new file mode 100644 (file)
index 0000000..e64cd4e
--- /dev/null
@@ -0,0 +1,96 @@
+
+package main
+
+import (
+    "fmt"
+    "sync"
+)
+
+func gen(nums ...int) <-chan int {
+    out := make(chan int)
+    go func () {
+        for _, n := range nums {
+            out <- n
+        }
+        close(out)
+    }()
+    return out
+}
+
+func sq(in <-chan int) <-chan int {
+    out := make(chan int)
+    go func () {
+        for n := range in {
+            out <- n*n
+        }
+        close(out)
+    }()
+    return out
+}
+
+func main1() {
+    c := gen(2, 3)
+    out := sq(c)
+
+    fmt.Println(<-out)
+    fmt.Println(<-out)
+}
+
+func main2() {
+    for n := range sq(sq(gen(2, 3))) {
+        fmt.Println(n)
+    }
+}
+
+func merge(cs ...<-chan int) <-chan int {
+    var wg sync.WaitGroup
+    out := make(chan int)
+
+/*
+    output := func(c <-chan int) {
+        for n := range c {
+            out <- n
+        }
+        wg.Done()
+    }
+    wg.Add(len(cs))
+    for _, c := range cs {
+        go output(c)
+    }
+*/
+
+    wg.Add(len(cs))
+    for _, c := range cs {
+        // I need to pass channel as an argument to _copy_ channel value from
+        // particular cycle iteration. If i just use c inside closure, it
+        // won't work, because all closures created here reference _the same_
+        // c, and this c will have the value from _the last_ cycle iteration.
+        go func (ch <-chan int) {
+            // If i call wg.Add() here it won't work, because this go routine may
+            // have no chance to run until wg.Wait() will be called. And
+            // then function will terminate immediately
+            for n := range ch {
+                out<- n
+            }
+            wg.Done()
+        }(c)
+    }
+
+    go func () {
+        wg.Wait()
+        close(out)
+    }()
+
+    return out
+}
+
+func main() {
+    in := gen(2, 3)
+    c1 := sq(in)
+    c2 := sq(in)
+
+    for n := range merge(c1, c2) {
+        fmt.Println(n)
+    }
+}
+