The Quine streaming graph is designed to find high-value patterns in high-volume event streams, consuming data from APIs, data lakes, and, most commonly, event stream processing systems. Quine is complementary to systems like Flink and ksqlDB, both of which are quite powerful but do not make it easy to connect categorical data and find complex patterns in it.
A streaming system like Kafka allows developers to divide their monolithic applications into manageable components while addressing resilience and scalability needs.
Switching to real-time event processing does not come without tradeoffs, however. Duplicate messages are common in streaming systems, and duplicate events will inevitably show up in a Kafka stream, especially at scale.
Quine natively addresses duplicate and out-of-order data issues in streaming data pipelines.
In a high-volume data pipeline, duplicate messages are unavoidable. Duplication is often the necessary side effect of guaranteeing that data is successfully delivered: a sender that cannot confirm delivery retries, even when the first attempt actually succeeded. The traditional solution is for a consumer application to record what it has seen recently and drop any event it has already processed.
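Below is a minimal sketch of that traditional approach. It assumes that each event carries a unique key (here a hypothetical `id` field) and that the consumer keeps a bounded, in-memory window of recently seen keys. It also shows the approach's weakness. The window lives inside a single consumer process, so it cannot see duplicates handled elsewhere, and it forgets keys once they age out.

```python
from collections import OrderedDict
from typing import Callable, Hashable, Iterable, List


class RecentlySeenFilter:
    """Remembers the last `capacity` distinct keys (hypothetical helper)."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._seen: OrderedDict = OrderedDict()

    def is_duplicate(self, key: Hashable) -> bool:
        if key in self._seen:
            # Refresh the key so frequently repeated events stay remembered.
            self._seen.move_to_end(key)
            return True
        self._seen[key] = None
        if len(self._seen) > self.capacity:
            # Forget the oldest key; a late duplicate of it will slip through.
            self._seen.popitem(last=False)
        return False


def consume(events: Iterable[dict], key_of: Callable[[dict], Hashable],
            capacity: int = 1000) -> List[dict]:
    """Return the events that survive deduplication, in arrival order."""
    seen = RecentlySeenFilter(capacity)
    return [e for e in events if not seen.is_duplicate(key_of(e))]


if __name__ == "__main__":
    stream = [{"id": 1}, {"id": 2}, {"id": 1}, {"id": 3}, {"id": 2}]
    print(consume(stream, key_of=lambda e: e["id"]))  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

The window size trades memory for coverage. With `capacity=1`, for example, the sequence 1, 2, 1 passes through unfiltered because key 1 has already been evicted when its duplicate arrives.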
But event duplication becomes a major challenge as your streaming system scales across multiple partitions. Stream consumers are usually distributed across different machines to help the system scale, which makes it difficult to quickly share knowledge of which events have already been processed. Each Kafka partition is typically read by its own consumer. If that consumer fails for any reason before committing its offset, it resumes from the last committed offset. That offset is earlier than the last event it handled, so the consumer re-reads events it has already processed. The result is that duplicate events get sent downstream to other applications.
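The replay behavior can be simulated in a few lines. This is a toy model, not Kafka's client API. It assumes a single partition, a consumer that commits its offset after every `commit_every` events, and a crash after `crash_after` events have been forwarded. All names here are hypothetical.

```python
from typing import List


def deliver_with_crash(log: List[str], commit_every: int, crash_after: int) -> List[str]:
    """Simulate one consumer on one partition with at-least-once delivery.

    Events are forwarded downstream, the offset is committed after every
    `commit_every` events, and the consumer crashes once `crash_after` events
    have been forwarded. On restart it resumes from the last committed offset,
    so events forwarded after that commit are forwarded a second time.
    """
    downstream: List[str] = []
    committed = 0  # offset of the next event to read after a restart

    # First run: forward events until the simulated crash.
    offset = committed
    while offset < len(log) and len(downstream) < crash_after:
        downstream.append(log[offset])
        offset += 1
        if offset % commit_every == 0:
            committed = offset  # the offset commit succeeds

    # Restart: resume from the last committed offset, not from where we crashed.
    for offset in range(committed, len(log)):
        downstream.append(log[offset])
    return downstream


if __name__ == "__main__":
    print(deliver_with_crash(["e0", "e1", "e2", "e3", "e4"], commit_every=2, crash_after=3))
    # ['e0', 'e1', 'e2', 'e2', 'e3', 'e4']
```

Event `e2` was forwarded after the last commit but before the crash, so it reaches downstream applications twice.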
Processing events multiple times can introduce inconsistencies into the facts your application logic depends on. The result is incorrect analytic insights or, worse, an application that takes the wrong actions.
Here are a few of the common approaches for managing duplicate events in a streaming system.
idFrom(…)
Duplicate data delivery is one of the main problems Quine is built to solve. To understand how Quine solves this problem, let’s first understand two of Quine’s fundamental design concepts:
Each event that Quine processes operates on a specific set of nodes in the graph. With traditional static graphs, your application must ensure that each node is created exactly once, and this becomes a big performance drain. Quine behaves as if all nodes already exist but have not yet been filled with data or connected to any other nodes. You don't have to worry about "creating nodes" twice because all possible nodes exist already. There will always be exactly one right place to handle each message, if only it can be found…
To find the node responsible for each message, Quine has a built-in function called idFrom(…). idFrom takes data from the incoming event and deterministically turns it into a unique node ID in the graph. idFrom is entirely deterministic: given the same arguments, idFrom will always return the same node ID. This is similar to a "consistent hashing" approach used for other purposes, but in this case, Quine returns a well-formed node ID instead of a hash.

Node IDs are user-configurable, so they can take many forms, but by default node IDs will be UUIDs. See the documentation on idProviders for more information on idFrom and alternate options for node ID types.
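The following Python analogy makes the "same arguments, same ID" property concrete. It builds on name-based (version 5) UUIDs from the standard library. This is an illustrative assumption, not Quine's implementation. `id_from` and `SKETCH_NAMESPACE` are hypothetical names, and the IDs the real idFrom produces depend on the configured idProvider.

```python
import json
import uuid

# Hypothetical fixed namespace for this sketch; not Quine's.
SKETCH_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "idfrom-sketch.example")


def id_from(*args) -> uuid.UUID:
    """Deterministically map arguments to a UUID (illustration only)."""
    # Serialize the arguments canonically so ("db", "enwiki") always yields
    # the same bytes, and so "1" and 1 produce different keys.
    key = json.dumps(list(args), sort_keys=True, separators=(",", ":"))
    return uuid.uuid5(SKETCH_NAMESPACE, key)


if __name__ == "__main__":
    print(id_from("db", "enwiki") == id_from("db", "enwiki"))  # True
    print(id_from("db", "enwiki") == id_from("db", "ruwiki"))  # False
```

Because the ID is computed rather than looked up, no coordination or shared lookup table is needed to agree on it. Any process that sees the same arguments arrives at the same node.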
Once we know the ID of a node in the graph, that node will handle processing the event and deduplicating future events. If Quine receives the same event twice, idFrom will return the same nodeId each time, so both copies are routed to the same node. Since Quine only saves changes to each node to disk, and the second copy changes nothing, the duplicate event becomes a no-op. The practical effect is that using idFrom resolves duplicate events in the stream automatically, so you can go back to building your application instead of micromanaging the event stream's delivery guarantees.
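The sketch below models why a duplicate becomes a no-op. It makes two simplifying assumptions. First, a node journals a property write only when the value actually changes. Second, a tuple key stands in for the ID that idFrom would return. `Node`, `ingest`, and the journal are hypothetical stand-ins, not Quine internals.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class Node:
    """Toy node that journals only the property changes it actually applies."""
    properties: Dict[str, Any] = field(default_factory=dict)
    journal: List[Tuple[str, Any]] = field(default_factory=list)

    def set_property(self, key: str, value: Any) -> bool:
        if key in self.properties and self.properties[key] == value:
            return False  # nothing changed, so nothing is persisted
        self.properties[key] = value
        self.journal.append((key, value))  # stand-in for a write to disk
        return True


def ingest(graph: Dict[Tuple[str, str], Node], event: Dict[str, str]) -> None:
    # Stand-in for idFrom("db", $that.database): same event, same node key.
    node_id = ("db", event["database"])
    # Treat every node as already existing: fetch it, materializing it on first use.
    node = graph.setdefault(node_id, Node())
    node.set_property("database", event["database"])
    node.set_property("type", "db")


if __name__ == "__main__":
    graph: Dict[Tuple[str, str], Node] = {}
    event = {"database": "enwiki"}
    ingest(graph, event)
    ingest(graph, event)  # duplicate delivery
    print(len(graph), graph[("db", "enwiki")].journal)
    # 1 [('database', 'enwiki'), ('type', 'db')]
```

The second `ingest` call finds the same node and leaves its journal unchanged. Only two writes are recorded, not four.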
Using idFrom within ingest stream queries is standard practice, even when a node is expected to show up repeatedly in successive events. Take, for example, the Wikipedia page ingest recipe. Its ingest stream query refers to a dbNode for the database that each page-create event belongs to.
```yaml
ingestStreams:
  - type: ServerSentEventsIngest
    url: https://stream.wikimedia.org/v2/stream/page-create
    format:
      type: CypherJson
      query: |-
        MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)
        MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
        MATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)
        SET revNode = $that, revNode.type = "rev"
        SET dbNode.database = $that.database, dbNode.type = "db"
        SET userNode = $that.performer, userNode.type = "user"
        WITH *, datetime($that.rev_timestamp) AS d
        CALL create.setLabels(revNode, ["rev:" + $that.page_title])
        CALL create.setLabels(dbNode, ["db:" + $that.database])
        CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])
        CALL reify.time(d, ["year", "month", "day", "hour", "minute"]) YIELD node AS timeNode
        CREATE (revNode)-[:at]->(timeNode)
        CREATE (revNode)-[:db]->(dbNode)
        CREATE (revNode)-[:by]->(userNode)
```
Let's take a closer look at line two of the query. Notice that even when starting with an empty Quine system, we begin by MATCHing the dbNode. We don't create it, because in Quine it already exists. We MATCH it with a WHERE constraint on its ID using idFrom:

```cypher
MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
```
Quine calculates the node ID with idFrom from a combination of the string "db" and the value of the database field of the incoming event, which the query refers to as $that. idFrom will always return the same node ID when given the same arguments.
NOTE: It's good practice to pass a descriptive name for the type of value as the first argument to idFrom(). This effectively creates a namespace and ensures there won't be accidental collisions on the ID that gets created. If another field coincidentally had the same value as $that.database, the prefix ensures that the same value from different types doesn't refer to the same node when it shouldn't.
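The effect of the prefix can be demonstrated with a hypothetical uuid5-based stand-in for idFrom. This is not Quine's algorithm, only a deterministic function with the same "same arguments, same ID" property. The scenario assumes that a database name and some other field, such as a user name, happen to share the value "enwiki".

```python
import json
import uuid

# Hypothetical fixed namespace for this illustration.
SKETCH_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "idfrom-sketch.example")


def id_from(*args) -> uuid.UUID:
    """Deterministic stand-in for idFrom (not Quine's actual algorithm)."""
    return uuid.uuid5(SKETCH_NAMESPACE, json.dumps(list(args)))


database, user_name = "enwiki", "enwiki"

# Without a prefix, the two unrelated values resolve to the same node.
unprefixed_collide = id_from(database) == id_from(user_name)
# With a descriptive prefix, each type of value gets its own ID space.
prefixed_collide = id_from("db", database) == id_from("user", user_name)

print(unprefixed_collide, prefixed_collide)  # True False
```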
Querying the five most connected database nodes reveals that idFrom deterministically resolved events to the same node IDs thousands of times over a short period while processing the Wikipedia page-create event stream.
```shell
curl -s -X "POST" "http://0.0.0.0:8080/api/v1/query/cypher" \
  -H 'Content-Type: text/plain' \
  -d $'MATCH (n)
WHERE n.type = "db"
MATCH (n)-[r]-()
RETURN DISTINCT n.database, count(r)
ORDER BY count(r) DESC
LIMIT 5' \
  | jq .
```
This produces the following results:
| Database | Relationship count |
|---|---|
| commonswiki | 2953 |
| wikidatawiki | 1883 |
| enwiki | 790 |
| ruwiki | 144 |
| enwiktionary | 139 |
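The same query can be sent from code instead of curl. Below is a minimal sketch using only Python's standard library. It assumes a Quine instance listening at the host and port used in the curl example and a JSON response body (the curl example pipes the response to jq). `build_cypher_request` and `run_cypher` are hypothetical helper names.

```python
import json
import urllib.request

# Endpoint and port from the curl example; adjust for your deployment.
QUINE_CYPHER_URL = "http://0.0.0.0:8080/api/v1/query/cypher"

TOP_DATABASES_QUERY = """MATCH (n)
WHERE n.type = "db"
MATCH (n)-[r]-()
RETURN DISTINCT n.database, count(r)
ORDER BY count(r) DESC
LIMIT 5"""


def build_cypher_request(query: str, url: str = QUINE_CYPHER_URL) -> urllib.request.Request:
    """Build the same POST the curl command sends: Cypher text as a plain-text body."""
    return urllib.request.Request(
        url,
        data=query.encode("utf-8"),
        headers={"Content-Type": "text/plain"},
        method="POST",
    )


def run_cypher(query: str, url: str = QUINE_CYPHER_URL) -> object:
    """Send the query and decode the JSON response (needs a running Quine instance)."""
    with urllib.request.urlopen(build_cypher_request(query, url)) as response:
        return json.load(response)
```

With Quine running and ingesting the recipe, `run_cypher(TOP_DATABASES_QUERY)` returns the decoded response. Keeping request construction separate from sending makes the request easy to inspect without a server.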
Using idFrom to calculate the nodeId tells us exactly where in the graph a message should be handled, whether it's the first or the thousandth time we've referred to that node. Each node applies an update only if it actually changes the node's data. When a duplicate message is routed to the same node, the second copy is therefore a no-op with no troublesome side effects.
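The following toy model connects this back to the multi-consumer problem described earlier by routing messages by node ID. It assumes a hypothetical fixed pool of shards and uses SHA-256 as a stand-in for idFrom. It illustrates the principle only and does not describe how Quine places nodes across machines. Copies of one event delivered through different partitions still land on the same node, and the second copy changes nothing.

```python
import hashlib
from typing import Dict, List, Tuple

NUM_SHARDS = 4  # hypothetical number of workers sharing the graph


def node_id(kind: str, value: str) -> str:
    """Deterministic stand-in for idFrom(kind, value): same inputs, same ID."""
    return hashlib.sha256(f"{kind}\x00{value}".encode("utf-8")).hexdigest()


def owner(nid: str) -> int:
    """Map a node ID to the one shard responsible for it."""
    return int(nid, 16) % NUM_SHARDS


def process(deliveries: List[Tuple[int, dict]]) -> Tuple[int, int]:
    """Apply (source_partition, event) deliveries; return (writes, no-ops)."""
    shards: List[Dict[str, dict]] = [{} for _ in range(NUM_SHARDS)]
    writes = noops = 0
    for _partition, event in deliveries:
        nid = node_id("revision", str(event["rev_id"]))
        # The delivering partition is irrelevant; the node ID alone picks the shard.
        node = shards[owner(nid)].setdefault(nid, {})
        if all(k in node and node[k] == v for k, v in event.items()):
            noops += 1  # the node already holds this data
        else:
            node.update(event)
            writes += 1
    return writes, noops


if __name__ == "__main__":
    ev = {"rev_id": 42, "page_title": "Example"}
    # The same event arrives twice, via partitions 0 and 3.
    print(process([(0, ev), (3, dict(ev)), (1, {"rev_id": 7, "page_title": "Other"})]))
    # (2, 1)
```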
idFrom is a powerful tool that makes complex streaming data easier to reason about in a graph, and it is the foundation for developing with the Quine streaming graph.
If you are using Kafka and have issues with duplicate data, Quine is a great solution. Quine is open source, so trying it out is as simple as downloading it and connecting it to Kafka.
Here’s a list of resources to get you started: