66{-# LANGUAGE TypeApplications #-}
77{-# LANGUAGE RecordWildCards #-}
88
9- module DataFrame.IO.Parquet where
9+ module DataFrame.IO.Parquet (
10+ readParquet
11+ ) where
1012
13+ import qualified Codec.Compression.Snappy as Snappy
1114import Codec.Compression.Zstd.Streaming
1215import Control.Monad
1316import qualified Data.ByteString as BSO
1417import qualified Data.ByteString.Char8 as BS
1518import Data.Char
1619import Data.Foldable
20+ import qualified Data.Vector.Unboxed as VU
1721import Data.IORef
1822import qualified Data.Map as M
1923import Data.Maybe
2024import qualified Data.Text as T
2125import DataFrame.Internal.DataFrame (DataFrame )
2226import qualified DataFrame.Internal.DataFrame as DI
27+ import qualified DataFrame.Internal.Column as DI
28+ import qualified DataFrame.Operations.Core as DI
2329import Foreign
30+ import GHC.Float
2431import GHC.IO (unsafePerformIO )
2532import System.IO
2633
@@ -305,9 +312,11 @@ readParquet path = withBinaryFile path ReadMode $ \handle -> do
305312 (size, magicString) <- readMetadataSizeFromFooter handle
306313 when (magicString /= " PAR1" ) $ error " Invalid Parquet file"
307314
308- metadata <- readMetadata handle size
309- -- print metadata
310- forM_ (rowGroups metadata) $ \ r -> do
315+ colMap <- newIORef (M. empty :: (M. Map T. Text DI. Column ))
316+ colNames <- newIORef ([] :: [T. Text ])
317+
318+ fileMetadata <- readMetadata handle size
319+ forM_ (rowGroups fileMetadata) $ \ r -> do
311320 forM_ (rowGroupColumns r) $ \ c -> do
312321 let metadata = columnMetaData c
313322 let colDataPageOffset = columnDataPageOffset metadata
@@ -317,40 +326,213 @@ readParquet path = withBinaryFile path ReadMode $ \handle -> do
317326 else colDataPageOffset
318327 let colLength = columnTotalCompressedSize metadata
319328 columnBytes <- readBytes handle colStart colLength
320- print metadata
321- (maybePage, res) <- readPage columnBytes
329+ (maybePage, res) <- readPage (columnCodec metadata) columnBytes
322330 case maybePage of
323331 Just p -> if isDictionaryPage p
324332 then do
325- (maybePage', res') <- readPage res
326- let p' = fromMaybe (error " [UNEXPECTED] No data page found" ) maybePage'
327- print $ readPageBytes (pageBytes p)
328- print p'
329- else print " Data???"
333+ (maybePage', res') <- readPage (columnCodec metadata) res
334+ let p' = fromMaybe (error " Empty page" ) maybePage'
335+ let schemaElem = filter (\ se -> (elementName se) == (T. pack $ head (columnPathInSchema metadata))) (schema fileMetadata)
336+ let rep = if null schemaElem then UNKNOWN_REPETITION_TYPE else ((repetitionType . head ) schemaElem)
337+ when (rep == REPEATED || rep == UNKNOWN_REPETITION_TYPE ) (error $ " REPETITION TYPE NOT SUPPORTED: " ++ show rep)
338+
339+ case ((definitionLevelEncoding . pageTypeHeader . pageHeader ) p') of
340+ ERLE -> do
341+ let rleColumn = case columnType metadata of
342+ PBYTE_ARRAY -> readByteArrayColumn (pageBytes p)
343+ PDOUBLE -> readDoubleColumn (pageBytes p)
344+ PINT32 -> readInt32Column (pageBytes p)
345+ t -> error $ " UNKNOWN TYPE: " ++ (show t)
346+ let nbytes = littleEndianInt32 (take 4 (pageBytes p'))
347+ let rleDecoder = MkRleDecoder (drop 4 (pageBytes p')) 1 0 0
348+
349+ -- Create index decoder
350+ let lvlByteLen = (fromIntegral nbytes + 4 )
351+ let rleBytes = drop lvlByteLen (pageBytes p')
352+ let bitWidth = head rleBytes
353+ let indexDecoder = MkRleDecoder (tail rleBytes) (fromIntegral bitWidth) 0 0
354+
355+ let finalCol = DI. takeColumn ((fromIntegral . dataPageHeaderNumValues . pageTypeHeader . pageHeader) p') (decodeDictionary rleColumn rleDecoder indexDecoder)
356+ let colName = T. pack $ head (columnPathInSchema metadata)
357+
358+ modifyIORef' colNames (++ [colName])
359+ modifyIORef' colMap (\ m -> M. insertWith (\ l r -> fromMaybe (error " UNEXPECTED" ) (DI. concatColumns l r)) colName finalCol m)
360+ other -> error $ " UNSUPPORTED ENCODING: " ++ (show other)
361+ else (error " PLAIN DATA PAGES NOT SUPPORTED" )
330362 Nothing -> pure ()
331363
332- return DI. empty
364+ c' <- readIORef colMap
365+ colNames' <- readIORef colNames
366+ let asscList = map (\ name -> (name, c' M. ! name)) colNames'
367+ pure $ DI. fromNamedColumns asscList
368+
-- | Resolve dictionary indices against a dictionary column.
--
-- The index decoder is stepped until 'advance' reports that it is finished:
--
-- * a pending repeated run (@repCount > 0@) aborts with an error;
-- * a pending literal run (@litCount > 0@) replaces the column with the rows
--   selected by 'getIndices', clears @litCount@ and continues;
-- * otherwise 'advance' reads the next run header, and the current column is
--   returned once it signals completion.
--
-- The @rleDecoder@ argument is only threaded through the recursion; it is
-- never inspected here.
--
-- NOTE(review): after a literal run the /indexed/ column becomes the new
-- @col@, so a second literal run would index into already-decoded rows
-- rather than the dictionary -- confirm this is intended.
decodeDictionary :: DI.Column -> RleDecoder -> RleDecoder -> DI.Column
decodeDictionary col rleDecoder indexDecoder
  | repCount indexDecoder > 0 = error " UNIMPLEMENTED: Repetition not supported"
  | litCount indexDecoder > 0 =
      decodeDictionary gathered rleDecoder indexDecoder{litCount = 0}
  | finished = col
  | otherwise = decodeDictionary col rleDecoder nextDecoder
  where
    -- Both bindings are lazy: each is only forced by the guard that uses it.
    gathered = DI.atIndicesStable (VU.map fromIntegral (getIndices indexDecoder)) col
    (finished, nextDecoder) = advance indexDecoder
333376
334- readPageBytes :: [Word8 ] -> [BSO. ByteString ]
-- | Read the next run header from the decoder's remaining bytes.
--
-- Returns @(finished, decoder)@:
--
-- * no bytes left: @(True, decoder)@ unchanged;
-- * header with the low bit set (a literal, bit-packed run): the header bytes
--   are consumed and @litCount@ is set to eight times the group count stored
--   in the remaining header bits; result is @(False, decoder')@;
-- * header with the low bit clear: @(True, decoder)@ unchanged.
--
-- NOTE(review): the last case means a non-literal run ends decoding silently
-- (the original carried a commented-out
-- @error "NON-LITERAL TYPES NOT YET SUPPORTED"@ here) -- confirm that dropping
-- whatever follows is acceptable.
advance :: RleDecoder -> (Bool, RleDecoder)
advance indexDecoder
  | null pending = (True, indexDecoder)
  | isLiteralRun =
      (False, indexDecoder{rleDecoderData = afterHeader, litCount = valuesInRun})
  | otherwise = (True, indexDecoder)
  where
    pending = rleDecoderData indexDecoder
    (header, afterHeader) = readUVarInt pending
    isLiteralRun = (header .&. 1) /= 0
    groupCount = fromIntegral (header `shiftR` 1) :: Int32
    valuesInRun = groupCount * 8
386+
-- | Unpack every remaining byte of the decoder as bit-packed indices, picking
-- the unpacker that matches the decoder's bit width.
--
-- Only widths 1, 2, 3 and 5 have an unpacker; any other width raises an error.
getIndices :: RleDecoder -> VU.Vector Word32
getIndices indexDecoder =
  case rleBitWidth indexDecoder of
    5 -> unpackWidth5 packed
    1 -> unpackWidth1 packed
    2 -> unpackWidth2 packed
    3 -> unpackWidth3 packed
    width -> error $ " Unsupported bit width: " ++ show width
  where
    packed = rleDecoderData indexDecoder
394+
-- | Unpack 5-bit values from a little-endian, LSB-first bit-packed stream.
--
-- Input is consumed in groups of 20 bytes (160 bits), each yielding exactly 32
-- values. A final short group is zero-padded to 20 bytes, so it still yields
-- 32 values. Empty input yields an empty vector.
unpackWidth5 :: [Word8] -> VU.Vector Word32
unpackWidth5 = VU.fromList . concatMap decodeGroup . groupsOf
  where
    bitWidth, groupBytes, valuesPerGroup :: Int
    bitWidth = 5
    groupBytes = 20
    valuesPerGroup = 32

    -- Split into 20-byte groups, zero-padding the last one.
    groupsOf [] = []
    groupsOf bytes =
      let (chunk, rest) = splitAt groupBytes bytes
       in take groupBytes (chunk ++ repeat 0) : groupsOf rest

    decodeGroup grp = [valueAt grp (k * bitWidth) | k <- [0 .. valuesPerGroup - 1]]

    -- A 5-bit value starting at bit offset 0..7 of a byte spans at most two
    -- bytes, so a two-byte little-endian window is enough.
    valueAt :: [Word8] -> Int -> Word32
    valueAt grp bitOffset =
      let (byteIx, bitIx) = bitOffset `divMod` 8
          window =
            foldr
              (\b acc -> (acc `shiftL` 8) .|. fromIntegral b)
              0
              (take 2 (drop byteIx grp))
       in (window `shiftR` bitIx) .&. 31
436+
-- | Unpack 1-, 2- and 3-bit values from a little-endian, LSB-first bit-packed
-- stream.
--
-- * 'unpackWidth1' reads 4 bytes at a time and yields 32 values per group.
-- * 'unpackWidth2' reads 4 bytes at a time and yields 16 values per group.
-- * 'unpackWidth3' reads 12 bytes at a time and yields 32 values per group.
--
-- A final short group is zero-padded, so it still yields a full group of
-- values. Empty input yields an empty vector.
--
-- Fix: 'unpackWidth2' used to mask each of the first 15 values of a word with
-- @1@, discarding the high bit of every 2-bit value. All widths now share one
-- extractor that masks with @2^width - 1@.
unpackWidth2, unpackWidth1, unpackWidth3 :: [Word8] -> VU.Vector Word32
unpackWidth1 = unpackBitGroups 1 4 32
unpackWidth2 = unpackBitGroups 2 4 16
unpackWidth3 = unpackBitGroups 3 12 32

-- | Shared extractor: @unpackBitGroups bitWidth groupBytes valuesPerGroup@
-- splits the input into groups of @groupBytes@ bytes (zero-padding the last)
-- and reads @valuesPerGroup@ consecutive @bitWidth@-bit values from each.
--
-- The four-byte window supports bit widths up to 25.
unpackBitGroups :: Int -> Int -> Int -> [Word8] -> VU.Vector Word32
unpackBitGroups bitWidth groupBytes valuesPerGroup = VU.fromList . go
  where
    mask :: Word32
    mask = (1 `shiftL` bitWidth) - 1

    go [] = []
    go bytes =
      let (chunk, rest) = splitAt groupBytes bytes
          padded = take groupBytes (chunk ++ repeat 0)
       in map (valueAt padded) [0 .. valuesPerGroup - 1] ++ go rest

    -- Value k starts at bit k * bitWidth of the group's bit stream.
    valueAt :: [Word8] -> Int -> Word32
    valueAt padded k =
      let (byteIx, bitIx) = (k * bitWidth) `divMod` 8
          window =
            foldr
              (\b acc -> (acc `shiftL` 8) .|. fromIntegral b)
              0
              (take 4 (drop byteIx padded))
       in (window `shiftR` bitIx) .&. mask
484+
-- | Cursor over an RLE / bit-packed hybrid byte stream.
data RleDecoder = MkRleDecoder
  { rleDecoderData :: [Word8]
  -- ^ Bytes that have not been consumed yet.
  , rleBitWidth :: Int32
  -- ^ Number of bits used by each packed value.
  , repCount :: Int32
  -- ^ Pending repeated-run count; 'decodeDictionary' rejects any value above zero.
  , litCount :: Int32
  -- ^ Pending literal-run value count, set by 'advance'.
  }
  deriving (Show, Eq)
490+
-- | Drop the leading byte of a dictionary-index buffer and return the rest.
--
-- The original bound that byte to the name @bitWidth@ and ignored it.
-- Empty input now returns an empty list; previously there was no equation for
-- it and the call crashed with a pattern-match failure.
expandDictionary :: [Word8] -> [Word8]
expandDictionary = drop 1
493+
-- | Decode a byte buffer with 'readPageInt32' and wrap the values in a column.
readInt32Column :: [Word8] -> DI.Column
readInt32Column bytes = DI.fromList (readPageInt32 bytes)
496+
-- | Decode a byte buffer with 'readPageWord64' (which yields 'Double's) and
-- wrap the values in a column.
readDoubleColumn :: [Word8] -> DI.Column
readDoubleColumn bytes = DI.fromList (readPageWord64 bytes)
499+
-- | Decode a byte buffer with 'readPageBytes' and wrap the resulting texts in
-- a column.
readByteArrayColumn :: [Word8] -> DI.Column
readByteArrayColumn bytes = DI.fromList (readPageBytes bytes)
502+
-- | Decode a buffer as consecutive 4-byte little-endian signed integers.
--
-- Each 4-byte slice is handed to 'littleEndianInt32', which raises an error
-- when a trailing slice is shorter than four bytes.
readPageInt32 :: [Word8] -> [Int32]
readPageInt32 bytes =
  case splitAt 4 bytes of
    ([], _) -> []
    (word, rest) -> fromIntegral (littleEndianInt32 word) : readPageInt32 rest
506+
-- | Decode a buffer as consecutive 8-byte little-endian words, reinterpreting
-- each word's bit pattern as a 'Double'.
--
-- Each 8-byte slice is handed to 'littleEndianWord64', which raises an error
-- when a trailing slice is shorter than eight bytes.
readPageWord64 :: [Word8] -> [Double]
readPageWord64 xs
  | null xs = []
  | otherwise = castWord64ToDouble (littleEndianWord64 chunk) : readPageWord64 rest
  where
    (chunk, rest) = splitAt 8 xs
510+
-- | Decode a buffer of length-prefixed byte strings into a list of texts.
--
-- Each entry is a 4-byte little-endian length followed by that many bytes.
--
-- Fix: the length used to be read through 'littleEndianWord8', whose shifts
-- happen at 'Word8' width, so only the lowest length byte survived and any
-- value longer than 255 bytes desynchronised the stream. The full 32-bit
-- prefix is now decoded here.
--
-- Raises an error when fewer than four bytes remain for a length prefix.
--
-- NOTE(review): every byte is mapped to one 'Char' via 'chr', so multi-byte
-- UTF-8 sequences come out as several characters -- confirm whether proper
-- UTF-8 decoding is wanted.
readPageBytes :: [Word8] -> [T.Text]
readPageBytes [] = []
readPageBytes xs
  | length lenPrefix < 4 = error "readPageBytes: truncated 4-byte length prefix"
  | otherwise = T.pack (map (chr . fromIntegral) value) : readPageBytes rest
  where
    (lenPrefix, afterPrefix) = splitAt 4 xs
    -- Widen each byte to Int before shifting so no length bits are lost.
    valueLen :: Int
    valueLen = foldr (\b acc -> (acc `shiftL` 8) .|. fromIntegral b) 0 lenPrefix
    (value, rest) = splitAt valueLen afterPrefix
340517
-- | Read one page from the front of a column chunk.
--
-- Parses the page header with 'readPageHeader', takes @compressedPageSize@
-- bytes after it as the page body, and decompresses that body according to
-- the given codec. Returns the page (or 'Nothing' for empty input) together
-- with the bytes that follow it.
--
-- Supported codecs are @ZSTD@, @SNAPPY@ and @UNCOMPRESSED@; any other codec
-- raises an error.
readPage :: CompressionCodec -> [Word8] -> IO (Maybe Page, [Word8])
readPage _ [] = pure (Nothing, [])
readPage codec columnBytes = do
  let (hdr, afterHeader) = readPageHeader emptyPageHeader columnBytes 0
      bodySize = fromIntegral (compressedPageSize hdr)
      (compressed, leftover) = splitAt bodySize afterHeader
      packed = BSO.pack compressed

  fullData <- case codec of
    -- Weird round about way to uncompress zstd files compressed using the
    -- streaming API: feed the whole body, then an empty chunk to finish.
    ZSTD -> do
      Consume feed <- decompress
      Consume finish <- feed packed
      Done res <- finish BSO.empty
      pure res
    SNAPPY -> pure (Snappy.decompress packed)
    UNCOMPRESSED -> pure packed
    comp -> error (" UNSUPPORTED_COMPRESSION TYPE: " ++ show comp)

  pure (Just (Page hdr (BSO.unpack fullData)), leftover)
354536
-- | A single page: its parsed header plus its body bytes as stored by
-- 'readPage' (that is, after decompression).
data Page = Page
  { pageHeader :: PageHeader
  -- ^ Header parsed from the front of the page.
  , pageBytes :: [Word8]
  -- ^ Page body.
  }
  deriving (Show, Eq)
@@ -1324,14 +1506,49 @@ readVarIntFromBytes bs = (fromIntegral n, rem)
13241506 res = result .|. ((fromIntegral (x .&. 0x7f ) :: Integer ) `shiftL` shift)
13251507 in if (x .&. 0x80 ) /= 0x80 then (res, xs) else loop (shift + 7 ) res xs
13261508
1327- -- // Uint32 returns the uint32 representation of b[0:4].
1328- -- func (littleEndian) Uint32(b []byte) uint32 {
1329- -- _ = b[3] // bounds check hint to compiler; see golang.org/issue/14808
1330- -- return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
1331- -- }
1332-
1333-
-- | Combine exactly four bytes little-endian style into a 'Word8'.
--
-- Raises an error unless the input has exactly four bytes.
--
-- NOTE(review): the shifts by 8, 16 and 24 are performed on 'Word8' values, so
-- those bytes shift out entirely and the result equals the first byte. Callers
-- that need the full 32-bit value should use 'littleEndianWord32'.
littleEndianWord8 :: [Word8] -> Word8
littleEndianWord8 bytes =
  case bytes of
    [_, _, _, _] -> foldr (.|.) 0 (zipWith shiftL bytes [0, 8 ..])
    _ -> error " Expected exactly 4 bytes"
1513+
-- | Assemble up to four bytes, least-significant first, into a 'Word32'.
--
-- Inputs shorter than four bytes are zero-padded on the high end; inputs
-- longer than four bytes raise an error.
littleEndianWord32 :: [Word8] -> Word32
littleEndianWord32 bytes =
  case compare (length bytes) 4 of
    EQ -> assemble bytes
    LT -> assemble (take 4 (bytes ++ repeat 0))
    GT -> error $ " Expected exactly 4 bytes for Word32 but got " ++ show bytes
  where
    -- Fold from the most-significant end so each step shifts the accumulator
    -- up by one byte.
    assemble = foldr (\b acc -> (acc `shiftL` 8) .|. fromIntegral b) 0
1519+
-- | Assemble exactly eight bytes, least-significant first, into a 'Word64'.
--
-- Raises an error for any other input length.
littleEndianWord64 :: [Word8] -> Word64
littleEndianWord64 bytes
  | length bytes /= 8 = error " Expected exactly 8 bytes"
  | otherwise =
      foldr (.|.) 0 [fromIntegral b `shiftL` offset | (b, offset) <- zip bytes [0, 8 ..]]
1524+
-- | Assemble exactly four bytes, least-significant first, into an 'Int32'
-- (the bit pattern is kept, so a set top bit yields a negative number).
--
-- Raises an error for any other input length.
littleEndianInt32 :: [Word8] -> Int32
littleEndianInt32 bytes =
  case bytes of
    [b0, b1, b2, b3] ->
      widen b0
        .|. (widen b1 `shiftL` 8)
        .|. (widen b2 `shiftL` 16)
        .|. (widen b3 `shiftL` 24)
    _ -> error " Expected exactly 4 bytes for Int32"
  where
    widen :: Word8 -> Int32
    widen = fromIntegral
1529+
-- | Decode an unsigned LEB128 varint from the front of a byte list.
--
-- Returns the decoded value and the bytes that follow it. Decoding stops at
-- the first byte below @0x80@, after ten continuation bytes, or when the input
-- runs out.
--
-- Fixes:
--
-- * The 7-bit payload of a continuation byte used to be shifted while still a
--   'Word8', which discarded most of its bits for every byte after the first;
--   it is now widened to 'Word64' before shifting.
-- * Empty or truncated input used to hit a missing pattern; it now returns
--   the value accumulated so far with no remaining bytes.
readUVarInt :: [Word8] -> (Word64, [Word8])
readUVarInt = go 0 0 (0 :: Int)
  where
    -- acc: value so far; shift: bit position of the next payload;
    -- seen: continuation bytes consumed.
    go acc _ 10 rest = (acc, rest)
    go acc _ _ [] = (acc, [])
    go acc shift seen (b : rest)
      | b < 0x80 = (acc .|. (fromIntegral b `shiftL` shift), rest)
      | otherwise =
          go (acc .|. (fromIntegral (b .&. 0x7f) `shiftL` shift)) (shift + 7) (seen + 1) rest
1536+
-- | Expand each byte into its eight bits, most-significant bit first
-- (the reverse of what 'toBits' produces).
bitStream :: [Word8] -> [[Word8]]
bitStream xs = [reverse (toBits byte) | byte <- xs]
1539+
-- | The eight bits of a byte as a list of 0/1 values, least-significant first.
toBits :: Word8 -> [Word8]
toBits b = [if testBit b position then 1 else 0 | position <- [0 .. 7]]
1545+
-- | Cut a flat list of bits into consecutive chunks of @bitWidth@ bits and
-- convert each chunk with 'bitsToInt32'. The last chunk may be shorter.
bitStreamToInt :: Word8 -> [Word8] -> [Int32]
bitStreamToInt _ [] = []
bitStreamToInt bitWidth bits = bitsToInt32 bitWidth chunk : bitStreamToInt bitWidth rest
  where
    (chunk, rest) = splitAt (fromIntegral bitWidth) bits
1552+
-- | Interpret a list of 0/1 values, most-significant bit first, as an 'Int32'.
--
-- The first argument (the nominal bit width) is not consulted; the length of
-- the list determines how many bits are read.
--
-- Fix: bits used to be shifted while still 'Word8' and only widened at the
-- end, so any bit at position 8 or above was lost. Each bit is now widened
-- before it is combined.
bitsToInt32 :: Word8 -> [Word8] -> Int32
bitsToInt32 _bitWidth = foldl' (\acc bit -> (acc `shiftL` 1) .|. fromIntegral bit) 0
0 commit comments