classes.c31_corpus

  1from inspect import cleandoc
  2from typing import Literal
  3import re
  4import colorama
  5from loguru import logger
  6
  7try:
  8    import spacy
  9except ImportError:
 10    spacy = None
 11
 12from lib import files, helpers, content
 13from .c34_text_cleaner import KTextCleaner
 14
# When True, _buildSinglePhraseBucket logs (via logger.info) how many long
# phrases it is about to synthesize.
DEBUG = False
# Default `threshold` of KCorpus.fromDataList.
INPUT_THRESHOLD = 80000
"""Maximum number of items to include from the list when creating a KCorpus from a list of strings."""
 18
 19
class KCorpus:
    """Text corpus exposing sentences, lines, tokens, unique words and n-word phrases.

    The input text is parsed with a spaCy pipeline chosen by language code;
    phrase buckets are built on demand and cached per instance.
    """

    # Loaded spaCy pipelines keyed by model name; stored on the class, so the
    # cache is shared by every instance.
    _NLP_CACHE: dict[str, object] = {}
    # Language code -> spaCy model name looked up by _getNlp.
    _MODEL_BY_LANG = {
        "en": "en_core_web_md",
        "ru": "ru_core_news_md",
    }
    # POS labels accepted by _normalizePosFilter; anything else raises ValueError.
    _VALID_POS = {
        "ADJ",
        "ADP",
        "ADV",
        "AUX",
        "CCONJ",
        "DET",
        "INTJ",
        "NOUN",
        "NUM",
        "PART",
        "PRON",
        "PROPN",
        "PUNCT",
        "SCONJ",
        "SYM",
        "VERB",
        "X",
    }
    # Lower-case aliases mapped to their POS label; keys are matched after
    # the caller's value has been stripped and lower-cased.
    _POS_ALIASES = {
        "adjective": "ADJ",
        "adj": "ADJ",
        "adposition": "ADP",
        "adp": "ADP",
        "adverb": "ADV",
        "adv": "ADV",
        "auxiliary": "AUX",
        "aux": "AUX",
        "coordinating_conjunction": "CCONJ",
        "cconj": "CCONJ",
        "determiner": "DET",
        "det": "DET",
        "interjection": "INTJ",
        "intj": "INTJ",
        "noun": "NOUN",
        "proper_noun": "PROPN",
        "propernoun": "PROPN",
        "propn": "PROPN",
        "numeral": "NUM",
        "num": "NUM",
        "particle": "PART",
        "part": "PART",
        "pronoun": "PRON",
        "pron": "PRON",
        "punctuation": "PUNCT",
        "punct": "PUNCT",
        "subordinating_conjunction": "SCONJ",
        "sconj": "SCONJ",
        "symbol": "SYM",
        "sym": "SYM",
        "verb": "VERB",
        "other": "X",
        "x": "X",
    }
    # Words that _synthesizeLongPhrases inserts, in this order and cyclically,
    # between fragments taken from consecutive sentences.
    _DEFAULT_BRIDGE_WORDS = [
        "and",
        "the",
        "of",
        "to",
        "in",
        "for",
        "with",
        "on",
        "at",
        "by",
        "from",
        "as",
        "that",
        "which",
        "while",
        "after",
        "before",
        "during",
        "within",
        "without",
        "between",
        "across",
        "through",
        "about",
        "around",
        "under",
        "over",
        "into",
        "against",
    ]
111
    def __init__(self, data: str, maxPerBucket: int = 5000, lang: str = "en") -> None:
        """
        Build the corpus views (raw, pruned, sentences, lines, tokens, words) from text.

        Args:
            data: Input data as a string or file path.
            maxPerBucket (optional): Maximum number of n-grams per bucket.
            lang: Language code used for spaCy model routing.

        Raises:
            RuntimeError: Propagated from _getNlp when spaCy or its model cannot be loaded.
            ValueError: Propagated from _getNlp for an unsupported language code.
        """

        # A value that files.isFile accepts is replaced by that file's contents.
        if files.isFile(data):
            data = files.readFile(data)

        self.lang = lang
        self.limit = maxPerBucket

        self.raw: str = data
        """The original input data (contents of file if file path was given)."""
        self.pruned: str = self._toPruned(self.raw)
        """Cleaned version of the input data."""
        self.sentences: list[str] = self._toSentences(self.pruned)
        """List of sentences from the pruned data."""
        self.lines: list[str] = self._toLines(self.pruned)
        """List of lines from the pruned data."""

        # The pipeline has to exist before _toTokens runs: its stop-word check
        # (_isStopWord) reads self._nlp.
        self._nlp = self._getNlp(self.lang)
        self._doc = self._nlp(self.pruned)

        self.tokens: list[str] = self._toTokens(self._doc)
        """List of cleaned, filtered tokens from the pruned data."""
        self.words: list[str] = self._toWords(self.tokens)
        """List of unique words from the pruned data."""

        # Filled lazily: candidates on the first bucket build, buckets per
        # requested phrase size.
        self._phraseCandidates: list[str] | None = None
        self._phraseBuckets: dict[int, list[str]] = {}
144        self._phraseBuckets: dict[int, list[str]] = {}
145
146    @staticmethod
147    def fromDataList(
148        data: list[str] = "/usr/share/dict/words",
149        threshold: int = INPUT_THRESHOLD,
150        **kwargs,
151    ) -> "KCorpus":
152        """Alternative constructor to create a KCorpus from a list of strings.
153
154            Args:
155                data: List of strings to create the corpus from.
156                threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit.
157
158        Useful for cases where the input data is too large to process as a single string."""
159        if files.isFile(data):
160            data = files.readFileLines(data)
161
162        if len(data) > threshold:
163            logger.info(
164                "Sampling KCorpus data: {} items, discarded {}.",
165                threshold,
166                len(data) - threshold,
167            )
168            data = helpers.sampleList(data, threshold)
169
170        data = KTextCleaner().sanitizeForbidden(data, dropStrategy="word")
171        return KCorpus(" ".join(data), **kwargs)
172
173    def __str__(self) -> str:
174        """Returns a human-readable representation of the KCorpus object."""
175        summaryColors = [
176            colorama.Fore.BLUE,
177            colorama.Fore.MAGENTA,
178            colorama.Fore.RED,
179            colorama.Fore.CYAN,
180            colorama.Fore.YELLOW,
181            colorama.Fore.GREEN,
182            colorama.Fore.WHITE,
183        ]
184        ngramSummary = " ".join(
185            [
186                f"{summaryColors[index % len(summaryColors)]}{colorama.Back.BLACK}{len(self._phraseBuckets[n])}{colorama.Style.RESET_ALL} {n}-grams"
187                for index, n in enumerate(sorted(self._phraseBuckets.keys()))
188            ]
189        )
190        if not ngramSummary:
191            ngramSummary = f"{colorama.Fore.WHITE}no n-gram buckets cached"
192
193        return cleandoc(
194            f"""
195            {colorama.Fore.BLACK}{colorama.Back.LIGHTGREEN_EX}KCorpus{colorama.Style.RESET_ALL} {len(self.tokens)} tokens {colorama.Fore.YELLOW}{len(self.words)} words {colorama.Fore.GREEN}{len(self.sentences)} sentences {colorama.Fore.CYAN}{len(self.lines)} lines {ngramSummary}{colorama.Style.RESET_ALL}
196            """
197        )
198
199    # Internals
200    def _toPruned(self, data: str) -> str:
201        """Prune missing glyphs."""
202        # Data already cleaned by now
203        dataPruned = content.omitMissing(input=data, mode="words", debug=0)
204
205        return dataPruned.strip()
206
207    def _toSentences(self, data: str) -> list[str]:
208        """Returns cleaned data split into sentences."""
209        return content.splitStringToSentences(data)
210
211    def _toLines(self, data: str) -> list[str]:
212        """Returns cleaned data split into lines."""
213        lines = data.split("\n")
214        lines = helpers.dedupe(lines)
215        return lines
216
217    def _toTokens(self, doc) -> list[str]:
218        """Returns cleaned, filtered words."""
219
220        wordsAll = [
221            token.text
222            for token in doc
223            if not token.is_space and not token.is_punct and not token.like_num
224        ]
225        wordsFiltered = [token for token in wordsAll if not self._isStopWord(token)]
226        # ? Sanitize with my own defined blacklist
227        wordsSanitized = KTextCleaner().sanitizeForbidden(
228            wordsFiltered, dropStrategy="word"
229        )
230        wordsClean = self._cleanTokens(wordsSanitized)
231
232        return wordsClean
233
234    def _cleanTokens(self, tokens: list[str]) -> list[str]:
235        """Normalize token strings and remove empty fragments."""
236
237        removals = [
238            # "-"
239            r"^\W+$",
240            # "- Hello" => "Hello"
241            # "Hello -" => "Hello"
242            r"^[.,']\s+|\s+[.,']$",
243            # "'s" => None
244            r"^\W+[A-Za-z]$",
245        ]
246
247        def cleanToken(token):
248            for removal in removals:
249                matches = re.compile(rf"{removal}").findall(token)
250                if matches:
251                    before = token
252                    for match in matches:
253                        token = re.sub(rf"{removal}", "", token)
254                        logger.trace(
255                            "[Clean] {} \tfrom\t {} \t=> {}",
256                            match,
257                            before,
258                            token or ("empty"),
259                        )
260
261            # Assume it’s not abbreviation => trim punctuation
262            if len(token) > 4:
263                token = token.strip(".,:;-")
264
265            return token
266
267        tokens = [cleanToken(token) for token in tokens]
268        return [token for token in tokens if token]  # Remove None
269
270    def _normalizePosFilter(self, pos: str | list[str] | None) -> set[str] | None:
271        """Normalize and validate POS filters into spaCy coarse POS labels."""
272        if pos is None:
273            return None
274
275        rawValues = helpers.coerceList(pos)
276        normalizedValues = []
277        for value in rawValues:
278            if not isinstance(value, str):
279                continue
280            key = value.strip().lower()
281            if not key:
282                continue
283
284            normalizedValues.append(self._POS_ALIASES.get(key, key.upper()))
285
286        if not normalizedValues:
287            raise ValueError("POS filter cannot be empty.")
288
289        unknown = sorted(
290            {value for value in normalizedValues if value not in self._VALID_POS}
291        )
292        if unknown:
293            raise ValueError(
294                "Unsupported POS values in KCorpus filter: "
295                f"{unknown}. Supported values: {sorted(self._VALID_POS)}"
296            )
297
298        return set(normalizedValues)
299
300    @classmethod
301    def _getNlp(cls, lang: str):
302        if spacy is None:
303            raise RuntimeError(
304                "spaCy is required for KCorpus. Install with: `pip install spacy`."
305            )
306
307        modelName = cls._MODEL_BY_LANG.get(lang)
308        if not modelName:
309            raise ValueError(
310                f"Unsupported KCorpus language '{lang}'. Supported languages: {list(cls._MODEL_BY_LANG.keys())}"
311            )
312
313        cachedNlp = cls._NLP_CACHE.get(modelName)
314        if cachedNlp:
315            return cachedNlp
316
317        try:
318            nlp = spacy.load(modelName)
319        except Exception as e:
320            raise RuntimeError(
321                f"Missing spaCy model '{modelName}' for language '{lang}'. "
322                f"Install it with: `python3 -m spacy download {modelName}`"
323            ) from e
324
325        cls._NLP_CACHE[modelName] = nlp
326        return nlp
327
328    def _isStopWord(self, token: str) -> bool:
329        lexeme = self._nlp.vocab[token]
330        if lexeme.is_stop:
331            return True
332
333        if token.lower() in self._nlp.Defaults.stop_words:
334            return True
335
336        return False
337
338    def _cleanPhrase(self, text: str) -> str:
339        text = re.sub(r"\s+([,.:;!?)\]])", r"\1", text)
340        text = re.sub(r"([([\"])\s+", r"\1", text)
341        text = re.sub(r"(\w)\s+-\s+(\w)", r"\1-\2", text)
342        text = re.sub(r"\s+", " ", text)
343        text = content.dedupeWords(text)
344        return text.strip().rstrip(",.;:")
345
346    def _extractShortSentences(self, minWords=4, maxWords=10) -> list[str]:
347        phrases = []
348
349        for sentence in self._doc.sents:
350            words = [token for token in sentence if token.is_alpha]
351            if minWords <= len(words) <= maxWords:
352                phrase = self._cleanPhrase(sentence.text)
353                if phrase:
354                    phrases.append(phrase)
355
356        return phrases
357
358    def _extractSvoPhrases(self) -> list[str]:
359        phrases = []
360
361        for token in self._doc:
362            if token.pos_ != "VERB":
363                continue
364
365            subject = None
366            obj = None
367
368            for child in token.children:
369                if child.dep_ in ("nsubj", "nsubjpass") and subject is None:
370                    start = min(t.i for t in child.subtree)
371                    end = max(t.i for t in child.subtree) + 1
372                    subject = self._cleanPhrase(self._doc[start:end].text)
373
374                if child.dep_ in ("dobj", "attr", "pobj") and obj is None:
375                    start = min(t.i for t in child.subtree)
376                    end = max(t.i for t in child.subtree) + 1
377                    obj = self._cleanPhrase(self._doc[start:end].text)
378
379            if subject and obj:
380                phrases.append(self._cleanPhrase(f"{subject} {token.text} {obj}"))
381
382        return phrases
383
384    def _extractRootPhrases(self, minWords=2, maxWords=8) -> list[str]:
385        phrases = []
386
387        for sentence in self._doc.sents:
388            roots = [token for token in sentence if token.dep_ == "ROOT"]
389            if not roots:
390                continue
391
392            root = roots[0]
393            phraseTokens = [root]
394            for child in root.children:
395                if child.dep_ in ("nsubj", "nsubjpass", "dobj", "attr", "prep", "aux"):
396                    phraseTokens.extend(list(child.subtree))
397
398            phraseTokens = sorted(set(phraseTokens), key=lambda t: t.i)
399            if not phraseTokens:
400                continue
401
402            start = phraseTokens[0].i
403            end = phraseTokens[-1].i + 1
404            phrase = self._cleanPhrase(self._doc[start:end].text)
405            wordCount = len([token for token in phrase.split(" ") if token])
406            if minWords <= wordCount <= maxWords:
407                phrases.append(phrase)
408
409        return phrases
410
411    def _extractNounChunks(self, minWords=2, maxWords=4) -> list[str]:
412        if not self._doc.has_annotation("DEP"):
413            return []
414
415        phrases = []
416        try:
417            for chunk in self._doc.noun_chunks:
418                phrase = self._cleanPhrase(chunk.text)
419                words = [token for token in phrase.split(" ") if token]
420                if minWords <= len(words) <= maxWords:
421                    phrases.append(phrase)
422        except (NotImplementedError, ValueError, AttributeError):
423            # Not supported for this language/model (e.g. ru), or missing parse data
424            pass
425        return phrases
426
427    def _buildPhraseCandidates(self) -> list[str]:
428        candidatePhrases = []
429        candidatePhrases.extend(self._extractShortSentences(minWords=2, maxWords=8))
430        candidatePhrases.extend(self._extractRootPhrases(minWords=2, maxWords=8))
431        candidatePhrases.extend(self._extractSvoPhrases())
432        candidatePhrases.extend(self._extractNounChunks(minWords=2, maxWords=4))
433
434        candidatePhrases = [phrase for phrase in candidatePhrases if phrase]
435        candidatePhrases = helpers.dedupe(candidatePhrases)
436        return candidatePhrases
437
438    def _buildPhraseBuckets(self, limit: int) -> dict[int, list[str]]:
439        buckets = {}
440        for n in range(2, 8):
441            buckets[n] = self._buildSinglePhraseBucket(n, limit)
442
443        return buckets
444
445    def _splitPhraseWords(self, text: str) -> list[str]:
446        return re.findall(r"[A-Za-zÀ-ÖØ-öø-ÿ0-9]+(?:[-'][A-Za-zÀ-ÖØ-öø-ÿ0-9]+)*", text)
447
448    def _buildSentenceWindows(self, n: int, limit: int, seen: set[str]) -> list[str]:
449        windows = []
450        for sentence in self.sentences:
451            words = self._splitPhraseWords(sentence)
452            if len(words) < n:
453                continue
454
455            for index in range(len(words) - n + 1):
456                phrase = self._cleanPhrase(" ".join(words[index : index + n]))
457                if not phrase or phrase in seen:
458                    continue
459
460                seen.add(phrase)
461                windows.append(phrase)
462                if len(windows) >= limit:
463                    return windows
464
465        return windows
466
467    def _synthesizeLongPhrases(self, n: int, limit: int, seen: set[str]) -> list[str]:
468        bridgeWords = self._DEFAULT_BRIDGE_WORDS
469        sentenceWords = [
470            self._splitPhraseWords(sentence) for sentence in self.sentences if sentence
471        ]
472        sentenceWords = [words for words in sentenceWords if words]
473
474        synthesized = []
475        for index, words in enumerate(sentenceWords):
476            composed = words[:]
477            bridgeIndex = 0
478            nextIndex = index + 1
479
480            while len(composed) < n and nextIndex < len(sentenceWords):
481                # Insert a single bridge word only when stitching sentence fragments.
482                if composed and len(composed) < n:
483                    composed.append(bridgeWords[bridgeIndex % len(bridgeWords)])
484                    bridgeIndex += 1
485
486                remaining = n - len(composed)
487                if remaining <= 0:
488                    break
489
490                composed.extend(sentenceWords[nextIndex][:remaining])
491                nextIndex += 1
492
493            if len(composed) < n:
494                continue
495
496            phrase = self._cleanPhrase(" ".join(composed[:n]))
497            if not phrase or phrase in seen:
498                continue
499
500            seen.add(phrase)
501            synthesized.append(phrase)
502            if len(synthesized) >= limit:
503                break
504
505        return synthesized
506
507    def _buildSinglePhraseBucket(self, n: int, limit: int) -> list[str]:
508        if n < 2:
509            return []
510
511        if self._phraseCandidates is None:
512            self._phraseCandidates = self._buildPhraseCandidates()
513
514        phrases = []
515        seen = set()
516        for phrase in self._phraseCandidates:
517            wordCount = len([token for token in phrase.split(" ") if token])
518            if wordCount == n:
519                if phrase in seen:
520                    continue
521                seen.add(phrase)
522                phrases.append(phrase)
523
524            if len(phrases) >= limit:
525                return phrases[:limit]
526
527        if len(phrases) < limit:
528            sentenceWindows = self._buildSentenceWindows(n, limit - len(phrases), seen)
529            phrases.extend(sentenceWindows)
530
531        if len(phrases) < limit and n > 4:
532            if DEBUG:
533                logger.info(
534                    "Synthesizing long phrases: n={}, needed={}, seen={}",
535                    n,
536                    limit - len(phrases),
537                    len(seen),
538                )
539            synthesized = self._synthesizeLongPhrases(n, limit - len(phrases), seen)
540            phrases.extend(synthesized)
541
542        return phrases[:limit]
543
544    def _getPhraseBucket(self, n: int) -> list[str]:
545        if n in self._phraseBuckets:
546            return self._phraseBuckets[n]
547
548        self._phraseBuckets[n] = self._buildSinglePhraseBucket(n, self.limit)
549
550        return self._phraseBuckets.get(n, [])
551
552    def _toWords(self, tokens: list[str]) -> list[str]:
553        """
554        Returns unique words from the cleaned data.
555
556        Args:
557            tokens: List of tokens to process.
558        """
559        # ? Remove duplicates
560        words = helpers.dedupe(tokens)
561        return words
562
563    def _toNgrams(
564        self,
565        data: str,
566        limit: int,
567        n: int = 4,
568        mode: Literal["quantity", "score"] = "quantity",
569    ):
570        """
571        Output phrases (n words)
572
573        Args:
574            data: The input data as a list of words.
575            limit: Number of n-grams to return (quantity or score).
576            n: The size of the n-gram (2=bigram, 3=trigram, 4=quadgram).
577            mode: 'quantity' for top-N, 'score' for above a PMI score.
578
579        Returns:
580            List of n-gram phrases.
581        """
582        ngrams = list(self._getPhraseBucket(n))
583        return ngrams[:limit]
584
585    def serveSentences(
586        self,
587        mode: Literal["whole", "separate", "connected"] = "separate",
588        limit: int = 100,
589        shuffle=False,
590    ) -> list[str]:
591        """
592        Serve sentences from the corpus.
593
594        Args:
595            mode: Mode of sentence serving (see below).
596            limit: Limit for sentences or words depending on mode.
597            shuffle: Whether to shuffle sentences.
598
599        Modes:
600        - `whole`
601            - whole sentences in logical succession
602            - `limit` number of sentences
603        - `separate`
604            - Chop sentences one by one
605            - `limit` max words in a sentence
606        - `connected`
607            - Connect whole sentences
608            - `limit` max sentences in a block
609
610        Returns:
611            List of sentences or sentence blocks.
612        """
613        sentences = (
614            helpers.shuffleAtRandomSegment(self.sentences)
615            if shuffle
616            else self.sentences
617        )
618
619        if mode == "whole" and limit is not None:
620            # Avoid out of index if limit exceeds available sentences
621            sentences = sentences[: min(limit, len(sentences))]
622        elif mode in ("separate", "connected"):
623            sentences = content.chopList(sentences, limit, mode, shuffle=False)
624
625        return sentences
626
627    def serveLines(self, clamp: int = None) -> list[str]:
628        """
629        Serve shuffled lines from the corpus, optionally clamped.
630
631        Args:
632            clamp: Maximum number of lines to return.
633
634        Returns:
635            List of lines.
636        """
637        lines = content.chopList(self.lines, clamp, mode="separate", shuffle=True)
638        return lines
639
640    def servePhrases(self, n: str | list | tuple = (1, 2, 3, 4)) -> list[str]:
641        """
642        Serve phrases of specified n-gram sizes.
643
644        Args:
645            n: Word counts to include (one or multiple)
646                - 1 = single words
647                - 2 = bigrams
648                - 3 = trigrams
649                - 4 = quadgrams
650                - 5+ = extended phrase buckets
651
652        Returns:
653            List of phrases.
654        """
655        numbers = helpers.coerceList(n)
656        numbers = [
657            int(number)
658            for number in numbers
659            if isinstance(number, (int, float))
660            or (isinstance(number, str) and number.isdigit())
661        ]
662        if any(number < 1 for number in numbers):
663            raise ValueError("KCorpus phrase bucket sizes must be >= 1.")
664
665        ngrams = {1: self.words}
666        for number in numbers:
667            if number <= 1:
668                continue
669            ngrams[number] = self._getPhraseBucket(number)
670
671        phrases = helpers.flatten([ngrams.get(number, []) for number in numbers])
672        phrases = helpers.dedupe(phrases)
673
674        cleaner = KTextCleaner()
675        phrases = [cleaner.removeOrphanedPunctuation(phrase) for phrase in phrases]
676
677        return phrases
678
679    def serveTokens(
680        self,
681        pos: str | list[str] | None = None,
682        lemma=False,
683        unique=True,
684        alterCase: content.TextCase | None = None,
685        includeStopwords=False,
686        minLength=1,
687    ) -> list[str]:
688        """
689        Serve tokens optionally filtered by spaCy POS type.
690
691        Args:
692            pos: One or more coarse POS labels (for example ADJ, NOUN, VERB)
693                or aliases (for example adjective, noun, verb).
694            lemma: Return lemmatized forms instead of surface tokens.
695            unique: Deduplicate resulting tokens.
696            alterCase: Change the case of the output tokens.
697            includeStopwords: Keep stop words in output.
698            minLength: Minimum token length to keep.
699
700        Returns:
701            List of filtered tokens.
702        """
703        posFilter = self._normalizePosFilter(pos)
704        filtered = []
705
706        for token in self._doc:
707            if token.is_space or token.is_punct or token.like_num or not token.is_alpha:
708                continue
709
710            if posFilter and token.pos_ not in posFilter:
711                continue
712
713            tokenText = token.lemma_ if lemma else token.text
714            tokenText = tokenText.strip()
715            if isinstance(alterCase, str):
716                tokenText = content.changeCase(tokenText, alterCase)
717
718            if not tokenText:
719                continue
720
721            if not includeStopwords and self._isStopWord(tokenText):
722                continue
723
724            filtered.append(tokenText)
725
726        filtered = KTextCleaner().sanitizeForbidden(filtered, dropStrategy="word")
727        filtered = self._cleanTokens(filtered)
728        filtered = [token for token in filtered if len(token) >= minLength]
729
730        if unique:
731            filtered = helpers.dedupe(filtered)
732
733        return filtered
DEBUG = False
INPUT_THRESHOLD = 80000

Maximum number of items to include from the list when creating a KCorpus from a list of strings.

class KCorpus:
class KCorpus:
    """Text corpus exposing sentences, lines, tokens, words and phrase buckets.

    The input text is pruned, split into sentences and lines, and parsed with
    a spaCy pipeline chosen by language code.
    """

    # Loaded spaCy pipelines keyed by model name; shared by all instances.
    _NLP_CACHE: dict[str, object] = {}
    # Language code -> spaCy model name.
    _MODEL_BY_LANG = {
        "en": "en_core_web_md",
        "ru": "ru_core_news_md",
    }
    # Coarse POS labels accepted by the POS filter.
    _VALID_POS = {
        "ADJ",
        "ADP",
        "ADV",
        "AUX",
        "CCONJ",
        "DET",
        "INTJ",
        "NOUN",
        "NUM",
        "PART",
        "PRON",
        "PROPN",
        "PUNCT",
        "SCONJ",
        "SYM",
        "VERB",
        "X",
    }
    # Lowercase alias -> coarse POS label.
    _POS_ALIASES = {
        "adjective": "ADJ",
        "adj": "ADJ",
        "adposition": "ADP",
        "adp": "ADP",
        "adverb": "ADV",
        "adv": "ADV",
        "auxiliary": "AUX",
        "aux": "AUX",
        "coordinating_conjunction": "CCONJ",
        "cconj": "CCONJ",
        "determiner": "DET",
        "det": "DET",
        "interjection": "INTJ",
        "intj": "INTJ",
        "noun": "NOUN",
        "proper_noun": "PROPN",
        "propernoun": "PROPN",
        "propn": "PROPN",
        "numeral": "NUM",
        "num": "NUM",
        "particle": "PART",
        "part": "PART",
        "pronoun": "PRON",
        "pron": "PRON",
        "punctuation": "PUNCT",
        "punct": "PUNCT",
        "subordinating_conjunction": "SCONJ",
        "sconj": "SCONJ",
        "symbol": "SYM",
        "sym": "SYM",
        "verb": "VERB",
        "other": "X",
        "x": "X",
    }
    # Connector words inserted when words of consecutive sentences are
    # stitched together into longer synthesized phrases.
    _DEFAULT_BRIDGE_WORDS = [
        "and",
        "the",
        "of",
        "to",
        "in",
        "for",
        "with",
        "on",
        "at",
        "by",
        "from",
        "as",
        "that",
        "which",
        "while",
        "after",
        "before",
        "during",
        "within",
        "without",
        "between",
        "across",
        "through",
        "about",
        "around",
        "under",
        "over",
        "into",
        "against",
    ]
112
    def __init__(self, data: str, maxPerBucket: int = 5000, lang: str = "en"):
        """
        Build the corpus: prune the text, split it, and parse it with spaCy.

        Args:
            data: Input data as a string or file path. When `files.isFile(data)`
                is true, the file contents are read and used instead.
            maxPerBucket (optional): Maximum number of n-grams per bucket.
            lang: Language code used for spaCy model routing.

        Raises:
            RuntimeError: If spaCy or the language model cannot be loaded.
            ValueError: If no model is configured for `lang`.
        """

        if files.isFile(data):
            data = files.readFile(data)

        self.lang = lang
        self.limit = maxPerBucket

        self.raw: str = data
        """The original input data (contents of file if file path was given)."""
        self.pruned: str = self._toPruned(self.raw)
        """Cleaned version of the input data."""
        self.sentences: list[str] = self._toSentences(self.pruned)
        """List of sentences from the pruned data."""
        self.lines: list[str] = self._toLines(self.pruned)
        """List of lines from the pruned data."""

        # The pipeline must exist before tokens are built: stop-word checks
        # in _toTokens go through self._nlp.
        self._nlp = self._getNlp(self.lang)
        self._doc = self._nlp(self.pruned)

        self.tokens: list[str] = self._toTokens(self._doc)
        """List of cleaned, filtered tokens from the pruned data."""
        self.words: list[str] = self._toWords(self.tokens)
        """List of unique words from the pruned data."""

        # Phrase data is built lazily on first request and then cached.
        self._phraseCandidates: list[str] | None = None
        self._phraseBuckets: dict[int, list[str]] = {}
146
147    @staticmethod
148    def fromDataList(
149        data: list[str] = "/usr/share/dict/words",
150        threshold: int = INPUT_THRESHOLD,
151        **kwargs,
152    ) -> "KCorpus":
153        """Alternative constructor to create a KCorpus from a list of strings.
154
155            Args:
156                data: List of strings to create the corpus from.
157                threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit.
158
159        Useful for cases where the input data is too large to process as a single string."""
160        if files.isFile(data):
161            data = files.readFileLines(data)
162
163        if len(data) > threshold:
164            logger.info(
165                "Sampling KCorpus data: {} items, discarded {}.",
166                threshold,
167                len(data) - threshold,
168            )
169            data = helpers.sampleList(data, threshold)
170
171        data = KTextCleaner().sanitizeForbidden(data, dropStrategy="word")
172        return KCorpus(" ".join(data), **kwargs)
173
174    def __str__(self) -> str:
175        """Returns a human-readable representation of the KCorpus object."""
176        summaryColors = [
177            colorama.Fore.BLUE,
178            colorama.Fore.MAGENTA,
179            colorama.Fore.RED,
180            colorama.Fore.CYAN,
181            colorama.Fore.YELLOW,
182            colorama.Fore.GREEN,
183            colorama.Fore.WHITE,
184        ]
185        ngramSummary = " ".join(
186            [
187                f"{summaryColors[index % len(summaryColors)]}{colorama.Back.BLACK}{len(self._phraseBuckets[n])}{colorama.Style.RESET_ALL} {n}-grams"
188                for index, n in enumerate(sorted(self._phraseBuckets.keys()))
189            ]
190        )
191        if not ngramSummary:
192            ngramSummary = f"{colorama.Fore.WHITE}no n-gram buckets cached"
193
194        return cleandoc(
195            f"""
196            {colorama.Fore.BLACK}{colorama.Back.LIGHTGREEN_EX}KCorpus{colorama.Style.RESET_ALL} {len(self.tokens)} tokens {colorama.Fore.YELLOW}{len(self.words)} words {colorama.Fore.GREEN}{len(self.sentences)} sentences {colorama.Fore.CYAN}{len(self.lines)} lines {ngramSummary}{colorama.Style.RESET_ALL}
197            """
198        )
199
200    # Internals
201    def _toPruned(self, data: str) -> str:
202        """Prune missing glyphs."""
203        # Data already cleaned by now
204        dataPruned = content.omitMissing(input=data, mode="words", debug=0)
205
206        return dataPruned.strip()
207
208    def _toSentences(self, data: str) -> list[str]:
209        """Returns cleaned data split into sentences."""
210        return content.splitStringToSentences(data)
211
212    def _toLines(self, data: str) -> list[str]:
213        """Returns cleaned data split into lines."""
214        lines = data.split("\n")
215        lines = helpers.dedupe(lines)
216        return lines
217
218    def _toTokens(self, doc) -> list[str]:
219        """Returns cleaned, filtered words."""
220
221        wordsAll = [
222            token.text
223            for token in doc
224            if not token.is_space and not token.is_punct and not token.like_num
225        ]
226        wordsFiltered = [token for token in wordsAll if not self._isStopWord(token)]
227        # ? Sanitize with my own defined blacklist
228        wordsSanitized = KTextCleaner().sanitizeForbidden(
229            wordsFiltered, dropStrategy="word"
230        )
231        wordsClean = self._cleanTokens(wordsSanitized)
232
233        return wordsClean
234
235    def _cleanTokens(self, tokens: list[str]) -> list[str]:
236        """Normalize token strings and remove empty fragments."""
237
238        removals = [
239            # "-"
240            r"^\W+$",
241            # "- Hello" => "Hello"
242            # "Hello -" => "Hello"
243            r"^[.,']\s+|\s+[.,']$",
244            # "'s" => None
245            r"^\W+[A-Za-z]$",
246        ]
247
248        def cleanToken(token):
249            for removal in removals:
250                matches = re.compile(rf"{removal}").findall(token)
251                if matches:
252                    before = token
253                    for match in matches:
254                        token = re.sub(rf"{removal}", "", token)
255                        logger.trace(
256                            "[Clean] {} \tfrom\t {} \t=> {}",
257                            match,
258                            before,
259                            token or ("empty"),
260                        )
261
262            # Assume it’s not abbreviation => trim punctuation
263            if len(token) > 4:
264                token = token.strip(".,:;-")
265
266            return token
267
268        tokens = [cleanToken(token) for token in tokens]
269        return [token for token in tokens if token]  # Remove None
270
271    def _normalizePosFilter(self, pos: str | list[str] | None) -> set[str] | None:
272        """Normalize and validate POS filters into spaCy coarse POS labels."""
273        if pos is None:
274            return None
275
276        rawValues = helpers.coerceList(pos)
277        normalizedValues = []
278        for value in rawValues:
279            if not isinstance(value, str):
280                continue
281            key = value.strip().lower()
282            if not key:
283                continue
284
285            normalizedValues.append(self._POS_ALIASES.get(key, key.upper()))
286
287        if not normalizedValues:
288            raise ValueError("POS filter cannot be empty.")
289
290        unknown = sorted(
291            {value for value in normalizedValues if value not in self._VALID_POS}
292        )
293        if unknown:
294            raise ValueError(
295                "Unsupported POS values in KCorpus filter: "
296                f"{unknown}. Supported values: {sorted(self._VALID_POS)}"
297            )
298
299        return set(normalizedValues)
300
301    @classmethod
302    def _getNlp(cls, lang: str):
303        if spacy is None:
304            raise RuntimeError(
305                "spaCy is required for KCorpus. Install with: `pip install spacy`."
306            )
307
308        modelName = cls._MODEL_BY_LANG.get(lang)
309        if not modelName:
310            raise ValueError(
311                f"Unsupported KCorpus language '{lang}'. Supported languages: {list(cls._MODEL_BY_LANG.keys())}"
312            )
313
314        cachedNlp = cls._NLP_CACHE.get(modelName)
315        if cachedNlp:
316            return cachedNlp
317
318        try:
319            nlp = spacy.load(modelName)
320        except Exception as e:
321            raise RuntimeError(
322                f"Missing spaCy model '{modelName}' for language '{lang}'. "
323                f"Install it with: `python3 -m spacy download {modelName}`"
324            ) from e
325
326        cls._NLP_CACHE[modelName] = nlp
327        return nlp
328
329    def _isStopWord(self, token: str) -> bool:
330        lexeme = self._nlp.vocab[token]
331        if lexeme.is_stop:
332            return True
333
334        if token.lower() in self._nlp.Defaults.stop_words:
335            return True
336
337        return False
338
339    def _cleanPhrase(self, text: str) -> str:
340        text = re.sub(r"\s+([,.:;!?)\]])", r"\1", text)
341        text = re.sub(r"([([\"])\s+", r"\1", text)
342        text = re.sub(r"(\w)\s+-\s+(\w)", r"\1-\2", text)
343        text = re.sub(r"\s+", " ", text)
344        text = content.dedupeWords(text)
345        return text.strip().rstrip(",.;:")
346
347    def _extractShortSentences(self, minWords=4, maxWords=10) -> list[str]:
348        phrases = []
349
350        for sentence in self._doc.sents:
351            words = [token for token in sentence if token.is_alpha]
352            if minWords <= len(words) <= maxWords:
353                phrase = self._cleanPhrase(sentence.text)
354                if phrase:
355                    phrases.append(phrase)
356
357        return phrases
358
359    def _extractSvoPhrases(self) -> list[str]:
360        phrases = []
361
362        for token in self._doc:
363            if token.pos_ != "VERB":
364                continue
365
366            subject = None
367            obj = None
368
369            for child in token.children:
370                if child.dep_ in ("nsubj", "nsubjpass") and subject is None:
371                    start = min(t.i for t in child.subtree)
372                    end = max(t.i for t in child.subtree) + 1
373                    subject = self._cleanPhrase(self._doc[start:end].text)
374
375                if child.dep_ in ("dobj", "attr", "pobj") and obj is None:
376                    start = min(t.i for t in child.subtree)
377                    end = max(t.i for t in child.subtree) + 1
378                    obj = self._cleanPhrase(self._doc[start:end].text)
379
380            if subject and obj:
381                phrases.append(self._cleanPhrase(f"{subject} {token.text} {obj}"))
382
383        return phrases
384
385    def _extractRootPhrases(self, minWords=2, maxWords=8) -> list[str]:
386        phrases = []
387
388        for sentence in self._doc.sents:
389            roots = [token for token in sentence if token.dep_ == "ROOT"]
390            if not roots:
391                continue
392
393            root = roots[0]
394            phraseTokens = [root]
395            for child in root.children:
396                if child.dep_ in ("nsubj", "nsubjpass", "dobj", "attr", "prep", "aux"):
397                    phraseTokens.extend(list(child.subtree))
398
399            phraseTokens = sorted(set(phraseTokens), key=lambda t: t.i)
400            if not phraseTokens:
401                continue
402
403            start = phraseTokens[0].i
404            end = phraseTokens[-1].i + 1
405            phrase = self._cleanPhrase(self._doc[start:end].text)
406            wordCount = len([token for token in phrase.split(" ") if token])
407            if minWords <= wordCount <= maxWords:
408                phrases.append(phrase)
409
410        return phrases
411
412    def _extractNounChunks(self, minWords=2, maxWords=4) -> list[str]:
413        if not self._doc.has_annotation("DEP"):
414            return []
415
416        phrases = []
417        try:
418            for chunk in self._doc.noun_chunks:
419                phrase = self._cleanPhrase(chunk.text)
420                words = [token for token in phrase.split(" ") if token]
421                if minWords <= len(words) <= maxWords:
422                    phrases.append(phrase)
423        except (NotImplementedError, ValueError, AttributeError):
424            # Not supported for this language/model (e.g. ru), or missing parse data
425            pass
426        return phrases
427
428    def _buildPhraseCandidates(self) -> list[str]:
429        candidatePhrases = []
430        candidatePhrases.extend(self._extractShortSentences(minWords=2, maxWords=8))
431        candidatePhrases.extend(self._extractRootPhrases(minWords=2, maxWords=8))
432        candidatePhrases.extend(self._extractSvoPhrases())
433        candidatePhrases.extend(self._extractNounChunks(minWords=2, maxWords=4))
434
435        candidatePhrases = [phrase for phrase in candidatePhrases if phrase]
436        candidatePhrases = helpers.dedupe(candidatePhrases)
437        return candidatePhrases
438
439    def _buildPhraseBuckets(self, limit: int) -> dict[int, list[str]]:
440        buckets = {}
441        for n in range(2, 8):
442            buckets[n] = self._buildSinglePhraseBucket(n, limit)
443
444        return buckets
445
446    def _splitPhraseWords(self, text: str) -> list[str]:
447        return re.findall(r"[A-Za-zÀ-ÖØ-öø-ÿ0-9]+(?:[-'][A-Za-zÀ-ÖØ-öø-ÿ0-9]+)*", text)
448
449    def _buildSentenceWindows(self, n: int, limit: int, seen: set[str]) -> list[str]:
450        windows = []
451        for sentence in self.sentences:
452            words = self._splitPhraseWords(sentence)
453            if len(words) < n:
454                continue
455
456            for index in range(len(words) - n + 1):
457                phrase = self._cleanPhrase(" ".join(words[index : index + n]))
458                if not phrase or phrase in seen:
459                    continue
460
461                seen.add(phrase)
462                windows.append(phrase)
463                if len(windows) >= limit:
464                    return windows
465
466        return windows
467
468    def _synthesizeLongPhrases(self, n: int, limit: int, seen: set[str]) -> list[str]:
469        bridgeWords = self._DEFAULT_BRIDGE_WORDS
470        sentenceWords = [
471            self._splitPhraseWords(sentence) for sentence in self.sentences if sentence
472        ]
473        sentenceWords = [words for words in sentenceWords if words]
474
475        synthesized = []
476        for index, words in enumerate(sentenceWords):
477            composed = words[:]
478            bridgeIndex = 0
479            nextIndex = index + 1
480
481            while len(composed) < n and nextIndex < len(sentenceWords):
482                # Insert a single bridge word only when stitching sentence fragments.
483                if composed and len(composed) < n:
484                    composed.append(bridgeWords[bridgeIndex % len(bridgeWords)])
485                    bridgeIndex += 1
486
487                remaining = n - len(composed)
488                if remaining <= 0:
489                    break
490
491                composed.extend(sentenceWords[nextIndex][:remaining])
492                nextIndex += 1
493
494            if len(composed) < n:
495                continue
496
497            phrase = self._cleanPhrase(" ".join(composed[:n]))
498            if not phrase or phrase in seen:
499                continue
500
501            seen.add(phrase)
502            synthesized.append(phrase)
503            if len(synthesized) >= limit:
504                break
505
506        return synthesized
507
508    def _buildSinglePhraseBucket(self, n: int, limit: int) -> list[str]:
509        if n < 2:
510            return []
511
512        if self._phraseCandidates is None:
513            self._phraseCandidates = self._buildPhraseCandidates()
514
515        phrases = []
516        seen = set()
517        for phrase in self._phraseCandidates:
518            wordCount = len([token for token in phrase.split(" ") if token])
519            if wordCount == n:
520                if phrase in seen:
521                    continue
522                seen.add(phrase)
523                phrases.append(phrase)
524
525            if len(phrases) >= limit:
526                return phrases[:limit]
527
528        if len(phrases) < limit:
529            sentenceWindows = self._buildSentenceWindows(n, limit - len(phrases), seen)
530            phrases.extend(sentenceWindows)
531
532        if len(phrases) < limit and n > 4:
533            if DEBUG:
534                logger.info(
535                    "Synthesizing long phrases: n={}, needed={}, seen={}",
536                    n,
537                    limit - len(phrases),
538                    len(seen),
539                )
540            synthesized = self._synthesizeLongPhrases(n, limit - len(phrases), seen)
541            phrases.extend(synthesized)
542
543        return phrases[:limit]
544
545    def _getPhraseBucket(self, n: int) -> list[str]:
546        if n in self._phraseBuckets:
547            return self._phraseBuckets[n]
548
549        self._phraseBuckets[n] = self._buildSinglePhraseBucket(n, self.limit)
550
551        return self._phraseBuckets.get(n, [])
552
553    def _toWords(self, tokens: list[str]) -> list[str]:
554        """
555        Returns unique words from the cleaned data.
556
557        Args:
558            tokens: List of tokens to process.
559        """
560        # ? Remove duplicates
561        words = helpers.dedupe(tokens)
562        return words
563
564    def _toNgrams(
565        self,
566        data: str,
567        limit: int,
568        n: int = 4,
569        mode: Literal["quantity", "score"] = "quantity",
570    ):
571        """
572        Output phrases (n words)
573
574        Args:
575            data: The input data as a list of words.
576            limit: Number of n-grams to return (quantity or score).
577            n: The size of the n-gram (2=bigram, 3=trigram, 4=quadgram).
578            mode: 'quantity' for top-N, 'score' for above a PMI score.
579
580        Returns:
581            List of n-gram phrases.
582        """
583        ngrams = list(self._getPhraseBucket(n))
584        return ngrams[:limit]
585
586    def serveSentences(
587        self,
588        mode: Literal["whole", "separate", "connected"] = "separate",
589        limit: int = 100,
590        shuffle=False,
591    ) -> list[str]:
592        """
593        Serve sentences from the corpus.
594
595        Args:
596            mode: Mode of sentence serving (see below).
597            limit: Limit for sentences or words depending on mode.
598            shuffle: Whether to shuffle sentences.
599
600        Modes:
601        - `whole`
602            - whole sentences in logical succession
603            - `limit` number of sentences
604        - `separate`
605            - Chop sentences one by one
606            - `limit` max words in a sentence
607        - `connected`
608            - Connect whole sentences
609            - `limit` max sentences in a block
610
611        Returns:
612            List of sentences or sentence blocks.
613        """
614        sentences = (
615            helpers.shuffleAtRandomSegment(self.sentences)
616            if shuffle
617            else self.sentences
618        )
619
620        if mode == "whole" and limit is not None:
621            # Avoid out of index if limit exceeds available sentences
622            sentences = sentences[: min(limit, len(sentences))]
623        elif mode in ("separate", "connected"):
624            sentences = content.chopList(sentences, limit, mode, shuffle=False)
625
626        return sentences
627
628    def serveLines(self, clamp: int = None) -> list[str]:
629        """
630        Serve shuffled lines from the corpus, optionally clamped.
631
632        Args:
633            clamp: Maximum number of lines to return.
634
635        Returns:
636            List of lines.
637        """
638        lines = content.chopList(self.lines, clamp, mode="separate", shuffle=True)
639        return lines
640
641    def servePhrases(self, n: str | list | tuple = (1, 2, 3, 4)) -> list[str]:
642        """
643        Serve phrases of specified n-gram sizes.
644
645        Args:
646            n: Word counts to include (one or multiple)
647                - 1 = single words
648                - 2 = bigrams
649                - 3 = trigrams
650                - 4 = quadgrams
651                - 5+ = extended phrase buckets
652
653        Returns:
654            List of phrases.
655        """
656        numbers = helpers.coerceList(n)
657        numbers = [
658            int(number)
659            for number in numbers
660            if isinstance(number, (int, float))
661            or (isinstance(number, str) and number.isdigit())
662        ]
663        if any(number < 1 for number in numbers):
664            raise ValueError("KCorpus phrase bucket sizes must be >= 1.")
665
666        ngrams = {1: self.words}
667        for number in numbers:
668            if number <= 1:
669                continue
670            ngrams[number] = self._getPhraseBucket(number)
671
672        phrases = helpers.flatten([ngrams.get(number, []) for number in numbers])
673        phrases = helpers.dedupe(phrases)
674
675        cleaner = KTextCleaner()
676        phrases = [cleaner.removeOrphanedPunctuation(phrase) for phrase in phrases]
677
678        return phrases
679
680    def serveTokens(
681        self,
682        pos: str | list[str] | None = None,
683        lemma=False,
684        unique=True,
685        alterCase: content.TextCase | None = None,
686        includeStopwords=False,
687        minLength=1,
688    ) -> list[str]:
689        """
690        Serve tokens optionally filtered by spaCy POS type.
691
692        Args:
693            pos: One or more coarse POS labels (for example ADJ, NOUN, VERB)
694                or aliases (for example adjective, noun, verb).
695            lemma: Return lemmatized forms instead of surface tokens.
696            unique: Deduplicate resulting tokens.
697            alterCase: Change the case of the output tokens.
698            includeStopwords: Keep stop words in output.
699            minLength: Minimum token length to keep.
700
701        Returns:
702            List of filtered tokens.
703        """
704        posFilter = self._normalizePosFilter(pos)
705        filtered = []
706
707        for token in self._doc:
708            if token.is_space or token.is_punct or token.like_num or not token.is_alpha:
709                continue
710
711            if posFilter and token.pos_ not in posFilter:
712                continue
713
714            tokenText = token.lemma_ if lemma else token.text
715            tokenText = tokenText.strip()
716            if isinstance(alterCase, str):
717                tokenText = content.changeCase(tokenText, alterCase)
718
719            if not tokenText:
720                continue
721
722            if not includeStopwords and self._isStopWord(tokenText):
723                continue
724
725            filtered.append(tokenText)
726
727        filtered = KTextCleaner().sanitizeForbidden(filtered, dropStrategy="word")
728        filtered = self._cleanTokens(filtered)
729        filtered = [token for token in filtered if len(token) >= minLength]
730
731        if unique:
732            filtered = helpers.dedupe(filtered)
733
734        return filtered
KCorpus(data: str, maxPerBucket: int = 5000, lang: str = 'en')
    def __init__(self, data: str, maxPerBucket: int = 5000, lang: str = "en"):
        """
        Build the corpus: prune the text, split it, and parse it with spaCy.

        Args:
            data: Input data as a string or file path. When `files.isFile(data)`
                is true, the file contents are read and used instead.
            maxPerBucket (optional): Maximum number of n-grams per bucket.
            lang: Language code used for spaCy model routing.
        """

        if files.isFile(data):
            data = files.readFile(data)

        self.lang = lang
        self.limit = maxPerBucket

        self.raw: str = data
        """The original input data (contents of file if file path was given)."""
        self.pruned: str = self._toPruned(self.raw)
        """Cleaned version of the input data."""
        self.sentences: list[str] = self._toSentences(self.pruned)
        """List of sentences from the pruned data."""
        self.lines: list[str] = self._toLines(self.pruned)
        """List of lines from the pruned data."""

        # The pipeline is created before the tokens are derived from the
        # parsed document.
        self._nlp = self._getNlp(self.lang)
        self._doc = self._nlp(self.pruned)

        self.tokens: list[str] = self._toTokens(self._doc)
        """List of cleaned, filtered tokens from the pruned data."""
        self.words: list[str] = self._toWords(self.tokens)
        """List of unique words from the pruned data."""

        # Phrase data starts empty.
        self._phraseCandidates: list[str] | None = None
        self._phraseBuckets: dict[int, list[str]] = {}
Arguments:
  • data: Input data as a string or file path.
  • maxPerBucket (optional): Maximum number of n-grams per bucket.
  • lang: Language code used for spaCy model routing.
lang
limit
raw: str

The original input data (contents of file if file path was given).

pruned: str

Cleaned version of the input data.

sentences: list[str]

List of sentences from the pruned data.

lines: list[str]

List of lines from the pruned data.

tokens: list[str]

List of cleaned, filtered tokens from the pruned data.

words: list[str]

List of unique words from the pruned data.

@staticmethod
def fromDataList( data: list[str] = '/usr/share/dict/words', threshold: int = 80000, **kwargs) -> KCorpus:
147    @staticmethod
148    def fromDataList(
149        data: list[str] = "/usr/share/dict/words",
150        threshold: int = INPUT_THRESHOLD,
151        **kwargs,
152    ) -> "KCorpus":
153        """Alternative constructor to create a KCorpus from a list of strings.
154
155            Args:
156                data: List of strings to create the corpus from.
157                threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit.
158
159        Useful for cases where the input data is too large to process as a single string."""
160        if files.isFile(data):
161            data = files.readFileLines(data)
162
163        if len(data) > threshold:
164            logger.info(
165                "Sampling KCorpus data: {} items, discarded {}.",
166                threshold,
167                len(data) - threshold,
168            )
169            data = helpers.sampleList(data, threshold)
170
171        data = KTextCleaner().sanitizeForbidden(data, dropStrategy="word")
172        return KCorpus(" ".join(data), **kwargs)

Alternative constructor to create a KCorpus from a list of strings.

Args:
    data: List of strings to create the corpus from, or a path to a file whose lines are read and used as the items.
    threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit.

Useful for cases where the input data is too large to process as a single string.

def serveSentences( self, mode: Literal['whole', 'separate', 'connected'] = 'separate', limit: int = 100, shuffle=False) -> list[str]:
586    def serveSentences(
587        self,
588        mode: Literal["whole", "separate", "connected"] = "separate",
589        limit: int = 100,
590        shuffle=False,
591    ) -> list[str]:
592        """
593        Serve sentences from the corpus.
594
595        Args:
596            mode: Mode of sentence serving (see below).
597            limit: Limit for sentences or words depending on mode.
598            shuffle: Whether to shuffle sentences.
599
600        Modes:
601        - `whole`
602            - whole sentences in logical succession
603            - `limit` number of sentences
604        - `separate`
605            - Chop sentences one by one
606            - `limit` max words in a sentence
607        - `connected`
608            - Connect whole sentences
609            - `limit` max sentences in a block
610
611        Returns:
612            List of sentences or sentence blocks.
613        """
614        sentences = (
615            helpers.shuffleAtRandomSegment(self.sentences)
616            if shuffle
617            else self.sentences
618        )
619
620        if mode == "whole" and limit is not None:
621            # Avoid out of index if limit exceeds available sentences
622            sentences = sentences[: min(limit, len(sentences))]
623        elif mode in ("separate", "connected"):
624            sentences = content.chopList(sentences, limit, mode, shuffle=False)
625
626        return sentences

Serve sentences from the corpus.

Arguments:
  • mode: Mode of sentence serving (see below).
  • limit: Limit for sentences or words depending on mode.
  • shuffle: Whether to shuffle sentences.

Modes:

  • whole
    • whole sentences in logical succession
    • limit number of sentences
  • separate
    • Chop sentences one by one
    • limit max words in a sentence
  • connected
    • Connect whole sentences
    • limit max sentences in a block
Returns:

List of sentences or sentence blocks.

def serveLines(self, clamp: int = None) -> list[str]:
628    def serveLines(self, clamp: int = None) -> list[str]:
629        """
630        Serve shuffled lines from the corpus, optionally clamped.
631
632        Args:
633            clamp: Maximum number of lines to return.
634
635        Returns:
636            List of lines.
637        """
638        lines = content.chopList(self.lines, clamp, mode="separate", shuffle=True)
639        return lines

Serve shuffled lines from the corpus, optionally clamped.

Arguments:
  • clamp: Maximum number of lines to return.
Returns:

List of lines.

def servePhrases(self, n: str | list | tuple = (1, 2, 3, 4)) -> list[str]:
641    def servePhrases(self, n: str | list | tuple = (1, 2, 3, 4)) -> list[str]:
642        """
643        Serve phrases of specified n-gram sizes.
644
645        Args:
646            n: Word counts to include (one or multiple)
647                - 1 = single words
648                - 2 = bigrams
649                - 3 = trigrams
650                - 4 = quadgrams
651                - 5+ = extended phrase buckets
652
653        Returns:
654            List of phrases.
655        """
656        numbers = helpers.coerceList(n)
657        numbers = [
658            int(number)
659            for number in numbers
660            if isinstance(number, (int, float))
661            or (isinstance(number, str) and number.isdigit())
662        ]
663        if any(number < 1 for number in numbers):
664            raise ValueError("KCorpus phrase bucket sizes must be >= 1.")
665
666        ngrams = {1: self.words}
667        for number in numbers:
668            if number <= 1:
669                continue
670            ngrams[number] = self._getPhraseBucket(number)
671
672        phrases = helpers.flatten([ngrams.get(number, []) for number in numbers])
673        phrases = helpers.dedupe(phrases)
674
675        cleaner = KTextCleaner()
676        phrases = [cleaner.removeOrphanedPunctuation(phrase) for phrase in phrases]
677
678        return phrases

Serve phrases of specified n-gram sizes.

Arguments:
  • n: Word counts to include (one or multiple)
    • 1 = single words
    • 2 = bigrams
    • 3 = trigrams
    • 4 = quadgrams
    • 5+ = extended phrase buckets
Returns:

List of phrases.

def serveTokens( self, pos: str | list[str] | None = None, lemma=False, unique=True, alterCase: Optional[Literal['UPPER', 'lower', 'title', 'title-force']] = None, includeStopwords=False, minLength=1) -> list[str]:
680    def serveTokens(
681        self,
682        pos: str | list[str] | None = None,
683        lemma=False,
684        unique=True,
685        alterCase: content.TextCase | None = None,
686        includeStopwords=False,
687        minLength=1,
688    ) -> list[str]:
689        """
690        Serve tokens optionally filtered by spaCy POS type.
691
692        Args:
693            pos: One or more coarse POS labels (for example ADJ, NOUN, VERB)
694                or aliases (for example adjective, noun, verb).
695            lemma: Return lemmatized forms instead of surface tokens.
696            unique: Deduplicate resulting tokens.
697            alterCase: Change the case of the output tokens.
698            includeStopwords: Keep stop words in output.
699            minLength: Minimum token length to keep.
700
701        Returns:
702            List of filtered tokens.
703        """
704        posFilter = self._normalizePosFilter(pos)
705        filtered = []
706
707        for token in self._doc:
708            if token.is_space or token.is_punct or token.like_num or not token.is_alpha:
709                continue
710
711            if posFilter and token.pos_ not in posFilter:
712                continue
713
714            tokenText = token.lemma_ if lemma else token.text
715            tokenText = tokenText.strip()
716            if isinstance(alterCase, str):
717                tokenText = content.changeCase(tokenText, alterCase)
718
719            if not tokenText:
720                continue
721
722            if not includeStopwords and self._isStopWord(tokenText):
723                continue
724
725            filtered.append(tokenText)
726
727        filtered = KTextCleaner().sanitizeForbidden(filtered, dropStrategy="word")
728        filtered = self._cleanTokens(filtered)
729        filtered = [token for token in filtered if len(token) >= minLength]
730
731        if unique:
732            filtered = helpers.dedupe(filtered)
733
734        return filtered

Serve tokens optionally filtered by spaCy POS type.

Arguments:
  • pos: One or more coarse POS labels (for example ADJ, NOUN, VERB) or aliases (for example adjective, noun, verb).
  • lemma: Return lemmatized forms instead of surface tokens.
  • unique: Deduplicate resulting tokens.
  • alterCase: Change the case of the output tokens.
  • includeStopwords: Keep stop words in output.
  • minLength: Minimum token length to keep.
Returns:

List of filtered tokens.