Drive Streaming Event Workflows with Standing Queries

Michael Aglietti, thatDot

Standing Queries: Turning Event-Driven Data into Data-Driven Events

Quine’s superpower is the ability to store business logic in the graph, as a query, and execute it there. That query then operates directly on data as it streams in. We call this type of query a standing query.

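A standing query incrementally matches a graph pattern while new data is ingested into the graph. Because each node evaluates only its own piece of the pattern as its data changes (described below), Quine avoids re-running the whole query against the whole graph, which keeps the process fast and efficient. When a full pattern match is found, the standing query takes action.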

A standing query is defined in two parts: a pattern and an output. The pattern defines what we want to match, expressed in Cypher using the form `MATCH … WHERE … RETURN …`. The output defines the action(s) to take for each result produced by the `RETURN` in the pattern query.

Each result is passed to the output's series of actions, which process it. A result can be logged, passed to other systems (via Kafka, Kinesis, HTTP POST, and more), or used to perform further actions such as running new queries or even rewriting parts of the graph: whatever logic your application needs.
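The idea of a chain of output actions can be pictured with a small Python sketch. It is a toy model under our own assumptions, not Quine's implementation: each hypothetical action is a plain function that receives the previous step's value, much as a `CypherQuery` output hands its results to a follow-up action through `andThen` later in this post.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for one output action: it takes a value and returns
# the value to hand to the next action in the chain.
Action = Callable[[Any], Any]


def run_output_chain(result: Dict[str, Any], actions: List[Action]) -> Any:
    """Pass one standing query result through a chain of actions, in order."""
    value: Any = result
    for action in actions:
        value = action(value)
    return value


recorded: List[Dict[str, Any]] = []


def enrich(result: Dict[str, Any]) -> Dict[str, Any]:
    # Stands in for a follow-up query that adds information to the result.
    return {**result, "enriched": True}


def record(result: Dict[str, Any]) -> Dict[str, Any]:
    # Stands in for logging or publishing (Kafka, Kinesis, HTTP POST, ...).
    recorded.append(result)
    return result


final = run_output_chain({"id": 451}, [enrich, record])
print(final)  # {'id': 451, 'enriched': True}
```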

How nodes match patterns

Each node in Quine is backed by an actor, which makes each graph node act like its own little CPU. Actors function as lightweight, single-threaded logical computation units that maintain state and communicate with each other by passing messages.

The actor model lets Quine store a standing query in the graph and remember it automatically. When you issue a `DistinctId` standing query, the query is broken into individual steps that can each be tested on a single node. Quine stores each successive decomposition of the query (smaller and smaller sub-queries) on the node that issues that portion of the query. A node's portion of the query is essentially a subscription to the next node's status as either matching the remaining query or not.

Figure: The Quine streaming graph's asynchronous actor model. An actor (a unit of compute) associated with each node in the graph performs incremental computation.

Any changes in the next node’s pattern match state result in a notification to the querying node. In this way, a complex query is relayed through the graph, where each node subscribes to whether or not the next node fulfills its part of the query. When a complete match is made, or unmade, the chain is notified with results and an output action is triggered.
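To make the subscription chain concrete, here is a toy, single-threaded Python model. It is an illustration under our own assumptions, not Quine's code: the class and method names are hypothetical, one queue stands in for the actors' mailboxes, and every node is treated as a candidate start `a` of the chain, so the toy reports `a` rather than `id(c)`. Each (node, depth) pair evaluates one smaller piece of the pattern and subscribes to the next node's state at depth + 1.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Set, Tuple

# Toy pattern: (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c) WHERE exists(c.prop)
LABELS = ["div_by_ten", "div_by_ten"]  # edge label followed at each depth
PROP = "prop"                          # property required at the last depth
Key = Tuple[int, int]                  # (node id, depth of the sub-query)
Msg = Tuple[str, int, int, int, bool]  # (kind, target, depth, sender, matched)


class ToyGraph:
    """Single-threaded toy: nodes subscribe to each other's sub-query match state."""

    def __init__(self) -> None:
        self.edges: Dict[int, Dict[str, Set[int]]] = defaultdict(lambda: defaultdict(set))
        self.props: Dict[int, Dict[str, str]] = defaultdict(dict)
        self.registered: Dict[int, Set[int]] = defaultdict(set)     # depths a node evaluates
        self.subscribers: Dict[Key, Set[int]] = defaultdict(set)    # who wants our state
        self.child_matches: Dict[Key, Set[int]] = defaultdict(set)  # neighbours that match
        self.state: Dict[Key, bool] = {}
        self.mailbox: Deque[Msg] = deque()
        self.results: List[Tuple[int, bool]] = []  # (root a, match made or unmade)

    def register(self, node: int, depth: int) -> None:
        if depth in self.registered[node]:
            return
        self.registered[node].add(depth)
        if depth < len(LABELS):
            # Ask each neighbour on the next edge whether it matches the smaller query.
            for nxt in self.edges[node][LABELS[depth]]:
                self.mailbox.append(("subscribe", nxt, depth + 1, node, False))
        self.reevaluate(node, depth)

    def reevaluate(self, node: int, depth: int) -> None:
        if depth == len(LABELS):
            matched = PROP in self.props[node]
        else:
            matched = bool(self.child_matches[(node, depth)])
        prev = self.state.get((node, depth))
        if prev == matched:
            return  # no change, so nobody needs to hear about it
        self.state[(node, depth)] = matched
        if depth == 0 and (matched or prev):
            self.results.append((node, matched))  # full match made or unmade
        for sub in self.subscribers[(node, depth)]:
            self.mailbox.append(("notify", sub, depth - 1, node, matched))

    def add_edge(self, src: int, label: str, dst: int) -> None:
        self.register(src, 0)  # every node is a candidate root `a`
        self.register(dst, 0)
        self.edges[src][label].add(dst)
        for depth in list(self.registered[src]):
            if depth < len(LABELS) and LABELS[depth] == label:
                self.mailbox.append(("subscribe", dst, depth + 1, src, False))

    def set_prop(self, node: int, key: str, value: Optional[str]) -> None:
        if value is None:
            self.props[node].pop(key, None)
        else:
            self.props[node][key] = value
        for depth in list(self.registered[node]):
            self.reevaluate(node, depth)

    def run(self) -> None:
        """Process messages one at a time, like actors draining their mailboxes."""
        while self.mailbox:
            kind, target, depth, sender, matched = self.mailbox.popleft()
            if kind == "subscribe":
                self.subscribers[(target, depth)].add(sender)
                self.register(target, depth)
                # Reply with the current state so the new subscriber is up to date.
                current = self.state[(target, depth)]
                self.mailbox.append(("notify", sender, depth - 1, target, current))
            else:
                children = self.child_matches[(target, depth)]
                if matched:
                    children.add(sender)
                else:
                    children.discard(sender)
                self.reevaluate(target, depth)


g = ToyGraph()
g.add_edge(45110, "div_by_ten", 4511)
g.add_edge(4511, "div_by_ten", 451)
g.run()
print(g.results)  # [] because node 451 has no "prop" yet
g.set_prop(451, "prop", "61mTZqH8")
g.run()
print(g.results)  # [(45110, True)]
g.set_prop(451, "prop", None)
g.run()
print(g.results)  # [(45110, True), (45110, False)]
```

Removing the property at the end of the chain unmakes the match, and the change travels back up the same subscriptions that carried the original match.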

Info

There are two pattern match modes: `DistinctId` and `MultipleValues`.
The pattern query must take the form `MATCH … WHERE … RETURN …`.
When the mode is `DistinctId`, the pattern query's `RETURN` must also be `DISTINCT`.

Creating a standing query

The first step in making a standing query is determining the graph pattern you want to watch for. You may have deployed Quine in your data pipeline to isolate data, implement a specific feature, or monitor the stream for a specific pattern in real time. In each case, you express that logic as a Cypher query.

Let’s demonstrate this concept using Quine’s built-in synthetic data generator, introduced in v1.3.0. Say that you need to link every number on a number line to the number that follows it and to the result of dividing it by 10 using integer division (where dividing always returns a whole number and the remainder is discarded), so that 45, for example, is linked to 4.

ingestStreams:
  - format:
      query: |-
        WITH gen.node.from(toInteger($that)) AS n,
             toInteger($that) AS i
        MATCH (thisNode), (nextNode), (divNode) 
        WHERE id(thisNode) = id(n) 
          AND id(nextNode) = idFrom(i + 1) 
          AND id(divNode) = idFrom(i / 10) 
        SET thisNode.i = i,
            thisNode.prop = gen.string.from(i)
        CREATE (thisNode)-[:next]->(nextNode), 
               (thisNode)-[:div_by_ten]->(divNode)
      type: CypherLine
    type: NumberIteratorIngest
    ingestLimit: 100000

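This ingest stream creates a graph of 100,000 numbered nodes with a shape we can use for our example: each ingested node gets an `i` property, a `prop` string generated from `i`, a `next` edge to `i + 1`, and a `div_by_ten` edge to `i / 10`.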
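To see the shape this ingest builds without running Quine, here is a small Python sketch that lists the same edges in memory. It assumes the number iterator starts at 0, and `number_line_edges` is a hypothetical helper, not part of Quine.

```python
from typing import List, Tuple

Edge = Tuple[int, str, int]


def number_line_edges(limit: int) -> List[Edge]:
    """Edges the ingest query creates for i = 0 .. limit - 1 (assumes a start of 0)."""
    edges: List[Edge] = []
    for i in range(limit):
        edges.append((i, "next", i + 1))
        # Cypher's `/` on two integers truncates; for non-negative i that equals `//`.
        edges.append((i, "div_by_ten", i // 10))
    return edges


edges = number_line_edges(25)
div = [e for e in edges if e[1] == "div_by_ten"]
print(div[:3])  # [(0, 'div_by_ten', 0), (1, 'div_by_ten', 0), (2, 'div_by_ten', 0)]
print(div[-1])  # (24, 'div_by_ten', 2)
```

Note that node 0 points to itself along `div_by_ten`, since 0 / 10 = 0.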

Figure: Numbers divisible by 10 using integer division.

In this example, I want to count the distinct times that a pattern like the one visualized above occurs in a sample of 100,000 numbers. A key part of the pattern is the existence of the `prop` property, which the ingest query sets on each node using the `gen.string.from()` function. The complete recipe is in the Quine repo if you want to follow along.

To detect a pattern in our data, we can write a Cypher query in the `pattern` section:

standingQueries:
  - pattern:
      query: |-
        MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)
        WHERE exists(c.prop)
        RETURN DISTINCT id(c) as id
      type: Cypher
    outputs:
      count-1000-results:
        type: Drop

The pattern looks for a number `c` that is the integer ten-divisor of a number `b`, which is in turn the ten-divisor of a number `a` in the graph, so `c = (a / 10) / 10`. Because `a` only goes up to 99,999, `c` can only be 0 through 999: the pattern matches exactly the first 1000 nodes created by our “number iterator” ingest.
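That arithmetic is easy to confirm with a brute-force Python sketch, assuming the ingest produced the numbers 0 through 99,999 (the helper name is hypothetical):

```python
def matching_c_nodes(limit: int) -> set:
    """Distinct c reachable as a -> b -> c along div_by_ten, with c carrying `prop`."""
    ingested = range(limit)  # nodes the ingest query visited (and gave edges and `prop`)
    matches = set()
    for a in ingested:
        b = a // 10  # a's div_by_ten edge
        c = b // 10  # b's div_by_ten edge
        if b in ingested and c in ingested:  # b needs its edge; c needs exists(c.prop)
            matches.add(c)
    return matches


found = matching_c_nodes(100_000)
print(len(found), min(found), max(found))  # 1000 0 999
```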

❯ java -jar quine -r sq-test.yaml
Graph is ready
Running Recipe Standing Query Test Recipe
Using 1 node appearances
Using 11 quick queries 
Running Standing Query STANDING-1
Running Ingest Stream INGEST-1
Quine web server available at http://0.0.0.0:8080
INGEST-1 status is completed and ingested 100000

 | => STANDING-1 count 1000

This example simply counts the matches, using the `Drop` output type (`type: Drop`), which discards each result without taking any further action.

Standing query result output: driving workflows

Say that instead of just counting the number of times the pattern matches, we need to output each match for debugging or inspection. We can replace the `Drop` output with a `CypherQuery` output that uses the matched result and then prints information to the console. For a `DistinctId` standing query, each match produces a payload that looks like:

{
    "meta": {
        "isPositiveMatch": true,
        "resultId": "2a757517-1225-7fe2-0d0e-22625ad3be37"
    },
    "data": {
        "id": "<ID of the matched node c, returned by id(c)>"
    }
}

This payload includes the ID of the matched node in its `data` field, so we can write a new Cypher query that fetches additional information whenever this match occurs:

MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)
WHERE id(c) = $that.data.id
RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop

The `MATCH` portion looks similar to our standing query, but this time we’re not monitoring the graph, we’re fetching data from the three-node pattern rooted at `(c)`.
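This follow-up query is not limited to one row. For a matched `c` from 1 to 999, every `b` with `b / 10 = c` and every `a` with `a / 10 = b` matches, so each standing query result produces 100 rows. A quick Python check of that arithmetic, assuming the ingest covered 0 through 99,999 (`inspect_rows` is a hypothetical helper):

```python
from typing import List, Tuple


def inspect_rows(c: int, limit: int = 100_000) -> List[Tuple[int, int, int]]:
    """(a, b, c) rows the follow-up MATCH would return for one matched c >= 1."""
    rows = []
    for b in range(10 * c, 10 * c + 10):      # every b with b // 10 == c
        if b >= limit:
            continue                          # b was never ingested, so it has no edge
        for a in range(10 * b, 10 * b + 10):  # every a with a // 10 == b
            if a < limit:
                rows.append((a, b, c))
    return rows


rows = inspect_rows(451)
print(len(rows))          # 100
print(rows[0], rows[-1])  # (45100, 4510, 451) (45199, 4519, 451)
```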

Replacing the `count-1000-results` output with the `inspect-results` output below does exactly that.

inspect-results:
  type: CypherQuery
  query: |-
    MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)
    WHERE id(c) = $that.data.id
    RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop
  andThen:
    type: PrintToStandardOut

The outputs stage of a standing query is where you can express your business logic and put Quine to work for you in your data pipeline. Take some time to review all of the possible output types in our API documentation located on the quine.io website.

Modifying standing queries

Modify a Standing Query Output

If you modify the `outputs` section of an existing standing query, you need to tell Quine about the change. The Quine API supports two methods on the `/api/v1/query/standing/{standing-query-name}/output/{standing-query-output-name}` endpoint: `DELETE` removes an existing output, and `POST` adds a new output to an existing standing query.

Using the example above, let’s change the original standing query output from `Drop` to a new `CypherQuery` output that prints the matches to the console. Two API calls accomplish the change.

Delete the existing output:

curl --request DELETE \
  --url http://0.0.0.0:8080/api/v1/query/standing/STANDING-1/output/count-1000-results \
  --header 'Content-Type: application/json'

Create the new output:

curl --request POST \
  --url http://0.0.0.0:8080/api/v1/query/standing/STANDING-1/output/inspect-results \
  --header 'Content-Type: application/json' \
  --data '{
    "type": "CypherQuery",
    "query": "MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c) WHERE id(c) = $that.data.id RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop",
    "andThen": {
      "type": "PrintToStandardOut"
    }
}'
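The same two calls can also be scripted. The Python sketch below only builds the requests with the standard library's `urllib.request`; actually sending them (the commented-out loop) assumes Quine is listening at `http://0.0.0.0:8080`, as in the run above. The constant and helper names are our own.

```python
import json
import urllib.request

BASE_URL = "http://0.0.0.0:8080"  # assumption: Quine's web server, as in the run above
SQ_NAME = "STANDING-1"


def output_url(output_name: str) -> str:
    """URL of one named output of the STANDING-1 standing query."""
    return f"{BASE_URL}/api/v1/query/standing/{SQ_NAME}/output/{output_name}"


delete_req = urllib.request.Request(output_url("count-1000-results"), method="DELETE")

new_output = {
    "type": "CypherQuery",
    "query": (
        "MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c) "
        "WHERE id(c) = $that.data.id "
        "RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop"
    ),
    "andThen": {"type": "PrintToStandardOut"},
}
post_req = urllib.request.Request(
    output_url("inspect-results"),
    data=json.dumps(new_output).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# With Quine running, send the requests in order (delete first, then create):
# for req in (delete_req, post_req):
#     with urllib.request.urlopen(req) as resp:
#         print(req.get_method(), req.full_url, resp.status)
```

Building the body with `json.dumps` avoids hand-escaping the quotes inside the Cypher string, which is easy to get wrong in a shell one-liner.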

Propagate a New Standing Query

When a new standing query is registered, Quine automatically sets it only on new nodes (or on existing nodes when they are loaded back into the cache). This is the default because proactively setting the standing query on all existing data can be costly, depending on how much historical data there is, so Quine defaults to the most efficient option.
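The trade-off can be pictured with a toy Python model of the behavior just described. It is a sketch, not Quine's cache: the `ToyNodeCache` class is hypothetical, "touching" a node stands for either creating it or loading it back into the cache, and `propagate_to_all` only roughly corresponds to the propagate request with `include-sleeping=true`.

```python
from typing import Set


class ToyNodeCache:
    """Hypothetical model of which nodes currently carry a newly registered standing query."""

    def __init__(self, awake: Set[int], asleep: Set[int]) -> None:
        self.awake = set(awake)    # nodes currently in memory
        self.asleep = set(asleep)  # nodes only in storage
        self.has_sq: Set[int] = set()
        self.sq_registered = False

    def register_standing_query(self) -> None:
        # Default behaviour: nothing is pushed to nodes that already exist.
        self.sq_registered = True

    def touch(self, node: int) -> None:
        """A node is created, or a sleeping node is loaded back into the cache."""
        self.asleep.discard(node)
        self.awake.add(node)
        if self.sq_registered:
            self.has_sq.add(node)

    def propagate_to_all(self) -> None:
        # Costly on a large graph: every existing node, awake or asleep, is visited.
        for node in list(self.awake | self.asleep):
            self.touch(node)


cache = ToyNodeCache(awake={1, 2}, asleep={3, 4, 5})
cache.register_standing_query()
cache.touch(6)  # brand-new node
cache.touch(3)  # sleeping node loaded back into the cache
before_propagation = sorted(cache.has_sq)
print(before_propagation)  # [3, 6]
cache.propagate_to_all()
print(sorted(cache.has_sq))  # [1, 2, 3, 4, 5, 6]
```

In this model, nodes 1 and 2 were already in memory when the query was registered, so they only pick it up through propagation.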

However, sometimes there is a need to actively propagate standing queries across all previously ingested data as well. You can use the API to request that Quine propagate a new standing query to all nodes in the existing graph. Here’s how the request looks in `curl`.

curl --request POST \
  --url 'http://0.0.0.0:8080/api/v1/query/standing/control/propagate?include-sleeping=true' \
  --header 'Content-Type: application/json'

Review the in-product API documentation via the Quine web interface for additional code snippets.

Conclusion

In this blog post, we looked at how standing queries work in Quine and how to create one. A standing query is a powerful tool for data processing because it lets you express your business logic as part of your data pipeline. We also looked at how to modify an existing standing query's output and how to propagate a new standing query across the existing graph.

Quine is open source if you want to explore standing queries for yourself using your own data. Download a precompiled version, or build it yourself from the Quine GitHub repository.

Have a question, suggestion, or improvement? I welcome your feedback! Please drop into Quine Slack and let me know. I’m always happy to discuss Quine or answer questions.