Post

Monitor Data Pipelines with Jupyter

Using the power of Python with Pandas and Plotly, I wanted to create a runbook to serve as a quick healthcheck every morning over coffee. The existing data pipelines were built to log key statistics on every run to audit tables. With the audit tables as a basis, I built a Jupyter notebook to monitor key stats that would normally be buried in the ETL tool’s SQLite database that was not built for analytical purposes. This allows me to slowly wakeup as I caffeinate and plan my day according to visual observations. It’s a win-win!

Setup

Connectivity

As mentioned, each pipeline logs data to audit tables that are then used to generate the desired charts. To do this, I use the Python module sqlalchemy. In this example I’m using a SQLServer database, but the module supports other engines.

1
2
3
4
5
6
7
8
9
10
11
import pandas as pd
from sqlalchemy import create_engine
import datetime as dt
import plotly.express as px

#connect via trusted auth
engine = create_engine(
    'mssql+pyodbc://'
    '@RPCMPDB01/cdp?'
    'driver=ODBC+Driver+17+for+SQL+Server'
    )

Data

With the connection defined, I’m now free to populate my Panda dataframes.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#load dataframes
df_audit = pd.read_sql(sql=auto_log, con=engine, index_col=['automation_name'], parse_dates=['start_date','end_date'])

#set data types and add calculated fields
df_audit['feed_instance_id'] = df_audit['feed_instance_id'].astype('string')
df_audit['total_elapse_secs'] = (df_audit['end_date'] - df_audit['start_date']).astype('timedelta64[s]').fillna(0)
df_audit['run_date'] = df_audit['start_date'].astype('datetime64[D]')
df_audit = df_audit.set_index(['run_date'], append=True)

df_audit_agg = df_audit.groupby(['automation_name','run_date'])['total_elapse_secs'].mean()
df_audit = df_audit.join(df_audit_agg, rsuffix='_mean')
df_audit = df_audit.fillna(value={'feed_instance_id':'1'})
df_audit_agg = df_audit.groupby(['automation_name','run_date'])['feed_instance_id'].count().fillna(1)
df_audit = df_audit.join(df_audit_agg, rsuffix='_count')

#filter out incomplete runs
df = df_audit[df_audit.end_date.notnull()].reset_index()

Plotly Gantt

My first exposure to Gantt charts was limited to waterfall project methodologies. In this use case, I’m able to draw a graphical representation to quickly inform me of:

  • missed data pipeline runs
  • anomalies in typical run times as runtimes are depicted by width

Below is an example of this in practice:

gantt example

Note how you can determine:

  • 22_101_auto (indicated in pink) failed to execute on 9/24
  • 23_101_auto (indicated in yellow) recently came on-line on 9/25
  • 17_104_auto and 20_102_auto have occassional spikes in runtimes (indicated by width) and that those can be safely be ignored based on historical trends

The above was created using the following:

1
2
3
4
5
6
7
8
9
10
#form timeline based on dataframes
fig1 = px.timeline(df, x_start='start_date', x_end='end_date', y='automation_name', hover_data=['automation_name','total_elapse_secs','total_elapse_secs_mean','has_error'], color='automation_name', title='Automation Timeline')
fig1.update_layout(autosize=False, width=2100, height=1100, plot_bgcolor='white')
fig1.update_yaxes(title=None, gridcolor='lightgrey')

dt_now = dt.datetime.now()
init_range = [ (dt_now - dt.timedelta(days=8)).strftime('%Y-%m-%d'), (dt_now + dt.timedelta(days=1)).strftime('%Y-%m-%d')]

fig1.update_xaxes(ticks='outside',gridcolor='lightgrey',rangeslider_visible=True, range=init_range)
fig1.show()

Plotly Box

When an abnormal runtime is noticed, it helps to know if the time is within a normal range. A box chart may be used to address this scenario.

box chart example

In this chart, I’m using a logarithmic scale to account for differing runtimes as some pipelines are more computational intensive. Each automation’s runtime is depicted based on its quartile distribution. Per Plotly’s documentation:

The ends of the box represent the lower and upper quartiles, while the median (second quartile) is marked by a line inside the box.

I’ve also added some additional graphical overlays to provide additional context.

  • Scatter plots for each run in the 30-day window are injected alongside each box to help visualize the runtime distribution
  • Additional trace is added, indicated by a different color dot within a distribution box, that indicates the today’s average. This allows one to quickly discern whether today’s run was on the lower-end (e.g. 24_101_auto) or on the higher-end (e.g. 17_104_auto). The later may be worthy of additional investigation.

The above was created using the following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
min_cutoff = 600
prior_30_days = today - dt.timedelta(days=30)

long_auto = df_today.index.values

df_box_stats = df[(df['start_date'] >= pd.to_datetime(prior_30_days.strftime('%Y-%m-%d'))) & (df['automation_name'].isin(long_auto))]

fig_box = px.box(df_box_stats, x='automation_name', y='total_elapse_secs', points='all', color='automation_name', hover_data=['start_date'], title='30-Day Distribution', log_y=True)
fig_box.update_yaxes(rangemode="tozero", title='Elapsed Time (secs)', showgrid=True)
fig_box.update_xaxes(title='Automation', tickangle=90, showline=True, linewidth=1, linecolor='grey', showgrid=True, gridwidth=1, gridcolor='lightgrey')
fig_box.update_layout(autosize=False, width=2100, height=900, plot_bgcolor='white')

import plotly.graph_objects as go
for auto_name in long_auto:
  y_val = df_today.loc[auto_name].loc['Current Elapse Mean']
  if y_val == None:
    y_val = 0
  fig_box.add_trace(go.Scatter(x=[auto_name], y=[y_val], mode='markers',marker_symbol = 'circle-dot', marker_size=10, showlegend=False))
fig_box.show()
This post is licensed under CC BY 4.0 by the author.