diff --git a/go.mod b/go.mod index 28599e2c..e70bd6ec 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/aws/smithy-go v1.23.0 github.com/azr/phash v0.2.0 github.com/bbrks/go-blurhash v1.1.1 + github.com/chocolatkey/gzran v0.0.0-20251014025324-d3baa4d16d07 github.com/deckarep/golang-set v1.8.0 github.com/disintegration/imaging v1.6.2 github.com/go-viper/mapstructure/v2 v2.4.0 diff --git a/go.sum b/go.sum index ab1ade3a..42b1fe51 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,8 @@ github.com/bbrks/go-blurhash v1.1.1/go.mod h1:lkAsdyXp+EhARcUo85yS2G1o+Sh43I2ebF github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chocolatkey/gzran v0.0.0-20251014025324-d3baa4d16d07 h1:oiVpQPcpZFD929NscXZfzs5WTy9SPdiZsWOJyzgKDig= +github.com/chocolatkey/gzran v0.0.0-20251014025324-d3baa4d16d07/go.mod h1:jk2T+gAWOv82T5A5XU+h/bA+9ngcj+DkHNrP/Ktyt88= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 68a44375..1599cf43 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -80,6 +80,8 @@ type Entry interface { StreamCompressedGzip(w io.Writer) (int64, error) // Streams the compressed content of this entry to a writer in a GZIP container. ReadCompressed() ([]byte, error) // Reads the compressed content of this entry. ReadCompressedGzip() ([]byte, error) // Reads the compressed content of this entry inside a GZIP container. + + CRC32Checksum() *uint32 // Returns the CRC32 checksum of the uncompressed data. } // Represents an immutable archive. diff --git a/pkg/archive/archive_exploded.go b/pkg/archive/archive_exploded.go index e94478c7..55c03e66 100644 --- a/pkg/archive/archive_exploded.go +++ b/pkg/archive/archive_exploded.go @@ -26,6 +26,10 @@ func (e explodedArchiveEntry) CompressedLength() uint64 { return 0 } +func (e explodedArchiveEntry) CRC32Checksum() *uint32 { + return nil +} + func (e explodedArchiveEntry) CompressedAs(compressionMethod CompressionMethod) bool { return false } @@ -54,7 +58,11 @@ func (e explodedArchiveEntry) Read(start int64, end int64) ([]byte, error) { } } data := make([]byte, end-start+1) - n, err := f.Read(data) + n, err := io.ReadFull(f, data) + if n > 0 && err == io.ErrUnexpectedEOF { + // Not EOF error if some data was read + err = nil + } return data[:n], err } diff --git a/pkg/archive/archive_zip.go b/pkg/archive/archive_zip.go index d0968cd7..a488680c 100644 --- a/pkg/archive/archive_zip.go +++ b/pkg/archive/archive_zip.go @@ -11,30 +11,39 @@ import ( "path" "sync" + "github.com/chocolatkey/gzran" "github.com/pkg/errors" ) type gozipArchiveEntry struct { file *zip.File minimizeReads bool + + gi gzran.Index + gm sync.Mutex } -func (e gozipArchiveEntry) Path() string { +func (e *gozipArchiveEntry) Path() string { return path.Clean(e.file.Name) } -func (e gozipArchiveEntry) Length() uint64 { +func (e *gozipArchiveEntry) Length() uint64 { return e.file.UncompressedSize64 } -func (e gozipArchiveEntry) CompressedLength() uint64 { +func (e *gozipArchiveEntry) CompressedLength() uint64 { if e.file.Method == zip.Store { return 0 } return e.file.CompressedSize64 } -func (e gozipArchiveEntry) CompressedAs(compressionMethod CompressionMethod) bool { +func (e *gozipArchiveEntry) CRC32Checksum() *uint32 { + c := e.file.CRC32 + return &c +} + +func (e *gozipArchiveEntry) CompressedAs(compressionMethod CompressionMethod) bool { if compressionMethod != CompressionMethodDeflate { return false } @@ -45,11 +54,11 @@ func (e gozipArchiveEntry) CompressedAs(compressionMethod CompressionMethod) boo // It's especially useful when trying to stream the ZIP from a remote file, e.g. // cloud storage. It's only enabled when trying to read the entire file and compression // is enabled. Care needs to be taken to cover every edge case. -func (e gozipArchiveEntry) couldMinimizeReads() bool { +func (e *gozipArchiveEntry) couldMinimizeReads() bool { return e.minimizeReads && e.CompressedLength() > 0 } -func (e gozipArchiveEntry) Read(start int64, end int64) ([]byte, error) { +func (e *gozipArchiveEntry) Read(start int64, end int64) ([]byte, error) { if end < start { return nil, errors.New("range not satisfiable") } @@ -73,14 +82,50 @@ func (e gozipArchiveEntry) Read(start int64, end int64) ([]byte, error) { } if minimizeReads { - compressedData := make([]byte, e.file.CompressedSize64) - _, err := io.ReadFull(f, compressedData) - if err != nil { - return nil, err + // If uncompressed size is smaller than 1MB, it's not worth + // using deflate random access, because the state itself takes memory + if e.file.UncompressedSize64 < ZRandCutoff { + compressedData := make([]byte, e.file.CompressedSize64) + _, err := io.ReadFull(f, compressedData) + if err != nil { + return nil, err + } + frdr := flate.NewReader(bytes.NewReader(compressedData)) + defer frdr.Close() + f = frdr + } else { + e.gm.Lock() + var lastCompressedOffset int64 + for _, v := range e.gi { + if v.CompressedOffset > lastCompressedOffset && v.UncompressedOffset <= start { + lastCompressedOffset = v.CompressedOffset + } + } + e.gm.Unlock() + + compressedData := make([]byte, e.file.CompressedSize64) + f.(io.Seeker).Seek(lastCompressedOffset, io.SeekStart) + _, err := io.ReadFull(f, compressedData[lastCompressedOffset:]) + if err != nil { + return nil, err + } + + fzr, err := gzran.NewDReader(bytes.NewReader(compressedData)) // Default interval = 1MB, same as current ZRandCutoff + if err != nil { + return nil, err + } + e.gm.Lock() + defer e.gm.Unlock() + defer func() { + e.gi = fzr.Index + }() + defer fzr.Close() + if len(e.gi) > 0 { + fzr.Index = e.gi + } + + f = fzr } - frdr := flate.NewReader(bytes.NewReader(compressedData)) - defer frdr.Close() - f = frdr } if start == 0 && end == 0 { @@ -92,21 +137,25 @@ func (e gozipArchiveEntry) Read(start int64, end int64) ([]byte, error) { return data, nil } if start > 0 { - _, err := io.CopyN(io.Discard, f, start) + if skr, ok := f.(io.Seeker); ok { + _, err = skr.Seek(start, io.SeekStart) + } else { + _, err = io.CopyN(io.Discard, f, start) + } if err != nil { return nil, err } } data := make([]byte, end-start+1) - n, err := f.Read(data) - if n > 0 && err == io.EOF { + n, err := io.ReadFull(f, data) + if n > 0 && err == io.ErrUnexpectedEOF { // Not EOF error if some data was read err = nil } return data[:n], err } -func (e gozipArchiveEntry) Stream(w io.Writer, start int64, end int64) (int64, error) { +func (e *gozipArchiveEntry) Stream(w io.Writer, start int64, end int64) (int64, error) { if end < start { return -1, errors.New("range not satisfiable") } @@ -157,7 +206,7 @@ func (e gozipArchiveEntry) Stream(w io.Writer, start int64, end int64) (int64, e return n, err } -func (e gozipArchiveEntry) StreamCompressed(w io.Writer) (int64, error) { +func (e *gozipArchiveEntry) StreamCompressed(w io.Writer) (int64, error) { if e.file.Method != zip.Deflate { return -1, errors.New("not a compressed resource") } @@ -169,7 +218,7 @@ func (e gozipArchiveEntry) StreamCompressed(w io.Writer) (int64, error) { return io.Copy(w, f) } -func (e gozipArchiveEntry) StreamCompressedGzip(w io.Writer) (int64, error) { +func (e *gozipArchiveEntry) StreamCompressedGzip(w io.Writer) (int64, error) { if e.file.Method != zip.Deflate { return -1, errors.New("not a compressed resource") } @@ -205,7 +254,7 @@ func (e gozipArchiveEntry) StreamCompressedGzip(w io.Writer) (int64, error) { return int64(n) + nn + int64(nnn), nil } -func (e gozipArchiveEntry) ReadCompressed() ([]byte, error) { +func (e *gozipArchiveEntry) ReadCompressed() ([]byte, error) { if e.file.Method != zip.Deflate { return nil, errors.New("not a compressed resource") } @@ -223,7 +272,7 @@ func (e gozipArchiveEntry) ReadCompressed() ([]byte, error) { return compressedData, nil } -func (e gozipArchiveEntry) ReadCompressedGzip() ([]byte, error) { +func (e *gozipArchiveEntry) ReadCompressedGzip() ([]byte, error) { if e.file.Method != zip.Deflate { return nil, errors.New("not a compressed resource") } @@ -280,7 +329,7 @@ func (a *gozipArchive) Entries() []Entry { aentry, ok := a.cachedEntries.Load(f.Name) if !ok { - aentry = gozipArchiveEntry{ + aentry = &gozipArchiveEntry{ file: f, minimizeReads: a.minimizeReads, } @@ -307,7 +356,7 @@ func (a *gozipArchive) Entry(p string) (Entry, error) { for _, f := range a.zip.File { fp := path.Clean(f.Name) if fp == cpath { - aentry := gozipArchiveEntry{ + aentry := &gozipArchiveEntry{ file: f, minimizeReads: a.minimizeReads, } diff --git a/pkg/archive/gzip.go b/pkg/archive/gzip.go index 79705bf4..46dae781 100644 --- a/pkg/archive/gzip.go +++ b/pkg/archive/gzip.go @@ -8,5 +8,9 @@ const ( gzipDeflate = 8 ) -const GzipWrapperLength = 18 +const GzipHeaderLength = 10 +const GzipTrailerLength = 8 +const GzipWrapperLength = GzipHeaderLength + GzipTrailerLength const GzipMaxLength = math.MaxUint32 + +const ZRandCutoff = 1024 * 1024 // 1MB diff --git a/pkg/fetcher/fetcher_archive.go b/pkg/fetcher/fetcher_archive.go index 31827a01..0b3f4763 100644 --- a/pkg/fetcher/fetcher_archive.go +++ b/pkg/fetcher/fetcher_archive.go @@ -194,6 +194,11 @@ func (r *entryResource) CompressedLength(ctx context.Context) int64 { return int64(r.entry.CompressedLength()) } +// CRC32Checksum implements CompressedResource +func (r *entryResource) CRC32Checksum(ctx context.Context) *uint32 { + return r.entry.CRC32Checksum() +} + // StreamCompressed implements CompressedResource func (r *entryResource) StreamCompressed(ctx context.Context, w io.Writer) (int64, *ResourceError) { i, err := r.entry.StreamCompressed(w) diff --git a/pkg/fetcher/fetcher_file.go b/pkg/fetcher/fetcher_file.go index e38809dd..fbd7cbef 100644 --- a/pkg/fetcher/fetcher_file.go +++ b/pkg/fetcher/fetcher_file.go @@ -199,8 +199,8 @@ func (r *FileResource) Read(ctx context.Context, start int64, end int64) ([]byte } return data[:n], nil } else { - n, err := f.Read(data) - if err != nil && err != io.EOF { + n, err := io.ReadFull(f, data) + if err != nil && err != io.ErrUnexpectedEOF { return nil, Other(err) } return data[:n], nil diff --git a/pkg/fetcher/resource.go b/pkg/fetcher/resource.go index f76d8970..58f59d9b 100644 --- a/pkg/fetcher/resource.go +++ b/pkg/fetcher/resource.go @@ -351,6 +351,15 @@ func (r ProxyResource) CompressedLength(ctx context.Context) int64 { return cres.CompressedLength(ctx) } +// CRC32Checksum implements CompressedResource +func (r ProxyResource) CRC32Checksum(ctx context.Context) *uint32 { + cres, ok := r.Res.(CompressedResource) + if !ok { + return nil + } + return cres.CRC32Checksum(ctx) +} + // StreamCompressed implements CompressedResource func (r ProxyResource) StreamCompressed(ctx context.Context, w io.Writer) (int64, *ResourceError) { cres, ok := r.Res.(CompressedResource) @@ -578,6 +587,15 @@ func (r *LazyResource) CompressedLength(ctx context.Context) int64 { return cres.CompressedLength(ctx) } +// CRC32Checksum implements CompressedResource +func (r *LazyResource) CRC32Checksum(ctx context.Context) *uint32 { + cres, ok := r.resource().(CompressedResource) + if !ok { + return nil + } + return cres.CRC32Checksum(ctx) +} + // StreamCompressed implements CompressedResource func (r *LazyResource) StreamCompressed(ctx context.Context, w io.Writer) (int64, *ResourceError) { cres, ok := r.resource().(CompressedResource) diff --git a/pkg/fetcher/traits.go b/pkg/fetcher/traits.go index ddbb1013..8817eff3 100644 --- a/pkg/fetcher/traits.go +++ b/pkg/fetcher/traits.go @@ -14,4 +14,5 @@ type CompressedResource interface { StreamCompressedGzip(ctx context.Context, w io.Writer) (int64, *ResourceError) ReadCompressed(ctx context.Context) ([]byte, *ResourceError) ReadCompressedGzip(ctx context.Context) ([]byte, *ResourceError) + CRC32Checksum(ctx context.Context) *uint32 } diff --git a/pkg/mediatype/sniffer_content.go b/pkg/mediatype/sniffer_content.go index 166eb6f3..ce0490bc 100644 --- a/pkg/mediatype/sniffer_content.go +++ b/pkg/mediatype/sniffer_content.go @@ -39,18 +39,14 @@ func (s SnifferFileContent) Read() []byte { if of, ok := s.file.(io.ReadSeeker); ok { of.Seek(0, io.SeekStart) data := make([]byte, info.Size()) - _, err = s.file.Read(data) - if err != nil && err != io.EOF { + if _, err := io.ReadFull(s.file, data); err != nil { return nil } return data } else { if s.buffer == nil { s.buffer = make([]byte, info.Size()) - _, err = s.file.Read(s.buffer) - if err != nil && err != io.EOF { - return nil - } + io.ReadFull(s.file, s.buffer) } return s.buffer } diff --git a/pkg/mediatype/sniffer_context.go b/pkg/mediatype/sniffer_context.go index a15729ae..ea0302d0 100644 --- a/pkg/mediatype/sniffer_context.go +++ b/pkg/mediatype/sniffer_context.go @@ -253,7 +253,7 @@ func (s SnifferContext) Read(start int64, end int64) []byte { } } data := make([]byte, end-start+1) - _, err := stream.Read(data) + _, err := io.ReadFull(stream, data) if err != nil { return nil } diff --git a/pkg/parser/epub/deobfuscator.go b/pkg/parser/epub/deobfuscator.go index 26bba9c1..792db485 100644 --- a/pkg/parser/epub/deobfuscator.go +++ b/pkg/parser/epub/deobfuscator.go @@ -94,8 +94,8 @@ func (d DeobfuscatingResource) Stream(ctx context.Context, w io.Writer, start in // First, we just read the obfuscated portion (1040 or 1024 first bytes) obfuscatedPortion := make([]byte, v) - on, err := pr.Read(obfuscatedPortion) - if err != nil && err != io.EOF { + on, err := io.ReadFull(pr, obfuscatedPortion) + if err != nil && err != io.ErrUnexpectedEOF { if fre, ok := err.(*fetcher.ResourceError); ok { return 0, fre } else { @@ -171,6 +171,15 @@ func (d DeobfuscatingResource) CompressedLength(ctx context.Context) int64 { return d.ProxyResource.CompressedLength(ctx) } +func (d DeobfuscatingResource) CRC32Checksum(ctx context.Context) *uint32 { + _, v := d.obfuscation() + if v > 0 { + return nil + } + + return d.ProxyResource.CRC32Checksum(ctx) +} + // StreamCompressed implements CompressedResource func (d DeobfuscatingResource) StreamCompressed(ctx context.Context, w io.Writer) (int64, *fetcher.ResourceError) { _, v := d.obfuscation()