
Python Code Examples for Detecting Data Drift

Numal · Jun 11, 2021 · 15 mins read

Let’s take a look at the code for calculating some of the different statistical distances, namely:

  • Kolmogorov–Smirnov (KS)
  • First Wasserstein Distance (Earth Mover’s distance)
  • Cramér-von Mises (CM) distance (Energy distance)
  • Population Stability Index (PSI)
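
We’ll work with a synthetic two-feature dataset (X1, X2, plus a binary class label) that contains gradual drift. First, load it with pandas:
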
import pandas as pd
# Import Data File
data = pd.read_csv("data/rt_2563789698568873_gradual.csv")
data.head()
X1 X2 class
0 0.550284 0.918541 0.0
1 0.564631 0.409379 0.0
2 0.430762 0.624576 0.0
3 0.662184 0.154598 0.0
4 0.266372 0.541274 1.0


data['class'].value_counts()
0.0    21219
1.0    19781
Name: class, dtype: int64
19781/data.shape[0]
0.4824634146341463
# Split dataframe into fixed-size chunks
def split_dataframe(df, chunk_size=10000):
    chunks = []
    num_chunks = -(-len(df) // chunk_size)  # ceiling division, so no empty trailing chunk
    for i in range(num_chunks):
        # .copy() lets us modify the chunks later without a SettingWithCopyWarning
        chunks.append(df.iloc[i*chunk_size:(i+1)*chunk_size].copy())
    return chunks

def get_ratio(df, target_var='class'):
    # Fraction of rows belonging to the positive (1.0) class
    class_0_count = df[target_var].value_counts()[0]
    class_1_count = df[target_var].value_counts()[1]
    total_count = class_0_count + class_1_count
    pos_ratio = class_1_count / total_count
    return pos_ratio
data.shape
(41000, 3)
chunk_list = split_dataframe(data, 3418)
# Tag each chunk with a synthetic "month" and print its positive-class ratio
month_ = 1
for df in chunk_list:
    df['month'] = month_
    month_ += 1
    print(get_ratio(df))
0.4880046811000585
0.48244587478057344
0.4909303686366296
0.3317729666471621
0.32943241661790523
0.34991222937390287
0.6243417203042715
0.6146869514335869
0.609713282621416
0.489174956114687
0.4906377998829725
0.48853615520282184
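
Notice the pattern: the positive-class ratio sits near 0.49 for months 1-3, drops to roughly 0.33 for months 4-6, jumps to about 0.61 for months 7-9, and then settles back around 0.49. This is the gradual drift baked into the dataset.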
data_monthly = pd.concat(chunk_list)

Mean Analysis

data_monthly_mean = data_monthly.groupby(by="month")[['X1','X2']].mean().T
data_monthly_mean.head()
month 1 2 3 4 5 6 7 8 9 10 11 12
X1 0.507525 0.499226 0.494066 0.490010 0.503252 0.503267 0.499385 0.500954 0.499622 0.497465 0.490109 0.497236
X2 0.499871 0.496975 0.500308 0.500965 0.494713 0.499834 0.494208 0.506740 0.500584 0.492279 0.492288 0.512654
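
The monthly means of X1 and X2 barely move, so looking at averages alone tells us very little here. The distribution-level measures below give a more rigorous, per-feature check.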

2-sample Kolmogorov–Smirnov (KS) test

Since we are comparing distributions, we first need to set a reference distribution. This could be the first month of our dataset, or we could use the entire training set as the reference and compare each month of the test set against it.
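
For reference, the two-sample KS statistic is the largest gap between the two empirical CDFs, and the null hypothesis that both samples come from the same distribution is rejected at significance level α when the statistic exceeds a sample-size-dependent critical value:

$$D_{n,m} = \sup_x \lvert F_{1,n}(x) - F_{2,m}(x) \rvert, \qquad \text{reject if } D_{n,m} > c(\alpha)\sqrt{\frac{n+m}{nm}}$$

The multiplier 1.731 used for the critical region in the code below is c(α) for α ≈ 0.005.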

top_feature_list = ['X1','X2']
# Build one snapshot dataframe per month
month_list = list(data_monthly['month'].unique())
d = {month: data_monthly[data_monthly['month'] == month] for month in month_list}
type(d)
dict
import numpy as np
from scipy.stats import wasserstein_distance, energy_distance, ks_2samp

# Create empty dataframes to store the KS stats
KS_stat = pd.DataFrame(columns=top_feature_list+['critical region'], index=month_list)   # holds both the KS stat value and the p-value
KS_stat2 = pd.DataFrame(columns=top_feature_list+['critical region'], index=month_list)  # holds only the KS stat value

reference = d[1]        # reference snapshot (month 1)
n = reference.shape[0]  # size of the reference distribution

for date in d.keys():
    KS = []   # (stat, p-value) pairs from the 2-sample KS test
    KS2 = []  # stat values only
    m = d[date].shape[0]
    # Critical value c(alpha)*sqrt((n+m)/(n*m)); 1.731 corresponds to alpha ≈ 0.005
    crit_reg = 1.731*np.sqrt((n+m)/(n*m))
    for variable in top_feature_list:
        s, p = ks_2samp(reference[variable], d[date][variable])
        KS.append(('stat: '+str(round(s,4)), 'P: '+str(round(p,4))))
        KS2.append(round(s,4))

    KS_stat.loc[date] = KS+[crit_reg]
    KS_stat2.loc[date] = KS2+[crit_reg]
KS_stat
X1 X2 critical region
1 (stat: 0.0, P: 1.0) (stat: 0.0, P: 1.0) 0.041872
2 (stat: 0.0243, P: 0.2659) (stat: 0.0167, P: 0.7289) 0.041872
3 (stat: 0.0316, P: 0.0659) (stat: 0.0123, P: 0.9587) 0.041872
4 (stat: 0.0342, P: 0.0364) (stat: 0.0202, P: 0.4892) 0.041872
5 (stat: 0.0178, P: 0.6478) (stat: 0.0161, P: 0.7682) 0.041872
6 (stat: 0.0138, P: 0.9032) (stat: 0.0146, P: 0.858) 0.041872
7 (stat: 0.0184, P: 0.6071) (stat: 0.0184, P: 0.6071) 0.041872
8 (stat: 0.0202, P: 0.4892) (stat: 0.0199, P: 0.5082) 0.041872
9 (stat: 0.0211, P: 0.4343) (stat: 0.0126, P: 0.9497) 0.041872
10 (stat: 0.0249, P: 0.2412) (stat: 0.0301, P: 0.0897) 0.041872
11 (stat: 0.0298, P: 0.0953) (stat: 0.0255, P: 0.2182) 0.041872
12 (stat: 0.0209, P: 0.4379) (stat: 0.0291, P: 0.1073) 0.041921
KS_stat2
X1 X2 critical region
1 0.0 0.0 0.041872
2 0.0243 0.0167 0.041872
3 0.0316 0.0123 0.041872
4 0.0342 0.0202 0.041872
5 0.0178 0.0161 0.041872
6 0.0138 0.0146 0.041872
7 0.0184 0.0184 0.041872
8 0.0202 0.0199 0.041872
9 0.0211 0.0126 0.041872
10 0.0249 0.0301 0.041872
11 0.0298 0.0255 0.041872
12 0.0209 0.0291 0.041921

First Wasserstein Distance
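
For one-dimensional distributions, the first Wasserstein distance has a convenient closed form: it is the area between the two empirical CDFs U and V,

$$W_1(u, v) = \int_{-\infty}^{+\infty} \lvert U(x) - V(x) \rvert \, dx$$

which is exactly what scipy.stats.wasserstein_distance computes.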

# Create an empty dataframe to store the first Wasserstein distance
WD = pd.DataFrame(columns=top_feature_list, index=list(d.keys()))

reference = d[1]  # reference snapshot (month 1)
reference = reference.reset_index(drop=True)  # reset the index so the snapshots align cleanly

for date in d.keys():
    d[date] = d[date].reset_index(drop=True)
    Wasserstein = []  # Wasserstein distance per feature for this month
    for variable in top_feature_list:
        u_value = reference[variable]
        v_value = d[date][variable]
        # NaN values need to be removed, otherwise the returned distance would be NaN
        u_value = u_value[~np.isnan(u_value)]
        v_value = v_value[~np.isnan(v_value)]
        wd = wasserstein_distance(u_value, v_value)
        Wasserstein.append(wd)
    WD.loc[date] = Wasserstein
WD
X1 X2
1 0.0 0.0
2 0.008514 0.006643
3 0.013466 0.003723
4 0.017525 0.00609
5 0.005132 0.006293
6 0.005379 0.00509
7 0.008252 0.006759
8 0.007993 0.007269
9 0.008308 0.003452
10 0.010128 0.009651
11 0.017424 0.008445
12 0.010295 0.012909

Cramér-von Mises (CM) distance (or Energy distance)
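
A quick note on naming: the code below uses scipy.stats.energy_distance. For one-dimensional distributions, the energy distance D is linked to the (non-distribution-free) Cramér-von Mises distance ℓ₂ by D = √2 · ℓ₂, so the two measures are interchangeable for ranking drift.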

# Create an empty dataframe to store the Cramér-von Mises (CM) distance
CM = pd.DataFrame(columns=top_feature_list, index=month_list)

reference = d[1]  # reference snapshot (month 1)
reference = reference.reset_index(drop=True)

for date in d.keys():
    d[date] = d[date].reset_index(drop=True)
    Cramer = []  # energy distance per feature for this month
    for variable in top_feature_list:
        u_value = reference[variable]
        v_value = d[date][variable]
        # NaN values need to be removed, otherwise the returned distance would be NaN
        u_value = u_value[~np.isnan(u_value)]
        v_value = v_value[~np.isnan(v_value)]
        cm = energy_distance(u_value, v_value)
        Cramer.append(cm)
    CM.loc[date] = Cramer
CM.T
1 2 3 4 5 6 7 8 9 10 11 12
X1 0.0 0.014657 0.021297 0.027419 0.009069 0.009011 0.013373 0.013767 0.013972 0.016237 0.026414 0.015571
X2 0.0 0.010961 0.006421 0.010824 0.010467 0.008584 0.012049 0.012631 0.005941 0.017205 0.014662 0.020012

Population Stability Index (PSI)
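
PSI compares the binned distribution of a variable in a reference ("expected") sample against a newer ("actual") sample:

$$PSI = \sum_{i=1}^{B} (e_i - a_i) \ln\frac{e_i}{a_i}$$

where e_i and a_i are the fractions of the expected and actual samples that fall into bucket i, out of B buckets. The implementation below, by Matthew Burke, follows this formula.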

import numpy as np

def calculate_psi(expected, actual, buckettype='bins', buckets=10, axis=0):
    '''Calculate the PSI (population stability index) across all variables
    Args:
       expected: numpy matrix of original values (Training)
       actual: numpy matrix of new values, same size as expected (Validation)
       buckettype: type of strategy for creating buckets, bins splits into even splits, quantiles splits into quantile buckets
       buckets: number of quantiles to use in bucketing variables
       axis: axis by which variables are defined, 0 for vertical, 1 for horizontal
    Returns:
       psi_values: ndarray of psi values for each variable
    Author:
       Matthew Burke
       github.com/mwburke
       worksofchart.com
    '''

    def psi(expected_array, actual_array, buckets):
        '''Calculate the PSI for a single variable
        Args:
           expected_array: numpy array of original values
           actual_array: numpy array of new values, same size as expected
           buckets: number of percentile ranges to bucket the values into
        Returns:
           psi_value: calculated PSI value
        '''

        def scale_range (input, min, max):
            input += -(np.min(input))
            input /= np.max(input) / (max - min)
            input += min
            return input


        breakpoints = np.arange(0, buckets + 1) / (buckets) * 100

        if buckettype == 'bins':
            breakpoints = scale_range(breakpoints, np.min(expected_array), np.max(expected_array))
        elif buckettype == 'quantiles':
            breakpoints = np.stack([np.percentile(expected_array, b) for b in breakpoints])



        expected_percents = np.histogram(expected_array, breakpoints)[0] / len(expected_array)
        actual_percents = np.histogram(actual_array, breakpoints)[0] / len(actual_array)

        def sub_psi(e_perc, a_perc):
            '''Calculate the actual PSI value from comparing the values.
               Update the actual value to a very small number if equal to zero
            '''
            if a_perc == 0:
                a_perc = 0.0001
            if e_perc == 0:
                e_perc = 0.0001

            value = (e_perc - a_perc) * np.log(e_perc / a_perc)
            return(value)

        psi_value = sum(sub_psi(expected_percents[i], actual_percents[i]) for i in range(len(expected_percents)))

        return(psi_value)

    if len(expected.shape) == 1:
        psi_values = np.empty(len(expected.shape))
    else:
        psi_values = np.empty(expected.shape[axis])

    for i in range(0, len(psi_values)):
        if len(psi_values) == 1:
            psi_values = psi(expected, actual, buckets)
        elif axis == 0:
            psi_values[i] = psi(expected[:,i], actual[:,i], buckets)
        elif axis == 1:
            psi_values[i] = psi(expected[i,:], actual[i,:], buckets)

    return(psi_values)
## Calculate PSI for top features
psi_list = []
for feature in top_feature_list:
    # Assuming you have a training and validation set;
    # per the docstring, expected = training, actual = validation
    psi_t = calculate_psi(df_training[feature], df_validation[feature])
    psi_list.append(psi_t)
    print(psi_t)
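
df_training and df_validation are placeholders here. As a concrete, minimal sketch under the assumptions of this post, you can apply calculate_psi to the monthly snapshots built earlier, with month 1 as the reference (a common rule of thumb reads PSI below 0.1 as no significant shift, 0.1 to 0.25 as moderate shift, and above 0.25 as significant shift):

# Sketch: PSI of each month's features against the month-1 reference.
# `d`, `month_list`, and `top_feature_list` come from the snippets above.
PSI = pd.DataFrame(columns=top_feature_list, index=month_list)

for date in d.keys():
    psi_row = []
    for variable in top_feature_list:
        expected = d[1][variable].values   # reference ("expected") distribution
        actual = d[date][variable].values  # current month ("actual") distribution
        psi_row.append(float(calculate_psi(expected, actual, buckettype='bins', buckets=10)))
    PSI.loc[date] = psi_row
print(PSI)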