
Luigi is a great tool for orchestrating complex data analysis pipelines. One potential pain point, however, is how to configure a pipeline with many tasks and, potentially, many parameters. We present a method of configuring the tasks that hopefully makes life simpler.

Consider a simple example pipeline:

import luigi


class TaskA(luigi.Task):

  foo = luigi.Parameter()

  def output(self):
    return luigi.LocalTarget('TaskA.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('Foo is {}'.format(self.foo))


class TaskB(luigi.Task):

  foo = luigi.Parameter()
  bar = luigi.Parameter()

  def requires(self):
    return TaskA(foo=self.foo)

  def output(self):
    return luigi.LocalTarget('TaskB.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('Bar is {}'.format(self.bar))


class TaskC(luigi.Task):

  foo = luigi.Parameter()
  bar = luigi.Parameter()

  def requires(self):
    return TaskA(foo=self.foo)

  def output(self):
    return luigi.LocalTarget('TaskC.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('Bar is {}'.format(self.bar))


class TaskD(luigi.Task):

  foo = luigi.Parameter()
  bar = luigi.Parameter()
  baz = luigi.Parameter()

  def requires(self):
    return {
      'B': TaskB(foo=self.foo, bar=self.bar),
      'C': TaskC(foo=self.foo, bar=self.bar),
    }

  def output(self):
    return luigi.LocalTarget('TaskD.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('Baz is {}'.format(self.baz))

This defines a simple dependency graph like:

[Figure: a simple Luigi dependency graph. TaskD requires TaskB and TaskC, each of which requires TaskA.]

The pipeline can then be run on the command line (assuming the above code is stored in a file called luigi_example.py):

luigi --module luigi_example TaskD --foo 'foo' --bar 'bar' --baz 'baz'

One issue that you may notice is that some parameters are defined on tasks, but are just passed through to their dependencies. For example, TaskB does not use the foo parameter in its run method, but requires it in order to define the TaskA dependency in requires.

It is possible instead to declare each parameter only on the task that actually uses it, and to instantiate the dependencies without arguments:

import luigi


class TaskA(luigi.Task):

  foo = luigi.Parameter()

  def output(self):
    return luigi.LocalTarget('TaskA.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('I am {}'.format(self))


class TaskB(luigi.Task):

  bar = luigi.Parameter()

  def requires(self):
    return TaskA()

  def output(self):
    return luigi.LocalTarget('TaskB.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('I am {}'.format(self))


class TaskC(luigi.Task):

  bar = luigi.Parameter()

  def requires(self):
    return TaskA()

  def output(self):
    return luigi.LocalTarget('TaskC.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('I am {}'.format(self))


class TaskD(luigi.Task):

  baz = luigi.Parameter()

  def requires(self):
    return {
      'B': TaskB(),
      'C': TaskC(),
    }

  def output(self):
    return luigi.LocalTarget('TaskD.output')

  def run(self):
    with self.output().open('w') as f:
      f.write('I am {}'.format(self))

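However, this complicates invoking the pipeline: a parameter that belongs to a task other than the one named on the command line must be prefixed with that task's name. The command to run the pipeline is now: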

luigi --module luigi_example TaskD --baz 'baz' --TaskB-bar 'bar' --TaskC-bar 'bar' --TaskA-foo 'foo'
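
To make the flag naming convention concrete, here is a minimal sketch that assembles the command above from plain dictionaries. The helper name luigi_command is hypothetical and not part of Luigi; it only builds the argument list and does not run anything. Replacing underscores with hyphens in parameter names is an assumption about how Luigi spells its flags, and it does not affect the foo, bar and baz parameters used here.

```python
import shlex


def luigi_command(module, root_task, root_params, task_params):
    """Build a luigi argument list (hypothetical helper, not part of Luigi)."""
    args = ["luigi", "--module", module, root_task]
    # Parameters of the task named on the command line use the short form,
    # e.g. --baz baz
    for name, value in root_params.items():
        args += ["--" + name.replace("_", "-"), str(value)]
    # Parameters of every other task are prefixed with the task name,
    # e.g. --TaskB-bar bar
    for task, params in task_params.items():
        for name, value in params.items():
            flag = "--{}-{}".format(task, name.replace("_", "-"))
            args += [flag, str(value)]
    return args


command = luigi_command(
    "luigi_example",
    "TaskD",
    root_params={"baz": "baz"},
    task_params={
        "TaskB": {"bar": "bar"},
        "TaskC": {"bar": "bar"},
        "TaskA": {"foo": "foo"},
    },
)

# Quote each argument so the result can be pasted into a shell.
print(" ".join(shlex.quote(arg) for arg in command))
```

Every task that declares a parameter adds another prefixed flag, so the command grows with the size of the pipeline.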

Configuration Files to the Rescue

Rather than specifying the parameter values on the command line, we can instead write a configuration file and specify that Luigi use the file to configure tasks. For example, we could use the following configuration file:

[TaskA]
foo: foo

[TaskB]
bar: bar

[TaskC]
bar: bar

[TaskD]
baz: baz

Assuming this is saved in a file example.cfg, we can then run the Luigi pipeline using:

LUIGI_CONFIG_PATH=example.cfg luigi --module luigi_example TaskD

A config file has the added benefit that it can be committed to source control, preserving the parameters used to generate a particular artifact of the Luigi pipeline.

The configuration file has a couple of other tricks up its sleeve. In the above example, we’re still duplicating the value of the bar parameter. Since the file is parsed using ConfigParser, we can take advantage of the DEFAULT section, whose values every task section inherits:

[DEFAULT]
bar: bar

[TaskA]
foo: foo

[TaskB]

[TaskC]

[TaskD]
baz: baz

Note that each task must still have a section in the file, even if that section is empty. The command needed to run the pipeline with this updated file does not change:

LUIGI_CONFIG_PATH=example.cfg luigi --module luigi_example TaskD
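
The DEFAULT inheritance and the need for empty sections can be checked outside of Luigi. The sketch below parses the same file contents with Python's standard configparser module directly, not through Luigi's own configuration wrapper, so it illustrates the parsing rules rather than Luigi's exact lookup code. The TaskE section name is made up to stand in for a task with no section.

```python
import configparser

# Same contents as the example.cfg shown above.
CONFIG_TEXT = """
[DEFAULT]
bar: bar

[TaskA]
foo: foo

[TaskB]

[TaskC]

[TaskD]
baz: baz
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

# Empty sections still see the values from DEFAULT.
task_b_bar = parser.get("TaskB", "bar")
task_c_bar = parser.get("TaskC", "bar")
print(task_b_bar, task_c_bar)

# A section that is absent from the file does not fall back to DEFAULT;
# the lookup raises NoSectionError instead.
try:
    parser.get("TaskE", "bar")
    missing_section_ok = True
except configparser.NoSectionError:
    missing_section_ok = False
print(missing_section_ok)
```

Values in DEFAULT are visible from every section, so TaskA and TaskD also see bar here even though they do not declare that parameter.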

Now we have a persistent configuration that we can store in source control, with no duplication of parameters and a simplified command to run it!