--- /dev/null
+
+package main
+
+import (
+ "fmt"
+ "log"
+ "io"
+ "bytes"
+ "sync"
+ "time"
+)
+
+func elem(y rune, xs string) bool {
+ for _, x := range xs {
+ if y == x {
+ return true
+ }
+ }
+ return false
+}
+
+func Execute(w io.Writer) error {
+ bs := "AaBbCcDdEeFfGg"
+
+ n := 7
+
+ for len(bs) > 0 {
+ fmt.Printf("Execute(): Trying to write %v\n", bs[:n])
+ w.Write([]byte(bs[:n]))
+ if elem('x', bs) {
+ fmt.Printf("Execute(): End with error\n")
+ return fmt.Errorf("UPS")
+ }
+ bs = bs[n:]
+ }
+ fmt.Printf("Execute(): End\n")
+ return nil
+}
+
+type RW struct {
+ buf bytes.Buffer
+}
+
+func (r *RW) Read(p []byte) (int, error) {
+ err2 := Execute(&r.buf)
+ n, err := r.buf.Read(p)
+ if err2 != nil {
+ return n, err2
+ }
+ if err != nil {
+ return n, err
+ }
+ return n, io.EOF
+}
+
+type Pipe1 struct {
+ ch chan byte
+ writerReady chan (chan interface{})
+ writing sync.Mutex
+ err error
+}
+
+func NewPipe1() *Pipe1 {
+ pw := Pipe1 {
+ ch: make(chan byte),
+ writerReady: make(chan (chan interface{})),
+ }
+ return &pw
+}
+
+func (p *Pipe1) Write(bs []byte) (int, error) {
+ p.writing.Lock()
+ defer p.writing.Unlock()
+
+ if p.err != nil {
+ return 0, p.err
+ }
+
+ done := make(chan interface{})
+ defer close(done)
+
+ defer time.Sleep(time.Second)
+
+ p.writerReady <- done
+
+ fmt.Printf("[%s]: Writing (done = %p)\n", bs, done)
+
+ for _, b := range bs {
+ fmt.Printf("[%s]: Try to write %q\n", bs, b)
+ p.ch<- b
+ fmt.Printf("[%s]: Written %q\n", bs, b)
+ }
+
+ fmt.Printf("[%s]: Finished\n", bs)
+ return len(bs), nil
+}
+
+func (p *Pipe1) Read(bs []byte) (int, error) {
+ done, ok := <-p.writerReady
+ if !ok {
+ return 0, p.err
+ }
+
+ fmt.Printf("(%v): Start reading (done = %p)\n", len(bs), done)
+ for i := 0; i < len(bs); i++ {
+ select {
+ case <-done:
+ fmt.Printf("(%v): Writer terminated, so we're at %v\n", len(bs), i)
+ return i, nil
+
+ case b := <-p.ch:
+ fmt.Printf("(%v): Read %q\n", len(bs), b)
+ bs[i] = b
+ }
+ }
+
+ go func() {
+ fmt.Printf("(%v): Resending done (%p)\n", len(bs), done)
+ // When next writer is CloseWithError() and it closes 'writerReady'
+ // channel, "send on closed channel" error may occur. Because 'select'
+ // does not protect against writing to closed channel, even if other
+ // case branch also can proceed ('done' already closed).
+ //
+ // In fact, this is a Pipe1 design bug: only writers should write to
+ // 'writerReady' channel and, thus, this gopher is essentially a
+ // writer. But this gopher does not grab 'writing' lock, because then
+ // a whole program will deadlock (because this gopher is not a writer
+ // in fact and should not grab that lock). And here we're:
+ // CloseWithError(), which is a writer, grabs 'writing' lock and
+ // closes send-only (from its point of view) channel, but suddenly
+ // there's another writer running along..
+ //
+ // This bug is properly fixed in Pipe2.
+ select {
+ case <-done:
+ fmt.Printf("(%v): Writer (%p) also terminated, abort\n", len(bs), done)
+ case p.writerReady<- done:
+ fmt.Printf("(%v): Done channel sent (%p)\n", len(bs), done)
+ }
+ }()
+
+ return len(bs), nil
+}
+
+func (p *Pipe1) Close() error {
+ return p.CloseWithError(nil)
+}
+
+func (p *Pipe1) CloseWithError(err error) error {
+ p.writing.Lock()
+ defer p.writing.Unlock()
+
+ if p.err != nil {
+ log.Fatal(fmt.Errorf("Closing of already closed pipe"))
+ }
+
+ if err != nil {
+ p.err = err
+ } else {
+ p.err = io.EOF
+ }
+
+ close(p.writerReady)
+ fmt.Printf("Closed writerReady channel\n")
+ return nil
+}
+
+type Pipe2 struct {
+ ch chan byte
+ nextWriter chan *writer
+ writerReady chan writerReady
+ err error
+}
+
+func NewPipe2() *Pipe2 {
+ p := Pipe2 {
+ ch: make(chan byte),
+ nextWriter: make(chan *writer),
+ writerReady: make(chan writerReady),
+ }
+ go func() { p.nextWriter <- nil }()
+
+ return &p
+}
+
+type writerReady struct {
+ done chan interface{}
+ readerReady chan (chan interface{})
+}
+
+type writer struct {
+ done chan interface{}
+ readDone chan interface{}
+ readerReady chan (chan interface{})
+}
+
+var ErrReaderClosed = fmt.Errorf("Read pipe closed")
+
+func (w *writer) WaitForReader(ready chan writerReady) error {
+ fmt.Printf("WaitForReader(): Called on done = %p, readyReady = %p\n", w.done, w.readerReady)
+ ready <- writerReady {
+ done: w.done,
+ readerReady: w.readerReady,
+ }
+
+ var ok bool
+ w.readDone, ok = <-w.readerReady
+ if !ok {
+ fmt.Printf("Read pipe closed\n")
+ close(w.done)
+ return ErrReaderClosed
+ }
+
+ return nil
+}
+
+func (p *Pipe2) Write(bs []byte) (int, error) {
+ w, ok := <-p.nextWriter
+ if !ok {
+ fmt.Printf("Read pipe closed\n")
+ return 0, ErrReaderClosed
+ }
+
+ if w == nil {
+ w = &writer {
+ done: make(chan interface{}),
+ readerReady: make(chan chan interface{}),
+ }
+ if err := w.WaitForReader(p.writerReady); err != nil {
+ return 0, err
+ }
+ fmt.Printf("[%s]: Got reader: %p\n", bs, w.readDone)
+
+ } else {
+ fmt.Printf("[%s]: Continue writing to previous reader: %p\n", bs, w.readDone)
+ }
+
+ fmt.Printf("[%s]: Writing (done = %p)\n", bs, w.done)
+ for i := 0; i < len(bs); {
+ b := bs[i]
+ fmt.Printf("[%s]: Trying to write %q\n", bs, b)
+ select {
+ case p.ch<- b:
+ fmt.Printf("[%s]: Written %q\n", bs, b)
+ i++
+
+ case <-w.readDone:
+ if err := w.WaitForReader(p.writerReady); err != nil {
+ return i, err
+ }
+ fmt.Printf("[%s]: Got next reader: %p\n", bs, w.readDone)
+ }
+ }
+
+ go func(bs []byte) {
+ select {
+ default:
+ fmt.Printf("[%s]: No next writer waiting, closing\n", bs)
+ close(w.done)
+ close(w.readerReady)
+ p.nextWriter <- nil
+
+ case p.nextWriter <- w:
+ fmt.Printf("[%s]: Passed over to next writer\n", bs)
+ }
+ }(bs)
+
+ fmt.Printf("[%s]: Finished\n", bs)
+ return len(bs), nil
+}
+
+func (p *Pipe2) Read(bs []byte) (int, error) {
+ wr, ok := <-p.writerReady
+ if !ok {
+ fmt.Printf("Write pipe closed\n")
+ return 0, p.err
+ }
+ fmt.Printf("(%v): Got writer: %p\n", len(bs), wr.done)
+
+ done := make(chan interface{})
+ defer close(done)
+
+ writeDone := wr.done
+ wr.readerReady <- done
+
+ fmt.Printf("(%v): Reading (done = %p)\n", len(bs), done)
+ for i := 0; i < len(bs); i++ {
+ select {
+ case <-writeDone:
+ fmt.Printf("(%v): Writer (%p) terminated, so we're at %v\n", len(bs), writeDone, i)
+ return i, nil
+
+ case b := <-p.ch:
+ fmt.Printf("(%v): Read %q\n", len(bs), b)
+ bs[i] = b
+ }
+ }
+
+ fmt.Printf("(%v): Read finished (done = %p)\n", len(bs), done)
+ return len(bs), nil
+}
+
+func (p *Pipe2) Close() error {
+ return p.CloseWithError(nil)
+}
+
+func (p *Pipe2) CloseWithError(err error) error {
+ select {
+ // Become a writer.
+ case w, ok := <-p.nextWriter:
+ if !ok {
+ log.Fatal(fmt.Errorf("Closing of already closed pipe"))
+ }
+
+ fmt.Printf("Close(): Became a writer\n")
+ if w != nil {
+ close(w.done)
+ close(w.readerReady)
+ }
+
+ // Become a reader.
+ case wr, ok := <-p.writerReady:
+ if !ok {
+ log.Fatal(fmt.Errorf("Closing of already closed pipe"))
+ }
+
+ fmt.Printf("Close(): Became a reader\n")
+ fmt.Printf("Close(): Got writer: %p/%p\n", wr.done, wr.readerReady)
+ // Terminate writer.
+ close(wr.readerReady)
+ }
+
+ // First set error, than cancel all Read-s.
+ p.err = io.EOF
+ if err != nil {
+ p.err = err
+ }
+
+ // Cancel all new Read-s.
+ close(p.writerReady)
+ // Cancel all new Write-s.
+ close(p.nextWriter)
+
+ fmt.Printf("Close(): finished\n")
+ return nil
+}
+
+type WriteCloserWithError interface {
+ io.WriteCloser
+ CloseWithError(error) error
+}
+
+type PipeFunc struct {
+ pReader io.Reader
+ pWriter WriteCloserWithError
+}
+
+func NewPipeFunc(f func (io.Writer) error) *PipeFunc {
+ //t := NewPipe2()
+ pr, pw := io.Pipe()
+ p := PipeFunc{pReader: pr, pWriter: pw}
+ go func() {
+ // Test "Become a reader" of CloseWithError() from Pipe2.
+ //go func() { p.pWriter.CloseWithError(fmt.Errorf("NAHUY")) }()
+
+ err := f(p.pWriter)
+ time.Sleep(time.Second)
+ p.pWriter.CloseWithError(err)
+ }()
+ return &p
+}
+
+func (p *PipeFunc) Read(bs []byte) (int, error) {
+ var i int
+ fmt.Printf("Start reading from func to %v\n", len(bs))
+ for i < len(bs) {
+ n, err := p.pReader.Read(bs[i:])
+ fmt.Printf("Read ended at: %v, %v\n", n, err)
+ i += n
+ if err != nil {
+ return i, err
+ }
+ }
+
+ fmt.Printf("Successfully read from func: %v\n", i)
+ return i, nil
+}
+
+func main() {
+ var buf bytes.Buffer
+ var rw RW
+ n2, err := buf.ReadFrom(&rw)
+ fmt.Printf("\nRET: (%v)%s, err = %v\n", n2, buf.String(), err)
+
+ fmt.Println()
+
+ var n int
+ bf := make([]byte, 8)
+ //errC := make(chan error)
+ //var err2 error
+
+ /*
+ fmt.Println()
+ err2 = nil
+ err = nil
+ r, w := io.Pipe()
+ go func () { errC <- Execute(w) }()
+ n, err = r.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ n, err = r.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ n, err = r.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ err2 = <-errC
+ //n, err = r.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ */
+
+ /*
+ fmt.Println()
+ err2 = nil
+ err = nil
+ pw := NewPipe1()
+ go func () { errC <- Execute(pw) }()
+ n, err = pw.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ n, err = pw.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ n, err = pw.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ err2 = <-errC
+ //n, err = pw.Read(bf)
+ fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+ */
+
+ for i := 0; i < len(bf); i++ {
+ bf[i] = 0
+ }
+
+ fmt.Println()
+ err = nil
+ pf := NewPipeFunc(Execute)
+ n, err = pf.Read(bf[:3])
+ fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+ n, err = pf.Read(bf[:3])
+ fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+ n, err = pf.Read(bf[:3])
+ fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+ n, err = pf.Read(bf[:3])
+ fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+ n, err = pf.Read(bf[:2])
+ fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+ time.Sleep(time.Second)
+ n, err = pf.Read(bf[:3])
+ fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+}