Skip to content

Commit 75aed28

Browse files
authored
GH-3628: prevent NPE & unclosed releaser (#3629)
1 parent 52c0a5e commit 75aed28

2 files changed

Lines changed: 190 additions & 4 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
2323
import java.util.ArrayDeque;
24+
import java.util.ArrayList;
2425
import java.util.HashMap;
2526
import java.util.List;
2627
import java.util.Map;
@@ -46,6 +47,7 @@
4647
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
4748
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
4849
import org.apache.parquet.io.ParquetDecodingException;
50+
import org.apache.parquet.util.AutoCloseables;
4951
import org.slf4j.Logger;
5052
import org.slf4j.LoggerFactory;
5153

@@ -410,9 +412,11 @@ void setReleaser(ByteBufferReleaser releaser) {
410412

411413
/**
 * Closes this store by releasing the buffers of every reader in {@code readers} and then the
 * store-level {@code releaser}.
 *
 * <p>In this diff the plain loop followed by {@code releaser.close()} (the lines marked {@code -})
 * is replaced by collecting all resources into one list that is handed to
 * {@code AutoCloseables.uncheckedClose()} (the lines marked {@code +}).
 */
@Override
412414
public void close() {
413-
for (ColumnChunkPageReader reader : readers.values()) {
414-
reader.releaseBuffers();
415-
}
416-
releaser.close();
415+
// Wrap each reader + the releaser as an AutoCloseable so AutoCloseables.uncheckedClose()
416+
// releases every resource even if one fails, and aggregates failures via suppressed exceptions
417+
List<AutoCloseable> toClose = new ArrayList<>(readers.size() + 1);
418+
readers.values().forEach(reader -> toClose.add(reader::releaseBuffers));
419+
toClose.add(releaser);
420+
AutoCloseables.uncheckedClose(toClose);
417421
}
418422
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.hadoop;
20+
21+
import static org.apache.parquet.column.Encoding.PLAIN;
22+
import static org.apache.parquet.column.Encoding.RLE;
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertSame;
25+
26+
import java.nio.ByteBuffer;
27+
import java.util.Collections;
28+
import org.apache.parquet.ParquetReadOptions;
29+
import org.apache.parquet.bytes.ByteBufferAllocator;
30+
import org.apache.parquet.bytes.ByteBufferReleaser;
31+
import org.apache.parquet.bytes.BytesInput;
32+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
33+
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
34+
import org.apache.parquet.column.ColumnDescriptor;
35+
import org.apache.parquet.column.page.DataPage;
36+
import org.apache.parquet.column.page.DataPageV1;
37+
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
38+
import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
39+
import org.apache.parquet.schema.PrimitiveType;
40+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
41+
import org.apache.parquet.schema.Type.Repetition;
42+
import org.junit.Test;
43+
44+
/**
 * Tests for {@code ColumnChunkPageReadStore.close()}: closing without a releaser having been set,
 * and propagation of failures raised while releasing reader buffers and the store-level releaser.
 */
public class TestColumnChunkPageReadStore {
45+
46+
// Descriptor for a single OPTIONAL INT32 column named "x", shared by all tests.
private static final ColumnDescriptor COLUMN = new ColumnDescriptor(
47+
new String[] {"x"}, new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "x"), 0, 0);
48+
49+
// Decompressor stub: the BytesInput overload returns its input unchanged; the ByteBuffer overload
// and release() do nothing.
private static final BytesInputDecompressor NOOP_DECOMPRESSOR = new BytesInputDecompressor() {
50+
@Override
51+
public BytesInput decompress(BytesInput bytes, int decompressedSize) {
52+
return bytes;
53+
}
54+
55+
@Override
56+
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {}
57+
58+
@Override
59+
public void release() {}
60+
};
61+
62+
/**
 * Closes a store holding one page-less reader without ever calling {@code setReleaser()}; the
 * test passes as long as {@code close()} does not throw.
 * NOTE(review): the try-with-resources presumably also makes the tracking allocator verify that
 * no buffers leaked when it is closed - confirm against TrackingByteBufferAllocator.
 */
@Test
63+
public void closeWithoutSetReleaserDoesNotThrow() {
64+
try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
65+
ParquetReadOptions options =
66+
ParquetReadOptions.builder().withAllocator(allocator).build();
67+
68+
ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(0L);
69+
store.addColumn(COLUMN, newReaderWithoutPages(options));
70+
71+
// setReleaser() is intentionally NOT called here.
72+
store.close();
73+
}
74+
}
75+
76+
/**
 * Adds a reader whose buffer release throws, sets a store-level releaser that holds a buffer
 * from a tracking allocator, and expects {@code close()} to throw a RuntimeException whose
 * cause is the reader's release failure.
 * NOTE(review): the "releaser was still released" part is presumably enforced when the tracking
 * allocator is closed at the end of the try-with-resources - confirm.
 */
@Test
77+
public void closeReleasesReleaserEvenWhenReaderThrows() throws Exception {
78+
RuntimeException releaseFailure = new RuntimeException("release boom");
79+
ByteBufferAllocator throwingAllocator = throwingAllocator(releaseFailure);
80+
81+
try (TrackingByteBufferAllocator storeAllocator =
82+
TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
83+
ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L);
84+
store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator));
85+
86+
// The store-level releaser holds a tracked buffer that must still be released even though the reader
87+
// fails first.
88+
ByteBufferReleaser storeReleaser = new ByteBufferReleaser(storeAllocator);
89+
storeReleaser.releaseLater(storeAllocator.allocate(8));
90+
store.setReleaser(storeReleaser);
91+
92+
try {
93+
store.close();
94+
// AssertionError is not a RuntimeException, so the catch below cannot swallow it.
throw new AssertionError("Expected close() to propagate the reader failure");
95+
} catch (RuntimeException e) {
96+
assertSame(releaseFailure, e.getCause());
97+
}
98+
}
99+
}
100+
101+
/**
 * Makes both the reader's buffer release and the store-level releaser throw, then expects
 * {@code close()} to throw a RuntimeException whose cause is the reader failure, carrying the
 * releaser failure as its single suppressed exception.
 */
@Test
102+
public void closeReportsBothReaderAndReleaserFailures() {
103+
RuntimeException readerFailure = new RuntimeException("reader boom");
104+
RuntimeException releaserFailure = new RuntimeException("releaser boom");
105+
106+
ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L);
107+
store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator(readerFailure)));
108+
109+
// The store-level releaser also fails to release its queued buffer.
110+
ByteBufferAllocator throwingReleaserAllocator = throwingAllocator(releaserFailure);
111+
ByteBufferReleaser storeReleaser = new ByteBufferReleaser(throwingReleaserAllocator);
112+
storeReleaser.releaseLater(throwingReleaserAllocator.allocate(8));
113+
store.setReleaser(storeReleaser);
114+
115+
try {
116+
store.close();
117+
// AssertionError is not a RuntimeException, so the catch below cannot swallow it.
throw new AssertionError("Expected close() to propagate the failures");
118+
} catch (RuntimeException e) {
119+
// Readers are released before the releaser, so the reader failure is the primary (root) cause and the
120+
// releaser failure is attached to it as a suppressed exception, ensuring both are reported.
121+
Throwable root = e.getCause();
122+
assertSame(readerFailure, root);
123+
Throwable[] suppressed = root.getSuppressed();
124+
assertEquals(1, suppressed.length);
125+
assertSame(releaserFailure, suppressed[0]);
126+
}
127+
}
128+
129+
/**
 * Creates an allocator that hands out direct buffers, reports itself as direct, and throws the
 * given exception from every {@code release()} call.
 *
 * @param releaseFailure the exception instance thrown by {@code release()}
 * @return the throwing allocator
 */
private static ByteBufferAllocator throwingAllocator(RuntimeException releaseFailure) {
130+
return new ByteBufferAllocator() {
131+
@Override
132+
public ByteBuffer allocate(int size) {
133+
return ByteBuffer.allocateDirect(size);
134+
}
135+
136+
@Override
137+
public void release(ByteBuffer b) {
138+
throw releaseFailure;
139+
}
140+
141+
@Override
142+
public boolean isDirect() {
143+
return true;
144+
}
145+
};
146+
}
147+
148+
/**
 * Builds a ColumnChunkPageReader over an empty page list, using the no-op decompressor and the
 * given read options.
 *
 * @param options read options passed to the reader
 * @return a reader with no pages
 */
private static ColumnChunkPageReader newReaderWithoutPages(ParquetReadOptions options) {
149+
return new ColumnChunkPageReader(
150+
NOOP_DECOMPRESSOR, Collections.<DataPage>emptyList(), null, null, 0L, null, null, 0, 0, options);
151+
}
152+
153+
/**
 * Builds a ColumnChunkPageReader holding a single DataPageV1 backed by a 4-byte direct buffer,
 * with read options that use the given allocator and enable the off-heap decrypt buffer, then
 * reads that page once before returning the reader.
 *
 * @param allocator allocator configured on the reader's read options
 * @return the reader, after its only page has been read
 * @throws IllegalStateException if {@code readPage()} returns null
 */
private static ColumnChunkPageReader newReaderWithQueuedBuffer(ByteBufferAllocator allocator) {
154+
ParquetReadOptions options = ParquetReadOptions.builder()
155+
.withAllocator(allocator)
156+
.useOffHeapDecryptBuffer(true)
157+
.build();
158+
159+
ByteBuffer pageBytes = ByteBuffer.allocateDirect(4);
160+
pageBytes.putInt(0, 42);
161+
DataPageV1 page = new DataPageV1(BytesInput.from(pageBytes), 1, 4, null, RLE, RLE, PLAIN);
162+
163+
ColumnChunkPageReader reader = new ColumnChunkPageReader(
164+
NOOP_DECOMPRESSOR,
165+
Collections.<DataPage>singletonList(page),
166+
null,
167+
null,
168+
1L,
169+
null,
170+
null,
171+
0,
172+
0,
173+
options);
174+
175+
// Reading the page through the off-heap path queues a buffer into the reader's internal releaser, so
176+
// releaseBuffers() will later invoke the throwing allocator's release().
177+
if (reader.readPage() == null) {
178+
throw new IllegalStateException("Expected a page to be read");
179+
}
180+
return reader;
181+
}
182+
}

0 commit comments

Comments
 (0)