}
*/
+ // wg.Add() must be called in main thread, see below why.
wg.Add(len(cs))
for _, c := range cs {
// I need to pass channel as an argument to _copy_ channel value from
return out
}
-func main() {
+func main3() {
in := gen(2, 3)
c1 := sq(in)
c2 := sq(in)
}
}
+func merge4(end chan<- int, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+/*
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+*/
+
+ // wg.Add() must be called in main thread, see below why.
+ wg.Add(len(cs))
+ for _, c := range cs {
+ // I need to pass channel as an argument to _copy_ channel value from
+ // particular cycle iteration. If i just use c inside closure, it
+ // won't work, because all closures created here reference _the same_
+ // c, and this c will have the value from _the last_ cycle iteration.
+ go func (ch <-chan int) {
+ // If i call wg.Add() here it won't work, because this go routine may
+ // have no chance to run until wg.Wait() will be called. And
+ // then function will terminate immediately
+ for n := range ch {
+ out<- n
+ }
+ wg.Done()
+ }(c)
+ }
+
+ go func () {
+ wg.Wait()
+ close(out)
+ end<- 0
+ }()
+
+ return out
+}
+
+func main4() {
+ in := gen(2, 3)
+ c1 := sq(in)
+ c2 := sq(in)
+ end := make(chan int)
+
+ out := merge4(end, c1, c2)
+ /*
+ for n := range out {
+ fmt.Println(n)
+ }*/
+ fmt.Println(<-out) // 4 or 9
+ <-end
+}
+
+func gen5(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out<- n
+ }
+ close(out)
+ return out
+}
+
+func main5() {
+ c := gen5(2, 3)
+ out := sq(c)
+
+ fmt.Println(<-out)
+ fmt.Println(<-out)
+}
+
+func gen6(done <-chan struct{}, nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+
+ go func () {
+ // I should close the channel, when this function returns, not when
+ // gen6() itself returns.
+ defer close(out)
+ for _, n := range nums {
+ select {
+ case out<- n:
+ case <-done:
+ return
+ }
+ }
+ }()
+ return out
+}
+
+func sq6(done <-chan struct{}, in <-chan int) <-chan int {
+ out := make(chan int)
+
+ go func () {
+ defer close(out)
+ for n := range in {
+ select {
+ case out <- n*n:
+ case <-done:
+ return
+ }
+ }
+ }()
+ return out
+}
+
+func merge6(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ output := func(c <-chan int) {
+ defer wg.Done()
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done:
+ return
+ }
+ }
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ go func () {
+ wg.Wait()
+ close(out)
+ }()
+
+ return out
+}
+
+func main6() {
+ done := make(chan struct{})
+ defer close(done)
+ in := gen6(done, 2, 3, 4, 5)
+ c1 := sq6(done, in)
+ c2 := sq6(done, in)
+
+ out := merge6(done, c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ fmt.Println(<-out) // 4 or 9
+ fmt.Println(<-out) // 4 or 9
+}
+
+func main() {
+ main6()
+}