new(cancel): Variants of cancelling operation on a struct from antoher go routine.
authorsgf <sgf.dma@gmail.com>
Wed, 28 Jun 2023 17:06:10 +0000 (20:06 +0300)
committersgf <sgf.dma@gmail.com>
Wed, 28 Jun 2023 17:06:10 +0000 (20:06 +0300)
cancel.go [new file with mode: 0644]

diff --git a/cancel.go b/cancel.go
new file mode 100644 (file)
index 0000000..62658c7
--- /dev/null
+++ b/cancel.go
@@ -0,0 +1,459 @@
+
+package main
+
+import (
+    "fmt"
+    "time"
+    "math/rand"
+    "strconv"
+    "sync"
+)
+
+// First variant (wrong).
+//
+// Here each method sends a message R over channel S.c to main gopher
+// monitoring struct usage (started in NewS()). This message contains a
+// channel R.c from which main gopher tries to read and blocks. When a method
+// finishes, it closes this channel R.c effectively signaling to the main
+// gopher, that it has finished (read from R.c in main gopher will succeed).
+// Then main gopher reads next message from S.c effectively unblocking next
+// method, which wants to run.
+//
+// Check for termination is done by checking 'S.c == nil' before starting each
+// method.  Thus, Cancel() will close 'S.c' channel effectively signaling main
+// gopher to terminate and set it to nil.
+//
+// But this won't work, because when method A() and Cancel() start
+// simultaneously and Cancel() has not yet set S.c to nil, A() will pass
+// through 'S.c == nil' check and block at sending to S.c channel.  Then,
+// Cancel() will finally set S.c to nil, close it and terminate, but method
+// A() is already running and blocked at send to S.c, And as soon as S.c is
+// closed method A() will fail with "panic: send on closed channel".
+
+type R struct {
+    n string
+    c <-chan interface{}
+}
+
+type S struct {
+    s string
+    c chan<- R
+}
+
+func NewS() *S {
+    ch := make(chan R)
+    s := S{s: "", c: ch}
+    go func() {
+        for r := range ch {
+            fmt.Printf("Waiting for %v\n", r.n)
+            <-r.c
+        }
+    }()
+    return &s
+}
+
+func (s *S) A(i int) {
+    if s.c == nil {
+        fmt.Printf("[%v]: Abort\n", i)
+        return
+    }
+    fmt.Printf("[%v]: Trying %+v\n", i, s)
+
+    c := make(chan interface{})
+    defer close(c)
+    s.c <- R{strconv.Itoa(i), c}
+
+    t := time.Duration(rand.Intn(6)) * time.Second
+    fmt.Printf("[%v]: Start in %v sec\n", i, t)
+    time.Sleep(t)
+
+    s.s += strconv.Itoa(i)
+    fmt.Printf("[%v]: %v\n", i, s.s)
+}
+
+func (s *S) Cancel() {
+    if s.c == nil {
+        fmt.Printf("Cancel(): Abort\n")
+        return
+    }
+
+    c := make(chan interface{})
+    defer close(c)
+    s.c <- R{"Cancel", c}
+
+    t := time.Duration(rand.Intn(4)) * time.Second
+    fmt.Printf("Cancel(): Start in %v sec\n", t)
+    time.Sleep(t)
+
+    s.s += "CCC"
+    close(s.c)
+    s.c = nil
+    fmt.Printf("%v\n", s.s)
+}
+
+// Second variant (wrong).
+//
+// Here i don't use any synchronization at all and just use boolean flag
+// S.valid to signify whether struct is valid or not.
+//
+// Cancel() will set S.valid to false.
+//
+// This also won't work, because it does not prevent from running further
+// method calls started when Cancel() is already running, but not yet
+// finished (and set S.valid to false).
+
+
+type S2 struct {
+    valid bool
+    s     string
+}
+
+func NewS2() *S2 {
+    s := S2{valid: true, s: ""}
+    return &s
+}
+
+func (s *S2) A2(i int) {
+    fmt.Printf("[%v]: Trying %+v\n", i, s)
+    if !s.valid {
+            fmt.Printf("[%v]: Abort\n", i)
+            return
+    }
+
+    t := time.Duration(rand.Intn(6)) * time.Second
+    fmt.Printf("[%v]: Start in %v sec\n", i, t)
+    time.Sleep(t)
+
+    s.s += strconv.Itoa(i)
+    fmt.Printf("[%v]: %v\n", i, s.s)
+}
+
+func (s *S2) Cancel2() {
+    fmt.Printf("Trying %+v\n", s)
+    if !s.valid {
+            fmt.Printf("Abort\n")
+            return
+    }
+
+    fmt.Printf("Cancel(): Trying %+v\n", s)
+
+    t := time.Duration(rand.Intn(4)) * time.Second
+    fmt.Printf("Cancel(): Start in %v sec\n", t)
+    time.Sleep(t)
+
+    s.s += "CCC"
+    s.valid = false
+    fmt.Printf("%v\n", s.s)
+}
+
+
+// Third variant (working).
+//
+// It's similar to first variant with a main gopher monitoring struct usage
+// (started in NewS3()) and a channel S3.c to communicate with it, but now
+// this channel has the reverse direction: main gopher _sends_ a message
+// (containing write-only channel R3) over channel S3.c and methods try to
+// read from S3.c. As soon main gopher has sent R3 over S3.c, it blocks on
+// reading from channel R3. When method finishes, it closes R3 effectively
+// signaling main gopher to continue.  This way i make sending on closed
+// channel in methods (on which first variant fails) impossible, because now
+// methods are reading, not sending.
+//
+// Check for termination in methods is done by just checking whether S3.c is
+// closed.  And because this check is impossible until either read succeeds or
+// S3.c is really closed, methods won't pass through this check, when Cancel()
+// is in progress.
+//
+// Cancel() is just a regular method sending special message over R3 channel
+// to main gopher and telling it to terminate and close S3.c channel.
+//
+// This is effectively the same as just using mutex.
+
+type R3 chan<- string
+type S3 struct {
+    s string
+    c <-chan R3
+}
+
+func NewS3() *S3 {
+    ch := make(chan R3)
+    s := S3{s: "", c: ch}
+    go func() {
+        for {
+            c := make(chan string)
+            ch <- c
+            fmt.Printf("Waiting..\n")
+            resp := <- c
+            fmt.Printf("Finished %v\n", resp)
+            if resp == "Cancel" {
+                fmt.Printf("Terminating\n")
+                close(ch)
+                break
+            }
+        }
+    }()
+    return &s
+}
+
+func (s *S3) A3(i int) {
+    c, ok := <-s.c
+    fmt.Printf("[%v]: Trying %+v\n", i, s)
+    if !ok {
+        fmt.Printf("[%v]: Abort\n", i)
+        return
+    }
+    defer close(c)
+    defer func() {
+        c <- strconv.Itoa(i)
+    }()
+
+    t := time.Duration(rand.Intn(6)) * time.Second
+    fmt.Printf("[%v]: Start in %v sec\n", i, t)
+    time.Sleep(t)
+
+    s.s += strconv.Itoa(i)
+    fmt.Printf("[%v]: %v\n", i, s.s)
+}
+
+func (s *S3) Cancel3() {
+    c, ok := <-s.c
+    fmt.Printf("Cancel(): Trying %+v\n", s)
+    if !ok {
+        fmt.Printf("Cancel(): Abort\n")
+        return
+    }
+    defer close(c)
+    defer func() {
+        c <- "Cancel"
+    }()
+
+    t := time.Duration(rand.Intn(5)) * time.Second
+    fmt.Printf("Cancel(): Start in %v sec\n", t)
+    time.Sleep(t)
+
+    s.s += "CCC"
+    fmt.Printf("%v\n", s.s)
+}
+
+// Fourth variant (working) with mutex.
+//
+// Here i use mutex to make access to structure sequential.
+//
+// Check for termination is done by flag S.valid. Thus, Cancel() sets this
+// flag to false. But unlike wrong variant two (with flag) now mutex
+// guarantees, that when Cancel() is in progress, no other method will run.
+
+type S4 struct {
+    s string
+    valid bool
+    m *sync.Mutex
+}
+
+func NewS4() *S4 {
+    s := S4{s: "", valid: true, m: &sync.Mutex{}}
+    return &s
+}
+
+func (s *S4) A4(i int) {
+    s.m.Lock()
+    defer s.m.Unlock()
+
+    fmt.Printf("[%v]: Trying %+v\n", i, s)
+    if !s.valid {
+        fmt.Printf("[%v]: Abort\n", i)
+        return
+    }
+
+    t := time.Duration(rand.Intn(6)) * time.Second
+    fmt.Printf("[%v]: Start in %v sec\n", i, t)
+    time.Sleep(t)
+
+    s.s += strconv.Itoa(i)
+    fmt.Printf("[%v]: %v\n", i, s.s)
+}
+
+func (s *S4) Cancel4() {
+    fmt.Printf("Cancel(): Start\n")
+    s.m.Lock()
+    defer s.m.Unlock()
+
+    fmt.Printf("Cancel(): Trying %+v\n", s)
+    if !s.valid {
+        fmt.Printf("Cancel(): Abort\n")
+        return
+    }
+
+    t := time.Duration(rand.Intn(5)) * time.Second
+    fmt.Printf("Cancel(): Start in %v sec\n", t)
+    time.Sleep(t)
+
+    s.s += "CCC"
+    s.valid = false
+    fmt.Printf("%v\n", s.s)
+}
+
+// Fifth variant.
+//
+// Prvious two variants (3rd and 4th) work, but Cancel() goes in the same
+// queue as other methods: there is no guarantee, that when currently running
+// method finishes, Cancel() will really run and not some other method also
+// waiting on the same lock (S.c channel read or S.m mutex). In fact in 3rd
+// and 4th variants Cancel() started before A(3) in the best case i've seen
+// only cancel A(5) and A(6) and in worse it runs after A(6).
+//
+// And, though, there is no guarantee, that go routine, where Cancel() is
+// running, will be scheduled right away, but at least i may make Cancel() to
+// use "out of band" locks (separate channel or mutex).
+//
+// Here main gopher polls on both reading from special S5.cancel channel (used
+// only by Cancel() method) and on sending R3 into S5.c channel.
+//
+// Methods works exactly the same as in S3. And Cancel() just closes S5.cancel
+// channel. Then reading from S5.cancel in main gopher's select succeeds, it
+// closes S5.c channel effectively signaling to all other methods, that
+// operation is terminated, and calls RealCancel5() method, which performs
+// necessary cleanup.
+
+type S5 struct {
+    s string
+    c <-chan R3
+    cancel chan<- interface{}
+}
+
+func NewS5() *S5 {
+    ch := make(chan R3)
+    cancel := make(chan interface{})
+    s := S5{s: "", c: ch, cancel: cancel}
+    go func() {
+        var c chan string
+        // Because value of variable c is changed later in function, i need a
+        // closure. If i just write 'defer close(c)' c will be an argument to
+        // a deferred function and will be evaluated right now. And because
+        // now its value is nil, the deferred function will become
+        // 'close(nil)', which is a panic.
+        defer func () { close(c) }()
+        defer close(ch)
+        defer s.RealCancel5()
+
+out:
+        for {
+            c = make(chan string)
+            // Ensure order: when some method finishes, _always_ first check
+            // the S5.cancel channel.
+            select {
+            case <-cancel:
+                break out
+            default:
+            }
+
+            // And if S5.cancel is not ready yet, enter usual select, which
+            // selects branch randomly, when both are ready.
+            select {
+            case <-cancel:
+                break out
+
+            case ch <- c:
+                fmt.Printf("Waiting..\n")
+                resp := <- c
+                fmt.Printf("Finished %v\n", resp)
+            }
+        }
+
+        t := time.Duration(rand.Intn(5)) * time.Second
+        fmt.Printf("Terminating in %v sec\n", t)
+        time.Sleep(t)
+    }()
+    return &s
+}
+
+func (s *S5) A5(i int) {
+    c, ok := <-s.c
+    fmt.Printf("[%v]: Trying %+v\n", i, s)
+    if !ok {
+        fmt.Printf("[%v]: Abort\n", i)
+        return
+    }
+    defer close(c)
+    defer func() {
+        c <- strconv.Itoa(i)
+    }()
+
+    t := time.Duration(rand.Intn(6)) * time.Second
+    fmt.Printf("[%v]: Start in %v sec\n", i, t)
+    time.Sleep(t)
+
+    s.s += strconv.Itoa(i)
+    fmt.Printf("[%v]: %v\n", i, s.s)
+}
+
+func (s *S5) Cancel5() {
+    fmt.Printf("Cancel5(): Start\n")
+    close(s.cancel)
+    fmt.Printf("Cancel5(): Done\n")
+}
+
+func (s *S5) RealCancel5() {
+    fmt.Printf("RealCancel5(): Trying %+v\n", s)
+
+    t := time.Duration(rand.Intn(5)) * time.Second
+    fmt.Printf("RealCancel5(): Start in %v sec\n", t)
+    time.Sleep(t)
+
+    s.s += "CCC"
+    fmt.Printf("%v\n", s.s)
+}
+
+func main() {
+    rand.Seed(time.Now().Unix())
+
+    /*
+    s := NewS()
+    s.A(1)
+    go s.Cancel()
+    s.A(2)
+    s.A(3)
+    */
+
+    /*
+    s := NewS2()
+    s.A2(1)
+    go s.Cancel2()
+    s.A2(2)
+    s.A2(3)
+    */
+
+    /*
+    s := NewS3()
+    s.A3(1)
+    s.A3(2)
+    go s.Cancel3()
+    go s.A3(3)
+    s.A3(4)
+    s.A3(5)
+    s.A3(6)
+    */
+
+    /*
+    s := NewS4()
+    s.A4(1)
+    s.A4(2)
+    go s.Cancel4()
+    go s.A4(3)
+    s.A4(4)
+    s.A4(5)
+    s.A4(6)
+    */
+
+    s := NewS5()
+    s.A5(1)
+    s.A5(2)
+    go s.Cancel5()
+    go s.A5(3)
+    go s.A5(4)
+    s.A5(5)
+    s.A5(6)
+    s.A5(7)
+
+    time.Sleep(2 * time.Second)
+}
+