Ingesting From Multiple Data Sources into Quine Streaming Graph

Ingest multiple data sources into Quine in order to create a single streaming graph. ETL basics and Cypher queries are covered.

by Michael Aglietti · June 2, 2022 · Quine · 6 min read

Building a Streaming Graph from Multiple Sources

As part of the ongoing series in which I explore different ways to use the ingest stream to load data into Quine, I want to cover one of Quine's specialties: building a streaming graph from multiple data sources. This time, we'll work with CSV data exported from IMDb to answer the question: "Which actors have acted in and directed the same movie?"

The CSV Files

Usually, when someone says they have data, it's most likely in `CSV` format or pretty darn close to it. (Or `JSON`, but that's another blog post.) In our case, we have two files filled with data in `CSV` format. Let's inspect what's inside.

File 1: movieData.csv

The `movieData.csv` file contains records for actors, movies, and the actor's relationship to the movie. Conveniently, each record type has a schema, flattened into rows during export.
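To make that concrete, here are a few hypothetical rows. The column names are taken from the ingest queries later in this post; the values are invented for illustration:

```csv
Entity,movieId,title,year,tmdbId,name,Work,role
Movie,1,Toy Story,1995,,,,
Person,,,,862,Tom Hanks,,
Join,1,,,862,,Acting,Woody
```

Every row carries an `Entity` column telling us which schema the rest of the row follows — that column is the key to routing rows to the right ingest stream.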

Should we separate the data back into discrete files and then load them? No, we can set up separate ingest streams to act on each data type in the file. Effectively, we will separate the "jobs to do" into Cypher queries and stream in the data.
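In a recipe, the "one file, multiple ingest streams" idea looks roughly like the sketch below. `$movie_file` is the recipe value we pass in at launch; the exact field names follow the File Ingest Format in the Quine API docs, so verify them against your Quine version:

```yaml
ingestStreams:
  - type: FileIngest          # first pass over the file: Movie rows
    path: $movie_file
    format:
      type: CypherCsv
      headers: true
      query: |-
        WITH $that AS row
        WITH row WHERE row.Entity = 'Movie'
        // ... build Movie and Genre nodes ...
  - type: FileIngest          # second, independent pass: Person rows
    path: $movie_file
    format:
      type: CypherCsv
      headers: true
      query: |-
        WITH $that AS row
        WITH row WHERE row.Entity = 'Person'
        // ... build Person nodes ...
```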

File 2: ratingData.csv

Our second file, `ratingData.csv`, is very straightforward. It contains 100,000 rows of movie ratings. Adding the `ratings` data into our model completes our discovery phase for the supplied data.
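Again hypothetically — the column names are inferred from the ratings ingest query later in this post, and the values are invented:

```csv
userId,name,movieId,rating,timestamp
1,Movie Fan,31,2.5,1260759144
```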

Original implied RDBMS schema of the IMDb data, including Person, Movie, Rating, and Join entities.

The CypherCsv Ingest Stream

The Quine API documentation defines the schema of the File Ingest Format ingest stream for us. The schema is robust and accommodates CSV, JSON, and line-based file types. Please take a moment to read through the documentation. Be sure to select `type: FileIngest` → `format: CypherCsv` using the API documentation dropdowns.
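The same CypherCsv ingest stream can also be created over the REST API instead of a recipe. Here is a sketch of the request body — I believe it is POSTed to `/api/v1/ingest/{name}` on a running Quine instance, but check the endpoint and field names against the docs for your version:

```json
{
  "type": "FileIngest",
  "path": "movieData.csv",
  "format": {
    "type": "CypherCsv",
    "headers": true,
    "query": "WITH $that AS row MATCH (m) WHERE row.Entity = 'Movie' AND id(m) = idFrom('Movie', row.movieId) SET m:Movie"
  }
}
```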

I define ingest streams to transform and load the movie data into Quine. Quine ingest streams behave independently and in parallel when processing files. This means that we can have multiple ingest streams operating on a single file. This is the case for the movieData.csv file because there are several operations that we need to perform on multiple types of data.

Movie Rows

The first ingest stream that I set up addresses the Movie rows in the movieData.csv file. There are 9,125 movies in the data set. I create two nodes from each Movie row using an ingest query: a Movie node and a Genre node. I store all of the movie data as properties on the Movie node.

WITH $that AS row
MATCH (m) WHERE row.Entity = 'Movie' AND id(m) = idFrom("Movie", row.movieId)
SET
  m:Movie,
  m.tmdbId = row.tmdbId,
  m.imdbId = row.imdbId,
  m.imdbRating = toFloat(row.imdbRating),
  m.released = row.released,
  m.title = row.title,
  m.year = toInteger(row.year),
  m.poster = row.poster,
  m.runtime = toInteger(row.runtime),
  m.countries = split(coalesce(row.countries,""), "|"),
  m.imdbVotes = toInteger(row.imdbVotes),
  m.revenue = toInteger(row.revenue),
  m.plot = row.plot,
  m.url = row.url,
  m.budget = toInteger(row.budget),
  m.languages = split(coalesce(row.languages,""), "|"),
  m.movieId = row.movieId
WITH m,split(coalesce(row.genres,""), "|") AS genres
UNWIND genres AS genre
WITH m, genre
MATCH (g) WHERE id(g) = idFrom("Genre", genre)
SET g.genre = genre, g:Genre
MERGE (m)-[:IN_GENRE]->(g)

Quine passes each line to the ingest stream via the variable `$that`, which I alias as `row`. A `MATCH` is made when the `row.Entity` value is `Movie` and a node `id` is returned from the `idFrom()` function. `SET` is used to give the node a label and to store metadata as node properties.

Each movie row has a pipe (`|`) delimited list of genres in the `genres` column. I split the column value apart and create a Genre node for each genre in the list, labeled and containing the genre as a property.

Finally, the `Movie` node is related to the `Genre` node with `MERGE`.
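The split-and-unwind pattern is easier to see in isolation with a hypothetical genre string:

```cypher
// split() turns the pipe-delimited string into a list,
// and UNWIND emits one row per list element.
WITH split("Action|Comedy", "|") AS genres
UNWIND genres AS genre
RETURN genre
// rows: "Action", "Comedy"
```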

Person Rows

The second ingest stream addresses the `Person` rows in the same way I did for the `Movie` rows. There are 19,047 person records in the `movieData.csv` file.

WITH $that AS row
MATCH (p) WHERE row.Entity = "Person" AND id(p) = idFrom("Person", row.tmdbId)
SET
  p:Person,
  p.imdbId = row.imdbId,
  p.bornIn = row.bornIn,
  p.name = row.name,
  p.bio = row.bio,
  p.poster = row.poster,
  p.url = row.url,
  p.tmdbId = row.tmdbId,
  p.born = CASE row.born WHEN "" THEN null ELSE datetime(row.born + "T00:00:00Z") END,
  p.died = CASE row.died WHEN "" THEN null ELSE datetime(row.died + "T00:00:00Z") END

The ingest query in this ingest stream matches when `row.Entity` is `Person`, creates a node using the `idFrom()` function, and stores the Person metadata as node properties.

Join Rows

Looking at the rows that have `Join` in the `Entity` column leads me to believe that the data in this `CSV` file originated from a relational database. There are two types of joins in the file, `Acted` and `Directed`. The ingest queries below process them.

Acted In

WITH $that AS row
WITH row WHERE row.Entity = "Join" AND row.Work = "Acting"
MATCH (p) WHERE id(p) = idFrom("Person", row.tmdbId)
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MATCH (r) WHERE id(r) = idFrom("Role", row.tmdbId, row.movieId, row.role)
SET 
  r.role = row.role, 
  r.movie = row.movieId, 
  r.tmdbId = row.tmdbId, 
  r:Role
MERGE (p)-[:PLAYED]->(r)<-[:HAS_ROLE]-(m)
MERGE (p)-[:ACTED_IN]->(m)

Acted join rows create relationships between Person, Role, and Movie nodes. Two paths are created from each Person node. The first path, `(p)-[:PLAYED]->(r)<-[:HAS_ROLE]-(m)`, relates actors (`Person`) to the roles they played and relates those roles to the movie (`Movie`). A second path directly relates an actor to each movie they acted in.
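Matching across independent ingest streams works because `idFrom()` is deterministic: the same arguments always produce the same node id, so the Join stream lands on exactly the nodes the Movie and Person streams created. A quick sanity check with a hypothetical id value:

```cypher
// idFrom() is a pure function of its arguments, so two ingest
// streams computing idFrom("Person", "862") address one node.
RETURN idFrom("Person", "862") = idFrom("Person", "862") AS sameNode
// sameNode: true
```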

Directed

WITH $that AS row
WITH row WHERE row.Entity = "Join" AND row.Work = "Directing"
MATCH (p) WHERE id(p) = idFrom("Person", row.tmdbId)
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MERGE (p)-[:DIRECTED]->(m)

The Directed ingest query matches join rows and creates a path relating directors with the movies they have directed.

Ratings

WITH $that AS row
MATCH (m) WHERE id(m) = idFrom("Movie", row.movieId)
MATCH (u) WHERE id(u) = idFrom("User", row.userId)
MATCH (rtg) WHERE id(rtg) = idFrom("Rating", row.movieId, row.userId, row.rating)
SET u.name = row.name, u:User
SET rtg.rating = row.rating,
  rtg.timestamp = toInteger(row.timestamp),
  rtg:Rating
MERGE (u)-[:SUBMITTED]->(rtg)<-[:HAS_RATING]-(m)
MERGE (u)-[:RATED]->(m)

The last ingest query processes rows from the `ratingData.csv` file. The query creates User and Rating nodes, then relates them to each other and to the matching Movie node.

Running the Recipe

As my project progressed, I developed a Quine recipe to load my `CSV` files and perform the analysis. Running the recipe requires a couple of Quine options to pass in the locations of the `CSV` files and an updated configuration setting.

java \
-Dquine.in-memory-soft-node-limit=30000 \
-jar ../releases/latest -r movieData \
--recipe-value movie_file=movieData.csv \
--recipe-value rating_file=ratingData.csv

After ingesting the `CSV` files, we end up with this data set stored in Quine:

The resulting data model in Quine for the IMDb data.

The orange Movie and Person nodes are created directly from the `Entity` column in `movieData.csv`. The User node is from `ratingData.csv` and the green nodes were derived from data stored within an entity row. The `ActedDirected` relationship is built by the standing query in the recipe.

Answering the Question

Getting all of this data into Quine was only part of the challenge. Remember the question that we were asked: "Which actors have acted in and directed the same movie?"

Quine is a streaming graph; if we connected the ingest streams to a streaming source rather than `CSV` files, the standing query inside the recipe I developed would answer the question for movies in the past as well as movies in the future.

Our standing query matches when the pattern is complete: an actor (`Person`) has both `ACTED_IN` and `DIRECTED` the same movie.

MATCH (a:Movie)<-[:ACTED_IN]-(p:Person)-[:DIRECTED]->(m:Movie) 
WHERE id(a) = id(m)
RETURN id(m) AS movieId, m.title AS Movie, id(p) AS personId, p.name AS Actor

When the standing query completes a match, it passes the matched movie `id` and person `id` through the output query and actions.

standingQueries:
  - pattern:
      type: Cypher
      mode: MultipleValues
      query: |-
        MATCH (a:Movie)<-[:ACTED_IN]-(p:Person)-[:DIRECTED]->(m:Movie) 
        WHERE id(a) = id(m)
        RETURN id(m) AS movieId, m.title AS Movie, id(p) AS personId, p.name AS Actor
    outputs:
      set-ActedDirected:
        type: CypherQuery
        query: |-
          MATCH (m), (p)
          WHERE strId(m) = $that.data.movieId AND strId(p) = $that.data.personId
          MERGE (p)-[:ActedDirected]->(m)
      log-actor-director:
        type: WriteToFile
        path: "ActorDirector.jsonl"

My standing query creates a new `ActedDirected` relationship between the Person and Movie nodes, then logs the relationship.

Four hundred ninety-one actors acted in and directed the same movie in our data set.

{
    "data": {
        "Actor": "Clint Eastwood",
        "Movie": "Unforgiven",
        "movieId": "4a6d64c8-9c90-3362-b443-4d2e7b2fb9d1",
        "personId": "4638a820-3b68-3fc7-9fa7-341e876b701e"
    }
}

Conclusion

Phew, we made it through! And we learned a lot along the way.

  • CSV data can be streamed into Quine
  • Quine can read from external files and streaming providers
  • You can ingest multiple streams at once — movies and ratings — and combine them into one streaming graph
  • Always separate ingest queries using the jobs-to-be-done framework

Quine is open source if you want to run this analysis for yourself. Download a precompiled version or build it yourself from the Quine GitHub repository. I published the recipe that I developed at https://quine.io/recipes. The page has instructions for downloading the `CSV` files and running the recipe.

Have a question, suggestion, or improvement? I welcome your feedback! Please drop in to Quine Slack and let me know. I'm always happy to discuss Quine or answer questions.
