Summary

This is both a general(ish) explainer and a template for usage in data science, specifically utilizing Kedro, (formerly) Quantum Black’s now open-source (Linux Foundation) framework for creating reproducible, maintainable data science pipelines. What does that mean? At its simplest, it ties things into “nodes” and “pipelines”, where nodes wrap plain Python functions (utility functions and the functions that tie things together) and pipelines chain multiple nodes together, allowing steps to be easily repeated. DAGs, they’re all the rage. Make sure to read their tutorial if you’re not familiar with it, it’s good (and I won’t be covering nearly as much as it does; the assumption here is that you’ve skimmed it):

https://kedro.readthedocs.io/en/stable/tutorial/spaceflights_tutorial.html
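
To make that concrete, here is a minimal, hypothetical sketch (toy names, not part of the spaceflights project) of two plain Python functions wrapped as nodes and chained into a pipeline:

from kedro.pipeline import Pipeline, node


def clean(raw_df):
    return raw_df.dropna()


def summarise(clean_df):
    return clean_df.describe()


# Each node names its inputs and outputs; Kedro resolves the execution order
# (the DAG) from those names, so "clean_df" is what links the two nodes.
toy_pipeline = Pipeline(
    [
        node(func=clean, inputs="raw_df", outputs="clean_df", name="clean_node"),
        node(func=summarise, inputs="clean_df", outputs="summary", name="summarise_node"),
    ]
)
print([n.name for n in toy_pipeline.nodes])  # nodes in execution order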

Some benefits you get using it with org-mode and emacs, however:

  • Single file navigation / no need to bounce around VSCode haphazardly when you can jump between headers and tangle to the appropriate places.
  • Transition to a presentation from inside Org-mode seamlessly
  • Include data exploration and other documentation in the same file.
  • Use other languages in the same file with org-babel

Kedro uses a specific workflow which I will follow for this template/tutorial.

I’m not entirely sure how useful this will be, but it lets me get some practice with org-babel and gives me a project excuse to tinker around with kedro some more, in a use case I haven’t seen before.

This will be more of an “update as I come up with new ideas” type of article, and, of course, if you see something that would be better served by a different method, feel free to leave a comment / issue.

High-level / TL;DR: This is for using org to configure Kedro, similar to using a monolithic org file for configuring emacs. It is geared towards people who already use emacs and use something like ivy/helm/vertico to traverse headings easily.

Assumptions

  • I’m using conda for environment management (conda.el / jupyter-python)
  • You’ve created the kedro project already, since there are interactive y/n steps (telemetry, etc.).
    • TODO: maybe put the steps here? Probably not worth the timesink for a template; the steps would 95% of the time be done in the command line, and only once.

Kedro Spaceflights Tutorial

Scenario: It is 2160 and the space tourism industry is booming. Globally, thousands of space shuttle companies take tourists to the Moon and back. You have been able to source amenities offered in each space shuttle, customer reviews and company information.

Project: You want to construct a model that predicts the price for each trip to the Moon and the corresponding return flight.

Setup

This is for environment setup, utility functions that you might not necessarily want to put in your init.el, and other things not directly related to the core of the project.

Emacs and Shell

  • Elisp

    Commands and utility functions that help reduce repeated code and make the process more seamless. Run manually with C-c C-c. One could alternatively add this to their init.el.

    The following code sets the variables here so that org-mode doesn’t complain about them; in a real-life scenario you could set these as file-local variables instead.

    ;; Project root and the package sub-directory inside it, referenced when
    ;; tangling blocks to the right locations.
    (setq projdir "~/code/literate-kedro")
    (setq projsub "/src/literate_kedro")
    (print projdir)  ; quick confirmation that the variable is set
    
  • Shell / Other

    If you plan on using the terminal for all shell commands, this isn’t necessary. You can simply activate your conda environment with conda-env-activate, under the assumption you’re using conda.el. Because the session is set at the top level, it should persist through the following commands; see the echo commands below for confirmation.

    conda activate kedro-example
    

    Making sure I’m in the correct directory / using the correct environment.

    echo $(pwd)
    echo $(which pip)
    

requirements.txt

The requirements for the kedro project, in the form of a requirements.txt file:

black~=22.0
flake8>=3.7.9, <4.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab~=3.0
kedro[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet, plotly.PlotlyDataSet, plotly.JSONDataSet]==0.18.2
kedro-telemetry~=0.2.0
kedro-viz~=4.7
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0

Configuration

The conf folder in kedro is for:

  • Logging
  • Credentials
  • Other Sensitive/Personal content

TODO: These are things that I would generally be more comfortable configuring outside of org. You could probably get fancy piping things with keys back and forth, but that remains outside of my scope for now.

You could probably get away with safely configuring your logging here without needing anything fancy.

  • data_science.yml

    data_science:
      active_modelling_pipeline:
        model_options:
          test_size: 0.2
          random_state: 3
          features:
            - engines
            - passenger_capacity
            - crew
            - d_check_complete
            - moon_clearance_complete
            - iata_approved
            - company_rating
            - review_scores_rating
      candidate_modelling_pipeline:
        model_options:
          test_size: 0.2
          random_state: 8
          features:
            - engines
            - passenger_capacity
            - crew
            - review_scores_rating
    
    
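    Assuming the block above tangles to conf/base/parameters/data_science.yml (the path the split_data docstring later refers to), a quick, optional sanity check is to read it back with PyYAML and confirm the nesting mirrors the pipeline namespaces:

    import yaml

    # Hypothetical check: read the tangled parameters file back in.
    with open("conf/base/parameters/data_science.yml") as f:
        params = yaml.safe_load(f)

    # The nested keys mirror the namespaces used by the data_science pipeline.
    print(params["data_science"]["active_modelling_pipeline"]["model_options"]["features"])
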
  • settings.py

    Normally you don’t edit this, but there are some cases where it’s necessary. This one configures kedro-viz’s SQLite session store, which is used for experiment tracking (run data and tracked metrics end up in a SQLite database under data/).

    from kedro_viz.integrations.kedro.sqlite_store import SQLiteStore
    from pathlib import Path
    
    SESSION_STORE_CLASS = SQLiteStore
    SESSION_STORE_ARGS = {"path": str(Path(__file__).parents[2] / "data")}
    

Data

The second part of the kedro workflow is the data phase, which involves adding data to the data folder, and then referencing the datasets for the project in the conf/base/catalog.yml file.

Preparation

Steps involved in acquiring the data and registering it in the appropriate catalog(s). You could also explain the business case or document the variables here.

  • Acquisition

    In some cases data is easily acquired with a curl/wget, specifically for one-off analyses. It can also be helpful to show where you downloaded your data from.

    mkdir -p $directory/
    # reviews
    curl -o "$directory/reviews.csv" https://kedro-org.github.io/kedro/reviews.csv
    # companies
    curl -o "$directory/companies.csv" https://kedro-org.github.io/kedro/companies.csv
    # shuttles
    curl -o "$directory/shuttles.xlsx" https://kedro-org.github.io/kedro/shuttles.xlsx
    
  • Data Registration

    • catalog.yml

      You now need to register the datasets so they can be loaded by Kedro. All Kedro projects have a conf/base/catalog.yml file.

      companies:
        type: pandas.CSVDataSet
        filepath: data/01_raw/companies.csv
        layer: raw
      
      reviews:
        type: pandas.CSVDataSet
        filepath: data/01_raw/reviews.csv
        layer: raw
      
      shuttles:
        type: pandas.ExcelDataSet
        filepath: data/01_raw/shuttles.xlsx
        layer: raw
      
      data_processing.preprocessed_companies:
        type: pandas.ParquetDataSet
        filepath: data/02_intermediate/preprocessed_companies.pq
        layer: intermediate
      
      data_processing.preprocessed_shuttles:
        type: pandas.ParquetDataSet
        filepath: data/02_intermediate/preprocessed_shuttles.pq
        layer: intermediate
      
      model_input_table:
        type: pandas.ParquetDataSet
        filepath: data/03_primary/model_input_table.pq
        layer: primary
      
      data_science.active_modelling_pipeline.regressor:
        type: pickle.PickleDataSet
        filepath: data/06_models/regressor_active.pickle
        versioned: true
        layer: models
      
      data_science.candidate_modelling_pipeline.regressor:
        type: pickle.PickleDataSet
        filepath: data/06_models/regressor_candidate.pickle
        versioned: true
        layer: models
      
      data_science.active_modelling_pipeline.metrics:
        type: tracking.MetricsDataSet
        filepath: data/09_tracking/metrics.json
      
      data_processing.companies_columns:
        type: tracking.JSONDataSet
        filepath: data/09_tracking/companies_columns.json
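
      As a rough sketch of how these entries get used, you can build the catalog by hand from this file and spot-check a dataset. This assumes you run it from the project root after the raw data has been downloaded; inside a kedro ipython or kedro jupyter session you get a ready-made catalog object instead.

      import yaml

      from kedro.io import DataCatalog

      # Construct the catalog directly from conf/base/catalog.yml and load one
      # of the raw datasets registered above.
      with open("conf/base/catalog.yml") as f:
          conf_catalog = yaml.safe_load(f)

      catalog = DataCatalog.from_config(conf_catalog)
      companies = catalog.load("companies")  # pandas DataFrame from data/01_raw/companies.csv
      print(companies.head())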
      

Pipelines

These are the node functions and pipeline definitions for the project, organised into the data_processing and data_science pipelines.

In many typical Kedro projects, a single (“main”) pipeline increases in complexity as the project evolves. To keep your project fit for purpose, you can create modular pipelines, which are logically isolated and can be reused. Modular pipelines are easier to develop, test and maintain, and are portable so they can be copied and reused between projects.
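
As a toy illustration of that reuse (hypothetical names, not project code), the same pipeline can be stamped out under different namespaces, which prefixes its dataset names:

from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline


def double(x):
    return x * 2


base = pipeline([node(double, inputs="numbers", outputs="doubled", name="double_node")])

# Reusing the same pipeline under two namespaces prefixes the dataset names,
# e.g. "first.numbers"/"first.doubled" vs "second.numbers"/"second.doubled".
first = pipeline(pipe=base, namespace="first")
second = pipeline(pipe=base, namespace="second")
print(sorted((first + second).all_outputs()))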

  • data_processing

    This pipeline preprocesses the raw companies and shuttles data and joins it with the reviews to build the model input table.

    • nodes.py

      A Kedro node is a wrapper for a Python function that names the inputs and outputs of that function. It is the building block of a pipeline. Nodes can be linked when the output of one node is the input of another.

      NOTE: Could theoretically break this into separate code blocks (imports, utility functions, preprocessing), but it might be a bit cumbersome. Input welcome on best practices.

      from typing import Tuple, Dict
      
      import pandas as pd
      
      
      def _is_true(x: pd.Series) -> pd.Series:
          return x == "t"
      
      
      def _parse_percentage(x: pd.Series) -> pd.Series:
          x = x.str.replace("%", "")
          x = x.astype(float) / 100
          return x
      
      
      def _parse_money(x: pd.Series) -> pd.Series:
          x = x.str.replace("$", "").str.replace(",", "")
          x = x.astype(float)
          return x
      
      def preprocess_companies(companies: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
          """Preprocesses the data for companies.
      
          Args:
              companies: Raw data.
          Returns:
              Preprocessed data, with `company_rating` converted to a float and
              `iata_approved` converted to boolean.
          """
          companies["iata_approved"] = _is_true(companies["iata_approved"])
          companies["company_rating"] = _parse_percentage(companies["company_rating"])
          return companies, {"columns": companies.columns.tolist(), "data_type": "companies"}
      
      
      def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
          """Preprocesses the data for shuttles.
      
          Args:
              shuttles: Raw data.
          Returns:
              Preprocessed data, with `price` converted to a float and `d_check_complete`,
              `moon_clearance_complete` converted to boolean.
          """
          shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
          shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
          shuttles["price"] = _parse_money(shuttles["price"])
          return shuttles
      
      def create_model_input_table(
          shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
      ) -> pd.DataFrame:
          """Combines all data to create a model input table.
      
          Args:
              shuttles: Preprocessed data for shuttles.
              companies: Preprocessed data for companies.
              reviews: Raw data for reviews.
          Returns:
              model input table.
      
          """
          rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
          model_input_table = rated_shuttles.merge(
              companies, left_on="company_id", right_on="id"
          )
          model_input_table = model_input_table.dropna()
          return model_input_table
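
      Because these are plain functions, you can also sanity-check them outside of Kedro, e.g. in a throwaway org-babel/Jupyter block. A rough example, assuming the raw CSV from the acquisition step is in place and preprocess_companies is defined in the session:

      import pandas as pd

      # Quick check of the node function on the raw data.
      companies = pd.read_csv("data/01_raw/companies.csv")
      preprocessed_companies, companies_columns = preprocess_companies(companies)
      print(companies_columns)
      print(preprocessed_companies[["iata_approved", "company_rating"]].dtypes)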
      
    • pipeline.py

      A Kedro pipeline organises the dependencies and execution order of a collection of nodes, and connects inputs and outputs. The pipeline determines the node execution order by resolving dependencies.

      from kedro.pipeline import Pipeline, node
      from kedro.pipeline.modular_pipeline import pipeline
      
      from .nodes import (
          preprocess_companies,
          preprocess_shuttles,
          create_model_input_table,
      )
      
      
      def create_pipeline(**kwargs) -> Pipeline:
          return pipeline(
              [
                  node(
                      func=preprocess_companies,
                      inputs="companies",
                      outputs=["preprocessed_companies","companies_columns"],
                      name="preprocess_companies_node",
                  ),
                  node(
                      func=preprocess_shuttles,
                      inputs="shuttles",
                      outputs="preprocessed_shuttles",
                      name="preprocess_shuttles_node",
                  ),
                  node(
                      func=create_model_input_table,
                      inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                      outputs="model_input_table",
                      name="create_model_input_table_node",
                  ),
      
              ],
              namespace="data_processing",
              inputs=["companies", "shuttles", "reviews"],
              outputs="model_input_table",
          )
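
      Note that namespace="data_processing" is what produces catalog names like data_processing.preprocessed_companies, while the datasets listed under inputs/outputs keep their plain names. A quick way to see this, assuming the literate_kedro package is importable (e.g. with src/ on your path or the project installed):

      from literate_kedro.pipelines import data_processing as dp

      p = dp.create_pipeline()
      # Outputs not listed in outputs=... gain the "data_processing." prefix
      # (matching the catalog), while "model_input_table" keeps its plain name.
      print(sorted(p.all_outputs()))
      print(sorted(p.inputs()))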
      
  • data_science

    This pipeline splits the model input table, trains a linear regression model, and evaluates it. It is instantiated twice below, once as the active and once as the candidate modelling pipeline.

    • nodes.py

      import logging
      from typing import Dict, Tuple
      
      import pandas as pd
      from sklearn.linear_model import LinearRegression
      from sklearn.metrics import r2_score, mean_absolute_error, max_error
      from sklearn.model_selection import train_test_split
      
      
      def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
          """Splits data into features and targets training and test sets.
      
          Args:
              data: Data containing features and target.
              parameters: Parameters defined in parameters/data_science.yml.
          Returns:
              Split data.
          """
          X = data[parameters["features"]]
          y = data["price"]
          X_train, X_test, y_train, y_test = train_test_split(
              X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
          )
          return X_train, X_test, y_train, y_test
      
      
      def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
          """Trains the linear regression model.
      
          Args:
              X_train: Training data of independent features.
              y_train: Training data for price.
      
          Returns:
              Trained model.
          """
          regressor = LinearRegression()
          regressor.fit(X_train, y_train)
          return regressor
      
      
      def evaluate_model(
          regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
      ) -> Dict[str, float]:
          """Calculates and logs the coefficient of determination.
      
          Args:
              regressor: Trained model.
              X_test: Testing data of independent features.
              y_test: Testing data for price.
          """
          y_pred = regressor.predict(X_test)
          score = r2_score(y_test, y_pred)
          mae = mean_absolute_error(y_test, y_pred)
          me = max_error(y_test, y_pred)
          logger = logging.getLogger(__name__)
          logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
          return {"r2_score": score, "mae": mae, "max_error": me}
      
    • pipeline.py

      from kedro.pipeline import Pipeline, node
      from kedro.pipeline.modular_pipeline import pipeline
      
      from .nodes import evaluate_model, split_data, train_model
      
      
      def create_pipeline(**kwargs) -> Pipeline:
          pipeline_instance = pipeline(
              [
                  node(
                      func=split_data,
                      inputs=["model_input_table", "params:model_options"],
                      outputs=["X_train", "X_test", "y_train", "y_test"],
                      name="split_data_node",
                  ),
                  node(
                      func=train_model,
                      inputs=["X_train", "y_train"],
                      outputs="regressor",
                      name="train_model_node",
                  ),
                  node(
                      func=evaluate_model,
                      inputs=["regressor", "X_test", "y_test"],
                      outputs="metrics",
                      name="evaluate_model_node",
                  ),
              ]
          )
          ds_pipeline_1 = pipeline(
              pipe=pipeline_instance,
              inputs="model_input_table",
              namespace="active_modelling_pipeline",
          )
          ds_pipeline_2 = pipeline(
              pipe=pipeline_instance,
              inputs="model_input_table",
              namespace="candidate_modelling_pipeline",
          )
          return pipeline(
              pipe=ds_pipeline_1 + ds_pipeline_2,
              inputs="model_input_table",
              namespace="data_science",
          )
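
      The same pipeline_instance is reused twice here, and the outer data_science namespace is what yields catalog names like data_science.active_modelling_pipeline.regressor. A quick check, again assuming the literate_kedro package is importable:

      from literate_kedro.pipelines import data_science as ds

      p = ds.create_pipeline()
      # Each node should appear twice, once per namespaced copy, e.g.
      # data_science.active_modelling_pipeline.train_model_node and
      # data_science.candidate_modelling_pipeline.train_model_node.
      print(sorted(n.name for n in p.nodes))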
      
  • pipeline_registry.py

    This is the pipeline registry, where you add the pipelines defined above to the register_pipelines function. This is how kedro knows which pipelines it can run.

    from typing import Dict
    
    from kedro.pipeline import Pipeline
    
    from literate_kedro.pipelines import data_processing as dp
    from literate_kedro.pipelines import data_science as ds
    
    
    def register_pipelines() -> Dict[str, Pipeline]:
        """Register the project's pipelines.
    
        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.
    
        """
        data_processing_pipeline = dp.create_pipeline()
        data_science_pipeline = ds.create_pipeline()
    
        return {
            "__default__": data_processing_pipeline + data_science_pipeline,
            "dp": data_processing_pipeline,
            "ds": data_science_pipeline,
        }
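
    With the registry in place you would normally just call kedro run (optionally with --pipeline dp or --pipeline ds) from the shell. If you want to stay inside Python/org-babel, something along these lines should work for Kedro 0.18 (a sketch; double-check KedroSession’s API for your version):

    from pathlib import Path

    from kedro.framework.session import KedroSession
    from kedro.framework.startup import bootstrap_project

    # Sketch: run the registered "dp" pipeline programmatically from the project root.
    project_path = Path.cwd()
    bootstrap_project(project_path)

    with KedroSession.create(project_path=project_path) as session:
        session.run(pipeline_name="dp")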
    

Conclusions

This is pretty much all you need to replicate the kedro tutorial from within a single org file. I would be remiss not to mention that kedro itself has a pretty good alternative in simply using their jupyter plugin, but I tend to use emacs all the time anyways, so I figured, what the hey.

There are still a couple of things I’d like to ponder regarding best practices and future articles, like:

  • Should you split up the nodes and pipelines further, using literate programming to make separate utility, processing, etc. code blocks?
  • Version control, only on the tangled files?
  • Making use of transclusion from other files.
  • Turning the “relevant parts” into a presentation - slicing the org document into exploratory analysis that ignores the code depending on audience.

I’ll probably try to write an article doing a “real” analysis with this workflow in the future.

cp ./literate-kedro-spaceflight.org ~/code/justin.vc/static/org/literate-kedro-spaceflight.org

This copies the file when I export, which allows you to see the original org file if curious. That specifically includes how I tangled the files to the correct places. (And, once again, input welcome if there are better ways to do it.)