Ingesting data from the internet into Quine Streaming Graph

Michael Aglietti

The previous article in this series (Quine Ingest Streams) introduced the ingest stream and the basic structure for creating one. In this article, I go deeper, exploring the ingest query and its role in the ingest stream.

A quick review of ingest streams:

  • An ingest stream connects Quine to data producers.
  • Ingest streams use backpressure to avoid becoming overloaded.
  • Data is transformed by the ingest query into a streaming graph.
  • Using idFrom allows us to act as if all nodes in the graph already exist.
  • Ingest streams are created either by API calls or Recipes.

For this article, we use the built-in wikipedia recipe as a starting point.

Defining an Ingest Stream

The wikipedia page ingest recipe defines an ingest stream that receives updates from the mediawiki.page-create event stream.

Here’s a copy of the ingest stream from the recipe:

ingestStreams:
  - type: ServerSentEventsIngest
    url: https://stream.wikimedia.org/v2/stream/page-create
    format:
      type: CypherJson
      query: |-
        MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)
        MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
        MATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)
        SET revNode = $that, revNode.type = "rev"
        SET dbNode.database = $that.database, dbNode.type = "db"
        SET userNode = $that.performer, userNode.type = "user"
        WITH *, datetime($that.rev_timestamp) AS d
        CALL create.setLabels(revNode, ["rev:" + $that.page_title])
        CALL create.setLabels(dbNode, ["db:" + $that.database])
        CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])
        CALL reify.time(d, ["year", "month", "day", "hour", "minute"]) YIELD node AS timeNode
        CALL incrementCounter(timeNode, "count")
        CREATE (revNode)-[:at]->(timeNode)
        CREATE (revNode)-[:db]->(dbNode)
        CREATE (revNode)-[:by]->(userNode)

This ingest stream has three elements: type, url, and format. The type declaration establishes the structure of the ingest stream object definition. This ingest stream is a ServerSentEventsIngest stream.

The ServerSentEventsIngest schema documentation in the API docs describes the structure that the ingest stream definition needs to follow.

NOTE
The schema definition will default to File Ingest Stream when first opened.
Be sure to click on the down arrow 🔽 next to File Ingest Stream and select Server Sent Events Stream from the drop down to view the correct schema.

Here’s the schema for a ServerSentEventsIngest:

Quine Server Ingest Stream Schema

The structure of the ServerSentEventsIngest stream is pretty straightforward.

  • type specifies the schema type for the ingest stream
  • format defines what the ingest stream will do with each line it receives
      • type identifies the line format in the stream
      • query defines the Cypher ingest query
      • parameter names the parameter that stores the current datum
  • url defines the connection URL for the data producer
  • parallelism and maximumPerSecond tune the bandwidth for the ingest stream and when to apply backpressure
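
As a rough illustration of how these fields fit together, the sketch below assembles the same settings as a Python dictionary and serializes them to JSON, the shape an API call would carry. The helper name build_sse_ingest is hypothetical and the query is a shortened stand-in for the recipe's query; only the field names and the parallelism value come from the recipe and the API output in this article.

```python
import json


def build_sse_ingest(url, query, parameter="that", parallelism=16):
    """Assemble a ServerSentEventsIngest definition as a plain dict.

    Hypothetical helper for illustration; the field names mirror the recipe.
    """
    return {
        "type": "ServerSentEventsIngest",  # schema type for the ingest stream
        "url": url,                        # connection URL for the data producer
        "format": {
            "type": "CypherJson",          # line format in the stream
            "query": query,                # Cypher ingest query run per datum
            "parameter": parameter,        # name the datum is bound to ($that)
        },
        "parallelism": parallelism,
    }


# Shortened query, not the full recipe query.
definition = build_sse_ingest(
    url="https://stream.wikimedia.org/v2/stream/page-create",
    query='MATCH (n) WHERE id(n) = idFrom("revision", $that.rev_id) SET n = $that',
)
print(json.dumps(definition, indent=2))
```

Keeping the definition as data makes it easy to compare against what the ingest endpoint reports back for a running stream.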

Wikipedia page-create Data

Quick aside, we need to understand the data that we are working on before we start pulling the ingest query apart.

Here’s a sample page-create JSON object to review. View more samples by visiting the Wikipedia event streams page, selecting the mediawiki.page-create stream, then clicking the green “Stream” button.

{
    "$schema": "/mediawiki/revision/create/1.1.0",
    "meta": {
        "uri": "https://en.wikipedia.org/wiki/Established_population",
        "request_id": "85b7bd4b-23a5-4c20-84a1-d89430c21f6c",
        "id": "8a34f1c0-a276-4a2b-ae2e-305f8822011c",
        "dt": "2022-05-20T16:43:34Z",
        "domain": "en.wikipedia.org",
        "stream": "mediawiki.page-create",
        "topic": "eqiad.mediawiki.page-create",
        "partition": 0,
        "offset": 231788500
    },
    "database": "enwiki",
    "page_id": 70828723,
    "page_title": "Established_population",
    "page_namespace": 0,
    "rev_id": 1088883819,
    "rev_timestamp": "2022-05-20T16:43:33Z",
    "rev_sha1": "d9uoc7gw3cj3ejhs8ihvsi61hp54icq",
    "rev_minor_edit": false,
    "rev_len": 82,
    "rev_content_model": "wikitext",
    "rev_content_format": "text/x-wiki",
    "performer": {
        "user_text": "Invasive Spices",
        "user_groups": [
            "extendedconfirmed",
            "*",
            "user",
            "autoconfirmed"
        ],
        "user_is_bot": false,
        "user_id": 40272459,
        "user_registration_dt": "2020-09-30T23:11:08Z",
        "user_edit_count": 9319
    },
    "page_is_redirect": true,
    "comment": "#REDIRECT [[Naturalisation (biology)]] {{R cat shell| {{R from related topic}} }}",
    "parsedcomment": "#REDIRECT <a href=\"/wiki/Naturalisation_(biology)\" title=\"Naturalisation (biology)\">Naturalisation (biology)</a> {{R cat shell| {{R from related topic}} }}",
    "rev_slots": {
        "main": {
            "rev_slot_content_model": "wikitext",
            "rev_slot_sha1": "d9uoc7gw3cj3ejhs8ihvsi61hp54icq",
            "rev_slot_size": 82,
            "rev_slot_origin_rev_id": 1088883819
        }
    }
}

Take a moment to get familiar with the page-create schema from the Wikipedia API documentation. The sample object is too noisy to show what is going on at a glance, so let’s clean it up a bit. Showing just the keys from the object with jq makes it much easier to plan our ingest query.

❯ jq '. | keys' /tmp/data.json
[
  "$schema",
  "comment",
  "database",
  "meta",
  "page_id",
  "page_is_redirect",
  "page_namespace",
  "page_title",
  "parsedcomment",
  "performer",
  "rev_content_format",
  "rev_content_model",
  "rev_id",
  "rev_len",
  "rev_minor_edit",
  "rev_sha1",
  "rev_slots",
  "rev_timestamp"
]
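
If jq is not at hand, the same inspection can be sketched in Python. The sample below is a trimmed copy of the page-create object above (only a few keys are kept), and the variable name that simply mirrors the $that parameter used in the ingest query.

```python
import json

# Trimmed copy of the sample page-create object; most keys are omitted.
raw = """
{
  "database": "enwiki",
  "page_title": "Established_population",
  "rev_id": 1088883819,
  "rev_timestamp": "2022-05-20T16:43:33Z",
  "performer": {"user_text": "Invasive Spices", "user_id": 40272459}
}
"""

that = json.loads(raw)

# Top-level keys in sorted order, as jq's keys function returns them.
top_level_keys = sorted(that)
print(top_level_keys)

# Nested values are reached one level at a time, much like
# $that.performer.user_id in the ingest query.
user_id = that["performer"]["user_id"]
print(user_id)
```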

The mediawiki recipe is an example use case for the reify.time user function. It creates temporal nodes in the graph and relationships with the page-create nodes based on the rev_timestamp.

To demonstrate the reify.time function, our ingest query creates revision nodes, db nodes, and user nodes that are related to each other, and relates each revision node to the time nodes that represent its timestamp.

To learn more about creating time-series nodes in Quine, read about time reification here.

The Ingest Query

The ingest query is the workhorse of the ingest stream. Each datum, the page-create object in this case, is processed by the ingest query. The query is written in Cypher and is responsible for parsing data, creating nodes, storing data and setting relationships in the streaming graph.

First, the ingest query creates the nodes we want using MATCH and WHERE. The node id is assigned using the idFrom function.

MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)
MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
MATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)

Notice that we pass two parameters to the idFrom function. The first parameter establishes a namespace for the id to avoid collisions. The second parameter is the value that identifies the node within that namespace: rev_id for the revision node, database for the db node, and performer.user_id for the user node. The result from idFrom is a deterministic UUID for each node.
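
To make the idea of a deterministic id concrete, here is a small Python analogy. It is not Quine's implementation of idFrom: it uses the standard library's uuid5 (a name-based UUID) as a stand-in, the function name id_from is hypothetical, and the ids it produces will not match the ones Quine generates.

```python
import uuid


def id_from(*parts):
    """Hypothetical stand-in for idFrom: hash the arguments into a UUID.

    Uses uuid5 (name-based, SHA-1) purely for illustration; this is an
    assumption, not Quine's own hashing scheme.
    """
    # repr() keeps types distinct and the separator keeps argument
    # boundaries distinct, so ("a", "bc") and ("ab", "c") do not collide.
    name = "\x1f".join(repr(p) for p in parts)
    return uuid.uuid5(uuid.NAMESPACE_OID, name)


rev_a = id_from("revision", 1088883819)
rev_b = id_from("revision", 1088883819)
db = id_from("db", 1088883819)

print(rev_a == rev_b)  # same namespace and value, so the ids are equal
print(rev_a == db)     # different namespace, so the ids differ
```

Because the id depends only on the inputs, two events that mention the same user or database resolve to the same node without any lookup step.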

Next, we store the rev, db, and user values as properties in the respective nodes and label each node for clarity in the graph explorer. Quine parses the ingested line and stores the result in a query parameter, $that. You can retrieve values from the ingested datum using dot notation as $that.<attribute>.

SET revNode = $that, revNode.type = "rev"
SET dbNode.database = $that.database, dbNode.type = "db"
SET userNode = $that.performer, userNode.type = "user"
CALL create.setLabels(revNode, ["rev:" + $that.page_title])
CALL create.setLabels(dbNode, ["db:" + $that.database])
CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])

There is quite a bit going on in the WITH line of the query, specifically the use of WITH *. Let’s take a moment to understand why we chose to use this pattern.

A WITH clause sets the scope of variables available to the rest of the query. If you explicitly list each variable and accidentally omit one, it’s lost for the remainder of the query, and you can get unexpected errors. Using the glob (*) carries every existing variable forward, so all nodes and variables stay at your disposal in the ingest query, alongside the new variable d.

WITH *, datetime($that.rev_timestamp) AS d

The ingest query makes a CALL to the reify.time function to create a new timeNode. The resulting node is based on the year, month, day, hour, and minute of the rev_timestamp. The query also increments the count property of the timeNode.

CALL reify.time(d, ["year", "month", "day", "hour", "minute"])
  YIELD node AS timeNode
CALL incrementCounter(timeNode, "count")
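
One way to picture the time buckets is the Python sketch below. It is a simplified model, not the reify.time implementation: it truncates a timestamp to each requested period and keeps a counter per bucket, which is roughly the role incrementCounter plays for a time node. Keeping one bucket per listed period is an assumption, the names bucket_keys and counts are hypothetical, and only the first timestamp comes from the sample object (the other two are made up for the example).

```python
from collections import Counter
from datetime import datetime, timezone

# How far to truncate the timestamp for each period name.
PERIOD_FORMATS = {
    "year": "%Y",
    "month": "%Y-%m",
    "day": "%Y-%m-%d",
    "hour": "%Y-%m-%dT%H",
    "minute": "%Y-%m-%dT%H:%M",
}


def bucket_keys(timestamp, periods):
    """Return one (period, truncated timestamp) key per requested period."""
    dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return [(p, dt.strftime(PERIOD_FORMATS[p])) for p in periods]


# Stands in for the "count" property on each time node.
counts = Counter()

# First timestamp is from the sample object; the other two are invented.
timestamps = ["2022-05-20T16:43:33Z", "2022-05-20T16:43:59Z", "2022-05-20T16:44:02Z"]
for ts in timestamps:
    for key in bucket_keys(ts, ["year", "month", "day", "hour", "minute"]):
        counts[key] += 1

print(counts[("minute", "2022-05-20T16:43")])  # two events fall in this minute
print(counts[("hour", "2022-05-20T16")])       # all three fall in this hour
```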

Finally, the ingest query creates the relationships between nodes in the graph.

CREATE (revNode)-[:at]->(timeNode)
CREATE (revNode)-[:db]->(dbNode)
CREATE (revNode)-[:by]->(userNode)

Now, let’s run the recipe to see how the ingest query builds out the graph in Quine. With the latest Quine jar file downloaded from Quine.io, start the recipe from the command line.

❯ java -jar quine-x.x.x.jar -r wikipedia

The recipe includes a standing query that outputs nodes to the terminal as they arrive. You should see activity quickly after launching the recipe.

Before the graph gets too large, open Quine explorer (http://0.0.0.0:8080) and run the time nodes stored query. Each of the time nodes was created by the ingest query using the timestamp in the page-create object.

We call these synthetic nodes. Synthetic nodes are useful when looking for abstract patterns between loosely related nodes. In this case, they show which updates were made during a particular time bucket.

Quine Exploration UI – Time Buckets


Using the API, let’s inspect the ingest stream using the ingest endpoint.

❯ http GET http://0.0.0.0:8080/api/v1/ingest Content-Type:application/json
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Type: application/json
Date: Fri, 20 May 2022 20:06:01 GMT
Server: akka-http/10.2.9
Transfer-Encoding: chunked

{
    "INGEST-1": {
        "settings": {
            "format": {
                "parameter": "that",
                "query": "MATCH (revNode) WHERE id(revNode) = idFrom(\"revision\", $that.rev_id)\nMATCH (dbNode) WHERE id(dbNode) = idFrom(\"db\", $that.database)\nMATCH (userNode) WHERE id(userNode) = idFrom(\"id\", $that.performer.user_id)\nSET revNode = $that, revNode.type = \"rev\"\nSET dbNode.database = $that.database, dbNode.type = \"db\"\nSET userNode = $that.performer, userNode.type = \"user\"\nWITH *, datetime($that.rev_timestamp) AS d\nCALL create.setLabels(revNode, [\"rev:\" + $that.page_title])\nCALL create.setLabels(dbNode, [\"db:\" + $that.database])\nCALL create.setLabels(userNode, [\"user:\" + $that.performer.user_text])\nCALL reify.time(d, [\"year\", \"month\", \"day\", \"hour\", \"minute\"]) YIELD node AS timeNode\nCALL incrementCounter(timeNode, \"count\")\nCREATE (revNode)-[:at]->(timeNode)\nCREATE (revNode)-[:db]->(dbNode)\nCREATE (revNode)-[:by]->(userNode)",
                "type": "CypherJson"
            },
            "parallelism": 16,
            "type": "ServerSentEventsIngest",
            "url": "https://stream.wikimedia.org/v2/stream/page-create"
        },
        "stats": {
            "byteRates": {
                "count": 1354157,
                "fifteenMinute": 1552.6927122874843,
                "fiveMinute": 1398.959143968717,
                "oneMinute": 1099.4731678954581,
                "overall": 1448.3578957557581
            },
            "ingestedCount": 914,
            "rates": {
                "count": 914,
                "fifteenMinute": 1.0510781922502073,
                "fiveMinute": 0.9474472912218986,
                "oneMinute": 0.7431750446830565,
                "overall": 0.9775815796950665
            },
            "startTime": "2022-05-20T19:50:26.494025Z",
            "totalRuntime": 934608
        },
        "status": "Running"
    }
}

The ingest stream defined via the recipe is named INGEST-1 and is currently running.
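
The stats block in the response lends itself to quick checks. As a sketch, the Python below parses a trimmed copy of the response above and derives the average number of bytes per ingested record. The helper name summarize is hypothetical, and treating byteRates.count as the total number of bytes ingested is an assumption; in practice the JSON would come from the ingest endpoint rather than a string literal.

```python
import json

# Trimmed copy of the GET /api/v1/ingest response shown above.
response_body = """
{
  "INGEST-1": {
    "stats": {
      "byteRates": {"count": 1354157},
      "ingestedCount": 914
    },
    "status": "Running"
  }
}
"""


def summarize(ingests):
    """Map each ingest stream name to (status, records, mean bytes per record)."""
    summary = {}
    for name, info in ingests.items():
        stats = info["stats"]
        records = stats["ingestedCount"]
        total_bytes = stats["byteRates"]["count"]  # assumed to be total bytes
        mean_bytes = total_bytes / records if records else 0.0
        summary[name] = (info["status"], records, round(mean_bytes, 1))
    return summary


summary = summarize(json.loads(response_body))
print(summary)
```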

Info

Did you know that you can make API calls directly from the embedded API documentation? Select the page icon (📄) from the left nav inside of Quine Explore, then navigate to the API endpoint that you want to exercise. Adjust the API call as needed, and press the blue “Send API Request” button.

To pause the stream through the API, send a PUT to the /ingest/{name}/pause endpoint.

❯ http PUT http://0.0.0.0:8080/api/v1/ingest/INGEST-1/pause Content-Type:application/json
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Type: application/json
Date: Fri, 20 May 2022 20:09:27 GMT
Server: akka-http/10.2.9
Transfer-Encoding: chunked

{
    "name": "INGEST-1",
    "settings": {
        "format": {
            "parameter": "that",
            "query": "MATCH (revNode) WHERE id(revNode) = idFrom(\"revision\", $that.rev_id)\nMATCH (dbNode) WHERE id(dbNode) = idFrom(\"db\", $that.database)\nMATCH (userNode) WHERE id(userNode) = idFrom(\"id\", $that.performer.user_id)\nSET revNode = $that, revNode.type = \"rev\"\nSET dbNode.database = $that.database, dbNode.type = \"db\"\nSET userNode = $that.performer, userNode.type = \"user\"\nWITH *, datetime($that.rev_timestamp) AS d\nCALL create.setLabels(revNode, [\"rev:\" + $that.page_title])\nCALL create.setLabels(dbNode, [\"db:\" + $that.database])\nCALL create.setLabels(userNode, [\"user:\" + $that.performer.user_text])\nCALL reify.time(d, [\"year\", \"month\", \"day\", \"hour\", \"minute\"]) YIELD node AS timeNode\nCALL incrementCounter(timeNode, \"count\")\nCREATE (revNode)-[:at]->(timeNode)\nCREATE (revNode)-[:db]->(dbNode)\nCREATE (revNode)-[:by]->(userNode)",
            "type": "CypherJson"
        },
        "parallelism": 16,
        "type": "ServerSentEventsIngest",
        "url": "https://stream.wikimedia.org/v2/stream/page-create"
    },
    "stats": {
        "byteRates": {
            "count": 1653281,
            "fifteenMinute": 1530.565647994232,
            "fiveMinute": 1428.2092910910662,
            "oneMinute": 1488.2104624440235,
            "overall": 1448.444229804896
        },
        "ingestedCount": 1117,
        "rates": {
            "count": 1117,
            "fifteenMinute": 1.0361739604926652,
            "fiveMinute": 0.96669545913622,
            "oneMinute": 1.0032209384426753,
            "overall": 0.9786067989220232
        },
        "startTime": "2022-05-20T19:50:26.494025Z",
        "totalRuntime": 1141066
    },
    "status": "Paused"
}

Notice that the updates in your terminal window stopped and the INGEST-1 ingest stream has a status of "Paused".

Restart the stream with a PUT to the /ingest/{name}/start endpoint. Updates will resume in your terminal window and the ingest stream status will return to "Running".
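
The pause and start calls can also be scripted. The sketch below uses only Python's standard library and builds the PUT requests without sending them, so it runs without a Quine instance. The helper name ingest_request and the default base URL are assumptions; the endpoint paths follow the ones used above.

```python
import urllib.request


def ingest_request(name, action, base_url="http://0.0.0.0:8080"):
    """Build (but do not send) a PUT request for /api/v1/ingest/{name}/{action}."""
    if action not in ("pause", "start"):
        raise ValueError(f"unsupported action: {action}")
    url = f"{base_url}/api/v1/ingest/{name}/{action}"
    return urllib.request.Request(
        url,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


pause = ingest_request("INGEST-1", "pause")
start = ingest_request("INGEST-1", "start")
print(pause.get_method(), pause.full_url)
print(start.get_method(), start.full_url)

# To actually send a request against a running Quine instance:
# with urllib.request.urlopen(pause) as resp:
#     print(resp.status)
```

Separating request construction from sending keeps the example safe to run anywhere and makes the URL logic easy to check on its own.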

Conclusion

We are just getting warmed up with ingest streams! This post walked through a simple ingest stream and ingest query to read server-sent events (SSE) from the Wikipedia streaming events service.

Next up in the series is Ingesting CSV data, where we will go over how Quine streams in data that is stored in a CSV file.

I welcome your feedback! Drop in to Quine Slack and let me know what you think. I’m always happy to discuss Quine or answer questions.