
KSQL: What’s New in 5.2


KSQL enables you to write streaming applications expressed purely in SQL. There are a ton of great new features in 5.2, many of which are a result of requests and support from the community. We use GitHub to track these, and I’ve indicated the corresponding issue for each feature below. If you have suggestions for new features, please search our GitHub issues page and upvote an existing issue, or create a new one as appropriate.

In CASE you need more flexibility with your data…

GitHub issue #620

CASE is one of those Swiss-Army-knife functions of the SQL world. There are numerous uses for it, and now KSQL supports it.

CASE: Data cleansing

Imagine you have an inbound stream of data, in which some of the values aren’t in the form that you want them. Take this list of hostnames from syslog traffic, for example:

ksql> SELECT HOST FROM SYSLOG;
BZ2,24a43cde91a0,v3.9.27.8537
BZ2,dc9fdbec6a10,v3.9.27.8537
asgard02
U7PG2,f09fc2238301,v3.9.27.8537
rpi-03
rpi-02

There are a few machines (asgard02, rpi-03, rpi-02), and then some networking equipment, denoted by the BZ2 and U7PG2 prefixes. Suppose that we want to work with the syslog data such that all network devices of the same type are treated as one. We can use CASE to clean the data up:

ksql> SELECT HOST, CASE
                    WHEN HOST LIKE 'BZ2%' THEN 'Wifi AP'
                    WHEN HOST LIKE 'U7PG2%' THEN 'Unifi'
                    ELSE HOST
                  END AS HOST_CLEAN
        FROM SYSLOG;
BZ2,24a43cde91a0,v3.9.27.8537 | Wifi AP
BZ2,dc9fdbec6a10,v3.9.27.8537 | Wifi AP
asgard02 | asgard02
U7PG2,f09fc2238301,v3.9.27.8537 | Unifi
rpi-03 | rpi-03
rpi-02 | rpi-02

 

CASE: Deriving new columns

Let’s consider a simple example of products with their associated SKUs:

ksql> SELECT SKU, PRODUCT FROM PRODUCTS;
H1235 | Toaster
H1425 | Kettle
F0192 | Banana
F1723 | Apple
x1234 | Cat

The leading character of the SKU encodes the department of the product. To make it easier to work with, we want to make it its own column in the data. This is easy with KSQL:

ksql> SELECT SKU,
             CASE
                WHEN SKU LIKE 'H%' THEN 'Homewares'
                WHEN SKU LIKE 'F%' THEN 'Food'
                ELSE 'Unknown'
              END AS DEPARTMENT,
             PRODUCT
        FROM PRODUCTS;
H1235 | Homewares | Toaster
H1425 | Homewares | Kettle
F0192 | Food | Banana
F1723 | Food | Apple
x1234 | Unknown | Cat

We can persist this to a new KSQL stream, which populates an Apache Kafka® topic:

ksql> CREATE STREAM PRODUCTS_ENRICHED AS
      SELECT SKU,
             CASE WHEN SKU LIKE 'H%' THEN 'Homewares'
                  WHEN SKU LIKE 'F%' THEN 'Food'
                  ELSE 'Unknown'
              END AS DEPARTMENT,
             PRODUCT
        FROM PRODUCTS;
ksql> DESCRIBE PRODUCTS_ENRICHED;
Name: PRODUCTS_ENRICHED
 Field      | Type
----------------------------------------
 ROWTIME    | BIGINT           (system)
 ROWKEY     | VARCHAR(STRING)  (system)
 SKU        | VARCHAR(STRING)
 DEPARTMENT | VARCHAR(STRING)
 PRODUCT    | VARCHAR(STRING)

Now any message arriving on the source PRODUCTS topic will be transformed and written with the additional DEPARTMENT column to the target PRODUCTS_ENRICHED topic.

CASE: Bucketing data

For analytical purposes, it’s often useful to assign buckets to data based on the range of values. Let’s take a list of orders:

ksql> SELECT ORDER_ID, ORDER_TOTAL_USD FROM ORDERS LIMIT 5;
1 | 6.55
2 | 6.79
3 | 6.52
4 | 0.8
5 | 7.1
Limit Reached
Query terminated

From this source, we want to allocate a “size” attribute based on the order total value:

CREATE STREAM ORDERS_BUCKETED AS
SELECT ORDER_ID,
       CASE
         WHEN ORDER_TOTAL_USD < 5             THEN 'Small'
         WHEN ORDER_TOTAL_USD BETWEEN 5 AND 7 THEN 'Medium'
         ELSE                                      'Large'
       END AS ORDER_SIZE
  FROM ORDERS;

Using the new attribute, it’s easy to do aggregate calculations:

ksql> SELECT ORDER_SIZE, COUNT(*)
        FROM ORDERS_BUCKETED
      GROUP BY ORDER_SIZE;
Small | 49
Medium | 19
Large | 32
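
As a rough sketch of where this could go next, the same aggregate can be windowed to show how the mix of order sizes changes over time. The one-hour tumbling window and the ORDER_COUNT alias are assumptions chosen purely for illustration:

```sql
SELECT ORDER_SIZE, COUNT(*) AS ORDER_COUNT
  FROM ORDERS_BUCKETED WINDOW TUMBLING (SIZE 1 HOUR)
 GROUP BY ORDER_SIZE;
```

Bear in mind that the WHEN clauses are evaluated in order and the first match wins, and that BETWEEN is inclusive, so an order of exactly 5 or exactly 7 lands in the Medium bucket.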

 

CASE: Selectively masking data

You can use CASE to change a column’s value based on that of another. Here, we can mask out the email addresses of customers from a particular country:

ksql> SELECT ID,
             COUNTRY,
             CASE
              WHEN COUNTRY='FR' THEN MASK(EMAIL)
              ELSE EMAIL END AS EMAIL_FR_MASKED
        FROM CUSTOMERS;
1 | US | dmacnamara0@theguardian.com
2 | CA | hmcgrail1@economist.com
4 | FR | xxxxxxxxxxn-xxx-xx-xx
6 | FR | xxxxxxxxxn-xxxxxxxx-xxx
7 | FR | xxxxxxxn-xxxxxxxxxxx-xxx
5 | US | kmacclay4@shutterfly.com

CASE: Generate values for missing attributes

Sometimes data may have “holes”—that is, not every field in every record has a value. Sometimes missing values can be reconstructed or inferred from other fields present in a record.

In this stream of data, some network devices have a type assigned to them:

ksql> SELECT DEVICE_NAME, DEVICE_TYPE FROM UBNT_USERS WHERE DEVICE_TYPE!='';
Burner iPhone | Apple
Robin's work iPhone | Apple
rpi-01.moffatt.me | RaspberryPi
rpi-03.moffatt.me | RaspberryPi

Whilst others don’t:

ksql> SELECT DEVICE_NAME, DEVICE_TYPE FROM UBNT_USERS WHERE DEVICE_TYPE='';
Fire 01 (Red) |
cdh57-01-node-01.moffatt.me |
logstash-irc.moffatt.me |
Fire 02 (Yellow) |

We can create an enhanced stream of data, preserving the device type where it does exist and deriving one where it doesn’t based on the name of the device:

ksql> CREATE STREAM UBNT_USERS_ENRICHED AS
      SELECT DEVICE_NAME,
             CASE
              WHEN DEVICE_TYPE!='' THEN DEVICE_TYPE
              WHEN DEVICE_NAME LIKE 'Fire%' THEN 'Amazon Fire'
              WHEN DEVICE_NAME LIKE '%.moffatt.me' THEN 'Home server'
              ELSE 'Unknown'
            END AS DEVICE_TYPE
      FROM UBNT_USERS;
 Message
----------------------------
 Stream created and running
----------------------------

Now the DEVICE_TYPE column is populated for every record:

ksql> SELECT DEVICE_TYPE, DEVICE_NAME from UBNT_USERS_ENRICHED;
Amazon Fire | Fire 01 (Red)
Amazon Fire | Fire 02 (Yellow)
Apple | Burner iPhone
Apple | Robin's work iPhone
Home server | cdh57-01-node-01.moffatt.me
Home server | logstash-irc.moffatt.me
Raspberr | rpi-01.moffatt.me
Raspberr | rpi-03.moffatt.me
SlimDevi | Squeezebox - Kitchen
SlimDevi | Squeezebox - Sitting Room

CASE: Generate conditional aggregates

KSQL supports stateful aggregations, and with the new CASE functionality in 5.2 we can do even more flexible analysis. For example, given a list of customers by store, with attributes indicating whether they are active or not and whether they have opted out of communications:

ksql> SELECT STORE_ID, DO_NOT_CONTACT_EMAIL, ACTIVE, EMAIL FROM STORE_CUSTOMERS WHERE STORE_ID='WA4';
WA4 | 1 | 1 | dgarterykt@businesswire.com
WA4 | 1 | 1 | frossander36@i2i.jp
WA4 | 0 | 0 | nwhannel4h@census.gov
WA4 | 1 | 1 | tedsallaa@cdbaby.com
WA4 | 0 | 1 | sblakelockak@dell.com
WA4 | 1 | 0 | ctugmanbk@wikimedia.org
WA4 | 0 | 1 | vdunckleedf@sun.com
WA4 | 1 | 0 | ldenialfm@csmonitor.com
WA4 | 1 | 1 | mcutfordoj@ustream.tv
WA4 | 0 | 1 | rbutsonrj@xrea.com

…we can create an aggregate showing the number of active customers per store, and the number of active opt-in customers per store:

ksql> SELECT STORE_ID,
             SUM(CASE WHEN DO_NOT_CONTACT_EMAIL = 0 THEN 1 ELSE 0 END) AS ACTIVE_EMAIL_MEMBER_COUNT,
             COUNT(*) AS ACTIVE_MEMBER_COUNT
        FROM STORE_CUSTOMERS
       WHERE ACTIVE=1
    GROUP BY STORE_ID;
WA4 | 3 | 7
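
The same pattern extends to several conditions at once. As a sketch under the same column assumptions as above, moving the ACTIVE test out of the WHERE clause and into CASE expressions lets a single query return active, active opt-in, and total customer counts per store (TOTAL_MEMBER_COUNT is an alias introduced here for illustration):

```sql
SELECT STORE_ID,
       SUM(CASE WHEN ACTIVE = 1 THEN 1 ELSE 0 END) AS ACTIVE_MEMBER_COUNT,
       SUM(CASE WHEN ACTIVE = 1 AND DO_NOT_CONTACT_EMAIL = 0 THEN 1 ELSE 0 END) AS ACTIVE_EMAIL_MEMBER_COUNT,
       COUNT(*) AS TOTAL_MEMBER_COUNT
  FROM STORE_CUSTOMERS
 GROUP BY STORE_ID;
```

Because there is no WHERE filter, inactive customers still contribute to TOTAL_MEMBER_COUNT while adding zero to the two conditional sums.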

CASE: Traffic routing

Imagine you’ve got a stream of data, and you’d like to segment that data into multiple streams based on a condition in the data, perhaps for A/B testing purposes. Not only do you want to route it to different topics but you also want visibility into which messages are going where and how many.

Given the following stream of click data:

ksql> SELECT VIEWTIME, USERID, PAGEID
        FROM PAGEVIEWS;
1553003313122 | User_3 | Page_25
1553003313562 | User_2 | Page_11
1553003313958 | User_8 | Page_94
1553003314153 | User_6 | Page_77

…we want to treat all data from User_8 and User_9 as group A and all other messages as group B.

ksql> CREATE STREAM PAGEVIEWS_AB AS
      SELECT VIEWTIME,
             USERID,
             PAGEID,
             CASE
                WHEN USERID = 'User_9' OR USERID = 'User_8' THEN 'A'
                ELSE 'B'
             END AS USER_TRAFFIC_GROUP
        FROM PAGEVIEWS;

The group is stored in a new column called USER_TRAFFIC_GROUP. It is added to every message as it arrives in the source topic and is written to a new one called PAGEVIEWS_AB:

ksql> SELECT VIEWTIME, USERID, PAGEID, USER_TRAFFIC_GROUP
        FROM PAGEVIEWS_AB;
1552922710150 | User_6 | Page_40 | B
1552922710563 | User_8 | Page_83 | A
1552922711116 | User_4 | Page_70 | B
1552922711578 | User_8 | Page_69 | A
1552922712698 | User_5 | Page_43 | B

Now we can route messages:

ksql> CREATE STREAM PAGEVIEWS_GROUP_A AS SELECT * FROM PAGEVIEWS_AB WHERE USER_TRAFFIC_GROUP='A';
 Message
----------------------------
 Stream created and running
----------------------------
ksql> CREATE STREAM PAGEVIEWS_GROUP_B AS SELECT * FROM PAGEVIEWS_AB WHERE USER_TRAFFIC_GROUP='B';
 Message
----------------------------
 Stream created and running
----------------------------

In addition to routing the messages this way, we can use the derived column USER_TRAFFIC_GROUP to drive analytics:

ksql> CREATE TABLE PAGEVIEWS_TRAFFIC_MONITOR AS
      SELECT TIMESTAMPTOSTRING(WINDOWSTART(), 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
             USER_TRAFFIC_GROUP,
             COUNT(*) AS PAGEVIEW_COUNT
        FROM PAGEVIEWS_AB WINDOW TUMBLING (SIZE 1 MINUTE)
    GROUP BY USER_TRAFFIC_GROUP;
ksql> SELECT WINDOW_START_TS, USER_TRAFFIC_GROUP, PAGEVIEW_COUNT FROM PAGEVIEWS_TRAFFIC_MONITOR;
2019-03-18 16:52:00 | B | 186
2019-03-18 16:52:00 | A | 45
2019-03-18 16:57:00 | A | 11
2019-03-18 16:57:00 | B | 35

KSQL processing log

KSQL now has the ability to log details of processing errors to a destination such as another Kafka topic, from where they can be inspected. Previously, this information was only available from the KSQL server’s log file itself.

To configure it, first stop the KSQL server and then open the Apache Log4j configuration file (etc/ksql/log4j.properties by default). Append the following to it:

log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout
log4j.appender.kafka_appender.BrokerList=localhost:9092
log4j.appender.kafka_appender.Topic=asgard_ksql_processing_log
log4j.logger.processing=ERROR, kafka_appender

The two configuration items in particular that you may need to change are:

log4j.appender.kafka_appender.BrokerList
log4j.appender.kafka_appender.Topic

These are the Kafka broker and topic to which you want to write the processing log. Optionally, the KSQL server can automagically define a stream on top of the Kafka topic for you. If you want to do this, set the following in ksql-server.properties:

ksql.logging.processing.topic.auto.create=true
ksql.logging.processing.stream.auto.create=true
ksql.logging.processing.topic.name=asgard_ksql_processing_log

Make sure that the value of ksql.logging.processing.topic.name matches the value of log4j.appender.kafka_appender.Topic in the Log4j configuration. Once you’ve restarted the KSQL server, you can inspect the Kafka topic to which any processing logs are written:

ksql> PRINT 'asgard_ksql_processing_log' FROM BEGINNING LIMIT 1;
Format:JSON
{"ROWTIME":1552564841591,"ROWKEY":"null","level":"ERROR","logger":"processing.5476133448908187392.KsqlTopic.source.deserializer","time":1552564841423,"message":{"type":0,"deserializationError":{"errorMessage":"Converting byte[] to Kafka Connect data failed due to serialization error: ","recordB64":null},"recordProcessingError":null,"productionError":null}}

Even better, you can query it directly using KSQL and the stream that the server creates if you’ve configured it to do so:

ksql> SELECT TIMESTAMPTOSTRING(TIME,'yyyy-MM-dd HH:mm:ss'),
             MESSAGE->DESERIALIZATIONERROR->ERRORMESSAGE,
             MESSAGE->DESERIALIZATIONERROR->RECORDB64
        FROM KSQL_PROCESSING_LOG;
2019-03-14 13:36:07 | Converting byte[] to Kafka Connect data failed due to serialization error:  | eyAiaGVhZGVyIjogeyAic2NoZW1hVmVyc2lvbk5vIjogIjEiLCB9LCAicGF5bG9hZCI6IHsgIm1vZGlmaWVkRGF0ZSI6IDE1NTIzMzQzMjU0MTIsICJjcmVhdGVkRGF0ZSI6IDE1NTIzMzQzMjU0MTIsICJjcmVhdGVkQnkiOiAiQiIsICJzdWNjZXNzZnVsIjogdHJ1ZSwgInNvdXJjZV9vcmRlcl9pZCI6ICIzNDExOTc2OTMzMjE1IiwgfSB9

Whether the value of the message itself is included or not is controlled by the KSQL server setting ksql.logging.processing.rows.include. The message is Base64 encoded, which is easily decoded as required. Here it is decoded using the command-line utility base64:

$ echo 'eyAiaGVhZGVyIjogeyAic2NoZW1hVmVyc2lvbk5vIjogIjEiLCB9LCAicGF5bG9hZCI6IHsgIm1vZGlmaWVkRGF0ZSI6IDE1NTIzMzQzMjU0MTIsICJjcmVhdGVkRGF0ZSI6IDE1NTIzMzQzMjU0MTIsICJjcmVhdGVkQnkiOiAiQiIsICJzdWNjZXNzZnVsIjogdHJ1ZSwgInNvdXJjZV9vcmRlcl9pZCI6ICIzNDExOTc2OTMzMjE1IiwgfSB9' | base64 --decode
{ "header": { "schemaVersionNo": "1", }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215", } }
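
Once the stream exists, it can be filtered like any other. The following is a minimal sketch that keeps only deserialization errors. It assumes the auto-created stream is named KSQL_PROCESSING_LOG as above, and that its LOGGER column mirrors the logger field seen in the PRINT output; ERROR_TS and ERROR_MESSAGE are aliases introduced here for illustration:

```sql
SELECT TIMESTAMPTOSTRING(TIME, 'yyyy-MM-dd HH:mm:ss') AS ERROR_TS,
       LOGGER,
       MESSAGE->DESERIALIZATIONERROR->ERRORMESSAGE AS ERROR_MESSAGE
  FROM KSQL_PROCESSING_LOG
 WHERE MESSAGE->DESERIALIZATIONERROR->ERRORMESSAGE IS NOT NULL;
```

Since the logger name includes the query and source that raised the error, selecting it alongside the message should make it easier to trace a bad record back to the stream that tried to read it.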

To learn more about the processing log functionality, refer to the docs.

PRINT now supports the LIMIT clause

GitHub issue #1316

PRINT is one of those features you may not quite grok until you start using it…and then you’ll wonder how you lived without it. It provides a simple way of displaying the contents of a Kafka topic and figures out itself which deserialiser to use. Avro? No problem! JSON? Bring it on!

In KSQL 5.2, the PRINT feature gets even better as you can specify how many records you’d like to see from the topic using the LIMIT clause.
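
As a quick sketch (the topic name pageviews is hypothetical), this prints the first three messages on a topic and then returns you to the prompt:

```sql
PRINT 'pageviews' FROM BEGINNING LIMIT 3;
```

Without FROM BEGINNING, PRINT waits for new messages to arrive and stops once three have been shown.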

 

Multi-line commands

GitHub issue #1424

Whilst we have a ton of new functionality included in 5.2, we’ve also stepped up the maturity of the product. A great example of this is that you no longer need \ characters to denote line continuation if you want to break a statement up over multiple lines.
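
To illustrate, here is a sketch using the PRODUCTS stream from earlier. In previous versions, every line except the last would have needed a trailing \ character. From 5.2, the statement simply runs until the terminating semicolon:

```sql
SELECT SKU,
       PRODUCT
  FROM PRODUCTS
 WHERE SKU LIKE 'H%';
```

This also makes it much easier to paste a formatted statement from an editor straight into the KSQL CLI.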

 

New URL manipulation functions

Several new functions for handling URLs and splitting out the constituent parts (host, path, etc.) have been added:

ksql> SELECT URL,
             URL_EXTRACT_HOST(URL),
             URL_EXTRACT_PATH(URL),
             URL_EXTRACT_PROTOCOL(URL),
             URL_EXTRACT_QUERY(URL)
        FROM CLICKS;
http://google.co.jp/dolor.png?quis=blandit | google.co.jp | /dolor.png | http | quis=blandit
https://npr.org/morbi/non.js?nibh=at | npr.org | /morbi/non.js | https | nibh=at
http://cnet.com/enim/lorem.png?etiam=accumsan | cnet.com | /enim/lorem.png | http | etiam=accumsan
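
If you only need a single query-string parameter, I believe the same family of functions also includes URL_EXTRACT_PARAMETER, which takes the URL and a parameter name. Treat the following as a sketch and check the function reference for your version. The parameter name quis is simply taken from the first sample URL above, and QUIS_VALUE is an alias introduced here for illustration:

```sql
SELECT URL,
       URL_EXTRACT_PARAMETER(URL, 'quis') AS QUIS_VALUE
  FROM CLICKS;
```

Rows whose URL carries no such parameter would be expected to return null in that column.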

New aggregate functions: COLLECT_LIST and COLLECT_SET

The new aggregate functions COLLECT_LIST and COLLECT_SET can be used for pretty cool analysis. Let’s imagine we have a stream of click events:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'), user, url
      FROM clicks;
2019-03-13 13:24:40 | rmoff | http://confluent.io/products
2019-03-13 13:24:50 | rmoff | http://confluent.io/ksql
2019-03-13 13:25:01 | rmoff | http://confluent.io/buy
2019-03-13 13:26:58 | rmoff | http://confluent.io/products
2019-03-13 13:27:33 | rmoff | http://confluent.io/replicator

We can look at how many clicks a user made in a given session (for example, with a 60-second timeout), as well as the items on which they clicked:

ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(), 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
             user,
             COLLECT_LIST(URL_EXTRACT_PATH(url)) AS CLICK_PATH,
             COUNT(*) AS CLICK_COUNT
        FROM clicks WINDOW SESSION (60 SECONDS)
    GROUP BY user;
2019-03-13 13:24:40 | rmoff | [/products, /ksql, /buy] | 3
2019-03-13 13:26:58 | rmoff | [/products, /replicator] | 2
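
COLLECT_SET works the same way but keeps only distinct values, which suits questions such as which hosts a user touched in a session. This is a sketch against the same clicks stream, with HOSTS_VISITED as an alias introduced here for illustration:

```sql
SELECT user,
       COLLECT_SET(URL_EXTRACT_HOST(url)) AS HOSTS_VISITED,
       COUNT(*) AS CLICK_COUNT
  FROM clicks WINDOW SESSION (60 SECONDS)
 GROUP BY user;
```

Use COLLECT_LIST when repeats matter, as in the click path above, and COLLECT_SET when you only care about which values occurred.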

Not forgetting to mention…

  • BETWEEN is now supported as a predicate operator (GitHub issue #1004)
  • SPLIT is now supported to create an array from a column based on a separator character (GitHub issue #2349). Here’s an example of it using the above URL_EXTRACT_QUERY function and splitting it by the & character:
    ksql> SELECT URL,
                 URL_EXTRACT_QUERY(URL),
                 SPLIT(URL_EXTRACT_QUERY(URL),'&'),
                 SPLIT(URL_EXTRACT_QUERY(URL),'&')[0]
            FROM CLICKS;
    https://is.gd/hac.xml?velit=non&donec=pretium&diam=quis | velit=non&donec=pretium&diam=quis | [velit=non, donec=pretium, diam=quis] | velit=non
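
BETWEEN works in a WHERE clause just as it does inside CASE. As a small sketch against the ORDERS stream used earlier, this returns only the orders whose total falls in the range 5 to 7, with both bounds included:

```sql
SELECT ORDER_ID, ORDER_TOTAL_USD
  FROM ORDERS
 WHERE ORDER_TOTAL_USD BETWEEN 5 AND 7;
```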

In addition to what is covered in this article, there are further KSQL improvements in Confluent Platform 5.2, such as UI enhancements in Confluent Control Center that let you pause and resume data feeds, card and table formats for displaying query results, and more.

What are you waiting for?

Go and download Confluent Platform 5.2 today and see what KSQL can do!

Interested in more? Learn about ksqlDB, the successor to KSQL.

  • Robin is a principal developer advocate at Confluent, the company founded by the original creators of Apache Kafka, as well as an Oracle Groundbreaker Ambassador. His career has always involved data, from the old worlds of COBOL and DB2, through the worlds of Oracle and Hadoop, and into the current world with Kafka. His particular interests are analytics, systems architecture, performance testing and optimization. He blogs at http://cnfl.io/rmoff and http://rmoff.net/ and can be found tweeting grumpy geek thoughts as @rmoff. Outside of work he enjoys drinking good beer and eating fried breakfasts, although generally not at the same time.
