// Variations of a directory MD5 digest: one sequential implementation and
// several concurrent pipelines that differ in how work is fanned out and how
// cancellation is propagated.
//
// Recovered from the patch "new(pipe): Add md5 digest code variations."
// (pipelines-and-cancellation/digest.go).

// errCanceled is returned by the tree walkers when the done channel is closed
// before the walk has finished.
var errCanceled = errors.New("canceled")

// numDigesters bounds how many files MD5All3 and MD5All4 digest at once.
const numDigesters = 4

// result is the outcome of digesting a single file: its path, the MD5 sum of
// its contents, and the error (if any) hit while reading it.
type result struct {
	path string
	sum  [md5.Size]byte
	err  error
}

// MD5All reads every regular file under root sequentially and returns a map
// from file path to the MD5 sum of the file's contents. The first error
// reported by the walk or by a read aborts the walk and is returned together
// with a nil map.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	m := make(map[string][md5.Size]byte)
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}

		data, err := ioutil.ReadFile(path)
		if err != nil {
			return err
		}
		m[path] = md5.Sum(data)
		return nil
	})
	if err != nil {
		return nil, err
	}
	return m, nil
}

// MD5AllP is like MD5All, but hashes each file in its own goroutine. Files are
// still read one at a time by the walking goroutine; only the md5.Sum calls
// run concurrently. A single collector goroutine owns the result map, so no
// locking is needed.
//
// On error the function still waits for every started worker and for the
// collector before returning, so no goroutine outlives the call.
func MD5AllP(root string) (map[string][md5.Size]byte, error) {
	c := make(chan result)
	res := make(chan map[string][md5.Size]byte, 1)

	// Collector: gathers results until c is closed, then hands the map over.
	go func() {
		m := make(map[string][md5.Size]byte)
		for r := range c {
			m[r.path] = r.sum
		}
		res <- m
	}()

	var wg sync.WaitGroup
	walkErr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}

		data, err := ioutil.ReadFile(path)
		if err != nil {
			// Abort the walk; there is nothing to hash for this file.
			return err
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			c <- result{path: path, sum: md5.Sum(data)}
		}()
		return nil
	})

	// Walk has returned, so every wg.Add has happened. Always shut the
	// pipeline down, even on error, so the collector can terminate.
	wg.Wait()
	close(c)
	m := <-res

	if walkErr != nil {
		return nil, walkErr
	}
	return m, nil
}

// sumFiles2 walks the tree rooted at root and starts one goroutine per regular
// file to read and digest it. Results are delivered on the first returned
// channel, which is closed once every digesting goroutine has finished. The
// outcome of the walk itself is delivered on the second channel.
//
// Closing done makes pending sends give up and makes the walk stop with
// errCanceled.
func sumFiles2(done <-chan struct{}, root string) (<-chan result, <-chan error) {
	c := make(chan result)
	errc := make(chan error, 1)

	go func() {
		var wg sync.WaitGroup
		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}

			wg.Add(1)
			go func() {
				defer wg.Done()
				data, err := ioutil.ReadFile(path)
				r := result{path: path, err: err}
				if err == nil {
					r.sum = md5.Sum(data)
				}
				select {
				case c <- r:
				case <-done:
				}
			}()

			// Stop walking as soon as the consumer has gone away.
			select {
			case <-done:
				return errCanceled
			default:
				return nil
			}
		})

		// Walk has returned, so all calls to wg.Add are done. Close c once
		// every outstanding send has completed or been abandoned.
		go func() {
			wg.Wait()
			close(c)
		}()

		// No select needed: errc is buffered.
		errc <- err
	}()

	return c, errc
}

// MD5All2 digests every regular file under root using sumFiles2 (one goroutine
// per file). It returns on the first failed read, or with the walk error if
// the walk failed; in both cases the returned map is nil. Closing done on
// return releases any goroutines that are still trying to send.
func MD5All2(root string) (map[string][md5.Size]byte, error) {
	done := make(chan struct{})
	defer close(done)

	c, errc := sumFiles2(done, root)

	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum
	}

	// Check whether the walk itself failed.
	if err := <-errc; err != nil {
		return nil, err
	}
	return m, nil
}

// walkFiles3 walks the tree rooted at root and emits the path of every regular
// file on the first returned channel, which is closed when the walk returns.
// The outcome of the walk is delivered on the second channel.
//
// Before emitting each path it sleeps for a random 0-2 seconds and logs the
// delay. If done is closed while a path is waiting to be sent, the walk stops
// with errCanceled.
func walkFiles3(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	errc := make(chan error, 1)

	go func() {
		// Close paths after Walk returns so that consumers terminate.
		defer close(paths)

		// No select needed for this send: errc is buffered.
		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}

			t := rand.Intn(3)
			fmt.Printf("walkFile3(): reading %v for %v sec..\n", path, t)
			time.Sleep(time.Duration(t) * time.Second)

			select {
			case paths <- path:
				return nil
			case <-done:
				return errCanceled
			}
		})
	}()

	return paths, errc
}

// digester3 is a worker: it reads paths until the channel is closed, digests
// each file and sends the outcome on c. It returns early, without logging the
// "end" line, if done is closed while a result is waiting to be sent.
//
// Each file is preceded by a random 0-4 second sleep, which is logged.
func digester3(id int, done <-chan struct{}, paths <-chan string, c chan<- result) {
	for path := range paths {
		t := rand.Intn(5)
		fmt.Printf("digester3() [%v]: digesting %v for %v sec..\n", id, path, t)
		time.Sleep(time.Duration(t) * time.Second)

		data, err := ioutil.ReadFile(path)
		select {
		case c <- result{path: path, sum: md5.Sum(data), err: err}:
		case <-done:
			return
		}
	}
	fmt.Printf("digester3() [%v]: end\n", id)
}

// MD5All3 digests every regular file under root with a fixed pool of
// numDigesters workers fed by walkFiles3. It returns on the first failed read,
// or with the walk error if the walk failed; in both cases the returned map is
// nil.
func MD5All3(root string) (map[string][md5.Size]byte, error) {
	done := make(chan struct{})
	defer close(done)

	paths, errc := walkFiles3(done, root)

	c := make(chan result, numDigesters)
	var wg sync.WaitGroup
	wg.Add(numDigesters)
	for i := 0; i < numDigesters; i++ {
		go func(id int) {
			defer wg.Done()
			digester3(id, done, paths, c)
		}(i)
	}
	// Close c once every worker has returned so the range below terminates.
	go func() {
		wg.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum
	}

	// Check whether the walk itself failed.
	if err := <-errc; err != nil {
		return nil, err
	}
	return m, nil
}

// digester4 digests a single file and sends the outcome on c, giving up on the
// send if done is closed first. The work is preceded by a random 0-4 second
// sleep, which is logged; the "end" line is logged in either case.
func digester4(id string, done <-chan struct{}, path string, c chan<- result) {
	t := rand.Intn(5)
	fmt.Printf("digester4() [%v]: digesting %v for %v sec..\n", id, path, t)
	time.Sleep(time.Duration(t) * time.Second)

	data, err := ioutil.ReadFile(path)
	r := result{path: path, err: err}
	if err == nil {
		r.sum = md5.Sum(data)
	}

	select {
	case c <- r:
	case <-done:
	}
	fmt.Printf("digester4() [%v]: end\n", id)
}

// MD5All4 digests every regular file under root, starting one short-lived
// digester4 goroutine per file while keeping at most numDigesters of them in
// flight. It returns on the first failed read, or with the walk error if the
// walk failed; in both cases the returned map is nil.
func MD5All4(root string) (map[string][md5.Size]byte, error) {
	done := make(chan struct{})
	defer close(done)

	paths, errc := walkFiles3(done, root)

	// c is deliberately never closed. This function is its receiver, and on
	// an early return digesters may still be about to send; closing c here
	// would make those sends panic. Closing done is what releases them.
	c := make(chan result)

	m := make(map[string][md5.Size]byte)
	completed := func(r result) error {
		if r.err != nil {
			return r.err
		}
		m[r.path] = r.sum
		return nil
	}

	cur := 0 // number of digesters whose result has not been received yet
out:
	for {
		select {
		case path, ok := <-paths:
			if !ok {
				break out
			}
			// At the limit: wait for one digester to finish before
			// starting another.
			if cur >= numDigesters {
				if err := completed(<-c); err != nil {
					return nil, err
				}
				cur--
			}
			fmt.Printf("MD5All4(): Get %v\n", path)
			time.Sleep(time.Second)
			go digester4(time.Now().Format("05.00"), done, path, c)
			cur++
		case r := <-c:
			if err := completed(r); err != nil {
				return nil, err
			}
			cur--
		}
	}

	// The walk is over; check whether it failed.
	if err := <-errc; err != nil {
		return nil, err
	}

	// Collect the digesters that are still running.
	for ; cur > 0; cur-- {
		fmt.Printf("MD5All4(): wait for %v\n", cur)
		if err := completed(<-c); err != nil {
			return nil, err
		}
	}

	return m, nil
}

// main digests the directory named by the first command-line argument with
// MD5All4 and prints one "<sum>  <path>" line per file, sorted by path.
func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: digest <root>")
		os.Exit(2)
	}

	m, err := MD5All4(os.Args[1])
	if err != nil {
		fmt.Println(err)
		return
	}

	paths := make([]string, 0, len(m))
	for path := range m {
		paths = append(paths, path)
	}
	sort.Strings(paths)

	for _, path := range paths {
		fmt.Printf("%x  %s\n", m[path], path)
	}
}