from lightflow.models import Dag
from lightflow.tasks import PythonTask
# the callback function for all tasks
def inc_number(data, store, signal, context):
print('Task {task_name} being run in DAG {dag_name} '
'for workflow {workflow_name} ({workflow_id}) '
'on {worker_hostname}'.format(**context.to_dict()))
if 'value' not in data:
data['value'] = 0
data['value'] = data['value'] + 1
print('This is task #{}'.format(data['value']))
# create the main DAG
d = Dag('main_dag')
# create the 3 tasks that increment a number
task_1 = PythonTask(name='task_1',
task_2 = PythonTask(name='task_2',
task_3 = PythonTask(name='task_3',
# set up the graph of the DAG as a linear sequence of tasks
task_1: task_2,
task_2: task_3
from lightflow.models import Dag
from lightflow.tasks import PythonTask
# the callback function for the tasks
def print_info(data, store, signal, context):
print('Task {task_name} being run in DAG {dag_name} '
'for workflow {workflow_name} ({workflow_id})'.format(**context.to_dict()))
# create the main DAG
d = Dag('main_dag')
# task that limits the branching to certain successor tasks
branch_task = PythonTask(name='branch_task',
# first task, first lane
lane1_print_task = PythonTask(name='lane1_print_task',
# first task, second lane
lane2_print_task = PythonTask(name='lane2_print_task',
# first task, third lane
lane3_print_task = PythonTask(name='lane3_print_task',
# joins all three lanes together and waits for the predecessor tasks to finish processing
join_task = PythonTask(name='t_join_me',
# set up the graph of the DAG as illustrated above. Please note how a list of tasks
# defines tasks that are run in parallel (branched out).
d.define({branch_task: [lane1_print_task, lane2_print_task, lane3_print_task],
lane1_print_task: join_task,
lane2_print_task: join_task,
lane3_print_task: join_task})
from lightflow.models import Dag
from lightflow.tasks import PythonTask
# store the value 5 under the key 'value'
def put_data(data, store, signal, context):
data['value'] = 5
# print the name of the task and the current value
def print_data(data, store, signal, context):
print(context.task_name, 'The value is:', data['value'])
# square the current value
def square_data(data, store, signal, context):
data['value'] = data['value']**2
# multiply the value from the first dataset and the second dataset. Since the default
# dataset has never been changed, the default dataset is still the first (index==0)
# dataset in the list of all datasets. The second dataset is referenced by its index==1.
def multiply_data(data, store, signal, context):
data['value'] = data['value'] * data.get_by_index(1)['value']
# subtract two values by using the aliases of the two datasets and different functions
# for illustration purposes: get_by_alias() and the shorthand notation ([alias])
def subtract_data(data, store, signal, context):
data['value'] = data.get_by_alias('first')['value'] - data('second')['value']
# create the main DAG based on the diagram above
d = Dag('main_dag')
put_task = PythonTask(name='put_task', callback=put_data)
square_task = PythonTask(name='square_task', callback=square_data)
multiply_task = PythonTask(name='multiply_task', callback=multiply_data)
subtract_task = PythonTask(name='subtract_task', callback=subtract_data)
print_task_1 = PythonTask(name='print_task_1', callback=print_data)
print_task_2 = PythonTask(name='print_task_2', callback=print_data)
print_task_3 = PythonTask(name='print_task_3', callback=print_data)
print_task_4 = PythonTask(name='print_task_4', callback=print_data)
d.define({put_task: {print_task_1: None,
square_task: None,
multiply_task: None,
subtract_task: 'first'},
square_task: [print_task_2, multiply_task],
multiply_task: {print_task_3: None,
subtract_task: 'second'},
subtract_task: print_task_4})
from time import sleep
import numpy as np
from lightflow.models import Dag
from lightflow.tasks import PythonTask
# the callback function for the init task
def print_name(data, store, signal, context):
print('Task {task_name} being run in DAG {dag_name} '
'for workflow {workflow_name} ({workflow_id})'.format(**context.to_dict()))
# this callback function starts five dags. For each dag the function waits a second,
# then creates a numpy array and stores it into the data that is then passed to the
# sub dag. The dag that should be started can either be given by its name or the dag
# object itself. The names of the created dags are recorded and the task waits for
# all created dags to be completed.
def start_sub_dag(data, store, signal, context):
dag_names = []
for i in range(5):
data['image'] = np.ones((100, 100))
started_dag = signal.start_dag(sub_dag, data=data)
# this callback function prints the dimensions of the received numpy array
def sub_dag_print(data, store, signal, context):
print('Received an image with dimensions: {}'.format(data['image'].shape))
init_task = PythonTask(name='init_task',
call_dag_task = PythonTask(name='call_dag_task',
# create the main dag that runs the init task first, followed by the call_dag task.
main_dag = Dag('main_dag')
init_task: call_dag_task
# create the tasks for the sub dag that simply prints the shape of the numpy array
# passed down from the main dag.
print_task = PythonTask(name='print_task',
# create the sub dag that is being called by the main dag. In order to stop the
# system from automatically starting the dag when the workflow is run, set the autostart
# parameter to false.
sub_dag = Dag('sub_dag', autostart=False)
print_task: None
from lightflow.models import Dag
from lightflow.tasks import PythonTask
import numpy as np
# the callback function to store data in the persistent data store. It stores a single
# integer value in 'number', a single integer value into the hierarchical key
# 'buffer' -> 'observable' and a numpy array into 'image'. Additionally it adds an integer
# value to a list in 'sample' -> 'spectra'.
def store_data(data, store, signal, context):
store.set('number', 5)
store.set('buffer.observable', 20)
store.set('image', np.ones((100, 100)))
store.push('sample.spectra', 7)
# the callback function for the task that retrieves and prints the 'number' and 'image'
# values then modifies the 'number' value and creates a new list of 'filenames'.
def modify_data(data, store, signal, context):
number = store.get('number')
print('The number is: {}'.format(number))
img = store.get('image')
print('The dimension of the image is: {}'.format(img.shape))
store.set('number', number * 10)
store.push('filenames', 'file_a.spec')
# the callback function for the task that adds another filename to the list.
def add_filename(data, store, signal, context):
store.push('filenames', 'file_b.spec')
# the callback function for the task that adds a nested list to the list of filenames and
# then extends the list of filenames with two more entries.
def add_more_filenames(data, store, signal, context):
store.push('filenames', ['nested_a', 'nested_b'])
store.extend('filenames', ['file_c.spec', 'file_d.spec'])
# create the main DAG
d = Dag('main_dag')
# create the tasks that call the functions above
store_task = PythonTask(name='store_task',
modify_task = PythonTask(name='modify_task',
add_filename_task = PythonTask(name='add_filename_task',
add_more_filename_task = PythonTask(name='add_more_filename_task',
# set up the graph of the DAG, in which the store_task and modify_task are called
# in sequence while the add_filename_task and add_more_filename_task are run in parallel.
store_task: modify_task,
modify_task: [add_filename_task, add_more_filename_task]