diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java index 874c99fded..757d65e906 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java @@ -29,6 +29,8 @@ import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble; +import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA; @@ -147,6 +149,26 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu } }, + /** + * ALP (Adaptive Lossless floating-Point) encoding for FLOAT and DOUBLE types. + * Works by converting floating-point values to integers using decimal scaling, + * then applying Frame of Reference (FOR) encoding and bit-packing. 
+ */ + ALP { + @Override + public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) { + switch (descriptor.getType()) { + case FLOAT: + return new AlpValuesReaderForFloat(); + case DOUBLE: + return new AlpValuesReaderForDouble(); + default: + throw new ParquetDecodingException( + "ALP encoding is only supported for FLOAT and DOUBLE, not " + descriptor.getType()); + } + } + }, + /** * @deprecated This is no longer used, and has been replaced by {@link #RLE} * which is combination of bit packing and rle diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java new file mode 100644 index 0000000000..9e0bec0ba1 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +/** + * Constants for the ALP (Adaptive Lossless floating-Point) encoding. + * + *
/**
 * Shared constants for the ALP (Adaptive Lossless floating-Point) encoding.
 *
 * <p>ALP converts floating-point values to integers via decimal scaling, then applies
 * Frame of Reference (FOR) encoding and bit-packing; values that cannot be converted
 * losslessly are stored as exceptions.
 *
 * <p>Based on the paper "ALP: Adaptive Lossless floating-Point Compression" (SIGMOD 2024).
 */
public final class AlpConstants {

  private AlpConstants() {
    // Prevent instantiation: constants holder only.
  }

  // ---------- Page header ----------

  /** Current ALP format version. */
  public static final int ALP_VERSION = 1;

  /** Compression mode identifier (0 = ALP). */
  public static final int ALP_COMPRESSION_MODE = 0;

  /** Integer encoding identifier (0 = Frame of Reference). */
  public static final int ALP_INTEGER_ENCODING_FOR = 0;

  /** Size of the ALP page header, in bytes. */
  public static final int ALP_HEADER_SIZE = 8;

  // ---------- Vectors ----------

  /** Default number of elements per compressed vector (2^10). */
  public static final int ALP_VECTOR_SIZE = 1024;

  /** log2 of {@link #ALP_VECTOR_SIZE}. */
  public static final int ALP_VECTOR_SIZE_LOG = 10;

  // ---------- Exponent / factor limits ----------

  /** Largest decimal exponent tried for float encoding (10^10). */
  public static final int FLOAT_MAX_EXPONENT = 10;

  /** Largest decimal exponent tried for double encoding (10^18). */
  public static final int DOUBLE_MAX_EXPONENT = 18;

  /** Count of (exponent, factor) pairs for float: 1 + 2 + ... + 11 = 66. */
  public static final int FLOAT_COMBINATIONS = 66;

  /** Count of (exponent, factor) pairs for double: 1 + 2 + ... + 19 = 190. */
  public static final int DOUBLE_COMBINATIONS = 190;

  // ---------- Sampling ----------

  /** Values sampled from each vector. */
  public static final int SAMPLER_SAMPLES_PER_VECTOR = 256;

  /** Sample vectors taken per row group. */
  public static final int SAMPLER_SAMPLE_VECTORS_PER_ROWGROUP = 8;

  /** Maximum (exponent, factor) combinations kept in the preset. */
  public static final int MAX_COMBINATIONS = 5;

  /** Stop sampling after this many consecutive non-improving combinations. */
  public static final int EARLY_EXIT_THRESHOLD = 4;

  // ---------- Fast-rounding magic numbers ----------

  /** 2^22 + 2^23; adding then subtracting this rounds a float to the nearest integer. */
  public static final float MAGIC_FLOAT = 12_582_912.0f;

  /** 2^51 + 2^52; adding then subtracting this rounds a double to the nearest integer. */
  public static final double MAGIC_DOUBLE = 6_755_399_441_055_744.0;

  // ---------- Per-vector metadata sizes ----------

  /** AlpInfo size in bytes: exponent (1) + factor (1) + exception count (2). */
  public static final int ALP_INFO_SIZE = 4;

  /** ForInfo size for float: frame of reference (4) + bit width (1). */
  public static final int FLOAT_FOR_INFO_SIZE = 5;

  /** ForInfo size for double: frame of reference (8) + bit width (1). */
  public static final int DOUBLE_FOR_INFO_SIZE = 9;

  // ---------- Precomputed powers of ten ----------

  /** 10^0 .. 10^10 as floats, for encoding. */
  public static final float[] FLOAT_POW10 = {1e0f, 1e1f, 1e2f, 1e3f, 1e4f, 1e5f, 1e6f, 1e7f, 1e8f, 1e9f, 1e10f};

  /** 10^0 .. 10^18 as doubles, for encoding. */
  public static final double[] DOUBLE_POW10 = {
    1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18
  };

  /** 10^0 .. 10^-18 as doubles, for decoding. */
  public static final double[] DOUBLE_POW10_NEG = {
    1e0, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9, 1e-10, 1e-11, 1e-12, 1e-13, 1e-14, 1e-15, 1e-16,
    1e-17, 1e-18
  };

  // ---------- Negative-zero bit patterns ----------

  /** Raw bit pattern of {@code -0.0f}. */
  public static final int FLOAT_NEGATIVE_ZERO_BITS = 0x80000000;

  /** Raw bit pattern of {@code -0.0d}. */
  public static final long DOUBLE_NEGATIVE_ZERO_BITS = 0x8000000000000000L;
}
a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java new file mode 100644 index 0000000000..222ebea834 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +/** + * Core ALP (Adaptive Lossless floating-Point) encoding and decoding logic. + * + *

ALP works by converting floating-point values to integers using decimal scaling, + * then applying Frame of Reference (FOR) encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + * + *

Encoding formula: encoded = round(value * 10^(exponent - factor)) + *

Decoding formula: value = encoded / 10^(exponent - factor) + * + *

Exception conditions: + *

+ */ +public final class AlpEncoderDecoder { + + private AlpEncoderDecoder() { + // Utility class + } + + // ========== Float Encoding/Decoding ========== + + /** + * Check if a float value is an exception (cannot be losslessly encoded). + * + * @param value the float value to check + * @return true if the value is an exception + */ + public static boolean isFloatException(float value) { + // NaN check + if (Float.isNaN(value)) { + return true; + } + // Infinity check + if (Float.isInfinite(value)) { + return true; + } + // Negative zero check + if (Float.floatToRawIntBits(value) == FLOAT_NEGATIVE_ZERO_BITS) { + return true; + } + return false; + } + + /** + * Check if a float value will be an exception for the given exponent/factor. + * + * @param value the float value + * @param exponent the decimal exponent (0-10) + * @param factor the decimal factor (0 <= factor <= exponent) + * @return true if the value is an exception for this encoding + */ + public static boolean isFloatException(float value, int exponent, int factor) { + if (isFloatException(value)) { + return true; + } + + // Try encoding and check for round-trip failure + float multiplier = FLOAT_POW10[exponent]; + if (factor > 0) { + multiplier /= FLOAT_POW10[factor]; + } + + float scaled = value * multiplier; + + // Check for overflow + if (scaled > Integer.MAX_VALUE || scaled < Integer.MIN_VALUE) { + return true; + } + + // Fast round + int encoded = fastRoundFloat(scaled); + + // Check round-trip + float decoded = encoded / multiplier; + return Float.floatToRawIntBits(value) != Float.floatToRawIntBits(decoded); + } + + /** + * Encode a float value to an integer using the specified exponent and factor. + * + *

Formula: encoded = round(value * 10^(exponent - factor)) + * + * @param value the float value to encode + * @param exponent the decimal exponent (0-10) + * @param factor the decimal factor (0 <= factor <= exponent) + * @return the encoded integer value + */ + public static int encodeFloat(float value, int exponent, int factor) { + float multiplier = FLOAT_POW10[exponent]; + if (factor > 0) { + multiplier /= FLOAT_POW10[factor]; + } + return fastRoundFloat(value * multiplier); + } + + /** + * Decode an integer back to a float using the specified exponent and factor. + * + *

Formula: value = encoded / 10^(exponent - factor) + * + * @param encoded the encoded integer value + * @param exponent the decimal exponent (0-10) + * @param factor the decimal factor (0 <= factor <= exponent) + * @return the decoded float value + */ + public static float decodeFloat(int encoded, int exponent, int factor) { + float multiplier = FLOAT_POW10[exponent]; + if (factor > 0) { + multiplier /= FLOAT_POW10[factor]; + } + return encoded / multiplier; + } + + /** + * Fast rounding for float values using the magic number technique. + * + * @param value the float value to round + * @return the rounded integer value + */ + public static int fastRoundFloat(float value) { + if (value >= 0) { + return (int) ((value + MAGIC_FLOAT) - MAGIC_FLOAT); + } else { + return (int) ((value - MAGIC_FLOAT) + MAGIC_FLOAT); + } + } + + // ========== Double Encoding/Decoding ========== + + /** + * Check if a double value is an exception (cannot be losslessly encoded). + * + * @param value the double value to check + * @return true if the value is an exception + */ + public static boolean isDoubleException(double value) { + // NaN check + if (Double.isNaN(value)) { + return true; + } + // Infinity check + if (Double.isInfinite(value)) { + return true; + } + // Negative zero check + if (Double.doubleToRawLongBits(value) == DOUBLE_NEGATIVE_ZERO_BITS) { + return true; + } + return false; + } + + /** + * Check if a double value will be an exception for the given exponent/factor. 
+ * + * @param value the double value + * @param exponent the decimal exponent (0-18) + * @param factor the decimal factor (0 <= factor <= exponent) + * @return true if the value is an exception for this encoding + */ + public static boolean isDoubleException(double value, int exponent, int factor) { + if (isDoubleException(value)) { + return true; + } + + // Try encoding and check for round-trip failure + double multiplier = DOUBLE_POW10[exponent]; + if (factor > 0) { + multiplier /= DOUBLE_POW10[factor]; + } + + double scaled = value * multiplier; + + // Check for overflow + if (scaled > Long.MAX_VALUE || scaled < Long.MIN_VALUE) { + return true; + } + + // Fast round + long encoded = fastRoundDouble(scaled); + + // Check round-trip + double decoded = encoded / multiplier; + return Double.doubleToRawLongBits(value) != Double.doubleToRawLongBits(decoded); + } + + /** + * Encode a double value to a long using the specified exponent and factor. + * + *

Formula: encoded = round(value * 10^(exponent - factor)) + * + * @param value the double value to encode + * @param exponent the decimal exponent (0-18) + * @param factor the decimal factor (0 <= factor <= exponent) + * @return the encoded long value + */ + public static long encodeDouble(double value, int exponent, int factor) { + double multiplier = DOUBLE_POW10[exponent]; + if (factor > 0) { + multiplier /= DOUBLE_POW10[factor]; + } + return fastRoundDouble(value * multiplier); + } + + /** + * Decode a long back to a double using the specified exponent and factor. + * + *

Formula: value = encoded / 10^(exponent - factor) + * + * @param encoded the encoded long value + * @param exponent the decimal exponent (0-18) + * @param factor the decimal factor (0 <= factor <= exponent) + * @return the decoded double value + */ + public static double decodeDouble(long encoded, int exponent, int factor) { + double multiplier = DOUBLE_POW10[exponent]; + if (factor > 0) { + multiplier /= DOUBLE_POW10[factor]; + } + return encoded / multiplier; + } + + /** + * Fast rounding for double values using the magic number technique. + * + * @param value the double value to round + * @return the rounded long value + */ + public static long fastRoundDouble(double value) { + if (value >= 0) { + return (long) ((value + MAGIC_DOUBLE) - MAGIC_DOUBLE); + } else { + return (long) ((value - MAGIC_DOUBLE) + MAGIC_DOUBLE); + } + } + + // ========== Bit Width Calculation ========== + + /** + * Calculate the bit width needed to store unsigned values up to maxDelta. + * + * @param maxDelta the maximum delta value (unsigned) + * @return the number of bits needed (0-32 for int, 0-64 for long) + */ + public static int bitWidth(int maxDelta) { + if (maxDelta == 0) { + return 0; + } + return 32 - Integer.numberOfLeadingZeros(maxDelta); + } + + /** + * Calculate the bit width needed to store unsigned values up to maxDelta. + * + * @param maxDelta the maximum delta value (unsigned) + * @return the number of bits needed (0-64) + */ + public static int bitWidth(long maxDelta) { + if (maxDelta == 0) { + return 0; + } + return 64 - Long.numberOfLeadingZeros(maxDelta); + } + + // ========== Best Exponent/Factor Selection ========== + + /** + * Result of finding the best exponent/factor combination. 
+ */ + public static class EncodingParams { + public final int exponent; + public final int factor; + public final int numExceptions; + + public EncodingParams(int exponent, int factor, int numExceptions) { + this.exponent = exponent; + this.factor = factor; + this.numExceptions = numExceptions; + } + } + + /** + * Find the best exponent/factor combination for encoding float values. + * + *

Tries all valid (exponent, factor) combinations and selects the one + * that minimizes the number of exceptions. + * + * @param values the float values to analyze + * @param offset the starting index in the array + * @param length the number of values to analyze + * @return the best encoding parameters + */ + public static EncodingParams findBestFloatParams(float[] values, int offset, int length) { + int bestExponent = 0; + int bestFactor = 0; + int bestExceptions = length; // Start with worst case + + for (int e = 0; e <= FLOAT_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isFloatException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + // Early exit if we found perfect encoding + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + /** + * Find the best exponent/factor combination for encoding double values. + * + *

Tries all valid (exponent, factor) combinations and selects the one + * that minimizes the number of exceptions. + * + * @param values the double values to analyze + * @param offset the starting index in the array + * @param length the number of values to analyze + * @return the best encoding parameters + */ + public static EncodingParams findBestDoubleParams(double[] values, int offset, int length) { + int bestExponent = 0; + int bestFactor = 0; + int bestExceptions = length; // Start with worst case + + for (int e = 0; e <= DOUBLE_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isDoubleException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + // Early exit if we found perfect encoding + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java new file mode 100644 index 0000000000..d7a06781f3 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ALP values reader for DOUBLE type. + * + *

Reads ALP-encoded double values from a page and decodes them back to double values. + * + *

Page Layout: + *

+ * ┌─────────┬────────────────┬────────────────┬─────────────┐
+ * │ Header  │ AlpInfo Array  │ ForInfo Array  │ Data Array  │
+ * │ 8 bytes │ 4B × N vectors │ 9B × N vectors │ Variable    │
+ * └─────────┴────────────────┴────────────────┴─────────────┘
+ * 
+ */ +public class AlpValuesReaderForDouble extends ValuesReader { + private static final Logger LOG = LoggerFactory.getLogger(AlpValuesReaderForDouble.class); + + // Decoded double values (eagerly decoded) + private double[] decodedValues; + private int currentIndex; + private int totalCount; + + public AlpValuesReaderForDouble() { + this.currentIndex = 0; + this.totalCount = 0; + } + + @Override + public void initFromPage(int valuesCount, ByteBufferInputStream stream) + throws ParquetDecodingException, IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + + // Read and validate header + ByteBuffer headerBuf = stream.slice(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int version = headerBuf.get() & 0xFF; + int compressionMode = headerBuf.get() & 0xFF; + int integerEncoding = headerBuf.get() & 0xFF; + int logVectorSize = headerBuf.get() & 0xFF; + int numElements = headerBuf.getInt(); + + if (version != ALP_VERSION) { + throw new ParquetDecodingException("Unsupported ALP version: " + version + ", expected " + ALP_VERSION); + } + if (compressionMode != ALP_COMPRESSION_MODE) { + throw new ParquetDecodingException("Unsupported ALP compression mode: " + compressionMode); + } + if (integerEncoding != ALP_INTEGER_ENCODING_FOR) { + throw new ParquetDecodingException("Unsupported ALP integer encoding: " + integerEncoding); + } + + int vectorSize = 1 << logVectorSize; + int numVectors = (numElements + vectorSize - 1) / vectorSize; + + this.totalCount = numElements; + this.decodedValues = new double[numElements]; + this.currentIndex = 0; + + // Read AlpInfo array + ByteBuffer alpInfoBuf = stream.slice(ALP_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + int[] exponents = new int[numVectors]; + int[] factors = new int[numVectors]; + int[] numExceptions = new int[numVectors]; + + for (int v = 0; v < numVectors; v++) { + exponents[v] = alpInfoBuf.get() & 0xFF; + factors[v] = alpInfoBuf.get() & 0xFF; + 
numExceptions[v] = alpInfoBuf.getShort() & 0xFFFF; + } + + // Read ForInfo array + ByteBuffer forInfoBuf = stream.slice(DOUBLE_FOR_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + long[] frameOfReference = new long[numVectors]; + int[] bitWidths = new int[numVectors]; + + for (int v = 0; v < numVectors; v++) { + frameOfReference[v] = forInfoBuf.getLong(); + bitWidths[v] = forInfoBuf.get() & 0xFF; + } + + // Decode each vector + for (int v = 0; v < numVectors; v++) { + int vectorStart = v * vectorSize; + int vectorEnd = Math.min(vectorStart + vectorSize, numElements); + int vectorLen = vectorEnd - vectorStart; + + // Calculate packed data size + int packedBytes = (vectorLen * bitWidths[v] + 7) / 8; + + // Read and unpack values + long[] deltas = new long[vectorLen]; + if (bitWidths[v] > 0) { + ByteBuffer packedBuf = stream.slice(packedBytes).order(ByteOrder.LITTLE_ENDIAN); + unpackLongs(packedBuf, deltas, vectorLen, bitWidths[v]); + } + // else: all deltas are 0 (bitWidth == 0) + + // Reverse FOR and decode + for (int i = 0; i < vectorLen; i++) { + long encoded = deltas[i] + frameOfReference[v]; + decodedValues[vectorStart + i] = AlpEncoderDecoder.decodeDouble(encoded, exponents[v], factors[v]); + } + + // Apply exceptions + if (numExceptions[v] > 0) { + ByteBuffer excPosBuf = stream.slice(numExceptions[v] * 2).order(ByteOrder.LITTLE_ENDIAN); + ByteBuffer excValBuf = + stream.slice(numExceptions[v] * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN); + + for (int e = 0; e < numExceptions[v]; e++) { + int pos = excPosBuf.getShort() & 0xFFFF; + double val = excValBuf.getDouble(); + decodedValues[vectorStart + pos] = val; + } + } + } + } + + /** + * Unpack longs from a byte buffer using the specified bit width. + * Note: This method reads from the buffer's current position, using absolute indexing + * relative to that position. 
+ */ + private void unpackLongs(ByteBuffer packed, long[] values, int count, int bitWidth) { + int basePosition = packed.position(); + int bitPos = 0; + for (int i = 0; i < count; i++) { + long value = 0; + int bitsToRead = bitWidth; + int destBit = 0; + + while (bitsToRead > 0) { + int byteIdx = bitPos / 8; + int bitIdx = bitPos % 8; + int bitsAvailable = 8 - bitIdx; + int bitsThisRound = Math.min(bitsAvailable, bitsToRead); + // Use long arithmetic to avoid overflow for bit widths > 31 + long mask = (1L << bitsThisRound) - 1; + // Use basePosition + byteIdx to account for sliced buffer position + long bits = ((packed.get(basePosition + byteIdx) & 0xFFL) >>> bitIdx) & mask; + value |= (bits << destBit); + bitPos += bitsThisRound; + destBit += bitsThisRound; + bitsToRead -= bitsThisRound; + } + + values[i] = value; + } + } + + @Override + public double readDouble() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("ALP double data was already exhausted."); + } + return decodedValues[currentIndex++]; + } + + @Override + public void skip() { + skip(1); + } + + @Override + public void skip(int n) { + if (n < 0 || currentIndex + n > totalCount) { + throw new ParquetDecodingException(String.format( + "Cannot skip this many elements. Current index: %d. Skip %d. Total count: %d", + currentIndex, n, totalCount)); + } + currentIndex += n; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java new file mode 100644 index 0000000000..e6ee6eba76 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ALP values reader for FLOAT type. + * + *

Reads ALP-encoded float values from a page and decodes them back to float values. + * + *

Page Layout: + *

+ * ┌─────────┬────────────────┬────────────────┬─────────────┐
+ * │ Header  │ AlpInfo Array  │ ForInfo Array  │ Data Array  │
+ * │ 8 bytes │ 4B × N vectors │ 5B × N vectors │ Variable    │
+ * └─────────┴────────────────┴────────────────┴─────────────┘
+ * 
+ */ +public class AlpValuesReaderForFloat extends ValuesReader { + private static final Logger LOG = LoggerFactory.getLogger(AlpValuesReaderForFloat.class); + + // Decoded float values (eagerly decoded) + private float[] decodedValues; + private int currentIndex; + private int totalCount; + + public AlpValuesReaderForFloat() { + this.currentIndex = 0; + this.totalCount = 0; + } + + @Override + public void initFromPage(int valuesCount, ByteBufferInputStream stream) + throws ParquetDecodingException, IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + + // Read and validate header + ByteBuffer headerBuf = stream.slice(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int version = headerBuf.get() & 0xFF; + int compressionMode = headerBuf.get() & 0xFF; + int integerEncoding = headerBuf.get() & 0xFF; + int logVectorSize = headerBuf.get() & 0xFF; + int numElements = headerBuf.getInt(); + + if (version != ALP_VERSION) { + throw new ParquetDecodingException("Unsupported ALP version: " + version + ", expected " + ALP_VERSION); + } + if (compressionMode != ALP_COMPRESSION_MODE) { + throw new ParquetDecodingException("Unsupported ALP compression mode: " + compressionMode); + } + if (integerEncoding != ALP_INTEGER_ENCODING_FOR) { + throw new ParquetDecodingException("Unsupported ALP integer encoding: " + integerEncoding); + } + + int vectorSize = 1 << logVectorSize; + int numVectors = (numElements + vectorSize - 1) / vectorSize; + + this.totalCount = numElements; + this.decodedValues = new float[numElements]; + this.currentIndex = 0; + + // Read AlpInfo array + ByteBuffer alpInfoBuf = stream.slice(ALP_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + int[] exponents = new int[numVectors]; + int[] factors = new int[numVectors]; + int[] numExceptions = new int[numVectors]; + + for (int v = 0; v < numVectors; v++) { + exponents[v] = alpInfoBuf.get() & 0xFF; + factors[v] = alpInfoBuf.get() & 0xFF; + 
numExceptions[v] = alpInfoBuf.getShort() & 0xFFFF; + } + + // Read ForInfo array + ByteBuffer forInfoBuf = stream.slice(FLOAT_FOR_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + int[] frameOfReference = new int[numVectors]; + int[] bitWidths = new int[numVectors]; + + for (int v = 0; v < numVectors; v++) { + frameOfReference[v] = forInfoBuf.getInt(); + bitWidths[v] = forInfoBuf.get() & 0xFF; + } + + // Decode each vector + for (int v = 0; v < numVectors; v++) { + int vectorStart = v * vectorSize; + int vectorEnd = Math.min(vectorStart + vectorSize, numElements); + int vectorLen = vectorEnd - vectorStart; + + // Calculate packed data size + int packedBytes = (vectorLen * bitWidths[v] + 7) / 8; + + // Read and unpack values + int[] deltas = new int[vectorLen]; + if (bitWidths[v] > 0) { + ByteBuffer packedBuf = stream.slice(packedBytes).order(ByteOrder.LITTLE_ENDIAN); + unpackInts(packedBuf, deltas, vectorLen, bitWidths[v]); + } + // else: all deltas are 0 (bitWidth == 0) + + // Reverse FOR and decode + for (int i = 0; i < vectorLen; i++) { + int encoded = deltas[i] + frameOfReference[v]; + decodedValues[vectorStart + i] = AlpEncoderDecoder.decodeFloat(encoded, exponents[v], factors[v]); + } + + // Apply exceptions + if (numExceptions[v] > 0) { + ByteBuffer excPosBuf = stream.slice(numExceptions[v] * 2).order(ByteOrder.LITTLE_ENDIAN); + ByteBuffer excValBuf = + stream.slice(numExceptions[v] * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN); + + for (int e = 0; e < numExceptions[v]; e++) { + int pos = excPosBuf.getShort() & 0xFFFF; + float val = excValBuf.getFloat(); + decodedValues[vectorStart + pos] = val; + } + } + } + } + + /** + * Unpack integers from a byte buffer using the specified bit width. + * Note: This method reads from the buffer's current position, using absolute indexing + * relative to that position. 
+ */ + private void unpackInts(ByteBuffer packed, int[] values, int count, int bitWidth) { + int basePosition = packed.position(); + int bitPos = 0; + for (int i = 0; i < count; i++) { + int value = 0; + int bitsToRead = bitWidth; + int destBit = 0; + + while (bitsToRead > 0) { + int byteIdx = bitPos / 8; + int bitIdx = bitPos % 8; + int bitsAvailable = 8 - bitIdx; + int bitsThisRound = Math.min(bitsAvailable, bitsToRead); + int mask = (1 << bitsThisRound) - 1; + // Use basePosition + byteIdx to account for sliced buffer position + int bits = ((packed.get(basePosition + byteIdx) & 0xFF) >>> bitIdx) & mask; + value |= (bits << destBit); + bitPos += bitsThisRound; + destBit += bitsThisRound; + bitsToRead -= bitsThisRound; + } + + values[i] = value; + } + } + + @Override + public float readFloat() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("ALP float data was already exhausted."); + } + return decodedValues[currentIndex++]; + } + + @Override + public void skip() { + skip(1); + } + + @Override + public void skip(int n) { + if (n < 0 || currentIndex + n > totalCount) { + throw new ParquetDecodingException(String.format( + "Cannot skip this many elements. Current index: %d. Skip %d. Total count: %d", + currentIndex, n, totalCount)); + } + currentIndex += n; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java new file mode 100644 index 0000000000..08569f22d2 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.CapacityByteArrayOutputStream; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.ValuesWriter; + +/** + * ALP (Adaptive Lossless floating-Point) values writer. + * + *

ALP encoding converts floating-point values to integers using decimal scaling, + * then applies Frame of Reference (FOR) encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + * + *

Page Layout: + *

+ * ┌─────────┬────────────────┬────────────────┬─────────────┐
+ * │ Header  │ AlpInfo Array  │ ForInfo Array  │ Data Array  │
+ * │ 8 bytes │ 4B × N vectors │ 5B/9B × N      │ Variable    │
+ * └─────────┴────────────────┴────────────────┴─────────────┘
+ * 
+ */ +public abstract class AlpValuesWriter extends ValuesWriter { + + protected final int initialCapacity; + protected final int pageSize; + protected final ByteBufferAllocator allocator; + protected CapacityByteArrayOutputStream outputStream; + + public AlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + this.initialCapacity = initialCapacity; + this.pageSize = pageSize; + this.allocator = allocator; + this.outputStream = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); + } + + @Override + public Encoding getEncoding() { + return Encoding.ALP; + } + + @Override + public void close() { + outputStream.close(); + } + + /** + * Float-specific ALP values writer. + */ + public static class FloatAlpValuesWriter extends AlpValuesWriter { + private float[] buffer; + private int count; + + public FloatAlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + super(initialCapacity, pageSize, allocator); + // Initial buffer size - will grow as needed + this.buffer = new float[Math.max(ALP_VECTOR_SIZE, initialCapacity / Float.BYTES)]; + this.count = 0; + } + + @Override + public void writeFloat(float v) { + if (count >= buffer.length) { + // Grow buffer + float[] newBuffer = new float[buffer.length * 2]; + System.arraycopy(buffer, 0, newBuffer, 0, count); + buffer = newBuffer; + } + buffer[count++] = v; + } + + @Override + public long getBufferedSize() { + // Estimate: each float value contributes roughly 2-4 bytes after compression + // (actual size depends on data characteristics) + return count * 3L; // Conservative estimate + } + + @Override + public BytesInput getBytes() { + if (count == 0) { + return BytesInput.empty(); + } + + outputStream.reset(); + + // Calculate number of vectors + int numVectors = (count + ALP_VECTOR_SIZE - 1) / ALP_VECTOR_SIZE; + + // Prepare metadata arrays + int[] exponents = new int[numVectors]; + int[] factors = new int[numVectors]; + int[] numExceptions = 
new int[numVectors]; + int[] frameOfReference = new int[numVectors]; + int[] bitWidths = new int[numVectors]; + + // Prepare encoded data arrays + int[][] encodedValues = new int[numVectors][]; + short[][] exceptionPositions = new short[numVectors][]; + float[][] exceptionValues = new float[numVectors][]; + + // Process each vector + for (int v = 0; v < numVectors; v++) { + int vectorStart = v * ALP_VECTOR_SIZE; + int vectorEnd = Math.min(vectorStart + ALP_VECTOR_SIZE, count); + int vectorLen = vectorEnd - vectorStart; + + // Find best encoding parameters + AlpEncoderDecoder.EncodingParams params = + AlpEncoderDecoder.findBestFloatParams(buffer, vectorStart, vectorLen); + exponents[v] = params.exponent; + factors[v] = params.factor; + numExceptions[v] = params.numExceptions; + + // Encode values + int[] encoded = new int[vectorLen]; + short[] excPositions = new short[params.numExceptions]; + float[] excValues = new float[params.numExceptions]; + int excIdx = 0; + int placeholder = 0; // Will be set to first non-exception value + boolean foundNonException = false; + + // First pass: find placeholder + for (int i = 0; i < vectorLen; i++) { + float value = buffer[vectorStart + i]; + if (!AlpEncoderDecoder.isFloatException(value, params.exponent, params.factor)) { + placeholder = AlpEncoderDecoder.encodeFloat(value, params.exponent, params.factor); + foundNonException = true; + break; + } + } + + // Second pass: encode + int minValue = Integer.MAX_VALUE; + for (int i = 0; i < vectorLen; i++) { + float value = buffer[vectorStart + i]; + if (AlpEncoderDecoder.isFloatException(value, params.exponent, params.factor)) { + excPositions[excIdx] = (short) i; + excValues[excIdx] = value; + excIdx++; + encoded[i] = placeholder; // Use placeholder for exceptions + } else { + encoded[i] = AlpEncoderDecoder.encodeFloat(value, params.exponent, params.factor); + } + if (encoded[i] < minValue) { + minValue = encoded[i]; + } + } + + // Apply Frame of Reference + int maxDelta = 0; + for 
(int i = 0; i < vectorLen; i++) { + encoded[i] = encoded[i] - minValue; + if (encoded[i] > maxDelta) { + maxDelta = encoded[i]; + } + } + + frameOfReference[v] = minValue; + bitWidths[v] = AlpEncoderDecoder.bitWidth(maxDelta); + encodedValues[v] = encoded; + exceptionPositions[v] = excPositions; + exceptionValues[v] = excValues; + } + + // Write the page + ByteBuffer headerBuffer = ByteBuffer.allocate(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + headerBuffer.put((byte) ALP_VERSION); + headerBuffer.put((byte) ALP_COMPRESSION_MODE); + headerBuffer.put((byte) ALP_INTEGER_ENCODING_FOR); + headerBuffer.put((byte) ALP_VECTOR_SIZE_LOG); + headerBuffer.putInt(count); + outputStream.write(headerBuffer.array(), 0, ALP_HEADER_SIZE); + + // Write AlpInfo array + ByteBuffer alpInfoBuffer = + ByteBuffer.allocate(ALP_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + for (int v = 0; v < numVectors; v++) { + alpInfoBuffer.put((byte) exponents[v]); + alpInfoBuffer.put((byte) factors[v]); + alpInfoBuffer.putShort((short) numExceptions[v]); + } + outputStream.write(alpInfoBuffer.array(), 0, ALP_INFO_SIZE * numVectors); + + // Write ForInfo array + ByteBuffer forInfoBuffer = + ByteBuffer.allocate(FLOAT_FOR_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + for (int v = 0; v < numVectors; v++) { + forInfoBuffer.putInt(frameOfReference[v]); + forInfoBuffer.put((byte) bitWidths[v]); + } + outputStream.write(forInfoBuffer.array(), 0, FLOAT_FOR_INFO_SIZE * numVectors); + + // Write Data array for each vector + for (int v = 0; v < numVectors; v++) { + int vectorStart = v * ALP_VECTOR_SIZE; + int vectorEnd = Math.min(vectorStart + ALP_VECTOR_SIZE, count); + int vectorLen = vectorEnd - vectorStart; + + // Write bit-packed values + if (bitWidths[v] > 0) { + byte[] packed = packInts(encodedValues[v], vectorLen, bitWidths[v]); + outputStream.write(packed, 0, packed.length); + } + + // Write exception positions + if (numExceptions[v] > 0) { + ByteBuffer excPosBuffer = + 
ByteBuffer.allocate(numExceptions[v] * 2).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions[v]; i++) { + excPosBuffer.putShort(exceptionPositions[v][i]); + } + outputStream.write(excPosBuffer.array(), 0, numExceptions[v] * 2); + + // Write exception values + ByteBuffer excValBuffer = + ByteBuffer.allocate(numExceptions[v] * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions[v]; i++) { + excValBuffer.putFloat(exceptionValues[v][i]); + } + outputStream.write(excValBuffer.array(), 0, numExceptions[v] * Float.BYTES); + } + } + + return BytesInput.from(outputStream); + } + + /** + * Pack integers into a byte array using the specified bit width. + */ + private byte[] packInts(int[] values, int count, int bitWidth) { + int totalBits = count * bitWidth; + int totalBytes = (totalBits + 7) / 8; + byte[] packed = new byte[totalBytes]; + + long bitBuffer = 0; + int bitCount = 0; + int bytePos = 0; + + for (int i = 0; i < count; i++) { + bitBuffer |= ((long) values[i] << bitCount); + bitCount += bitWidth; + + while (bitCount >= 8) { + packed[bytePos++] = (byte) bitBuffer; + bitBuffer >>>= 8; + bitCount -= 8; + } + } + + // Write remaining bits + if (bitCount > 0) { + packed[bytePos] = (byte) bitBuffer; + } + + return packed; + } + + @Override + public void reset() { + count = 0; + outputStream.reset(); + } + + @Override + public long getAllocatedSize() { + return buffer.length * Float.BYTES + outputStream.getCapacity(); + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s FloatAlpValuesWriter %d values, %d bytes allocated", prefix, count, getAllocatedSize()); + } + } + + /** + * Double-specific ALP values writer. 
+ */ + public static class DoubleAlpValuesWriter extends AlpValuesWriter { + private double[] buffer; + private int count; + + public DoubleAlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + super(initialCapacity, pageSize, allocator); + this.buffer = new double[Math.max(ALP_VECTOR_SIZE, initialCapacity / Double.BYTES)]; + this.count = 0; + } + + @Override + public void writeDouble(double v) { + if (count >= buffer.length) { + double[] newBuffer = new double[buffer.length * 2]; + System.arraycopy(buffer, 0, newBuffer, 0, count); + buffer = newBuffer; + } + buffer[count++] = v; + } + + @Override + public long getBufferedSize() { + return count * 5L; // Conservative estimate + } + + @Override + public BytesInput getBytes() { + if (count == 0) { + return BytesInput.empty(); + } + + outputStream.reset(); + + // Calculate number of vectors + int numVectors = (count + ALP_VECTOR_SIZE - 1) / ALP_VECTOR_SIZE; + + // Prepare metadata arrays + int[] exponents = new int[numVectors]; + int[] factors = new int[numVectors]; + int[] numExceptions = new int[numVectors]; + long[] frameOfReference = new long[numVectors]; + int[] bitWidths = new int[numVectors]; + + // Prepare encoded data arrays + long[][] encodedValues = new long[numVectors][]; + short[][] exceptionPositions = new short[numVectors][]; + double[][] exceptionValues = new double[numVectors][]; + + // Process each vector + for (int v = 0; v < numVectors; v++) { + int vectorStart = v * ALP_VECTOR_SIZE; + int vectorEnd = Math.min(vectorStart + ALP_VECTOR_SIZE, count); + int vectorLen = vectorEnd - vectorStart; + + // Find best encoding parameters + AlpEncoderDecoder.EncodingParams params = + AlpEncoderDecoder.findBestDoubleParams(buffer, vectorStart, vectorLen); + exponents[v] = params.exponent; + factors[v] = params.factor; + numExceptions[v] = params.numExceptions; + + // Encode values + long[] encoded = new long[vectorLen]; + short[] excPositions = new short[params.numExceptions]; + 
double[] excValues = new double[params.numExceptions]; + int excIdx = 0; + long placeholder = 0; + boolean foundNonException = false; + + // First pass: find placeholder + for (int i = 0; i < vectorLen; i++) { + double value = buffer[vectorStart + i]; + if (!AlpEncoderDecoder.isDoubleException(value, params.exponent, params.factor)) { + placeholder = AlpEncoderDecoder.encodeDouble(value, params.exponent, params.factor); + foundNonException = true; + break; + } + } + + // Second pass: encode + long minValue = Long.MAX_VALUE; + for (int i = 0; i < vectorLen; i++) { + double value = buffer[vectorStart + i]; + if (AlpEncoderDecoder.isDoubleException(value, params.exponent, params.factor)) { + excPositions[excIdx] = (short) i; + excValues[excIdx] = value; + excIdx++; + encoded[i] = placeholder; + } else { + encoded[i] = AlpEncoderDecoder.encodeDouble(value, params.exponent, params.factor); + } + if (encoded[i] < minValue) { + minValue = encoded[i]; + } + } + + // Apply Frame of Reference + // Use unsigned comparison because the delta range may exceed Long.MAX_VALUE + long maxDelta = 0; + for (int i = 0; i < vectorLen; i++) { + encoded[i] = encoded[i] - minValue; + // Use unsigned comparison to handle overflow when range > Long.MAX_VALUE + if (Long.compareUnsigned(encoded[i], maxDelta) > 0) { + maxDelta = encoded[i]; + } + } + + frameOfReference[v] = minValue; + // bitWidth works correctly for unsigned values (uses numberOfLeadingZeros) + bitWidths[v] = AlpEncoderDecoder.bitWidth(maxDelta); + encodedValues[v] = encoded; + exceptionPositions[v] = excPositions; + exceptionValues[v] = excValues; + } + + // Write the page + ByteBuffer headerBuffer = ByteBuffer.allocate(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + headerBuffer.put((byte) ALP_VERSION); + headerBuffer.put((byte) ALP_COMPRESSION_MODE); + headerBuffer.put((byte) ALP_INTEGER_ENCODING_FOR); + headerBuffer.put((byte) ALP_VECTOR_SIZE_LOG); + headerBuffer.putInt(count); + outputStream.write(headerBuffer.array(), 
0, ALP_HEADER_SIZE); + + // Write AlpInfo array + ByteBuffer alpInfoBuffer = + ByteBuffer.allocate(ALP_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + for (int v = 0; v < numVectors; v++) { + alpInfoBuffer.put((byte) exponents[v]); + alpInfoBuffer.put((byte) factors[v]); + alpInfoBuffer.putShort((short) numExceptions[v]); + } + outputStream.write(alpInfoBuffer.array(), 0, ALP_INFO_SIZE * numVectors); + + // Write ForInfo array (9 bytes per vector for double) + ByteBuffer forInfoBuffer = + ByteBuffer.allocate(DOUBLE_FOR_INFO_SIZE * numVectors).order(ByteOrder.LITTLE_ENDIAN); + for (int v = 0; v < numVectors; v++) { + forInfoBuffer.putLong(frameOfReference[v]); + forInfoBuffer.put((byte) bitWidths[v]); + } + outputStream.write(forInfoBuffer.array(), 0, DOUBLE_FOR_INFO_SIZE * numVectors); + + // Write Data array for each vector + for (int v = 0; v < numVectors; v++) { + int vectorStart = v * ALP_VECTOR_SIZE; + int vectorEnd = Math.min(vectorStart + ALP_VECTOR_SIZE, count); + int vectorLen = vectorEnd - vectorStart; + + // Write bit-packed values + if (bitWidths[v] > 0) { + byte[] packed = packLongs(encodedValues[v], vectorLen, bitWidths[v]); + outputStream.write(packed, 0, packed.length); + } + + // Write exception positions + if (numExceptions[v] > 0) { + ByteBuffer excPosBuffer = + ByteBuffer.allocate(numExceptions[v] * 2).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions[v]; i++) { + excPosBuffer.putShort(exceptionPositions[v][i]); + } + outputStream.write(excPosBuffer.array(), 0, numExceptions[v] * 2); + + // Write exception values + ByteBuffer excValBuffer = + ByteBuffer.allocate(numExceptions[v] * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < numExceptions[v]; i++) { + excValBuffer.putDouble(exceptionValues[v][i]); + } + outputStream.write(excValBuffer.array(), 0, numExceptions[v] * Double.BYTES); + } + } + + return BytesInput.from(outputStream); + } + + /** + * Pack longs into a byte array using the specified 
bit width. + */ + private byte[] packLongs(long[] values, int count, int bitWidth) { + int totalBits = count * bitWidth; + int totalBytes = (totalBits + 7) / 8; + byte[] packed = new byte[totalBytes]; + + int bitPos = 0; + for (int i = 0; i < count; i++) { + long value = values[i]; + int bitsToWrite = bitWidth; + while (bitsToWrite > 0) { + int byteIdx = bitPos / 8; + int bitIdx = bitPos % 8; + int bitsAvailable = 8 - bitIdx; + int bitsThisRound = Math.min(bitsAvailable, bitsToWrite); + int mask = (1 << bitsThisRound) - 1; + packed[byteIdx] |= (byte) ((value & mask) << bitIdx); + value >>>= bitsThisRound; + bitPos += bitsThisRound; + bitsToWrite -= bitsThisRound; + } + } + + return packed; + } + + @Override + public void reset() { + count = 0; + outputStream.reset(); + } + + @Override + public long getAllocatedSize() { + return buffer.length * Double.BYTES + outputStream.getCapacity(); + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s DoubleAlpValuesWriter %d values, %d bytes allocated", prefix, count, getAllocatedSize()); + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpBitPackingTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpBitPackingTest.java new file mode 100644 index 0000000000..ff46051d38 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpBitPackingTest.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.junit.Test; + +/** + * Tests to debug bit packing issues. + */ +public class AlpBitPackingTest { + + @Test + public void testSimpleTwoFloats() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + // Write just two simple values + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + byte[] bytes = input.toByteArray(); + + System.out.println("Encoded bytes length: " + bytes.length); + System.out.println("Encoded bytes (hex):"); + for (int i = 0; i < bytes.length; i++) { + System.out.printf("%02X ", bytes[i] & 0xFF); + if ((i + 1) % 16 == 0) System.out.println(); + } + System.out.println(); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(2, ByteBufferInputStream.wrap(input.toByteBuffer())); + + float v1 = reader.readFloat(); + float v2 = reader.readFloat(); + + System.out.println("Read v1: " + v1 + " (bits: " + Float.floatToRawIntBits(v1) + ")"); + System.out.println("Read v2: " + v2 + " (bits: " + Float.floatToRawIntBits(v2) + ")"); + + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(v1)); + 
assertEquals(Float.floatToRawIntBits(2.0f), Float.floatToRawIntBits(v2)); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testThreeFloatsWithNegative() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + // Write values including negative + writer.writeFloat(1.0f); + writer.writeFloat(-1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + byte[] bytes = input.toByteArray(); + + System.out.println("testThreeFloatsWithNegative - Encoded bytes (hex):"); + for (int i = 0; i < bytes.length; i++) { + System.out.printf("%02X ", bytes[i] & 0xFF); + if ((i + 1) % 16 == 0) System.out.println(); + } + System.out.println(); + + // Parse header manually to debug + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + int version = buf.get() & 0xFF; + int compressionMode = buf.get() & 0xFF; + int integerEncoding = buf.get() & 0xFF; + int logVectorSize = buf.get() & 0xFF; + int numElements = buf.getInt(); + + System.out.println(" Header: version=" + version + ", numElements=" + numElements); + + // Parse AlpInfo + int exponent = buf.get() & 0xFF; + int factor = buf.get() & 0xFF; + int numExceptions = buf.getShort() & 0xFFFF; + + System.out.println( + " AlpInfo: exponent=" + exponent + ", factor=" + factor + ", numExceptions=" + numExceptions); + + // Parse ForInfo + int frameOfReference = buf.getInt(); + int bitWidth = buf.get() & 0xFF; + + System.out.println(" ForInfo: FOR=" + frameOfReference + ", bitWidth=" + bitWidth); + + // Show expected encoding + float multiplier = 1.0f; // For exp=0, fac=0 + System.out.println(" Expected encoding (exp=" + exponent + ", fac=" + factor + "):"); + System.out.println(" 1.0f -> " + Math.round(1.0f * multiplier)); + System.out.println(" -1.0f -> " + Math.round(-1.0f * multiplier)); + System.out.println(" 2.0f -> 
" + Math.round(2.0f * multiplier)); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(3, ByteBufferInputStream.wrap(input.toByteBuffer())); + + float v1 = reader.readFloat(); + float v2 = reader.readFloat(); + float v3 = reader.readFloat(); + + System.out.println("testThreeFloatsWithNegative:"); + System.out.println("Read v1: " + v1 + " expected: 1.0"); + System.out.println("Read v2: " + v2 + " expected: -1.0"); + System.out.println("Read v3: " + v3 + " expected: 2.0"); + + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(v1)); + assertEquals(Float.floatToRawIntBits(-1.0f), Float.floatToRawIntBits(v2)); + assertEquals(Float.floatToRawIntBits(2.0f), Float.floatToRawIntBits(v3)); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testEncoderDecoderDirectly() { + // Test the encoder/decoder directly without bit packing + float value = 1.0f; + int exponent = 2; + int factor = 0; + + // Check if it's an exception + boolean isException = AlpEncoderDecoder.isFloatException(value, exponent, factor); + System.out.println("Is 1.0f an exception with exp=2, fac=0? 
" + isException); + + if (!isException) { + int encoded = AlpEncoderDecoder.encodeFloat(value, exponent, factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, exponent, factor); + System.out.println("Encoded: " + encoded); + System.out.println("Decoded: " + decoded); + assertEquals(Float.floatToRawIntBits(value), Float.floatToRawIntBits(decoded)); + } + } + + @Test + public void testManualUnpack() throws Exception { + // Manually test unpacking + byte[] packedData = {0x02}; // Binary: 00000010 + ByteBuffer buf = ByteBuffer.wrap(packedData); + + int[] values = new int[2]; + int bitPos = 0; + int bitWidth = 1; + + for (int i = 0; i < 2; i++) { + int value = 0; + int bitsToRead = bitWidth; + int destBit = 0; + + while (bitsToRead > 0) { + int byteIdx = bitPos / 8; + int bitIdx = bitPos % 8; + int bitsAvailable = 8 - bitIdx; + int bitsThisRound = Math.min(bitsAvailable, bitsToRead); + int mask = (1 << bitsThisRound) - 1; + int bits = ((buf.get(byteIdx) & 0xFF) >>> bitIdx) & mask; + value |= (bits << destBit); + bitPos += bitsThisRound; + destBit += bitsThisRound; + bitsToRead -= bitsThisRound; + } + + values[i] = value; + System.out.println("Unpacked value[" + i + "] = " + value); + } + + assertEquals(0, values[0]); // First bit of 0x02 = 0 + assertEquals(1, values[1]); // Second bit of 0x02 = 1 + } + + @Test + public void testPackUnpackSymmetry() throws Exception { + // Test that packing and unpacking are symmetric + // Pack [2, 0, 3] with bitWidth=2, then unpack + + // Pack manually using the same algorithm as the writer + int[] values = {2, 0, 3}; + int bitWidth = 2; + int count = 3; + + // Pack (same as writer's packInts) + int totalBits = count * bitWidth; + int totalBytes = (totalBits + 7) / 8; + byte[] packed = new byte[totalBytes]; + + long bitBuffer = 0; + int bitCount = 0; + int bytePos = 0; + + for (int i = 0; i < count; i++) { + bitBuffer |= ((long) values[i] << bitCount); + bitCount += bitWidth; + + while (bitCount >= 8) { + packed[bytePos++] = 
(byte) bitBuffer; + bitBuffer >>>= 8; + bitCount -= 8; + } + } + + // Write remaining bits + if (bitCount > 0) { + packed[bytePos] = (byte) bitBuffer; + } + + System.out.println("testPackUnpackSymmetry:"); + System.out.println(" Input values: [" + values[0] + ", " + values[1] + ", " + values[2] + "]"); + System.out.println(" BitWidth: " + bitWidth); + System.out.println(" Packed bytes: "); + for (int i = 0; i < packed.length; i++) { + System.out.printf( + " packed[%d] = 0x%02X = %s\n", i, packed[i] & 0xFF, Integer.toBinaryString(packed[i] & 0xFF)); + } + + // Unpack (same as reader's unpackInts) + ByteBuffer buf = ByteBuffer.wrap(packed); + int[] unpacked = new int[count]; + int bitPos = 0; + + for (int i = 0; i < count; i++) { + int value = 0; + int bitsToRead = bitWidth; + int destBit = 0; + + while (bitsToRead > 0) { + int byteIdx = bitPos / 8; + int bitIdx = bitPos % 8; + int bitsAvailable = 8 - bitIdx; + int bitsThisRound = Math.min(bitsAvailable, bitsToRead); + int mask = (1 << bitsThisRound) - 1; + int bits = ((buf.get(byteIdx) & 0xFF) >>> bitIdx) & mask; + value |= (bits << destBit); + bitPos += bitsThisRound; + destBit += bitsThisRound; + bitsToRead -= bitsThisRound; + } + + unpacked[i] = value; + } + + System.out.println(" Unpacked values: [" + unpacked[0] + ", " + unpacked[1] + ", " + unpacked[2] + "]"); + + for (int i = 0; i < count; i++) { + assertEquals("Value at index " + i, values[i], unpacked[i]); + } + } + + @Test + public void testDoubleRandomDebug() throws Exception { + // Test with random doubles to debug the issue + java.util.Random rand = new java.util.Random(42); + final int numElements = 1024; + double[] values = new double[numElements]; + for (int i = 0; i < numElements; ++i) { + values[i] = rand.nextDouble() * 1000000.0 - 500000.0; + } + + // Find which values will be exceptions + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestDoubleParams(values, 0, numElements); + System.out.println("testDoubleRandomDebug:"); + 
System.out.println(" Best params: exponent=" + params.exponent + ", factor=" + params.factor); + System.out.println(" Num exceptions: " + params.numExceptions + " out of " + numElements); + + // Encode value 28 manually to see what it should be + double val28 = values[28]; + System.out.println(" Value at index 28: " + val28); + if (!AlpEncoderDecoder.isDoubleException(val28, params.exponent, params.factor)) { + long encoded28 = AlpEncoderDecoder.encodeDouble(val28, params.exponent, params.factor); + System.out.println(" Encoded (before FOR): " + encoded28); + double decoded28 = AlpEncoderDecoder.decodeDouble(encoded28, params.exponent, params.factor); + System.out.println(" Decoded back: " + decoded28); + System.out.println(" Round-trip OK? " + + (Double.doubleToRawLongBits(val28) == Double.doubleToRawLongBits(decoded28))); + } + + // Now test encode/decode + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter( + numElements * 16, numElements * 16, new DirectByteBufferAllocator()); + + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + byte[] bytes = input.toByteArray(); + + System.out.println(" Total encoded bytes: " + bytes.length); + + // Parse header + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + int version = buf.get() & 0xFF; + int compressionMode = buf.get() & 0xFF; + int integerEncoding = buf.get() & 0xFF; + int logVectorSize = buf.get() & 0xFF; + int numElem = buf.getInt(); + System.out.println(" Header: numElements=" + numElem); + + // Parse AlpInfo + int exponent = buf.get() & 0xFF; + int factor = buf.get() & 0xFF; + int numExceptions = buf.getShort() & 0xFFFF; + System.out.println( + " AlpInfo: exponent=" + exponent + ", factor=" + factor + ", numExceptions=" + numExceptions); + + // Parse ForInfo (9 bytes for double) + long frameOfReference = buf.getLong(); + int bitWidth = buf.get() & 0xFF; + System.out.println(" 
ForInfo: FOR=" + frameOfReference + ", bitWidth=" + bitWidth); + + // Calculate expected delta for value 28 + if (!AlpEncoderDecoder.isDoubleException(val28, params.exponent, params.factor)) { + long encoded28 = AlpEncoderDecoder.encodeDouble(val28, params.exponent, params.factor); + long delta28 = encoded28 - frameOfReference; + System.out.println(" Expected delta for value 28: " + delta28); + System.out.println(" Delta in hex: " + Long.toHexString(delta28)); + } + + // Now manually read the packed data for index 28 + int packedDataStart = 8 + 4 + 9; // Header + AlpInfo + ForInfo + int bitStart = 28 * bitWidth; + int byteStart = packedDataStart + (bitStart / 8); + int bitOffset = bitStart % 8; + System.out.println( + " Packed data for index 28 starts at byte " + byteStart + ", bit offset " + bitOffset); + + // Print the bytes around this position + System.out.print(" Bytes around position: "); + for (int i = byteStart; i < Math.min(byteStart + 10, bytes.length); i++) { + System.out.printf("%02X ", bytes[i] & 0xFF); + } + System.out.println(); + + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer())); + + // Check first few values and find first mismatch + int firstMismatch = -1; + for (int i = 0; i < numElements; i++) { + double expected = values[i]; + double actual = reader.readDouble(); + if (Double.doubleToRawLongBits(expected) != Double.doubleToRawLongBits(actual)) { + System.out.println(" First mismatch at index " + i + ":"); + System.out.println( + " Expected: " + expected + " (bits: " + Double.doubleToRawLongBits(expected) + ")"); + System.out.println(" Actual: " + actual + " (bits: " + Double.doubleToRawLongBits(actual) + ")"); + System.out.println( + " Is exception? 
" + AlpEncoderDecoder.isDoubleException(expected, exponent, factor)); + firstMismatch = i; + break; + } + } + + if (firstMismatch == -1) { + System.out.println(" All values matched!"); + } + + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testHeaderParsing() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + byte[] bytes = input.toByteArray(); + + // Parse header manually + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + int version = buf.get() & 0xFF; + int compressionMode = buf.get() & 0xFF; + int integerEncoding = buf.get() & 0xFF; + int logVectorSize = buf.get() & 0xFF; + int numElements = buf.getInt(); + + System.out.println("Header parsing:"); + System.out.println(" version: " + version); + System.out.println(" compressionMode: " + compressionMode); + System.out.println(" integerEncoding: " + integerEncoding); + System.out.println(" logVectorSize: " + logVectorSize); + System.out.println(" numElements: " + numElements); + + assertEquals(1, version); + assertEquals(0, compressionMode); + assertEquals(0, integerEncoding); + assertEquals(10, logVectorSize); + assertEquals(2, numElements); + + // Parse AlpInfo + int exponent = buf.get() & 0xFF; + int factor = buf.get() & 0xFF; + int numExceptions = buf.getShort() & 0xFFFF; + + System.out.println("AlpInfo:"); + System.out.println(" exponent: " + exponent); + System.out.println(" factor: " + factor); + System.out.println(" numExceptions: " + numExceptions); + + // Parse ForInfo + int frameOfReference = buf.getInt(); + int bitWidth = buf.get() & 0xFF; + + System.out.println("ForInfo:"); + System.out.println(" frameOfReference: " + frameOfReference); + System.out.println(" bitWidth: " + bitWidth); + + // 
Calculate expected packed data size + int vectorLen = 2; + int packedBytes = (vectorLen * bitWidth + 7) / 8; + System.out.println("Expected packed bytes: " + packedBytes); + + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java new file mode 100644 index 0000000000..4594e51d70 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import org.junit.Test; + +/** + * Tests for the core ALP encoder/decoder logic. 
+ */ +public class AlpEncoderDecoderTest { + + // ========== Float Encoding/Decoding Tests ========== + + @Test + public void testFloatRoundTrip() { + float[] testValues = {0.0f, 1.0f, -1.0f, 3.14159f, 100.5f, 0.001f, 1234567.0f}; + + for (float value : testValues) { + for (int exponent = 0; exponent <= AlpConstants.FLOAT_MAX_EXPONENT; exponent++) { + for (int factor = 0; factor <= exponent; factor++) { + if (!AlpEncoderDecoder.isFloatException(value, exponent, factor)) { + int encoded = AlpEncoderDecoder.encodeFloat(value, exponent, factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, exponent, factor); + assertEquals( + "Round-trip failed for value=" + value + ", exponent=" + exponent + ", factor=" + + factor, + Float.floatToRawIntBits(value), + Float.floatToRawIntBits(decoded)); + } + } + } + } + } + + @Test + public void testFloatExceptionDetection() { + // NaN should be an exception + assertTrue("NaN should be an exception", AlpEncoderDecoder.isFloatException(Float.NaN)); + + // Infinity should be an exception + assertTrue( + "Positive infinity should be an exception", + AlpEncoderDecoder.isFloatException(Float.POSITIVE_INFINITY)); + assertTrue( + "Negative infinity should be an exception", + AlpEncoderDecoder.isFloatException(Float.NEGATIVE_INFINITY)); + + // Negative zero should be an exception + assertTrue("Negative zero should be an exception", AlpEncoderDecoder.isFloatException(-0.0f)); + + // Regular values should not be exceptions (basic check) + assertFalse("1.0f should not be a basic exception", AlpEncoderDecoder.isFloatException(1.0f)); + assertFalse("0.0f should not be a basic exception", AlpEncoderDecoder.isFloatException(0.0f)); + } + + @Test + public void testFloatEncoding() { + // Test known encoding: 1.23 * 10^2 = 123 + int encoded = AlpEncoderDecoder.encodeFloat(1.23f, 2, 0); + assertEquals(123, encoded); + + // Test with factor: 12.3 * 10^(2-1) = 12.3 * 10 = 123 + encoded = AlpEncoderDecoder.encodeFloat(12.3f, 2, 1); + 
assertEquals(123, encoded); + + // Test zero + encoded = AlpEncoderDecoder.encodeFloat(0.0f, 5, 0); + assertEquals(0, encoded); + } + + @Test + public void testFloatDecoding() { + // Test known decoding: 123 / 10^2 = 1.23 + float decoded = AlpEncoderDecoder.decodeFloat(123, 2, 0); + assertEquals(1.23f, decoded, 1e-6f); + + // Test with factor: 123 / 10^(2-1) = 123 / 10 = 12.3 + decoded = AlpEncoderDecoder.decodeFloat(123, 2, 1); + assertEquals(12.3f, decoded, 1e-6f); + + // Test zero + decoded = AlpEncoderDecoder.decodeFloat(0, 5, 0); + assertEquals(0.0f, decoded, 0.0f); + } + + @Test + public void testFastRoundFloat() { + // Positive values + assertEquals(5, AlpEncoderDecoder.fastRoundFloat(5.4f)); + assertEquals(6, AlpEncoderDecoder.fastRoundFloat(5.5f)); + assertEquals(6, AlpEncoderDecoder.fastRoundFloat(5.6f)); + + // Negative values + assertEquals(-5, AlpEncoderDecoder.fastRoundFloat(-5.4f)); + assertEquals(-6, AlpEncoderDecoder.fastRoundFloat(-5.5f)); + assertEquals(-6, AlpEncoderDecoder.fastRoundFloat(-5.6f)); + + // Zero + assertEquals(0, AlpEncoderDecoder.fastRoundFloat(0.0f)); + } + + // ========== Double Encoding/Decoding Tests ========== + + @Test + public void testDoubleRoundTrip() { + double[] testValues = {0.0, 1.0, -1.0, 3.14159265358979, 100.5, 0.001, 12345678901234.0}; + + for (double value : testValues) { + for (int exponent = 0; exponent <= Math.min(AlpConstants.DOUBLE_MAX_EXPONENT, 10); exponent++) { + for (int factor = 0; factor <= exponent; factor++) { + if (!AlpEncoderDecoder.isDoubleException(value, exponent, factor)) { + long encoded = AlpEncoderDecoder.encodeDouble(value, exponent, factor); + double decoded = AlpEncoderDecoder.decodeDouble(encoded, exponent, factor); + assertEquals( + "Round-trip failed for value=" + value + ", exponent=" + exponent + ", factor=" + + factor, + Double.doubleToRawLongBits(value), + Double.doubleToRawLongBits(decoded)); + } + } + } + } + } + + @Test + public void testDoubleExceptionDetection() { + // NaN 
should be an exception + assertTrue("NaN should be an exception", AlpEncoderDecoder.isDoubleException(Double.NaN)); + + // Infinity should be an exception + assertTrue( + "Positive infinity should be an exception", + AlpEncoderDecoder.isDoubleException(Double.POSITIVE_INFINITY)); + assertTrue( + "Negative infinity should be an exception", + AlpEncoderDecoder.isDoubleException(Double.NEGATIVE_INFINITY)); + + // Negative zero should be an exception + assertTrue("Negative zero should be an exception", AlpEncoderDecoder.isDoubleException(-0.0)); + + // Regular values should not be exceptions (basic check) + assertFalse("1.0 should not be a basic exception", AlpEncoderDecoder.isDoubleException(1.0)); + assertFalse("0.0 should not be a basic exception", AlpEncoderDecoder.isDoubleException(0.0)); + } + + @Test + public void testDoubleEncoding() { + // Test known encoding: 1.23 * 10^2 = 123 + long encoded = AlpEncoderDecoder.encodeDouble(1.23, 2, 0); + assertEquals(123L, encoded); + + // Test with factor: 12.3 * 10^(2-1) = 12.3 * 10 = 123 + encoded = AlpEncoderDecoder.encodeDouble(12.3, 2, 1); + assertEquals(123L, encoded); + + // Test zero + encoded = AlpEncoderDecoder.encodeDouble(0.0, 5, 0); + assertEquals(0L, encoded); + } + + @Test + public void testDoubleDecoding() { + // Test known decoding: 123 / 10^2 = 1.23 + double decoded = AlpEncoderDecoder.decodeDouble(123, 2, 0); + assertEquals(1.23, decoded, 1e-10); + + // Test with factor: 123 / 10^(2-1) = 123 / 10 = 12.3 + decoded = AlpEncoderDecoder.decodeDouble(123, 2, 1); + assertEquals(12.3, decoded, 1e-10); + + // Test zero + decoded = AlpEncoderDecoder.decodeDouble(0, 5, 0); + assertEquals(0.0, decoded, 0.0); + } + + @Test + public void testFastRoundDouble() { + // Positive values + assertEquals(5L, AlpEncoderDecoder.fastRoundDouble(5.4)); + assertEquals(6L, AlpEncoderDecoder.fastRoundDouble(5.5)); + assertEquals(6L, AlpEncoderDecoder.fastRoundDouble(5.6)); + + // Negative values + assertEquals(-5L, 
AlpEncoderDecoder.fastRoundDouble(-5.4)); + assertEquals(-6L, AlpEncoderDecoder.fastRoundDouble(-5.5)); + assertEquals(-6L, AlpEncoderDecoder.fastRoundDouble(-5.6)); + + // Zero + assertEquals(0L, AlpEncoderDecoder.fastRoundDouble(0.0)); + } + + // ========== Bit Width Tests ========== + + @Test + public void testBitWidthInt() { + assertEquals(0, AlpEncoderDecoder.bitWidth(0)); + assertEquals(1, AlpEncoderDecoder.bitWidth(1)); + assertEquals(2, AlpEncoderDecoder.bitWidth(2)); + assertEquals(2, AlpEncoderDecoder.bitWidth(3)); + assertEquals(3, AlpEncoderDecoder.bitWidth(4)); + assertEquals(8, AlpEncoderDecoder.bitWidth(255)); + assertEquals(9, AlpEncoderDecoder.bitWidth(256)); + assertEquals(16, AlpEncoderDecoder.bitWidth(65535)); + assertEquals(31, AlpEncoderDecoder.bitWidth(Integer.MAX_VALUE)); + } + + @Test + public void testBitWidthLong() { + assertEquals(0, AlpEncoderDecoder.bitWidth(0L)); + assertEquals(1, AlpEncoderDecoder.bitWidth(1L)); + assertEquals(2, AlpEncoderDecoder.bitWidth(2L)); + assertEquals(2, AlpEncoderDecoder.bitWidth(3L)); + assertEquals(3, AlpEncoderDecoder.bitWidth(4L)); + assertEquals(8, AlpEncoderDecoder.bitWidth(255L)); + assertEquals(9, AlpEncoderDecoder.bitWidth(256L)); + assertEquals(16, AlpEncoderDecoder.bitWidth(65535L)); + assertEquals(31, AlpEncoderDecoder.bitWidth((long) Integer.MAX_VALUE)); + assertEquals(63, AlpEncoderDecoder.bitWidth(Long.MAX_VALUE)); + } + + // ========== Best Parameters Tests ========== + + @Test + public void testFindBestFloatParams() { + // Test with values that have a clear best exponent/factor + float[] values = {1.23f, 4.56f, 7.89f, 10.11f, 12.13f}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + + assertNotNull(params); + assertTrue(params.exponent >= 0 && params.exponent <= AlpConstants.FLOAT_MAX_EXPONENT); + assertTrue(params.factor >= 0 && params.factor <= params.exponent); + + // Verify these params work for the values + for (float v : 
values) { + if (!AlpEncoderDecoder.isFloatException(v, params.exponent, params.factor)) { + int encoded = AlpEncoderDecoder.encodeFloat(v, params.exponent, params.factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, params.exponent, params.factor); + assertEquals(Float.floatToRawIntBits(v), Float.floatToRawIntBits(decoded)); + } + } + } + + @Test + public void testFindBestFloatParamsWithAllExceptions() { + // All NaN values - should all be exceptions + float[] values = {Float.NaN, Float.NaN, Float.NaN}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + + assertNotNull(params); + assertEquals(values.length, params.numExceptions); + } + + @Test + public void testFindBestDoubleParams() { + // Test with values that have a clear best exponent/factor + double[] values = {1.23, 4.56, 7.89, 10.11, 12.13}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestDoubleParams(values, 0, values.length); + + assertNotNull(params); + assertTrue(params.exponent >= 0 && params.exponent <= AlpConstants.DOUBLE_MAX_EXPONENT); + assertTrue(params.factor >= 0 && params.factor <= params.exponent); + + // Verify these params work for the values + for (double v : values) { + if (!AlpEncoderDecoder.isDoubleException(v, params.exponent, params.factor)) { + long encoded = AlpEncoderDecoder.encodeDouble(v, params.exponent, params.factor); + double decoded = AlpEncoderDecoder.decodeDouble(encoded, params.exponent, params.factor); + assertEquals(Double.doubleToRawLongBits(v), Double.doubleToRawLongBits(decoded)); + } + } + } + + @Test + public void testFindBestDoubleParamsWithAllExceptions() { + // All NaN/Infinity values - should all be exceptions + double[] values = {Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestDoubleParams(values, 0, values.length); + + assertNotNull(params); + assertEquals(values.length, 
params.numExceptions); + } + + @Test + public void testFindBestParamsWithOffset() { + // Test that offset works correctly + float[] values = {Float.NaN, Float.NaN, 1.23f, 4.56f, 7.89f, Float.NaN}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 2, 3); + + assertNotNull(params); + // Should be able to encode values[2..4] without exceptions + assertEquals(0, params.numExceptions); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesEndToEndTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesEndToEndTest.java new file mode 100644 index 0000000000..8b81684d62 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesEndToEndTest.java @@ -0,0 +1,719 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.assertEquals; + +import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.junit.Test; + +/** + * End-to-end tests for ALP encoding and decoding pipeline. + */ +public class AlpValuesEndToEndTest { + + // ========== Float Pipeline Tests ========== + + @Test + public void testFloatPipelineSingleValue() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(64, 64, new DirectByteBufferAllocator()); + + float value = 1.23f; + writer.writeFloat(value); + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(1, ByteBufferInputStream.wrap(input.toByteBuffer())); + + float decoded = reader.readFloat(); + assertEquals(Float.floatToRawIntBits(value), Float.floatToRawIntBits(decoded)); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatPipelineSmallBatch() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + float[] values = {0.0f, 1.0f, -1.0f, 3.14f, 100.5f, 0.001f, 1234567.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + float actual = reader.readFloat(); + assertEquals( + "Value mismatch for " + expected, + Float.floatToRawIntBits(expected), + Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + 
@Test + public void testFloatPipelineRandomData() throws Exception { + Random rand = new Random(42); + final int numElements = 1024; + float[] values = new float[numElements]; + for (int i = 0; i < numElements; ++i) { + values[i] = rand.nextFloat() * 10000.0f - 5000.0f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter( + numElements * 8, numElements * 8, new DirectByteBufferAllocator()); + + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + float actual = reader.readFloat(); + assertEquals( + "Value mismatch for " + expected, + Float.floatToRawIntBits(expected), + Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatPipelineWithExceptions() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + // Mix of regular values and exception values + float[] values = { + 1.0f, Float.NaN, 2.0f, Float.POSITIVE_INFINITY, 3.0f, Float.NEGATIVE_INFINITY, 4.0f, -0.0f, 5.0f + }; + + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + float expected = values[i]; + float actual = reader.readFloat(); + + if (Float.isNaN(expected)) { + assertEquals("NaN check at index " + i, true, Float.isNaN(actual)); + } else { + assertEquals( + "Value mismatch at index " + i + " for " + expected, + Float.floatToRawIntBits(expected), 
+ Float.floatToRawIntBits(actual)); + } + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatPipelineAllExceptions() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + // All exception values + float[] values = {Float.NaN, Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, -0.0f}; + + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + float expected = values[i]; + float actual = reader.readFloat(); + + if (Float.isNaN(expected)) { + assertEquals("NaN check at index " + i, true, Float.isNaN(actual)); + } else { + assertEquals( + "Value mismatch at index " + i + " for " + expected, + Float.floatToRawIntBits(expected), + Float.floatToRawIntBits(actual)); + } + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatPipelineMultipleVectors() throws Exception { + Random rand = new Random(42); + // More than one vector (vector size is 1024) + final int numElements = 3000; + float[] values = new float[numElements]; + for (int i = 0; i < numElements; ++i) { + values[i] = rand.nextFloat() * 1000.0f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter( + numElements * 8, numElements * 8, new DirectByteBufferAllocator()); + + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < numElements; i++) 
{ + float expected = values[i]; + float actual = reader.readFloat(); + assertEquals( + "Value mismatch at index " + i, + Float.floatToRawIntBits(expected), + Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatReaderSkip() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f, 9.0f, 10.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + // Read first value + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + + // Skip 3 values + reader.skip(3); + + // Should now read 5.0 + assertEquals(Float.floatToRawIntBits(5.0f), Float.floatToRawIntBits(reader.readFloat())); + + // Skip 2 values + reader.skip(2); + + // Should now read 8.0 + assertEquals(Float.floatToRawIntBits(8.0f), Float.floatToRawIntBits(reader.readFloat())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Double Pipeline Tests ========== + + @Test + public void testDoublePipelineSingleValue() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(128, 128, new DirectByteBufferAllocator()); + + double value = 1.23456789; + writer.writeDouble(value); + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(1, ByteBufferInputStream.wrap(input.toByteBuffer())); + + double decoded = reader.readDouble(); + assertEquals(Double.doubleToRawLongBits(value), 
Double.doubleToRawLongBits(decoded)); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoublePipelineSmallBatch() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + double[] values = {0.0, 1.0, -1.0, 3.14159265358979, 100.5, 0.001, 12345678901234.0}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + double actual = reader.readDouble(); + assertEquals( + "Value mismatch for " + expected, + Double.doubleToRawLongBits(expected), + Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoublePipelineRandomData() throws Exception { + Random rand = new Random(42); + final int numElements = 1024; + double[] values = new double[numElements]; + for (int i = 0; i < numElements; ++i) { + values[i] = rand.nextDouble() * 1000000.0 - 500000.0; + } + + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter( + numElements * 16, numElements * 16, new DirectByteBufferAllocator()); + + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + double actual = reader.readDouble(); + assertEquals( + "Value mismatch for " + expected, + Double.doubleToRawLongBits(expected), + Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + 
writer.close(); + } + } + } + + @Test + public void testDoublePipelineWithExceptions() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + // Mix of regular values and exception values + double[] values = { + 1.0, Double.NaN, 2.0, Double.POSITIVE_INFINITY, 3.0, Double.NEGATIVE_INFINITY, 4.0, -0.0, 5.0 + }; + + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + double expected = values[i]; + double actual = reader.readDouble(); + + if (Double.isNaN(expected)) { + assertEquals("NaN check at index " + i, true, Double.isNaN(actual)); + } else { + assertEquals( + "Value mismatch at index " + i + " for " + expected, + Double.doubleToRawLongBits(expected), + Double.doubleToRawLongBits(actual)); + } + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoublePipelineAllExceptions() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + // All exception values + double[] values = {Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, -0.0}; + + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + double expected = values[i]; + double actual = reader.readDouble(); + + if (Double.isNaN(expected)) { + assertEquals("NaN check at index " + i, true, 
Double.isNaN(actual)); + } else { + assertEquals( + "Value mismatch at index " + i + " for " + expected, + Double.doubleToRawLongBits(expected), + Double.doubleToRawLongBits(actual)); + } + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoublePipelineMultipleVectors() throws Exception { + Random rand = new Random(42); + // More than one vector (vector size is 1024) + final int numElements = 3000; + double[] values = new double[numElements]; + for (int i = 0; i < numElements; ++i) { + values[i] = rand.nextDouble() * 100000.0; + } + + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter( + numElements * 16, numElements * 16, new DirectByteBufferAllocator()); + + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < numElements; i++) { + double expected = values[i]; + double actual = reader.readDouble(); + assertEquals( + "Value mismatch at index " + i, + Double.doubleToRawLongBits(expected), + Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoubleReaderSkip() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + double[] values = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + // Read first value + 
assertEquals(Double.doubleToRawLongBits(1.0), Double.doubleToRawLongBits(reader.readDouble())); + + // Skip 3 values + reader.skip(3); + + // Should now read 5.0 + assertEquals(Double.doubleToRawLongBits(5.0), Double.doubleToRawLongBits(reader.readDouble())); + + // Skip 2 values + reader.skip(2); + + // Should now read 8.0 + assertEquals(Double.doubleToRawLongBits(8.0), Double.doubleToRawLongBits(reader.readDouble())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Writer Reset Tests ========== + + @Test + public void testFloatWriterReset() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + // Write some values + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + + // Reset + writer.reset(); + + // Should be empty after reset + assertEquals(0, writer.getBufferedSize()); + + // Write new values + float[] values = {3.0f, 4.0f, 5.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + float actual = reader.readFloat(); + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoubleWriterReset() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + // Write some values + writer.writeDouble(1.0); + writer.writeDouble(2.0); + + // Reset + writer.reset(); + + // Should be empty after reset + assertEquals(0, writer.getBufferedSize()); + + // Write new values + double[] values = {3.0, 
4.0, 5.0}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + double actual = reader.readDouble(); + assertEquals(Double.doubleToRawLongBits(expected), Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Edge Case Tests ========== + + @Test + public void testFloatZeroValues() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + float[] values = {0.0f, 0.0f, 0.0f, 0.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + float actual = reader.readFloat(); + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoubleZeroValues() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + double[] values = {0.0, 0.0, 0.0, 0.0}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + double actual = reader.readDouble(); + assertEquals(Double.doubleToRawLongBits(expected), 
Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatIdenticalValues() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + float[] values = new float[100]; + for (int i = 0; i < values.length; i++) { + values[i] = 3.14159f; + } + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + float actual = reader.readFloat(); + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoubleIdenticalValues() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + double[] values = new double[100]; + for (int i = 0; i < values.length; i++) { + values[i] = 3.14159265358979; + } + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + double actual = reader.readDouble(); + assertEquals(Double.doubleToRawLongBits(expected), Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 60150439a6..3816526f18 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -748,6 +748,10 @@ public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) { } public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) { + // ALP encoding is not yet part of the parquet-format specification + if (encoding == org.apache.parquet.column.Encoding.ALP) { + throw new IllegalArgumentException("ALP encoding is not yet supported in the parquet-format specification"); + } return Encoding.valueOf(encoding.name()); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 264017a1f0..70692e6735 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -471,6 +471,10 @@ public void testLogicalToConvertedTypeConversion() { public void testEnumEquivalence() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); for (org.apache.parquet.column.Encoding encoding : org.apache.parquet.column.Encoding.values()) { + // Skip ALP encoding as it's not yet in the parquet-format specification + if (encoding == org.apache.parquet.column.Encoding.ALP) { + continue; + } assertEquals( encoding, parquetMetadataConverter.getEncoding(parquetMetadataConverter.getEncoding(encoding))); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadAlp.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadAlp.java new file mode 
100644 index 0000000000..4787769de0 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadAlp.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cross-compatibility test for ALP (Adaptive Lossless floating-Point) encoding. + * + *

This test reads ALP-encoded parquet files generated by Arrow C++ to verify + * that the Java implementation can correctly decode them. + * + *

To run this test with local files generated by Arrow C++, set the environment variable + * or system property: + *

+ * + *

Once parquet-testing PR #100 is merged, this test will also support downloading + * test files from the parquet-testing repository. + * + * @see Arrow C++ ALP PR + * @see parquet-testing ALP PR + */ +public class TestInterOpReadAlp { + private static final Logger LOG = LoggerFactory.getLogger(TestInterOpReadAlp.class); + + // TODO: Update these once parquet-testing PR #100 is merged + // private static final String ALP_FLOAT_FILE = "alp_float.parquet"; + // private static final String ALP_DOUBLE_FILE = "alp_double.parquet"; + // private static final String CHANGESET = "TBD"; + // private final InterOpTester interop = new InterOpTester(); + + /** + * Get the path to a local ALP test file. + * + * @return Path to the test file, or null if not configured + */ + private Path getLocalAlpTestFile() { + String filePath = System.getProperty("ALP_TEST_FILE"); + if (filePath == null) { + filePath = System.getenv("ALP_TEST_FILE"); + } + if (filePath != null && new File(filePath).exists()) { + return new Path(filePath); + } + return null; + } + + /** + * Test reading an ALP-encoded parquet file from a local path. + * + *

This test is skipped if no local file is configured. + * Set ALP_TEST_FILE environment variable or system property to run. + */ + @Test + public void testReadLocalAlpFile() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Reading ALP test file: {}", alpFile); + + int rowCount = 0; + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + // Log first few rows for debugging + if (rowCount <= 5) { + LOG.info("Row {}: {}", rowCount, group); + } + } + } + + LOG.info("Successfully read {} rows from ALP-encoded file", rowCount); + assertTrue("Expected at least one row", rowCount > 0); + } + + /** + * Test reading ALP-encoded floats and comparing with PLAIN-encoded values. + * + *

This test expects a parquet file with columns: + *

+ * + *

Set ALP_TEST_FILE to a file with these columns to run this test. + */ + @Test + public void testReadAlpFloatCompareWithPlain() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Comparing ALP vs PLAIN float encoding from: {}", alpFile); + + int rowCount = 0; + int mismatchCount = 0; + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + try { + // Try to read both columns - may not exist depending on file format + float plainVal = group.getFloat("float_plain", 0); + float alpVal = group.getFloat("float_alp", 0); + + // Compare bit-exact values + if (Float.floatToRawIntBits(plainVal) != Float.floatToRawIntBits(alpVal)) { + mismatchCount++; + if (mismatchCount <= 5) { + LOG.warn( + "Float mismatch at row {}: plain={} (bits={}), alp={} (bits={})", + rowCount, + plainVal, + Float.floatToRawIntBits(plainVal), + alpVal, + Float.floatToRawIntBits(alpVal)); + } + } + } catch (RuntimeException e) { + // Columns may not exist in this file format + if (rowCount == 1) { + LOG.info("Could not compare float columns: {}", e.getMessage()); + } + break; + } + } + } + + if (mismatchCount > 0) { + LOG.error("Found {} float mismatches out of {} rows", mismatchCount, rowCount); + } + assertEquals("Float values should match between PLAIN and ALP encoding", 0, mismatchCount); + } + + /** + * Test reading ALP-encoded doubles and comparing with PLAIN-encoded values. + * + *

This test expects a parquet file with columns: + *

+ * + *

Set ALP_TEST_FILE to a file with these columns to run this test. + */ + @Test + public void testReadAlpDoubleCompareWithPlain() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Comparing ALP vs PLAIN double encoding from: {}", alpFile); + + int rowCount = 0; + int mismatchCount = 0; + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + try { + // Try to read both columns - may not exist depending on file format + double plainVal = group.getDouble("double_plain", 0); + double alpVal = group.getDouble("double_alp", 0); + + // Compare bit-exact values + if (Double.doubleToRawLongBits(plainVal) != Double.doubleToRawLongBits(alpVal)) { + mismatchCount++; + if (mismatchCount <= 5) { + LOG.warn( + "Double mismatch at row {}: plain={} (bits={}), alp={} (bits={})", + rowCount, + plainVal, + Double.doubleToRawLongBits(plainVal), + alpVal, + Double.doubleToRawLongBits(alpVal)); + } + } + } catch (RuntimeException e) { + // Columns may not exist in this file format + if (rowCount == 1) { + LOG.info("Could not compare double columns: {}", e.getMessage()); + } + break; + } + } + } + + if (mismatchCount > 0) { + LOG.error("Found {} double mismatches out of {} rows", mismatchCount, rowCount); + } + assertEquals("Double values should match between PLAIN and ALP encoding", 0, mismatchCount); + } + + /** + * Test reading any ALP-encoded file and verify basic functionality. + * + *

This test reads all float and double columns from the file + * and verifies the values are valid (not corrupted). + */ + @Test + public void testReadAlpVerifyValues() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Verifying ALP values from: {}", alpFile); + + int rowCount = 0; + int floatCount = 0; + int doubleCount = 0; + int nanCount = 0; + int infCount = 0; + + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + + // Try to read float columns + for (int i = 0; i < group.getType().getFieldCount(); i++) { + String fieldName = group.getType().getFieldName(i); + try { + if (group.getType().getType(i).asPrimitiveType().getPrimitiveTypeName() + == org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT) { + float val = group.getFloat(fieldName, 0); + floatCount++; + if (Float.isNaN(val)) nanCount++; + if (Float.isInfinite(val)) infCount++; + } else if (group.getType().getType(i).asPrimitiveType().getPrimitiveTypeName() + == org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE) { + double val = group.getDouble(fieldName, 0); + doubleCount++; + if (Double.isNaN(val)) nanCount++; + if (Double.isInfinite(val)) infCount++; + } + } catch (Exception e) { + // Skip fields that can't be read + } + } + } + } + + LOG.info( + "Read {} rows, {} floats, {} doubles, {} NaNs, {} Infs", + rowCount, + floatCount, + doubleCount, + nanCount, + infCount); + assertTrue("Expected at least one row", rowCount > 0); + } + + // TODO: Uncomment and update once parquet-testing PR #100 is merged + /* + @Test + public void testReadAlpFromParquetTesting() throws IOException { + Path alpFloatFile = interop.GetInterOpFile(ALP_FLOAT_FILE, CHANGESET); + // Test implementation here + } + */ +}