new(pipe). My io.Pipe reimplementation.
authorsgf <sgf.dma@gmail.com>
Mon, 21 Aug 2023 16:16:02 +0000 (19:16 +0300)
committersgf <sgf.dma@gmail.com>
Mon, 21 Aug 2023 16:16:02 +0000 (19:16 +0300)
pipe.go [new file with mode: 0644]

diff --git a/pipe.go b/pipe.go
new file mode 100644 (file)
index 0000000..a521621
--- /dev/null
+++ b/pipe.go
@@ -0,0 +1,456 @@
+
+package main
+
+import (
+    "fmt"
+    "log"
+    "io"
+    "bytes"
+    "sync"
+    "time"
+)
+
+func elem(y rune, xs string) bool {
+    for _, x := range xs {
+        if y == x {
+            return true
+        }
+    }
+    return false
+}
+
+func Execute(w io.Writer) error {
+    bs := "AaBbCcDdEeFfGg"
+
+    n := 7
+
+    for len(bs) > 0 {
+        fmt.Printf("Execute(): Trying to write %v\n", bs[:n])
+        w.Write([]byte(bs[:n]))
+        if elem('x', bs) {
+            fmt.Printf("Execute(): End with error\n")
+            return fmt.Errorf("UPS")
+        }
+        bs = bs[n:]
+    }
+    fmt.Printf("Execute(): End\n")
+    return nil
+}
+
+type RW struct {
+    buf bytes.Buffer
+}
+
+func (r *RW) Read(p []byte) (int, error) {
+    err2 := Execute(&r.buf)
+    n, err := r.buf.Read(p)
+    if err2 != nil {
+        return n, err2
+    }
+    if err != nil {
+        return n, err
+    }
+    return n, io.EOF
+}
+
+type Pipe1 struct {
+    ch chan byte
+    writerReady chan (chan interface{})
+    writing sync.Mutex
+    err error
+}
+
+func NewPipe1() *Pipe1 {
+    pw := Pipe1 {
+        ch: make(chan byte),
+        writerReady: make(chan (chan interface{})),
+    }
+    return &pw
+}
+
+func (p *Pipe1) Write(bs []byte) (int, error) {
+    p.writing.Lock()
+    defer p.writing.Unlock()
+
+    if p.err != nil {
+        return 0, p.err
+    }
+
+    done := make(chan interface{})
+    defer close(done)
+
+    defer time.Sleep(time.Second)
+
+    p.writerReady <- done
+
+    fmt.Printf("[%s]: Writing (done = %p)\n", bs, done)
+
+    for _, b := range bs {
+        fmt.Printf("[%s]: Try to write %q\n", bs, b)
+        p.ch<- b
+        fmt.Printf("[%s]: Written %q\n", bs, b)
+    }
+
+    fmt.Printf("[%s]: Finished\n", bs)
+    return len(bs), nil
+}
+
+func (p *Pipe1) Read(bs []byte) (int, error) {
+    done, ok := <-p.writerReady
+    if !ok {
+        return 0, p.err
+    }
+
+    fmt.Printf("(%v): Start reading (done = %p)\n", len(bs), done)
+    for i := 0; i < len(bs); i++ {
+        select {
+        case <-done:
+            fmt.Printf("(%v): Writer terminated, so we're at %v\n", len(bs), i)
+            return i, nil
+
+        case b := <-p.ch:
+            fmt.Printf("(%v): Read %q\n", len(bs), b)
+            bs[i] = b
+        }
+    }
+
+    go func() {
+        fmt.Printf("(%v): Resending done (%p)\n", len(bs), done)
+        // When next writer is CloseWithError() and it closes 'writerReady'
+        // channel, "send on closed channel" error may occur. Because 'select'
+        // does not protect against writing to closed channel, even if other
+        // case branch also can proceed ('done' already closed).
+        //
+        // In fact, this is a Pipe1 design bug: only writers should write to
+        // 'writerReady' channel and, thus, this gopher is essentially a
+        // writer. But this gopher does not grab 'writing' lock, because then
+        // a whole program will deadlock (because this gopher is not a writer
+        // in fact and should not grab that lock). And here we're:
+        // CloseWithError(), which is a writer, grabs 'writing' lock and
+        // closes send-only (from its point of view) channel, but suddenly
+        // there's another writer running along..
+        //
+        // This bug is properly fixed in Pipe2.
+        select {
+        case <-done:
+            fmt.Printf("(%v): Writer (%p) also terminated, abort\n", len(bs), done)
+        case p.writerReady<- done:
+            fmt.Printf("(%v): Done channel sent (%p)\n", len(bs), done)
+        }
+    }()
+
+    return len(bs), nil
+}
+
+func (p *Pipe1) Close() error {
+    return p.CloseWithError(nil)
+}
+
+func (p *Pipe1) CloseWithError(err error) error {
+    p.writing.Lock()
+    defer p.writing.Unlock()
+
+    if p.err != nil {
+        log.Fatal(fmt.Errorf("Closing of already closed pipe"))
+    }
+
+    if err != nil {
+        p.err = err
+    } else {
+        p.err = io.EOF
+    }
+
+    close(p.writerReady)
+    fmt.Printf("Closed writerReady channel\n")
+    return nil
+}
+
+type Pipe2 struct {
+    ch chan byte
+    nextWriter chan *writer
+    writerReady chan writerReady
+    err error
+}
+
+func NewPipe2() *Pipe2 {
+    p := Pipe2 {
+        ch: make(chan byte),
+        nextWriter: make(chan *writer),
+        writerReady: make(chan writerReady),
+    }
+    go func() { p.nextWriter <- nil }()
+
+    return &p
+}
+
+type writerReady struct {
+    done chan interface{}
+    readerReady chan (chan interface{})
+}
+
+type writer struct {
+    done chan interface{}
+    readDone chan interface{}
+    readerReady chan (chan interface{})
+}
+
+var ErrReaderClosed = fmt.Errorf("Read pipe closed")
+
+func (w *writer) WaitForReader(ready chan writerReady) error {
+    fmt.Printf("WaitForReader(): Called on done = %p, readyReady = %p\n", w.done, w.readerReady)
+    ready <- writerReady {
+        done: w.done,
+        readerReady: w.readerReady,
+    }
+
+    var ok bool
+    w.readDone, ok = <-w.readerReady
+    if !ok {
+        fmt.Printf("Read pipe closed\n")
+        close(w.done)
+        return ErrReaderClosed
+    }
+
+    return nil
+}
+
+func (p *Pipe2) Write(bs []byte) (int, error) {
+    w, ok := <-p.nextWriter
+    if !ok {
+        fmt.Printf("Read pipe closed\n")
+        return 0, ErrReaderClosed
+    }
+
+    if w == nil {
+        w = &writer {
+            done: make(chan interface{}),
+            readerReady: make(chan chan interface{}),
+        }
+        if err := w.WaitForReader(p.writerReady); err != nil {
+            return 0, err
+        }
+        fmt.Printf("[%s]: Got reader: %p\n", bs, w.readDone)
+
+    } else {
+        fmt.Printf("[%s]: Continue writing to previous reader: %p\n", bs, w.readDone)
+    }
+
+    fmt.Printf("[%s]: Writing (done = %p)\n", bs, w.done)
+    for i := 0; i < len(bs); {
+        b := bs[i]
+        fmt.Printf("[%s]: Trying to write %q\n", bs, b)
+        select {
+        case p.ch<- b:
+            fmt.Printf("[%s]: Written %q\n", bs, b)
+            i++
+
+        case <-w.readDone:
+            if err := w.WaitForReader(p.writerReady); err != nil {
+                return i, err
+            }
+            fmt.Printf("[%s]: Got next reader: %p\n", bs, w.readDone)
+        }
+    }
+
+    go func(bs []byte) {
+        select {
+        default:
+            fmt.Printf("[%s]: No next writer waiting, closing\n", bs)
+            close(w.done)
+            close(w.readerReady)
+            p.nextWriter <- nil
+
+        case p.nextWriter <- w:
+            fmt.Printf("[%s]: Passed over to next writer\n", bs)
+        }
+    }(bs)
+
+    fmt.Printf("[%s]: Finished\n", bs)
+    return len(bs), nil
+}
+
+func (p *Pipe2) Read(bs []byte) (int, error) {
+    wr, ok := <-p.writerReady
+    if !ok {
+        fmt.Printf("Write pipe closed\n")
+        return 0, p.err
+    }
+    fmt.Printf("(%v): Got writer: %p\n", len(bs), wr.done)
+
+    done := make(chan interface{})
+    defer close(done)
+
+    writeDone := wr.done
+    wr.readerReady <- done
+
+    fmt.Printf("(%v): Reading (done = %p)\n", len(bs), done)
+    for i := 0; i < len(bs); i++ {
+        select {
+        case <-writeDone:
+            fmt.Printf("(%v): Writer (%p) terminated, so we're at %v\n", len(bs), writeDone, i)
+            return i, nil
+
+        case b := <-p.ch:
+            fmt.Printf("(%v): Read %q\n", len(bs), b)
+            bs[i] = b
+        }
+    }
+
+    fmt.Printf("(%v): Read finished (done = %p)\n", len(bs), done)
+    return len(bs), nil
+}
+
+func (p *Pipe2) Close() error {
+    return p.CloseWithError(nil)
+}
+
+func (p *Pipe2) CloseWithError(err error) error {
+    select {
+    // Become a writer.
+    case w, ok := <-p.nextWriter:
+        if !ok {
+            log.Fatal(fmt.Errorf("Closing of already closed pipe"))
+        }
+
+        fmt.Printf("Close(): Became a writer\n")
+        if w != nil {
+            close(w.done)
+            close(w.readerReady)
+        }
+
+    // Become a reader.
+    case wr, ok := <-p.writerReady:
+        if !ok {
+            log.Fatal(fmt.Errorf("Closing of already closed pipe"))
+        }
+
+        fmt.Printf("Close(): Became a reader\n")
+        fmt.Printf("Close(): Got writer: %p/%p\n", wr.done, wr.readerReady)
+        // Terminate writer.
+        close(wr.readerReady)
+    }
+
+    // First set error, than cancel all Read-s.
+    p.err = io.EOF
+    if err != nil {
+        p.err = err
+    }
+
+    // Cancel all new Read-s.
+    close(p.writerReady)
+    // Cancel all new Write-s.
+    close(p.nextWriter)
+
+    fmt.Printf("Close(): finished\n")
+    return nil
+}
+
+type WriteCloserWithError interface {
+    io.WriteCloser
+    CloseWithError(error) error
+}
+
+type PipeFunc struct {
+    pReader io.Reader
+    pWriter WriteCloserWithError
+}
+
+func NewPipeFunc(f func (io.Writer) error) *PipeFunc {
+    //t := NewPipe2()
+    pr, pw := io.Pipe()
+    p := PipeFunc{pReader: pr, pWriter: pw}
+    go func() {
+        // Test "Become a reader" of CloseWithError() from Pipe2.
+        //go func() { p.pWriter.CloseWithError(fmt.Errorf("NAHUY")) }()
+
+        err := f(p.pWriter)
+        time.Sleep(time.Second)
+        p.pWriter.CloseWithError(err)
+    }()
+    return &p
+}
+
+func (p *PipeFunc) Read(bs []byte) (int, error) {
+    var i int
+    fmt.Printf("Start reading from func to %v\n", len(bs))
+    for i < len(bs) {
+        n, err := p.pReader.Read(bs[i:])
+        fmt.Printf("Read ended at: %v, %v\n", n, err)
+        i += n
+        if err != nil {
+            return i, err
+        }
+    }
+
+    fmt.Printf("Successfully read from func: %v\n", i)
+    return i, nil
+}
+
+func main() {
+    var buf bytes.Buffer
+    var rw RW
+    n2, err := buf.ReadFrom(&rw)
+    fmt.Printf("\nRET: (%v)%s, err = %v\n", n2, buf.String(), err)
+
+    fmt.Println()
+
+    var n int
+    bf := make([]byte, 8)
+    //errC := make(chan error)
+    //var err2 error
+
+    /*
+    fmt.Println()
+    err2 = nil
+    err = nil
+    r, w := io.Pipe()
+    go func () { errC <- Execute(w) }()
+    n, err = r.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    n, err = r.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    n, err = r.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    err2 = <-errC
+    //n, err = r.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    */
+
+    /*
+    fmt.Println()
+    err2 = nil
+    err = nil
+    pw := NewPipe1()
+    go func () { errC <- Execute(pw) }()
+    n, err = pw.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    n, err = pw.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    n, err = pw.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    err2 = <-errC
+    //n, err = pw.Read(bf)
+    fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2)
+    */
+
+    for i := 0; i < len(bf); i++ {
+        bf[i] = 0
+    }
+
+    fmt.Println()
+    err = nil
+    pf := NewPipeFunc(Execute)
+    n, err = pf.Read(bf[:3])
+    fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+    n, err = pf.Read(bf[:3])
+    fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+    n, err = pf.Read(bf[:3])
+    fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+    n, err = pf.Read(bf[:3])
+    fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+    n, err = pf.Read(bf[:2])
+    fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+    time.Sleep(time.Second)
+    n, err = pf.Read(bf[:3])
+    fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err)
+}