About

To run this code yourself, please visit the GitHub repo.

Luigi is a Python ETL framework built by Spotify. I use pandas in my day-to-day job and have created numerous pipeline tasks to move, transform, and analyze data across my organization. I thought Luigi would be a great addition to help manage these pipelines, but its getting started documentation left me scratching my head.

If you are reading this, I assume the docs have confused you as well, and hopefully my post below can provide a bit more clarity. The post assumes you have already read the docs; if you haven't, please read them before continuing.

Task Execution

Typical ETL Execution:

Task A $\longrightarrow$ Task B $\longrightarrow$ Task C

Luigi ETL Execution:

Task A $\longleftarrow$ Task B $\longleftarrow$ Task C

The most important thing to understand about Luigi is that it resolves the ETL backward (recursively). It first checks whether the final task (Task C) is complete. If not, it moves backward to check whether the previous task (Task B) is complete, and so on. Once it reaches a task that is complete, or one with no dependencies, it runs the incomplete tasks moving forward again.

This approach can save you a lot of time because completed tasks are never re-run. However, it also makes pipelines trickier to implement: you can end up in a situation where Task B or Task C always reports that it is complete, so Task A never runs.
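To make that order concrete, here is a toy model in plain Python. No Luigi is involved, and ToyTask and build() are hypothetical names: the sketch only reproduces the check-backward, run-forward order described above, plus the pitfall where a complete Task C hides a stale Task A. Real Luigi separates scheduling (the checks) from running, so treat this as an illustration of the order, not of Luigi's internals.

```python
# Toy model of Luigi-style backward dependency resolution (plain Python).
# ToyTask and build() are hypothetical; this is not Luigi's implementation.


class ToyTask:
    def __init__(self, name, requires=None, done=False):
        self.name = name
        self.requires = requires  # the upstream ToyTask, or None
        self.done = done          # stands in for "the output exists"

    def complete(self):
        return self.done

    def run(self):
        self.done = True          # stands in for writing the output


def build(task, log):
    """Check a task; if it is incomplete, resolve its dependency first, then run it."""
    log.append(f"check {task.name}")
    if task.complete():
        return  # a complete task stops the walk backward
    if task.requires is not None:
        build(task.requires, log)  # walk backward to the upstream task
    log.append(f"run {task.name}")
    task.run()


a = ToyTask("A")
b = ToyTask("B", requires=a)
c = ToyTask("C", requires=b)
log = []
build(c, log)
print(log)  # ['check C', 'check B', 'check A', 'run A', 'run B', 'run C']

# The pitfall: if C already reports complete, A is never even checked.
a2 = ToyTask("A")
b2 = ToyTask("B", requires=a2)
c2 = ToyTask("C", requires=b2, done=True)
log2 = []
build(c2, log2)
print(log2)  # ['check C']
```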

How are Tasks linked?

Task A $\longleftarrow$ Task B $\longleftarrow$ Task C

Except for External Tasks, most tasks depend on another Luigi Task. You declare this dependency by defining a requires() method on the Task class that returns the task(s) it depends on. If a task is already complete, Luigi won't bother checking its dependencies.

What defines a completed Task?

Luigi considers a Task complete when the Task's output exists. So if Task A outputs task_a.csv and that file exists, Task A is considered complete. What the getting started docs fail to mention is that, in reality, Task A is complete when Task A's complete() method returns True. By default, complete() checks whether every target returned by output() exists. We can override this behavior, and I'd bet most people who have deployed Luigi into production do.

Coding Demonstration of Task Execution

Task A $\longleftarrow$ Task B

The code below is an example of how to set up Luigi Tasks. We have two classes, Task_A and Task_B, where Task_B depends on Task_A. I've added numbered print statements to help visualize the order of events when the Tasks run.

There are a few things to note about the code:

  • GlobalParams is a helper class that provides a global counter so I can number the execution events, e.g. 1: complete() ...

  • I overrode Luigi's complete() method with an equivalent one that checks whether the output exists, so we can see each call in the print statements.

  • MockTarget creates an in-memory file object that we can write to and check for existence.

import luigi
from luigi.mock import MockTarget

class GlobalParams(luigi.Config):
    count = luigi.IntParameter(default=1)

class Task_A(luigi.Task):

    def output(self):
        return MockTarget("Task_A")

    def run(self):
        print(f"{g.count}: run() {self.__class__.__name__} has no prior Task dependency. It is now running to complete the task")
        g.count += 1 
        out = self.output().open("w")
        out.write('complete')
        out.close()
    
    def complete(self):
        print(f'{g.count}: complete() Checking to see if {self.__class__.__name__} has been completed')
        g.count += 1 
        return self.output().exists() 

        
class Task_B(luigi.Task):
    
    def requires(self):
        print(f'{g.count}: requires() {self.__class__.__name__} is not completed, checking to see if previous tasks are required and completed')
        g.count += 1 
        return Task_A()
            
    def output(self):
        return MockTarget("Task_B")

    def run(self):
        print(f'{g.count}: run() All previous tasks are completed and {self.__class__.__name__} is running to complete the task')
        g.count += 1 
        out = self.output().open("w")
        out.write('complete')
        out.close()
        print(f'{g.count}: All Tasks are completed')
        
    def complete(self):
        print(f'{g.count}: complete() Checking to see if {self.__class__.__name__} has been completed')
        g.count += 1 
        
        if self.output().exists():
            print(f'{g.count}: All Tasks are completed')
        return self.output().exists()

g = GlobalParams()
luigi.build([Task_B()], local_scheduler=True)
DEBUG: Checking if Task_B() is complete
DEBUG: Checking if Task_A() is complete
INFO: Informed scheduler that task   Task_B__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Task_A__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 2456] Worker Worker(salt=998663148, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   Task_A()
INFO: [pid 2456] Worker Worker(salt=998663148, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      Task_A()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Task_A__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=998663148, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   Task_B()
INFO: [pid 2456] Worker Worker(salt=998663148, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      Task_B()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Task_B__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=998663148, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Task_A()
    - 1 Task_B()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

1: complete() Checking to see if Task_B has been completed
2: requires() Task_B is not completed, checking to see if previous tasks are required and completed
3: complete() Checking to see if Task_A has been completed
4: run() Task_A has no prior Task dependency. It is now running to complete the task
5: requires() Task_B is not completed, checking to see if previous tasks are required and completed
6: complete() Checking to see if Task_A has been completed
7: run() All previous tasks are completed and Task_B is running to complete the task
8: All Tasks are completed
True

Steps 5 and 6 happen because, by default, the worker re-checks a task's dependencies just before running it. When we run the Task a second time, note how Task_A is never referenced: Luigi checked whether Task_B was complete and stopped, since it returned True. That means that if some upstream file were updated and needed to be transformed by Task_A, it would not happen, because Luigi would always stop at Task_B.

g.count=1
luigi.build([Task_B()], local_scheduler=True)
DEBUG: Checking if Task_B() is complete
INFO: Informed scheduler that task   Task_B__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=177125647, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Task_B()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

1: complete() Checking to see if Task_B has been completed
2: All Tasks are completed
True

Luigi Parameters

Words $\longleftarrow$ Count

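Parameters are Luigi's intended way to keep tasks updated on some schedule (e.g. daily) so they don't get stuck in a "complete" status. A parameter such as a date is typically part of the output file name, so each new value points to an output that doesn't exist yet, and the task runs again. Luigi provides its own Parameter classes; they act as the Task's constructor arguments and can also be set when executing tasks from the command line.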

Below, we create two new Tasks, Words and Count. Each task takes a date parameter and includes the date in its output file name. You'll also notice I removed the custom complete() method, so these tasks fall back to Luigi's default complete(), which checks whether the output target exists.

import datetime
import pandas as pd
from pathlib import Path

OUTPUT_PATH = Path('output')
OUTPUT_PATH.mkdir(exist_ok=True)  # make sure the output directory exists before writing

class Words(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'words_{self.date}.csv')

    def run(self):
        words = ['apple','banana','grapefruit']

        df = pd.DataFrame(dict(words=words))
        df.to_csv(self.output().path, index=False)
        
class Count(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    
    def requires(self):
        # Pass the Luigi parameter through to the upstream task
        return Words(date=self.date)
            
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/f'count_{self.date}.csv')

    def run(self):
        df = pd.read_csv(self.input().path)
        df['letter_count'] = df.words.map(len)
        df.to_csv(self.output().path, index=False)
        
luigi.build([Count()], local_scheduler=True)
DEBUG: Checking if Count(date=2020-10-26) is complete
DEBUG: Checking if Words(date=2020-10-26) is complete
INFO: Informed scheduler that task   Count_2020_10_26_424115e443   has status   PENDING
INFO: Informed scheduler that task   Words_2020_10_26_424115e443   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 2456] Worker Worker(salt=660151019, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   Words(date=2020-10-26)
INFO: [pid 2456] Worker Worker(salt=660151019, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      Words(date=2020-10-26)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Words_2020_10_26_424115e443   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=660151019, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   Count(date=2020-10-26)
INFO: [pid 2456] Worker Worker(salt=660151019, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      Count(date=2020-10-26)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Count_2020_10_26_424115e443   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=660151019, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Count(date=2020-10-26)
    - 1 Words(date=2020-10-26)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

True
//Directory Tree:
output/
├── count_2020-10-26.csv
└── words_2020-10-26.csv

Above, our Tasks ran successfully and saved their outputs in the output directory. So what happens if we run it a second time?

luigi.build([Count()], local_scheduler=True)
DEBUG: Checking if Count(date=2020-10-26) is complete
INFO: Informed scheduler that task   Count_2020_10_26_424115e443   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=832748578, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Count(date=2020-10-26)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

True
//Directory Tree:
output/
├── count_2020-10-26.csv
└── words_2020-10-26.csv

As you can see, nothing ran, because the Count output for that date already exists (Luigi didn't even check Words). Below, we pass a different date to the Count task.

luigi.build([Count(date=datetime.date(2021, 10, 25))], local_scheduler=True)
DEBUG: Checking if Count(date=2021-10-25) is complete
DEBUG: Checking if Words(date=2021-10-25) is complete
INFO: Informed scheduler that task   Count_2021_10_25_8a7563aba6   has status   PENDING
INFO: Informed scheduler that task   Words_2021_10_25_8a7563aba6   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 2456] Worker Worker(salt=766530749, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   Words(date=2021-10-25)
INFO: [pid 2456] Worker Worker(salt=766530749, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      Words(date=2021-10-25)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Words_2021_10_25_8a7563aba6   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=766530749, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   Count(date=2021-10-25)
INFO: [pid 2456] Worker Worker(salt=766530749, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      Count(date=2021-10-25)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Count_2021_10_25_8a7563aba6   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=766530749, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Count(date=2021-10-25)
    - 1 Words(date=2021-10-25)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

True
//Directory Tree:
output/
├── count_2020-10-26.csv
├── count_2021-10-25.csv
├── words_2020-10-26.csv
└── words_2021-10-25.csv

External Tasks

If your pipeline starts with data produced outside of Luigi, you can represent that dependency with an ExternalTask. An ExternalTask is the same as a regular Task except that it has no run() method; it only declares its output().

ExternalTask is useful because it lets Luigi end the job gracefully, rather than failing, when the external data isn't available yet.

class Words(luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/'words.csv')

luigi.build([Words()], local_scheduler=True)
DEBUG: Checking if Words() is complete
WARNING: Data for Words() does not exist (yet?). The task is an external data dependency, so it cannot be run from this luigi process.
INFO: Informed scheduler that task   Words__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=806349904, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 were left pending, among these:
    * 1 were missing external dependencies:
        - 1 Words()

Did not run any tasks
This progress looks :| because there were missing external dependencies

===== Luigi Execution Summary =====

True

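Above, Luigi reported Words as a missing external dependency because words.csv didn't exist yet. Note that luigi.build() still returned True: its return value reports whether there were scheduling errors, not whether every task ran.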

OUTPUT_PATH = Path('output')

words = ['apple','banana','grapefruit']

df = pd.DataFrame(dict(words=words))
df.to_csv(OUTPUT_PATH/'words.csv', index=False)

Now that we've created words.csv, our external task reports as complete, and its output can be passed to any downstream Task.

luigi.build([Words()], local_scheduler=True)
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=781673351, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 Words()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

True

Alternate Complete Method

As I mentioned earlier, by default Luigi determines whether a Task is complete by checking whether its output exists. However, a common use case in pipeline workflows is re-running Tasks when an input file is updated. Since Luigi only checks that the output exists, it will consider a Task complete no matter how many times its input gets updated.

Fortunately, we can change this by overriding the Task's complete() method with our own criteria. It must return True if the task is complete and False if it is not.

Below, we create our own complete() method that re-runs a task whenever its upstream output file has been updated. It first checks whether the task's own output exists, then checks whether that output's modified time (mtime) is older than the mtime of the upstream task's output. If it is, the task is out of date and needs to run again.

import os

class Words(luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/'words.csv')
    
class CountLetters(luigi.Task):

    def requires(self):
        return Words()

    # Custom Complete Method
    def complete(self):
        if not self.output().exists():
            print('//Count Letters: No Output File')
            return False

        # Without the upstream file there is nothing to compare against;
        # returning False lets Luigi report the missing dependency
        if not self.input().exists():
            return False

        # Compare raw float timestamps. Formatting them first with time.ctime()
        # would compare strings, and "Fri ..." sorts before "Thu ..."
        input_mtime = os.path.getmtime(self.input().path)
        output_mtime = os.path.getmtime(self.output().path)

        if output_mtime < input_mtime:
            print('//Count Letters: File Out of Date')
            return False
        
        print('//Count Letters: Task is Complete')
        return True

    
    def output(self):
        return luigi.LocalTarget(OUTPUT_PATH/'count_letters.csv')

    def run(self):
        df = pd.read_csv(self.input().path)
        df['letter_count'] = df.words.map(len)
        df.to_csv(self.output().path, index=False)
luigi.build([CountLetters()], local_scheduler=True)
DEBUG: Checking if CountLetters() is complete
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=437114638, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   CountLetters()
INFO: [pid 2456] Worker Worker(salt=437114638, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      CountLetters()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=437114638, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 Words()
* 1 ran successfully:
    - 1 CountLetters()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

//Count Letters: No Output File
True
//Directory Tree:
output/
├── count_letters.csv
└── words.csv

When we run the task again, nothing runs: count_letters.csv exists and its mtime is newer than the mtime of words.csv, so CountLetters reports itself complete.

luigi.build([CountLetters()], local_scheduler=True)
DEBUG: Checking if CountLetters() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=438850535, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 CountLetters()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

//Count Letters: Task is Complete
True

We will now update words.csv to include a few more words and watch Luigi re-run the Task, because words.csv now has a newer mtime than count_letters.csv.

words = ['apple','banana','grapefruit', 'cherry', 'orange']

df = pd.DataFrame(dict(words=words))
df.to_csv(OUTPUT_PATH/'words.csv', index=False)

luigi.build([CountLetters()], local_scheduler=True)
DEBUG: Checking if CountLetters() is complete
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=695141065, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   CountLetters()
INFO: [pid 2456] Worker Worker(salt=695141065, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      CountLetters()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=695141065, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 Words()
* 1 ran successfully:
    - 1 CountLetters()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

//Count Letters: File Out of Date
True
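The freshness rule used by CountLetters.complete() can be tested on its own. The sketch below is plain Python with no Luigi (is_stale() is a hypothetical helper); it pins mtimes with os.utime() so the result doesn't depend on how fast the files are written, and it also shows why the timestamps should be compared as numbers rather than as formatted strings.

```python
import os
import tempfile
import time


def is_stale(input_path, output_path):
    """True if output_path is missing or older than input_path.

    Compares raw float timestamps from os.path.getmtime().
    """
    if not os.path.exists(output_path):
        return True
    return os.path.getmtime(output_path) < os.path.getmtime(input_path)


tmp = tempfile.mkdtemp()
words = os.path.join(tmp, "words.csv")
counts = os.path.join(tmp, "count_letters.csv")

open(words, "w").close()
print(is_stale(words, counts))  # True: no output yet

open(counts, "w").close()
# Pin the mtimes explicitly so the result doesn't depend on clock resolution
os.utime(words, (1_000, 1_000))
os.utime(counts, (2_000, 2_000))
print(is_stale(words, counts))  # False: output is newer than input

os.utime(words, (3_000, 3_000))  # simulate words.csv being rewritten later
print(is_stale(words, counts))  # True: input changed after the output was built

# Why not compare formatted times? Strings sort alphabetically, not by time:
thursday = time.asctime(time.gmtime(0))     # 'Thu Jan  1 00:00:00 1970'
friday = time.asctime(time.gmtime(86_400))  # 'Fri Jan  2 00:00:00 1970'
print(friday < thursday)  # True, even though Friday is later
```

One caveat of mtime-based completeness: it only notices changes that bump the file's timestamp, so copying files with a tool that preserves mtimes (for example `cp -p`) can hide an update.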

SQL Tasks

Words $\longleftarrow$ CountLetters $\longleftarrow$ StoreSQL $\longleftarrow$ PrintSQL

SQL is a common step in many pipelines, but the Luigi getting started docs barely cover the topic. In this section we will create two new tasks. The first task, StoreSQL, will take the output from CountLetters and store it in a SQLite database. The second task, PrintSQL, will then read from our database and print both tables that Luigi created.

The big difference from LocalTarget is that SQLAlchemyTarget doesn't check for a file. Instead, it creates and updates a "Marker Table" (table_updates) to keep track of whether a task is complete. You give the target an update_id, and Luigi checks whether a row with that update_id (and target table) exists in the Marker Table before running the Task.
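To demystify the Marker Table, here is a minimal sketch of the same idea using only Python's built-in sqlite3 module. The table and column names copy the table_updates output shown later in this post, but exists() and mark_done() are hypothetical helpers, not Luigi's SQLAlchemyTarget code.

```python
import sqlite3

# Minimal sketch of the "marker table" idea with the standard library only.
# exists() and mark_done() are hypothetical helpers, not Luigi's code.
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE IF NOT EXISTS table_updates ("
    "update_id TEXT PRIMARY KEY, target_table TEXT, inserted TEXT)"
)


def exists(update_id, target_table):
    """Is there a marker row for this update? (the 'is it complete?' check)"""
    row = con.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ? AND target_table = ?",
        (update_id, target_table),
    ).fetchone()
    return row is not None


def mark_done(update_id, target_table):
    """Record that this update finished (the 'touch' step)."""
    con.execute(
        "INSERT OR REPLACE INTO table_updates VALUES (?, ?, datetime('now'))",
        (update_id, target_table),
    )
    con.commit()


uid = "2020-10-26 17:09:56_letter_count"
print(exists(uid, "letter_count"))  # False: the task would run
mark_done(uid, "letter_count")
mark_done(uid, "letter_count")      # re-marking replaces the row, no duplicate
print(exists(uid, "letter_count"))  # True: the task now counts as complete
print(exists("2020-10-26 17:10:34_letter_count", "letter_count"))  # False: new id, new run
```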

Below I've provided the code for our StoreSQL and PrintSQL tasks. There are a couple of things worth noting.

  • We have overridden the complete() method to check whether the prior task, CountLetters, has been completed and whether the StoreSQL task output exists. If either check fails, the Task will run.
  • We are creating a SQLite database called my.db.
  • self.output().touch() is what marks the Task as complete: it creates the Marker Table if needed and inserts (or updates) a row for the current update_id.
  • PrintSQL's complete() method always returns False so that it runs every time, for demonstration purposes.
//Directory Tree:
output/
├── count_letters.csv
└── words.csv
from luigi.contrib import sqla

OUTPUT_PATH = Path('output')
connection_string = f"sqlite:///{OUTPUT_PATH}/my.db"

class StoreSQL(luigi.Task):
    connection_string = luigi.Parameter()
    target_table = luigi.Parameter()
    
    @property
    def update_id(self):
        mtime = os.path.getmtime(self.input().path)
        mtime = datetime.datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S")
        return mtime + '_' + self.target_table

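    def complete(self):
        # Check the upstream task first: output() below computes update_id
        # from the mtime of CountLetters' output, which fails if that file is missing
        if not self.requires().complete():
            return False

        # Then look for a Marker Table row with this task's update_id
        if not self.output().exists():
            return False

        return True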
        
    def requires(self):
        return CountLetters()

    def output(self):
        return sqla.SQLAlchemyTarget(
            connection_string=self.connection_string,
            target_table=self.target_table,
            update_id=self.update_id
        )

    def run(self):
        con = self.output().engine
        df = pd.read_csv(self.input().path)
        df.to_sql(name=self.target_table, con=con, if_exists='replace')

        # Update Marker Table
        self.output().touch()


class PrintSQL(luigi.Task):
    connection_string = luigi.Parameter()
    target_table = luigi.Parameter()
    
    def requires(self):
        return StoreSQL(self.connection_string, self.target_table)

    def complete(self):
        return False

    def output(self):
        pass

    def run(self):
        target = self.input()  # the SQLAlchemyTarget returned by StoreSQL.output()
        con = target.engine
        table = target.target_table
        
        print('// Letter Count Table')
        print(pd.read_sql(sql=table, con=con), end='\n\n')
        print('// Marker Table')
        print(pd.read_sql(sql='table_updates', con=con))
luigi.build([PrintSQL(connection_string, target_table='letter_count')], local_scheduler=True)
DEBUG: Checking if PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
DEBUG: Checking if StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
DEBUG: Checking if CountLetters() is complete
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 2456] Worker Worker(salt=382729348, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 2456] Worker Worker(salt=382729348, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=382729348, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 2456] Worker Worker(salt=382729348, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=382729348, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 CountLetters()
* 2 ran successfully:
    - 1 PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
    - 1 StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

// Letter Count Table
   index       words  letter_count
0      0       apple             5
1      1      banana             6
2      2  grapefruit            10
3      3      cherry             6
4      4      orange             6

// Marker Table
                          update_id  target_table                   inserted
0  2020-10-26 17:09:56_letter_count  letter_count 2020-10-26 17:10:30.763616
True
//Directory Tree:
output/
├── count_letters.csv
├── my.db
└── words.csv

As you can see above, the SQL task updated two tables (letter_count and the Marker Table), and PrintSQL printed both. If you look at the Marker Table's update_id column, you'll notice it is the concatenation of the mtime of count_letters.csv and the target table name.
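That update_id scheme is what ties the Marker Table to file changes: a new mtime produces a new update_id, which has no marker row yet, so StoreSQL runs again. A plain-Python sketch of the same string construction (make_update_id() is a hypothetical helper that mirrors StoreSQL.update_id):

```python
import datetime
import os
import tempfile


def make_update_id(path, target_table):
    """Mirror StoreSQL.update_id: the file's mtime (to the second) + '_' + table name."""
    mtime = os.path.getmtime(path)
    stamp = datetime.datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S")
    return stamp + "_" + target_table


csv_path = os.path.join(tempfile.mkdtemp(), "count_letters.csv")
open(csv_path, "w").close()

first = make_update_id(csv_path, "letter_count")

# Simulate the file being rewritten a minute later by bumping its mtime
st = os.stat(csv_path)
os.utime(csv_path, (st.st_atime, st.st_mtime + 60))
second = make_update_id(csv_path, "letter_count")

print(first != second)                   # True: a new id, so no marker row yet
print(second.endswith("_letter_count"))  # True
```

Because the timestamp is truncated to whole seconds, two rewrites of count_letters.csv within the same second would share an update_id, and the second one would not trigger a reload.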

Now let's update our words.csv and see what happens when we run the task again.

words = ['apple','banana','grapefruit', 'cherry', 'orange', 'peach', 'strawberry']

df = pd.DataFrame(dict(words=words))
df.to_csv(OUTPUT_PATH/'words.csv', index=False)
luigi.build([PrintSQL(connection_string, target_table='letter_count')], local_scheduler=True)
DEBUG: Checking if PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
DEBUG: Checking if StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
DEBUG: Checking if CountLetters() is complete
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
DEBUG: Checking if Words() is complete
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Words__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 2456] Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   CountLetters()
INFO: [pid 2456] Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      CountLetters()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CountLetters__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 2456] Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 2456] Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 2456] Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=051857807, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 1 complete ones were encountered:
    - 1 Words()
* 3 ran successfully:
    - 1 CountLetters()
    - 1 PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
    - 1 StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

// Letter Count Table
   index       words  letter_count
0      0       apple             5
1      1      banana             6
2      2  grapefruit            10
3      3      cherry             6
4      4      orange             6
5      5       peach             5
6      6  strawberry            10

// Marker Table
                          update_id  target_table                   inserted
0  2020-10-26 17:09:56_letter_count  letter_count 2020-10-26 17:10:30.763616
1  2020-10-26 17:10:34_letter_count  letter_count 2020-10-26 17:10:34.734229
True

As expected, the Letter Count table was updated, and the Marker Table contains a new row recording that the task completed.
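The Marker Table is what allows StoreSQL to report itself as complete. Below is a minimal sketch of that idea. It uses only the standard library's sqlite3 with an in-memory database, and it rests on a few assumptions. The update_id format (words.csv's modification time joined to the target table name) is inferred from the update_id values printed above. The helpers `make_update_id`, `ensure_marker_table`, `is_complete`, and `mark_complete`, and the table name `marker_table`, are hypothetical and are not part of Luigi's API.

```python
import sqlite3
from datetime import datetime


def make_update_id(source_mtime: float, target_table: str) -> str:
    # Hypothetical helper (assumed format): the source file's modification time,
    # to the second, joined to the target table, e.g. "2020-10-26 17:10:34_letter_count".
    stamp = datetime.fromtimestamp(source_mtime).strftime("%Y-%m-%d %H:%M:%S")
    return f"{stamp}_{target_table}"


def ensure_marker_table(conn: sqlite3.Connection) -> None:
    # Same three columns as the Marker Table printed above.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS marker_table ("
        " update_id TEXT PRIMARY KEY,"
        " target_table TEXT,"
        " inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )


def is_complete(conn: sqlite3.Connection, update_id: str) -> bool:
    # What a marker-based complete() checks: has this update_id been recorded?
    row = conn.execute(
        "SELECT 1 FROM marker_table WHERE update_id = ?", (update_id,)
    ).fetchone()
    return row is not None


def mark_complete(conn: sqlite3.Connection, update_id: str, target_table: str) -> None:
    # Run after the data load succeeds, so the next completeness check returns True.
    conn.execute(
        "INSERT INTO marker_table (update_id, target_table) VALUES (?, ?)",
        (update_id, target_table),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
ensure_marker_table(conn)
uid = make_update_id(datetime(2020, 10, 26, 17, 10, 34).timestamp(), "letter_count")
print(uid)                     # 2020-10-26 17:10:34_letter_count
print(is_complete(conn, uid))  # False: nothing recorded yet, so the task would run
mark_complete(conn, uid, "letter_count")
print(is_complete(conn, uid))  # True: an unchanged words.csv yields the same id
```

In a real pipeline, the modification time would come from something like `(OUTPUT_PATH/'words.csv').stat().st_mtime`. Under this assumption, rewriting words.csv produces a new update_id, which would explain why the run above added a second Marker Table row.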

Let's run the task one more time without updating words.csv.

luigi.build([PrintSQL(connection_string, target_table='letter_count')], local_scheduler=True)
DEBUG: Checking if PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
DEBUG: Checking if StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count) is complete
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   PENDING
INFO: Informed scheduler that task   StoreSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 2456] Worker Worker(salt=023539738, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) running   PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
INFO: [pid 2456] Worker Worker(salt=023539738, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) done      PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PrintSQL_sqlite____output_letter_count_4c5210e673   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=023539738, workers=1, host=DESKTOP-BCU4BGH, username=Mike, pid=2456) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 StoreSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)
* 1 ran successfully:
    - 1 PrintSQL(connection_string=sqlite:///output/my.db, target_table=letter_count)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

// Letter Count Table
   index       words  letter_count
0      0       apple             5
1      1      banana             6
2      2  grapefruit            10
3      3      cherry             6
4      4      orange             6
5      5       peach             5
6      6  strawberry            10

// Marker Table
                          update_id  target_table                   inserted
0  2020-10-26 17:09:56_letter_count  letter_count 2020-10-26 17:10:30.763616
1  2020-10-26 17:10:34_letter_count  letter_count 2020-10-26 17:10:34.734229
True

Only the PrintSQL task ran. Luigi found StoreSQL already complete, so it never checked CountLetters or Words, and the Marker Table was not updated.
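A toy model of the backward walk may make the two logs easier to read. It is plain Python, not Luigi's scheduler. `Task`, `schedule`, and `build` are hypothetical stand-ins, and the model ignores shared dependencies, failures, and parallel workers. Setting `has_output=False` on PrintSQL is also an assumption. Luigi's default `complete()` returns False for a task with no outputs, which would explain why PrintSQL runs every time.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    # Toy stand-in for a Luigi task (hypothetical, not Luigi's real class).
    name: str
    done: bool = False       # e.g. the output file or marker row exists
    has_output: bool = True  # a task with no output is never complete
    requires: list = field(default_factory=list)

    def complete(self) -> bool:
        return self.has_output and self.done


def schedule(task: Task, checked: list, to_run: list) -> None:
    # Walk backward from the requested task, recording each completeness check.
    checked.append(task.name)
    if task.complete():
        return  # complete: its requirements are never checked
    for dep in task.requires:
        schedule(dep, checked, to_run)
    to_run.append(task)  # appended after its dependencies, so tasks run forward


def build(root: Task):
    checked, to_run = [], []
    schedule(root, checked, to_run)
    for task in to_run:
        task.done = True  # pretend the task ran and wrote its output
    return checked, [t.name for t in to_run]


words = Task("Words", done=True)  # words.csv already exists
count = Task("CountLetters", requires=[words])
store = Task("StoreSQL", requires=[count])
printer = Task("PrintSQL", requires=[store], has_output=False)  # assumption

print(build(printer))
# (['PrintSQL', 'StoreSQL', 'CountLetters', 'Words'], ['CountLetters', 'StoreSQL', 'PrintSQL'])
print(build(printer))
# (['PrintSQL', 'StoreSQL'], ['PrintSQL'])
```

The first call mirrors the check order and run order of the first log. The second call mirrors the second log, where the walk stops at StoreSQL.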