# Source code for pewanalytics.stats.irr

import math
import copy
import scipy.stats
import numpy as np
import pandas as pd
from nltk.metrics import masi_distance, jaccard_distance
from nltk.metrics.agreement import AnnotationTask
from sklearn.metrics import (
    matthews_corrcoef,
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    cohen_kappa_score,
)
from sklearn.preprocessing import MultiLabelBinarizer
from statsmodels.stats.weightstats import DescrStatsW
from statsmodels.stats.inter_rater import fleiss_kappa
from pewtils import is_not_null
from scipy import spatial


def kappa_sample_size_power(
    rate1, rate2, k1, k0, alpha=0.05, power=0.8, twosided=False
):
    """
    Python translation of the ``N.cohen.kappa`` function from the ``irr`` R package. Source:
    https://cran.r-project.org/web/packages/irr/irr.pdf

    :param rate1: The probability that the first rater will record a positive diagnosis
    :type rate1: float
    :param rate2: The probability that the second rater will record a positive diagnosis
    :type rate2: float
    :param k1: The true Cohen's Kappa statistic
    :type k1: float
    :param k0: The value of kappa under the null hypothesis
    :type k0: float
    :param alpha: Type I error of test
    :type alpha: float
    :param power: The desired power to detect the difference between true kappa and hypothetical kappa
    :type power: float
    :param twosided: Set this to ``True`` if the test is two-sided
    :type twosided: bool
    :return: Returns the required sample size
    :rtype: int
    """

    if not twosided:
        d = 1
    else:
        d = 2
    pi_rater1 = 1 - rate1
    pi_rater2 = 1 - rate2
    pie = rate1 * rate2 + pi_rater1 * pi_rater2
    pi0 = k1 * (1 - pie) + pie
    pi22 = (pi0 - rate1 + pi_rater2) / 2
    pi11 = pi0 - pi22
    pi12 = rate1 - pi11
    pi21 = rate2 - pi11
    pi0h = k0 * (1 - pie) + pie
    pi22h = (pi0h - rate1 + pi_rater2) / 2
    pi11h = pi0h - pi22h
    pi12h = rate1 - pi11h
    pi21h = rate2 - pi11h
    Q = (1 - pie) ** (-4) * (
        pi11 * (1 - pie - (rate2 + rate1) * (1 - pi0)) ** 2
        + pi22 * (1 - pie - (pi_rater2 + pi_rater1) * (1 - pi0)) ** 2
        + (1 - pi0) ** 2
        * (pi12 * (rate2 + pi_rater1) ** 2 + pi21 * (pi_rater2 + rate1) ** 2)
        - (pi0 * pie - 2 * pie + pi0) ** 2
    )
    Qh = (1 - pie) ** (-4) * (
        pi11h * (1 - pie - (rate2 + rate1) * (1 - pi0h)) ** 2
        + pi22h * (1 - pie - (pi_rater2 + pi_rater1) * (1 - pi0h)) ** 2
        + (1 - pi0h) ** 2
        * (pi12h * (rate2 + pi_rater1) ** 2 + pi21h * (pi_rater2 + rate1) ** 2)
        - (pi0h * pie - 2 * pie + pi0h) ** 2
    )
    N = (
        (
            scipy.stats.norm.ppf(1 - alpha / d) * math.sqrt(Qh)
            + scipy.stats.norm.ppf(power) * math.sqrt(Q)
        )
        / (k1 - k0)
    ) ** 2
    return math.ceil(N)
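# Illustrative usage (a minimal sketch, not part of the original module; the rates and
# kappa values below are arbitrary assumptions chosen only to show the call signature):
#
#     from pewanalytics.stats.irr import kappa_sample_size_power
#
#     # Both raters are expected to flag roughly 30% of documents as positive; we want
#     # 80% power to distinguish a true kappa of 0.7 from a null hypothesis of kappa = 0.4
#     n = kappa_sample_size_power(rate1=0.3, rate2=0.3, k1=0.7, k0=0.4, alpha=0.05, power=0.8)
#     print(n)  # minimum number of jointly coded documents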
def kappa_sample_size_CI(kappa0, kappaL, props, kappaU=None, alpha=0.05):
    """
    Helps determine the required document sample size to confirm that Cohen's Kappa between coders is at or \
    above a minimum threshold. Useful in situations where multiple coders code a set of documents for a binary \
    outcome. This function takes the observed kappa and proportion of positive cases from the sample, along with \
    a lower bound for the minimum acceptable kappa, and returns the sample size required to confirm that the \
    coders' agreement is truly above that minimum level of kappa with 95% certainty. If the current sample size \
    is below the required sample size returned by this function, it can provide a rough estimate of how many \
    additional documents need to be coded - assuming that the coders continue agreeing and observing positive \
    cases at the same rate.

    Translated from the ``kappaSize`` R package, ``CIBinary``: \
    https://github.com/cran/kappaSize/blob/master/R/CIBinary.R

    :param kappa0: The preliminary value of kappa
    :type kappa0: float
    :param kappaL: The desired expected lower bound for a two-sided 100(1 - alpha) % confidence interval for kappa. \
        Alternatively, if kappaU is set to None, the procedure produces the number of required subjects for a \
        one-sided confidence interval
    :type kappaL: float
    :param props: The anticipated prevalence of the desired trait
    :type props: float
    :param kappaU: The desired expected upper confidence limit for kappa
    :type kappaU: float
    :param alpha: The desired type I error rate
    :type alpha: float
    :return: Returns the required sample size

    Usage::

        from pewanalytics.stats.irr import kappa_sample_size_CI

        observed_kappa = 0.8
        desired_kappa = 0.7
        observed_proportion = 0.5

        >>> kappa_sample_size_CI(observed_kappa, desired_kappa, observed_proportion)
        140

    """

    if not kappaU:
        chiCrit = scipy.stats.chi2.ppf((1 - 2 * alpha), 1)
    if kappaL and kappaU:
        chiCrit = scipy.stats.chi2.ppf((1 - alpha), 1)

    def CalcIT(rho0, rho1, Pi, n):
        def P0(r, p):
            x = (1 - p) ** 2 + r * p * (1 - p)
            return x

        def P1(r, p):
            x = 2 * (1 - r) * p * (1 - p)
            return x

        def P2(r, p):
            x = p ** 2 + r * p * (1 - p)
            return x

        r1 = ((n * P0(r=rho0, p=Pi)) - (n * P0(r=rho1, p=Pi))) ** 2 / (
            n * P0(r=rho1, p=Pi)
        )
        r2 = ((n * P1(r=rho0, p=Pi)) - (n * P1(r=rho1, p=Pi))) ** 2 / (
            n * P1(r=rho1, p=Pi)
        )
        r3 = ((n * P2(r=rho0, p=Pi)) - (n * P2(r=rho1, p=Pi))) ** 2 / (
            n * P2(r=rho1, p=Pi)
        )
        return sum([r for r in [r1, r2, r3] if r])

    n = 10
    result = 0
    while (result - 0.001) < chiCrit:
        result = CalcIT(kappa0, kappaL, props, n)
        n += 1
    return n
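# Illustrative usage of the two-sided variant (a sketch, not from the original docstring):
# supplying ``kappaU`` switches the calculation to a two-sided confidence interval. The
# values below are arbitrary assumptions.
#
#     from pewanalytics.stats.irr import kappa_sample_size_CI
#
#     # Confirm that kappa lies within [0.7, 0.9] given a preliminary kappa of 0.8 and
#     # a positive-case prevalence of 0.5
#     n = kappa_sample_size_CI(kappa0=0.8, kappaL=0.7, props=0.5, kappaU=0.9, alpha=0.05)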
def compute_scores(
    coder_df,
    coder1,
    coder2,
    outcome_column,
    document_column,
    coder_column,
    weight_column=None,
    pos_label=None,
):
    """
    Computes a variety of inter-rater reliability scores, including Cohen's kappa, Krippendorff's alpha, precision,
    and recall. The input data must consist of a :py:class:`pandas.DataFrame` with the following columns:

    - A column with values that indicate the coder (like a name)
    - A column with values that indicate the document (like an ID)
    - A column with values that indicate the code value
    - (Optional) A column with document weights

    This function will return a dictionary of agreement scores between the two specified coders.

    :param coder_df: A :py:class:`pandas.DataFrame` of codes
    :type coder_df: :py:class:`pandas.DataFrame`
    :param coder1: The value in ``coder_column`` for rows corresponding to the first coder
    :type coder1: str or int
    :param coder2: The value in ``coder_column`` for rows corresponding to the second coder
    :type coder2: str or int
    :param outcome_column: The column that contains the codes
    :type outcome_column: str
    :param document_column: The column that contains IDs for the documents
    :type document_column: str
    :param coder_column: The column containing values that indicate which coder assigned the code
    :type coder_column: str
    :param weight_column: The column that contains sampling weights
    :type weight_column: str
    :param pos_label: The value indicating a positive label (optional)
    :type pos_label: str or int
    :return: A dictionary of scores
    :rtype: dict

    .. note:: If using a multi-class (non-binary) code, some scores may come back null or not compute as expected. \
        We recommend running the function separately for each specific code value as a binary flag by providing \
        each unique value to the ``pos_label`` argument. If ``pos_label`` is not provided for multi-class codes, \
        this function will attempt to compute scores based on support-weighted averages.

    Usage::

        from pewanalytics.stats.irr import compute_scores
        import pandas as pd

        df = pd.DataFrame([
            {"coder": "coder1", "document": 1, "code": "2"},
            {"coder": "coder2", "document": 1, "code": "2"},
            {"coder": "coder1", "document": 2, "code": "1"},
            {"coder": "coder2", "document": 2, "code": "2"},
            {"coder": "coder1", "document": 3, "code": "0"},
            {"coder": "coder2", "document": 3, "code": "0"},
        ])

        >>> compute_scores(df, "coder1", "coder2", "code", "document", "coder")
        {'coder1': 'coder1',
         'coder2': 'coder2',
         'n': 3,
         'outcome_column': 'code',
         'pos_label': None,
         'coder1_mean_unweighted': 1.0,
         'coder1_std_unweighted': 0.5773502691896257,
         'coder2_mean_unweighted': 1.3333333333333333,
         'coder2_std_unweighted': 0.6666666666666666,
         'alpha_unweighted': 0.5454545454545454,
         'accuracy': 0.6666666666666666,
         'f1': 0.5555555555555555,
         'precision': 0.5,
         'recall': 0.6666666666666666,
         'precision_recall_min': 0.5,
         'matthews_corrcoef': 0.6123724356957946,
         'roc_auc': None,
         'pct_agree_unweighted': 0.6666666666666666}

        >>> compute_scores(df, "coder1", "coder2", "code", "document", "coder", pos_label="0")
        {'coder1': 'coder1',
         'coder2': 'coder2',
         'n': 3,
         'outcome_column': 'code',
         'pos_label': '0',
         'coder1_mean_unweighted': 0.3333333333333333,
         'coder1_std_unweighted': 0.3333333333333333,
         'coder2_mean_unweighted': 0.3333333333333333,
         'coder2_std_unweighted': 0.3333333333333333,
         'alpha_unweighted': 1.0,
         'cohens_kappa': 1.0,
         'accuracy': 1.0,
         'f1': 1.0,
         'precision': 1.0,
         'recall': 1.0,
         'precision_recall_min': 1.0,
         'matthews_corrcoef': 1.0,
         'roc_auc': 1.0,
         'pct_agree_unweighted': 1.0}

        >>> compute_scores(df, "coder1", "coder2", "code", "document", "coder", pos_label="1")
        {'coder1': 'coder1',
         'coder2': 'coder2',
         'n': 3,
         'outcome_column': 'code',
         'pos_label': '1',
         'coder1_mean_unweighted': 0.3333333333333333,
         'coder1_std_unweighted': 0.3333333333333333,
         'coder2_mean_unweighted': 0.0,
         'coder2_std_unweighted': 0.0,
         'alpha_unweighted': 0.0,
         'cohens_kappa': 0.0,
         'accuracy': 0.6666666666666666,
         'f1': 0.0,
         'precision': 0.0,
         'recall': 0.0,
         'precision_recall_min': 0.0,
         'matthews_corrcoef': 1.0,
         'roc_auc': None,
         'pct_agree_unweighted': 0.6666666666666666}

        >>> compute_scores(df, "coder1", "coder2", "code", "document", "coder", pos_label="2")
        {'coder1': 'coder1',
         'coder2': 'coder2',
         'n': 3,
         'outcome_column': 'code',
         'pos_label': '2',
         'coder1_mean_unweighted': 0.3333333333333333,
         'coder1_std_unweighted': 0.3333333333333333,
         'coder2_mean_unweighted': 0.6666666666666666,
         'coder2_std_unweighted': 0.3333333333333333,
         'alpha_unweighted': 0.4444444444444444,
         'cohens_kappa': 0.3999999999999999,
         'accuracy': 0.6666666666666666,
         'f1': 0.6666666666666666,
         'precision': 0.5,
         'recall': 1.0,
         'precision_recall_min': 0.5,
         'matthews_corrcoef': 0.5,
         'roc_auc': 0.75,
         'pct_agree_unweighted': 0.6666666666666666}

    """

    old_np_settings = np.seterr(all="raise")

    coder_df = copy.deepcopy(coder_df)
    if pos_label:
        coder_df[outcome_column] = (coder_df[outcome_column] == pos_label).astype(int)

    coder1_df = coder_df[coder_df[coder_column] == coder1]
    coder1_df.index = coder1_df[document_column]
    coder2_df = coder_df[coder_df[coder_column] == coder2]
    coder2_df.index = coder2_df[document_column]
    coder1_df = coder1_df[coder1_df.index.isin(coder2_df.index)]
    coder2_df = coder2_df[coder2_df.index.isin(coder1_df.index)].loc[coder1_df.index]

    row = {
        "coder1": coder1,
        "coder2": coder2,
        "n": len(coder1_df),
        "outcome_column": outcome_column,
        "pos_label": pos_label,
    }

    for labelsetname, labelset in [
        ("coder1", coder1_df[outcome_column]),
        ("coder2", coder2_df[outcome_column]),
    ]:

        if weight_column:
            try:
                weighted_stats = DescrStatsW(labelset, weights=coder1_df[weight_column])
                if weighted_stats:
                    row["{}_mean".format(labelsetname)] = weighted_stats.mean
                    row["{}_std".format(labelsetname)] = weighted_stats.std_mean
            except (TypeError, ValueError):
                try:
                    weighted_stats = DescrStatsW(
                        labelset.astype(int), weights=coder1_df[weight_column]
                    )
                    if weighted_stats:
                        row["{}_mean".format(labelsetname)] = weighted_stats.mean
                        row["{}_std".format(labelsetname)] = weighted_stats.std_mean
                except (TypeError, ValueError):
                    pass

        try:
            unweighted_stats = DescrStatsW(labelset, weights=[1.0 for x in labelset])
            if unweighted_stats:
                row["{}_mean_unweighted".format(labelsetname)] = unweighted_stats.mean
                row[
                    "{}_std_unweighted".format(labelsetname)
                ] = unweighted_stats.std_mean
        except (TypeError, ValueError):
            try:
                unweighted_stats = DescrStatsW(
                    labelset.astype(int), weights=[1.0 for x in labelset]
                )
                if unweighted_stats:
                    row[
                        "{}_mean_unweighted".format(labelsetname)
                    ] = unweighted_stats.mean
                    row[
                        "{}_std_unweighted".format(labelsetname)
                    ] = unweighted_stats.std_mean
            except (TypeError, ValueError):
                pass

    alpha = AnnotationTask(
        data=coder_df[[coder_column, document_column, outcome_column]].values
    )
    try:
        alpha = alpha.alpha()
    except (ZeroDivisionError, ValueError):
        alpha = None
    row["alpha_unweighted"] = alpha

    labels = np.unique(coder_df[outcome_column])

    if len(labels) <= 2:
        try:
            row["cohens_kappa"] = cohen_kappa_score(
                coder1_df[outcome_column],
                coder2_df[outcome_column],
                sample_weight=coder1_df[weight_column] if weight_column else None,
                labels=labels,
            )
        except FloatingPointError:
            row["cohens_kappa"] = 1.0

    try:
        row["accuracy"] = accuracy_score(
            coder1_df[outcome_column],
            coder2_df[outcome_column],
            sample_weight=coder1_df[weight_column] if weight_column else None,
        )
    except ValueError:
        row["accuracy"] = None

    try:
        row["f1"] = f1_score(
            coder1_df[outcome_column],
            coder2_df[outcome_column],
            sample_weight=coder1_df[weight_column] if weight_column else None,
            labels=labels,
            average="weighted" if not pos_label else "binary",
        )
    except ValueError:
        row["f1"] = None

    try:
        row["precision"] = precision_score(
            coder1_df[outcome_column],
            coder2_df[outcome_column],
            sample_weight=coder1_df[weight_column] if weight_column else None,
            labels=labels,
            average="weighted" if not pos_label else "binary",
        )
    except ValueError:
        row["precision"] = None

    try:
        row["recall"] = recall_score(
            coder1_df[outcome_column],
            coder2_df[outcome_column],
            sample_weight=coder1_df[weight_column] if weight_column else None,
            labels=labels,
            average="weighted" if not pos_label else "binary",
        )
    except ValueError:
        row["recall"] = None

    if is_not_null(row["precision"]) and is_not_null(row["recall"]):
        row["precision_recall_min"] = min([row["precision"], row["recall"]])
    else:
        row["precision_recall_min"] = None

    try:
        row["matthews_corrcoef"] = matthews_corrcoef(
            coder1_df[outcome_column],
            coder2_df[outcome_column],
            sample_weight=coder1_df[weight_column] if weight_column else None,
        )
    except ValueError:
        row["matthews_corrcoef"] = None
    except FloatingPointError:
        row["matthews_corrcoef"] = 1.0
    if row["accuracy"] == 1.0:
        # In newer versions of sklearn, perfect agreement returns zero incorrectly
        # We'll use the accuracy score to detect perfect agreement and correct it:
        row["matthews_corrcoef"] = 1.0

    try:
        row["roc_auc"] = (
            roc_auc_score(
                coder1_df[outcome_column],
                coder2_df[outcome_column],
                sample_weight=coder1_df[weight_column] if weight_column else None,
                labels=labels,
                average="weighted" if not pos_label else None,
            )
            if len(np.unique(coder1_df[outcome_column])) > 1
            and len(np.unique(coder2_df[outcome_column])) > 1
            else None
        )
    except TypeError:
        try:
            row["roc_auc"] = (
                roc_auc_score(
                    coder1_df[outcome_column],
                    coder2_df[outcome_column],
                    sample_weight=coder1_df[weight_column] if weight_column else None,
                    average="weighted" if not pos_label else None,
                )
                if len(np.unique(coder1_df[outcome_column])) > 1
                and len(np.unique(coder2_df[outcome_column])) > 1
                else None
            )
        except (ValueError, TypeError):
            row["roc_auc"] = None
    except (ValueError, TypeError):
        row["roc_auc"] = None

    row["pct_agree_unweighted"] = np.average(
        [
            1 if c[0] == c[1] else 0
            for c in zip(coder1_df[outcome_column], coder2_df[outcome_column])
        ]
    )

    for k, v in row.items():
        if type(v) == tuple:
            row[k] = v[0]  # For some weird reason, some of the sklearn scorers return 1-tuples sometimes

    np.seterr(**old_np_settings)

    return row
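# Illustrative usage with sampling weights (a sketch, not from the original docstring;
# the "weight" column and its values below are hypothetical). When ``weight_column`` is
# passed, weighted coder means/standard errors are added under the "{coder}_mean" and
# "{coder}_std" keys, and the sklearn scores are computed with those sample weights.
#
#     df["weight"] = [1.0, 1.0, 0.5, 0.5, 2.0, 2.0]
#     scores = compute_scores(
#         df, "coder1", "coder2", "code", "document", "coder", weight_column="weight"
#     )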
def compute_overall_scores(coder_df, outcome_column, document_column, coder_column):
    """
    Computes overall inter-rater reliability scores (Krippendorff's Alpha and Fleiss' Kappa). Allows for more than \
    two coders and code values.

    The input data must consist of a :py:class:`pandas.DataFrame` with the following columns:

    - A column with values that indicate the coder (like a name)
    - A column with values that indicate the document (like an ID)
    - A column with values that indicate the code value

    :param coder_df: A :py:class:`pandas.DataFrame` of codes
    :type coder_df: :py:class:`pandas.DataFrame`
    :param outcome_column: The column that contains the codes
    :type outcome_column: str
    :param document_column: The column that contains IDs for the documents
    :type document_column: str
    :param coder_column: The column containing values that indicate which coder assigned the code
    :type coder_column: str
    :return: A dictionary containing the scores
    :rtype: dict

    Usage::

        from pewanalytics.stats.irr import compute_overall_scores
        import pandas as pd

        df = pd.DataFrame([
            {"coder": "coder1", "document": 1, "code": "2"},
            {"coder": "coder2", "document": 1, "code": "2"},
            {"coder": "coder1", "document": 2, "code": "1"},
            {"coder": "coder2", "document": 2, "code": "2"},
            {"coder": "coder1", "document": 3, "code": "0"},
            {"coder": "coder2", "document": 3, "code": "0"},
        ])

        >>> compute_overall_scores(df, "code", "document", "coder")
        {'alpha': 0.5454545454545454, 'fleiss_kappa': 0.4545454545454544}

    """

    alpha = AnnotationTask(
        data=coder_df[[coder_column, document_column, outcome_column]].values
    )
    try:
        alpha = alpha.alpha()
    except (ZeroDivisionError, ValueError):
        alpha = None

    grouped = coder_df.groupby(document_column).count()
    complete_docs = grouped[
        grouped[coder_column] == len(coder_df[coder_column].unique())
    ].index
    dataset = coder_df[coder_df[document_column].isin(complete_docs)]
    df = dataset.groupby([outcome_column, document_column]).count()[[coder_column]]
    df = df.unstack(outcome_column).fillna(0)

    if len(df) > 0:
        kappa = fleiss_kappa(df)
    else:
        kappa = None

    return {"alpha": alpha, "fleiss_kappa": kappa}
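# Illustrative usage with three coders (a sketch, not from the original docstring; the
# codes below are arbitrary). Fleiss' Kappa is computed only over documents coded by
# every coder, so documents missing a coder's label are excluded from that calculation.
#
#     df3 = pd.DataFrame([
#         {"coder": c, "document": d, "code": code}
#         for c, d, code in [
#             ("coder1", 1, "1"), ("coder2", 1, "1"), ("coder3", 1, "0"),
#             ("coder1", 2, "0"), ("coder2", 2, "0"), ("coder3", 2, "0"),
#         ]
#     ])
#     compute_overall_scores(df3, "code", "document", "coder")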
def compute_overall_scores_multivariate(
    coder_df, document_column, coder_column, outcome_columns
):
    """
    Computes overall inter-rater reliability scores (Krippendorff's Alpha and Fleiss' Kappa). Allows for more than \
    two coders, code values, AND variables. All variables and values will be converted into a matrix of dummy \
    variables, and Alpha and Kappa will be computed using four different distance metrics:

    - Discrete agreement (exact agreement across all outcome columns)
    - Jaccard coefficient
    - MASI distance
    - Cosine similarity

    The input data must consist of a :py:class:`pandas.DataFrame` with the following columns:

    - A column with values that indicate the coder (like a name)
    - A column with values that indicate the document (like an ID)
    - One or more columns with values that indicate code values

    This code was adapted from a very helpful StackExchange post:
    https://stats.stackexchange.com/questions/511927/interrater-reliability-with-multi-rater-multi-label-dataset

    :param coder_df: A :py:class:`pandas.DataFrame` of codes
    :type coder_df: :py:class:`pandas.DataFrame`
    :param document_column: The column that contains IDs for the documents
    :type document_column: str
    :param coder_column: The column containing values that indicate which coder assigned the code
    :type coder_column: str
    :param outcome_columns: The columns that contain the codes
    :type outcome_columns: list
    :return: A dictionary containing the scores
    :rtype: dict

    Usage::

        from pewanalytics.stats.irr import compute_overall_scores_multivariate
        import pandas as pd

        coder_df = pd.DataFrame([
            {"coder": "coder1", "document": 1, "code": "2"},
            {"coder": "coder2", "document": 1, "code": "2"},
            {"coder": "coder1", "document": 2, "code": "1"},
            {"coder": "coder2", "document": 2, "code": "2"},
            {"coder": "coder1", "document": 3, "code": "0"},
            {"coder": "coder2", "document": 3, "code": "0"},
        ])

        >>> compute_overall_scores_multivariate(coder_df, 'document', 'coder', ["code"])
        {'fleiss_kappa_discrete': 0.4545454545454544,
         'fleiss_kappa_jaccard': 0.49999999999999994,
         'fleiss_kappa_masi': 0.49999999999999994,
         'fleiss_kappa_cosine': 0.49999999999999994,
         'alpha_discrete': 0.5454545454545454,
         'alpha_jaccard': 0.5454545454545454,
         'alpha_masi': 0.5454545454545454,
         'alpha_cosine': 0.5454545454545454}

        coder_df = pd.DataFrame([
            {"coder": "coder1", "document": 1, "code1": "2", "code2": "1"},
            {"coder": "coder2", "document": 1, "code1": "2", "code2": "1"},
            {"coder": "coder1", "document": 2, "code1": "1", "code2": "0"},
            {"coder": "coder2", "document": 2, "code1": "2", "code2": "1"},
            {"coder": "coder1", "document": 3, "code1": "0", "code2": "0"},
            {"coder": "coder2", "document": 3, "code1": "0", "code2": "0"},
        ])

        >>> compute_overall_scores_multivariate(coder_df, 'document', 'coder', ["code1", "code2"])
        {'fleiss_kappa_discrete': 0.4545454545454544,
         'fleiss_kappa_jaccard': 0.49999999999999994,
         'fleiss_kappa_masi': 0.49999999999999994,
         'fleiss_kappa_cosine': 0.49999999999999994,
         'alpha_discrete': 0.5454545454545454,
         'alpha_jaccard': 0.5161290322580645,
         'alpha_masi': 0.5361781076066792,
         'alpha_cosine': 0.5}

    """

    dummies = pd.concat(
        [pd.get_dummies(coder_df[c], prefix=c) for c in outcome_columns], axis=1
    )
    outcome_columns = dummies.columns
    coder_df = pd.concat([coder_df, dummies], axis=1)
    coder_df = coder_df.dropna(subset=outcome_columns)
    coder_df["value"] = coder_df.apply(
        lambda x: frozenset([col for col in outcome_columns if int(x[col]) == 1]),
        axis=1,
    )

    discrete_task = AnnotationTask(
        data=coder_df[[coder_column, document_column, "value"]].values
    )
    kappa_discrete = discrete_task.multi_kappa()
    alpha_discrete = discrete_task.alpha()

    task_data = coder_df[[coder_column, document_column, "value"]].values
    task_data = [tuple(t) for t in task_data]
    jaccard_task = AnnotationTask(data=task_data, distance=jaccard_distance)
    masi_task = AnnotationTask(data=task_data, distance=masi_distance)

    mlb = MultiLabelBinarizer()
    task_data = [
        (r[0][0], r[0][1], tuple(r[1]))
        for r in zip(task_data, mlb.fit_transform([t[2] for t in task_data]))
    ]
    cosine_task = AnnotationTask(data=task_data, distance=spatial.distance.cosine)

    kappa_jaccard = jaccard_task.multi_kappa()
    kappa_masi = masi_task.multi_kappa()
    kappa_cosine = cosine_task.multi_kappa()
    alpha_jaccard = jaccard_task.alpha()
    alpha_masi = masi_task.alpha()
    alpha_cosine = cosine_task.alpha()

    return {
        "fleiss_kappa_discrete": kappa_discrete,
        "fleiss_kappa_jaccard": kappa_jaccard,
        "fleiss_kappa_masi": kappa_masi,
        "fleiss_kappa_cosine": kappa_cosine,
        "alpha_discrete": alpha_discrete,
        "alpha_jaccard": alpha_jaccard,
        "alpha_masi": alpha_masi,
        "alpha_cosine": alpha_cosine,
    }
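# Illustrative sketch of the distance metrics above (not from the original module): each
# coder-document pair is reduced to a frozenset of the dummy columns equal to 1, so partial
# agreement on multi-label codes earns partial credit under the Jaccard/MASI/cosine
# distances but not under discrete agreement. The dummy column names below ("code1_2",
# "code2_1") are hypothetical examples of what ``pd.get_dummies`` might produce.
#
#     from nltk.metrics import jaccard_distance, masi_distance
#
#     a = frozenset({"code1_2", "code2_1"})
#     b = frozenset({"code1_2"})
#     jaccard_distance(a, b)  # 0.5 -- one of two labels shared
#     masi_distance(a, b)     # larger distance than Jaccard, since MASI penalizes partial overlap more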