Configuration

Basics

Logcabin is configured using a Python DSL.

To use a specific configuration, pass the -c/--config option with the filename. It defaults to config.py.
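
For example, to run logcabin with a different configuration file (the executable name shown is an assumption; adjust to however logcabin is launched in your installation):

logcabin --config myconfig.py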

The configuration is a set of stages, each of which is an input, filter or output stage. The configuration is interpreted once, and the constructed stages are assembled into a pipeline definition.

With the pipeline defined, logcabin launches each stage in its own greenlet, each with independent input and output queues, so no single stage blocks the processing of any other (provided it is not tying up the CPU).
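
As an illustration only (this is not logcabin's actual implementation), the stage-per-greenlet model looks roughly like the following sketch, which assumes the gevent library:

import gevent
from gevent.queue import Queue

def stage(process, inq, outq):
    # each stage loops in its own greenlet: pull an event from its input
    # queue, process it, and hand it on to the next stage's queue
    while True:
        event = inq.get()
        event = process(event)
        if outq is not None:
            outq.put(event)

q1, q2 = Queue(), Queue()
gevent.spawn(stage, lambda e: dict(e, myfield='abc'), q1, q2)  # a 'filter'
gevent.spawn(stage, lambda e: print(e) or e, q2, None)         # an 'output'

q1.put({'message': 'hello'})  # an input stage would feed q1
gevent.sleep(0.1)             # give the greenlets a chance to run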

Example:

# import the stages we wish to use
from flow import Fanin, Fanout, If
from inputs.udp import Udp
from inputs.zeromq import Zeromq
from filters.json import Json
from filters.mutate import Mutate
from outputs.log import Log
from outputs.elasticsearch import Elasticsearch

# take input from vanilla udp or a zeromq connection
with Fanin():
    Udp(port=6000)
    Zeromq(address='tcp://*:2130')

# transform the plain text input into a structured event with the Json filter, only if field==1.
with If('field==1'):
    Json()

# set myfield=abc
Mutate(set={'myfield': 'abc'})

# broadcast this to the logcabin log and index to elasticsearch /test/event
with Fanout():
    Log()
    Elasticsearch(index='test', type='event')

This configures two inputs whose events are processed through the Json and Mutate filters, and then sent to two outputs in parallel: Log and Elasticsearch.

For full details of the inputs, filters and outputs, see the sections below.

Examples

Below are some example configurations.

Files

# import the inputs, filters and outputs
from inputs.file import File as IFile
from filters.regex import Regex
from outputs.file import File as OFile

# read line by line from input.log
IFile('input.log')
# extract timestamp and message from the format 'timestamp - message'
Regex('(?P<timestamp>.+) - (?P<message>.+)')
# and output the resulting structured event (json) to output.log
OFile('output.log')

# try me:
# DATE=$(date); echo "$DATE - message" >> input.log
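
With the try-me command above, each event written to output.log is the JSON-serialized result of the Regex filter, roughly along these lines (the exact field set and formatting may differ):

{"timestamp": "Mon Jan  1 12:00:00 UTC 2024", "message": "message"}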

Inputs

# import the inputs and an output
from flow import Fanin
from inputs.udp import Udp
from inputs.zeromq import Zeromq
from outputs.log import Log

# Multiple input sources can be received simultaneously. They are read in
# parallel and events 'fan in' to the rest of the pipeline.
with Fanin():
    Udp(port=6000)
    Zeromq()

# log the results to logcabin.log
Log()
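
# try me (sends a plain-text UDP datagram to the Udp input on localhost):
# echo "hello logcabin" | nc -u -w1 localhost 6000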

Outputs

# import everything we're using
from flow import Fanout
from inputs.zeromq import Zeromq
from outputs.file import File
from outputs.elasticsearch import Elasticsearch
from outputs.mongodb import Mongodb

# single zeromq input
Zeromq()

# Broadcast the event in parallel to all of the following outputs. The event
# will simultaneously be written to mylogs.log, indexed to elasticsearch and
# saved to mongodb.
with Fanout():
    File(filename='mylogs.log', max_size=10, compress='gz')
    Elasticsearch(index='events', type='{program}')
    Mongodb()

Complex

# import everything we're using
from flow import Fanin, Switch
from inputs.udp import Udp
from inputs.zeromq import Zeromq
from filters.json import Json
from filters.stats import Stats
from outputs.graphite import Graphite
from outputs.elasticsearch import Elasticsearch
from outputs.file import File
from outputs.s3 import S3

# input from a couple of sources
with Fanin():
    Udp(port=6000)
    Zeromq()

# parse json
Json()
# generate statistic counts (suitable for graphite)
Stats(timings={'rails.{controller}.{action}.duration': 'duration'})
# write the data to a rotating log file
File(filename='mylogs.log', max_size=1000000, compress='gz')
# decide the destination based on the event's tags
with Switch() as case:
    # on log roll, archive the file to S3
    with case("'fileroll' in tags"):
        S3(access_key='xyz',
           secret_key='123',
           bucket='mybucket',
           path='logs/{timestamp:%Y%m%dT%H%M}')
    # write the aggregate statistics to graphite
    with case("'stat' in tags"):
        Graphite()
    # otherwise just index into elasticsearch
    with case.default:
        Elasticsearch(index='logcabin', type='event')