Streaming Graph Get Started

thatDot avatar thatDot

It’s been said that graphs are everywhere. Graph-based data models provide a flexible and intuitive way to represent complex relationships and interconnectedness in data. They are particularly well-suited for scenarios where relationships and patterns are important, but until recently, they have been confined to a handful of use cases – databases, chip design, information theory, AI – that all have one thing in common: the data in question is stored first and then processed, usually as a batch job.

In other words, the data in these use cases is at rest. However, what about data in motion, data in event-driven use cases that is constantly changing and being transmitted? As event-driven applications and operational intelligence scenarios, such as real-time monitoring, situational intelligence, and fraud detection, continue to expand rapidly, graph data models and the primary query language used for them, Cypher, are proving much more versatile than SQL and the best tools for the task.

Consider the challenge of extracting insights from a complex event stream. The stream may have high volume and velocity, require the correlation of events by context from multiple sources, contain meaningful event patterns, and have a short timeframe to identify, detect, and take action. Addressing these challenges requires efficient data processing, scalable infrastructure, and effective event modeling techniques in graph solutions and Cypher.

Graph databases are useful for batch processing a portion of a complex event stream to provide macro-level insights and metrics to understand events but not take action. The same concepts (patterns and algorithms) used in graph databases when the event stream is at rest can be applied to an event stream while in motion using a streaming graph like Quine, often directly reusing the Cypher written in the database. Here’s how Cypher addresses the challenges found in complex event streams:

  • Pattern Matching: Cypher excels at pattern matching, allowing you to detect patterns (sub-graphs) in the event stream. This is particularly useful for identifying sequences of events or detecting specific patterns, allowing you to efficiently filter and process relevant events based on their relationships and properties.
  • Event Correlation: You can define relationships between events and other entities, such as users, devices, or locations. This enables you to correlate events based on common attributes or shared relationships, often with high cardinality and a mix of categorical and numerical data, to identify patterns, anomalies, or complex dependencies.
  • Time-based Queries: Cypher provides temporal capabilities, allowing you to query and analyze events based on their timestamps or time intervals. You can filter events based on specific time ranges, compare temporal values, and perform time-based aggregations. This enables you to process time-dependent patterns, detect trends, and perform time window-based computations on the event stream.
  • Real-time Insights: You can continuously execute Cypher queries on an incoming event stream, allowing for dynamic analysis and near real-time decision-making. This enables you to monitor, detect patterns, and trigger actions based on the evolving stream of events.

Event Pattern Detection

Specifying a pattern (sub-graph) to MATCH can identify specific sequences of events or combinations of events of interest. For example, when observing the efficiency of cache nodes in a CDN network, Cypher can easily identify when a series (10) of cache misses occur and send an alert to the NOC to trigger an investigation.

The Cypher required to detect a MISS event only needs to identify the node types and relationships as a pattern.

MATCH (server1:server)<-[:TARGETED]-(event1 {cache_class:"MISS"})-[:REQUESTED]->(asset)<-[:REQUESTED]-(event2 {cache_class:"MISS"})-[:TARGETED]->(server2:server)
RETURN DISTINCT id(event1) AS event1

Then, additional Cypher processes the event to take action, recording it as a metric or sending an alert if the metric constraint is exceeded. This technique is demonstrated in the CDN Observability recipe. An unexpected challenge

Cypher can respond to changes in the event stream in real time, allowing organizations to reduce the risk associated with a condition’s duration before it is analyzed and addressed. For example, the Financial Risk Calculation recipe models market changes in real-time so that organizations can provide adequate coverage for risk exposure while ensuring their regulatory compliance minimally affects their asset allocation. As basic patterns are matched, results are passed to business logic written in Cypher to generate an adjusted trading value, correlate (roll-up) trading events across the network, and trigger an alert when the trading system is out of compliance. When a pattern match query detects an investment pattern, it triggers an output query to process the StandingQueryResult.

For example, the result returned from an investment pattern in Cypher:

MATCH MATCH (investment:investment)<-[:HOLDS]-(desk:desk)<-[:HAS]-(institution:institution)
RETURN DISTINCT id(investment) AS id
Triggers business logic in Cypher to generate a new property with a value based on the nodes investment.class property.SET investment.adjustedValue = CASE
 WHEN investment.class = '1' THEN investment.value WHEN investment.class = '2a' THEN investment.value * .85 WHEN investment.class = '2b' AND investment.type = 9 THEN investment.value * .75 WHEN investment.class = '2b' AND investment.type = 10 THEN investment.value * .5
END

The investment events are then correlated through a roll-up function for each investment type.

UNWIND [["1","adjustedValue1"], ["2a","adjustedValue2a"], ["2b","adjustedValue2b"]] AS stuff

WITH institution,investment,desk,stuff
WHERE investment.class = stuff[0]

CALL float.add(institution,stuff[1],investment.adjustedValue) YIELD result AS institutionAdjustedValueRollupByClass
CALL float.add(institution,"totalAdjustedValue",investment.adjustedValue) YIELD result AS institutionAdjustedValueRollup

CALL float.add(desk,stuff[1],investment.adjustedValue) YIELD result AS deskAdjustedValueRollupByClass
CALL float.add(desk,"totalAdjustedValue",investment.adjustedValue) YIELD result AS deskAdjustedValueRollup

SET institution.percentAdjustedValue2 = ((institution.adjustedValue2a + institution.adjustedValue2b)/institution.totalAdjustedValue) * 100,
institution.percentAdjustedValue2b = (institution.adjustedValue2b/institution.totalAdjustedValue) * 100

Temporal Analysis

With Cypher, you can express temporal conditions, such as events occurring within a specific time window, events happening before or after certain events, or events falling into a particular time range. This enables temporal analysis of event streams, including trend analysis, time-based aggregations, and windowed computations. For example, the temporal locality recipe looks for emails sent or received by cto@company.com within a four to six-minute sliding window. The pattern query matches each individual (sender)-[:SENT_MSG]->(message)-[:RECEIVED_MSG]->(receiver) pattern containing the CTO’s email address.

MATCH (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r)
WHERE n.email="cto@company.com" OR r.email="cto@company.com"
RETURN id(n) as ctoId, id(m) as ctoMsgId, m.time as mTime, id(r) as recId

And then calculates the duration between the emails to generate a sub-graph containing messages that went to or from the CTO within the time window.

MATCH (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r), (thisMsg)
WHERE id(n) = $that.data.ctoId
AND id(r) = $that.data.recId
AND id(thisMsg) = $that.data.ctoMsgId
AND id(m) <> id(thisMsg)
AND duration("PT6M") > duration.between(m.time,thisMsg.time) > duration("P
CREATE (m)-[:IN_WINDOW]->(thisMsg)
CREATE (m)<-[:IN_WINDOW]-(thisMsg) WITH n, m, r, "http://localhost:8080/#MATCH" + text.urlencode(' (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r) WHERE strId(n)="' + strId(n) + '"AND strId(r)="' + strId(r) + '" AND strId(m)="' + strId(m) + '" RETURN n, r, m') a
RETURN URL

Conclusion

Cypher is a powerful and expressive query language well-suited for processing complex event streams. Quine streaming graph enables Cypher developers to leverage graph techniques early when processing a complex event stream to aggregate and shape events, detect patterns for alerting and early feedback, and perform event normalization before entering the data warehouse. Learn more and Try Quine

If you want to try Quine using your own data, here are some resources to help:

  1. Learn more about Quine by visiting the Quine open source project.
  2. Download Quine – JAR file | Docker Image | Github
  3. Check out the Financial Risk Calculation recipe to see how Cypher is used for real-time rollups.
  4. Check out demos and other videos at our YouTube channel.

Recent posts

Want to read more news and other posts? Visit the resource center for all things thatDot.

Help Center

Streaming Graph Help

Novelty & Additional Help