-
Notifications
You must be signed in to change notification settings - Fork 89
Utilize memory allocator in ReadProperties.GetStream #547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Utilize memory allocator in ReadProperties.GetStream #547
Conversation
parquet/file/page_reader.go
Outdated
| // underlying reader, even if there is less data available than that. So even if there are no more bytes, | ||
| // the buffer must have at least bytes.MinRead capacity remaining to avoid a relocation. | ||
| allocSize := lenCompressed | ||
| if p.decompressBuffer.Cap() < lenCompressed+bytes.MinRead { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is dependent on the combined behavior of io.LimitReader and bytes.Buffer. Which seems fragile to me, but I don't have any other ideas how to deal with it. I'll at least add unit tests that the reallocation happens when I don't add bytes.MinRead to the allocation size and doesn't happen when I do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this seems really fragile. Maybe io.ReadFull directly into p.decompressBuffer.Bytes()[:lenCompressed] instead of using the intermediate bytes.Buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, lets go with ReadFull and we can skip ``bytes.Buffer` altogether.
| if n != lenUncompressed { | ||
| return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n) | ||
| } | ||
| if p.cryptoCtx.DataDecryptor != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is needed or not. But for data page v2, the data is just read by ReadFull and Decrypt is not called:
arrow-go/parquet/file/page_reader.go
Lines 815 to 824 in 95b3f76
| if compressed { | |
| if levelsBytelen > 0 { | |
| io.ReadFull(p.r, buf.Bytes()[:levelsBytelen]) | |
| } | |
| if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil { | |
| return false | |
| } | |
| } else { | |
| io.ReadFull(p.r, buf.Bytes()) | |
| } |
So maybe the Decrypt call is not needed for data age v1 or dictionary page either?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reverse actually. Looks like this is a bug we just never came across, I'm guessing no one was using DataPageV2 with uncompressed data but still encrypted that was using this library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, I'll fix it for DataPageV2 then. I'll add a unit tests without compression and with encryption that should fail with the current main, if I have the time.
I reran the profiler with the current commit in this PR, with a 2.8GB parquet file stored in S3, uncompressed. And cpu profiler is showing that more time is spent in runtime.memmove (copying memory) than in syscall.Syscall6 (read). Which is annoying me. :-D
I think it should be still possible to eliminate at least one copy for the uncompressed case.
So this is my scenario:
ReaderProperties.GetStreamreads column chunk from a TLS and stores it in a buffer (or just allocates the buffer ifBufferedStreamEnabled, but lets go with the unbuffered case for now)serializedPageReaderis created with the buffer returned from ReaderProperties.GetStreamserializedPageReader.Nextget the page header callsserializedPageReader.readUncompressed/serializedPageReader.decompresswhich reads data from the GetStream buffer into dictPageBuffer/dataPageBuffer
3a) for the uncompressed case the this is just a copypagestruct is created from the bytes written to the dictPageBuffer/dataPageBuffer
I think I could avoid the copy in 3a and create page directly from the bytes in the buffer returned by ReaderProperties.GetStream by using combination of calls Peek (to get the bytes)+Discard (to move the internal position inside the buffer). This should hold when BufferedStreamEnabled is false, I have to check what happens when it is true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! Thanks for diving into this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, so I "steal" the buffer by using Peek/Discard if the data has been read previously and it is available of the BufferedReader. So in the uncompressed and unencrypted case -> data is read and stored into a buffer in ReaderProperties.GetStream and copied to the user provided buffer to Float32ColumnChunkReader.ReadBatch.
Now, if we have a plainEncoder and no compression, it should be possible to write the data directly to the user provided buffer, so that would eliminate even that copy, but one is more complicated and I need to be start doing other stuff. :D
Also, the decryption types allocate buffers for the decrypted data. We could send it an already allocated buffer to use, or maybe do an in place decryption (if possible), or give it the custom allocator if it is set.
Anyway, I'll fix the decryption for DataPageV2 next and I'll consider this one done.
e3a38f7 to
c7bee5e
Compare
| require.NoError(t, err) | ||
|
|
||
| icr := col0.(*file.Int64ColumnChunkReader) | ||
| // require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I uncomment this, then this causes a panic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That shouldn't be panicing as the SeekToRow works correctly based on my last tests.... I'll see if i can debug this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still panics:
--- FAIL: TestDecryptColumns (0.00s)
--- FAIL: TestDecryptColumns/DataPageV2_BufferedRead (0.00s)
panic: cipher: message authentication failed [recovered, repanicked]
goroutine 8 [running]:
testing.tRunner.func1.2({0x2899940, 0xc000054100})
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1872 +0x237
testing.tRunner.func1()
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1875 +0x35b
panic({0x2899940?, 0xc000054100?})
/usr/local/Cellar/go/1.25.6/libexec/src/runtime/panic.go:783 +0x132
github.com/apache/arrow-go/v18/parquet/internal/encryption.(*aesDecryptor).Decrypt(0xc0002c58f0, {0xc000026800, 0x4000, 0x4000}, {0xc0002c5b10?, 0x600c00003ff20?, 0x44d3560?}, {0xc0002c5e90, 0xd, 0x10})
/daniel-adam-tfs/arrow-go/parquet/internal/encryption/aes.go:262 +0x26d
github.com/apache/arrow-go/v18/parquet/internal/encryption.(*decryptor).Decrypt(0xc0003326c0?, {0xc000026800?, 0xc00003fd40?, 0x1397b05?})
/daniel-adam-tfs/arrow-go/parquet/internal/encryption/decryptor.go:268 +0x45
github.com/apache/arrow-go/v18/parquet/file.(*serializedPageReader).readPageHeader(0xc0003326c0, {0x2aceb20, 0xc00003ff20}, 0xc0002ff740)
/daniel-adam-tfs/arrow-go/parquet/file/page_reader.go:704 +0x139
github.com/apache/arrow-go/v18/parquet/file.(*serializedPageReader).Next(0xc0003326c0)
/daniel-adam-tfs/arrow-go/parquet/file/page_reader.go:798 +0xdd
github.com/apache/arrow-go/v18/parquet/file.(*serializedPageReader).SeekToPageWithRow(0xc0003326c0, 0x3)
/daniel-adam-tfs/arrow-go/parquet/file/page_reader.go:749 +0x186
github.com/apache/arrow-go/v18/parquet/file.(*columnChunkReader).SeekToRow(0xc0002fa780, 0x3)
/daniel-adam-tfs/arrow-go/parquet/file/column_reader.go:584 +0x2a
github.com/apache/arrow-go/v18/parquet/file_test.checkDecryptedValues(0xc000326380, 0xc000210870, 0xc0003283f0)
/daniel-adam-tfs/arrow-go/parquet/file/column_reader_test.go:867 +0x591
github.com/apache/arrow-go/v18/parquet/file_test.TestDecryptColumns.func1(0xc000326380)
/daniel-adam-tfs/arrow-go/parquet/file/column_reader_test.go:964 +0x18a
testing.tRunner(0xc000326380, 0xc0002fe680)
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1934 +0xea
created by testing.(*T).Run in goroutine 7
/usr/local/Cellar/go/1.25.6/libexec/src/testing/testing.go:1997 +0x465
FAIL github.com/apache/arrow-go/v18/parquet/file 1.922s
For each of the cases defined, doesn't matter whether it is a V1 or V2 page, or if is it compressed or not, or if BufferedStream is used or not. SeekToRow works in the unencrypted case, so this is again encryption related.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be related to this line:
arrow-go/parquet/file/page_reader.go
Lines 699 to 702 in 38dc64b
| if oidx == nil { | |
| if _, err = section.Seek(p.dataOffset-p.baseOffset, io.SeekStart); err != nil { | |
| return err | |
| } |
I think this is intended to skip past the dictionary page to the data page. But for some reason after this Seek the
arrow-go/parquet/file/page_reader.go
Line 663 in 38dc64b
| view = p.cryptoCtx.MetaDecryptor.Decrypt(view) |
call fails. The offset to the data page is correct, so I'm thinking that some internal state of the decryptor or the page reader is affected by skip past the parsing of the dictionary page.
But I'll remove the SeekToRow and park this for now in #566. Because we are benchmarking various formats + readers of unencrypted data, so I wanna make some optimization (including this PR, which according to my benchmarks speeds things up a little).
| arrWriterProps := pqarrow.NewArrowWriterProperties() | ||
|
|
||
| var buf bytes.Buffer | ||
| wr, err := pqarrow.NewFileWriter(schema, &buf, writerProps, arrWriterProps) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zeroshade I think there is a bug in pqarrow writer. For the data page v2 buffer, it first writes the levels (definition and repetition) and then the values. Only values are compressed. However, this whole buffer is then encrypted. And unless ChatGPT is hallucinating on me then only the compressed values should also be encrypted, levels should stay unencrypted and uncompressed.
I'll check with some encrypted parquet create in a different way on Monday and see what happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zeroshade OK, I should have some time this week to finish this. In fact, I think the memory allocation is done, but the decryption needs fixing before the tests pass.
I've been trying to use PyArrow to encrypt/decrypt files, but there seems to be some discrepancy in implementations. I cannot get files encrypted by PyArrow to decrypt using go-arrow and vice versa. I'll open an issue for the encryption/decryption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please link the issue here when you file it, as PyArrow and arrow-go should be agreeing on encrypting/decrypting and vice versa since pyarrow binds to Arrow C++ and the tests for parquet C++ and arrow-go should be the same tests that are passing. I'd be interested to reproduce the failure and debug it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works correctly now.
|
Alright, decryption should be correct now. Let me rebase this (...and try remember what was happening here 😄 ) |
1dde56d to
b9587e2
Compare
Rationale for this change
Optimization of memory usage, enables the use of custom allocators when reading column data with both buffered and unbuffered readers.
What changes are included in this PR?
Changes to bufferedReader type, new bytesBufferReader type and modification of ReadProperties.GetStream to propagate the custom memory allocator to the readers.
Are these changes tested?
TODO: add unit tests
Are there any user-facing changes?
The allocator if provided with reader properties will be used to allocate the underlying buffers for the buffered/unbuffered readers.
The BufferedReader interface was extended by the Free method to allow returning of the memory to the allocator.