# Mathias Helminger (helminger@kernelz.de)
# 2009-10-02
#
# Comparison between Shannon encoding and a different code that I came up
# with accidentally ;)
#
# usage: python encoders.py [console|abc|default*] [shannon|normal|fast*] 1*
#
#   1st parameter: input mode. "abc" is the test string used for the
#                  comparison, "console" reads one line of input, "default"
#                  uses some text from Wikipedia.
#   2nd parameter: the encoding (split) algorithm. Be careful with "normal":
#                  it takes very long when many different characters are
#                  present (brute force over all subsets).
#   3rd parameter: the block size that should be used.
#
# The * denotes the default option.
#
# The code runs unchanged on Python 2.7 and Python 3.

import sys
from math import log

try:
    _range = xrange  # Python 2: lazy range, needed for the 2**n brute force
except NameError:
    _range = range


def sort_by_value(d):
    """Return the keys of dictionary ``d`` sorted by their values (ascending).

    Keys with equal values are ordered by the key itself, so the result is
    deterministic.
    """
    return sorted(d, key=lambda key: (d[key], key))


def fast_split(d):
    """Perform a fast split using a simple greedy approximation.

    Symbols are visited from the heaviest to the lightest and each one is put
    on the side that is currently lighter (the right side on a tie).

    Returns a ``(left, right)`` tuple of key lists. For a dictionary with a
    single entry the list of its keys is returned instead.
    """
    if len(d) == 1:
        return list(d)
    left, right = [], []
    weight_left = 0
    weight_right = 0
    for symbol in reversed(sort_by_value(d)):
        if weight_left < weight_right:
            left.append(symbol)
            weight_left += d[symbol]
        else:
            right.append(symbol)
            weight_right += d[symbol]
    return (left, right)


def split_shannon(d):
    """Split the dictionary like in Shannon-Fano encoding.

    The heaviest symbols are collected on the left side for as long as adding
    the next one brings the left weight closer to half of the total weight;
    all remaining symbols go to the right side.

    Returns a ``(left, right)`` tuple of key lists. For a dictionary with a
    single entry the list of its keys is returned instead.
    """
    if len(d) == 1:
        return list(d)
    descending = sort_by_value(d)[::-1]
    # 2.0 keeps this a true division even for integer weights on Python 2.
    half = sum(d.values()) / 2.0
    left = []
    weight_left = 0
    for symbol in descending:
        if weight_left >= half:
            break
        # Stop as soon as taking the symbol would not improve the balance.
        if abs(half - weight_left - d[symbol]) >= abs(half - weight_left):
            break
        left.append(symbol)
        weight_left += d[symbol]
    right = descending[len(left):]
    return (left, right)


def construct_set(keys, i):
    """Construct a subset of ``keys`` from the binary representation of ``i``.

    Bit ``n`` of ``i`` (least significant first) selects ``keys[n]``.
    """
    selected = []
    for key in keys:
        if i <= 0:
            break
        if i % 2 == 1:
            selected.append(key)
        i //= 2  # floor division: plain "/" yields floats on Python 3
    return selected


def get_weight(lset, d):
    """Return the summed weight of the keys in ``lset`` using dictionary ``d``.

    Keys that are missing from ``d`` contribute nothing.
    """
    return sum(d[key] for key in lset if key in d)


def split_equally(d):
    """Perform the most equal split possible using brute force search.

    Every subset of the keys is constructed explicitly, so the running time
    grows with ``2 ** len(d)``.

    Returns a ``(left, right)`` tuple of key lists where ``left`` is the
    heaviest subset whose weight does not exceed half of the total weight.
    For a dictionary with a single entry the list of its keys is returned
    instead.
    """
    if len(d) == 1:
        return list(d)
    keys = list(d)
    max_weight = get_weight(keys, d) / 2.0
    best = []
    best_weight = 0
    # The search space is defined by the dictionary being split, not by any
    # module-level state.
    for i in _range(2 ** len(keys)):
        cur = construct_set(keys, i)
        cur_weight = get_weight(cur, d)
        # The small epsilon tolerates floating point noise around the half.
        if best_weight < cur_weight < max_weight + 0.00001:
            best = cur
            best_weight = cur_weight
    chosen = set(best)
    rest = [key for key in keys if key not in chosen]
    return (best, rest)


def get_tree(dc, split="fast"):
    """Turn a probability dictionary into an encoding tree.

    ``split`` selects the split algorithm: ``"fast"`` (default) uses
    ``fast_split``, ``"shannon"`` uses ``split_shannon`` and any other value
    uses the brute force ``split_equally``.

    Returns a nested 2-tuple ``(left, right)`` whose leaves are the symbols;
    a dictionary with a single symbol yields that symbol itself.

    Raises ``ValueError`` for an empty dictionary.
    """
    if not dc:
        raise ValueError("cannot build an encoding tree from an empty dictionary")
    if len(dc) == 1:
        return next(iter(dc))
    if split == "fast":
        parts = fast_split(dc)
    elif split == "shannon":
        parts = split_shannon(dc)
    else:
        parts = split_equally(dc)
    left, right = parts
    left_probs = dict((symbol, dc[symbol]) for symbol in left)
    right_probs = dict((symbol, dc[symbol]) for symbol in right)
    return (get_tree(left_probs, split), get_tree(right_probs, split))


def set_proper_length(text, width):
    """Pad ``text`` with spaces so its length is a multiple of ``width``.

    Text whose length already is a multiple of ``width`` is returned
    unchanged.

    Raises ``ValueError`` if ``width`` is smaller than 1.
    """
    if width < 1:
        raise ValueError("width must be at least 1, got %r" % (width,))
    if width == 1:
        return text
    missing = -len(text) % width
    return text + " " * missing


def get_prob_dict(text, width=1):
    """Calculate a probability dictionary from a given input text.

    The (padded) text is cut into blocks of ``width`` characters; the result
    maps every distinct block to its relative frequency. Empty text yields an
    empty dictionary.
    """
    text = set_proper_length(text, width)
    if not text:
        return {}
    unit = 1.0 / len(text) * width
    probs = {}
    for start in _range(0, len(text), width):
        block = text[start:start + width]
        probs[block] = probs.get(block, 0.0) + unit
    return probs


def _is_inner_node(node):
    """Tell whether ``node`` is an inner tree node (a 2-tuple) or a leaf."""
    return isinstance(node, tuple) and len(node) == 2


def _collect_code_words(node, prefix, code_words):
    """Store the codeword of every leaf below ``node`` into ``code_words``."""
    if _is_inner_node(node):
        _collect_code_words(node[0], prefix + '0', code_words)
        _collect_code_words(node[1], prefix + '1', code_words)
    else:
        code_words[node] = prefix


def gen_code_words(tree):
    """Turn the decision tree into a dictionary of symbols and codewords.

    Going left appends ``'0'`` and going right appends ``'1'``. A tree that
    consists of a single leaf gets the codeword ``'0'``: an empty codeword
    would make the encoded text empty and therefore impossible to decode.
    """
    if not _is_inner_node(tree):
        return {tree: '0'}
    code_words = {}
    _collect_code_words(tree, '', code_words)
    return code_words


def encode(text, code_words, width=1):
    """Encode ``text`` with the given codewords and block width.

    Raises ``KeyError`` if a block of the text has no codeword.
    """
    text = set_proper_length(text, width)
    return "".join(code_words[text[start:start + width]]
                   for start in _range(0, len(text), width))


def reverse_code(code):
    """Return the inverse mapping (codeword -> symbol) of ``code``."""
    return dict((word, symbol) for symbol, word in code.items())


def decode(compressed, code):
    """Decode a compressed string of codewords back into text.

    Raises ``ValueError`` if the input ends in the middle of a codeword.
    """
    rev_code = reverse_code(code)
    symbols = []
    pending = ""
    for bit in compressed:
        pending += bit
        if pending in rev_code:
            symbols.append(rev_code[pending])
            pending = ""
    if pending:
        raise ValueError("problem found while decoding, '%s' is not a valid codeword" % (pending))
    return "".join(symbols)


def code_to_str(code, dc):
    """Format the probabilities and the encoding as a readable table.

    One line is produced per symbol (ordered by ascending probability),
    showing its codeword, probability, entropy contribution and weighted
    codeword length, followed by a summary line with the total entropy and
    the mean codeword length.
    """
    lines = []
    entropy = 0.0
    mean_length = 0.0
    for char in sort_by_value(dc):
        codeword = code[char]
        prob = dc[char]
        # -p*log2(p), written so that p == 1 gives 0.0 rather than -0.0.
        info = prob * log(1.0 / prob, 2)
        weighted_length = prob * len(codeword)
        entropy += info
        mean_length += weighted_length
        lines.append("'%s' --->>> %-6s (p(x)=%.4f H(x)=%.4f len=%.4f) "
                     % (char, codeword, prob, info, weighted_length))
    return "\n".join(lines) + "\n H(x) = %4.4f len(x) = %4.4f" % (entropy, mean_length)


# default text
atext = (
    "In probability theory, a stochastic process, or sometimes random "
    "process, is the counterpart to a deterministic process (or "
    "deterministic system). Instead of dealing with only one possible "
    "'reality' of how the process might evolve under time (as is the case, "
    "for example, for solutions of an ordinary differential equation), in a "
    "stochastic or random process there is some indeterminacy in its future "
    "evolution described by probability distributions."
)

# test string used for the comparison ("abc" input mode)
ABC_TEXT = "AAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDEEEEEEEEEEEEEFFFFF"


def main(argv=None):
    """Run the command line demo: encode a text, print statistics, decode it.

    ``argv`` defaults to ``sys.argv``; see the usage notes at the top of the
    file for the meaning of the arguments.
    """
    if argv is None:
        argv = sys.argv

    # "ugly" param processing
    text = atext
    if len(argv) >= 2:
        if argv[1] == 'console':
            print("enter text to encode")
            # Only strip the line terminator; the last line of a stream may
            # come without one.
            text = sys.stdin.readline().rstrip("\r\n")
        elif argv[1] == 'abc':
            text = ABC_TEXT

    split_alog = "fast"
    if len(argv) >= 3:
        split_alog = argv[2]

    width = 1
    if len(argv) >= 4:
        try:
            width = int(argv[3])
        except ValueError:
            sys.exit("blocksize must be a positive integer, got %r" % (argv[3],))
        if width < 1:
            sys.exit("blocksize must be a positive integer, got %r" % (argv[3],))

    if not text:
        print("nothing to encode")
        return

    # lets try it out
    print("base")
    print(text)
    print("\nlength is %i bits" % (len(text) * 8))

    print("\npreprocessing results using method " + split_alog)
    dc = get_prob_dict(text, width)
    code = gen_code_words(get_tree(dc, split_alog))
    print(code_to_str(code, dc))

    print("\nencoded")
    enc_str = encode(text, code, width)
    print(enc_str)
    c_rate = 1.0 - (len(enc_str) * 1.0 / (len(text) * 8))
    print("\nlength is %i bits => compression rate (without table): %.4f%%" % (len(enc_str), c_rate * 100))

    print("\ndecoded")
    dec_str = decode(enc_str, code)
    print(dec_str)


if __name__ == "__main__":
    main()