java消息队列和多线程(java嵌入式持久化消息队列SMQ,改造自FQueue)
一 、说明
之前项目中一直使用ConcurrentLinkedQueue做为缓冲队列(主要是单个项目内 ,单条改批量的场景 ,多个项目间使用的是rocketmq),虽然用着方便但是是纯内存的 ,
如果项目发生异常崩溃内存队列中的数据就会全部丢失(只能从日志中恢复) 。所以一直想找一个简单高效支持持久化的嵌入式消息队列 。中间用过activemq的嵌入模式 ,
虽然是支持持久化了 ,但是配置起来很繁琐 ,用起来也不简单 ,性能相比来说也不太行 。
后来偶然发现了FQueue ,项目地址:https://github.com/tietang/fqueue
看了看项目源码 ,纯java编写 ,总共没几个类 。完全可以改造成我想要的 简单高效支持持久化的嵌入式消息队列 。
二 、改造
1 、因为是要做成嵌入式的 ,所以memcached协议相关的代码都删除了 。
2 、预创建文件删除了,还有一些零零碎碎的改动 。(好几年了 ,记不清了) 。
3 、相较于原代码 ,改动最大的就是锁的部分,FQueue 读和写使用的是同一把锁 ,
我改成了读和写使用不同的锁 ,只在文件切换的时候使用同一把锁 。性能大概提示了百分之20左右(本来就很快,锦上添花)。
4 、添加了内存队列 ,这个主要解决同一个机器创建了大量队列(上千)时 ,队列消息消费较快 ,因为使用了内存映射磁盘(每隔10ms就会调用force()同步磁盘) ,
频繁操作磁盘导致磁盘io过高的问题 。默认情况下队列大小超过50时才会写入持久化队列 。可以在项目启动时调用SMQ.setting(String dbPath, int logSize, int memoryQueueSize)
进行设置。
三 、使用
1 、说明
目前是集成在我个人的工具类项目中的 ,已发布到中央仓库 。项目地址:https://github.com/shenbururen/sun-utils
该项目强依赖hutool ,算是个人对hutool的个性化的扩展 。如果不想依赖该项目 ,只想单纯的使用SMQ ,可以将源码中 cn.sanenen.queue包复制出来 ,单独使用。
2 、maven引用
3、调用
SMQ使用时只有三个方法,向队列放入数据 、从队列取出数据 、获取队列大小(一般只在监控队列是否积压时使用 ,判断队列是否有数据 ,使用获取队列数据是否为null进行判断) 。
我一般是写一个单独的类,通过静态方法调用 。
四、注意事项
1 、默认会在项目目录下生成一个smq的文件夹用来存放队列数据 。同一个smq的文件夹同时只可被一个项目使用 。
2 、SMQ.setting(String dbPath, int logSize, int memoryQueueSize)
dbPath文件存储目录,默认是smq ,会在项目目录下创建一个smq的目录 。(还没测试过绝对路径) 。
logSize属性只可以在项目最开始时设置 ,之后不可以再设置不同的值 。(也可以将生成的smq文件夹删除后重新启动进行设置)。
memoryQueueSize是内存队列大小 ,默认是50 ,队列数据积压超过memoryQueueSize后才会写入持久化队列 。(目前memoryQueueSize为0时还是会创建内存队列 ,这里之后会优化 ,不影响使用 。)
项目使用了addShutdownHook ,会在项目关闭时将内存队列消息写入持久化队列。结束项目时使用kill -15 不要用 -9 。否则可能造成消息丢失 。
建议都使用默认的 ,也就是不要调用这个方法。避免调用出现问题 。
3、最好使用在不是要求百分百消息不丢失的场景 。(在项目异常停止 、服务器停电关机时 ,有概率丢失消息 。)
4 、目前已经使用两年多 。
创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!