From fc1e85b9a613b82e574f63cd18fc3c2591e8bad5 Mon Sep 17 00:00:00 2001 From: sgf Date: Wed, 28 Jun 2023 20:06:10 +0300 Subject: [PATCH] new(cancel): Variants of cancelling operation on a struct from antoher go routine. --- cancel.go | 459 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 459 insertions(+) create mode 100644 cancel.go diff --git a/cancel.go b/cancel.go new file mode 100644 index 0000000..62658c7 --- /dev/null +++ b/cancel.go @@ -0,0 +1,459 @@ + +package main + +import ( + "fmt" + "time" + "math/rand" + "strconv" + "sync" +) + +// First variant (wrong). +// +// Here each method sends a message R over channel S.c to main gopher +// monitoring struct usage (started in NewS()). This message contains a +// channel R.c from which main gopher tries to read and blocks. When a method +// finishes, it closes this channel R.c effectively signaling to the main +// gopher, that it has finished (read from R.c in main gopher will succeed). +// Then main gopher reads next message from S.c effectively unblocking next +// method, which wants to run. +// +// Check for termination is done by checking 'S.c == nil' before starting each +// method. Thus, Cancel() will close 'S.c' channel effectively signaling main +// gopher to terminate and set it to nil. +// +// But this won't work, because when method A() and Cancel() start +// simultaneously and Cancel() has not yet set S.c to nil, A() will pass +// through 'S.c == nil' check and block at sending to S.c channel. Then, +// Cancel() will finally set S.c to nil, close it and terminate, but method +// A() is already running and blocked at send to S.c, And as soon as S.c is +// closed method A() will fail with "panic: send on closed channel". + +type R struct { + n string + c <-chan interface{} +} + +type S struct { + s string + c chan<- R +} + +func NewS() *S { + ch := make(chan R) + s := S{s: "", c: ch} + go func() { + for r := range ch { + fmt.Printf("Waiting for %v\n", r.n) + <-r.c + } + }() + return &s +} + +func (s *S) A(i int) { + if s.c == nil { + fmt.Printf("[%v]: Abort\n", i) + return + } + fmt.Printf("[%v]: Trying %+v\n", i, s) + + c := make(chan interface{}) + defer close(c) + s.c <- R{strconv.Itoa(i), c} + + t := time.Duration(rand.Intn(6)) * time.Second + fmt.Printf("[%v]: Start in %v sec\n", i, t) + time.Sleep(t) + + s.s += strconv.Itoa(i) + fmt.Printf("[%v]: %v\n", i, s.s) +} + +func (s *S) Cancel() { + if s.c == nil { + fmt.Printf("Cancel(): Abort\n") + return + } + + c := make(chan interface{}) + defer close(c) + s.c <- R{"Cancel", c} + + t := time.Duration(rand.Intn(4)) * time.Second + fmt.Printf("Cancel(): Start in %v sec\n", t) + time.Sleep(t) + + s.s += "CCC" + close(s.c) + s.c = nil + fmt.Printf("%v\n", s.s) +} + +// Second variant (wrong). +// +// Here i don't use any synchronization at all and just use boolean flag +// S.valid to signify whether struct is valid or not. +// +// Cancel() will set S.valid to false. +// +// This also won't work, because it does not prevent from running further +// method calls started when Cancel() is already running, but not yet +// finished (and set S.valid to false). + + +type S2 struct { + valid bool + s string +} + +func NewS2() *S2 { + s := S2{valid: true, s: ""} + return &s +} + +func (s *S2) A2(i int) { + fmt.Printf("[%v]: Trying %+v\n", i, s) + if !s.valid { + fmt.Printf("[%v]: Abort\n", i) + return + } + + t := time.Duration(rand.Intn(6)) * time.Second + fmt.Printf("[%v]: Start in %v sec\n", i, t) + time.Sleep(t) + + s.s += strconv.Itoa(i) + fmt.Printf("[%v]: %v\n", i, s.s) +} + +func (s *S2) Cancel2() { + fmt.Printf("Trying %+v\n", s) + if !s.valid { + fmt.Printf("Abort\n") + return + } + + fmt.Printf("Cancel(): Trying %+v\n", s) + + t := time.Duration(rand.Intn(4)) * time.Second + fmt.Printf("Cancel(): Start in %v sec\n", t) + time.Sleep(t) + + s.s += "CCC" + s.valid = false + fmt.Printf("%v\n", s.s) +} + + +// Third variant (working). +// +// It's similar to first variant with a main gopher monitoring struct usage +// (started in NewS3()) and a channel S3.c to communicate with it, but now +// this channel has the reverse direction: main gopher _sends_ a message +// (containing write-only channel R3) over channel S3.c and methods try to +// read from S3.c. As soon main gopher has sent R3 over S3.c, it blocks on +// reading from channel R3. When method finishes, it closes R3 effectively +// signaling main gopher to continue. This way i make sending on closed +// channel in methods (on which first variant fails) impossible, because now +// methods are reading, not sending. +// +// Check for termination in methods is done by just checking whether S3.c is +// closed. And because this check is impossible until either read succeeds or +// S3.c is really closed, methods won't pass through this check, when Cancel() +// is in progress. +// +// Cancel() is just a regular method sending special message over R3 channel +// to main gopher and telling it to terminate and close S3.c channel. +// +// This is effectively the same as just using mutex. + +type R3 chan<- string +type S3 struct { + s string + c <-chan R3 +} + +func NewS3() *S3 { + ch := make(chan R3) + s := S3{s: "", c: ch} + go func() { + for { + c := make(chan string) + ch <- c + fmt.Printf("Waiting..\n") + resp := <- c + fmt.Printf("Finished %v\n", resp) + if resp == "Cancel" { + fmt.Printf("Terminating\n") + close(ch) + break + } + } + }() + return &s +} + +func (s *S3) A3(i int) { + c, ok := <-s.c + fmt.Printf("[%v]: Trying %+v\n", i, s) + if !ok { + fmt.Printf("[%v]: Abort\n", i) + return + } + defer close(c) + defer func() { + c <- strconv.Itoa(i) + }() + + t := time.Duration(rand.Intn(6)) * time.Second + fmt.Printf("[%v]: Start in %v sec\n", i, t) + time.Sleep(t) + + s.s += strconv.Itoa(i) + fmt.Printf("[%v]: %v\n", i, s.s) +} + +func (s *S3) Cancel3() { + c, ok := <-s.c + fmt.Printf("Cancel(): Trying %+v\n", s) + if !ok { + fmt.Printf("Cancel(): Abort\n") + return + } + defer close(c) + defer func() { + c <- "Cancel" + }() + + t := time.Duration(rand.Intn(5)) * time.Second + fmt.Printf("Cancel(): Start in %v sec\n", t) + time.Sleep(t) + + s.s += "CCC" + fmt.Printf("%v\n", s.s) +} + +// Fourth variant (working) with mutex. +// +// Here i use mutex to make access to structure sequential. +// +// Check for termination is done by flag S.valid. Thus, Cancel() sets this +// flag to false. But unlike wrong variant two (with flag) now mutex +// guarantees, that when Cancel() is in progress, no other method will run. + +type S4 struct { + s string + valid bool + m *sync.Mutex +} + +func NewS4() *S4 { + s := S4{s: "", valid: true, m: &sync.Mutex{}} + return &s +} + +func (s *S4) A4(i int) { + s.m.Lock() + defer s.m.Unlock() + + fmt.Printf("[%v]: Trying %+v\n", i, s) + if !s.valid { + fmt.Printf("[%v]: Abort\n", i) + return + } + + t := time.Duration(rand.Intn(6)) * time.Second + fmt.Printf("[%v]: Start in %v sec\n", i, t) + time.Sleep(t) + + s.s += strconv.Itoa(i) + fmt.Printf("[%v]: %v\n", i, s.s) +} + +func (s *S4) Cancel4() { + fmt.Printf("Cancel(): Start\n") + s.m.Lock() + defer s.m.Unlock() + + fmt.Printf("Cancel(): Trying %+v\n", s) + if !s.valid { + fmt.Printf("Cancel(): Abort\n") + return + } + + t := time.Duration(rand.Intn(5)) * time.Second + fmt.Printf("Cancel(): Start in %v sec\n", t) + time.Sleep(t) + + s.s += "CCC" + s.valid = false + fmt.Printf("%v\n", s.s) +} + +// Fifth variant. +// +// Prvious two variants (3rd and 4th) work, but Cancel() goes in the same +// queue as other methods: there is no guarantee, that when currently running +// method finishes, Cancel() will really run and not some other method also +// waiting on the same lock (S.c channel read or S.m mutex). In fact in 3rd +// and 4th variants Cancel() started before A(3) in the best case i've seen +// only cancel A(5) and A(6) and in worse it runs after A(6). +// +// And, though, there is no guarantee, that go routine, where Cancel() is +// running, will be scheduled right away, but at least i may make Cancel() to +// use "out of band" locks (separate channel or mutex). +// +// Here main gopher polls on both reading from special S5.cancel channel (used +// only by Cancel() method) and on sending R3 into S5.c channel. +// +// Methods works exactly the same as in S3. And Cancel() just closes S5.cancel +// channel. Then reading from S5.cancel in main gopher's select succeeds, it +// closes S5.c channel effectively signaling to all other methods, that +// operation is terminated, and calls RealCancel5() method, which performs +// necessary cleanup. + +type S5 struct { + s string + c <-chan R3 + cancel chan<- interface{} +} + +func NewS5() *S5 { + ch := make(chan R3) + cancel := make(chan interface{}) + s := S5{s: "", c: ch, cancel: cancel} + go func() { + var c chan string + // Because value of variable c is changed later in function, i need a + // closure. If i just write 'defer close(c)' c will be an argument to + // a deferred function and will be evaluated right now. And because + // now its value is nil, the deferred function will become + // 'close(nil)', which is a panic. + defer func () { close(c) }() + defer close(ch) + defer s.RealCancel5() + +out: + for { + c = make(chan string) + // Ensure order: when some method finishes, _always_ first check + // the S5.cancel channel. + select { + case <-cancel: + break out + default: + } + + // And if S5.cancel is not ready yet, enter usual select, which + // selects branch randomly, when both are ready. + select { + case <-cancel: + break out + + case ch <- c: + fmt.Printf("Waiting..\n") + resp := <- c + fmt.Printf("Finished %v\n", resp) + } + } + + t := time.Duration(rand.Intn(5)) * time.Second + fmt.Printf("Terminating in %v sec\n", t) + time.Sleep(t) + }() + return &s +} + +func (s *S5) A5(i int) { + c, ok := <-s.c + fmt.Printf("[%v]: Trying %+v\n", i, s) + if !ok { + fmt.Printf("[%v]: Abort\n", i) + return + } + defer close(c) + defer func() { + c <- strconv.Itoa(i) + }() + + t := time.Duration(rand.Intn(6)) * time.Second + fmt.Printf("[%v]: Start in %v sec\n", i, t) + time.Sleep(t) + + s.s += strconv.Itoa(i) + fmt.Printf("[%v]: %v\n", i, s.s) +} + +func (s *S5) Cancel5() { + fmt.Printf("Cancel5(): Start\n") + close(s.cancel) + fmt.Printf("Cancel5(): Done\n") +} + +func (s *S5) RealCancel5() { + fmt.Printf("RealCancel5(): Trying %+v\n", s) + + t := time.Duration(rand.Intn(5)) * time.Second + fmt.Printf("RealCancel5(): Start in %v sec\n", t) + time.Sleep(t) + + s.s += "CCC" + fmt.Printf("%v\n", s.s) +} + +func main() { + rand.Seed(time.Now().Unix()) + + /* + s := NewS() + s.A(1) + go s.Cancel() + s.A(2) + s.A(3) + */ + + /* + s := NewS2() + s.A2(1) + go s.Cancel2() + s.A2(2) + s.A2(3) + */ + + /* + s := NewS3() + s.A3(1) + s.A3(2) + go s.Cancel3() + go s.A3(3) + s.A3(4) + s.A3(5) + s.A3(6) + */ + + /* + s := NewS4() + s.A4(1) + s.A4(2) + go s.Cancel4() + go s.A4(3) + s.A4(4) + s.A4(5) + s.A4(6) + */ + + s := NewS5() + s.A5(1) + s.A5(2) + go s.Cancel5() + go s.A5(3) + go s.A5(4) + s.A5(5) + s.A5(6) + s.A5(7) + + time.Sleep(2 * time.Second) +} + -- 2.20.1