From 5a03c469ef5e70a43dac47399f88e030756d2284 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 25 Aug 2025 19:38:19 -0700 Subject: [PATCH 1/8] docs: added all window serdes and 4.1 updates --- .../developer-guide/config-streams.html | 13 ++-- docs/streams/developer-guide/datatypes.html | 71 ++++++++++++++++++- 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 8289423d3d798..5ea9ce8b4e1ad 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -72,9 +72,9 @@
  • Optional configuration parameters
  • Kafka consumers and producer configuration parameters @@ -543,7 +543,7 @@

    num.standby.replicasAdded to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 86400000 (1 day) - window.size.ms + window.size.ms (Deprecated. See Window Serdes for alternatives.) Low Sets window size for the deserializer in order to calculate window end times. null @@ -1225,7 +1225,7 @@

    topology.optimization -

    windowed.inner.class.serde

    +

    windowed.inner.class.serde (Deprecated)

    @@ -1234,6 +1234,9 @@

    windowed.inner.class.serdeWindow Serdes directly in your topology instead. +

    diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index afa6397c5ef40..6090db2bca552 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -48,9 +48,10 @@ -
  • Kafka Streams DSL for Scala Implicit Serdes
  • +
  • Kafka Streams DSL for Scala Implicit Serdes
  • Configuring Serdes

    @@ -163,6 +164,72 @@

    JSONAs shown in the example, you can use JSONSerdes inner classes Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>) to construct JSON compatible serializers and deserializers.

    +
    +

    Window Serdes

    +

    Apache Kafka Streams includes serde implementations for windowed keys in + its kafka-streams Maven artifact:

    +
    <dependency>
    +    <groupId>org.apache.kafka</groupId>
    +    <artifactId>kafka-streams</artifactId>
    +    <version>{{fullDotVersion}}</version>
    +</dependency>
    +

    This artifact provides the following windowed serde implementations under the package org.apache.kafka.streams.kstream:

    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    Data typeSerde
    Windowed<T> (Time Windows)new WindowedSerdes.TimeWindowedSerde<>(innerSerde, windowSize), WindowedSerdes.timeWindowedSerdeFrom(Class<T> type, long windowSize)
    Windowed<T> (Session Windows)new WindowedSerdes.SessionWindowedSerde<>(innerSerde), WindowedSerdes.sessionWindowedSerdeFrom(Class<T> type)
    TimeWindowedSerializer<T>new TimeWindowedSerializer<>(innerSerializer)
    TimeWindowedDeserializer<T>new TimeWindowedDeserializer<>(innerDeserializer, windowSize)
    SessionWindowedSerializer<T>new SessionWindowedSerializer<>(innerSerializer)
    SessionWindowedDeserializer<T>new SessionWindowedDeserializer<>(innerDeserializer)
    +

    Migration from Deprecated Configs

    +

    The following stream configuration parameters have been deprecated. For details, see the windowed.inner.class.serde configuration documentation:

    + + + + + + + + + + + + + + + + + + + + + +
    Deprecated ConfigReplacement
    windowed.inner.class.serdeUse TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, or SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS
    window.size.msUse TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG or pass window size directly to constructor
    +

    Implementing custom Serdes

    If you need to implement custom Serdes, your best starting point is to take a look at the source code references of From 2f18885161ceabc66710babfa2ba6906afd966c2 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 27 Aug 2025 08:32:13 -0700 Subject: [PATCH 2/8] added windowed.inner.class.serde to table --- docs/streams/developer-guide/config-streams.html | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 5ea9ce8b4e1ad..674e2ad078b64 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -548,6 +548,11 @@

    num.standby.replicasSets window size for the deserializer in order to calculate window end times. null + windowed.inner.class.serde (Deprecated. See Window Serdes for alternatives.) + Low + Serde for the inner class of a windowed record. Must implement the Serde interface. + null +
    From 25f43f122cbe0c9d686377260e0735fd2eda382a Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 27 Aug 2025 08:52:20 -0700 Subject: [PATCH 3/8] removed windowed.inner.class.serde entirely except in table --- docs/streams/developer-guide/config-streams.html | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 674e2ad078b64..8c45d0c7976d0 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -100,7 +100,6 @@
  • state.dir
  • task.assignor.class
  • topology.optimization
  • -
  • windowed.inner.class.serde (deprecated)
  • Kafka consumers and producer configuration parameters @@ -1229,21 +1228,6 @@

    topology.optimization -

    windowed.inner.class.serde (Deprecated)

    -
    -
    -

    - Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. -

    -

    - Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology. -

    -

    - Note: This configuration is deprecated. Use Window Serdes directly in your topology instead. -

    -
    -

  • upgrade.from

    From c1a52d7ce444b9a668fa19d561d5ba154a454feb Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 27 Aug 2025 15:06:08 -0700 Subject: [PATCH 4/8] removed redundant table to bulleted lists --- docs/streams/developer-guide/datatypes.html | 49 ++++++++------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index 6090db2bca552..5d8d78499cedf 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -174,37 +174,24 @@

    Window Serdesorg.apache.kafka.streams.kstream:

    - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    Data typeSerde
    Windowed<T> (Time Windows)new WindowedSerdes.TimeWindowedSerde<>(innerSerde, windowSize), WindowedSerdes.timeWindowedSerdeFrom(Class<T> type, long windowSize)
    Windowed<T> (Session Windows)new WindowedSerdes.SessionWindowedSerde<>(innerSerde), WindowedSerdes.sessionWindowedSerdeFrom(Class<T> type)
    TimeWindowedSerializer<T>new TimeWindowedSerializer<>(innerSerializer)
    TimeWindowedDeserializer<T>new TimeWindowedDeserializer<>(innerDeserializer, windowSize)
    SessionWindowedSerializer<T>new SessionWindowedSerializer<>(innerSerializer)
    SessionWindowedDeserializer<T>new SessionWindowedDeserializer<>(innerDeserializer)
    + +

    Serdes:

    +
      +
    • WindowedSerdes.TimeWindowedSerde<T>
    • +
    • WindowedSerdes.SessionWindowedSerde<T>
    • +
    + +

    Serializers:

    +
      +
    • TimeWindowedSerializer<T>
    • +
    • SessionWindowedSerializer<T>
    • +
    + +

    Deserializers:

    +
      +
    • TimeWindowedDeserializer<T>
    • +
    • SessionWindowedDeserializer<T>
    • +

    Migration from Deprecated Configs

    The following stream configuration parameters have been deprecated. For details, see the windowed.inner.class.serde configuration documentation:

    From a918948e394515dab1d252ff0ff419b8ab306cc3 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 27 Aug 2025 16:41:20 -0700 Subject: [PATCH 5/8] removed old section and added 3 different sections --- docs/streams/developer-guide/datatypes.html | 65 +++++++++++++-------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index 5d8d78499cedf..4097227fddba4 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -192,30 +192,47 @@

    Window SerdesTimeWindowedDeserializer<T>
  • SessionWindowedDeserializer<T>
  • -

    Migration from Deprecated Configs

    -

    The following stream configuration parameters have been deprecated. For details, see the windowed.inner.class.serde configuration documentation:

    -

    - - - - - - - - - - - - - - - - - - - - -
    Deprecated ConfigReplacement
    windowed.inner.class.serdeUse TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, or SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS
    window.size.msUse TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG or pass window size directly to constructor
    +

    Usage in Code

    +

    When using windowed serdes in your application code, you typically create instances via constructors or factory methods:

    +
    // Time windowed serde - using factory method
    +Serde<Windowed<String>> timeWindowedSerde = 
    +    WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
    +
    +// Time windowed serde - using constructor
    +Serde<Windowed<String>> timeWindowedSerde2 = 
    +    new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), 500L);
    +
    +// Session windowed serde - using factory method
    +Serde<Windowed<String>> sessionWindowedSerde = 
    +    WindowedSerdes.sessionWindowedSerdeFrom(String.class);
    +
    +// Session windowed serde - using constructor  
    +Serde<Windowed<String>> sessionWindowedSerde2 = 
    +    new WindowedSerdes.SessionWindowedSerde<>(Serdes.String());
    +
    +// Using individual serializers/deserializers
    +TimeWindowedSerializer<String> serializer = new TimeWindowedSerializer<>(Serdes.String().serializer());
    +TimeWindowedDeserializer<String> deserializer = new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);
    + +

    Usage in Command Line

    +

    When using command-line tools (like kafka-console-consumer), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:

    +
    # Time windowed deserializer configuration
    +--property print.key=true \
    +--property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer \
    +--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer \
    +--property key.deserializer.window.size.ms=500
    +
    +# Session windowed deserializer configuration  
    +--property print.key=true \
    +--property key.deserializer=org.apache.kafka.streams.kstream.SessionWindowedDeserializer \
    +--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer
    + +

    Deprecated Configs

    +

    The following StreamsConfig parameters are deprecated in favor of passing parameters directly to serializer/deserializer constructors:

    +
      +
    • StreamsConfig.WINDOWED_INNER_CLASS_SERDE is deprecated in favor of TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS and TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS
    • +
    • StreamsConfig.WINDOW_SIZE_MS_CONFIG is deprecated in favor of TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG
    • +

    Implementing custom Serdes

    From ad04ddbd49acc41f3d26a4f8fc9fd8851d082fbf Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Wed, 27 Aug 2025 16:44:41 -0700 Subject: [PATCH 6/8] fix typo --- docs/streams/developer-guide/datatypes.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index 4097227fddba4..b58d9dfe7ebe8 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -166,7 +166,7 @@

    JSON

    Window Serdes

    -

    Apache Kafka Streams includes serde implementations for windowed keys in +

    Apache Kafka Streams includes serde implementations for windowed types in its kafka-streams Maven artifact:

    <dependency>
         <groupId>org.apache.kafka</groupId>
    
    From c27259c036778998d0ba4d566617086b3b0af450 Mon Sep 17 00:00:00 2001
    From: Shashank 
    Date: Fri, 29 Aug 2025 17:48:56 -0700
    Subject: [PATCH 7/8] add script path
    
    Co-authored-by: Matthias J. Sax 
    ---
     docs/streams/developer-guide/datatypes.html | 2 +-
     1 file changed, 1 insertion(+), 1 deletion(-)
    
    diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
    index b58d9dfe7ebe8..82184ac9563f6 100644
    --- a/docs/streams/developer-guide/datatypes.html
    +++ b/docs/streams/developer-guide/datatypes.html
    @@ -215,7 +215,7 @@ 

    Usage in Code

    TimeWindowedDeserializer<String> deserializer = new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);

    Usage in Command Line

    -

    When using command-line tools (like kafka-console-consumer), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:

    +

    When using command-line tools (like bin/kafka-console-consumer.sh), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:

    # Time windowed deserializer configuration
     --property print.key=true \
     --property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer \
    
    From f15b6e74cdbb62b0ae29fb8017d88f80031ee725 Mon Sep 17 00:00:00 2001
    From: Shashank Hosahalli Shivamurthy 
    Date: Fri, 29 Aug 2025 17:51:34 -0700
    Subject: [PATCH 8/8] removed hard-coded version
    
    ---
     docs/streams/developer-guide/datatypes.html | 2 +-
     1 file changed, 1 insertion(+), 1 deletion(-)
    
    diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
    index 82184ac9563f6..2bc2d7d5d0ef3 100644
    --- a/docs/streams/developer-guide/datatypes.html
    +++ b/docs/streams/developer-guide/datatypes.html
    @@ -104,7 +104,7 @@ 

    Primitive and basic types<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>2.8.0</version> + <version>{{fullDotVersion}}</version> </dependency>

    This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.