diff --git a/CHANGES.md b/CHANGES.md
index ca911e52a7ad..92079aeaf015 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)).
* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/issues/38061))).
+* ClickHouseIO: support writing `DateTime64(precision[, 'timezone'])` columns with sub-second precision (Java) ([#38466](https://github.com/apache/beam/issues/38466)).
## New Features / Improvements
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
index 73735f568646..cc82c19c2299 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
@@ -39,6 +39,39 @@
public class ClickHouseWriter {
private static final Instant EPOCH_INSTANT = new Instant(0L);
+ // 10^0 through 10^9 inclusive — precision is validated in [0, 9] by ColumnType.dateTime64.
+ private static final long[] POW10 = {
+ 1L, 10L, 100L, 1_000L, 10_000L, 100_000L, 1_000_000L, 10_000_000L, 100_000_000L, 1_000_000_000L
+ };
+
+ /**
+ * Encodes a timestamp into ClickHouse's {@code DateTime64(precision)} representation: a signed
+ * 64-bit integer counting ticks of size 10-precision seconds since the Unix epoch.
+ *
+ *
Accepts either a Joda {@link ReadableInstant} (millisecond precision) or a {@link
+ * java.time.Instant} (nanosecond precision). Sub-tick fractions are truncated toward negative
+ * infinity, matching ClickHouse's own encoding for negative timestamps.
+ */
+ static long encodeDateTime64(Object value, int precision) {
+ long epochSecond;
+ int nanoOfSecond;
+ if (value instanceof java.time.Instant) {
+ java.time.Instant inst = (java.time.Instant) value;
+ epochSecond = inst.getEpochSecond();
+ nanoOfSecond = inst.getNano();
+ } else if (value instanceof ReadableInstant) {
+ long millis = ((ReadableInstant) value).getMillis();
+ epochSecond = Math.floorDiv(millis, 1000L);
+ nanoOfSecond = (int) Math.floorMod(millis, 1000L) * 1_000_000;
+ } else {
+ throw new IllegalArgumentException(
+ "DateTime64 requires a Joda ReadableInstant or java.time.Instant, got "
+ + (value == null ? "null" : value.getClass().getName()));
+ }
+ long subSecondTicks = nanoOfSecond / POW10[9 - precision];
+ return Math.addExact(Math.multiplyExact(epochSecond, POW10[precision]), subSecondTicks);
+ }
+
@SuppressWarnings("unchecked")
static void writeNullableValue(ClickHouseOutputStream stream, ColumnType columnType, Object value)
throws IOException {
@@ -138,6 +171,10 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj
BinaryStreamUtils.writeUnsignedInt32(stream, epochSeconds);
break;
+ case DATETIME64:
+ BinaryStreamUtils.writeInt64(stream, encodeDateTime64(value, columnType.precision()));
+ break;
+
case ARRAY:
List values = (List) value;
BinaryStreamUtils.writeVarInt(stream, values.size());
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index baee77c5f9af..ae573dfccffb 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -27,6 +27,8 @@
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -76,6 +78,22 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) {
case DATETIME:
return Schema.FieldType.DATETIME;
+ case DATETIME64:
+ // Pick the narrowest Beam logical type that still round-trips the requested precision:
+ // ≤ 3 (milliseconds) → Joda DATETIME, keeping existing pipelines unchanged.
+ // 4–6 (down to microseconds) → SqlTypes.TIMESTAMP (MicrosInstant) — interoperable
+ // with BigQueryIO, Avro and Beam SQL.
+ // ≥ 7 (sub-microsecond) → NanosInstant, the only built-in type that preserves
+ // full nanosecond precision through Row construction.
+ int p = columnType.precision();
+ if (p <= 3) {
+ return Schema.FieldType.DATETIME;
+ } else if (p <= 6) {
+ return Schema.FieldType.logicalType(SqlTypes.TIMESTAMP);
+ } else {
+ return Schema.FieldType.logicalType(new NanosInstant());
+ }
+
case STRING:
return Schema.FieldType.STRING;
@@ -163,6 +181,7 @@ public enum TypeName {
// Primitive types
DATE,
DATETIME,
+ DATETIME64,
ENUM8,
ENUM16,
FIXEDSTRING,
@@ -238,6 +257,14 @@ public abstract static class ColumnType implements Serializable {
public abstract @Nullable Map tupleTypes();
+ /** Sub-second precision (0–9) of {@code DateTime64}. {@code null} for other types. */
+ public abstract @Nullable Integer precision();
+
+ /**
+ * Optional timezone of {@code DateTime64}; semantically display-only. {@code null} otherwise.
+ */
+ public abstract @Nullable String timezone();
+
public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
@@ -258,6 +285,23 @@ public static ColumnType fixedString(int size) {
.build();
}
+ public static ColumnType dateTime64(int precision) {
+ return dateTime64(precision, null);
+ }
+
+ public static ColumnType dateTime64(int precision, @Nullable String timezone) {
+ if (precision < 0 || precision > 9) {
+ throw new IllegalArgumentException(
+ "DateTime64 precision must be in [0, 9], got " + precision);
+ }
+ return ColumnType.builder()
+ .typeName(TypeName.DATETIME64)
+ .nullable(false)
+ .precision(precision)
+ .timezone(timezone)
+ .build();
+ }
+
public static ColumnType enum8(Map enumValues) {
return ColumnType.builder()
.typeName(TypeName.ENUM8)
@@ -367,6 +411,10 @@ abstract static class Builder {
public abstract Builder tupleTypes(Map tupleElements);
+ public abstract Builder precision(@Nullable Integer precision);
+
+ public abstract Builder timezone(@Nullable String timezone);
+
public abstract ColumnType build();
}
}
diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
index 5bb9ba4171a6..91f04d0c4464 100644
--- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
+++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
@@ -79,6 +79,7 @@ TOKEN :
{
< ARRAY : "ARRAY" >
| < DATE : "DATE" >
+ | < DATETIME64 : "DATETIME64" >
| < DATETIME : "DATETIME" >
| < ENUM8 : "ENUM8" >
| < ENUM16 : "ENUM16" >
@@ -206,6 +207,7 @@ private ColumnType primitive() :
{
TypeName type;
String size;
+ ColumnType ct;
}
{
(
@@ -214,10 +216,28 @@ private ColumnType primitive() :
( ( size = integer() ) ) {
return ColumnType.fixedString(Integer.valueOf(size));
}
+ |
+ (ct = dateTime64()) { return ct; }
)
}
+private ColumnType dateTime64() :
+{
+ String precision;
+ String timezone = null;
+}
+{
+ (
+ ( precision = integer() )
+ ( ( timezone = string() ) )?
+
+ )
+ {
+ return ColumnType.dateTime64(Integer.parseInt(precision), timezone);
+ }
+}
+
private ColumnType nullable() :
{
ColumnType ct;
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
index 8ce412c5f88c..e6a0fdec3c7e 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
@@ -32,6 +32,8 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.ReleaseInfo;
@@ -480,6 +482,79 @@ public void testPojo() throws Exception {
assertEquals(12L, sum1);
}
+ @Test
+ public void testDateTime64Millis() throws Exception {
+ Schema schema = Schema.of(Schema.Field.of("ts", FieldType.DATETIME));
+ DateTime ts = new DateTime(2026, 5, 15, 12, 34, 56, 789, DateTimeZone.UTC);
+ Row row = Row.withSchema(schema).addValue(ts).build();
+
+ executeSql("CREATE TABLE test_datetime64_ms (ts DateTime64(3, 'UTC')) ENGINE=Log");
+
+ pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ms"));
+ pipeline.run().waitUntilFinish();
+
+ // toUnixTimestamp64Milli returns the underlying tick count, which is the most stable thing to
+ // assert across CH versions (string formatting may include trailing zeros depending on
+ // version).
+ long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Milli(ts) FROM test_datetime64_ms");
+ assertEquals(ts.getMillis(), ticks);
+ }
+
+ @Test
+ public void testDateTime64Micros() throws Exception {
+ Schema schema = Schema.of(Schema.Field.of("ts", FieldType.logicalType(SqlTypes.TIMESTAMP)));
+ // 2026-05-15T12:34:56.789012Z — exactly micros, so MicrosInstant accepts it.
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_000L);
+ Row row = Row.withSchema(schema).addValue(ts).build();
+
+ executeSql("CREATE TABLE test_datetime64_us (ts DateTime64(6)) ENGINE=Log");
+
+ pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_us"));
+ pipeline.run().waitUntilFinish();
+
+ long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Micro(ts) FROM test_datetime64_us");
+ assertEquals(1_778_071_696L * 1_000_000L + 789_012L, ticks);
+ }
+
+ @Test
+ public void testDateTime64Nanos() throws Exception {
+ // DateTime64(9) must preserve full nanosecond precision. Use NanosInstant directly
+ // because SqlTypes.TIMESTAMP (MicrosInstant) rejects non-micro-aligned nanos like the
+ // trailing 345 below.
+ Schema schema = Schema.of(Schema.Field.of("ts", FieldType.logicalType(new NanosInstant())));
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_345L);
+ Row row = Row.withSchema(schema).addValue(ts).build();
+
+ executeSql("CREATE TABLE test_datetime64_ns (ts DateTime64(9)) ENGINE=Log");
+
+ pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ns"));
+ pipeline.run().waitUntilFinish();
+
+ long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Nano(ts) FROM test_datetime64_ns");
+ assertEquals(1_778_071_696L * 1_000_000_000L + 789_012_345L, ticks);
+ }
+
+ @Test
+ public void testNullableDateTime64() throws Exception {
+ Schema schema =
+ Schema.of(Schema.Field.nullable("ts", FieldType.logicalType(SqlTypes.TIMESTAMP)));
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_000L);
+ Row row1 = Row.withSchema(schema).addValue(ts).build();
+ Row row2 = Row.withSchema(schema).addValue(null).build();
+
+ executeSql("CREATE TABLE test_nullable_datetime64 (ts Nullable(DateTime64(6))) ENGINE=Log");
+
+ pipeline
+ .apply(Create.of(row1, row2).withRowSchema(schema))
+ .apply(write("test_nullable_datetime64"));
+ pipeline.run().waitUntilFinish();
+
+ long total = executeQueryAsLong("SELECT COUNT(*) FROM test_nullable_datetime64");
+ long nonNull = executeQueryAsLong("SELECT COUNT(ts) FROM test_nullable_datetime64");
+ assertEquals(2L, total);
+ assertEquals(1L, nonNull);
+ }
+
@Test
public void testUserAgentInQueryLog() throws Exception {
Schema schema = Schema.of(Schema.Field.of("f0", FieldType.INT64));
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
new file mode 100644
index 000000000000..6365191fe7f3
--- /dev/null
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ClickHouseWriter}. */
+@RunWith(JUnit4.class)
+public class ClickHouseWriterTest {
+
+ @Test
+ public void encodeDateTime64MillisFromJoda() {
+ DateTime jodaTs = new DateTime(2026, 5, 15, 12, 34, 56, 789, DateTimeZone.UTC);
+ long expectedMillis = jodaTs.getMillis();
+ assertEquals(expectedMillis, ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 3));
+ }
+
+ @Test
+ public void encodeDateTime64MicrosFromJavaInstant() {
+ // 2026-05-15T12:34:56.789012Z
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_000L);
+ long expectedMicros = 1_778_071_696L * 1_000_000L + 789_012L;
+ assertEquals(expectedMicros, ClickHouseWriter.encodeDateTime64(ts, 6));
+ }
+
+ @Test
+ public void encodeDateTime64NanosFromJavaInstant() {
+ // Non-micro-aligned nanos (trailing 345) must survive the encode round-trip; this was the
+ // P1 regression that motivated using NanosInstant for the >6 precision schema mapping.
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_345L);
+ long expectedNanos = 1_778_071_696L * 1_000_000_000L + 789_012_345L;
+ assertEquals(expectedNanos, ClickHouseWriter.encodeDateTime64(ts, 9));
+ }
+
+ @Test
+ public void encodeDateTime64Precision7TruncatesBelow100Nanos() {
+ // Precision 7 → 100 ns ticks; the trailing 345 ns should be truncated to 3 (the 100-ns
+ // quotient), and the lower digits dropped.
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(1_778_071_696L, 789_012_345L);
+ long expected = 1_778_071_696L * 10_000_000L + 7_890_123L;
+ assertEquals(expected, ClickHouseWriter.encodeDateTime64(ts, 7));
+ }
+
+ @Test
+ public void encodeDateTime64NanosTruncatesSubNanoFromJoda() {
+ // Joda only carries ms precision, so encoding into nanos shifts left by 6 with no loss.
+ DateTime jodaTs = new DateTime(2030, 1, 1, 0, 0, 0, 123, DateTimeZone.UTC);
+ long expected = jodaTs.getMillis() * 1_000_000L;
+ assertEquals(expected, ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 9));
+ }
+
+ @Test
+ public void encodeDateTime64HandlesNegativeMillisWithFloorDivision() {
+ // -1ms maps to (-1s, +999ms), encoded at precision 3 should be exactly -1.
+ org.joda.time.Instant jodaTs = new org.joda.time.Instant(-1L);
+ assertEquals(-1L, ClickHouseWriter.encodeDateTime64(jodaTs, 3));
+ }
+
+ @Test
+ public void encodeDateTime64ZeroPrecisionRoundsTowardEpochSeconds() {
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(42L, 999_999_999L);
+ // Precision 0 means whole-second ticks; sub-second component is truncated.
+ assertEquals(42L, ClickHouseWriter.encodeDateTime64(ts, 0));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void encodeDateTime64RejectsUnsupportedValue() {
+ ClickHouseWriter.encodeDateTime64("not-a-timestamp", 3);
+ }
+}
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
index f560d6268afb..27f8d3ef3dbc 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
@@ -39,6 +39,33 @@ public void testParseDateTime() {
assertEquals(ColumnType.DATETIME, ColumnType.parse("DateTime"));
}
+ @Test
+ public void testParseDateTime64Millis() {
+ assertEquals(ColumnType.dateTime64(3), ColumnType.parse("DateTime64(3)"));
+ }
+
+ @Test
+ public void testParseDateTime64MicrosWithTimezone() {
+ assertEquals(ColumnType.dateTime64(6, "UTC"), ColumnType.parse("DateTime64(6, 'UTC')"));
+ }
+
+ @Test
+ public void testParseDateTime64Nanos() {
+ assertEquals(ColumnType.dateTime64(9), ColumnType.parse("DateTime64(9)"));
+ }
+
+ @Test
+ public void testParseNullableDateTime64() {
+ assertEquals(
+ ColumnType.dateTime64(6).withNullable(true), ColumnType.parse("Nullable(DateTime64(6))"));
+ }
+
+ @Test
+ public void testParseArrayOfDateTime64() {
+ assertEquals(
+ ColumnType.array(ColumnType.dateTime64(3)), ColumnType.parse("Array(DateTime64(3))"));
+ }
+
@Test
public void testParseFloat32() {
assertEquals(ColumnType.FLOAT32, ColumnType.parse("Float32"));
@@ -198,6 +225,53 @@ public void testEquivalentSchema() {
assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
}
+ @Test
+ public void testEquivalentSchemaDateTime64Millis() {
+ // Precision ≤ 3 keeps the legacy Joda-backed DATETIME so that existing pipelines using
+ // millisecond timestamps continue to work without code changes.
+ TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", ColumnType.dateTime64(3)));
+ Schema expected = Schema.of(Schema.Field.of("ts", Schema.FieldType.DATETIME));
+ assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+ }
+
+ @Test
+ public void testEquivalentSchemaDateTime64Micros() {
+ // Precision 4–6 maps to SqlTypes.TIMESTAMP (MicrosInstant) — interoperable with
+ // BigQueryIO and Beam SQL, sufficient for microsecond ticks.
+ TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", ColumnType.dateTime64(6)));
+ Schema expected =
+ Schema.of(
+ Schema.Field.of(
+ "ts",
+ Schema.FieldType.logicalType(
+ org.apache.beam.sdk.schemas.logicaltypes.SqlTypes.TIMESTAMP)));
+ assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+ }
+
+ @Test
+ public void testEquivalentSchemaDateTime64Nanos() {
+ // Precision 7–9 needs nanosecond precision; MicrosInstant rejects non-micro-aligned
+ // nanos, so the mapping must use NanosInstant.
+ TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", ColumnType.dateTime64(9)));
+ Schema expected =
+ Schema.of(
+ Schema.Field.of(
+ "ts",
+ Schema.FieldType.logicalType(
+ new org.apache.beam.sdk.schemas.logicaltypes.NanosInstant())));
+ assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDateTime64RejectsNegativePrecision() {
+ ColumnType.dateTime64(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDateTime64RejectsPrecisionAboveNine() {
+ ColumnType.dateTime64(10);
+ }
+
@Test
public void testParseTupleSingle() {
Map m1 = new HashMap<>();