This notebook contains functions that are reused throughout the book for loading and saving data, fitting and assessing prediction models, and plotting results.
The notebook can be downloaded from GitHub with
!curl -O https://raw.githubusercontent.com/Fraud-Detection-Handbook/fraud-detection-handbook/main/Chapter_References/shared_functions.ipynb
The notebook can be included in other notebooks with
%run shared_functions
General imports
# General
import os
import pandas as pd
import numpy as np
import math
import sys
import time
import pickle
import json
import datetime
import random
import sklearn
from sklearn import *  # star import binds sklearn submodules (metrics, pipeline, model_selection, ...) used below
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid', {'axes.facecolor': '0.9'})
import graphviz
import xgboost
# For imbalanced learning
import imblearn
import warnings
warnings.filterwarnings('ignore')
Data loading and saving
read_from_files
First used in Chapter 3, Baseline Feature Transformation.
# Load a set of pickle files, put them together in a single DataFrame, and order them by time
# It takes as input the folder DIR_INPUT where the files are stored, and the BEGIN_DATE and END_DATE
# delimiting the period to load (one file per day, named YYYY-MM-DD.pkl)
def read_from_files(DIR_INPUT, BEGIN_DATE, END_DATE):
files = [os.path.join(DIR_INPUT, f) for f in os.listdir(DIR_INPUT) if f>=BEGIN_DATE+'.pkl' and f<=END_DATE+'.pkl']
frames = []
for f in files:
df = pd.read_pickle(f)
frames.append(df)
del df
df_final = pd.concat(frames)
df_final=df_final.sort_values('TRANSACTION_ID')
df_final.reset_index(drop=True,inplace=True)
# Note: -1 are missing values for real world data
df_final=df_final.replace([-1],0)
return df_final
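A minimal usage sketch (the folder path and dates below are hypothetical; one pickle file per day, named YYYY-MM-DD.pkl, is assumed):
# DIR_INPUT = './simulated-data-raw/data/'
# transactions_df = read_from_files(DIR_INPUT, BEGIN_DATE='2018-04-01', END_DATE='2018-09-30')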
save_object
# Save an object as a pickle file
def save_object(obj, filename):
with open(filename, 'wb') as output:
pickle.dump(obj, output, pickle.HIGHEST_PROTOCOL)
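For illustration, an object saved this way can be loaded back with pickle (the filename is hypothetical):
# save_object(performances_df, 'performances_df.pkl')
# with open('performances_df.pkl', 'rb') as f:
#     performances_df = pickle.load(f)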
Data preprocessing
scaleData
First used in Chapter 3, Baseline Fraud Detection System.
def scaleData(train,test,features):
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(train[features])
train[features]=scaler.transform(train[features])
test[features]=scaler.transform(test[features])
    return (train, test)
Train/test split strategies
get_train_test_set
First used in Chapter 3, Baseline Fraud Detection System. The sampling ratio was added in Chapter 5, Validation Strategies.
def get_train_test_set(transactions_df,
start_date_training,
delta_train=7,delta_delay=7,delta_test=7,
sampling_ratio=1.0,
random_state=0):
# Get the training set data
train_df = transactions_df[(transactions_df.TX_DATETIME>=start_date_training) &
(transactions_df.TX_DATETIME<start_date_training+datetime.timedelta(days=delta_train))]
# Get the test set data
test_df = []
# Note: Cards known to be compromised after the delay period are removed from the test set
# That is, for each test day, all frauds known at (test_day-delay_period) are removed
# First, get known defrauded customers from the training set
known_defrauded_customers = set(train_df[train_df.TX_FRAUD==1].CUSTOMER_ID)
# Get the relative starting day of training set (easier than TX_DATETIME to collect test data)
start_tx_time_days_training = train_df.TX_TIME_DAYS.min()
# Then, for each day of the test set
for day in range(delta_test):
# Get test data for that day
test_df_day = transactions_df[transactions_df.TX_TIME_DAYS==start_tx_time_days_training+
delta_train+delta_delay+
day]
# Compromised cards from that test day, minus the delay period, are added to the pool of known defrauded customers
test_df_day_delay_period = transactions_df[transactions_df.TX_TIME_DAYS==start_tx_time_days_training+
delta_train+
day-1]
new_defrauded_customers = set(test_df_day_delay_period[test_df_day_delay_period.TX_FRAUD==1].CUSTOMER_ID)
known_defrauded_customers = known_defrauded_customers.union(new_defrauded_customers)
test_df_day = test_df_day[~test_df_day.CUSTOMER_ID.isin(known_defrauded_customers)]
test_df.append(test_df_day)
test_df = pd.concat(test_df)
    # If subsampling is requested, sample frauds and genuine transactions separately with the same ratio
if sampling_ratio<1:
train_df_frauds=train_df[train_df.TX_FRAUD==1].sample(frac=sampling_ratio, random_state=random_state)
train_df_genuine=train_df[train_df.TX_FRAUD==0].sample(frac=sampling_ratio, random_state=random_state)
train_df=pd.concat([train_df_frauds,train_df_genuine])
# Sort data sets by ascending order of transaction ID
train_df=train_df.sort_values('TRANSACTION_ID')
test_df=test_df.sort_values('TRANSACTION_ID')
return (train_df, test_df)
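A usage sketch, assuming transactions_df contains the columns referenced above (TX_DATETIME, TX_TIME_DAYS, TX_FRAUD, CUSTOMER_ID, TRANSACTION_ID); the start date is hypothetical:
# start_date_training = datetime.datetime.strptime('2018-07-25', '%Y-%m-%d')
# (train_df, test_df) = get_train_test_set(transactions_df, start_date_training,
#                                          delta_train=7, delta_delay=7, delta_test=7)
get_train_delay_test_set
A variant of the function above that additionally returns the transactions of the delay period: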
def get_train_delay_test_set(transactions_df,
start_date_training,
delta_train=7,delta_delay=7,delta_test=7,
sampling_ratio=1.0,
random_state=0):
# Get the training set data
train_df = transactions_df[(transactions_df.TX_DATETIME>=start_date_training) &
(transactions_df.TX_DATETIME<start_date_training+datetime.timedelta(days=delta_train))]
# Get the delay set data
    delay_df = transactions_df[(transactions_df.TX_DATETIME>=start_date_training+datetime.timedelta(days=delta_train)) &
                               (transactions_df.TX_DATETIME<start_date_training+datetime.timedelta(days=delta_train)+
                                                            datetime.timedelta(days=delta_delay))]
# Get the test set data
test_df = []
# Note: Cards known to be compromised after the delay period are removed from the test set
# That is, for each test day, all frauds known at (test_day-delay_period) are removed
# First, get known defrauded customers from the training set
known_defrauded_customers = set(train_df[train_df.TX_FRAUD==1].CUSTOMER_ID)
# Get the relative starting day of training set (easier than TX_DATETIME to collect test data)
start_tx_time_days_training = train_df.TX_TIME_DAYS.min()
# Then, for each day of the test set
for day in range(delta_test):
# Get test data for that day
test_df_day = transactions_df[transactions_df.TX_TIME_DAYS==start_tx_time_days_training+
delta_train+delta_delay+
day]
# Compromised cards from that test day, minus the delay period, are added to the pool of known defrauded customers
test_df_day_delay_period = transactions_df[transactions_df.TX_TIME_DAYS==start_tx_time_days_training+
delta_train+
day-1]
new_defrauded_customers = set(test_df_day_delay_period[test_df_day_delay_period.TX_FRAUD==1].CUSTOMER_ID)
known_defrauded_customers = known_defrauded_customers.union(new_defrauded_customers)
test_df_day = test_df_day[~test_df_day.CUSTOMER_ID.isin(known_defrauded_customers)]
test_df.append(test_df_day)
test_df = pd.concat(test_df)
    # If subsampling is requested, sample frauds and genuine transactions separately with the same ratio
if sampling_ratio<1:
train_df_frauds=train_df[train_df.TX_FRAUD==1].sample(frac=sampling_ratio, random_state=random_state)
train_df_genuine=train_df[train_df.TX_FRAUD==0].sample(frac=sampling_ratio, random_state=random_state)
train_df=pd.concat([train_df_frauds,train_df_genuine])
# Sort data sets by ascending order of transaction ID
train_df=train_df.sort_values('TRANSACTION_ID')
test_df=test_df.sort_values('TRANSACTION_ID')
    return (train_df, delay_df, test_df)
prequentialSplit
First used in Chapter 5, Validation Strategies.
def prequentialSplit(transactions_df,
start_date_training,
n_folds=4,
delta_train=7,
delta_delay=7,
delta_assessment=7):
prequential_split_indices=[]
# For each fold
for fold in range(n_folds):
# Shift back start date for training by the fold index times the assessment period (delta_assessment)
# (See Fig. 5)
start_date_training_fold = start_date_training-datetime.timedelta(days=fold*delta_assessment)
# Get the training and test (assessment) sets
(train_df, test_df)=get_train_test_set(transactions_df,
start_date_training=start_date_training_fold,
delta_train=delta_train,delta_delay=delta_delay,delta_test=delta_assessment)
# Get the indices from the two sets, and add them to the list of prequential splits
indices_train=list(train_df.index)
indices_test=list(test_df.index)
prequential_split_indices.append((indices_train,indices_test))
return prequential_split_indices
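The returned list of (train indices, test indices) pairs can be passed directly as the cv argument of sklearn's GridSearchCV, as done in prequential_grid_search below. A quick sanity check of the fold sizes (start date hypothetical):
# folds = prequentialSplit(transactions_df,
#                          start_date_training=datetime.datetime.strptime('2018-07-25', '%Y-%m-%d'),
#                          n_folds=4)
# print([(len(train_indices), len(test_indices)) for train_indices, test_indices in folds])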
Prediction functions
fit_model_and_get_predictions
First used in Chapter 3, Baseline Fraud Detection System.
def fit_model_and_get_predictions(classifier, train_df, test_df,
input_features, output_feature="TX_FRAUD",scale=True):
# By default, scales input data
if scale:
(train_df, test_df)=scaleData(train_df,test_df,input_features)
# We first train the classifier using the `fit` method, and pass as arguments the input and output features
start_time=time.time()
classifier.fit(train_df[input_features], train_df[output_feature])
training_execution_time=time.time()-start_time
    # We then get the predictions on the training and test data using the `predict_proba` method
    # The predictions are returned as a NumPy array that gives the probability of fraud for each transaction
start_time=time.time()
predictions_test=classifier.predict_proba(test_df[input_features])[:,1]
prediction_execution_time=time.time()-start_time
predictions_train=classifier.predict_proba(train_df[input_features])[:,1]
# The result is returned as a dictionary containing the fitted models,
# and the predictions on the training and test sets
model_and_predictions_dictionary = {'classifier': classifier,
'predictions_test': predictions_test,
'predictions_train': predictions_train,
'training_execution_time': training_execution_time,
'prediction_execution_time': prediction_execution_time
}
    return model_and_predictions_dictionary
Performance assessment
card_precision_top_k_day
First used in Chapter 3, Baseline Fraud Detection System. Detailed in Chapter 4, Precision Top-K Metrics.
def card_precision_top_k_day(df_day,top_k):
# This takes the max of the predictions AND the max of label TX_FRAUD for each CUSTOMER_ID,
# and sorts by decreasing order of fraudulent prediction
df_day = df_day.groupby('CUSTOMER_ID').max().sort_values(by="predictions", ascending=False).reset_index(drop=False)
# Get the top k most suspicious cards
df_day_top_k=df_day.head(top_k)
list_detected_compromised_cards=list(df_day_top_k[df_day_top_k.TX_FRAUD==1].CUSTOMER_ID)
# Compute precision top k
card_precision_top_k = len(list_detected_compromised_cards) / top_k
return list_detected_compromised_cards, card_precision_top_k
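A toy example with three cards and top_k=2 (values chosen for illustration): card 1 has the highest prediction and is compromised, card 2 comes second but is genuine, so precision top-2 is 0.5.
# df_day = pd.DataFrame({'CUSTOMER_ID': [1, 1, 2, 3],
#                        'predictions': [0.9, 0.2, 0.8, 0.1],
#                        'TX_FRAUD': [1, 1, 0, 0]})
# card_precision_top_k_day(df_day, top_k=2)  # -> ([1], 0.5)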
card_precision_top_k
First used in Chapter 3, Baseline Fraud Detection System. Detailed in Chapter 4, Precision Top-K Metrics.
def card_precision_top_k(predictions_df, top_k, remove_detected_compromised_cards=True):
# Sort days by increasing order
list_days=list(predictions_df['TX_TIME_DAYS'].unique())
list_days.sort()
# At first, the list of detected compromised cards is empty
list_detected_compromised_cards = []
card_precision_top_k_per_day_list = []
nb_compromised_cards_per_day = []
# For each day, compute precision top k
for day in list_days:
df_day = predictions_df[predictions_df['TX_TIME_DAYS']==day]
df_day = df_day[['predictions', 'CUSTOMER_ID', 'TX_FRAUD']]
# Let us remove detected compromised cards from the set of daily transactions
        df_day = df_day[~df_day.CUSTOMER_ID.isin(list_detected_compromised_cards)]
nb_compromised_cards_per_day.append(len(df_day[df_day.TX_FRAUD==1].CUSTOMER_ID.unique()))
detected_compromised_cards, card_precision_top_k = card_precision_top_k_day(df_day,top_k)
card_precision_top_k_per_day_list.append(card_precision_top_k)
# Let us update the list of detected compromised cards
if remove_detected_compromised_cards:
list_detected_compromised_cards.extend(detected_compromised_cards)
# Compute the mean
mean_card_precision_top_k = np.array(card_precision_top_k_per_day_list).mean()
# Returns precision top k per day as a list, and resulting mean
return nb_compromised_cards_per_day,card_precision_top_k_per_day_list,mean_card_precision_top_k
card_precision_top_k_custom
First used in Chapter 5, Validation Strategies.
def card_precision_top_k_custom(y_true, y_pred, top_k, transactions_df):
# Let us create a predictions_df DataFrame, that contains all transactions matching the indices of the current fold
# (indices of the y_true vector)
predictions_df=transactions_df.iloc[y_true.index.values].copy()
predictions_df['predictions']=y_pred
# Compute the CP@k using the function implemented in Chapter 4, Section 4.2
nb_compromised_cards_per_day,card_precision_top_k_per_day_list,mean_card_precision_top_k=\
card_precision_top_k(predictions_df, top_k)
# Return the mean_card_precision_top_k
return mean_card_precision_top_k
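A sketch of how this function can be wrapped into a grid-search scorer with sklearn's make_scorer, whose extra keyword arguments are forwarded to the scoring function (the top_k value and the transactions_df variable are illustrative; recent scikit-learn versions may require response_method='predict_proba' instead of needs_proba):
# card_precision_top_100 = sklearn.metrics.make_scorer(card_precision_top_k_custom,
#                                                      needs_proba=True,
#                                                      top_k=100,
#                                                      transactions_df=transactions_df)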
performance_assessment
First used in Chapter 3, Baseline Fraud Detection System.
def performance_assessment(predictions_df, output_feature='TX_FRAUD',
prediction_feature='predictions', top_k_list=[100],
rounded=True):
AUC_ROC = metrics.roc_auc_score(predictions_df[output_feature], predictions_df[prediction_feature])
AP = metrics.average_precision_score(predictions_df[output_feature], predictions_df[prediction_feature])
performances = pd.DataFrame([[AUC_ROC, AP]],
columns=['AUC ROC','Average precision'])
for top_k in top_k_list:
_, _, mean_card_precision_top_k = card_precision_top_k(predictions_df, top_k)
performances['Card Precision@'+str(top_k)]=mean_card_precision_top_k
if rounded:
performances = performances.round(3)
return performances
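A usage sketch, assuming a fitted-model dictionary as returned by fit_model_and_get_predictions and a test set test_df:
# predictions_df = test_df.copy()
# predictions_df['predictions'] = model_and_predictions_dictionary['predictions_test']
# performance_assessment(predictions_df, top_k_list=[100])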
performance_assessment_model_collection
First used in Chapter 3, Baseline Fraud Detection System.
def performance_assessment_model_collection(fitted_models_and_predictions_dictionary,
transactions_df,
type_set='test',
top_k_list=[100]):
performances=pd.DataFrame()
for classifier_name, model_and_predictions in fitted_models_and_predictions_dictionary.items():
        predictions_df=transactions_df.copy()  # copy to avoid mutating the input DataFrame
predictions_df['predictions']=model_and_predictions['predictions_'+type_set]
performances_model=performance_assessment(predictions_df, output_feature='TX_FRAUD',
prediction_feature='predictions', top_k_list=top_k_list)
performances_model.index=[classifier_name]
        performances=pd.concat([performances, performances_model])
    return performances
execution_times_model_collection
First used in Chapter 3, Baseline Fraud Detection System.
def execution_times_model_collection(fitted_models_and_predictions_dictionary):
execution_times=pd.DataFrame()
for classifier_name, model_and_predictions in fitted_models_and_predictions_dictionary.items():
execution_times_model=pd.DataFrame()
execution_times_model['Training execution time']=[model_and_predictions['training_execution_time']]
execution_times_model['Prediction execution time']=[model_and_predictions['prediction_execution_time']]
execution_times_model.index=[classifier_name]
        execution_times=pd.concat([execution_times, execution_times_model])
    return execution_times
get_class_from_fraud_probability
First used in Chapter 4, Threshold-Based Metrics.
# Getting classes from a vector of fraud probabilities and a threshold
def get_class_from_fraud_probability(fraud_probabilities, threshold=0.5):
predicted_classes = [0 if fraud_probability<threshold else 1
for fraud_probability in fraud_probabilities]
return predicted_classes
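For example:
# get_class_from_fraud_probability([0.1, 0.7, 0.5], threshold=0.5)  # -> [0, 1, 1]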
threshold_based_metrics
First used in Chapter 4, Threshold-Based Metrics.
def threshold_based_metrics(fraud_probabilities, true_label, thresholds_list):
results = []
for threshold in thresholds_list:
predicted_classes = get_class_from_fraud_probability(fraud_probabilities, threshold=threshold)
(TN, FP, FN, TP) = metrics.confusion_matrix(true_label, predicted_classes).ravel()
MME = (FP+FN)/(TN+FP+FN+TP)
TPR = TP/(TP+FN)
TNR = TN/(TN+FP)
FPR = FP/(TN+FP)
FNR = FN/(TP+FN)
BER = 1/2*(FPR+FNR)
Gmean = np.sqrt(TPR*TNR)
precision = 1 # 1 if TP+FP=0
FDR = 1 # 1 if TP+FP=0
if TP+FP>0:
precision = TP/(TP+FP)
FDR=FP/(TP+FP)
NPV = 1 # 1 if TN+FN=0
FOR = 1 # 1 if TN+FN=0
if TN+FN>0:
NPV = TN/(TN+FN)
FOR = FN/(TN+FN)
F1_score = 2*(precision*TPR)/(precision+TPR)
results.append([threshold, MME, TPR, TNR, FPR, FNR, BER, Gmean, precision, NPV, FDR, FOR, F1_score])
results_df = pd.DataFrame(results,columns=['Threshold' ,'MME', 'TPR', 'TNR', 'FPR', 'FNR', 'BER', 'G-mean', 'Precision', 'NPV', 'FDR', 'FOR', 'F1 Score'])
    return results_df
get_summary_performances
First used in Chapter 5, Model Selection.
def get_summary_performances(performances_df, parameter_column_name="Parameters summary"):
metrics = ['AUC ROC','Average precision','Card Precision@100']
performances_results=pd.DataFrame(columns=metrics)
performances_df.reset_index(drop=True,inplace=True)
best_estimated_parameters = []
validation_performance = []
test_performance = []
for metric in metrics:
index_best_validation_performance = performances_df.index[np.argmax(performances_df[metric+' Validation'].values)]
best_estimated_parameters.append(performances_df[parameter_column_name].iloc[index_best_validation_performance])
validation_performance.append(
str(round(performances_df[metric+' Validation'].iloc[index_best_validation_performance],3))+
'+/-'+
str(round(performances_df[metric+' Validation'+' Std'].iloc[index_best_validation_performance],2))
)
test_performance.append(
str(round(performances_df[metric+' Test'].iloc[index_best_validation_performance],3))+
'+/-'+
str(round(performances_df[metric+' Test'+' Std'].iloc[index_best_validation_performance],2))
)
performances_results.loc["Best estimated parameters"]=best_estimated_parameters
performances_results.loc["Validation performance"]=validation_performance
performances_results.loc["Test performance"]=test_performance
optimal_test_performance = []
optimal_parameters = []
for metric in ['AUC ROC Test','Average precision Test','Card Precision@100 Test']:
index_optimal_test_performance = performances_df.index[np.argmax(performances_df[metric].values)]
optimal_parameters.append(performances_df[parameter_column_name].iloc[index_optimal_test_performance])
optimal_test_performance.append(
str(round(performances_df[metric].iloc[index_optimal_test_performance],3))+
'+/-'+
str(round(performances_df[metric+' Std'].iloc[index_optimal_test_performance],2))
)
performances_results.loc["Optimal parameter(s)"]=optimal_parameters
performances_results.loc["Optimal test performance"]=optimal_test_performance
    return performances_results
model_selection_performances
First used in Chapter 5, Model Selection.
def model_selection_performances(performances_df_dictionary,
performance_metric='AUC ROC'):
# Note: max_depth of 50 is similar to None
default_parameters_dictionary={
"Decision Tree": 50,
"Logstic Regression": 1,
"Random Forest": "100/50",
"XGBoost": "100/0.1/2"
}
mean_performances_dictionary={
"Default parameters": [],
"Best validation parameters": [],
"Optimal parameters": []
}
std_performances_dictionary={
"Default parameters": [],
"Best validation parameters": [],
"Optimal parameters": []
}
# For each model class
for model_class, performances_df in performances_df_dictionary.items():
        # Get the performances for the default parameters
default_performances=performances_df[performances_df['Parameters summary']==default_parameters_dictionary[model_class]]
default_performances=default_performances.round(decimals=3)
mean_performances_dictionary["Default parameters"].append(default_performances[performance_metric+" Test"].values[0])
std_performances_dictionary["Default parameters"].append(default_performances[performance_metric+" Test Std"].values[0])
# Get the performances for the best estimated parameters
performances_summary=get_summary_performances(performances_df, parameter_column_name="Parameters summary")
mean_std_performances=performances_summary.loc[["Test performance"]][performance_metric].values[0]
mean_std_performances=mean_std_performances.split("+/-")
mean_performances_dictionary["Best validation parameters"].append(float(mean_std_performances[0]))
std_performances_dictionary["Best validation parameters"].append(float(mean_std_performances[1]))
        # Get the performances for the optimal parameters
mean_std_performances=performances_summary.loc[["Optimal test performance"]][performance_metric].values[0]
mean_std_performances=mean_std_performances.split("+/-")
mean_performances_dictionary["Optimal parameters"].append(float(mean_std_performances[0]))
std_performances_dictionary["Optimal parameters"].append(float(mean_std_performances[1]))
# Return the mean performances and their standard deviations
    return (mean_performances_dictionary, std_performances_dictionary)
# Extended version: model classes and default parameters are passed as arguments, overriding the definition above
def model_selection_performances(performances_df_dictionary,
performance_metric='AUC ROC',
model_classes=['Decision Tree',
'Logistic Regression',
'Random Forest',
'XGBoost'],
default_parameters_dictionary={
"Decision Tree": 50,
"Logistic Regression": 1,
"Random Forest": "100/50",
"XGBoost": "100/0.1/3"
}):
mean_performances_dictionary={
"Default parameters": [],
"Best validation parameters": [],
"Optimal parameters": []
}
std_performances_dictionary={
"Default parameters": [],
"Best validation parameters": [],
"Optimal parameters": []
}
# For each model class
for model_class in model_classes:
performances_df=performances_df_dictionary[model_class]
        # Get the performances for the default parameters
default_performances=performances_df[performances_df['Parameters summary']==default_parameters_dictionary[model_class]]
default_performances=default_performances.round(decimals=3)
mean_performances_dictionary["Default parameters"].append(default_performances[performance_metric+" Test"].values[0])
std_performances_dictionary["Default parameters"].append(default_performances[performance_metric+" Test Std"].values[0])
# Get the performances for the best estimated parameters
performances_summary=get_summary_performances(performances_df, parameter_column_name="Parameters summary")
mean_std_performances=performances_summary.loc[["Test performance"]][performance_metric].values[0]
mean_std_performances=mean_std_performances.split("+/-")
mean_performances_dictionary["Best validation parameters"].append(float(mean_std_performances[0]))
std_performances_dictionary["Best validation parameters"].append(float(mean_std_performances[1]))
        # Get the performances for the optimal parameters
mean_std_performances=performances_summary.loc[["Optimal test performance"]][performance_metric].values[0]
mean_std_performances=mean_std_performances.split("+/-")
mean_performances_dictionary["Optimal parameters"].append(float(mean_std_performances[0]))
std_performances_dictionary["Optimal parameters"].append(float(mean_std_performances[1]))
# Return the mean performances and their standard deviations
    return (mean_performances_dictionary, std_performances_dictionary)
Model selection
prequential_grid_search
First used in Chapter 5, Validation Strategies.
def prequential_grid_search(transactions_df,
classifier,
input_features, output_feature,
parameters, scoring,
start_date_training,
n_folds=4,
expe_type='Test',
delta_train=7,
delta_delay=7,
delta_assessment=7,
performance_metrics_list_grid=['roc_auc'],
performance_metrics_list=['AUC ROC'],
n_jobs=-1):
estimators = [('scaler', sklearn.preprocessing.StandardScaler()), ('clf', classifier)]
pipe = sklearn.pipeline.Pipeline(estimators)
prequential_split_indices=prequentialSplit(transactions_df,
start_date_training=start_date_training,
n_folds=n_folds,
delta_train=delta_train,
delta_delay=delta_delay,
delta_assessment=delta_assessment)
grid_search = sklearn.model_selection.GridSearchCV(pipe, parameters, scoring=scoring, cv=prequential_split_indices, refit=False, n_jobs=n_jobs)
X=transactions_df[input_features]
y=transactions_df[output_feature]
grid_search.fit(X, y)
performances_df=pd.DataFrame()
for i in range(len(performance_metrics_list_grid)):
performances_df[performance_metrics_list[i]+' '+expe_type]=grid_search.cv_results_['mean_test_'+performance_metrics_list_grid[i]]
performances_df[performance_metrics_list[i]+' '+expe_type+' Std']=grid_search.cv_results_['std_test_'+performance_metrics_list_grid[i]]
performances_df['Parameters']=grid_search.cv_results_['params']
performances_df['Execution time']=grid_search.cv_results_['mean_fit_time']
return performances_df
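A usage sketch; since the classifier is wrapped in a pipeline under the step name 'clf', grid parameters must carry the clf__ prefix (the classifier, grid, scoring dictionary, and variables below are illustrative):
# classifier = sklearn.tree.DecisionTreeClassifier()
# parameters = {'clf__max_depth': [2, 4], 'clf__random_state': [0]}
# scoring = {'roc_auc': 'roc_auc', 'average_precision': 'average_precision'}
# performances_df = prequential_grid_search(transactions_df, classifier,
#                                           input_features, 'TX_FRAUD', parameters, scoring,
#                                           start_date_training=start_date_training,
#                                           performance_metrics_list_grid=['roc_auc', 'average_precision'],
#                                           performance_metrics_list=['AUC ROC', 'Average precision'])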
model_selection_wrapper
First used in Chapter 5, Model Selection.
def model_selection_wrapper(transactions_df,
classifier,
input_features, output_feature,
parameters,
scoring,
start_date_training_for_valid,
start_date_training_for_test,
n_folds=4,
delta_train=7,
delta_delay=7,
delta_assessment=7,
performance_metrics_list_grid=['roc_auc'],
performance_metrics_list=['AUC ROC'],
n_jobs=-1):
# Get performances on the validation set using prequential validation
performances_df_validation=prequential_grid_search(transactions_df, classifier,
input_features, output_feature,
parameters, scoring,
start_date_training=start_date_training_for_valid,
n_folds=n_folds,
expe_type='Validation',
delta_train=delta_train,
delta_delay=delta_delay,
delta_assessment=delta_assessment,
performance_metrics_list_grid=performance_metrics_list_grid,
performance_metrics_list=performance_metrics_list,
n_jobs=n_jobs)
# Get performances on the test set using prequential validation
performances_df_test=prequential_grid_search(transactions_df, classifier,
input_features, output_feature,
parameters, scoring,
start_date_training=start_date_training_for_test,
n_folds=n_folds,
expe_type='Test',
delta_train=delta_train,
delta_delay=delta_delay,
delta_assessment=delta_assessment,
performance_metrics_list_grid=performance_metrics_list_grid,
performance_metrics_list=performance_metrics_list,
n_jobs=n_jobs)
# Bind the two resulting DataFrames
performances_df_validation.drop(columns=['Parameters','Execution time'], inplace=True)
performances_df=pd.concat([performances_df_test,performances_df_validation],axis=1)
# And return as a single DataFrame
return performances_df
kfold_cv_with_classifier
First used in Chapter 6, Cost-Sensitive Learning.
def kfold_cv_with_classifier(classifier,
X,
y,
n_splits=5,
strategy_name="Basline classifier"):
cv = sklearn.model_selection.StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=0)
cv_results_=sklearn.model_selection.cross_validate(classifier,X,y,cv=cv,
scoring=['roc_auc',
'average_precision',
'balanced_accuracy'],
return_estimator=True)
results=round(pd.DataFrame(cv_results_),3)
    results_mean=list(results.mean(numeric_only=True).values)
    results_std=list(results.std(numeric_only=True).values)
    results_df=pd.DataFrame([[str(round(results_mean[i],3))+'+/-'+
                              str(round(results_std[i],3)) for i in range(len(results_mean))]],
columns=['Fit time (s)','Score time (s)',
'AUC ROC','Average Precision','Balanced accuracy'])
results_df.rename(index={0:strategy_name}, inplace=True)
classifier_0=cv_results_['estimator'][0]
(train_index, test_index) = next(cv.split(X, y))
train_df=pd.DataFrame({'X1':X[train_index,0],'X2':X[train_index,1], 'Y':y[train_index]})
test_df=pd.DataFrame({'X1':X[test_index,0],'X2':X[test_index,1], 'Y':y[test_index]})
    return (results_df, classifier_0, train_df, test_df)
Plotting
get_tx_stats
First used in Chapter 3, Baseline Fraud Detection System.
# Compute the number of transactions per day, fraudulent transactions per day and fraudulent cards per day
def get_tx_stats(transactions_df, start_date_df="2018-04-01"):
#Number of transactions per day
nb_tx_per_day=transactions_df.groupby(['TX_TIME_DAYS'])['CUSTOMER_ID'].count()
#Number of fraudulent transactions per day
nb_fraudulent_transactions_per_day=transactions_df.groupby(['TX_TIME_DAYS'])['TX_FRAUD'].sum()
#Number of fraudulent cards per day
nb_compromised_card_per_day=transactions_df[transactions_df['TX_FRAUD']==1].groupby(['TX_TIME_DAYS']).CUSTOMER_ID.nunique()
tx_stats=pd.DataFrame({"nb_tx_per_day":nb_tx_per_day,
"nb_fraudulent_transactions_per_day":nb_fraudulent_transactions_per_day,
"nb_compromised_cards_per_day":nb_compromised_card_per_day})
tx_stats=tx_stats.reset_index()
start_date = datetime.datetime.strptime(start_date_df, "%Y-%m-%d")
tx_date=start_date+tx_stats['TX_TIME_DAYS'].apply(datetime.timedelta)
tx_stats['tx_date']=tx_date
return tx_stats
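A usage sketch; pandas' plot method can then display the three daily statistics over time (the start date is illustrative):
# tx_stats = get_tx_stats(transactions_df, start_date_df='2018-04-01')
# tx_stats.plot(x='tx_date', y=['nb_tx_per_day', 'nb_fraudulent_transactions_per_day',
#                               'nb_compromised_cards_per_day'])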
get_template_tx_stats
First used in Chapter 3, Baseline Fraud Detection System.
# Template for plotting the number of transactions, fraudulent transactions, and compromised cards per day
def get_template_tx_stats(ax, fs,
start_date_training,
title='',
delta_train=7,
delta_delay=7,
delta_test=7,
ylim=300):
ax.set_title(title, fontsize=fs*1.5)
ax.set_ylim([0, ylim])
ax.set_xlabel('Date', fontsize=fs)
ax.set_ylabel('Number', fontsize=fs)
plt.yticks(fontsize=fs*0.7)
plt.xticks(fontsize=fs*0.7)
    ax.axvline(start_date_training+datetime.timedelta(days=delta_train), color="black")
    ax.axvline(start_date_training+datetime.timedelta(days=delta_train+delta_delay), color="black")
ax.text(start_date_training+datetime.timedelta(days=2), ylim-20,'Training period', fontsize=fs)
ax.text(start_date_training+datetime.timedelta(days=delta_train+2), ylim-20,'Delay period', fontsize=fs)
ax.text(start_date_training+datetime.timedelta(days=delta_train+delta_delay+2), ylim-20,'Test period', fontsize=fs)
get_template_roc_curve
First used in Chapter 4, Threshold-Free Metrics.
def get_template_roc_curve(ax, title,fs,random=True):
ax.set_title(title, fontsize=fs)
ax.set_xlim([-0.01, 1.01])
ax.set_ylim([-0.01, 1.01])
ax.set_xlabel('False Positive Rate', fontsize=fs)
ax.set_ylabel('True Positive Rate', fontsize=fs)
if random:
ax.plot([0, 1], [0, 1],'r--',label="AUC ROC Random = 0.5")
get_template_pr_curve
First used in Chapter 4, Threshold-Free Metrics.
def get_template_pr_curve(ax, title,fs, baseline=0.5):
ax.set_title(title, fontsize=fs)
ax.set_xlim([-0.01, 1.01])
ax.set_ylim([-0.01, 1.01])
ax.set_xlabel('Recall (True Positive Rate)', fontsize=fs)
ax.set_ylabel('Precision', fontsize=fs)
ax.plot([0, 1], [baseline, baseline],'r--',label='AP Random = {0:0.3f}'.format(baseline))
get_performance_plot
First used in Chapter 5, Validation Strategies.
# Get the performance plot for a single performance metric
def get_performance_plot(performances_df,
ax,
performance_metric,
expe_type_list=['Test','Train'],
expe_type_color_list=['#008000','#2F4D7E'],
parameter_name="Tree maximum depth",
summary_performances=None):
# expe_type_list is the list of type of experiments, typically containing 'Test', 'Train', or 'Valid'
# For all types of experiments
for i in range(len(expe_type_list)):
# Column in performances_df for which to retrieve the data
performance_metric_expe_type=performance_metric+' '+expe_type_list[i]
# Plot data on graph
ax.plot(performances_df['Parameters summary'], performances_df[performance_metric_expe_type],
color=expe_type_color_list[i], label = expe_type_list[i])
# If performances_df contains confidence intervals, add them to the graph
if performance_metric_expe_type+' Std' in performances_df.columns:
conf_min = performances_df[performance_metric_expe_type]\
-2*performances_df[performance_metric_expe_type+' Std']
conf_max = performances_df[performance_metric_expe_type]\
+2*performances_df[performance_metric_expe_type+' Std']
ax.fill_between(performances_df['Parameters summary'], conf_min, conf_max, color=expe_type_color_list[i], alpha=.1)
    # If a summary_performances table is given, add a vertical dashed line at the best estimated parameter
if summary_performances is not None:
best_estimated_parameter=summary_performances[performance_metric][['Best estimated parameters']].values[0]
best_estimated_performance=float(summary_performances[performance_metric][['Validation performance']].values[0].split("+/-")[0])
ymin, ymax = ax.get_ylim()
ax.vlines(best_estimated_parameter, ymin, best_estimated_performance,
linestyles="dashed")
# Set title, and x and y axes labels
ax.set_title(performance_metric+'\n', fontsize=14)
ax.set(xlabel = parameter_name, ylabel=performance_metric)
get_performances_plots
First used in Chapter 5, Validation Strategies.
# Get the performance plots for a set of performance metrics
def get_performances_plots(performances_df,
performance_metrics_list=['AUC ROC', 'Average precision', 'Card Precision@100'],
expe_type_list=['Test','Train'], expe_type_color_list=['#008000','#2F4D7E'],
parameter_name="Tree maximum depth",
summary_performances=None):
# Create as many graphs as there are performance metrics to display
n_performance_metrics = len(performance_metrics_list)
fig, ax = plt.subplots(1, n_performance_metrics, figsize=(5*n_performance_metrics,4))
# Plot performance metric for each metric in performance_metrics_list
for i in range(n_performance_metrics):
get_performance_plot(performances_df, ax[i], performance_metric=performance_metrics_list[i],
expe_type_list=expe_type_list,
expe_type_color_list=expe_type_color_list,
parameter_name=parameter_name,
summary_performances=summary_performances)
ax[n_performance_metrics-1].legend(loc='upper left',
labels=expe_type_list,
bbox_to_anchor=(1.05, 1),
title="Type set")
plt.subplots_adjust(wspace=0.5,
                        hspace=0.8)
get_execution_times_plot
First used in Chapter 5, Validation Strategies.
# Plot execution times as a function of the model parameter
def get_execution_times_plot(performances_df,
title="",
parameter_name="Tree maximum depth"):
fig, ax = plt.subplots(1,1, figsize=(5,4))
# Plot data on graph
ax.plot(performances_df['Parameters summary'], performances_df["Execution time"],
color="black")
# Set title, and x and y axes labels
ax.set_title(title, fontsize=14)
    ax.set(xlabel = parameter_name, ylabel="Execution time (seconds)")
get_model_selection_performance_plot
First used in Chapter 5, Model Selection.
# Plot the model selection performances for a single performance metric
def get_model_selection_performance_plot(performances_df_dictionary,
ax,
performance_metric,
ylim=[0,1],
model_classes=['Decision Tree',
'Logistic Regression',
'Random Forest',
'XGBoost']):
(mean_performances_dictionary,std_performances_dictionary) = \
model_selection_performances(performances_df_dictionary=performances_df_dictionary,
performance_metric=performance_metric)
# width of the bars
barWidth = 0.3
# The x position of bars
r1 = np.arange(len(model_classes))
r2 = r1+barWidth
r3 = r1+2*barWidth
# Create Default parameters bars (Orange)
ax.bar(r1, mean_performances_dictionary['Default parameters'],
width = barWidth, color = '#CA8035', edgecolor = 'black',
yerr=std_performances_dictionary['Default parameters'], capsize=7, label='Default parameters')
    # Create Best validation parameters bars (green)
ax.bar(r2, mean_performances_dictionary['Best validation parameters'],
width = barWidth, color = '#008000', edgecolor = 'black',
yerr=std_performances_dictionary['Best validation parameters'], capsize=7, label='Best validation parameters')
    # Create Optimal parameters bars (blue)
ax.bar(r3, mean_performances_dictionary['Optimal parameters'],
width = barWidth, color = '#2F4D7E', edgecolor = 'black',
yerr=std_performances_dictionary['Optimal parameters'], capsize=7, label='Optimal parameters')
# Set title, and x and y axes labels
ax.set_ylim(ylim[0],ylim[1])
ax.set_xticks(r2+barWidth/2)
ax.set_xticklabels(model_classes, rotation = 45, ha="right", fontsize=12)
ax.set_title(performance_metric+'\n', fontsize=18)
ax.set_xlabel("Model class", fontsize=16)
    ax.set_ylabel(performance_metric, fontsize=15)
get_model_selection_performances_plots
First used in Chapter 5, Model Selection.
def get_model_selection_performances_plots(performances_df_dictionary,
performance_metrics_list=['AUC ROC', 'Average precision', 'Card Precision@100'],
ylim_list=[[0.6,0.9],[0.2,0.8],[0.2,0.35]],
model_classes=['Decision Tree',
'Logistic Regression',
'Random Forest',
'XGBoost']):
# Create as many graphs as there are performance metrics to display
n_performance_metrics = len(performance_metrics_list)
fig, ax = plt.subplots(1, n_performance_metrics, figsize=(5*n_performance_metrics,4))
parameter_types=['Default parameters','Best validation parameters','Optimal parameters']
# Plot performance metric for each metric in performance_metrics_list
for i in range(n_performance_metrics):
get_model_selection_performance_plot(performances_df_dictionary,
ax[i],
performance_metrics_list[i],
ylim=ylim_list[i],
model_classes=model_classes
)
ax[n_performance_metrics-1].legend(loc='upper left',
labels=parameter_types,
bbox_to_anchor=(1.05, 1),
title="Parameter type",
prop={'size': 12},
title_fontsize=12)
plt.subplots_adjust(wspace=0.5,
                        hspace=0.8)
plot_decision_boundary_classifier
First used in Chapter 6, Cost-Sensitive Learning.
def plot_decision_boundary_classifier(ax,
classifier,
train_df,
input_features=['X1','X2'],
output_feature='Y',
title="",
fs=14,
plot_training_data=True):
plot_colors = ["tab:blue","tab:orange"]
x1_min, x1_max = train_df[input_features[0]].min() - 1, train_df[input_features[0]].max() + 1
x2_min, x2_max = train_df[input_features[1]].min() - 1, train_df[input_features[1]].max() + 1
plot_step=0.1
xx, yy = np.meshgrid(np.arange(x1_min, x1_max, plot_step),
np.arange(x2_min, x2_max, plot_step))
Z = classifier.predict_proba(np.c_[xx.ravel(), yy.ravel()])[:,1]
Z = Z.reshape(xx.shape)
ax.contourf(xx, yy, Z, cmap=plt.cm.RdYlBu_r,alpha=0.3)
if plot_training_data:
# Plot the training points
groups = train_df.groupby(output_feature)
for name, group in groups:
ax.scatter(group[input_features[0]], group[input_features[1]], edgecolors='black', label=name)
ax.set_title(title, fontsize=fs)
ax.set_xlabel(input_features[0], fontsize=fs)
    ax.set_ylabel(input_features[1], fontsize=fs)
plot_decision_boundary
First used in Chapter 6, Cost-Sensitive Learning.
def plot_decision_boundary(classifier_0,
train_df,
test_df):
fig_decision_boundary, ax = plt.subplots(1, 3, figsize=(5*3,5))
plot_decision_boundary_classifier(ax[0], classifier_0,
train_df,
title="Decision surface of the decision tree\n With training data",
plot_training_data=True)
plot_decision_boundary_classifier(ax[1], classifier_0,
train_df,
title="Decision surface of the decision tree\n",
plot_training_data=False)
plot_decision_boundary_classifier(ax[2], classifier_0,
test_df,
title="Decision surface of the decision tree\n With test data",
plot_training_data=True)
ax[-1].legend(loc='upper left',
#labels=[0,1],
bbox_to_anchor=(1.05, 1),
title="Class")
sm = plt.cm.ScalarMappable(cmap=plt.cm.RdYlBu_r, norm=plt.Normalize(vmin=0, vmax=1))
cax = fig_decision_boundary.add_axes([0.93, 0.15, 0.02, 0.5])
fig_decision_boundary.colorbar(sm, cax=cax, alpha=0.3, boundaries=np.linspace(0, 1, 11))
    return fig_decision_boundary
Deep learning functions
import torch
import torch.nn.functional as F
seed_everything
First used in Chapter 7, Feed-forward neural network.
def seed_everything(seed):
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # benchmark mode disabled for reproducibility
Utilities for data management, training, and evaluation
First used in Chapter 7, Feed-forward neural network.
class FraudDataset(torch.utils.data.Dataset):
def __init__(self, x, y):
'Initialization'
self.x = x
self.y = y
def __len__(self):
'Denotes the total number of samples'
return len(self.x)
def __getitem__(self, index):
'Generates one sample of data'
# Select sample index
if self.y is not None:
return self.x[index], self.y[index]
else:
return self.x[index]
def prepare_generators(training_set,valid_set,batch_size=64):
train_loader_params = {'batch_size': batch_size,
'shuffle': True,
'num_workers': 0}
valid_loader_params = {'batch_size': batch_size,
'num_workers': 0}
training_generator = torch.utils.data.DataLoader(training_set, **train_loader_params)
valid_generator = torch.utils.data.DataLoader(valid_set, **valid_loader_params)
return training_generator,valid_generator
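A sketch chaining FraudDataset and prepare_generators, assuming x_train, y_train, x_valid, and y_valid are tensors of matching lengths:
# training_set = FraudDataset(x_train, y_train)
# valid_set = FraudDataset(x_valid, y_valid)
# training_generator, valid_generator = prepare_generators(training_set, valid_set, batch_size=64)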
def evaluate_model(model,generator,criterion):
model.eval()
batch_losses = []
for x_batch, y_batch in generator:
# Forward pass
y_pred = model(x_batch)
# Compute Loss
loss = criterion(y_pred.squeeze(), y_batch)
batch_losses.append(loss.item())
mean_loss = np.mean(batch_losses)
return mean_loss
class EarlyStopping:
def __init__(self, patience=2, verbose=False):
self.patience = patience
self.verbose = verbose
self.counter = 0
        self.best_score = np.inf
def continue_training(self,current_score):
if self.best_score > current_score:
self.best_score = current_score
self.counter = 0
if self.verbose:
print("New best score:", current_score)
else:
self.counter+=1
if self.verbose:
print(self.counter, " iterations since best score.")
return self.counter <= self.patience
def training_loop(model,training_generator,valid_generator,optimizer,criterion,max_epochs=100,apply_early_stopping=True,patience=2,verbose=False):
#Setting the model in training mode
model.train()
if apply_early_stopping:
early_stopping = EarlyStopping(verbose=verbose,patience=patience)
all_train_losses = []
all_valid_losses = []
#Training loop
start_time=time.time()
for epoch in range(max_epochs):
model.train()
train_loss=[]
for x_batch, y_batch in training_generator:
optimizer.zero_grad()
# Forward pass
y_pred = model(x_batch)
# Compute Loss
loss = criterion(y_pred.squeeze(), y_batch)
# Backward pass
loss.backward()
optimizer.step()
train_loss.append(loss.item())
#showing last training loss after each epoch
all_train_losses.append(np.mean(train_loss))
if verbose:
print('')
print('Epoch {}: train loss: {}'.format(epoch, np.mean(train_loss)))
        #evaluate the model on the validation set after each epoch
valid_loss = evaluate_model(model,valid_generator,criterion)
all_valid_losses.append(valid_loss)
if verbose:
print('valid loss: {}'.format(valid_loss))
if apply_early_stopping:
if not early_stopping.continue_training(valid_loss):
if verbose:
print("Early stopping")
break
training_execution_time=time.time()-start_time
return model,training_execution_time,all_train_losses,all_valid_losses
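A training sketch tying the pieces together; the model, optimizer, and criterion choices are illustrative (SimpleFraudMLPWithDropout is defined further below, and BCELoss expects float-valued targets):
# model = SimpleFraudMLPWithDropout(input_size=len(input_features), hidden_size=100, p=0.2)
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# criterion = torch.nn.BCELoss()
# model, training_time, train_losses, valid_losses = training_loop(model, training_generator,
#                                                                  valid_generator, optimizer,
#                                                                  criterion, max_epochs=100)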
def per_sample_mse(model,generator):
model.eval()
criterion = torch.nn.MSELoss(reduction="none")
batch_losses = []
for x_batch, y_batch in generator:
# Forward pass
y_pred = model(x_batch)
# Compute Loss
loss = criterion(y_pred.squeeze(), y_batch)
loss_app = list(torch.mean(loss,axis=1).detach().numpy())
batch_losses.extend(loss_app)
return batch_losses
class FraudDatasetForPipe(torch.utils.data.Dataset):
def __init__(self, x, y):
'Initialization'
self.x = torch.FloatTensor(x)
self.y = None
if y is not None:
self.y = torch.LongTensor(y.values)
def __len__(self):
'Denotes the total number of samples'
return len(self.x)
def __getitem__(self, index):
'Generates one sample of data'
# Select sample index
if self.y is not None:
return self.x[index], self.y[index]
else:
return self.x[index], -1
def rolling_window(array, window):
a = np.concatenate([np.ones((window-1,))*-1,array])
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides).astype(int)
FraudDatasetUnsupervised
First used in Chapter 7, Autoencoders and anomaly detection.
class FraudDatasetUnsupervised(torch.utils.data.Dataset):
def __init__(self, x,output=True):
'Initialization'
self.x = x
self.output = output
def __len__(self):
'Denotes the total number of samples'
return len(self.x)
def __getitem__(self, index):
'Generates one sample of data'
# Select sample index
if self.output:
return self.x[index], self.x[index]
else:
            return self.x[index]
SimpleFraudMLPWithDropout and FraudMLP modules
First used in Chapter 7, Feed-forward neural network.
class SimpleFraudMLPWithDropout(torch.nn.Module):
def __init__(self, input_size, hidden_size,p):
super(SimpleFraudMLPWithDropout, self).__init__()
# parameters
self.input_size = input_size
self.hidden_size = hidden_size
self.p = p
#input to hidden
self.fc1 = torch.nn.Linear(self.input_size, self.hidden_size)
self.relu = torch.nn.ReLU()
#hidden to output
self.fc2 = torch.nn.Linear(self.hidden_size, 1)
self.sigmoid = torch.nn.Sigmoid()
self.dropout = torch.nn.Dropout(self.p)
def forward(self, x):
hidden = self.fc1(x)
hidden = self.relu(hidden)
hidden = self.dropout(hidden)
output = self.fc2(hidden)
output = self.sigmoid(output)
return output
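Since the module ends with a single sigmoid output unit, it pairs with torch.nn.BCELoss and float-valued targets; a hypothetical instantiation (the input size depends on the number of input features):
# model = SimpleFraudMLPWithDropout(input_size=15, hidden_size=100, p=0.2)
# criterion = torch.nn.BCELoss()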
class FraudMLP(torch.nn.Module):
def __init__(self, input_size,hidden_size=100,num_layers=1,p=0):
super(FraudMLP, self).__init__()
# parameters
self.input_size = input_size
self.hidden_size = hidden_size
self.p = p
#input to hidden
self.fc1 = torch.nn.Linear(self.input_size, self.hidden_size)
self.relu = torch.nn.ReLU()
        #hidden to hidden: a ModuleList is used so the layers' parameters are registered with the module
        self.fc_hidden = torch.nn.ModuleList()
        for i in range(num_layers-1):
            self.fc_hidden.append(torch.nn.Linear(self.hidden_size, self.hidden_size))
            self.fc_hidden.append(torch.nn.ReLU())
#hidden to output
self.fc2 = torch.nn.Linear(self.hidden_size, 2)
        self.softmax = torch.nn.Softmax(dim=1)  # probabilities over the two output classes
self.dropout = torch.nn.Dropout(self.p)
def forward(self, x):
hidden = self.fc1(x)
hidden = self.relu(hidden)
hidden = self.dropout(hidden)
for layer in self.fc_hidden:
hidden=layer(hidden)
hidden = self.dropout(hidden)
output = self.fc2(hidden)
output = self.softmax(output)
        return output
SimpleAutoencoder module
First used in Chapter 7, Autoencoders and anomaly detection.
class SimpleAutoencoder(torch.nn.Module):
def __init__(self, input_size, intermediate_size, code_size):
super(SimpleAutoencoder, self).__init__()
# parameters
self.input_size = input_size
self.intermediate_size = intermediate_size
self.code_size = code_size
self.relu = torch.nn.ReLU()
#encoder
self.fc1 = torch.nn.Linear(self.input_size, self.intermediate_size)
self.fc2 = torch.nn.Linear(self.intermediate_size, self.code_size)
#decoder
self.fc3 = torch.nn.Linear(self.code_size, self.intermediate_size)
self.fc4 = torch.nn.Linear(self.intermediate_size, self.input_size)
def forward(self, x):
hidden = self.fc1(x)
hidden = self.relu(hidden)
code = self.fc2(hidden)
code = self.relu(code)
hidden = self.fc3(code)
hidden = self.relu(hidden)
output = self.fc4(hidden)
        # linear activation in the final layer
        return output
Attention module
First used in Chapter 7, Sequential models and representation learning.
# source : https://github.com/IBM/pytorch-seq2seq/blob/master/seq2seq/models/attention.py
class Attention(torch.nn.Module):
r"""
Applies an attention mechanism on the output features from the decoder.
.. math::
\begin{array}{ll}
x = context*output \\
attn = exp(x_i) / sum_j exp(x_j) \\
output = \tanh(w * (attn * context) + b * output)
\end{array}
Args:
dim(int): The number of expected features in the output
Inputs: output, context
- **output** (batch, output_len, dimensions): tensor containing the output features from the decoder.
- **context** (batch, input_len, dimensions): tensor containing features of the encoded input sequence.
Outputs: output, attn
- **output** (batch, output_len, dimensions): tensor containing the attended output features from the decoder.
- **attn** (batch, output_len, input_len): tensor containing attention weights.
Attributes:
linear_out (torch.nn.Linear): applies a linear transformation to the incoming data: :math:`y = Ax + b`.
mask (torch.Tensor, optional): applies a :math:`-inf` to the indices specified in the `Tensor`.
Examples::
>>> attention = seq2seq.models.Attention(256)
>>> context = Variable(torch.randn(5, 3, 256))
>>> output = Variable(torch.randn(5, 5, 256))
>>> output, attn = attention(output, context)
"""
def __init__(self, dim):
super(Attention, self).__init__()
self.linear_out = torch.nn.Linear(dim*2, dim)
self.mask = None
def set_mask(self, mask):
"""
Sets indices to be masked
Args:
mask (torch.Tensor): tensor containing indices to be masked
"""
self.mask = mask
def forward(self, output, context):
batch_size = output.size(0)
hidden_size = output.size(2)
input_size = context.size(1)
# (batch, out_len, dim) * (batch, in_len, dim) -> (batch, out_len, in_len)
attn = torch.bmm(output, context.transpose(1, 2))
if self.mask is not None:
attn.data.masked_fill_(self.mask, -float('inf'))
attn = F.softmax(attn.view(-1, input_size), dim=1).view(batch_size, -1, input_size)
# (batch, out_len, in_len) * (batch, in_len, dim) -> (batch, out_len, dim)
mix = torch.bmm(attn, context)
# concat -> (batch, out_len, 2*dim)
combined = torch.cat((mix, output), dim=2)
# output -> (batch, out_len, dim)
        output = torch.tanh(self.linear_out(combined.view(-1, 2 * hidden_size))).view(batch_size, -1, hidden_size)
return output, attn