Flows

The pipeline can fanin, and fanout and branch at any point. The following stages control flow in the pipeline.

Fanin

Fanins create many parallel inputs that will feed onto the same next stage, so multiple sources can be used for input of events (eg. udp and zeromq).

Sequence

Sequences are a series of stages, in order. The top-level of the configuration is implicitly a Sequence.

Fanout

Fanouts create many parallel outputs that run independently.

Branching

If and Switch can be used to conditionally call stages.

class logcabin.flow.Fanin(**kwargs)

This merges all of the outputs of the child stages to a single queue.

Syntax:

with Fanin():
    Udp()
    Zeromq()
class logcabin.flow.Sequence(**kwargs)

This connects the output of the preceding stage to the input of the next, and so on, so the event is processed by each stage one after the other, in order.

Syntax:

with Sequence():
    Mutate()
    Mutate()
    ...
class logcabin.flow.Fanout(**kwargs)

This enqueues the event onto multiple input queues in parallel.

Syntax:

with Fanout():
    Log()
    Elasticsearch()
    Mongodb()
    ...
class logcabin.flow.If(condition, on_error='reject')

Conditionally execute stages.

The syntax is as follows. The condition may be a lambda expression or code string:

with If('field==1'):
    Json()
class logcabin.flow.Switch(on_error='reject')

Branch flow based on a condition.

The cases are specified using this syntax. The condition may be a lambda expression or code string:

with Switch() as case:
    with case(lambda ev: ev.field == 'value'):
        Json()
    with case('field2 == "value2"'):
        Mutate()
    with case.default:
        Regex(regex='abc')