Lightflow
A lightweight, distributed workflow system

Declarative

Lightflow makes it easy to describe workflows using Python. A carefully crafted API lets you build workflows of any complexity without hassle.

Start with a single file for simple workflows and later scale to multiple files for a more modular setup. Since a workflow definition is just pure Python, you don't have to learn a new configuration language and you can use your favorite libraries and modules.
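
As a taste of the API, here is a minimal sketch of a workflow file with a single DAG and one task. It uses only the imports and the callback signature that appear in the example workflows later in this document.

from lightflow.models import Dag
from lightflow.tasks import PythonTask


# a callback that simply prints which task is running
def say_hello(data, store, signal, context):
    print('Hello from {}'.format(context.task_name))


# a workflow file defines one or more DAGs; this one contains a single task
d = Dag('hello_dag')

hello_task = PythonTask(name='hello_task', callback=say_hello)

d.define({hello_task: None})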

Graph-based

Lightflow uses directed acyclic graphs (DAGs) to describe the dependencies between the tasks in a workflow.

You have full control over the data flow between tasks, and you can start DAGs from within a task to change your workflow dynamically at runtime.
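
For the dynamic case, a task callback can ask the signal system to queue another DAG at runtime. The short sketch below is lifted from the Sub DAGs example further down this document; only the calls shown there are used.

from lightflow.models import Dag


# a DAG that is only started on request; autostart=False prevents it from
# running automatically when the workflow starts. Its tasks would be added
# with sub_dag.define(...), as in the Sub DAGs example below.
sub_dag = Dag('sub_dag', autostart=False)


# a task callback that queues the sub DAG at runtime and waits for it to finish
def launch_sub_dag(data, store, signal, context):
    started_dag = signal.start_dag(sub_dag, data=data)
    signal.join_dags([started_dag])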

Distributed

Lightflow uses Celery to distribute the tasks of a workflow to multiple workers. It even employs Celery to run and monitor the status of your workflow. This avoids a single point of failure, such as the central daemon found in many other workflow tools.

Need to pin certain tasks to specific workers? No problem: Lightflow comes with support for custom queues.
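
As a rough sketch of how pinning might look (the queue parameter on the task and the -q worker option are assumptions for illustration, not verified against the current Lightflow API; consult the Lightflow documentation for the authoritative names), a task could be routed to a dedicated queue:

# assumption: PythonTask accepts a 'queue' argument that routes the task to a
# custom Celery queue instead of the default task queue. 'process_on_gpu' is a
# placeholder callback.
gpu_task = PythonTask(name='gpu_task',
                      callback=process_on_gpu,
                      queue='gpu')

and a worker started that only consumes from that queue (again, the flag is an assumption):

$ lightflow worker start -q gpu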


Dependencies

Operating system

Lightflow is developed and tested on Linux, with Debian and Red Hat as the main platforms.

Python

Lightflow is written in Python 3 and requires Python 3.5 or higher.

redis

The redis database is required by Lightflow as a communication broker between tasks. It is also used as the default broker for the Celery queuing system, but can be replaced with any other broker supported by Celery.

MongoDB

Lightflow uses MongoDB to store persistent data during a workflow run, which can be accessed by all tasks.

Installation

Lightflow is available from PyPI. Simply install it with:

$ pip install lightflow

Getting started

The following getting started guide assumes a redis database running on localhost, port 6379, and a MongoDB database running on localhost, port 27017.

Create a default configuration file and copy the provided example workflows to a local directory of your choice:

$ lightflow config default .
$ lightflow config examples .

If you like, list all available example workflows:

$ lightflow workflow list

To execute a workflow, start a worker that consumes jobs from the workflow, dag and task queues, then start a workflow from the list of available examples. The following commands start the workflow simple:

$ lightflow worker start
$ lightflow workflow start simple

Example workflows

Lightflow comes with a number of example workflows demonstrating all aspects of the library. This section presents a selection of the examples, illustrating how Lightflow models the basic patterns often found in workflows and pipelines.

Sequential

The sequential workflow pattern is one of the most basic patterns. A number of tasks are executed in order, starting with the first task.

In this example, a counter is stored under the key value and initialized in task_1. Each task increments the counter, prints its value and passes it on to the next task.

from lightflow.models import Dag
from lightflow.tasks import PythonTask


# the callback function for all tasks
def inc_number(data, store, signal, context):
    print('Task {task_name} being run in DAG {dag_name} '
          'for workflow {workflow_name} ({workflow_id}) '
          'on {worker_hostname}'.format(**context.to_dict()))

    if 'value' not in data:
        data['value'] = 0

    data['value'] = data['value'] + 1
    print('This is task #{}'.format(data['value']))


# create the main DAG
d = Dag('main_dag')

# create the 3 tasks that increment a number
task_1 = PythonTask(name='task_1',
                    callback=inc_number)

task_2 = PythonTask(name='task_2',
                    callback=inc_number)

task_3 = PythonTask(name='task_3',
                    callback=inc_number)


# set up the graph of the DAG as a linear sequence of tasks
d.define({
    task_1: task_2,
    task_2: task_3
})

Parallel

Lightflow runs tasks that do not depend on each other in parallel.

This example starts with the branch_task, which fans out into three simple tasks. Once all three tasks have finished running, the join_task is executed.

from lightflow.models import Dag
from lightflow.tasks import PythonTask


# the callback function for the tasks
def print_info(data, store, signal, context):
    print('Task {task_name} being run in DAG {dag_name} '
          'for workflow {workflow_name} ({workflow_id})'.format(**context.to_dict()))


# create the main DAG
d = Dag('main_dag')

# task that limits the branching to certain successor tasks
branch_task = PythonTask(name='branch_task',
                         callback=print_info)

# first task, first lane
lane1_print_task = PythonTask(name='lane1_print_task',
                              callback=print_info)

# first task, second lane
lane2_print_task = PythonTask(name='lane2_print_task',
                              callback=print_info)

# first task, third lane
lane3_print_task = PythonTask(name='lane3_print_task',
                              callback=print_info)

# joins all three lanes together and waits for the predecessor tasks to finish processing
join_task = PythonTask(name='t_join_me',
                       callback=print_info)

# set up the graph of the DAG. Note how a list of tasks
# defines tasks that are run in parallel (branched out).
d.define({branch_task: [lane1_print_task, lane2_print_task, lane3_print_task],
          lane1_print_task: join_task,
          lane2_print_task: join_task,
          lane3_print_task: join_task})

Data Flow

Almost arbitrary data can flow between tasks. If multiple tasks send data to the same downstream task, the data can be labeled with an alias so that the downstream task can pick the right dataset.

In this example, the put_task stores 5 under the key value and passes it on to the print_task_1, square_task, multiply_task and subtract_task. The square_task squares the value, prints it and passes it on to the multiply_task. The multiply_task receives two datasets as its input. Since multiplication is commutative, the multiply_task simply multiplies both datasets regardless of their order. The result of the multiplication is passed on to the subtract_task. Since the order of the input to the subtract_task matters, the data passed from the put_task is given the alias first, while the data from the multiply_task is labeled second. With the initial value of 5, the square_task therefore produces 25, the multiply_task computes 5 * 25 = 125, and the subtract_task computes 5 - 125 = -120.

from lightflow.models import Dag
from lightflow.tasks import PythonTask


# store the value 5 under the key 'value'
def put_data(data, store, signal, context):
    data['value'] = 5


# print the name of the task and the current value
def print_data(data, store, signal, context):
    print(context.task_name, 'The value is:', data['value'])


# square the current value
def square_data(data, store, signal, context):
    data['value'] = data['value']**2


# multiply the value from the first dataset and the second dataset. Since the default
# dataset has never been changed, the default dataset is still the first (index==0)
# dataset in the list of all datasets. The second dataset is referenced by its index==1.
def multiply_data(data, store, signal, context):
    data['value'] = data['value'] * data.get_by_index(1)['value']


# subtract two values by using the aliases of the two datasets. For illustration
# purposes two different access methods are used: get_by_alias() and the
# shorthand call notation data(alias)
def subtract_data(data, store, signal, context):
    data['value'] = data.get_by_alias('first')['value'] - data('second')['value']


# create the main DAG
d = Dag('main_dag')

put_task = PythonTask(name='put_task', callback=put_data)
square_task = PythonTask(name='square_task', callback=square_data)
multiply_task = PythonTask(name='multiply_task', callback=multiply_data)
subtract_task = PythonTask(name='subtract_task', callback=subtract_data)

print_task_1 = PythonTask(name='print_task_1', callback=print_data)
print_task_2 = PythonTask(name='print_task_2', callback=print_data)
print_task_3 = PythonTask(name='print_task_3', callback=print_data)
print_task_4 = PythonTask(name='print_task_4', callback=print_data)


d.define({put_task: {print_task_1: None,
                     square_task: None,
                     multiply_task: None,
                     subtract_task: 'first'},
          square_task: [print_task_2, multiply_task],
          multiply_task: {print_task_3: None,
                          subtract_task: 'second'},
          subtract_task: print_task_4})

Sub DAGs

Lightflow allows more than one DAG to be defined in a workflow and lets tasks queue new DAGs, which makes dynamically changing workflows possible.

In this example, two DAGs are defined: the main_dag, which is executed at the start of the workflow, and the sub_dag, which is started dynamically. The call_dag_task of the main_dag starts five sub_dag instances by sending a request to the signal system. For illustration purposes, a numpy array is sent along with each request.

from time import sleep
import numpy as np

from lightflow.models import Dag
from lightflow.tasks import PythonTask


# the callback function for the init task
def print_name(data, store, signal, context):
    print('Task {task_name} being run in DAG {dag_name} '
          'for workflow {workflow_name} ({workflow_id})'.format(**context.to_dict()))


# this callback function starts five dags. For each dag the function waits a second,
# then creates a numpy array and stores it in the data that is passed on to the
# sub dag. The dag to be started can be given either by its name or by the dag
# object itself. The names of the created dags are recorded and the task waits for
# all created dags to complete.
def start_sub_dag(data, store, signal, context):
    dag_names = []
    for i in range(5):
        sleep(1)
        data['image'] = np.ones((100, 100))
        started_dag = signal.start_dag(sub_dag, data=data)
        dag_names.append(started_dag)

    signal.join_dags(dag_names)


# this callback function prints the dimensions of the received numpy array
def sub_dag_print(data, store, signal, context):
    print('Received an image with dimensions: {}'.format(data['image'].shape))


init_task = PythonTask(name='init_task',
                       callback=print_name)

call_dag_task = PythonTask(name='call_dag_task',
                           callback=start_sub_dag)

# create the main dag that runs the init task first, followed by the call_dag task.
main_dag = Dag('main_dag')
main_dag.define({
    init_task: call_dag_task
})


# create the tasks for the sub dag that simply prints the shape of the numpy array
# passed down from the main dag.
print_task = PythonTask(name='print_task',
                        callback=sub_dag_print)

# create the sub dag that is called by the main dag. In order to stop the
# system from automatically starting the dag when the workflow is run, the autostart
# parameter is set to False.
sub_dag = Dag('sub_dag', autostart=False)

sub_dag.define({
    print_task: None
})

Data Store

Data that should be kept for the duration of a workflow run can be saved in the persistent data store. This data is deleted as soon as the workflow run ends, but is available to all tasks during the lifetime of the workflow.

The data store provides methods to store and retrieve single values or append values to a list. This can even be done asynchronously from different tasks at the same time. The key under which the data is stored supports a hierarchical structure using dot notation.

This example workflow stores different types of data in the persistent data store and modifies them.

from lightflow.models import Dag
from lightflow.tasks import PythonTask

import numpy as np


# the callback function to store data in the persistent data store. It stores a single
# integer value under 'number', a single integer value under the hierarchical key
# 'buffer.observable' and a numpy array under 'image'. Additionally it appends an
# integer value to a list under 'sample.spectra'.
def store_data(data, store, signal, context):
    store.set('number', 5)
    store.set('buffer.observable', 20)
    store.set('image', np.ones((100, 100)))
    store.push('sample.spectra', 7)


# the callback function for the task that retrieves and prints the 'number' and 'image'
# values, then modifies the 'number' value and creates a new list under 'filenames'.
def modify_data(data, store, signal, context):
    number = store.get('number')
    print('The number is: {}'.format(number))

    img = store.get('image')
    print('The dimension of the image is: {}'.format(img.shape))

    store.set('number', number * 10)
    store.push('filenames', 'file_a.spec')


# the callback function for the task that adds another filename to the list.
def add_filename(data, store, signal, context):
    store.push('filenames', 'file_b.spec')


# the callback function for the task that adds a nested list to the list of filenames and
# then extends the list of filenames with two more entries.
def add_more_filenames(data, store, signal, context):
    store.push('filenames', ['nested_a', 'nested_b'])
    store.extend('filenames', ['file_c.spec', 'file_d.spec'])


# create the main DAG
d = Dag('main_dag')

# create the tasks that call the functions above
store_task = PythonTask(name='store_task',
                        callback=store_data)

modify_task = PythonTask(name='modify_task',
                         callback=modify_data)

add_filename_task = PythonTask(name='add_filename_task',
                               callback=add_filename)

add_more_filename_task = PythonTask(name='add_more_filename_task',
                                    callback=add_more_filenames)

# set up the graph of the DAG, in which the store_task and modify_task are called
# in sequence while the add_filename_task and add_more_filename_task are run in parallel.
d.define({
    store_task: modify_task,
    modify_task: [add_filename_task, add_more_filename_task]
})