From: sgf Date: Mon, 21 Aug 2023 16:16:02 +0000 (+0300) Subject: new(pipe). My io.Pipe reimplementation. X-Git-Url: https://gitweb.sgf-dma.tk/?a=commitdiff_plain;h=242b1ac417f67a0b0d5e84da1b6de35f9a17073a;p=go.git new(pipe). My io.Pipe reimplementation. --- diff --git a/pipe.go b/pipe.go new file mode 100644 index 0000000..a521621 --- /dev/null +++ b/pipe.go @@ -0,0 +1,456 @@ + +package main + +import ( + "fmt" + "log" + "io" + "bytes" + "sync" + "time" +) + +func elem(y rune, xs string) bool { + for _, x := range xs { + if y == x { + return true + } + } + return false +} + +func Execute(w io.Writer) error { + bs := "AaBbCcDdEeFfGg" + + n := 7 + + for len(bs) > 0 { + fmt.Printf("Execute(): Trying to write %v\n", bs[:n]) + w.Write([]byte(bs[:n])) + if elem('x', bs) { + fmt.Printf("Execute(): End with error\n") + return fmt.Errorf("UPS") + } + bs = bs[n:] + } + fmt.Printf("Execute(): End\n") + return nil +} + +type RW struct { + buf bytes.Buffer +} + +func (r *RW) Read(p []byte) (int, error) { + err2 := Execute(&r.buf) + n, err := r.buf.Read(p) + if err2 != nil { + return n, err2 + } + if err != nil { + return n, err + } + return n, io.EOF +} + +type Pipe1 struct { + ch chan byte + writerReady chan (chan interface{}) + writing sync.Mutex + err error +} + +func NewPipe1() *Pipe1 { + pw := Pipe1 { + ch: make(chan byte), + writerReady: make(chan (chan interface{})), + } + return &pw +} + +func (p *Pipe1) Write(bs []byte) (int, error) { + p.writing.Lock() + defer p.writing.Unlock() + + if p.err != nil { + return 0, p.err + } + + done := make(chan interface{}) + defer close(done) + + defer time.Sleep(time.Second) + + p.writerReady <- done + + fmt.Printf("[%s]: Writing (done = %p)\n", bs, done) + + for _, b := range bs { + fmt.Printf("[%s]: Try to write %q\n", bs, b) + p.ch<- b + fmt.Printf("[%s]: Written %q\n", bs, b) + } + + fmt.Printf("[%s]: Finished\n", bs) + return len(bs), nil +} + +func (p *Pipe1) Read(bs []byte) (int, error) { + done, ok := <-p.writerReady + if !ok { + return 0, p.err + } + + fmt.Printf("(%v): Start reading (done = %p)\n", len(bs), done) + for i := 0; i < len(bs); i++ { + select { + case <-done: + fmt.Printf("(%v): Writer terminated, so we're at %v\n", len(bs), i) + return i, nil + + case b := <-p.ch: + fmt.Printf("(%v): Read %q\n", len(bs), b) + bs[i] = b + } + } + + go func() { + fmt.Printf("(%v): Resending done (%p)\n", len(bs), done) + // When next writer is CloseWithError() and it closes 'writerReady' + // channel, "send on closed channel" error may occur. Because 'select' + // does not protect against writing to closed channel, even if other + // case branch also can proceed ('done' already closed). + // + // In fact, this is a Pipe1 design bug: only writers should write to + // 'writerReady' channel and, thus, this gopher is essentially a + // writer. But this gopher does not grab 'writing' lock, because then + // a whole program will deadlock (because this gopher is not a writer + // in fact and should not grab that lock). And here we're: + // CloseWithError(), which is a writer, grabs 'writing' lock and + // closes send-only (from its point of view) channel, but suddenly + // there's another writer running along.. + // + // This bug is properly fixed in Pipe2. + select { + case <-done: + fmt.Printf("(%v): Writer (%p) also terminated, abort\n", len(bs), done) + case p.writerReady<- done: + fmt.Printf("(%v): Done channel sent (%p)\n", len(bs), done) + } + }() + + return len(bs), nil +} + +func (p *Pipe1) Close() error { + return p.CloseWithError(nil) +} + +func (p *Pipe1) CloseWithError(err error) error { + p.writing.Lock() + defer p.writing.Unlock() + + if p.err != nil { + log.Fatal(fmt.Errorf("Closing of already closed pipe")) + } + + if err != nil { + p.err = err + } else { + p.err = io.EOF + } + + close(p.writerReady) + fmt.Printf("Closed writerReady channel\n") + return nil +} + +type Pipe2 struct { + ch chan byte + nextWriter chan *writer + writerReady chan writerReady + err error +} + +func NewPipe2() *Pipe2 { + p := Pipe2 { + ch: make(chan byte), + nextWriter: make(chan *writer), + writerReady: make(chan writerReady), + } + go func() { p.nextWriter <- nil }() + + return &p +} + +type writerReady struct { + done chan interface{} + readerReady chan (chan interface{}) +} + +type writer struct { + done chan interface{} + readDone chan interface{} + readerReady chan (chan interface{}) +} + +var ErrReaderClosed = fmt.Errorf("Read pipe closed") + +func (w *writer) WaitForReader(ready chan writerReady) error { + fmt.Printf("WaitForReader(): Called on done = %p, readyReady = %p\n", w.done, w.readerReady) + ready <- writerReady { + done: w.done, + readerReady: w.readerReady, + } + + var ok bool + w.readDone, ok = <-w.readerReady + if !ok { + fmt.Printf("Read pipe closed\n") + close(w.done) + return ErrReaderClosed + } + + return nil +} + +func (p *Pipe2) Write(bs []byte) (int, error) { + w, ok := <-p.nextWriter + if !ok { + fmt.Printf("Read pipe closed\n") + return 0, ErrReaderClosed + } + + if w == nil { + w = &writer { + done: make(chan interface{}), + readerReady: make(chan chan interface{}), + } + if err := w.WaitForReader(p.writerReady); err != nil { + return 0, err + } + fmt.Printf("[%s]: Got reader: %p\n", bs, w.readDone) + + } else { + fmt.Printf("[%s]: Continue writing to previous reader: %p\n", bs, w.readDone) + } + + fmt.Printf("[%s]: Writing (done = %p)\n", bs, w.done) + for i := 0; i < len(bs); { + b := bs[i] + fmt.Printf("[%s]: Trying to write %q\n", bs, b) + select { + case p.ch<- b: + fmt.Printf("[%s]: Written %q\n", bs, b) + i++ + + case <-w.readDone: + if err := w.WaitForReader(p.writerReady); err != nil { + return i, err + } + fmt.Printf("[%s]: Got next reader: %p\n", bs, w.readDone) + } + } + + go func(bs []byte) { + select { + default: + fmt.Printf("[%s]: No next writer waiting, closing\n", bs) + close(w.done) + close(w.readerReady) + p.nextWriter <- nil + + case p.nextWriter <- w: + fmt.Printf("[%s]: Passed over to next writer\n", bs) + } + }(bs) + + fmt.Printf("[%s]: Finished\n", bs) + return len(bs), nil +} + +func (p *Pipe2) Read(bs []byte) (int, error) { + wr, ok := <-p.writerReady + if !ok { + fmt.Printf("Write pipe closed\n") + return 0, p.err + } + fmt.Printf("(%v): Got writer: %p\n", len(bs), wr.done) + + done := make(chan interface{}) + defer close(done) + + writeDone := wr.done + wr.readerReady <- done + + fmt.Printf("(%v): Reading (done = %p)\n", len(bs), done) + for i := 0; i < len(bs); i++ { + select { + case <-writeDone: + fmt.Printf("(%v): Writer (%p) terminated, so we're at %v\n", len(bs), writeDone, i) + return i, nil + + case b := <-p.ch: + fmt.Printf("(%v): Read %q\n", len(bs), b) + bs[i] = b + } + } + + fmt.Printf("(%v): Read finished (done = %p)\n", len(bs), done) + return len(bs), nil +} + +func (p *Pipe2) Close() error { + return p.CloseWithError(nil) +} + +func (p *Pipe2) CloseWithError(err error) error { + select { + // Become a writer. + case w, ok := <-p.nextWriter: + if !ok { + log.Fatal(fmt.Errorf("Closing of already closed pipe")) + } + + fmt.Printf("Close(): Became a writer\n") + if w != nil { + close(w.done) + close(w.readerReady) + } + + // Become a reader. + case wr, ok := <-p.writerReady: + if !ok { + log.Fatal(fmt.Errorf("Closing of already closed pipe")) + } + + fmt.Printf("Close(): Became a reader\n") + fmt.Printf("Close(): Got writer: %p/%p\n", wr.done, wr.readerReady) + // Terminate writer. + close(wr.readerReady) + } + + // First set error, than cancel all Read-s. + p.err = io.EOF + if err != nil { + p.err = err + } + + // Cancel all new Read-s. + close(p.writerReady) + // Cancel all new Write-s. + close(p.nextWriter) + + fmt.Printf("Close(): finished\n") + return nil +} + +type WriteCloserWithError interface { + io.WriteCloser + CloseWithError(error) error +} + +type PipeFunc struct { + pReader io.Reader + pWriter WriteCloserWithError +} + +func NewPipeFunc(f func (io.Writer) error) *PipeFunc { + //t := NewPipe2() + pr, pw := io.Pipe() + p := PipeFunc{pReader: pr, pWriter: pw} + go func() { + // Test "Become a reader" of CloseWithError() from Pipe2. + //go func() { p.pWriter.CloseWithError(fmt.Errorf("NAHUY")) }() + + err := f(p.pWriter) + time.Sleep(time.Second) + p.pWriter.CloseWithError(err) + }() + return &p +} + +func (p *PipeFunc) Read(bs []byte) (int, error) { + var i int + fmt.Printf("Start reading from func to %v\n", len(bs)) + for i < len(bs) { + n, err := p.pReader.Read(bs[i:]) + fmt.Printf("Read ended at: %v, %v\n", n, err) + i += n + if err != nil { + return i, err + } + } + + fmt.Printf("Successfully read from func: %v\n", i) + return i, nil +} + +func main() { + var buf bytes.Buffer + var rw RW + n2, err := buf.ReadFrom(&rw) + fmt.Printf("\nRET: (%v)%s, err = %v\n", n2, buf.String(), err) + + fmt.Println() + + var n int + bf := make([]byte, 8) + //errC := make(chan error) + //var err2 error + + /* + fmt.Println() + err2 = nil + err = nil + r, w := io.Pipe() + go func () { errC <- Execute(w) }() + n, err = r.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + n, err = r.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + n, err = r.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + err2 = <-errC + //n, err = r.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + */ + + /* + fmt.Println() + err2 = nil + err = nil + pw := NewPipe1() + go func () { errC <- Execute(pw) }() + n, err = pw.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + n, err = pw.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + n, err = pw.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + err2 = <-errC + //n, err = pw.Read(bf) + fmt.Printf("RET: (%v)%s, err = %v, err2 = %v\n", n, bf, err, err2) + */ + + for i := 0; i < len(bf); i++ { + bf[i] = 0 + } + + fmt.Println() + err = nil + pf := NewPipeFunc(Execute) + n, err = pf.Read(bf[:3]) + fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err) + n, err = pf.Read(bf[:3]) + fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err) + n, err = pf.Read(bf[:3]) + fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err) + n, err = pf.Read(bf[:3]) + fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err) + n, err = pf.Read(bf[:2]) + fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err) + time.Sleep(time.Second) + n, err = pf.Read(bf[:3]) + fmt.Printf("RET: (%v)%s, err = %v\n", n, bf[:n], err) +}