From 54f5f069653610ffcd3ca4e5c837f992c2268879 Mon Sep 17 00:00:00 2001 From: Eliaazzz Date: Fri, 15 May 2026 23:42:12 +1000 Subject: [PATCH] ClickHouseIO: Add DateTime64 support for sub-second timestamp precision MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ClickHouse's DateTime64(precision[, 'timezone']) was not recognized by TableSchema or the column-type parser, so pipelines emitting sub-second timestamps (log/event ingestion, financial data) could not write to DateTime64 columns. This adds: * TypeName.DATETIME64 with precision (0-9) and optional timezone fields, plus a ColumnType.dateTime64(precision[, timezone]) factory. * Parser grammar for DateTime64([, '']) so the type is also recognized inside Nullable(...) and Array(...) via the existing primitive() rule. * Beam schema mapping picks the narrowest logical type that round-trips the requested precision: precision <= 3 → Joda DATETIME (preserves existing pipelines). precision 4-6 → SqlTypes.TIMESTAMP (MicrosInstant). precision >= 7 → NanosInstant, the only built-in logical type that carries full nanosecond precision through a Row; MicrosInstant would reject sub-micro nanos. * Writer serialization as a little-endian Int64 of epoch_seconds * 10^precision + sub_second_units, accepting both Joda ReadableInstant and java.time.Instant inputs; floor division on negative timestamps matches ClickHouse's own encoding. Tests: parser cases for DateTime64(3), DateTime64(6,'UTC'), DateTime64(9), Nullable(DateTime64(...)) and Array(DateTime64(...)); schema-mapping tests for the micros and nanos buckets; encoder unit tests covering Joda/java.time inputs, zero/nano/negative edge cases and the precision-7 100 ns tick truncation path; round-trip integration tests against the ClickHouse testcontainer for precisions 3/6/9 (with non-micro-aligned nanos for the nanos case) plus a nullable case. Closes #38466 --- CHANGES.md | 1 + .../sdk/io/clickhouse/ClickHouseWriter.java | 37 ++++++++ .../beam/sdk/io/clickhouse/TableSchema.java | 48 ++++++++++ .../src/main/javacc/ColumnTypeParser.jj | 20 ++++ .../sdk/io/clickhouse/ClickHouseIOIT.java | 75 +++++++++++++++ .../io/clickhouse/ClickHouseWriterTest.java | 91 +++++++++++++++++++ .../sdk/io/clickhouse/TableSchemaTest.java | 74 +++++++++++++++ 7 files changed, 346 insertions(+) create mode 100644 sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java diff --git a/CHANGES.md b/CHANGES.md index ca911e52a7ad..92079aeaf015 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)). * IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/issues/38061))). +* ClickHouseIO: support writing `DateTime64(precision[, 'timezone'])` columns with sub-second precision (Java) ([#38466](https://github.com/apache/beam/issues/38466)). ## New Features / Improvements diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java index 73735f568646..cc82c19c2299 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java @@ -39,6 +39,39 @@ public class ClickHouseWriter { private static final Instant EPOCH_INSTANT = new Instant(0L); + // 10^0 through 10^9 inclusive — precision is validated in [0, 9] by ColumnType.dateTime64. + private static final long[] POW10 = { + 1L, 10L, 100L, 1_000L, 10_000L, 100_000L, 1_000_000L, 10_000_000L, 100_000_000L, 1_000_000_000L + }; + + /** + * Encodes a timestamp into ClickHouse's {@code DateTime64(precision)} representation: a signed + * 64-bit integer counting ticks of size 10-precision seconds since the Unix epoch. + * + *

Accepts either a Joda {@link ReadableInstant} (millisecond precision) or a {@link + * java.time.Instant} (nanosecond precision). Sub-tick fractions are truncated toward negative + * infinity, matching ClickHouse's own encoding for negative timestamps. + */ + static long encodeDateTime64(Object value, int precision) { + long epochSecond; + int nanoOfSecond; + if (value instanceof java.time.Instant) { + java.time.Instant inst = (java.time.Instant) value; + epochSecond = inst.getEpochSecond(); + nanoOfSecond = inst.getNano(); + } else if (value instanceof ReadableInstant) { + long millis = ((ReadableInstant) value).getMillis(); + epochSecond = Math.floorDiv(millis, 1000L); + nanoOfSecond = (int) Math.floorMod(millis, 1000L) * 1_000_000; + } else { + throw new IllegalArgumentException( + "DateTime64 requires a Joda ReadableInstant or java.time.Instant, got " + + (value == null ? "null" : value.getClass().getName())); + } + long subSecondTicks = nanoOfSecond / POW10[9 - precision]; + return Math.addExact(Math.multiplyExact(epochSecond, POW10[precision]), subSecondTicks); + } + @SuppressWarnings("unchecked") static void writeNullableValue(ClickHouseOutputStream stream, ColumnType columnType, Object value) throws IOException { @@ -138,6 +171,10 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj BinaryStreamUtils.writeUnsignedInt32(stream, epochSeconds); break; + case DATETIME64: + BinaryStreamUtils.writeInt64(stream, encodeDateTime64(value, columnType.precision())); + break; + case ARRAY: List values = (List) value; BinaryStreamUtils.writeVarInt(stream, values.size()); diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java index baee77c5f9af..ae573dfccffb 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java @@ -27,6 +27,8 @@ import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -76,6 +78,22 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) { case DATETIME: return Schema.FieldType.DATETIME; + case DATETIME64: + // Pick the narrowest Beam logical type that still round-trips the requested precision: + // ≤ 3 (milliseconds) → Joda DATETIME, keeping existing pipelines unchanged. + // 4–6 (down to microseconds) → SqlTypes.TIMESTAMP (MicrosInstant) — interoperable + // with BigQueryIO, Avro and Beam SQL. + // ≥ 7 (sub-microsecond) → NanosInstant, the only built-in type that preserves + // full nanosecond precision through Row construction. + int p = columnType.precision(); + if (p <= 3) { + return Schema.FieldType.DATETIME; + } else if (p <= 6) { + return Schema.FieldType.logicalType(SqlTypes.TIMESTAMP); + } else { + return Schema.FieldType.logicalType(new NanosInstant()); + } + case STRING: return Schema.FieldType.STRING; @@ -163,6 +181,7 @@ public enum TypeName { // Primitive types DATE, DATETIME, + DATETIME64, ENUM8, ENUM16, FIXEDSTRING, @@ -238,6 +257,14 @@ public abstract static class ColumnType implements Serializable { public abstract @Nullable Map tupleTypes(); + /** Sub-second precision (0–9) of {@code DateTime64}. {@code null} for other types. */ + public abstract @Nullable Integer precision(); + + /** + * Optional timezone of {@code DateTime64}; semantically display-only. {@code null} otherwise. + */ + public abstract @Nullable String timezone(); + public ColumnType withNullable(boolean nullable) { return toBuilder().nullable(nullable).build(); } @@ -258,6 +285,23 @@ public static ColumnType fixedString(int size) { .build(); } + public static ColumnType dateTime64(int precision) { + return dateTime64(precision, null); + } + + public static ColumnType dateTime64(int precision, @Nullable String timezone) { + if (precision < 0 || precision > 9) { + throw new IllegalArgumentException( + "DateTime64 precision must be in [0, 9], got " + precision); + } + return ColumnType.builder() + .typeName(TypeName.DATETIME64) + .nullable(false) + .precision(precision) + .timezone(timezone) + .build(); + } + public static ColumnType enum8(Map enumValues) { return ColumnType.builder() .typeName(TypeName.ENUM8) @@ -367,6 +411,10 @@ abstract static class Builder { public abstract Builder tupleTypes(Map tupleElements); + public abstract Builder precision(@Nullable Integer precision); + + public abstract Builder timezone(@Nullable String timezone); + public abstract ColumnType build(); } } diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj index 5bb9ba4171a6..91f04d0c4464 100644 --- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj +++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj @@ -79,6 +79,7 @@ TOKEN : { < ARRAY : "ARRAY" > | < DATE : "DATE" > + | < DATETIME64 : "DATETIME64" > | < DATETIME : "DATETIME" > | < ENUM8 : "ENUM8" > | < ENUM16 : "ENUM16" > @@ -206,6 +207,7 @@ private ColumnType primitive() : { TypeName type; String size; + ColumnType ct; } { ( @@ -214,10 +216,28 @@ private ColumnType primitive() : ( ( size = integer() ) ) { return ColumnType.fixedString(Integer.valueOf(size)); } + | + (ct = dateTime64()) { return ct; } ) } +private ColumnType dateTime64() : +{ + String precision; + String timezone = null; +} +{ + ( + ( precision = integer() ) + ( ( timezone = string() ) )? + + ) + { + return ColumnType.dateTime64(Integer.parseInt(precision), timezone); + } +} + private ColumnType nullable() : { ColumnType ct; diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java index 8ce412c5f88c..e6a0fdec3c7e 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java @@ -32,6 +32,8 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.ReleaseInfo; @@ -480,6 +482,79 @@ public void testPojo() throws Exception { assertEquals(12L, sum1); } + @Test + public void testDateTime64Millis() throws Exception { + Schema schema = Schema.of(Schema.Field.of("ts", FieldType.DATETIME)); + DateTime ts = new DateTime(2026, 5, 15, 12, 34, 56, 789, DateTimeZone.UTC); + Row row = Row.withSchema(schema).addValue(ts).build(); + + executeSql("CREATE TABLE test_datetime64_ms (ts DateTime64(3, 'UTC')) ENGINE=Log"); + + pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ms")); + pipeline.run().waitUntilFinish(); + + // toUnixTimestamp64Milli returns the underlying tick count, which is the most stable thing to + // assert across CH versions (string formatting may include trailing zeros depending on + // version). + long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Milli(ts) FROM test_datetime64_ms"); + assertEquals(ts.getMillis(), ticks); + } + + @Test + public void testDateTime64Micros() throws Exception { + Schema schema = Schema.of(Schema.Field.of("ts", FieldType.logicalType(SqlTypes.TIMESTAMP))); + // 2026-05-15T12:34:56.789012Z — exactly micros, so MicrosInstant accepts it. + java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_000L); + Row row = Row.withSchema(schema).addValue(ts).build(); + + executeSql("CREATE TABLE test_datetime64_us (ts DateTime64(6)) ENGINE=Log"); + + pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_us")); + pipeline.run().waitUntilFinish(); + + long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Micro(ts) FROM test_datetime64_us"); + assertEquals(1_778_071_696L * 1_000_000L + 789_012L, ticks); + } + + @Test + public void testDateTime64Nanos() throws Exception { + // DateTime64(9) must preserve full nanosecond precision. Use NanosInstant directly + // because SqlTypes.TIMESTAMP (MicrosInstant) rejects non-micro-aligned nanos like the + // trailing 345 below. + Schema schema = Schema.of(Schema.Field.of("ts", FieldType.logicalType(new NanosInstant()))); + java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_345L); + Row row = Row.withSchema(schema).addValue(ts).build(); + + executeSql("CREATE TABLE test_datetime64_ns (ts DateTime64(9)) ENGINE=Log"); + + pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ns")); + pipeline.run().waitUntilFinish(); + + long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Nano(ts) FROM test_datetime64_ns"); + assertEquals(1_778_071_696L * 1_000_000_000L + 789_012_345L, ticks); + } + + @Test + public void testNullableDateTime64() throws Exception { + Schema schema = + Schema.of(Schema.Field.nullable("ts", FieldType.logicalType(SqlTypes.TIMESTAMP))); + java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_000L); + Row row1 = Row.withSchema(schema).addValue(ts).build(); + Row row2 = Row.withSchema(schema).addValue(null).build(); + + executeSql("CREATE TABLE test_nullable_datetime64 (ts Nullable(DateTime64(6))) ENGINE=Log"); + + pipeline + .apply(Create.of(row1, row2).withRowSchema(schema)) + .apply(write("test_nullable_datetime64")); + pipeline.run().waitUntilFinish(); + + long total = executeQueryAsLong("SELECT COUNT(*) FROM test_nullable_datetime64"); + long nonNull = executeQueryAsLong("SELECT COUNT(ts) FROM test_nullable_datetime64"); + assertEquals(2L, total); + assertEquals(1L, nonNull); + } + @Test public void testUserAgentInQueryLog() throws Exception { Schema schema = Schema.of(Schema.Field.of("f0", FieldType.INT64)); diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java new file mode 100644 index 000000000000..6365191fe7f3 --- /dev/null +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.clickhouse; + +import static org.junit.Assert.assertEquals; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ClickHouseWriter}. */ +@RunWith(JUnit4.class) +public class ClickHouseWriterTest { + + @Test + public void encodeDateTime64MillisFromJoda() { + DateTime jodaTs = new DateTime(2026, 5, 15, 12, 34, 56, 789, DateTimeZone.UTC); + long expectedMillis = jodaTs.getMillis(); + assertEquals(expectedMillis, ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 3)); + } + + @Test + public void encodeDateTime64MicrosFromJavaInstant() { + // 2026-05-15T12:34:56.789012Z + java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_000L); + long expectedMicros = 1_778_071_696L * 1_000_000L + 789_012L; + assertEquals(expectedMicros, ClickHouseWriter.encodeDateTime64(ts, 6)); + } + + @Test + public void encodeDateTime64NanosFromJavaInstant() { + // Non-micro-aligned nanos (trailing 345) must survive the encode round-trip; this was the + // P1 regression that motivated using NanosInstant for the >6 precision schema mapping. + java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_345L); + long expectedNanos = 1_778_071_696L * 1_000_000_000L + 789_012_345L; + assertEquals(expectedNanos, ClickHouseWriter.encodeDateTime64(ts, 9)); + } + + @Test + public void encodeDateTime64Precision7TruncatesBelow100Nanos() { + // Precision 7 → 100 ns ticks; the trailing 345 ns should be truncated to 3 (the 100-ns + // quotient), and the lower digits dropped. + java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_345L); + long expected = 1_778_071_696L * 10_000_000L + 7_890_123L; + assertEquals(expected, ClickHouseWriter.encodeDateTime64(ts, 7)); + } + + @Test + public void encodeDateTime64NanosTruncatesSubNanoFromJoda() { + // Joda only carries ms precision, so encoding into nanos shifts left by 6 with no loss. + DateTime jodaTs = new DateTime(2030, 1, 1, 0, 0, 0, 123, DateTimeZone.UTC); + long expected = jodaTs.getMillis() * 1_000_000L; + assertEquals(expected, ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 9)); + } + + @Test + public void encodeDateTime64HandlesNegativeMillisWithFloorDivision() { + // -1ms maps to (-1s, +999ms), encoded at precision 3 should be exactly -1. + org.joda.time.Instant jodaTs = new org.joda.time.Instant(-1L); + assertEquals(-1L, ClickHouseWriter.encodeDateTime64(jodaTs, 3)); + } + + @Test + public void encodeDateTime64ZeroPrecisionRoundsTowardEpochSeconds() { + java.time.Instant ts = java.time.Instant.ofEpochSecond(42L, 999_999_999L); + // Precision 0 means whole-second ticks; sub-second component is truncated. + assertEquals(42L, ClickHouseWriter.encodeDateTime64(ts, 0)); + } + + @Test(expected = IllegalArgumentException.class) + public void encodeDateTime64RejectsUnsupportedValue() { + ClickHouseWriter.encodeDateTime64("not-a-timestamp", 3); + } +} diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java index f560d6268afb..27f8d3ef3dbc 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java @@ -39,6 +39,33 @@ public void testParseDateTime() { assertEquals(ColumnType.DATETIME, ColumnType.parse("DateTime")); } + @Test + public void testParseDateTime64Millis() { + assertEquals(ColumnType.dateTime64(3), ColumnType.parse("DateTime64(3)")); + } + + @Test + public void testParseDateTime64MicrosWithTimezone() { + assertEquals(ColumnType.dateTime64(6, "UTC"), ColumnType.parse("DateTime64(6, 'UTC')")); + } + + @Test + public void testParseDateTime64Nanos() { + assertEquals(ColumnType.dateTime64(9), ColumnType.parse("DateTime64(9)")); + } + + @Test + public void testParseNullableDateTime64() { + assertEquals( + ColumnType.dateTime64(6).withNullable(true), ColumnType.parse("Nullable(DateTime64(6))")); + } + + @Test + public void testParseArrayOfDateTime64() { + assertEquals( + ColumnType.array(ColumnType.dateTime64(3)), ColumnType.parse("Array(DateTime64(3))")); + } + @Test public void testParseFloat32() { assertEquals(ColumnType.FLOAT32, ColumnType.parse("Float32")); @@ -198,6 +225,53 @@ public void testEquivalentSchema() { assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema)); } + @Test + public void testEquivalentSchemaDateTime64Millis() { + // Precision ≤ 3 keeps the legacy Joda-backed DATETIME so that existing pipelines using + // millisecond timestamps continue to work without code changes. + TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", ColumnType.dateTime64(3))); + Schema expected = Schema.of(Schema.Field.of("ts", Schema.FieldType.DATETIME)); + assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema)); + } + + @Test + public void testEquivalentSchemaDateTime64Micros() { + // Precision 4–6 maps to SqlTypes.TIMESTAMP (MicrosInstant) — interoperable with + // BigQueryIO and Beam SQL, sufficient for microsecond ticks. + TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", ColumnType.dateTime64(6))); + Schema expected = + Schema.of( + Schema.Field.of( + "ts", + Schema.FieldType.logicalType( + org.apache.beam.sdk.schemas.logicaltypes.SqlTypes.TIMESTAMP))); + assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema)); + } + + @Test + public void testEquivalentSchemaDateTime64Nanos() { + // Precision 7–9 needs nanosecond precision; MicrosInstant rejects non-micro-aligned + // nanos, so the mapping must use NanosInstant. + TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", ColumnType.dateTime64(9))); + Schema expected = + Schema.of( + Schema.Field.of( + "ts", + Schema.FieldType.logicalType( + new org.apache.beam.sdk.schemas.logicaltypes.NanosInstant()))); + assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema)); + } + + @Test(expected = IllegalArgumentException.class) + public void testDateTime64RejectsNegativePrecision() { + ColumnType.dateTime64(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testDateTime64RejectsPrecisionAboveNine() { + ColumnType.dateTime64(10); + } + @Test public void testParseTupleSingle() { Map m1 = new HashMap<>();