Kafka data deduping made easy using Quine’s idFrom function

thatDot avatar Michael Aglietti

Using Quine with Kafka as Source and Sink to Process Categorical Data

Quine streaming graph is specifically designed to find high-value patterns in high-volume event streams, consuming data from APIs, data lakes, and most commonly, event stream processing systems. Quine is complementary to systems like Flink and ksqlDB, both of which are quite powerful but do not make it easy to connect and find complex patterns in categorical data.

A streaming system like Kafka allows developers to divide their monolithic applications into manageable components while addressing resilience and scalability needs.

Switching to real-time event processing does not come without tradeoffs, however.  Duplicate messages are common in streaming systems, and duplicate events will inevitably show up in a Kafka stream, especially at scale.

Quine natively addresses duplicate and out-of-order data issues in streaming data pipelines.

The Problem: Message Duplication Causes Multiple Negative Effects

In a high-volume data pipeline, duplicate messages are unavoidable. The duplication of events is often the necessary side effect of guaranteeing that data is successfully delivered. The traditional solution is for a consumer application to record what it’s seen recently and drop any event that is already processed.

But event duplication can become a major challenge as your streaming system scales across multiple partitions. Stream consumers are usually distributed on different machines to help the system scale, making it difficult to quickly share knowledge of which events have already been processed. Each Kafka partition typically has its own consumer, if the consumer fails to process the event for any reason, when it resumes, the operation will request events starting from an earlier offset in Kafka. The result is that duplicate events will get sent downstream to other applications.

Processing events multiple times can cause inconsistencies within the facts that your application logic depends on. The effect is wrong analytic insights; or worse, your application performs the wrong actions.

Here are a few of the common approaches for managing duplicate events in a streaming system.

  1. Allow duplicate messages to occur. Maybe processing duplicate events is not a problem in your system. However, most of the time this is not the case.
  2. Perform deduplication in a database. This approach starts off fine until your DB won’t scale. It is common for this to turn into a batch processing approach that defeats the reason that you decided to develop a streaming system in the first place.
  3. Create a deduplication service. Call out from your streaming system to look up an event (or event ID) to see if it has already been processed. This is the natural evolution of option #2 which turns into its own expensive and painful service to manage.
  4. Change your business logic or requirements to allow idempotent processing. If none of the previous options are appealing, you might try to alter your algorithms or your goals so that processing a duplicate message will have no effect. This is often impossible.

A Better Solution: locate nodes in the graph with `idFrom(…)`

Duplicate data delivery is one of the main problems Quine is built to solve. To understand how Quine solves this problem, let’s first understand two of Quine’s fundamental design concepts:

  1. In Quine, streaming event processing is performed by graph nodes backed by actors scaled across any number of servers.
  2. Quine behaves as if all nodes already exist.

Each event that Quine processes operates on a specific set of nodes in the graph. With traditional static graphs, your application must ensure that each node is created exactly once—and this becomes a big performance drain. Quine behaves as if all nodes exist already, but are not yet filled with data or connected to any other nodes. You don’t have to worry about “creating nodes” twice because all possible nodes exist already. There will always be exactly one right place to handle each message, if only it can be found…

To find the node responsible for each message, Quine has a built-in function called `idFrom(…)`. `idFrom` takes data from the incoming event and deterministically turns it into a unique node ID in the graph. `idFrom` is entirely deterministic. Given the same arguments, `idFrom` will always return the same node ID. This is similar to a “consistent hashing” approach used for other purposes, but in this case, Quine returns a well-formed node ID instead of a hash.

Node IDs are user-configurable, so they can take many forms, but by default node IDs will be UUIDs. See the documentation on idProviders for more information on `idFrom` and alternate options for node ID types.

Once we know the ID of a node in the graph, that node will handle processing the event and deduplicating future events. So if the same event is received by Quine twice, `idFrom` will return the same `nodeId` each time. Since Quine only saves to disk the changes to each node, the duplicate event becomes a no-op. The practical effect of this is that using `idFrom` will resolve duplicate events in the stream automatically. So you can go back to building your application instead of micromanaging the event stream delivery guarantees.

Using `idFrom` within ingest stream queries is standard practice, even when a node is expected to show up repeatedly in the successive events. Take, for example, the Wikipedia page ingest recipe. The ingest stream query refers to a `dbNode` for each database where a `page-create` event belongs.

ingestStreams:
  - type: ServerSentEventsIngest
    url: https://stream.wikimedia.org/v2/stream/page-create
    format:
      type: CypherJson
      query: |-
        MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)
        MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
        MATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)
        SET revNode = $that, revNode.type = "rev"
        SET dbNode.database = $that.database, dbNode.type = "db"
        SET userNode = $that.performer, userNode.type = "user"
        WITH *, datetime($that.rev_timestamp) AS d
        CALL create.setLabels(revNode, ["rev:" + $that.page_title])
        CALL create.setLabels(dbNode, ["db:" + $that.database])
        CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])
        CALL reify.time(d, ["year", "month", "day", "hour", "minute"]) YIELD node AS timeNode
        CREATE (revNode)-[:at]->(timeNode)
        CREATE (revNode)-[:db]->(dbNode)
        CREATE (revNode)-[:by]->(userNode)

Let’s take a closer look at line two of the query. Notice that even when starting with an empty Quine system, we begin by MATCHing the `dbNode`. We don’t create it because it already exists. We MATCH it with a WHERE constraint on its ID using `idFrom`:

MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)

Using `idFrom`, Quine calculates the node ID using a combination of the string “db” and the value of the `database` field passed in from the event: `$that`. `idFrom` will always return the same node ID when given the same arguments.

NOTE: It’s good practice to prefix the `idFrom()` with a descriptive name for the type of values being passed in in order to effectively create a namespace to further ensure there won’t be accidental collisions on the id that gets created. If another field coincidentally had the value as `$that.database`, prefixing it with a string will ensure the same value from different types doesn’t accidentally refer to the same node when it shouldn’t.

If we query the top five most connected database nodes, it reveals that `idFrom` deterministically calculated node IDs thousands of times over a short period while processing the Wikipedia page-create Kafka stream.

❯ curl -s -X "POST" "http://0.0.0.0:8080/api/v1/query/cypher" \
     -H 'Content-Type: text/plain' \
     -d $'MATCH (n)
WHERE n.type = "db"
MATCH (n)-[r]-()
RETURN DISTINCT n.database, count(r)
ORDER BY count(r) DESC
LIMIT 5' \
| jq .
}

This produces the following results:


.tg  
.tg td
.tg th
.tg .tg-5l9e
.tg .tg-7d05
.tg .tg-wpo4

 Database  
 Count 

 commonswiki
 2953
 wikidatawiki
 1883
 enwiki
 790
 ruwiki
 144
 enwiktionary
 139

Using `idFrom` to calculate the `nodeId` tells us exactly where in the graph that message should be handled—whether it’s the first or thousandth time we’ve referred to that node. The processing on each node will only apply updates if the data actually needs updates. So duplicate messages routed to the same node will have the second message behave as a no-op and cause no troublesome side effects.

`idFrom` is a powerful tool that makes complex streaming data easier to reason about in a graph and is the foundation for developing with the Quine streaming graph.

Just like Kafka, Quine is Open Source

If you are using Kafka and have issues with duplicate data, Quine’s a great solution. Quine is open source so trying it out is as simple as downloading it and connecting it to Kafka.

Here’s a list of resources to get you started:

  1. Download Quine – JAR file | Docker Image | Github
  2. Check out the Ingest Data into Quine blog series covering everything from ingest from Kafka to ingesting .CSV data
  3. Apache Log Recipe – this recipe provides more ingest pattern examples
  4. Join Quine Community Slack and get help from thatDot engineers and community members.