classes.c31_corpus
from inspect import cleandoc
from typing import Literal
import re
import colorama
from loguru import logger

try:
    import spacy
except ImportError:
    spacy = None

from lib import files, helpers, content
from .c34_text_cleaner import KTextCleaner

DEBUG = False
INPUT_THRESHOLD = 80000
"""Maximum number of items to include from the list when creating a KCorpus from a list of strings."""


class KCorpus:
    """Text corpus parsed with a spaCy pipeline.

    On construction the input is pruned, split into sentences and lines, and
    run through the spaCy model selected by ``lang``. Tokens and unique words
    are computed eagerly; n-word phrase buckets are built on first request and
    cached per instance.
    """

    # Loaded spaCy pipelines, shared by every instance and keyed by model name.
    _NLP_CACHE: dict[str, object] = {}
    _MODEL_BY_LANG = {
        "en": "en_core_web_md",
        "ru": "ru_core_news_md",
    }
    _VALID_POS = {
        "ADJ",
        "ADP",
        "ADV",
        "AUX",
        "CCONJ",
        "DET",
        "INTJ",
        "NOUN",
        "NUM",
        "PART",
        "PRON",
        "PROPN",
        "PUNCT",
        "SCONJ",
        "SYM",
        "VERB",
        "X",
    }
    _POS_ALIASES = {
        "adjective": "ADJ",
        "adj": "ADJ",
        "adposition": "ADP",
        "adp": "ADP",
        "adverb": "ADV",
        "adv": "ADV",
        "auxiliary": "AUX",
        "aux": "AUX",
        "coordinating_conjunction": "CCONJ",
        "cconj": "CCONJ",
        "determiner": "DET",
        "det": "DET",
        "interjection": "INTJ",
        "intj": "INTJ",
        "noun": "NOUN",
        "proper_noun": "PROPN",
        "propernoun": "PROPN",
        "propn": "PROPN",
        "numeral": "NUM",
        "num": "NUM",
        "particle": "PART",
        "part": "PART",
        "pronoun": "PRON",
        "pron": "PRON",
        "punctuation": "PUNCT",
        "punct": "PUNCT",
        "subordinating_conjunction": "SCONJ",
        "sconj": "SCONJ",
        "symbol": "SYM",
        "sym": "SYM",
        "verb": "VERB",
        "other": "X",
        "x": "X",
    }
    _DEFAULT_BRIDGE_WORDS = [
        "and",
        "the",
        "of",
        "to",
        "in",
        "for",
        "with",
        "on",
        "at",
        "by",
        "from",
        "as",
        "that",
        "which",
        "while",
        "after",
        "before",
        "during",
        "within",
        "without",
        "between",
        "across",
        "through",
        "about",
        "around",
        "under",
        "over",
        "into",
        "against",
    ]
    # Token clean-up patterns, compiled once instead of once per token.
    _TOKEN_REMOVALS = (
        # "-"
        re.compile(r"^\W+$"),
        # "- Hello" => "Hello"
        # "Hello -" => "Hello"
        re.compile(r"^[.,']\s+|\s+[.,']$"),
        # "'s" => None
        re.compile(r"^\W+[A-Za-z]$"),
    )
    # A "word" is a run of Unicode letters/digits, optionally joined by "-" or
    # "'". `[^\W_]` is used instead of an explicit Latin-1 range so that
    # Cyrillic (the "ru" model) and extended Latin letters are not dropped.
    _PHRASE_WORD_RE = re.compile(r"[^\W_]+(?:[-'][^\W_]+)*")

    def __init__(self, data: str, maxPerBucket: int = 5000, lang: str = "en"):
        """
        Args:
            data: Input data as a string or file path.
            maxPerBucket (optional): Maximum number of n-grams per bucket.
            lang: Language code used for spaCy model routing.

        Raises:
            RuntimeError: If spaCy or the model for ``lang`` is not installed.
            ValueError: If ``lang`` has no entry in ``_MODEL_BY_LANG``.
        """

        if files.isFile(data):
            data = files.readFile(data)

        self.lang = lang
        self.limit = maxPerBucket

        self.raw: str = data
        """The original input data (contents of file if file path was given)."""
        self.pruned: str = self._toPruned(self.raw)
        """Cleaned version of the input data."""
        self.sentences: list[str] = self._toSentences(self.pruned)
        """List of sentences from the pruned data."""
        self.lines: list[str] = self._toLines(self.pruned)
        """List of lines from the pruned data."""

        # The pipeline must exist before _toTokens runs: it calls _isStopWord,
        # which reads self._nlp.
        self._nlp = self._getNlp(self.lang)
        self._doc = self._nlp(self.pruned)

        self.tokens: list[str] = self._toTokens(self._doc)
        """List of cleaned, filtered tokens from the pruned data."""
        self.words: list[str] = self._toWords(self.tokens)
        """List of unique words from the pruned data."""

        # Both are filled lazily by _buildSinglePhraseBucket / _getPhraseBucket.
        self._phraseCandidates: list[str] | None = None
        self._phraseBuckets: dict[int, list[str]] = {}

    @classmethod
    def fromDataList(
        cls,
        data: list[str] | str = "/usr/share/dict/words",
        threshold: int = INPUT_THRESHOLD,
        **kwargs,
    ) -> "KCorpus":
        """Alternative constructor to create a KCorpus from a list of strings.

        Useful for cases where the input data is too large to process as a single string.

        Args:
            data: List of strings to create the corpus from, or a path to a
                file whose lines are used as the list.
            threshold: Maximum number of items to include from the list. If the
                list exceeds this size, it is sampled down to this limit with
                `helpers.sampleList`.
            **kwargs: Forwarded unchanged to the constructor.

        Returns:
            A corpus built from the sanitized items joined with single spaces.
        """
        if files.isFile(data):
            data = files.readFileLines(data)

        if len(data) > threshold:
            logger.info(
                "Sampling KCorpus data: {} items, discarded {}.",
                threshold,
                len(data) - threshold,
            )
            data = helpers.sampleList(data, threshold)

        data = KTextCleaner().sanitizeForbidden(data, dropStrategy="word")
        return cls(" ".join(data), **kwargs)

    def __str__(self) -> str:
        """Returns a human-readable representation of the KCorpus object."""
        summaryColors = [
            colorama.Fore.BLUE,
            colorama.Fore.MAGENTA,
            colorama.Fore.RED,
            colorama.Fore.CYAN,
            colorama.Fore.YELLOW,
            colorama.Fore.GREEN,
            colorama.Fore.WHITE,
        ]
        # Only buckets that have already been built and cached are listed.
        ngramSummary = " ".join(
            [
                f"{summaryColors[index % len(summaryColors)]}{colorama.Back.BLACK}{len(self._phraseBuckets[n])}{colorama.Style.RESET_ALL} {n}-grams"
                for index, n in enumerate(sorted(self._phraseBuckets.keys()))
            ]
        )
        if not ngramSummary:
            ngramSummary = f"{colorama.Fore.WHITE}no n-gram buckets cached"

        return cleandoc(
            f"""
            {colorama.Fore.BLACK}{colorama.Back.LIGHTGREEN_EX}KCorpus{colorama.Style.RESET_ALL} {len(self.tokens)} tokens {colorama.Fore.YELLOW}{len(self.words)} words {colorama.Fore.GREEN}{len(self.sentences)} sentences {colorama.Fore.CYAN}{len(self.lines)} lines {ngramSummary}{colorama.Style.RESET_ALL}
            """
        )

    # Internals
    def _toPruned(self, data: str) -> str:
        """Prune missing glyphs."""
        # Data already cleaned by now
        dataPruned = content.omitMissing(input=data, mode="words", debug=0)

        return dataPruned.strip()

    def _toSentences(self, data: str) -> list[str]:
        """Returns cleaned data split into sentences."""
        return content.splitStringToSentences(data)

    def _toLines(self, data: str) -> list[str]:
        """Returns cleaned data split into deduplicated lines."""
        lines = data.split("\n")
        lines = helpers.dedupe(lines)
        return lines

    def _toTokens(self, doc) -> list[str]:
        """Returns cleaned, filtered words.

        Drops whitespace, punctuation, number-like tokens and stop words, then
        applies the KTextCleaner blacklist and `_cleanTokens`.
        """

        wordsAll = [
            token.text
            for token in doc
            if not token.is_space and not token.is_punct and not token.like_num
        ]
        wordsFiltered = [token for token in wordsAll if not self._isStopWord(token)]
        # ? Sanitize with my own defined blacklist
        wordsSanitized = KTextCleaner().sanitizeForbidden(
            wordsFiltered, dropStrategy="word"
        )
        wordsClean = self._cleanTokens(wordsSanitized)

        return wordsClean

    def _cleanTokens(self, tokens: list[str]) -> list[str]:
        """Normalize token strings and remove empty fragments."""

        def cleanToken(token):
            for pattern in self._TOKEN_REMOVALS:
                matches = pattern.findall(token)
                if not matches:
                    continue

                before = token
                # One substitution pass per match: a pass can expose a new
                # leading/trailing fragment that the next pass removes.
                for match in matches:
                    token = pattern.sub("", token)
                    logger.trace(
                        "[Clean] {} \tfrom\t {} \t=> {}",
                        match,
                        before,
                        token or ("empty"),
                    )

            # Assume it’s not abbreviation => trim punctuation
            if len(token) > 4:
                token = token.strip(".,:;-")

            return token

        tokens = [cleanToken(token) for token in tokens]
        return [token for token in tokens if token]  # Remove empty strings

    def _normalizePosFilter(self, pos: str | list[str] | None) -> set[str] | None:
        """Normalize and validate POS filters into spaCy coarse POS labels.

        Args:
            pos: One label/alias, a list of them, or None for "no filter".

        Returns:
            Set of labels from `_VALID_POS`, or None when `pos` is None.

        Raises:
            ValueError: If no usable value remains or a value is not supported.
        """
        if pos is None:
            return None

        rawValues = helpers.coerceList(pos)
        normalizedValues = []
        for value in rawValues:
            # Non-string and blank entries are skipped, not rejected.
            if not isinstance(value, str):
                continue
            key = value.strip().lower()
            if not key:
                continue

            normalizedValues.append(self._POS_ALIASES.get(key, key.upper()))

        if not normalizedValues:
            raise ValueError("POS filter cannot be empty.")

        unknown = sorted(
            {value for value in normalizedValues if value not in self._VALID_POS}
        )
        if unknown:
            raise ValueError(
                "Unsupported POS values in KCorpus filter: "
                f"{unknown}. Supported values: {sorted(self._VALID_POS)}"
            )

        return set(normalizedValues)

    @classmethod
    def _getNlp(cls, lang: str):
        """Return the spaCy pipeline for `lang`, loading it at most once.

        Raises:
            RuntimeError: If spaCy is not importable or loading the model fails.
            ValueError: If `lang` has no entry in `_MODEL_BY_LANG`.
        """
        if spacy is None:
            raise RuntimeError(
                "spaCy is required for KCorpus. Install with: `pip install spacy`."
            )

        modelName = cls._MODEL_BY_LANG.get(lang)
        if not modelName:
            raise ValueError(
                f"Unsupported KCorpus language '{lang}'. Supported languages: {list(cls._MODEL_BY_LANG.keys())}"
            )

        # Identity check: a cache hit must not depend on the object's truthiness.
        cachedNlp = cls._NLP_CACHE.get(modelName)
        if cachedNlp is not None:
            return cachedNlp

        try:
            nlp = spacy.load(modelName)
        except Exception as e:
            raise RuntimeError(
                f"Missing spaCy model '{modelName}' for language '{lang}'. "
                f"Install it with: `python3 -m spacy download {modelName}`"
            ) from e

        cls._NLP_CACHE[modelName] = nlp
        return nlp

    def _isStopWord(self, token: str) -> bool:
        """Return True if the vocab lexeme or the language defaults flag `token` as a stop word."""
        lexeme = self._nlp.vocab[token]
        if lexeme.is_stop:
            return True

        if token.lower() in self._nlp.Defaults.stop_words:
            return True

        return False

    def _cleanPhrase(self, text: str) -> str:
        """Tidy spacing around punctuation, dedupe words, and trim trailing `,.;:`."""
        # No space before closing punctuation.
        text = re.sub(r"\s+([,.:;!?)\]])", r"\1", text)
        # No space after an opening bracket or quote.
        text = re.sub(r"([([\"])\s+", r"\1", text)
        # "word - word" => "word-word"
        text = re.sub(r"(\w)\s+-\s+(\w)", r"\1-\2", text)
        text = re.sub(r"\s+", " ", text)
        text = content.dedupeWords(text)
        return text.strip().rstrip(",.;:")

    def _extractShortSentences(self, minWords=4, maxWords=10) -> list[str]:
        """Collect parsed sentences whose alphabetic-token count is within [minWords, maxWords]."""
        phrases = []

        for sentence in self._doc.sents:
            words = [token for token in sentence if token.is_alpha]
            if minWords <= len(words) <= maxWords:
                phrase = self._cleanPhrase(sentence.text)
                if phrase:
                    phrases.append(phrase)

        return phrases

    def _subtreeText(self, token) -> str:
        """Return the cleaned document text spanning `token`'s whole subtree."""
        # A subtree always contains the token itself, so indices is never empty.
        indices = [t.i for t in token.subtree]
        return self._cleanPhrase(self._doc[min(indices) : max(indices) + 1].text)

    def _extractSvoPhrases(self) -> list[str]:
        """Build "subject verb object" phrases from each verb's direct children."""
        phrases = []

        for token in self._doc:
            if token.pos_ != "VERB":
                continue

            subject = None
            obj = None

            # Only the first matching child of each role is used.
            for child in token.children:
                if child.dep_ in ("nsubj", "nsubjpass") and subject is None:
                    subject = self._subtreeText(child)

                if child.dep_ in ("dobj", "attr", "pobj") and obj is None:
                    obj = self._subtreeText(child)

            if subject and obj:
                phrases.append(self._cleanPhrase(f"{subject} {token.text} {obj}"))

        return phrases

    def _extractRootPhrases(self, minWords=2, maxWords=8) -> list[str]:
        """Collect the span around each sentence root and its core dependents.

        The phrase is the contiguous document span from the first to the last
        selected token, kept when its space-separated word count is within
        [minWords, maxWords].
        """
        phrases = []

        for sentence in self._doc.sents:
            roots = [token for token in sentence if token.dep_ == "ROOT"]
            if not roots:
                continue

            root = roots[0]
            phraseTokens = [root]
            for child in root.children:
                if child.dep_ in ("nsubj", "nsubjpass", "dobj", "attr", "prep", "aux"):
                    phraseTokens.extend(list(child.subtree))

            # phraseTokens always holds at least the root, so no empty check.
            phraseTokens = sorted(set(phraseTokens), key=lambda t: t.i)

            start = phraseTokens[0].i
            end = phraseTokens[-1].i + 1
            phrase = self._cleanPhrase(self._doc[start:end].text)
            wordCount = len([token for token in phrase.split(" ") if token])
            if minWords <= wordCount <= maxWords:
                phrases.append(phrase)

        return phrases

    def _extractNounChunks(self, minWords=2, maxWords=4) -> list[str]:
        """Collect noun chunks within the word-count range; empty without a dependency parse."""
        if not self._doc.has_annotation("DEP"):
            return []

        phrases = []
        try:
            for chunk in self._doc.noun_chunks:
                phrase = self._cleanPhrase(chunk.text)
                words = [token for token in phrase.split(" ") if token]
                if minWords <= len(words) <= maxWords:
                    phrases.append(phrase)
        except (NotImplementedError, ValueError, AttributeError):
            # Not supported for this language/model (e.g. ru), or missing parse data
            pass
        return phrases

    def _buildPhraseCandidates(self) -> list[str]:
        """Gather non-empty, deduplicated candidates from all four extractors."""
        candidatePhrases = []
        candidatePhrases.extend(self._extractShortSentences(minWords=2, maxWords=8))
        candidatePhrases.extend(self._extractRootPhrases(minWords=2, maxWords=8))
        candidatePhrases.extend(self._extractSvoPhrases())
        candidatePhrases.extend(self._extractNounChunks(minWords=2, maxWords=4))

        candidatePhrases = [phrase for phrase in candidatePhrases if phrase]
        candidatePhrases = helpers.dedupe(candidatePhrases)
        return candidatePhrases

    def _buildPhraseBuckets(self, limit: int) -> dict[int, list[str]]:
        """Build the buckets for n = 2..7, each capped at `limit` phrases."""
        buckets = {}
        for n in range(2, 8):
            buckets[n] = self._buildSinglePhraseBucket(n, limit)

        return buckets

    def _splitPhraseWords(self, text: str) -> list[str]:
        """Split `text` into words made of Unicode letters/digits.

        Inner "-" and "'" are kept ("well-known", "don't"); any other
        punctuation and the underscore separate words.
        """
        return self._PHRASE_WORD_RE.findall(text)

    def _buildSentenceWindows(self, n: int, limit: int, seen: set[str]) -> list[str]:
        """Slide an n-word window over every sentence and collect unseen phrases.

        `seen` is updated in place; collection stops once `limit` phrases are found.
        """
        windows = []
        for sentence in self.sentences:
            words = self._splitPhraseWords(sentence)
            if len(words) < n:
                continue

            for index in range(len(words) - n + 1):
                phrase = self._cleanPhrase(" ".join(words[index : index + n]))
                if not phrase or phrase in seen:
                    continue

                seen.add(phrase)
                windows.append(phrase)
                if len(windows) >= limit:
                    return windows

        return windows

    def _synthesizeLongPhrases(self, n: int, limit: int, seen: set[str]) -> list[str]:
        """Stitch consecutive sentences into n-word phrases using bridge words.

        Starting from each sentence, following sentences are appended, each
        preceded by one word from `_DEFAULT_BRIDGE_WORDS`, until n words are
        reached. `seen` is updated in place.
        """
        bridgeWords = self._DEFAULT_BRIDGE_WORDS
        sentenceWords = [
            self._splitPhraseWords(sentence) for sentence in self.sentences if sentence
        ]
        sentenceWords = [words for words in sentenceWords if words]

        synthesized = []
        for index, words in enumerate(sentenceWords):
            composed = words[:]
            bridgeIndex = 0
            nextIndex = index + 1

            while len(composed) < n and nextIndex < len(sentenceWords):
                # Insert a single bridge word only when stitching sentence fragments.
                # (composed is non-empty and shorter than n here by construction.)
                composed.append(bridgeWords[bridgeIndex % len(bridgeWords)])
                bridgeIndex += 1

                remaining = n - len(composed)
                if remaining <= 0:
                    break

                composed.extend(sentenceWords[nextIndex][:remaining])
                nextIndex += 1

            # Ran out of following sentences before reaching n words.
            if len(composed) < n:
                continue

            phrase = self._cleanPhrase(" ".join(composed[:n]))
            if not phrase or phrase in seen:
                continue

            seen.add(phrase)
            synthesized.append(phrase)
            if len(synthesized) >= limit:
                break

        return synthesized

    def _buildSinglePhraseBucket(self, n: int, limit: int) -> list[str]:
        """Build up to `limit` unique phrases of exactly `n` words.

        Sources are tried in order until the bucket is full: extracted phrase
        candidates, sliding sentence windows, and (only for n > 4) synthesized
        phrases. Returns an empty list for n < 2.
        """
        if n < 2:
            return []

        if self._phraseCandidates is None:
            self._phraseCandidates = self._buildPhraseCandidates()

        phrases = []
        seen = set()
        for phrase in self._phraseCandidates:
            wordCount = len([token for token in phrase.split(" ") if token])
            if wordCount == n:
                if phrase in seen:
                    continue
                seen.add(phrase)
                phrases.append(phrase)

            if len(phrases) >= limit:
                return phrases[:limit]

        if len(phrases) < limit:
            sentenceWindows = self._buildSentenceWindows(n, limit - len(phrases), seen)
            phrases.extend(sentenceWindows)

        if len(phrases) < limit and n > 4:
            if DEBUG:
                logger.info(
                    "Synthesizing long phrases: n={}, needed={}, seen={}",
                    n,
                    limit - len(phrases),
                    len(seen),
                )
            synthesized = self._synthesizeLongPhrases(n, limit - len(phrases), seen)
            phrases.extend(synthesized)

        return phrases[:limit]

    def _getPhraseBucket(self, n: int) -> list[str]:
        """Return the cached bucket for `n`, building it with `self.limit` on first use."""
        if n in self._phraseBuckets:
            return self._phraseBuckets[n]

        self._phraseBuckets[n] = self._buildSinglePhraseBucket(n, self.limit)

        return self._phraseBuckets.get(n, [])

    def _toWords(self, tokens: list[str]) -> list[str]:
        """
        Returns unique words from the cleaned data.

        Args:
            tokens: List of tokens to process.
        """
        # ? Remove duplicates
        words = helpers.dedupe(tokens)
        return words

    def _toNgrams(
        self,
        data: str,
        limit: int,
        n: int = 4,
        mode: Literal["quantity", "score"] = "quantity",
    ) -> list[str]:
        """
        Output phrases (n words)

        Args:
            data: Unused; kept for signature compatibility. Phrases always come
                from this corpus' cached phrase bucket.
            limit: Maximum number of n-grams to return.
            n: The size of the n-gram (2=bigram, 3=trigram, 4=quadgram).
            mode: Unused; kept for signature compatibility. No scoring is
                performed, so both values behave the same.

        Returns:
            List of n-gram phrases.
        """
        ngrams = list(self._getPhraseBucket(n))
        return ngrams[:limit]

    def serveSentences(
        self,
        mode: Literal["whole", "separate", "connected"] = "separate",
        limit: int = 100,
        shuffle=False,
    ) -> list[str]:
        """
        Serve sentences from the corpus.

        Args:
            mode: Mode of sentence serving (see below).
            limit: Limit for sentences or words depending on mode.
            shuffle: Whether to shuffle sentences.

        Modes:
            - `whole`
                - whole sentences in logical succession
                - `limit` number of sentences
            - `separate`
                - Chop sentences one by one
                - `limit` max words in a sentence
            - `connected`
                - Connect whole sentences
                - `limit` max sentences in a block

        Returns:
            List of sentences or sentence blocks.
        """
        sentences = (
            helpers.shuffleAtRandomSegment(self.sentences)
            if shuffle
            else self.sentences
        )

        if mode == "whole" and limit is not None:
            # Slicing clamps to the list length on its own.
            sentences = sentences[:limit]
        elif mode in ("separate", "connected"):
            sentences = content.chopList(sentences, limit, mode, shuffle=False)

        return sentences

    def serveLines(self, clamp: int | None = None) -> list[str]:
        """
        Serve shuffled lines from the corpus, optionally clamped.

        Args:
            clamp: Maximum number of lines to return.

        Returns:
            List of lines.
        """
        lines = content.chopList(self.lines, clamp, mode="separate", shuffle=True)
        return lines

    def servePhrases(self, n: int | str | list | tuple = (1, 2, 3, 4)) -> list[str]:
        """
        Serve phrases of specified n-gram sizes.

        Args:
            n: Word counts to include (one or multiple)
                - 1 = single words
                - 2 = bigrams
                - 3 = trigrams
                - 4 = quadgrams
                - 5+ = extended phrase buckets

        Returns:
            List of phrases.

        Raises:
            ValueError: If any requested size is smaller than 1.
        """
        numbers = helpers.coerceList(n)
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() cannot parse. Other values are ignored.
        numbers = [
            int(number)
            for number in numbers
            if isinstance(number, (int, float))
            or (isinstance(number, str) and number.isdecimal())
        ]
        if any(number < 1 for number in numbers):
            raise ValueError("KCorpus phrase bucket sizes must be >= 1.")

        ngrams = {1: self.words}
        for number in numbers:
            if number <= 1:
                continue
            ngrams[number] = self._getPhraseBucket(number)

        phrases = helpers.flatten([ngrams.get(number, []) for number in numbers])
        phrases = helpers.dedupe(phrases)

        cleaner = KTextCleaner()
        phrases = [cleaner.removeOrphanedPunctuation(phrase) for phrase in phrases]

        return phrases

    def serveTokens(
        self,
        pos: str | list[str] | None = None,
        lemma=False,
        unique=True,
        alterCase: content.TextCase | None = None,
        includeStopwords=False,
        minLength=1,
    ) -> list[str]:
        """
        Serve tokens optionally filtered by spaCy POS type.

        Args:
            pos: One or more coarse POS labels (for example ADJ, NOUN, VERB)
                or aliases (for example adjective, noun, verb).
            lemma: Return lemmatized forms instead of surface tokens.
            unique: Deduplicate resulting tokens.
            alterCase: Change the case of the output tokens.
            includeStopwords: Keep stop words in output.
            minLength: Minimum token length to keep.

        Returns:
            List of filtered tokens.

        Raises:
            ValueError: If `pos` is empty or contains an unsupported label.
        """
        posFilter = self._normalizePosFilter(pos)
        filtered = []

        for token in self._doc:
            if token.is_space or token.is_punct or token.like_num or not token.is_alpha:
                continue

            if posFilter and token.pos_ not in posFilter:
                continue

            tokenText = token.lemma_ if lemma else token.text
            tokenText = tokenText.strip()
            if isinstance(alterCase, str):
                tokenText = content.changeCase(tokenText, alterCase)

            if not tokenText:
                continue

            # The stop-word test runs on the final (lemmatized, re-cased) text.
            if not includeStopwords and self._isStopWord(tokenText):
                continue

            filtered.append(tokenText)

        filtered = KTextCleaner().sanitizeForbidden(filtered, dropStrategy="word")
        filtered = self._cleanTokens(filtered)
        filtered = [token for token in filtered if len(token) >= minLength]

        if unique:
            filtered = helpers.dedupe(filtered)

        return filtered
DEBUG =
False
INPUT_THRESHOLD =
80000
Maximum number of items to include from the list when creating a KCorpus from a list of strings.
class
KCorpus:
21class KCorpus: 22 _NLP_CACHE: dict[str, object] = {} 23 _MODEL_BY_LANG = { 24 "en": "en_core_web_md", 25 "ru": "ru_core_news_md", 26 } 27 _VALID_POS = { 28 "ADJ", 29 "ADP", 30 "ADV", 31 "AUX", 32 "CCONJ", 33 "DET", 34 "INTJ", 35 "NOUN", 36 "NUM", 37 "PART", 38 "PRON", 39 "PROPN", 40 "PUNCT", 41 "SCONJ", 42 "SYM", 43 "VERB", 44 "X", 45 } 46 _POS_ALIASES = { 47 "adjective": "ADJ", 48 "adj": "ADJ", 49 "adposition": "ADP", 50 "adp": "ADP", 51 "adverb": "ADV", 52 "adv": "ADV", 53 "auxiliary": "AUX", 54 "aux": "AUX", 55 "coordinating_conjunction": "CCONJ", 56 "cconj": "CCONJ", 57 "determiner": "DET", 58 "det": "DET", 59 "interjection": "INTJ", 60 "intj": "INTJ", 61 "noun": "NOUN", 62 "proper_noun": "PROPN", 63 "propernoun": "PROPN", 64 "propn": "PROPN", 65 "numeral": "NUM", 66 "num": "NUM", 67 "particle": "PART", 68 "part": "PART", 69 "pronoun": "PRON", 70 "pron": "PRON", 71 "punctuation": "PUNCT", 72 "punct": "PUNCT", 73 "subordinating_conjunction": "SCONJ", 74 "sconj": "SCONJ", 75 "symbol": "SYM", 76 "sym": "SYM", 77 "verb": "VERB", 78 "other": "X", 79 "x": "X", 80 } 81 _DEFAULT_BRIDGE_WORDS = [ 82 "and", 83 "the", 84 "of", 85 "to", 86 "in", 87 "for", 88 "with", 89 "on", 90 "at", 91 "by", 92 "from", 93 "as", 94 "that", 95 "which", 96 "while", 97 "after", 98 "before", 99 "during", 100 "within", 101 "without", 102 "between", 103 "across", 104 "through", 105 "about", 106 "around", 107 "under", 108 "over", 109 "into", 110 "against", 111 ] 112 113 def __init__(self, data: str, maxPerBucket: int = 5000, lang: str = "en"): 114 """ 115 Args: 116 data: Input data as a string or file path. 117 maxPerBucket (optional): Maximum number of n-grams per bucket. 118 lang: Language code used for spaCy model routing. 
119 """ 120 121 if files.isFile(data): 122 data = files.readFile(data) 123 124 self.lang = lang 125 self.limit = maxPerBucket 126 127 self.raw: str = data 128 """The original input data (contents of file if file path was given).""" 129 self.pruned: str = self._toPruned(self.raw) 130 """Cleaned version of the input data.""" 131 self.sentences: list[str] = self._toSentences(self.pruned) 132 """List of sentences from the pruned data.""" 133 self.lines: list[str] = self._toLines(self.pruned) 134 """List of lines from the pruned data.""" 135 136 self._nlp = self._getNlp(self.lang) 137 self._doc = self._nlp(self.pruned) 138 139 self.tokens: list[str] = self._toTokens(self._doc) 140 """List of cleaned, filtered tokens from the pruned data.""" 141 self.words: list[str] = self._toWords(self.tokens) 142 """List of unique words from the pruned data.""" 143 144 self._phraseCandidates: list[str] | None = None 145 self._phraseBuckets: dict[int, list[str]] = {} 146 147 @staticmethod 148 def fromDataList( 149 data: list[str] = "/usr/share/dict/words", 150 threshold: int = INPUT_THRESHOLD, 151 **kwargs, 152 ) -> "KCorpus": 153 """Alternative constructor to create a KCorpus from a list of strings. 154 155 Args: 156 data: List of strings to create the corpus from. 157 threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit. 
158 159 Useful for cases where the input data is too large to process as a single string.""" 160 if files.isFile(data): 161 data = files.readFileLines(data) 162 163 if len(data) > threshold: 164 logger.info( 165 "Sampling KCorpus data: {} items, discarded {}.", 166 threshold, 167 len(data) - threshold, 168 ) 169 data = helpers.sampleList(data, threshold) 170 171 data = KTextCleaner().sanitizeForbidden(data, dropStrategy="word") 172 return KCorpus(" ".join(data), **kwargs) 173 174 def __str__(self) -> str: 175 """Returns a human-readable representation of the KCorpus object.""" 176 summaryColors = [ 177 colorama.Fore.BLUE, 178 colorama.Fore.MAGENTA, 179 colorama.Fore.RED, 180 colorama.Fore.CYAN, 181 colorama.Fore.YELLOW, 182 colorama.Fore.GREEN, 183 colorama.Fore.WHITE, 184 ] 185 ngramSummary = " ".join( 186 [ 187 f"{summaryColors[index % len(summaryColors)]}{colorama.Back.BLACK}{len(self._phraseBuckets[n])}{colorama.Style.RESET_ALL} {n}-grams" 188 for index, n in enumerate(sorted(self._phraseBuckets.keys())) 189 ] 190 ) 191 if not ngramSummary: 192 ngramSummary = f"{colorama.Fore.WHITE}no n-gram buckets cached" 193 194 return cleandoc( 195 f""" 196 {colorama.Fore.BLACK}{colorama.Back.LIGHTGREEN_EX}KCorpus{colorama.Style.RESET_ALL} {len(self.tokens)} tokens {colorama.Fore.YELLOW}{len(self.words)} words {colorama.Fore.GREEN}{len(self.sentences)} sentences {colorama.Fore.CYAN}{len(self.lines)} lines {ngramSummary}{colorama.Style.RESET_ALL} 197 """ 198 ) 199 200 # Internals 201 def _toPruned(self, data: str) -> str: 202 """Prune missing glyphs.""" 203 # Data already cleaned by now 204 dataPruned = content.omitMissing(input=data, mode="words", debug=0) 205 206 return dataPruned.strip() 207 208 def _toSentences(self, data: str) -> list[str]: 209 """Returns cleaned data split into sentences.""" 210 return content.splitStringToSentences(data) 211 212 def _toLines(self, data: str) -> list[str]: 213 """Returns cleaned data split into lines.""" 214 lines = data.split("\n") 
215 lines = helpers.dedupe(lines) 216 return lines 217 218 def _toTokens(self, doc) -> list[str]: 219 """Returns cleaned, filtered words.""" 220 221 wordsAll = [ 222 token.text 223 for token in doc 224 if not token.is_space and not token.is_punct and not token.like_num 225 ] 226 wordsFiltered = [token for token in wordsAll if not self._isStopWord(token)] 227 # ? Sanitize with my own defined blacklist 228 wordsSanitized = KTextCleaner().sanitizeForbidden( 229 wordsFiltered, dropStrategy="word" 230 ) 231 wordsClean = self._cleanTokens(wordsSanitized) 232 233 return wordsClean 234 235 def _cleanTokens(self, tokens: list[str]) -> list[str]: 236 """Normalize token strings and remove empty fragments.""" 237 238 removals = [ 239 # "-" 240 r"^\W+$", 241 # "- Hello" => "Hello" 242 # "Hello -" => "Hello" 243 r"^[.,']\s+|\s+[.,']$", 244 # "'s" => None 245 r"^\W+[A-Za-z]$", 246 ] 247 248 def cleanToken(token): 249 for removal in removals: 250 matches = re.compile(rf"{removal}").findall(token) 251 if matches: 252 before = token 253 for match in matches: 254 token = re.sub(rf"{removal}", "", token) 255 logger.trace( 256 "[Clean] {} \tfrom\t {} \t=> {}", 257 match, 258 before, 259 token or ("empty"), 260 ) 261 262 # Assume it’s not abbreviation => trim punctuation 263 if len(token) > 4: 264 token = token.strip(".,:;-") 265 266 return token 267 268 tokens = [cleanToken(token) for token in tokens] 269 return [token for token in tokens if token] # Remove None 270 271 def _normalizePosFilter(self, pos: str | list[str] | None) -> set[str] | None: 272 """Normalize and validate POS filters into spaCy coarse POS labels.""" 273 if pos is None: 274 return None 275 276 rawValues = helpers.coerceList(pos) 277 normalizedValues = [] 278 for value in rawValues: 279 if not isinstance(value, str): 280 continue 281 key = value.strip().lower() 282 if not key: 283 continue 284 285 normalizedValues.append(self._POS_ALIASES.get(key, key.upper())) 286 287 if not normalizedValues: 288 raise 
ValueError("POS filter cannot be empty.") 289 290 unknown = sorted( 291 {value for value in normalizedValues if value not in self._VALID_POS} 292 ) 293 if unknown: 294 raise ValueError( 295 "Unsupported POS values in KCorpus filter: " 296 f"{unknown}. Supported values: {sorted(self._VALID_POS)}" 297 ) 298 299 return set(normalizedValues) 300 301 @classmethod 302 def _getNlp(cls, lang: str): 303 if spacy is None: 304 raise RuntimeError( 305 "spaCy is required for KCorpus. Install with: `pip install spacy`." 306 ) 307 308 modelName = cls._MODEL_BY_LANG.get(lang) 309 if not modelName: 310 raise ValueError( 311 f"Unsupported KCorpus language '{lang}'. Supported languages: {list(cls._MODEL_BY_LANG.keys())}" 312 ) 313 314 cachedNlp = cls._NLP_CACHE.get(modelName) 315 if cachedNlp: 316 return cachedNlp 317 318 try: 319 nlp = spacy.load(modelName) 320 except Exception as e: 321 raise RuntimeError( 322 f"Missing spaCy model '{modelName}' for language '{lang}'. " 323 f"Install it with: `python3 -m spacy download {modelName}`" 324 ) from e 325 326 cls._NLP_CACHE[modelName] = nlp 327 return nlp 328 329 def _isStopWord(self, token: str) -> bool: 330 lexeme = self._nlp.vocab[token] 331 if lexeme.is_stop: 332 return True 333 334 if token.lower() in self._nlp.Defaults.stop_words: 335 return True 336 337 return False 338 339 def _cleanPhrase(self, text: str) -> str: 340 text = re.sub(r"\s+([,.:;!?)\]])", r"\1", text) 341 text = re.sub(r"([([\"])\s+", r"\1", text) 342 text = re.sub(r"(\w)\s+-\s+(\w)", r"\1-\2", text) 343 text = re.sub(r"\s+", " ", text) 344 text = content.dedupeWords(text) 345 return text.strip().rstrip(",.;:") 346 347 def _extractShortSentences(self, minWords=4, maxWords=10) -> list[str]: 348 phrases = [] 349 350 for sentence in self._doc.sents: 351 words = [token for token in sentence if token.is_alpha] 352 if minWords <= len(words) <= maxWords: 353 phrase = self._cleanPhrase(sentence.text) 354 if phrase: 355 phrases.append(phrase) 356 357 return phrases 358 359 
def _extractSvoPhrases(self) -> list[str]: 360 phrases = [] 361 362 for token in self._doc: 363 if token.pos_ != "VERB": 364 continue 365 366 subject = None 367 obj = None 368 369 for child in token.children: 370 if child.dep_ in ("nsubj", "nsubjpass") and subject is None: 371 start = min(t.i for t in child.subtree) 372 end = max(t.i for t in child.subtree) + 1 373 subject = self._cleanPhrase(self._doc[start:end].text) 374 375 if child.dep_ in ("dobj", "attr", "pobj") and obj is None: 376 start = min(t.i for t in child.subtree) 377 end = max(t.i for t in child.subtree) + 1 378 obj = self._cleanPhrase(self._doc[start:end].text) 379 380 if subject and obj: 381 phrases.append(self._cleanPhrase(f"{subject} {token.text} {obj}")) 382 383 return phrases 384 385 def _extractRootPhrases(self, minWords=2, maxWords=8) -> list[str]: 386 phrases = [] 387 388 for sentence in self._doc.sents: 389 roots = [token for token in sentence if token.dep_ == "ROOT"] 390 if not roots: 391 continue 392 393 root = roots[0] 394 phraseTokens = [root] 395 for child in root.children: 396 if child.dep_ in ("nsubj", "nsubjpass", "dobj", "attr", "prep", "aux"): 397 phraseTokens.extend(list(child.subtree)) 398 399 phraseTokens = sorted(set(phraseTokens), key=lambda t: t.i) 400 if not phraseTokens: 401 continue 402 403 start = phraseTokens[0].i 404 end = phraseTokens[-1].i + 1 405 phrase = self._cleanPhrase(self._doc[start:end].text) 406 wordCount = len([token for token in phrase.split(" ") if token]) 407 if minWords <= wordCount <= maxWords: 408 phrases.append(phrase) 409 410 return phrases 411 412 def _extractNounChunks(self, minWords=2, maxWords=4) -> list[str]: 413 if not self._doc.has_annotation("DEP"): 414 return [] 415 416 phrases = [] 417 try: 418 for chunk in self._doc.noun_chunks: 419 phrase = self._cleanPhrase(chunk.text) 420 words = [token for token in phrase.split(" ") if token] 421 if minWords <= len(words) <= maxWords: 422 phrases.append(phrase) 423 except (NotImplementedError, 
ValueError, AttributeError): 424 # Not supported for this language/model (e.g. ru), or missing parse data 425 pass 426 return phrases 427 428 def _buildPhraseCandidates(self) -> list[str]: 429 candidatePhrases = [] 430 candidatePhrases.extend(self._extractShortSentences(minWords=2, maxWords=8)) 431 candidatePhrases.extend(self._extractRootPhrases(minWords=2, maxWords=8)) 432 candidatePhrases.extend(self._extractSvoPhrases()) 433 candidatePhrases.extend(self._extractNounChunks(minWords=2, maxWords=4)) 434 435 candidatePhrases = [phrase for phrase in candidatePhrases if phrase] 436 candidatePhrases = helpers.dedupe(candidatePhrases) 437 return candidatePhrases 438 439 def _buildPhraseBuckets(self, limit: int) -> dict[int, list[str]]: 440 buckets = {} 441 for n in range(2, 8): 442 buckets[n] = self._buildSinglePhraseBucket(n, limit) 443 444 return buckets 445 446 def _splitPhraseWords(self, text: str) -> list[str]: 447 return re.findall(r"[A-Za-zÀ-ÖØ-öø-ÿ0-9]+(?:[-'][A-Za-zÀ-ÖØ-öø-ÿ0-9]+)*", text) 448 449 def _buildSentenceWindows(self, n: int, limit: int, seen: set[str]) -> list[str]: 450 windows = [] 451 for sentence in self.sentences: 452 words = self._splitPhraseWords(sentence) 453 if len(words) < n: 454 continue 455 456 for index in range(len(words) - n + 1): 457 phrase = self._cleanPhrase(" ".join(words[index : index + n])) 458 if not phrase or phrase in seen: 459 continue 460 461 seen.add(phrase) 462 windows.append(phrase) 463 if len(windows) >= limit: 464 return windows 465 466 return windows 467 468 def _synthesizeLongPhrases(self, n: int, limit: int, seen: set[str]) -> list[str]: 469 bridgeWords = self._DEFAULT_BRIDGE_WORDS 470 sentenceWords = [ 471 self._splitPhraseWords(sentence) for sentence in self.sentences if sentence 472 ] 473 sentenceWords = [words for words in sentenceWords if words] 474 475 synthesized = [] 476 for index, words in enumerate(sentenceWords): 477 composed = words[:] 478 bridgeIndex = 0 479 nextIndex = index + 1 480 481 while 
len(composed) < n and nextIndex < len(sentenceWords): 482 # Insert a single bridge word only when stitching sentence fragments. 483 if composed and len(composed) < n: 484 composed.append(bridgeWords[bridgeIndex % len(bridgeWords)]) 485 bridgeIndex += 1 486 487 remaining = n - len(composed) 488 if remaining <= 0: 489 break 490 491 composed.extend(sentenceWords[nextIndex][:remaining]) 492 nextIndex += 1 493 494 if len(composed) < n: 495 continue 496 497 phrase = self._cleanPhrase(" ".join(composed[:n])) 498 if not phrase or phrase in seen: 499 continue 500 501 seen.add(phrase) 502 synthesized.append(phrase) 503 if len(synthesized) >= limit: 504 break 505 506 return synthesized 507 508 def _buildSinglePhraseBucket(self, n: int, limit: int) -> list[str]: 509 if n < 2: 510 return [] 511 512 if self._phraseCandidates is None: 513 self._phraseCandidates = self._buildPhraseCandidates() 514 515 phrases = [] 516 seen = set() 517 for phrase in self._phraseCandidates: 518 wordCount = len([token for token in phrase.split(" ") if token]) 519 if wordCount == n: 520 if phrase in seen: 521 continue 522 seen.add(phrase) 523 phrases.append(phrase) 524 525 if len(phrases) >= limit: 526 return phrases[:limit] 527 528 if len(phrases) < limit: 529 sentenceWindows = self._buildSentenceWindows(n, limit - len(phrases), seen) 530 phrases.extend(sentenceWindows) 531 532 if len(phrases) < limit and n > 4: 533 if DEBUG: 534 logger.info( 535 "Synthesizing long phrases: n={}, needed={}, seen={}", 536 n, 537 limit - len(phrases), 538 len(seen), 539 ) 540 synthesized = self._synthesizeLongPhrases(n, limit - len(phrases), seen) 541 phrases.extend(synthesized) 542 543 return phrases[:limit] 544 545 def _getPhraseBucket(self, n: int) -> list[str]: 546 if n in self._phraseBuckets: 547 return self._phraseBuckets[n] 548 549 self._phraseBuckets[n] = self._buildSinglePhraseBucket(n, self.limit) 550 551 return self._phraseBuckets.get(n, []) 552 553 def _toWords(self, tokens: list[str]) -> list[str]: 554 """ 
555 Returns unique words from the cleaned data. 556 557 Args: 558 tokens: List of tokens to process. 559 """ 560 # ? Remove duplicates 561 words = helpers.dedupe(tokens) 562 return words 563 564 def _toNgrams( 565 self, 566 data: str, 567 limit: int, 568 n: int = 4, 569 mode: Literal["quantity", "score"] = "quantity", 570 ): 571 """ 572 Output phrases (n words) 573 574 Args: 575 data: The input data as a list of words. 576 limit: Number of n-grams to return (quantity or score). 577 n: The size of the n-gram (2=bigram, 3=trigram, 4=quadgram). 578 mode: 'quantity' for top-N, 'score' for above a PMI score. 579 580 Returns: 581 List of n-gram phrases. 582 """ 583 ngrams = list(self._getPhraseBucket(n)) 584 return ngrams[:limit] 585 586 def serveSentences( 587 self, 588 mode: Literal["whole", "separate", "connected"] = "separate", 589 limit: int = 100, 590 shuffle=False, 591 ) -> list[str]: 592 """ 593 Serve sentences from the corpus. 594 595 Args: 596 mode: Mode of sentence serving (see below). 597 limit: Limit for sentences or words depending on mode. 598 shuffle: Whether to shuffle sentences. 599 600 Modes: 601 - `whole` 602 - whole sentences in logical succession 603 - `limit` number of sentences 604 - `separate` 605 - Chop sentences one by one 606 - `limit` max words in a sentence 607 - `connected` 608 - Connect whole sentences 609 - `limit` max sentences in a block 610 611 Returns: 612 List of sentences or sentence blocks. 
613 """ 614 sentences = ( 615 helpers.shuffleAtRandomSegment(self.sentences) 616 if shuffle 617 else self.sentences 618 ) 619 620 if mode == "whole" and limit is not None: 621 # Avoid out of index if limit exceeds available sentences 622 sentences = sentences[: min(limit, len(sentences))] 623 elif mode in ("separate", "connected"): 624 sentences = content.chopList(sentences, limit, mode, shuffle=False) 625 626 return sentences 627 628 def serveLines(self, clamp: int = None) -> list[str]: 629 """ 630 Serve shuffled lines from the corpus, optionally clamped. 631 632 Args: 633 clamp: Maximum number of lines to return. 634 635 Returns: 636 List of lines. 637 """ 638 lines = content.chopList(self.lines, clamp, mode="separate", shuffle=True) 639 return lines 640 641 def servePhrases(self, n: str | list | tuple = (1, 2, 3, 4)) -> list[str]: 642 """ 643 Serve phrases of specified n-gram sizes. 644 645 Args: 646 n: Word counts to include (one or multiple) 647 - 1 = single words 648 - 2 = bigrams 649 - 3 = trigrams 650 - 4 = quadgrams 651 - 5+ = extended phrase buckets 652 653 Returns: 654 List of phrases. 
655 """ 656 numbers = helpers.coerceList(n) 657 numbers = [ 658 int(number) 659 for number in numbers 660 if isinstance(number, (int, float)) 661 or (isinstance(number, str) and number.isdigit()) 662 ] 663 if any(number < 1 for number in numbers): 664 raise ValueError("KCorpus phrase bucket sizes must be >= 1.") 665 666 ngrams = {1: self.words} 667 for number in numbers: 668 if number <= 1: 669 continue 670 ngrams[number] = self._getPhraseBucket(number) 671 672 phrases = helpers.flatten([ngrams.get(number, []) for number in numbers]) 673 phrases = helpers.dedupe(phrases) 674 675 cleaner = KTextCleaner() 676 phrases = [cleaner.removeOrphanedPunctuation(phrase) for phrase in phrases] 677 678 return phrases 679 680 def serveTokens( 681 self, 682 pos: str | list[str] | None = None, 683 lemma=False, 684 unique=True, 685 alterCase: content.TextCase | None = None, 686 includeStopwords=False, 687 minLength=1, 688 ) -> list[str]: 689 """ 690 Serve tokens optionally filtered by spaCy POS type. 691 692 Args: 693 pos: One or more coarse POS labels (for example ADJ, NOUN, VERB) 694 or aliases (for example adjective, noun, verb). 695 lemma: Return lemmatized forms instead of surface tokens. 696 unique: Deduplicate resulting tokens. 697 alterCase: Change the case of the output tokens. 698 includeStopwords: Keep stop words in output. 699 minLength: Minimum token length to keep. 700 701 Returns: 702 List of filtered tokens. 
703 """ 704 posFilter = self._normalizePosFilter(pos) 705 filtered = [] 706 707 for token in self._doc: 708 if token.is_space or token.is_punct or token.like_num or not token.is_alpha: 709 continue 710 711 if posFilter and token.pos_ not in posFilter: 712 continue 713 714 tokenText = token.lemma_ if lemma else token.text 715 tokenText = tokenText.strip() 716 if isinstance(alterCase, str): 717 tokenText = content.changeCase(tokenText, alterCase) 718 719 if not tokenText: 720 continue 721 722 if not includeStopwords and self._isStopWord(tokenText): 723 continue 724 725 filtered.append(tokenText) 726 727 filtered = KTextCleaner().sanitizeForbidden(filtered, dropStrategy="word") 728 filtered = self._cleanTokens(filtered) 729 filtered = [token for token in filtered if len(token) >= minLength] 730 731 if unique: 732 filtered = helpers.dedupe(filtered) 733 734 return filtered
KCorpus(data: str, maxPerBucket: int = 5000, lang: str = 'en')
def __init__(self, data: str, maxPerBucket: int = 5000, lang: str = "en"):
    """
    Build a corpus from raw text or from the contents of a file.

    Args:
        data: Input data as a string or file path.
        maxPerBucket (optional): Maximum number of n-grams per bucket.
        lang: Language code used for spaCy model routing.
    """
    # A path to an existing file is swapped for that file's contents.
    text = files.readFile(data) if files.isFile(data) else data

    self.lang = lang
    self.limit = maxPerBucket

    # Phrase data is built lazily, on first request.
    self._phraseCandidates: list[str] | None = None
    self._phraseBuckets: dict[int, list[str]] = {}

    self.raw: str = text
    """The original input data (contents of file if file path was given)."""
    self.pruned: str = self._toPruned(text)
    """Cleaned version of the input data."""
    self.sentences: list[str] = self._toSentences(self.pruned)
    """List of sentences from the pruned data."""
    self.lines: list[str] = self._toLines(self.pruned)
    """List of lines from the pruned data."""

    # The whole pruned text is parsed once; token/phrase helpers reuse it.
    self._nlp = self._getNlp(lang)
    self._doc = self._nlp(self.pruned)

    self.tokens: list[str] = self._toTokens(self._doc)
    """List of cleaned, filtered tokens from the pruned data."""
    self.words: list[str] = self._toWords(self.tokens)
    """List of unique words from the pruned data."""
Arguments:
- data: Input data as a string or file path.
- maxPerBucket (optional): Maximum number of n-grams per bucket.
- lang: Language code used for spaCy model routing.
@staticmethod
def
fromDataList( data: list[str] = '/usr/share/dict/words', threshold: int = 80000, **kwargs) -> KCorpus:
@staticmethod
def fromDataList(
    data: list[str] | str = "/usr/share/dict/words",
    threshold: int = INPUT_THRESHOLD,
    **kwargs,
) -> "KCorpus":
    """Alternative constructor to create a KCorpus from a list of strings.

    Useful for cases where the input data is too large to process as a single string.

    Args:
        data: List of strings to create the corpus from, or the path of a
            file whose lines are used as that list.
        threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit.
        **kwargs: Forwarded unchanged to the `KCorpus` constructor.

    Returns:
        A corpus built from the sanitized items joined with single spaces.
    """
    if files.isFile(data):
        data = files.readFileLines(data)

    overflow = len(data) - threshold
    if overflow > 0:
        logger.info(
            "Sampling KCorpus data: {} items, discarded {}.",
            threshold,
            overflow,
        )
        data = helpers.sampleList(data, threshold)

    data = KTextCleaner().sanitizeForbidden(data, dropStrategy="word")
    return KCorpus(" ".join(data), **kwargs)
Alternative constructor to create a KCorpus from a list of strings.
Args:
data: List of strings to create the corpus from.
threshold: Maximum number of items to include from the list. If the list exceeds this size, it will be randomly sampled down to this limit.
Useful for cases where the input data is too large to process as a single string.
def
serveSentences( self, mode: Literal['whole', 'separate', 'connected'] = 'separate', limit: int = 100, shuffle=False) -> list[str]:
def serveSentences(
    self,
    mode: Literal["whole", "separate", "connected"] = "separate",
    limit: int = 100,
    shuffle=False,
) -> list[str]:
    """
    Serve sentences from the corpus.

    Args:
        mode: Mode of sentence serving (see below).
        limit: Limit for sentences or words depending on mode.
        shuffle: Whether to shuffle sentences.

    Modes:
        - `whole`
            - whole sentences in logical succession
            - `limit` number of sentences
        - `separate`
            - Chop sentences one by one
            - `limit` max words in a sentence
        - `connected`
            - Connect whole sentences
            - `limit` max sentences in a block

    Any other mode, or `whole` with `limit=None`, returns all sentences.

    Returns:
        List of sentences or sentence blocks.
    """
    if shuffle:
        pool = helpers.shuffleAtRandomSegment(self.sentences)
    else:
        pool = self.sentences

    if mode == "whole":
        if limit is None:
            return pool
        return pool[: min(limit, len(pool))]

    if mode in ("separate", "connected"):
        # Shuffling, if requested, already happened above.
        return content.chopList(pool, limit, mode, shuffle=False)

    return pool
Serve sentences from the corpus.
Arguments:
- mode: Mode of sentence serving (see below).
- limit: Limit for sentences or words depending on mode.
- shuffle: Whether to shuffle sentences.
Modes:
- `whole`: whole sentences in logical succession
  - `limit`: number of sentences
- `separate`: chop sentences one by one
  - `limit`: max words in a sentence
- `connected`: connect whole sentences
  - `limit`: max sentences in a block
Returns:
List of sentences or sentence blocks.
def
serveLines(self, clamp: int = None) -> list[str]:
def serveLines(self, clamp: int | None = None) -> list[str]:
    """
    Serve shuffled lines from the corpus, optionally clamped.

    Args:
        clamp: Maximum number of lines to return; None is passed through to
            `content.chopList` (presumably "no clamp"; verify against it).

    Returns:
        List of lines.
    """
    return content.chopList(self.lines, clamp, mode="separate", shuffle=True)
Serve shuffled lines from the corpus, optionally clamped.
Arguments:
- clamp: Maximum number of lines to return.
Returns:
List of lines.
def
servePhrases(self, n: str | list | tuple = (1, 2, 3, 4)) -> list[str]:
def servePhrases(self, n: int | str | list | tuple = (1, 2, 3, 4)) -> list[str]:
    """
    Serve phrases of specified n-gram sizes.

    Args:
        n: Word counts to include (one or multiple)
            - 1 = single words
            - 2 = bigrams
            - 3 = trigrams
            - 4 = quadgrams
            - 5+ = extended phrase buckets
            Numbers and decimal-digit strings are accepted; any other
            entry is ignored.

    Returns:
        List of phrases.

    Raises:
        ValueError: If any requested size is below 1.
    """
    requested = []
    for value in helpers.coerceList(n):
        if isinstance(value, str):
            # isdecimal() only accepts what int() can parse; isdigit() also
            # accepts characters such as "²" that make int() raise.
            if not value.isdecimal():
                continue
        elif not isinstance(value, (int, float)):
            continue
        requested.append(int(value))

    if any(size < 1 for size in requested):
        raise ValueError("KCorpus phrase bucket sizes must be >= 1.")

    buckets = {1: self.words}
    for size in requested:
        if size > 1:
            buckets[size] = self._getPhraseBucket(size)

    phrases = helpers.flatten([buckets.get(size, []) for size in requested])
    phrases = helpers.dedupe(phrases)

    cleaner = KTextCleaner()
    return [cleaner.removeOrphanedPunctuation(phrase) for phrase in phrases]
Serve phrases of specified n-gram sizes.
Arguments:
- n: Word counts to include (one or multiple)
- 1 = single words
- 2 = bigrams
- 3 = trigrams
- 4 = quadgrams
- 5+ = extended phrase buckets
Returns:
List of phrases.
def
serveTokens( self, pos: str | list[str] | None = None, lemma=False, unique=True, alterCase: Optional[Literal['UPPER', 'lower', 'title', 'title-force']] = None, includeStopwords=False, minLength=1) -> list[str]:
def serveTokens(
    self,
    pos: str | list[str] | None = None,
    lemma=False,
    unique=True,
    alterCase: content.TextCase | None = None,
    includeStopwords=False,
    minLength=1,
) -> list[str]:
    """
    Serve tokens optionally filtered by spaCy POS type.

    Args:
        pos: One or more coarse POS labels (for example ADJ, NOUN, VERB)
            or aliases (for example adjective, noun, verb).
        lemma: Return lemmatized forms instead of surface tokens.
        unique: Deduplicate resulting tokens.
        alterCase: Change the case of the output tokens.
        includeStopwords: Keep stop words in output.
        minLength: Minimum token length to keep.

    Returns:
        List of filtered tokens.

    Raises:
        ValueError: If `pos` is empty or contains unsupported labels.
    """
    posFilter = self._normalizePosFilter(pos)
    recase = isinstance(alterCase, str)
    collected = []

    for token in self._doc:
        if token.is_space or token.is_punct or token.like_num or not token.is_alpha:
            continue

        if posFilter and token.pos_ not in posFilter:
            continue

        text = (token.lemma_ if lemma else token.text).strip()
        if recase:
            text = content.changeCase(text, alterCase)

        if not text:
            continue

        # The stop-word test runs on the final (lemmatized/re-cased) form.
        if not includeStopwords and self._isStopWord(text):
            continue

        collected.append(text)

    collected = KTextCleaner().sanitizeForbidden(collected, dropStrategy="word")
    collected = self._cleanTokens(collected)
    collected = [token for token in collected if len(token) >= minLength]

    if unique:
        collected = helpers.dedupe(collected)

    return collected
Serve tokens optionally filtered by spaCy POS type.
Arguments:
- pos: One or more coarse POS labels (for example ADJ, NOUN, VERB) or aliases (for example adjective, noun, verb).
- lemma: Return lemmatized forms instead of surface tokens.
- unique: Deduplicate resulting tokens.
- alterCase: Change the case of the output tokens.
- includeStopwords: Keep stop words in output.
- minLength: Minimum token length to keep.
Returns:
List of filtered tokens.