new(pipe): Add md5 digest code variations.
authorsgf <sgf.dma@gmail.com>
Thu, 1 Dec 2022 15:31:24 +0000 (18:31 +0300)
committersgf <sgf.dma@gmail.com>
Thu, 1 Dec 2022 15:31:24 +0000 (18:31 +0300)
pipelines-and-cancellation/digest.go [new file with mode: 0644]

diff --git a/pipelines-and-cancellation/digest.go b/pipelines-and-cancellation/digest.go
new file mode 100644 (file)
index 0000000..73c5e59
--- /dev/null
@@ -0,0 +1,346 @@
+package main
+
+
+import (
+        "crypto/md5"
+        "fmt"
+        "io/ioutil"
+        "os"
+        "path/filepath"
+        "sort"
+        "sync"
+        "errors"
+        "time"
+        "math/rand"
+)
+
+
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+        m := make(map[string][md5.Size]byte)
+        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+                if err != nil {
+                        return err
+                }
+
+                if !info.Mode().IsRegular() {
+                        return nil
+                }
+
+                data, err := ioutil.ReadFile(path)
+                if err != nil {
+                        return err
+                }
+
+                m[path] = md5.Sum(data)
+                return nil
+        })
+        if err != nil {
+                return nil, err
+        }
+
+        return m, nil
+}
+
+
+type result struct {
+    path string
+    sum [md5.Size]byte
+    err error
+}
+
+// My version.
+func MD5AllP(root string) (map[string][md5.Size]byte, error) {
+        c := make(chan result)
+        res := make(chan map[string][md5.Size]byte)
+
+        var wg sync.WaitGroup
+        go func() {
+            m := make(map[string][md5.Size]byte)
+            for r := range c {
+                if r.err == nil {
+                    m[r.path] = r.sum
+                }
+            }
+            res <- m
+        }()
+
+        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+                if err != nil {
+                        return err
+                }
+
+                if !info.Mode().IsRegular() {
+                        return nil
+                }
+
+                data, err := ioutil.ReadFile(path)
+                wg.Add(1)
+                go func () {
+                    r := result{path: path, err: err}
+                    if err == nil {
+                        r.sum = md5.Sum(data)
+                    }
+                    c <- r
+                    wg.Done()
+                }()
+
+                if err != nil {
+                        return err
+                }
+
+                return nil
+        })
+        if err != nil {
+                return nil, err
+        }
+        wg.Wait()
+        close(c)
+
+        return <-res, nil
+}
+
+func sumFiles2(done <-chan struct{}, root string) (<-chan result, <-chan error) {
+    c := make(chan result)
+    errc := make(chan error, 1)
+
+    go func () {
+        var wg sync.WaitGroup
+        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+            if err != nil {
+                    return err
+            }
+            if !info.Mode().IsRegular() {
+                    return nil
+            }
+
+            wg.Add(1)
+            go func () {
+                data, err := ioutil.ReadFile(path)
+                r := result{path: path, err: err}
+                if err == nil {
+                    r.sum = md5.Sum(data)
+                }
+                select {
+                case c<- r:
+                case <-done:
+                }
+                wg.Done()
+            }()
+
+            select {
+            case <-done:
+                return errors.New("canceled")
+            default:
+            }
+            return nil
+        })
+
+        go func () {
+            wg.Wait()
+            close(c)
+        }()
+
+        // errc is buffered.
+        errc<- err
+    }()
+
+    return c, errc
+}
+
+func MD5All2(root string) (map[string][md5.Size]byte, error) {
+        m := make(map[string][md5.Size]byte)
+
+        done := make(chan struct{})
+        defer close(done)
+
+        c, errc := sumFiles2(done, root)
+        for r := range c {
+            if r.err != nil {
+                return nil, r.err
+            }
+            m[r.path] = r.sum
+        }
+
+        if err := <-errc; err != nil {
+            return nil, err
+        }
+        return m, nil
+}
+
+func walkFiles3(done <- chan struct{}, root string) (<-chan string, <-chan error) {
+    paths := make(chan string)
+    errc := make(chan error, 1)
+
+    go func () {
+        defer close(paths)
+
+        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+            if err != nil {
+                    return err
+            }
+            if !info.Mode().IsRegular() {
+                    return nil
+            }
+
+            t := rand.Intn(3)
+            fmt.Printf("walkFile3(): reading %v for %v sec..\n", path, t)
+            time.Sleep(time.Duration(t) * time.Second)
+            select {
+            case paths<- path:
+            case <-done:
+                return errors.New("canceled")
+            }
+
+            return nil
+        })
+    }()
+
+    return paths, errc
+}
+
+func digester3(id int, done <-chan struct{}, paths <-chan string, c chan<- result) {
+    for path := range paths {
+        t := rand.Intn(5)
+        fmt.Printf("digester3() [%v]: digesting %v for %v sec..\n", id, path, t)
+        time.Sleep(time.Duration(t) * time.Second)
+
+        data, err := ioutil.ReadFile(path)
+        select {
+        case c <- result{path, md5.Sum(data), err}:
+        case <-done:
+            return
+        }
+    }
+    fmt.Printf("digester3() [%v]: end\n", id)
+}
+
+const numDigesters = 4
+func MD5All3(root string) (map[string][md5.Size]byte, error) {
+    done := make(chan struct{})
+    defer close(done)
+
+    paths, errc := walkFiles3(done, root)
+
+    c := make(chan result, numDigesters)
+    var wg sync.WaitGroup
+    wg.Add(numDigesters)
+    for i := 0; i < numDigesters; i++ {
+        go func (id int) {
+            digester3(id, done, paths, c)
+            wg.Done()
+        }(i)
+    }
+    go func () {
+        wg.Wait()
+        close(c)
+    }()
+
+    m := make(map[string][md5.Size]byte)
+    for r := range c {
+        if r.err != nil {
+            return nil, r.err
+        }
+        m[r.path] = r.sum
+    }
+    if err := <-errc; err != nil {
+        return nil, err
+    }
+
+    return m, nil
+}
+
+// My version.
+func digester4(id string, done <-chan struct{}, path string, c chan<- result) {
+    t := rand.Intn(5)
+    fmt.Printf("digester4() [%v]: digesting %v for %v sec..\n", id, path, t)
+    time.Sleep(time.Duration(t) * time.Second)
+
+    data, err := ioutil.ReadFile(path)
+    r := result{path: path, err: err}
+    if err == nil {
+        r.sum = md5.Sum(data)
+    }
+    select {
+    case c<- r:
+    case <-done:
+    }
+    fmt.Printf("digester4() [%v]: end\n", id)
+}
+
+func MD5All4(root string) (map[string][md5.Size]byte, error) {
+    done := make(chan struct{})
+    defer close(done)
+
+
+    paths, errc := walkFiles3(done, root)
+
+    c := make(chan result)
+    defer close(c)
+
+    m := make(map[string][md5.Size]byte)
+    completed := func (r result) error {
+        if r.err != nil {
+            return r.err
+        }
+        m[r.path] = r.sum
+        return nil
+    }
+
+    cur := 0
+out:
+    for {
+        select {
+        case path, ok := <-paths:
+            if !ok {
+                break out
+            }
+            if cur >= numDigesters {
+                if err := completed(<-c); err != nil {
+                    return nil, err
+                }
+                cur--
+            }
+            fmt.Printf("MD5All4(): Get %v\n", path)
+            time.Sleep(time.Second)
+            go digester4(time.Now().Format("05.00"), done, path, c)
+            cur++
+        case r := <-c:
+            if err := completed(r); err != nil {
+                return nil, err
+            }
+            cur--
+        }
+    }
+
+    err := <- errc
+    if err != nil {
+        return nil, err
+    }
+    for ; cur > 0; cur-- {
+        fmt.Printf("MD5All4(): wait for %v\n", cur)
+        if err := completed(<-c); err != nil {
+            return nil, err
+        }
+    }
+
+    return m, nil
+}
+
+func main() {
+        m, err := MD5All4(os.Args[1])
+        if err != nil {
+                fmt.Println(err)
+                return
+        }
+
+        var paths []string
+        for path := range m {
+                paths = append(paths, path)
+        }
+
+        sort.Strings(paths)
+        for _, path := range paths {
+                fmt.Printf("%x  %s\n", m[path], path)
+        }
+}
+