0%

redis学习笔记之-(3)-HyperLogLogs(HLL)的使用

最坏12k:

该算法的神奇之处:不需用与计数项成正比的内存,而是用恒定数量的内存!在最坏的情况下为  12k 字节

3.1 HLL简介:Counting unique things

bitmap可以统计活跃用户数 , 甚至可以遍历出是那些用户id; 而 HyperLogLogs(简称HLL)是一个概率数据结构,用于估计集合的基数(只求unique的总数). 本质上, HLL并非数据结构而是一种基数算法,通过HLL可用极小内存完成海量元素集合基数总量的统计;

基数: 不重复元素, 比如{1, 2, 3, 5, 5, 5, 6}, 基数集为{1, 2, 3, 5, 6}, 基数为5;

  • HLL是一种算法; 只计算基数, 不会存储基数集元素本身; 您并未真正将项目添加到HLL中;

  • **HyperLogLogs 提供了3个命令:pfadd pfcount pfmerge **

  • 类似于set集合的scard set

  • 最终得到带有标准误差的估计量,在Redis实现的情况下,该误差小于1%

    近似为0.81%的标准误差

  • 该算法的神奇之处在于,不需用与所计数项目数成正比的内存量,而是使用恒定数量的内存!

    在最坏的情况下为12k字节

  • 该误差小于1%

3.2 关于pfadd命令的前缀pf的来源

redis之父antirez, 在他的博客中有提及: –>看来是为了致敬Philippe Flajolet

Redis new data structure: the HyperLogLog

There is a class of algorithms that use randomization in order to provide an approximation of the number of unique elements in a set using just a constant, and small, amount of memory. The best of such algorithms currently known is called HyperLogLog, and is due to Philippe Flajolet.

3.3 命令

1. 加入一个元素: pfadd key element [element...]

2. 计算已加入的元素的unique数: pfcount key [key...]

3. 合并多个统计的hyperloglog为一个: pfmerge destkey sourcekey [sourcekey...]

3.4 案例使用

  • 每有一个新元素, 使用PFADD加入计数;
  • 想要查看当前加入元素的unique量的近似值, 使用PFCOUNT;
  • 也可以合并多个HyperLogLog的基数到一个中, 使用PFMERGE;

(1). 简单案例: 两个hyperloglog, 一个hll1, 一个hll2, 最后合并两个到 hllres中:

127.0.0.1:6379> pfadd hll1 a b c d e
(integer) 1
127.0.0.1:6379> pfadd hll1 a b c e f
(integer) 1
127.0.0.1:6379> pfadd hll1 h
(integer) 1
127.0.0.1:6379> pfcount hll1
(integer) 7
127.0.0.1:6379> pfadd hll2 h i j k l m
(integer) 1
127.0.0.1:6379> pfadd hll2 k l m n o o p
(integer) 1
127.0.0.1:6379> pfcount hll2
(integer) 9
127.0.0.1:6379> pfmerge hllres hll1 hll2
OK
127.0.0.1:6379> pfcount hllres
(integer) 15

(2). 实际生产中我们可以操作的数据集可能有: 可以是IP、Email、ID等;

IP : 统计网站的日UV;

email: 统计所有收到邮件的总数;

ID: 统计海量日志中记录的访问ID的不重复值的总数;

(3). 实用: 假如有一个需求是:

通过摄像头拍摄记录经海路地铁站的十字路口的交通状况, 计算一天经过的车辆总集合的基数, 用于估算附近车辆总数. ==> 目标很简单, 就是记录经过红绿灯的车牌号, 一整天的unique总量;

来统计通过路口的汽车的总量: 假设我们上午统计一次(hll_traff_am), 下午统计一次(hll_traff_pm), 最后汇总(hll_traff_allday)

#1. 上午统计入 hyperloglog hll_traff_am
127.0.0.1:6379> pfadd hll_traff_am 京AF1234 京Q12345 陕A7804s 京AS2235 京P82333 陕A7804s 京AS2235 陕A7804s
(integer) 1
#2. 下午统计入 hyperloglog hll_traff_pm
127.0.0.1:6379> pfadd hll_traff_pm 京A00001 京B00001 陕A00001 京AS2235 京P00001 陕A7804s 陕A7804s
(integer) 1
127.0.0.1:6379> pfcount hll_traff_am
(integer) 5
127.0.0.1:6379> pfcount hll_traff_pm
(integer) 6
#3. 合并到整体的统计: hll_traff_allday
127.0.0.1:6379> pfmerge hll_traff_allday hll_traff_am hll_traff_pm
OK
127.0.0.1:6379> pfcount hll_traff_allday
(integer) 9
127.0.0.1:6379>

当车流量更大, 时间更多, 比如一整年, 这个统计更高效;

注意: 这个得是在能容忍有<1%的误差率的前提下才可用; 比如, 我算DAU, 网站的日访问量, 我能容忍实际的101w次, 实际只得到100w;

redis学习笔记之-(2)-bitmap用法之2-上亿用户1周连续活跃用户数统计

2. bitmap使用2:上亿个用户的1周内连续活跃用户数

2.1 思路

需求2: 上亿个用户,统计一周内连续活跃用户

100000000/8/1024/1024 = 11.9M 一个bitmap占用 不到12M

好在一周只有7天, 我们用7个key的bitmap来存储状态即可,

加上最后的一个结果res的bitmap: 12*8=100M 内存即可!

遵循下面步骤即可:

  1. 用户编号是前提, 每个用户的编号从1到n(n=就是说的那个上亿的最大值);
  2. 声明7个bitmap, 从周一到周日: mon tue wed thur fri sat sun;
  3. 每个用户编号所在的offset在周1如果登录了, 就是1, 没登录就是0;
  4. 7个bitmap都设值记录;
  5. bitop 对7个bitmap进行位AND操作, 7天都登录的当天的位上才是1
  6. 最后的结果进行bitcount操作, 就是上亿用户一周内连续活动的人数!
  7. 如果想知道连续活跃的用户都有哪些人, 遍历 getbit 每一天的key(mon/tue/...) id即可!

示例: 我们模拟5个用户吧:

用户ID mon tue wed thur fri sat sun
001 1 1 1 1 1 1 1
002 0 0 0 0 0 1 1
003 1 1 1 1 1 1 1
004 1 1 1 1 1 1 1
005 0 1 0 1 0 1 1

2.2 前4步: 初始化7个bitmap

周一的key: mon, 周一所有用户的登录状态记录:

127.0.0.1:6379> setbit mon 1 1
(integer) 0
127.0.0.1:6379> setbit mon 2 0
(integer) 0
127.0.0.1:6379> setbit mon 3 1
(integer) 0
127.0.0.1:6379> setbit mon 4 1
(integer) 0
127.0.0.1:6379> setbit mon 5 0
(integer) 0

周二的key: tue, 所有用户的登录状态记录:

127.0.0.1:6379> setbit tue 1 1
(integer) 0
127.0.0.1:6379> setbit tue 2 0
(integer) 0
127.0.0.1:6379> setbit tue 3 1
(integer) 0
127.0.0.1:6379> setbit tue 4 1
(integer) 0
127.0.0.1:6379> setbit tue 5 1
(integer) 0

周三的key: wed, 所有用户的登录状态记录:

127.0.0.1:6379> setbit wed 1 1
(integer) 0
127.0.0.1:6379> setbit wed 2 0
(integer) 0
127.0.0.1:6379> setbit wed 3 1
(integer) 0
127.0.0.1:6379> setbit wed 4 1
(integer) 0
127.0.0.1:6379> setbit wed 5 0
(integer) 0

周四的key: thur, 所有用户的登录状态记录:

127.0.0.1:6379> setbit thur 1 1
(integer) 0
127.0.0.1:6379> setbit thur 2 0
(integer) 0
127.0.0.1:6379> setbit thur 3 1
(integer) 0
127.0.0.1:6379> setbit thur 4 1
(integer) 0
127.0.0.1:6379> setbit thur 5 1
(integer) 0

周五的key: fri, 所有用户的登录状态记录:

127.0.0.1:6379> setbit fri 1 1
(integer) 0
127.0.0.1:6379> setbit fri 2 0
(integer) 0
127.0.0.1:6379> setbit fri 3 1
(integer) 0
127.0.0.1:6379> setbit fri 4 1
(integer) 0
127.0.0.1:6379> setbit fri 5 0
(integer) 0

周六的key: sat, 所有用户的登录状态记录:

127.0.0.1:6379> setbit sat 1 1
(integer) 0
127.0.0.1:6379> setbit sat 2 1
(integer) 0
127.0.0.1:6379> setbit sat 3 1
(integer) 0
127.0.0.1:6379> setbit sat 4 1
(integer) 0
127.0.0.1:6379> setbit sat 5 1
(integer) 0

周日的key: sun, 所有用户的登录状态记录:

127.0.0.1:6379> setbit sun 1 1
(integer) 0
127.0.0.1:6379> setbit sun 2 1
(integer) 0
127.0.0.1:6379> setbit sun 3 1
(integer) 0
127.0.0.1:6379> setbit sun 4 1
(integer) 0
127.0.0.1:6379> setbit sun 5 1
(integer) 0

我去, 数据终于录入完了, 费劲那个~~

2.3 用bitop对7个bitmap进行位AND操作

127.0.0.1:6379> bitop and res mon tue wed thur fri sat sun
(integer) 1

2.4 bitcount对结果bitmap, 见证奇迹的时刻到了

127.0.0.1:6379> bitcount res
(integer) 3

2.5 想知道连续活跃的用户都有哪些人

getbit res 1 表示 用户编号为1的结果: 1说明用户ID=1的, 7天都活跃!

5个用户结果如下:

127.0.0.1:6379> getbit res 1
(integer) 1
127.0.0.1:6379> getbit res 2
(integer) 0
127.0.0.1:6379> getbit res 3
(integer) 1
127.0.0.1:6379> getbit res 4
(integer) 1
127.0.0.1:6379> getbit res 5
(integer) 0

2.6 扩展:

如果需求是: 求一周内活跃过的用户数: 只要将 上面命令中bitop and 改为: bitop or 即可~

连续 的话要求是AND, 活跃过 的话, 只要有一天就可以

over!

2.7 小结

bitmap的操作命令:

  • setbit key offset value

  • getbit key offset

  • bitcount key [start end]

  • bitop operation destkey [key …]

    operation: 支持 AND/OR/NOT/XOR四种操作,除NOT 外,其他操作都可接受一个或多个 key 作输入

    destkey: 后面所有 key([key…])的与或非异或操作的结果存入一个目标key, 取个名字!

    key… 可以n个key做 与或非异或操作, 传入这些bitmap的key 列表

1. bitmap使用1:统计所有用户1年的登录天数

命令一览: setbit getbit bitcount

需求1: 电商网站统计所有用户一年的登录天数, 比如用户id为, 我们想要统计用户每年的登录天数, 比如如下

用户名 用户id 本年登录天数
张三 001 100
李四 002 200
王五 003 365

使用redis的bitmap来实现的话, 可以这么弄:

setbit key offset value

setbit ulogin:001 20200101 1

  • ulogin:001是bitmap的key
  • 20200101是offset, 记录某一天的登录值的key标识
  • 当天登录了记为1, 没有记为0;

查询某天:
getbit key offset

统计所有该key的1的值的个数(也就是登录天数)
bitcount key

1.1 记录某天登录操作-setbit

127.0.0.1:6379> setbit ulogin:001 20200101 1
(integer) 0
127.0.0.1:6379> setbit ulogin:001 20200102 1
(integer) 0
127.0.0.1:6379> setbit ulogin:001 20200103 1
(integer) 0
127.0.0.1:6379> setbit ulogin:001 20200104 0
(integer) 0

1.2 查询某天登录了没有-getbit

127.0.0.1:6379> getbit ulogin:001 20200102
(integer) 1

1.3 查询用户总共登录了几天-bitcount

127.0.0.1:6379> bitcount ulogin:001
(integer) 3

如果要统计所有的用户的登录天数, 我们平时每天记录:

127.0.0.1:6379> setbit ulogin:002 20200101 1
(integer) 0
127.0.0.1:6379> setbit ulogin:003 20200102 1
(integer) 0

1.4 统计所有用户登录天数

如果要统计所有用户, 在java中利用前缀的命名规则ulogin: 在一个for循环中遍历所有的id列表即可, 把调用redis的结果放入一个集合中即可!!


1. B站视频默认最高2倍速可选, 这里可以超:

  1. 打开视频当前页, 右击”检查”进入DevTools页
  2. 执行 var vCtrl = document.querySelector('video')
  3. 执行 vCtrl.playbackRate = 3
    就可以3倍速播放: OK了

2. 同样可以暂停/播放

vCtrl.pause() 可以暂停视频
vCtrl.play()可以播放;

/**
 *	OncePerRequestFilter保证在任何Servlet容器中都是一个请求只执行一次的过滤器。
*/
public class CorsFilter extends OncePerRequestFilter {

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {

        Properties props = PropertiesLoaderUtils.loadAllProperties("cors.properties");
        //允许的 客户端域名
        response.addHeader("Access-Control-Allow-Origin", props.getProperty("cors.allowed-origins"));
        //允许的 方法名
        response.addHeader("Access-Control-Allow-Methods", props.getProperty("cors.allowed-methods"));
        //允许服务端访问的客户端请求头,多个请求头用逗号分割,例如:Content-Type
        response.addHeader("Access-Control-Allow-Headers", "Content-Type,X-Requested-With,token");
        //预检验请求时间
        response.addHeader("Access-Control-Max-Age", props.getProperty("cors.max-age"));//30 min
        response.addHeader("Access-Control-Allow-Credentials", "true");

        filterChain.doFilter(request, response);
    }

}

2. web.xml中配置跨域过滤器:


<!--配置跨域请求的过滤器-->
	<filter>
		<filter-name>cors</filter-name>
		<filter-class>com.jd.dashboard.cors.CrossFilter</filter-class>
	</filter>
	<filter-mapping>
		<filter-name>cors</filter-name>
		<url-pattern>/*</url-pattern>
	</filter-mapping>

3. 过滤器中的属性配置如下:

属性配置文件如下:cors.properties

#跨域请求CORS全局配置属性值

#允许访问的客户端域名,例如:http://web.xxx.com
cors.allowed-origins=http://front.xx.com

#允许访问的方法名
cors.allowed-methods=POST, GET, OPTIONS, DELETE

#允许服务端访问的客户端请求头,多个请求头用逗号分割,例如:Content-Type
cors.allowed-headers=Content-Type

#允许客户端访问的服务端响应头
cors.exposed-headers=

#是否允许请求带有验证信息,若要获取客户端域下的cookie时,需要将其设置为true
cors.allow-credentials=true

cors.max-age=1800

由于jsonp只支持GET方式的请求,所以这种方式比较推荐。


[2016-12-22]

 ClassNotFoundException: java.util.ArrayList$SubList
java.lang.RuntimeException: java.lang.ClassNotFoundException: java.util.ArrayList$SubList
    com.jd.jsf.gd.util.ClassTypeUtils.getClass(ClassTypeUtils.java:85)
    com.jd.jsf.gd.codec.msgpack.InvocationTemplate.read(InvocationTemplate.java:136)
    com.jd.jsf.gd.codec.msgpack.InvocationTemplate.read(InvocationTemplate.java:25)
    com.jd.org.msgpack.template.AbstractTemplate.read(AbstractTemplate.java:32)
    com.jd.org.msgpack.MessagePack.read(MessagePack.java:445)
    com.jd.org.msgpack.MessagePack.read(MessagePack.java:410)
    com.jd.jsf.gd.codec.msgpack.MsgpackUtil.read(MsgpackUtil.java:130)
    com.jd.jsf.gd.codec.msgpack.MsgpackDecoder.decode(MsgpackDecoder.java:26)
    com.jd.jsf.gd.codec.msgpack.MsgpackCodec.decode(MsgpackCodec.java:52)
    com.jd.jsf.gd.protocol.JSFProtocol.decode(JSFProtocol.java:83)
    com.jd.jsf.gd.server.JSFTask.doRun(JSFTask.java:66)
    com.jd.jsf.gd.server.BaseTask.run(BaseTask.java:27)
    java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    java.util.concurrent.FutureTask.run(FutureTask.java:138)
    java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    java.lang.Thread.run(Thread.java:662) 

==================

有一台节点客户就是访问不了, 最后杀掉这个节点后发现好了,就是只要不打到这个节点,就是正常的,检查了下,发现JDK版本不一致:

java -version
java version “1.6.0_25”

export PATH=/export/servers/jdk1.6.0_25/bin/:$PATH
之后重启tomcat就没问题了.

至于为什么1.7会报告这个, 查查看.


[2016-09-18]

package com.niewj.thread;  
  
import java.util.HashSet;  
import java.util.List;  
import java.util.Set;  
import java.util.concurrent.*;  
  
public class ThreadTest {  
  
    /** 
     * 同一个map里面存储了各个水果的数量, 
     * 
     * @param args 
     */  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
  
        /** 
         * invokeAny 3个线程中最快的那个会返回,但是其他的也可能都执行完/也可能不执行; 
         */  
        testInvokeAny();   
    }  
  
    private static void testInvokeAny() throws ExecutionException, InterruptedException {  
        ExecutorService executorService = Executors.newSingleThreadExecutor();  
  
        Set<Callable<String>> tskSet = new HashSet<>();  
  
        tskSet.add(new Callable<String>() {  
            public String call() throws Exception {  
                System.out.println("Task 1 ..........running.......");  
                return "Task 1";  
            }  
        });  
        tskSet.add(new Callable<String>() {  
            public String call() throws Exception {  
                System.out.println("Task 2 ..........running.......");  
                return "Task 2";  
            }  
        });  
        tskSet.add(new Callable<String>() {  
            public String call() throws Exception {  
                System.out.println("Task 3 ..........running.......");  
                return "Task 3";  
            }  
        });  
  
        String result = executorService.invokeAny(tskSet);  
  
        System.out.println("result = " + result);  
  
        executorService.shutdown();  
    }  
  
}  

** invokeAny 3个线程中最快的那个会返回,**

** 但: 其他的2个也可能都执行完/也可能不执行;**

—[2018-08-23]—

package com.niewj.concurrent;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.*;

/**
 * Future和Callable例1
 *
 * @author niewj
 */
@Slf4j
public class ES_TPE_Test {

    /**
     * 主线程要做两件事:
     * 1.买书;
     * 2.报名考试
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        long start = System.currentTimeMillis();

        // 1. 第一件事:买书 [需要一定的时间]
        Future<Book> future = exec.submit(new BuyBookTask());
        exec.shutdown();

        // 2. 第一件事:报名考试 报名考试花费10秒
        int timeReg = new Random().nextInt(10); // 等进货时间
        log.info("====> 报名考试排队ing.... ");
        TimeUnit.SECONDS.sleep(timeReg);
        log.info("====> 报名考试排队{}秒 <==== ", timeReg);

        Book book = future.get();
        log.info("考试报上名了; 书也买到了: " + book);
        log.info("总耗时 {} 秒!", (System.currentTimeMillis() - start) / 1000);
    }

    /**
     * 购买书籍任务-需要耗费一定时间:假定买书需要等7天(一秒模拟一天)
     *
     * @author niewj
     */
    static class BuyBookTask implements Callable<Book> {
        @Override
        public Book call() throws Exception {
            int time = new Random().nextInt(10); // 等进货时间
            log.info("===> 买书ing........");
            TimeUnit.SECONDS.sleep(time);
            log.info("===> 买书耗时{}秒 <====", time);

            return new Book("<Java核心技术第七版卷一>", 120.5);
        }
    }

    @Data
    @AllArgsConstructor
    static class Book {
        private String name;
        private Double price;

        @Override
        public String toString() {
            return "书籍 <" + name + "> 价钱是:" + price;
        }
    }
}

概述:

main方法:
主线程想做两件事=两个任务:报名考试和买书
用一个Callable任务发起一个线程来做买书的任务-耗时 0-10 秒;
主线程自己完成报名考试的任务-耗时 0-10 秒;

—[2017-05-05]—

1. length属性可写

a=[2,4,5,6,7,90];  
//a.length->6  
a.length=8;  
//a=[2, 4, 5, 6, 7, 90, undefined , undefined] 

2. length末尾自增

a[a.length]=108;  
a[a.length]=109;  
//a=[2, 4, 5, 6, 7, 90, undefined ,undefined , 108, 109]  

—[2014-08-05]—

a=[2,4,5,6,7,90];

【1】. a.toString(); // 返回字符串表示的数组,逗号分隔

“2,4,5,6,7,90”

【2】. a.join('||'); // 返回一个字符串,用指定的字符分割

“2||4||5||6||7||90”

【3】.a.push(33); // 栈尾压入一个元素

[2,4,5,6,7,90,33]

【4】. a.pop(); // ,33栈尾移出一个元素

[2,4,5,6,7,90]

【5】. a.shift(); // 2,从队列首部移出一个元素

[4,5,6,7,90]

【6】. a.unshift(28); // 从队首插入一个元素,后面的顺移

[28,4,5,6,7,90]
a.unshift(36);
[28,4,5,6,7,90,36]

【7】. a.reverse(); // 数组冲排序,反转顺序

[36,90,7,6,5,4,28]

【8】. a.sort(); // 数组自然排序

[28,36,4,5,6,7,90]

【9】. 自定义排序方法,传入sort参数:

function compare(v1,v2){if(v1>v2){return 1}else if(v1<v2){return -1}else{return 0}}  
//a=[4,5,6,7,28,36,90]