# imb-training-testing-binding-thrombin.py
# -*- encoding: utf-8 -*-

import os
from time import time
import argparse
from sklearn.naive_bayes import BernoulliNB
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, \
    classification_report
from sklearn.externals import joblib
from sklearn import model_selection
from scipy.sparse import csr_matrix
import scipy
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler

__author__ = 'CMendezC'

# Goal: training, crossvalidation and testing binding thrombin data set

# Parameters:
# 1) --inputPath Path to read input files.
# 2) --inputTrainingData File to read training data.
# 3) --inputTestingData File to read testing data.
# 4) --inputTestingClasses File to read testing classes.
# 5) --outputModelPath Path to place output model.
# 6) --outputModelFile File to place output model.
# 7) --outputReportPath Path to place evaluation report.
# 8) --outputReportFile File to place evaluation report.
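# 9) --classifier Classifier: BernoulliNB, SVM, kNN (default: SVM).
# 10) --saveData Read the raw input files and save the resulting matrices and classes;
#     without this flag, previously saved matrices and classes are loaded instead.
# 11) --kernel SVM kernel: linear, rbf, poly (default: linear).
# 12) --imbalanced Resampling method for the training set: RandomUS (undersampling), RandomOS (oversampling).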

# Output:
# 1) Classification model and evaluation report.

# Execution:

# python imb-training-testing-binding-thrombin.py
# --inputPath /home/binding-thrombin-dataset
# --inputTrainingData thrombin.data
# --inputTestingData Thrombin.testset
# --inputTestingClasses Thrombin.testset.class
# --outputModelPath /home/binding-thrombin-dataset/models
# --outputModelFile SVM-lineal-model.mod
# --outputReportPath /home/binding-thrombin-dataset/reports
# --outputReportFile SVM-lineal.txt
# --classifier SVM
# --saveData
# --kernel linear
# --imbalanced RandomUS

# source activate python3
# python imb-training-testing-binding-thrombin.py --inputPath /home/binding-thrombin-dataset --inputTrainingData thrombin.data --inputTestingData Thrombin.testset --inputTestingClasses Thrombin.testset.class --outputModelPath /home/binding-thrombin-dataset/models --outputModelFile SVM-lineal-model.mod --outputReportPath /home/binding-thrombin-dataset/reports --outputReportFile SVM-lineal-RandomUS.txt --classifier SVM  --kernel linear --imbalanced RandomUS

###########################################################
#                       MAIN PROGRAM                      #
###########################################################

def build_arg_parser():
    """Return the command-line parser for this script."""
    parser = argparse.ArgumentParser(description='Training validation Binding Thrombin Dataset.')
    parser.add_argument("--inputPath", dest="inputPath",
                        help="Path to read input files", metavar="PATH")
    parser.add_argument("--inputTrainingData", dest="inputTrainingData",
                        help="File to read training data", metavar="FILE")
    parser.add_argument("--inputTestingData", dest="inputTestingData",
                        help="File to read testing data", metavar="FILE")
    parser.add_argument("--inputTestingClasses", dest="inputTestingClasses",
                        help="File to read testing classes", metavar="FILE")
    parser.add_argument("--outputModelPath", dest="outputModelPath",
                        help="Path to place output model", metavar="PATH")
    parser.add_argument("--outputModelFile", dest="outputModelFile",
                        help="File to place output model", metavar="FILE")
    parser.add_argument("--outputReportPath", dest="outputReportPath",
                        help="Path to place evaluation report", metavar="PATH")
    parser.add_argument("--outputReportFile", dest="outputReportFile",
                        help="File to place evaluation report", metavar="FILE")
    parser.add_argument("--classifier", dest="classifier",
                        help="Classifier", metavar="NAME",
                        choices=('BernoulliNB', 'SVM', 'kNN'), default='SVM')
    parser.add_argument("--saveData", dest="saveData", action='store_true',
                        help="Save matrices")
    parser.add_argument("--kernel", dest="kernel",
                        help="Kernel SVM", metavar="NAME",
                        choices=('linear', 'rbf', 'poly'), default='linear')
    parser.add_argument("--imbalanced", dest="imbalanced",
                        choices=('RandomUS', 'RandomOS'), default=None,
                        help="Undersampling: RandomUS. Oversampling: RandomOS", metavar="TEXT")
    return parser


def read_labeled_matrix(path):
    """Read a comma-separated file into a sparse matrix plus its first column.

    Each non-empty line is split on ','. The first field is collected as the
    row label; the remaining fields are parsed as floats and become one row of
    the matrix. Values are parsed while reading and only non-zero entries are
    kept, so the full dense table never has to be held in memory.

    Returns a tuple ``(matrix, labels)`` where ``matrix`` is a ``csr_matrix``
    of dtype double and ``labels`` is a list of strings.

    Raises ``ValueError`` if a value is not numeric or if rows differ in length.
    """
    labels = []
    data = []
    indices = []
    indptr = [0]
    n_cols = None
    with open(path, encoding='utf8', mode='r') as handle:
        for line_number, raw_line in enumerate(handle, start=1):
            line = raw_line.strip('\r\n')
            if not line:
                # Blank (typically trailing) lines carry no sample.
                continue
            fields = line.split(',')
            values = fields[1:]
            if n_cols is None:
                n_cols = len(values)
            elif len(values) != n_cols:
                raise ValueError("{}: line {} has {} values, expected {}".format(
                    path, line_number, len(values), n_cols))
            labels.append(fields[0])
            for column, text in enumerate(values):
                value = float(text)
                if value != 0.0:
                    data.append(value)
                    indices.append(column)
            # CSR row pointer: where the next row starts in data/indices.
            indptr.append(len(indices))
    matrix = csr_matrix((data, indices, indptr),
                        shape=(len(labels), n_cols or 0), dtype='double')
    return matrix, labels


def read_labels(path):
    """Return the non-empty lines of ``path`` (one class per line) as a list."""
    labels = []
    with open(path, encoding='utf8', mode='r') as handle:
        for raw_line in handle:
            label = raw_line.strip('\r\n')
            if label:
                labels.append(label)
    return labels


def main():
    """Train the selected classifier, evaluate it on the test set, write the report.

    With --saveData the raw files are parsed and the resulting matrices and
    classes are dumped with joblib under --outputModelPath; without it those
    dumps are loaded instead. The training set is optionally resampled
    (--imbalanced) before fitting. The fitted model is dumped with joblib when
    --outputModelFile is given.
    """
    parser = build_arg_parser()
    args = parser.parse_args()

    # Printing parameter values
    print('-------------------------------- PARAMETERS --------------------------------')
    print("Path to read input files: " + str(args.inputPath))
    print("File to read training data: " + str(args.inputTrainingData))
    print("File to read testing data: " + str(args.inputTestingData))
    print("File to read testing classes: " + str(args.inputTestingClasses))
    print("Path to place output model: " + str(args.outputModelPath))
    print("File to place output model: " + str(args.outputModelFile))
    print("Path to place evaluation report: " + str(args.outputReportPath))
    print("File to place evaluation report: " + str(args.outputReportFile))
    print("Classifier: " + str(args.classifier))
    print("Save matrices: " + str(args.saveData))
    print("Kernel: " + str(args.kernel))
    print("Imbalanced: " + str(args.imbalanced))

    # These values are joined into file paths below; fail with a usage message
    # instead of a TypeError traceback when one of them was not given.
    needed = ['inputTrainingData', 'inputTestingData', 'inputTestingClasses',
              'outputModelPath', 'outputReportPath', 'outputReportFile']
    if args.saveData:
        # The raw input directory is only read when matrices are (re)built.
        needed.append('inputPath')
    missing = [name for name in needed if getattr(args, name) is None]
    if missing:
        parser.error("Missing required arguments: "
                     + ", ".join("--" + name for name in missing))

    # Start time
    t0 = time()

    print("Reading training data and true classes...")
    train_matrix_file = os.path.join(args.outputModelPath, args.inputTrainingData + '.jlb')
    train_classes_file = os.path.join(args.outputModelPath, args.inputTrainingData + '.class.jlb')
    if args.saveData:
        X_train, y_train = read_labeled_matrix(
            os.path.join(args.inputPath, args.inputTrainingData))
        print("   Saving matrix and classes...")
        joblib.dump(X_train, train_matrix_file)
        joblib.dump(y_train, train_classes_file)
        print("      Done!")
    else:
        print("   Loading matrix and classes...")
        X_train = joblib.load(train_matrix_file)
        y_train = joblib.load(train_classes_file)
        print("      Done!")

    print("   Number of training classes: {}".format(len(y_train)))
    print("   Number of training class A: {}".format(list(y_train).count('A')))
    print("   Number of training class I: {}".format(list(y_train).count('I')))
    print("   Shape of training matrix: {}".format(X_train.shape))

    if args.imbalanced is not None:
        t1 = time()
        # argparse `choices` guarantees one of these two keys.
        samplers = {
            "RandomOS": RandomOverSampler,    # Over sampling
            "RandomUS": RandomUnderSampler,   # Under sampling
        }
        sampler = samplers[args.imbalanced](random_state=42)

        # Apply transformation. Newer imbalanced-learn releases name the
        # method fit_resample; older ones only provide fit_sample.
        resample = getattr(sampler, 'fit_resample', None)
        if resample is None:
            resample = sampler.fit_sample
        X_train, y_train = resample(X_train, y_train)

        print("  After transformtion with {}".format(args.imbalanced))
        print("   Number of training classes: {}".format(len(y_train)))
        print("   Number of training class A: {}".format(list(y_train).count('A')))
        print("   Number of training class I: {}".format(list(y_train).count('I')))
        print("   Shape of training matrix: {}".format(X_train.shape))
        print("      Data transformation done in : %fs" % (time() - t1))

    print("Reading testing data and true classes...")
    test_matrix_file = os.path.join(args.outputModelPath, args.inputTestingData + '.jlb')
    test_classes_file = os.path.join(args.outputModelPath, args.inputTestingClasses + '.class.jlb')
    if args.saveData:
        # The first column of the testing file is ignored; the true classes
        # come from the separate classes file.
        X_test, _ = read_labeled_matrix(
            os.path.join(args.inputPath, args.inputTestingData))
        y_test = read_labels(os.path.join(args.inputPath, args.inputTestingClasses))
        print("   Saving matrix and classes...")
        joblib.dump(X_test, test_matrix_file)
        joblib.dump(y_test, test_classes_file)
        print("      Done!")
    else:
        print("   Loading matrix and classes...")
        X_test = joblib.load(test_matrix_file)
        y_test = joblib.load(test_classes_file)
        print("      Done!")

    print("   Number of testing classes: {}".format(len(y_test)))
    print("   Number of testing class A: {}".format(list(y_test).count('A')))
    print("   Number of testing class I: {}".format(list(y_test).count('I')))
    print("   Shape of testing matrix: {}".format(X_test.shape))

    print("Creating classifier...")
    if args.classifier == 'SVM':
        # SVM
        myClassifier = SVC(kernel=args.kernel)
    elif args.classifier == 'BernoulliNB':
        # BernoulliNB
        myClassifier = BernoulliNB()
    elif args.classifier == 'kNN':
        # kNN
        myClassifier = KNeighborsClassifier()
    else:
        # Unreachable while argparse `choices` is in place; kept as a guard.
        parser.error("Bad classifier")
    print("   Done!")

    print("Training...")
    myClassifier.fit(X_train, y_train)
    print("   Done!")

    if args.outputModelFile is not None:
        # Persist the fitted model where --outputModelPath/--outputModelFile point.
        print("Saving model...")
        joblib.dump(myClassifier, os.path.join(args.outputModelPath, args.outputModelFile))
        print("   Done!")

    print("Testing...")
    y_pred = myClassifier.predict(X_test)
    print("   Done!")

    print("Saving report...")
    with open(os.path.join(args.outputReportPath, args.outputReportFile), mode='w', encoding='utf8') as oFile:
        oFile.write('**********        EVALUATION REPORT     **********\n')
        oFile.write('Classifier: {}\n'.format(args.classifier))
        oFile.write('Kernel: {}\n'.format(args.kernel))
        oFile.write('Accuracy: {}\n'.format(accuracy_score(y_test, y_pred)))
        oFile.write('Precision: {}\n'.format(precision_score(y_test, y_pred, average='weighted')))
        oFile.write('Recall: {}\n'.format(recall_score(y_test, y_pred, average='weighted')))
        oFile.write('F-score: {}\n'.format(f1_score(y_test, y_pred, average='weighted')))
        oFile.write('Confusion matrix: \n')
        oFile.write(str(confusion_matrix(y_test, y_pred)) + '\n')
        oFile.write('Classification report: \n')
        oFile.write(classification_report(y_test, y_pred) + '\n')
    print("   Done!")

    print("Training and testing done in: %fs" % (time() - t0))


if __name__ == "__main__":
    main()