"""Models for Gradec."""
import os.path as op
import numpy as np
import pandas as pd
from nimare.annotate.text import generate_counts
from nimare.base import NiMAREBase
from nimare.utils import _check_ncores, get_resource_path
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from gradec.fetcher import _fetch_neuroquery_counts
def _generate_counts(
text_df,
vocabulary=None,
text_column="abstract",
tfidf=True,
min_df=0.01,
max_df=0.99,
):
"""Generate tf-idf/counts weights for unigrams/bigrams derived from textual data.
Parameters
----------
text_df : (D x 2) :obj:`pandas.DataFrame`
A DataFrame with two columns ('id' and 'text'). D = document.
Returns
-------
weights_df : (D x T) :obj:`pandas.DataFrame`
A DataFrame where the index is 'id' and the columns are the
unigrams/bigrams derived from the data. D = document. T = term.
"""
if text_column not in text_df.columns:
raise ValueError(f"Column '{text_column}' not found in DataFrame")
# Remove rows with empty text cells
orig_ids = text_df["id"].tolist()
text_df = text_df.fillna("")
keep_ids = text_df.loc[text_df[text_column] != "", "id"]
text_df = text_df.loc[text_df["id"].isin(keep_ids)]
if len(keep_ids) != len(orig_ids):
print(f"\t\tRetaining {len(keep_ids)}/{len(orig_ids)} studies", flush=True)
ids = text_df["id"].tolist()
text = text_df[text_column].tolist()
stoplist = op.join(get_resource_path(), "neurosynth_stoplist.txt")
with open(stoplist, "r") as fo:
stop_words = fo.read().splitlines()
if tfidf:
vectorizer = TfidfVectorizer(
min_df=min_df,
max_df=max_df,
ngram_range=(1, 2),
vocabulary=vocabulary,
stop_words=stop_words,
)
else:
vectorizer = CountVectorizer(
min_df=min_df,
max_df=max_df,
ngram_range=(1, 2),
vocabulary=vocabulary,
stop_words=stop_words,
)
weights = vectorizer.fit_transform(text).toarray()
names = vectorizer.get_feature_names_out()
names = [str(name) for name in names]
weights_df = pd.DataFrame(weights, columns=names, index=ids)
weights_df.index.name = "id"
return weights_df
def _get_counts(dataset, dataset_nm, feature_group):
"""Get counts weights for unigrams/bigrams derived from textual data."""
if dataset_nm == "neurosynth":
feature_names = dataset.annotations.columns.values
feature_names = [f for f in feature_names if f.startswith(feature_group)]
vocabulary = [f.split("__")[-1] for f in feature_names]
counts_df = _generate_counts(
dataset.texts,
vocabulary=vocabulary,
text_column="abstract",
tfidf=False,
max_df=len(dataset.ids) - 2,
min_df=2,
)
elif dataset_nm == "neuroquery":
counts_arr = _fetch_neuroquery_counts()
# Generate the IDs from original id list (without sorting)
ids = dataset.annotations.sort_index()["id"].tolist()
feature_names = dataset.annotations.columns.values
feature_names = [f for f in feature_names if f.startswith(feature_group)]
vocabulary = [f.split("__")[-1] for f in feature_names]
counts_df = pd.DataFrame(counts_arr, columns=vocabulary, index=ids)
counts_df.index.name = "id"
# Sorting by id to match the sorting perform in NiMARE Dataset
counts_df = counts_df.sort_index()
return counts_df
[docs]
class LDAModel(NiMAREBase):
"""Generate a latent Dirichlet allocation (LDA) topic model.
This class is a light wrapper around scikit-learn tools for tokenization and LDA.
Parameters
----------
n_topics : :obj:`int`
Number of topics for topic model. This corresponds to the model's ``n_components``
parameter. Must be an integer >= 1.
max_iter : :obj:`int`, optional
Maximum number of iterations to use during model fitting. Default = 1000.
alpha : :obj:`float` or None, optional
The ``alpha`` value for the model. This corresponds to the model's ``doc_topic_prior``
parameter. Default is None, which evaluates to ``1 / n_topics``,
as was used in :footcite:t:`poldrack2012discovering`.
beta : :obj:`float` or None, optional
The ``beta`` value for the model. This corresponds to the model's ``topic_word_prior``
parameter. If None, it evaluates to ``1 / n_topics``.
Default is 0.001, which was used in :footcite:t:`poldrack2012discovering`.
text_column : :obj:`str`, optional
The source of text to use for the model. This should correspond to an existing column
in the :py:attr:`~nimare.dataset.Dataset.texts` attribute. Default is "abstract".
n_cores : :obj:`int`, optional
Number of cores to use for parallelization.
If <=0, defaults to using all available cores.
Default is 1.
Attributes
----------
model : :obj:`~sklearn.decomposition.LatentDirichletAllocation`
Notes
-----
Adapted from: https://github.com/neurostuff/NiMARE/blob/main/nimare/annotate/lda.py.
Latent Dirichlet allocation was first developed in :footcite:t:`blei2003latent`,
and was first applied to neuroimaging articles in :footcite:t:`poldrack2012discovering`.
References
----------
.. footbibliography::
See Also
--------
:class:`~sklearn.feature_extraction.text.CountVectorizer`: Used to build a vocabulary of terms
and their associated counts from texts in the ``self.text_column`` of the Dataset's
``texts`` attribute.
:class:`~sklearn.decomposition.LatentDirichletAllocation`: Used to train the LDA model.
"""
def __init__(
self, n_topics, max_iter=1000, alpha=None, beta=0.001, text_column="abstract", n_cores=1
):
self.n_topics = n_topics
self.max_iter = max_iter
self.alpha = alpha
self.beta = beta
self.text_column = text_column
self.n_cores = _check_ncores(n_cores)
self.model = LatentDirichletAllocation(
n_components=n_topics,
max_iter=max_iter,
learning_method="batch",
doc_topic_prior=alpha,
topic_word_prior=beta,
n_jobs=n_cores,
)
[docs]
def fit(self, dset, counts_df=None):
"""Fit the LDA topic model to text from a Dataset.
Parameters
----------
dset : :obj:`~nimare.dataset.Dataset`
A Dataset with, at minimum, text available in the ``self.text_column`` column of its
:py:attr:`~nimare.dataset.Dataset.texts` attribute.
count_df : :obj:`pandas.DataFrame`
A DataFrame with feature counts for the model. The index is 'id',
used for identifying studies. Other columns are features (e.g.,
unigrams and bigrams from Neurosynth), where each value is the number
of times the feature is found in a given article.
Returns
-------
dset : :obj:`~nimare.dataset.Dataset`
A new Dataset with an updated :py:attr:`~nimare.dataset.Dataset.annotations` attribute.
Attributes
----------
distributions_ : :obj:`dict`
A dictionary containing additional distributions produced by the model, including:
- ``p_topic_g_word``: :obj:`numpy.ndarray` of shape (n_topics, n_tokens)
containing the topic-term weights for the model.
- ``p_topic_g_word_df``: :obj:`pandas.DataFrame` of shape (n_topics, n_tokens)
containing the topic-term weights for the model.
"""
if counts_df is None:
counts_df = generate_counts(
dset.texts,
text_column=self.text_column,
tfidf=False,
max_df=len(dset.ids) - 2,
min_df=2,
)
vocabulary = counts_df.columns.to_numpy()
count_values = counts_df.values
study_ids = counts_df.index.tolist()
doc_topic_weights = self.model.fit_transform(count_values)
topic_word_weights = self.model.components_
# Get top 3 words for each topic for annotation
sorted_weights_idxs = np.argsort(-topic_word_weights, axis=1)
top_tokens = [
"_".join(vocabulary[sorted_weights_idxs[topic_i, :]][:3])
for topic_i in range(self.n_topics)
]
topic_names = [
f"LDA{self.n_topics}__{i + 1}_{top_tokens[i]}" for i in range(self.n_topics)
]
doc_topic_weights_df = pd.DataFrame(
index=study_ids,
columns=topic_names,
data=doc_topic_weights,
)
topic_word_weights_df = pd.DataFrame(
index=topic_names,
columns=vocabulary,
data=topic_word_weights,
)
self.distributions_ = {
"p_topic_g_word": topic_word_weights,
"p_topic_g_word_df": topic_word_weights_df,
}
annotations = dset.annotations.copy()
annotations = pd.merge(annotations, doc_topic_weights_df, left_on="id", right_index=True)
new_dset = dset.copy()
new_dset.annotations = annotations
return new_dset
[docs]
def annotate_lda(dataset, dataset_nm, feature_group, n_topics=200, n_cores=1):
"""Annotate Dataset with the resutls of an LDA model.
Parameters
----------
dset : :obj:`~nimare.dataset.Dataset`
A Dataset with, at minimum, text available in the ``self.text_column`` column of its
:py:attr:`~nimare.dataset.Dataset.texts` attribute.
n_topics : :obj:`int`
Number of topics for topic model. This corresponds to the model's ``n_components``
parameter. Must be an integer >= 1.
dset_name: str
Dataset name. Possible options: "neurosynth" or "neuroquery"
data_dir: str
Path to data directory.
n_cores : :obj:`int`, optional
Number of cores to use for parallelization.
If <=0, defaults to using all available cores.
Default is 1.
Returns
-------
dset : :obj:`~nimare.dataset.Dataset`
A new Dataset with an updated :py:attr:`~nimare.dataset.Dataset.annotations` attribute.
"""
counts_df = _get_counts(dataset, dataset_nm, feature_group)
model = LDAModel(n_topics=n_topics, max_iter=1000, n_cores=n_cores)
dataset = model.fit(dataset, counts_df)
return dataset, model