Skip to content

Commit 5dc2e0c

Browse files
fenfeng9 authored and wgtmac committed
parquet: add encrypted bloom filter fast path for known lengths
1 parent fd2a296 commit 5dc2e0c

1 file changed

Lines changed: 50 additions & 30 deletions

File tree

cpp/src/parquet/bloom_filter.cc

Lines changed: 50 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
#include <limits>
2121
#include <memory>
2222

23+
#include "arrow/io/memory.h"
2324
#include "arrow/result.h"
2425
#include "arrow/util/logging_internal.h"
2526
#include "arrow/util/macros.h"
@@ -50,6 +51,16 @@ int64_t ParseCiphertextTotalLength(const uint8_t* data, int64_t length) {
5051
return static_cast<int64_t>(buffer_size) + kCiphertextLengthSize;
5152
}
5253

54+
void CheckBloomFilterShortRead(int64_t expected, int64_t actual,
55+
std::string_view context) {
56+
if (ARROW_PREDICT_FALSE(actual < expected)) {
57+
std::stringstream ss;
58+
ss << context << " read failed: expected ";
59+
ss << expected << " bytes, got " << actual;
60+
throw ParquetException(ss.str());
61+
}
62+
}
63+
5364
} // namespace
5465

5566
constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
@@ -126,27 +137,17 @@ ::arrow::Status ValidateBloomFilterHeader(const format::BloomFilterHeader& heade
126137
return ::arrow::Status::OK();
127138
}
128139

129-
} // namespace
130-
131-
BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
140+
BlockSplitBloomFilter DeserializeEncryptedFromStream(
132141
const ReaderProperties& properties, ArrowInputStream* input,
133142
std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
134143
int16_t row_group_ordinal, int16_t column_ordinal) {
135-
if (decryptor == nullptr) {
136-
throw ParquetException("Bloom filter decryptor must be provided");
137-
}
138-
139144
ThriftDeserializer deserializer(properties);
140145
format::BloomFilterHeader header;
141146

142147
// Read the length-prefixed ciphertext for the header.
143148
PARQUET_ASSIGN_OR_THROW(auto length_buf, input->Read(kCiphertextLengthSize));
144-
if (ARROW_PREDICT_FALSE(length_buf->size() < kCiphertextLengthSize)) {
145-
std::stringstream ss;
146-
ss << "Bloom filter header read failed: expected " << kCiphertextLengthSize
147-
<< " bytes, got " << length_buf->size();
148-
throw ParquetException(ss.str());
149-
}
149+
CheckBloomFilterShortRead(kCiphertextLengthSize, length_buf->size(),
150+
"Bloom filter header length");
150151

151152
const int64_t header_cipher_total_len =
152153
ParseCiphertextTotalLength(length_buf->data(), length_buf->size());
@@ -158,6 +159,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
158159
throw ParquetException(
159160
"Bloom filter length less than encrypted bloom filter header length");
160161
}
162+
161163
// Read the full header ciphertext and decrypt the Thrift header.
162164
auto header_cipher_buf =
163165
AllocateBuffer(properties.memory_pool(), header_cipher_total_len);
@@ -167,26 +169,26 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
167169
PARQUET_ASSIGN_OR_THROW(auto read_size, input->Read(header_cipher_remaining,
168170
header_cipher_buf->mutable_data() +
169171
kCiphertextLengthSize));
170-
if (ARROW_PREDICT_FALSE(read_size < header_cipher_remaining)) {
171-
std::stringstream ss;
172-
ss << "Bloom filter header read failed: expected " << header_cipher_remaining
173-
<< " bytes, got " << read_size;
174-
throw ParquetException(ss.str());
175-
}
172+
CheckBloomFilterShortRead(header_cipher_remaining, read_size, "Bloom filter header");
176173

177174
// Bloom filter header and bitset are separate encrypted modules with different AADs.
178175
UpdateDecryptor(decryptor, row_group_ordinal, column_ordinal,
179176
encryption::kBloomFilterHeader);
180-
uint32_t header_cipher_len = static_cast<uint32_t>(header_cipher_total_len);
177+
auto header_cipher_len = static_cast<uint32_t>(header_cipher_total_len);
181178
try {
182179
deserializer.DeserializeMessage(header_cipher_buf->data(), &header_cipher_len,
183180
&header, decryptor);
184-
DCHECK_EQ(header_cipher_len, header_cipher_total_len);
185181
} catch (std::exception& e) {
186182
std::stringstream ss;
187183
ss << "Deserializing bloom filter header failed.\n" << e.what();
188184
throw ParquetException(ss.str());
189185
}
186+
if (ARROW_PREDICT_FALSE(header_cipher_len != header_cipher_total_len)) {
187+
std::stringstream ss;
188+
ss << "Encrypted bloom filter header length mismatch: expected "
189+
<< header_cipher_total_len << " bytes, got " << header_cipher_len;
190+
throw ParquetException(ss.str());
191+
}
190192
PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
191193

192194
const int32_t bloom_filter_size = header.numBytes;
@@ -204,12 +206,8 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
204206

205207
// Read and decrypt the bitset bytes.
206208
PARQUET_ASSIGN_OR_THROW(auto bitset_cipher_buf, input->Read(bitset_cipher_len));
207-
if (ARROW_PREDICT_FALSE(bitset_cipher_buf->size() < bitset_cipher_len)) {
208-
std::stringstream ss;
209-
ss << "Bloom filter bitset read failed: expected " << bitset_cipher_len
210-
<< " bytes, got " << bitset_cipher_buf->size();
211-
throw ParquetException(ss.str());
212-
}
209+
CheckBloomFilterShortRead(bitset_cipher_len, bitset_cipher_buf->size(),
210+
"Bloom filter bitset");
213211

214212
const int32_t bitset_plain_len =
215213
decryptor->PlaintextLength(static_cast<int32_t>(bitset_cipher_len));
@@ -231,6 +229,30 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
231229
return bloom_filter;
232230
}
233231

232+
} // namespace
233+
234+
// Deserializes an encrypted Bloom filter read from `input`.
//
// When `bloom_filter_length` is set, that many bytes are fetched from `input`
// with a single Read() call and parsing then runs over the in-memory buffer.
// When it is not set, parsing runs directly against `input`.
//
// `decryptor` must not be null. `row_group_ordinal` and `column_ordinal` are
// forwarded unchanged to DeserializeEncryptedFromStream together with
// `properties` and `bloom_filter_length`.
//
// Throws ParquetException if `decryptor` is null, if `bloom_filter_length` is
// negative, or if `input` yields fewer bytes than `bloom_filter_length`.
// Read() failures are raised through PARQUET_ASSIGN_OR_THROW.
BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
    const ReaderProperties& properties, ArrowInputStream* input,
    std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
    int16_t row_group_ordinal, int16_t column_ordinal) {
  if (decryptor == nullptr) {
    throw ParquetException("Bloom filter decryptor must be provided");
  }

  // Unknown total length: parse incrementally from the caller's stream.
  if (!bloom_filter_length.has_value()) {
    return DeserializeEncryptedFromStream(properties, input, bloom_filter_length,
                                          decryptor, row_group_ordinal, column_ordinal);
  }

  const int64_t total_length = *bloom_filter_length;
  // Reject a negative length here: it would make the short-read check below
  // pass vacuously and leave rejection up to the stream's Read().
  if (total_length < 0) {
    std::stringstream ss;
    ss << "Invalid bloom filter length: " << total_length;
    throw ParquetException(ss.str());
  }

  // Read the full Bloom filter payload up front when the total length is known.
  PARQUET_ASSIGN_OR_THROW(auto bloom_filter_buf, input->Read(total_length));
  CheckBloomFilterShortRead(total_length, bloom_filter_buf->size(), "Bloom filter");

  ::arrow::io::BufferReader reader(bloom_filter_buf);
  return DeserializeEncryptedFromStream(properties, &reader, bloom_filter_length,
                                        decryptor, row_group_ordinal, column_ordinal);
}
255+
234256
BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
235257
const ReaderProperties& properties, ArrowInputStream* input,
236258
std::optional<int64_t> bloom_filter_length) {
@@ -292,9 +314,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
292314
PARQUET_ASSIGN_OR_THROW(
293315
auto read_size, input->Read(required_read_size,
294316
buffer->mutable_data() + bloom_filter_bytes_in_header));
295-
if (ARROW_PREDICT_FALSE(read_size < required_read_size)) {
296-
throw ParquetException("Bloom Filter read failed: not enough data");
297-
}
317+
CheckBloomFilterShortRead(required_read_size, read_size, "Bloom filter");
298318
BlockSplitBloomFilter bloom_filter(properties.memory_pool());
299319
bloom_filter.Init(buffer->data(), bloom_filter_size);
300320
return bloom_filter;

0 commit comments

Comments
 (0)