Posted on October 19, 2017 by Harish Krishnamurthy.
At the StampedeCon AI Summit in St. Louis, Colaberry Consulting presented how to take a complex idea in the AI domain, apply ML algorithms to it, and deploy it in production using the refactored.ai platform. More details, including code examples, are available on the platform. This is a common area of interest for many organizations, large and small. Achieving it requires an efficient pipeline in which Data Scientists, Machine Learning Engineers, and Data Engineers work in collaboration.
Often Machine Learning Engineers build models that are then used by Data Scientists, who apply statistical techniques to tune the models and improve the accuracy of their outputs. This is by no means the only arrangement that exists across companies. We do see organizations treat all areas of AI as the same, which hurts them because people with misaligned skills are expected to work on problems that are not a fit for them. This is largely due to a lack of understanding of the history of the domain among the people in decision-making positions. Also, it is easier to teach programming to people with Math backgrounds (such as EE, Math, Stats, and Biostats, who are a better fit for Data Analytics/AI than CS grads, who generally lack skills in Linear Algebra and Probability) than to teach Math to programmers. However, the discussion here deals mostly with coding guidelines. We shall consider a problem in the AI domain and see what steps we can take to bring it to production.
Building a Credit Classifier
Consider a credit rating system where the objective is to classify each customer record as good credit or bad credit.
German Credit Data
German Credit Data is a dataset in the UCI Machine Learning Repository that holds credit information for various customers. Our task is to separate customers into Good Credit customers and Bad Credit customers. The data is fairly detailed and consists of 20 attributes, mainly categorical/symbolic. The dataset was provided by Prof. Hofmann.
Spark for Raw Data Ingestion
The raw data needs to be sampled so that we can start extracting features from it for modeling.
# Load and parse the data
credit_data = spark.read.format("csv").load("/german_credit.txt", sep=" ")
training_sample = credit_data.sample(False, 0.1, 20171001)
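The sampling call above draws without replacement, with a fraction of 0.1 and a fixed seed so that the sample is repeatable. As a rough illustration of that idea outside Spark, the sketch below keeps each element of a plain Python list with probability 0.1. `bernoulli_sample` is a hypothetical helper, not a Spark API, and its output will not match Spark's sample row for row.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`.

    Hypothetical helper that mimics the idea of sampling without
    replacement; it is not Spark code.
    """
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    return [row for row in rows if rng.random() < fraction]


rows = list(range(1000))  # stand-in for 1000 raw records
sample_a = bernoulli_sample(rows, 0.1, 20171001)
sample_b = bernoulli_sample(rows, 0.1, 20171001)

# The same seed gives the same sample; the size is close to, but not
# necessarily exactly, 10% of the input.
print(len(sample_a), sample_a == sample_b)
```

Because each row is kept independently, the sample size varies around the requested fraction, which is worth keeping in mind when the sample feeds a later train/test split.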
- We shall start by adding a new column called ‘status’ and setting it to Good or Bad based on the ‘good/bad’ column.
%matplotlib inline
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn import model_selection
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, roc_curve, roc_auc_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.preprocessing import scale
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

german_data = pd.read_csv('../data/german.txt', sep=" ")
columns = ['checkin_acc', 'duration', 'credit_history', 'purpose', 'amount',
           'saving_acc', 'present_emp_since', 'inst_rate', 'personal_status',
           'other_debtors', 'residing_since', 'property', 'age', 'inst_plans',
           'housing', 'num_credits', 'job', 'dependents', 'telephone',
           'foreign_worker', 'good/bad']
german_data.columns = columns
german_data["status"] = np.where(german_data['good/bad'] == 1, "Good", "Bad")
german_data.head()
Exploratory Data Analysis
- Countplot of status, Histogram of credit amount, Amount comparison by credit status, Age of Borrowers, Regression plot of duration vs amount.
# Drop the helper column used for the plots, if it is still present
try:
    german_data = german_data.drop("status", axis=1)
except KeyError:
    print("Status is dropped")
german_data.head(5)

# Recode the target from 1/2 to 0/1
german_data['good/bad'] = german_data['good/bad'] - 1

# One-hot encode the categorical attributes
features = ['checkin_acc', 'credit_history', 'purpose', 'saving_acc',
            'present_emp_since', 'personal_status', 'other_debtors', 'property',
            'inst_plans', 'housing', 'job', 'telephone', 'foreign_worker']
credit_features = pd.get_dummies(german_data, prefix=features, columns=features)
credit_features.head()
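To make the one-hot encoding step concrete, here is a small sketch on a made-up frame. The rows and category labels are invented for illustration and are not taken from the German Credit data.

```python
import pandas as pd

# Toy frame with one numeric and one categorical column (invented values).
toy = pd.DataFrame({
    "amount": [1200, 5400, 800],
    "housing": ["own", "rent", "own"],
})

# Each category becomes its own indicator column named <prefix>_<value>;
# columns not listed in `columns` are passed through unchanged.
encoded = pd.get_dummies(toy, prefix=["housing"], columns=["housing"])
print(list(encoded.columns))
```

The numeric column stays as it is, while the single categorical column is replaced by one indicator column per distinct value. This is why the feature count grows well beyond the original 20 attributes after encoding.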
Prepare Dataset for Modeling
Separate the features from the target, scale the numeric columns, and split the data into training and test sets.
X = credit_features.drop('good/bad', axis=1)
Y = credit_features['good/bad']

# Standardizing the dataset
names = list(X.columns.values)
num = names[:5]
cat = names[5:]

# Performing the scaling function
X_scale = pd.DataFrame(scale(X[num]))
X_scale.columns = num
X = pd.concat((X_scale, X[cat]), axis=1)

# Split the dataset into train and test
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3, random_state=0)
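The scale function standardizes each column to zero mean and unit variance. A minimal NumPy sketch of the same arithmetic, on invented numbers standing in for two numeric columns:

```python
import numpy as np

# Invented values standing in for two numeric columns.
X_num = np.array([[10.0, 1000.0],
                  [20.0, 3000.0],
                  [30.0, 2000.0],
                  [40.0, 6000.0]])

# Subtract the column mean and divide by the column standard deviation
# (population standard deviation, ddof=0).
col_mean = X_num.mean(axis=0)
col_std = X_num.std(axis=0)
X_scaled = (X_num - col_mean) / col_std

print(X_scaled.mean(axis=0).round(6))
print(X_scaled.std(axis=0).round(6))
```

Scaling puts columns such as duration and amount on a comparable range, which matters for distance-based and margin-based models like KNN and SVM. Computing the mean and standard deviation on the training split only is a common alternative when test statistics should not influence the features.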
Let us apply various classifiers to the same dataset and discover the best performing model.
- Append the CV scores to the list, cv_scores.
- Make predictions on the test set with the best model (variable name best_model) and assign the predictions to the variable y_hat.
Machine Learning Solutions
- We see that the models have not used any causal features. Looking at causal features would involve consulting research journals in the area and building a custom implementation.
- During the problem discovery phase it is good to look up ML solutions.
models = []
models.append(('KNN', KNeighborsClassifier()))
models.append(('CART', DecisionTreeClassifier()))
models.append(('NB', GaussianNB()))
models.append(('SVM', SVC()))

seed = 7
scoring = 'accuracy'
cv_scores = []
names = []
accuracy_scores = list()
for name, model in models:
    # shuffle=True is needed for random_state to take effect in KFold
    kfold = model_selection.KFold(n_splits=10, shuffle=True, random_state=seed)
    cv_results = model_selection.cross_val_score(model, X_train, y_train, cv=kfold, scoring=scoring)
    cv_scores.append(cv_results)
    names.append(name)
    msg = "%s: %f (%f)" % (name, cv_results.mean(), cv_results.std())
    accuracy_scores.append(msg)

# Printing the accuracy achieved by each model
print(accuracy_scores)

# Plotting the model comparison as a box plot
fig = plt.figure()
fig.suptitle('Algorithm Comparison')
ax = fig.add_subplot(111)
plt.boxplot(cv_scores)
ax.set_xticklabels(names)
plt.show()
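The box plot compares the models visually. The same comparison can be made programmatically by picking the name with the highest mean cross-validation score. The sketch below assumes names and cv_scores are parallel lists, as in the loop above; the scores are invented placeholders, not results from this post.

```python
from statistics import mean

# Invented placeholder scores: one list of fold accuracies per model.
names = ["KNN", "CART", "NB", "SVM"]
cv_scores = [
    [0.70, 0.72, 0.68],
    [0.66, 0.69, 0.71],
    [0.71, 0.70, 0.73],
    [0.75, 0.74, 0.76],
]

# Pair each model name with its mean score and keep the highest.
mean_scores = {name: mean(scores) for name, scores in zip(names, cv_scores)}
best_name = max(mean_scores, key=mean_scores.get)
print(best_name, round(mean_scores[best_name], 3))
```

Selecting on the mean alone ignores the spread across folds, so it is worth reading it together with the box plot when two models are close.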
# Make predictions on the test dataset using SVM.
best_model = SVC()
best_model.fit(X_train, y_train)
y_hat = best_model.predict(X_test)
svm_score = accuracy_score(y_test, y_hat)
print(svm_score)
print(confusion_matrix(y_test, y_hat))
print(classification_report(y_test, y_hat))

0.75
[[197  17]
 [ 58  28]]
             precision    recall  f1-score   support
          0       0.77      0.92      0.84       214
          1       0.62      0.33      0.43        86
avg / total       0.73      0.75      0.72       300
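The accuracy, precision, and recall in the report follow directly from the confusion matrix. A short sketch of that arithmetic, using the matrix printed above and assuming scikit-learn's convention that rows are true classes and columns are predicted classes:

```python
# Confusion matrix from the output above: rows are true classes,
# columns are predicted classes.
cm = [[197, 17],
      [58, 28]]

total = sum(sum(row) for row in cm)
accuracy = (cm[0][0] + cm[1][1]) / total


def precision_recall(cm, k):
    """Precision and recall for class k from a 2x2 confusion matrix."""
    true_pos = cm[k][k]
    predicted_k = cm[0][k] + cm[1][k]  # column sum: everything predicted as k
    actual_k = sum(cm[k])              # row sum: everything that really is k
    return true_pos / predicted_k, true_pos / actual_k


print(round(accuracy, 2))
for k in (0, 1):
    p, r = precision_recall(cm, k)
    print(k, round(p, 2), round(r, 2))
```

With the recoding used earlier in this post, class 1 is bad credit. A recall of 0.33 for that class means the model catches only about a third of the bad-credit customers in the test set, which the overall accuracy of 0.75 does not show.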
Scaling to Big Data
Since we have already done the cleaning in the experiments phase, let us borrow some portions of the data cleaning to build out the ingestion portion. We shall use a Python notebook to experiment and set up the ingestion pipeline.
- Copy the dataframe map functions and other related data cleaning regular expressions to Spark.
- Pandas DataFrames and Spark DataFrames have similar functions.
- Save the ingested data and a sample of it.
- Save the feature vectors and a sample of them.
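As an illustration of how closely the two DataFrame APIs line up, the pandas sketch below performs a few cleaning steps, with a rough Spark counterpart noted in the comment above each one. The rows are invented, and sdf is a hypothetical name for a Spark DataFrame with the same columns; only the pandas lines are executed here.

```python
import pandas as pd

# Invented rows; column names follow the pandas frame used earlier.
df = pd.DataFrame({
    "amount": [1200, 5400, 800, 15000, 3000],
    "duration": [6, 48, 12, 36, 24],
})

# Filter rows. Spark: sdf.filter(sdf["amount"] > 1000)
large = df[df["amount"] > 1000]

# Add a derived column.
# Spark: sdf.withColumn("monthly", sdf["amount"] / sdf["duration"])
large = large.assign(monthly=large["amount"] / large["duration"])

# Drop a column. Spark: sdf.drop("duration")
large = large.drop(columns=["duration"])

# Take a reproducible sample. Spark: sdf.sample(False, 0.5, 20171010)
sample = large.sample(frac=0.5, random_state=20171010)

print(large.shape, sample.shape)
```

One difference to keep in mind when porting: pandas returns an exact number of rows for a given fraction, while a Spark sample only approximates the requested fraction.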
Data Ingestion Spark Module
This module can be packaged as an ingestion module and added to the automation tasks by IT.
- The module uses regular expressions and map and reduce functions in Spark.

```
# display(dbutils.fs.ls("dbfs:/FileStore/tables/ohvubrzw1507843246878/"))
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SQLContext
from copy import deepcopy

sqlContext = SQLContext(sc)
credit_df = spark.read.format("csv").option("header", "true").option("inferSchema", "True").load("dbfs:/FileStore/tables/ohvubrzw1507843246878/german_credit.csv")

# Sample the ingested DataFrame and save both the full data and the sample
credit_df_sample = credit_df.sample(False, 1e-2, 20171010)
credit_df.write.parquet("dbfs:/FileStore/tables/credit_df.parquet")
credit_df_sample.write.parquet("dbfs:/FileStore/tables/credit_df_sample.parquet")

credit_data = credit_df.rdd
feature_cols = credit_df.columns
target = 'Creditability'
# Remove the first column from the feature list
feature_cols.pop(0)

def amount_class(amount):
    '''
    Classify the amount into different classes.
    Args:
        amount (float): Total amount
    Returns:
        class (int): Type of class.
    '''
    if amount < 5000:
        return 1
    elif amount < 10000:
        return 2
    elif amount < 15000:
        return 3
    else:
        return 4

credit_data_new = credit_data.map(lambda row: (row, amount_class(row['Credit Amount'])))
```
Use the Spark Machine Learning Library to train the SVM on a sample of the dataset. This module can involve custom ML implementations, depending on the type of problem.
- Increase sample size as the training succeeds.
- Ingest all data
```
def features(row):
    '''
    Gathers features from the dataset.
    Args:
        row (Row): Each row in the dataset.
    Returns:
        (LabeledPoint): Labeled point of each row.
    '''
    feature_map = row.asDict()
    label = float(feature_map['Creditability'])
    feature_list = [float(feature_map[feature]) for feature in feature_cols]
    return LabeledPoint(label, feature_list)

credit_features = credit_data.map(features)
credit_features_df = credit_features.toDF()
credit_features_df.write.parquet("dbfs:/FileStore/tables/credit.parquet")

# Sample the features and save the sample (convert the RDD to a DataFrame first).
credit_sample_df = credit_features.sample(False, 1e-2, 20171010).toDF()
credit_sample_df.write.parquet("dbfs:/FileStore/tables/credit_sample.parquet")

# Read the credit data features back and rebuild the LabeledPoint RDD.
# Assumes the saved columns are named label and features.
credit_features_df = spark.read.parquet('dbfs:/FileStore/tables/credit.parquet')
lp = credit_features_df.rdd.map(lambda row: LabeledPoint(row.label, row.features))

model = SVMWithSGD.train(lp, iterations=100)

# Evaluating the model on training data
labels_preds = lp.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labels_preds.filter(lambda pair: pair[0] != pair[1]).count() / float(lp.count())
print("Training Error = " + str(trainErr))

# Print the coefficients and intercept for the linear SVM
print("Coefficients: " + str(model.weights))
print("Intercept: " + str(model.intercept))

# Save the model
model.save(sc, "credit/SVMWithSGDModel")
```