这段代码实现了一个简单的倒排索引搜索引擎。主要包括以下几个函数和操作:

  1. tokenize_tweet(document): 对文档进行令牌化处理,将文档转换为小写字母,并提取出tweetid、username和tweet内容三部分主要信息。使用TextBlob的words.singularize()方法对文档进行词干提取和单词变单数处理,并排除掉无用的词汇。

  2. get_postings(): 读取文档并构建倒排索引。逐行读取文档内容,对每一行进行令牌化处理,然后根据词在倒排索引中的情况,添加或更新倒排索引。

  3. op_and(term1, term2): 对两个词进行AND操作,返回两个词在倒排索引中的交集。

  4. op_or(term1, term2): 对两个词进行OR操作,返回两个词在倒排索引中的并集。

  5. op_not(term1, term2): 对term1进行NOT操作,返回term1在倒排索引中的位置,排除term2在倒排索引中的位置。

  6. op_andnot(term1, term2): 对term1进行AND NOT操作,返回term1在倒排索引中的位置,排除term2在倒排索引中的位置。

  7. do_rankSearch(terms): 根据查询的词在倒排索引中的情况进行排名,返回按照排名得到的tweetid。

  8. token(doc): 对查询的文档进行令牌化处理,将文档转换为小写字母,并进行词干提取和单词变单数处理。

  9. do_search(): 进行查询操作,根据输入的查询词进行相应的操作,并打印出结果。

  10. main(): 主函数,调用get_postings()函数构建倒排索引,并通过循环调用do_search()函数进行查询。

  11. if __name__ == "__main__"::判断该模块是否作为主程序运行,如果是则调用main()函数。

这段代码的功能是通过构建倒排索引实现了简单的文本搜索功能,可以根据输入的查询词进行AND、OR、NOT等操作,并返回匹配的结果。

import sys
from collections import defaultdict
from textblob import TextBlob        #导入文本处理工具
from textblob import Word
uselessTerm = ['username', 'text', 'tweetid']  #声明一个列表uselessTerm,存储不需要的词汇
postings = defaultdict(dict)                 #声明一个空字postings,用于存储倒排索引
def tokenize_tweet(document):              #对文档进行令牌化处理
    document = document.lower()              #将所有大写字母返回小写字母并返回字符串
    a = document.index('username')           #用index和rindex返回指定的索引名称
    b = document.index('clusterno')
    c = document.rindex('tweetid') - 1
    d = document.rindex('errorcode')
    e = document.index('text')
    f = document.index('timestr') - 3         #获取时间戳
                                            #提取twwetid,username,tweet内容三部分主要信息
    document = document[c:d] + document[a:b] + document[e:f]
    
    terms = TextBlob(document).words.singularize()     #使用TextBlob的words.singularize()方法对文档进行词干提取和单词变单数处理。
    result = []
    for word in terms:
        expected_str = Word(word)
        expected_str = expected_str.lemmatize('v')     #lemmatize() 方法  对单词进行词形还原,名词找单数,动词找原型
        if expected_str not in uselessTerm:
            result.append(expected_str)
    return result

#读取文档并构建倒排索引
def get_postings():
    global postings
    f = open(r'C:\Users\马雨慧\Desktop\大三上课程作业\信息检索与数据挖掘\experiment1\tweets.txt')  #打开document文件
    lines = f.readlines()                               #逐行读取文件内容
    for line in lines:
        line = tokenize_tweet(line)                     #令牌化,解析一行数据
        
        tweetid = line[0]
        line.pop(0)
        unique_terms = set(line)
        for te in unique_terms:                        #根据词在倒排索引中的情况,添加或更新倒排索引。
            if te in postings.keys():
                postings[te].append(tweetid)
            else:
                postings[te] = [tweetid]
                
def op_and(term1,term2):                               #and与操作
    global postings       #全局变量
    answer = []                                          #如果term1或term2不在倒排索引中,则返回一个空列表。   
    if(term1 not in postings) or (term2 not in postings):
        return answer
    else:
        i = len(postings[term1])       #获取词长度    
        j = len(postings[term2])
        x = 0
        y = 0
        while x<i and y<j:                            #否则,遍历term1和term2在倒排索引中的位置。
            if postings[term1][x]==postings[term2][y]:#如果位置相同,则添加到结果列表中,并移动两个索引的位置。
                answer.append(postings[term1][x])     
                x+=1
                y+=1
            elif postings[term1][x]<postings[term2][y]:#如果term1的位置小于term2的位置,则移动term1的索引。
                x+=1
            else:                                      #反之   
                y+=1
        return answer
    
def op_or(term1,term2):                              #或操作
    answer = []
    if(term1 not in postings) and (term2 not in postings):
        answer = []
    elif term2 not in postings:
        answer = postings[term1]
    elif term1 not in postings:
        answer = postings[term2]
    else:                                            # 否则,返回term1和term2在倒排索引中的位置的并集。
        answer = postings[term1]
        for item in postings[term2]:
            if item not in answer:
                answer.append(item)
    return answer
def op_not(term1,term2):      #非操作
    answer = []
    if term1 not in postings:
        return answer
    elif term2 not in postings:
        answer = postings[term1]
        return answer
    else:
        answer = postings[term1]                      #否则,返回term1在倒排索引中的位置,排除term2在倒排索引中的位置。-------------------------------------------
        ANS = []
        for ter in answer:
            if ter not in postings[term2]:
                ANS.append(ter)
        return ANS
def op_andnot(term1,term2):  #与非操作
    answer = []
    x, y = 0, 0
    while x < len(postings[term1]) and y < len(postings[term2]):
        # index相等时,同时后移
        if postings[term1][x]==postings[term2][y]:
            x += 1
            y += 1
        # 指向term1的index较小时,加入结果列表
        elif postings[term1][x] < postings[term2][y]:
            answer.append(postings[term1][x])
            x += 1
        else:
            y += 1
    # term1 未遍历完,加入剩余index
    if x != len(postings[term1]):
        answer.extend(postings[term1][x:])
    return answer
         

def do_rankSearch(terms):      #倒排索引
    Answer = defaultdict(dict)
    for item in terms:
        if item in postings:
            for tweetid in postings[item]:
                if tweetid in Answer:
                    Answer[tweetid]+=1
                else:
                    Answer[tweetid]=1
    Answer = sorted(Answer.items(),key=lambda asd: asd[1],reverse=True)
    return Answer
def token(doc):
    doc = doc.lower()
    terms = TextBlob(doc).words.singularize()
    result = []
    for word in terms:
        expected_str = Word(word)
        expected_str = expected_str.lemmatize('v')
        result.append(expected_str)
    return result

def do_search():       #查询
    terms = token(input('input your search query >> '))
    if terms == []:
        sys.exit()
    if len(terms) == 3:
        if terms[1] == 'and':
            answer = op_and(terms[0],terms[2])
            print(answer)
        elif terms[1] == 'or':
            answer = op_or(terms[0],terms[2])
            print(answer)
        elif terms[1] == 'not':
            answer = op_not(terms[0],terms[2])
            print(answer)
        else:
            print('there is a syntax error!')
    else:
        leng = len(terms)
        answer = do_rankSearch(terms)
        print('[Rank_Score: Tweetid]')
        for(tweetid,score) in answer:
            print(str(score/leng)+ ': ' + tweetid)
def main():
    get_postings()
    while True:
        do_search()
        
if __name__ == "__main__":
    main()
Python实现简单倒排索引搜索引擎

原文地址: http://www.cveoy.top/t/topic/9y4 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录