布隆过滤器持久化到redis

布隆过滤器持久化到redis

关于布隆过滤器的介绍网上基本上都能找到。

运用guava包下面的布隆过滤器来模拟

持久化思路

将布隆过滤器中的下标数组存储到redis的bitMap

引入guava依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.2-jre</version>
</dependency>

查看一下bloomFiltermightContain方法

public <T> boolean mightContain(@ParametricNullness T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
            long bitSize = bits.bitSize();
            long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
            int hash1 = (int)hash64;
            int hash2 = (int)(hash64 >>> 32);

            for(int i = 1; i <= numHashFunctions; ++i) {
                int combinedHash = hash1 + i * hash2;
                if (combinedHash < 0) {
                    combinedHash = ~combinedHash;
                }

                if (!bits.get((long)combinedHash % bitSize)) {
                    return false;
                }
            }

            return true;
        }

后面计算数组下标需要用到

查看一下com.google.common.hash.BloomFilter#optimalNumOfHashFunctions计算hash函数个数需要

  @VisibleForTesting
  static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }

根据给定的 bitSize 和 p 计算出最优的位数com.google.common.hash.BloomFilter#optimalNumOfBits

/**
     * 根据给定的 bitSize 和 p 计算出最优的位数
     *
     * 这个公式是基于信息论中的熵(Entropy)概念来计算最优位数的。熵是衡量信息量的一种度量方式,可以理解为信息的平均编码长度。
     * 在这个公式中, bitSize 表示位大小, p 表示概率。根据信息论的原理,当概率分布越均匀时,信息熵越大,需要更多的位数来表示。相反,当概率分布越集中在某些特定值时,信息熵越小,需要较少的位数来表示。
     * 公式中的 Math.log(p) 表示以e为底的对数, Math.log(2) 表示以e为底的对数。通过将概率 p 的对数除以 Math.log(2) ,我们可以将其转换为以2为底的对数,这样可以得到以2为底的熵值。
     * 然后,通过将 bitSize 乘以概率的对数,再除以以2为底的熵值,我们可以得到最优位数。
     * 这个公式的依据是信息论的基本原理,它提供了一种根据概率分布来计算最优位数的方法。在某些情况下,这个公式可以帮助我们确定需要多少位来表示特定的信息,以便在存储或传输数据时进行优化。
     * @param bitSize
     * @param p
     * @return
     */
    private int optimalNumOfBits(long bitSize, double p){
        if(p == 0){
            p = Double.MIN_VALUE;
        }
        return (int) (-bitSize * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

创建BloomFilterFactory

murmurHashOffset方法用来获取key的hash的下标数组

public int[] murmurHashOffset(T value) {
        int[] offset = new int[numHashFunctions];

        long hash64 = Hashing.murmur3_32_fixed().hashObject(value, funnel).padToLong();
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= numHashFunctions; i++) {
            int nextHash = hash1 + i * hash2;
            if (nextHash < 0) {
                nextHash = ~nextHash;
            }
            offset[i - 1] = nextHash % bitSize;
        }

        return offset;
    }

完整代码

package com.example.spring.boot.test.bloomfilter;

import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/9/8
 */
public class BloomFilterFactory<T> {

    /**
     * hash函数的个数
     */
    private int numHashFunctions;

    /**
     * 最优数组大小
     */
    private int bitSize;

    private Funnel<T> funnel;

    public BloomFilterFactory(Funnel<T> funnel, int expectedInsertions, double fpp) {
        Preconditions.checkArgument(funnel != null, "funnel不能为空");
        this.funnel = funnel;
        bitSize = optimalNumOfBits(expectedInsertions, fpp);
        numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);
    }

    public int[] murmurHashOffset(T value) {
        int[] offset = new int[numHashFunctions];

        long hash64 = Hashing.murmur3_32_fixed().hashObject(value, funnel).padToLong();
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= numHashFunctions; i++) {
            int nextHash = hash1 + i * hash2;
            if (nextHash < 0) {
                nextHash = ~nextHash;
            }
            offset[i - 1] = nextHash % bitSize;
        }

        return offset;
    }



    /**
     * 根据给定的 bitSize 和 p 计算出最优的位数
     *
     * 这个公式是基于信息论中的熵(Entropy)概念来计算最优位数的。熵是衡量信息量的一种度量方式,可以理解为信息的平均编码长度。
     * 在这个公式中, bitSize 表示位大小, p 表示概率。根据信息论的原理,当概率分布越均匀时,信息熵越大,需要更多的位数来表示。相反,当概率分布越集中在某些特定值时,信息熵越小,需要较少的位数来表示。
     * 公式中的 Math.log(p) 表示以e为底的对数, Math.log(2) 表示以e为底的对数。通过将概率 p 的对数除以 Math.log(2) ,我们可以将其转换为以2为底的对数,这样可以得到以2为底的熵值。
     * 然后,通过将 bitSize 乘以概率的对数,再除以以2为底的熵值,我们可以得到最优位数。
     * 这个公式的依据是信息论的基本原理,它提供了一种根据概率分布来计算最优位数的方法。在某些情况下,这个公式可以帮助我们确定需要多少位来表示特定的信息,以便在存储或传输数据时进行优化。
     * @param bitSize
     * @param p
     * @return
     */
    private int optimalNumOfBits(long bitSize, double p){
        if(p == 0){
            p = Double.MIN_VALUE;
        }
        return (int) (-bitSize * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /**
     * 计算hash方法执行次数
     */
    private int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }
}

创建 BloomFilterService

package com.example.spring.boot.test.bloomfilter;

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * @author wuhuaming
 */
@Slf4j
@Service
public class BloomFilterService {

    @Value("${bloomFilter.expiration:24}")
    private long expiration;

    @Autowired
    private RedisTemplate redisTemplate;
    /**
     * 根据给定的布隆过滤器添加值
     */
    public <T> void addByBloomFilter(BloomFilterFactory<T> bloomFilterHelper, String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            redisTemplate.opsForValue().setBit(key, i, true);
        }
        //由于setBit不能直接设置过期时间,因此另外再设置
        //返回值expire为-1时 此键值没有设置过期日期
        //返回值expire为-2时 不存在此键
        long expire = redisTemplate.opsForValue().getOperations()
                //此方法返回单位为秒过期时长
                .getExpire(key);
        if (expire == -1 ){
            redisTemplate.expire(key, expiration, TimeUnit.HOURS);
        }
    }

    /**
     * 根据给定的布隆过滤器判断值是否存在
     */
    public <T> boolean includeByBloomFilter(BloomFilterFactory<T> bloomFilterHelper, String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            if (!redisTemplate.opsForValue().getBit(key, i)) {
                return false;
            }
        }
        return true;
    }
}

测试

    @Test
    public void test7(){
        BloomFilterFactory<String> bloomFilterHelper = new BloomFilterFactory<>((Funnel<String>) (from, into) -> into.putString(from, Charsets.UTF_8), 10000, 0.00001);
        String key = "bloomFilterKey";
        for(int i = 0; i< 1000; i++){
            boolean flag = bloomfilterService.includeByBloomFilter(bloomFilterHelper, key, "bloomFilter" + i);
            if(!flag){
                bloomfilterService.addByBloomFilter(bloomFilterHelper, key, "bloomFilter" + i);
            }
        }
        System.out.println(bloomfilterService.includeByBloomFilter(bloomFilterHelper, key, "bloomFilter1"));
    }

查看redis:

测试结果

    @Test
    public void test7(){
        BloomFilterFactory<String> bloomFilterHelper = new BloomFilterFactory<>((Funnel<String>) (from, into) -> into.putString(from, Charsets.UTF_8), 10000, 0.00001);
        String key = "bloomFilterKey";

        System.out.println(bloomfilterService.includeByBloomFilter(bloomFilterHelper, key, "bloomFilter1"));
    }
true
0%