Ingest and Analyze Log Files Using Streaming Graph

This blog shows you how Quine streaming graph can ingest multiple log formats to create a single, unified streaming graph for real-time analysis.

by
Michael Aglietti
June 7, 2022
Quine
5
min read

Processing Machine Logs with Streaming Graph

You know we had to get here eventually. I'm looking into all of the ways that Quine can connect to and ingest streaming sources. Last time I covered ingest from multiple sources, a Quine strength. Next up is my old friend, the log file.

Log files are a structured stream of parsable data using regular expressions. Log lines are emitted at all levels of an application. The challenge is that they are primarily islands of disconnected bits of the overall picture. Placed into a data pipeline, we can use Quine to combine different types of logs and use a standing query to match interesting patterns upstream of a log analytics solution like Splunk or Sumo Logic.

Log Line Structure

Processing log files can quickly become as messy as the log files themself. I think that it's best to approach a log file like any other data source and take the time to understand the log line structure before asking any questions.

Quine is an application that produces log lines, and just like many other applications, the structure of the log lines follows a pattern. The logline pattern is defined in Scala, making it very easy for us to understand what the log line contains.

pattern = "%date %level [%mdc{akkaSource:-NotFromActor}] [%thread] %logger - %msg%n%ex"

Quine Log RegEx

Each Quine log line was assembled using the pre-defined pattern. This presents a perfect opportunity to use a regular expression, reverse the pattern, and build a streaming graph.

NOTE
The regex link in the example below uses the log output from a Quine Enterprise cluster. Learn more about the Quine Enterprise and other products created by thatDot. The regular expression will work for both Quine and Quine Enterprise.

I developed a regular expression that reverses the logline and returns the log elements for use by the ingest stream ingest query. I also published a recipe that uses the regular expression to parse Quine log lines on Quine.io.

(^\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2},\d{3}) # date and time string 
(FATAL|ERROR|WARN|INFO|DEBUG)                  # log level
\[(\S*)\]                                      # actor address
\[(\S*)\]                                      # thread name
(\S*)                                          # logging class
-                                              # the log message
((?:(?!^[0-9]{4}(?:-[0-9]{2}){2}(?:[^|\r?\n]+){3}).*(?:\r?\n)?)+)

Quine Log Ingest Stream

In my previous article, I connected to a `CSV` file using the `CypherCsv` `FileIngest` format so that Quine could break the rows of data stored in the file back into columns. The `CypherLine` `FileIngest` format allows us to read each line into the `$that` variable and process it through a Cypher query. 

ingestStreams:
  - type: FileIngest
    path: $in_file
    format:
      type: CypherLine
      query: |-
        // Quine log pattern "%date %level [%mdc{akkaSource:-NotFromActor}] [%thread] %logger - %msg%n%ex"
        WITH text.regexFirstMatch($that, "(^\\d{4}-\\d{2}-\\d{2} \\d{1,2}:\\d{2}:\\d{2},\\d{3}) (FATAL|ERROR|WARN|INFO|DEBUG) \\[(\\S*)\\] \\[(\\S*)\\] (\\S*) - (.*)") as r 
        WHERE r IS NOT NULL 
        // 0: whole matched line
        // 1: date time string
        // 2: log level
        // 3: actor address. Might be inside of `akka.stream.Log(…)`
        // 4: thread name
        // 5: logging class
        // 6: Message
        WITH *, split(r[3], "/") as path, split(r[6], "(") as msgPts
        WITH *, replace(COALESCE(split(path[2], "@")[-1], 'No host'),")","") as qh
        MATCH (actor), (msg), (class), (host)
        WHERE id(host)  = idFrom("host", qh)
          AND id(actor) = idFrom("actor", r[3])
          AND id(msg)   = idFrom("msg", r[0])
          AND id(class) = idFrom("class", r[5])
        SET host: Host, host.address = split(qh, ":")[0], host.port = split(qh, ":")[-1], host.host = qh,
            actor: Actor, actor.address = r[3], actor.id = replace(path[-1],")",""), actor.shard = path[-2], actor.type = path[-3],
            msg: Message, msg.msg = r[6], msg.type = split(msgPts[0], " ")[0], msg.level = r[2],
            class: Class, class.class = r[5]
        WITH * CALL reify.time(datetime({date: localdatetime(r[1], "yyyy-MM-dd HH:mm:ss,SSS")})) YIELD node AS time
        CREATE (actor)-[:sent]->(msg),
               (actor)-[:of_class]->(class),
               (actor)-[:on_host]->(host),
               (msg)-[:at_time]->(time)

The ingest stream definition:

  • Reads Quine log lines from a file
  • Parses each line with regex
  • Creates host, actor, message, and class nodes
  • Populates the node properties
  • Relates the nodes in the streaming graph
  • Anchors the message with a relationship to a time node from 'reify.time'

Configuring Quine Logs

Ok, let's run this recipe and see how it works. By default, the log level in Quine is set to WARN. We can increase the log level in the configuration or pass in a Java system configuration property when we launch Quine.

NOTE
 Set the log level in Quine (or Quine Enterprise) via the thatdot.loglevelconfiguration option..

Setting Log Level in Configuration

Start by getting your current Quine configuration. The easiest way to get the configuration is to start Quine and then `GET` the configuration via an API call.

❯ curl --request GET \
  --url http://0.0.0.0:8080/api/v1/admin/config \
  --header 'Content-Type: application/json' \
> quine.conf


Edit the `quine.conf` file and add `"thatdot":{"loglevel":"DEBUG"},` before the `quine` object.

❯ jq '.' quine.conf
{
  "thatdot": {
    "loglevel": "DEBUG"
  },
  "quine": {
    "decline-sleep-when-access-within": "0",
    "decline-sleep-when-write-within": "100ms",
    "dump-config": false,
    "edge-iteration": "reverse-insertion",
    "id": {
      "partitioned": false,
      "type": "uuid"
    },
    "in-memory-hard-node-limit": 75000,
    "in-memory-soft-node-limit": 10000,
    "labels-property": "__LABEL",
    "metrics-reporters": [
      {
        "type": "jmx"
      }
    ],
    "persistence": {
      "effect-order": "memory-first",
      "journal-enabled": true,
      "snapshot-schedule": "on-node-sleep",
      "snapshot-singleton": false,
      "standing-query-schedule": "on-node-sleep"
    },
    "shard-count": 4,
    "should-resume-ingest": false,
    "store": {
      "create-parent-dir": false,
      "filepath": "quine.db",
      "sync-all-writes": false,
      "type": "rocks-db",
      "write-ahead-log": true
    },
    "timeout": "2m",
    "webserver": {
      "address": "0.0.0.0",
      "enabled": true,
      "port": 8080
    }
  }
}

Now, restart Quine and include the `config.file` property.

java -Dconfig.file=quine.conf -jar quine-x.x.x.jar > quineLog.log

`DEBUG` level log lines will stream into the  `quineLog.log` file.

Passing Log Level at Runtime

Another slightly more straightforward way to enable Quine logs is to pass in a Java system configuration property. Here's how to start Quine and enable logging from the command line.

java -Dthatdot.loglevel=DEBUG -jar quine-x.x.x.jar > quineLog.log

`DEBUG` level log lines will stream into the `quineLog.log` file.

Ingesting Other Log Formats

You can easily modify the regex I developed for Quine log lines above to parse similar log output, like those found in *nix based system files or other Java applications.

Standard-ish Java Log Output

Depending on the `log level`, Java emits a lot of information into logs. This ingest stream handles application log lines from most Java applications. Sometimes the log entry itself spans multiple lines.

- type: FileIngest
  path: $app_log
  format:
    type: CypherJson
    query: |-
      WITH *, text.regexFirstMatch($that.message, '^(\\d{4}(?:-\\d{2}){2}(?:[^]\\r?\\n]+))\\s+?\\[(.+?)\\]\\s+?(\\S+?)\\s+(.+?)\\s+\\-\\s+((?:(?!^\\d{4}(?:-\\d{2}){2}(?:[^|\\r?\\n]+){3}).*(?:\\r?\\n)?)+)') AS r WHERE r IS NOT NULL
      CREATE (log {
        timestamp: r[1],
        component: r[2],
        level: r[3],
        subprocess: r[4],
        message: r[5],
        type: 'log'
      })
      // Create hour/minute buckets per event
      WITH * WHERE r[1] IS NOT NULL CALL reify.time(datetime({date: localdatetime(r[1], "yyyy-MM-dd HH:mm:ss,SSS")}), ["hour","minute"]) YIELD node AS timeNode
      // Create edges for timenNodes
      CREATE (log)-[:at]->(timeNode)

Ubuntu Ubuntu 22.04 LTS Syslog

If you're developing distributed applications, you will most likely need a regular expression that parses the Ubuntu `/var/log/syslog` file. First, you need to edit `/etc/rsyslog.conf` and uncomment the line to emit the traditional `DateTime` format.

#
# Use traditional timestamp format.
# To enable high precision timestamps, comment out the following line.
#
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat

The log line format is:
`%timestamp:::date-rfc3339% %HOSTNAME% %app-name% %procid% %msgid% %msg%n`

- type: FileIngest
  path: $syslog
  format:
    type: CypherLine
    query: |-
      WITH text.regexFirstMatch($that, '^(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d*?\\+\\d{2}:\\d{2}|Z).?\\s(.*?)(?=\\s).?\\s(\\S+)\\[(\\S+)\\]:\\s(.*)') AS s WHERE s IS NOT NULL
      CREATE (syslog {
        timestamp: s[1],
        hostname: s[2],
        app_name: s[3],
        proc_id: s[4],
        message: s[5],
        type: 'syslog'
      })
      // Create hour/minute buckets per event
      WITH * WHERE s[1] IS NOT NULL CALL reify.time(datetime({date: localdatetime(s[1], "yyyy-MM-dd'T'HH:mm:ss.SSSSSSz")}), ["hour","minute"]) YIELD node AS timeNode
      // Create edges for timenNodes
      CREATE (syslog)-[:at]->(timeNode)

MySQL Error Log

Working on a web application that's been around for a while, it's probably sitting on top of a MySQL database. The traditional-format MySQL log messages have these [fields](https://dev.mysql.com/doc/refman/8.0/en/error-log-format.html):

`time thread [label] [err_code] [subsystem] msg`

For example:
`2022-04-14T06:55:26.961757Z 0 [System] [MY-011323] [Server] X Plugin ready for connections. Socket: /var/run/mysqld/mysqlx.sock`

Add these log entries to your streaming graph for analysis too.

- type: FileIngest
  path: $sqlerr_log
  format:
    type: CypherLine
    query: |-
      WITH text.regexFirstMatch($that, '^(\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z)\\s(\\d)\\s\\[(\\S+)\\]\\s\\[(\\S+)\\]\\s\\[(\\S+)\\]\\s(.*)') AS m WHERE m IS NOT NULL
      CREATE (sqllog {
        timestamp: m[1],
        thread: m[2],
        label: m[3],
        err_code: m[4],
        subsystem: m[5],
        message: m[6],
        type: 'sqllog'
      })
      // Create hour/minute buckets per event
      WITH * WHERE m[1] IS NOT NULL CALL reify.time(datetime({date: localdatetime(m[1], "yyyy-MM-dd'T'HH:mm:ss.SSSSSSz")}), ["hour","minute"]) YIELD node AS timeNode
      // Create edges for timenNodes
      CREATE (sqllog)-[:at]->(timeNode)

Conclusion

Streaming data comes from all kinds of sources. With Quine, it's easy to convert that data stream into a streaming graph.

Quine is open source if you want to run this analysis for yourself. Download a precompiled version or build it yourself from the codebase Quine Github. I published the recipe that I developed at https://quine.io/recipes/quine-log-recipe. The page has instructions for downloading the `quineLog.log `files and running the recipe.

Have a question, suggestion, or improvement? I welcome your feedback! Please drop in to Quine Slack and let me know. I'm always happy to discuss Quine or answer questions.

by
Michael Aglietti
Thanks! Your email has been signed up.
Sorry, an error occurred. Please try signing up later.