Skip to content

Commit f95e363

Browse files
committed
parquet: add encrypted bloom filter fast path for known lengths
1 parent df47afa commit f95e363

1 file changed

Lines changed: 37 additions & 8 deletions

File tree

cpp/src/parquet/bloom_filter.cc

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <limits>
2121
#include <memory>
2222

23+
#include "arrow/io/memory.h"
2324
#include "arrow/result.h"
2425
#include "arrow/util/logging_internal.h"
2526
#include "arrow/util/macros.h"
@@ -126,16 +127,10 @@ ::arrow::Status ValidateBloomFilterHeader(const format::BloomFilterHeader& heade
126127
return ::arrow::Status::OK();
127128
}
128129

129-
} // namespace
130-
131-
BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
130+
BlockSplitBloomFilter DeserializeEncryptedFromStream(
132131
const ReaderProperties& properties, ArrowInputStream* input,
133132
std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
134133
int16_t row_group_ordinal, int16_t column_ordinal) {
135-
if (decryptor == nullptr) {
136-
throw ParquetException("Bloom filter decryptor must be provided");
137-
}
138-
139134
ThriftDeserializer deserializer(properties);
140135
format::BloomFilterHeader header;
141136

@@ -158,6 +153,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
158153
throw ParquetException(
159154
"Bloom filter length less than encrypted bloom filter header length");
160155
}
156+
161157
// Read the full header ciphertext and decrypt the Thrift header.
162158
auto header_cipher_buf =
163159
AllocateBuffer(properties.memory_pool(), header_cipher_total_len);
@@ -181,12 +177,17 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
181177
try {
182178
deserializer.DeserializeMessage(header_cipher_buf->data(), &header_cipher_len,
183179
&header, decryptor);
184-
DCHECK_EQ(header_cipher_len, header_cipher_total_len);
185180
} catch (std::exception& e) {
186181
std::stringstream ss;
187182
ss << "Deserializing bloom filter header failed.\n" << e.what();
188183
throw ParquetException(ss.str());
189184
}
185+
if (ARROW_PREDICT_FALSE(header_cipher_len != header_cipher_total_len)) {
186+
std::stringstream ss;
187+
ss << "Encrypted bloom filter header length mismatch: expected "
188+
<< header_cipher_total_len << " bytes, got " << header_cipher_len;
189+
throw ParquetException(ss.str());
190+
}
190191
PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
191192

192193
const int32_t bloom_filter_size = header.numBytes;
@@ -231,6 +232,34 @@ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
231232
return bloom_filter;
232233
}
233234

235+
} // namespace
236+
237+
// Deserializes an encrypted Bloom filter read from `input`.
//
// When `bloom_filter_length` is provided, the whole payload is fetched with a
// single Read() and then parsed from an in-memory BufferReader; otherwise
// parsing is delegated directly to DeserializeEncryptedFromStream on `input`.
//
// Throws ParquetException if `decryptor` is null, if `bloom_filter_length` is
// negative, if the read fails, or if fewer than `bloom_filter_length` bytes
// are returned. DeserializeEncryptedFromStream may throw as well.
BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
    const ReaderProperties& properties, ArrowInputStream* input,
    std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
    int16_t row_group_ordinal, int16_t column_ordinal) {
  if (decryptor == nullptr) {
    throw ParquetException("Bloom filter decryptor must be provided");
  }

  // Unknown total length: let the stream-based parser read incrementally.
  if (!bloom_filter_length.has_value()) {
    return DeserializeEncryptedFromStream(properties, input, bloom_filter_length,
                                          decryptor, row_group_ordinal, column_ordinal);
  }

  const int64_t total_length = *bloom_filter_length;
  // Reject a negative length explicitly rather than relying on how the
  // concrete input stream reacts to a negative Read() size.
  // NOTE(review): presumably this value originates from file metadata and may
  // be untrusted -- confirm against callers.
  if (ARROW_PREDICT_FALSE(total_length < 0)) {
    std::stringstream ss;
    ss << "Invalid bloom filter length: " << total_length;
    throw ParquetException(ss.str());
  }

  // Read the full Bloom filter payload up front when the total length is known.
  PARQUET_ASSIGN_OR_THROW(auto bloom_filter_buf, input->Read(total_length));
  if (ARROW_PREDICT_FALSE(bloom_filter_buf->size() < total_length)) {
    std::stringstream ss;
    ss << "Bloom filter read failed: expected " << total_length << " bytes, got "
       << bloom_filter_buf->size();
    throw ParquetException(ss.str());
  }

  // Parse from the in-memory copy; the length is still forwarded so the
  // helper receives the same arguments as before.
  ::arrow::io::BufferReader reader(bloom_filter_buf);
  return DeserializeEncryptedFromStream(properties, &reader, bloom_filter_length,
                                        decryptor, row_group_ordinal, column_ordinal);
}
262+
234263
BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
235264
const ReaderProperties& properties, ArrowInputStream* input,
236265
std::optional<int64_t> bloom_filter_length) {

0 commit comments

Comments
 (0)