首页IT科技java消息队列和多线程(java嵌入式持久化消息队列SMQ,改造自FQueue)

java消息队列和多线程(java嵌入式持久化消息队列SMQ,改造自FQueue)

时间2025-06-20 09:51:15分类IT科技浏览7070
导读:一、说明 之前项目中一直使用ConcurrentLinkedQueue做为缓冲队列(主要是单个项目内,单条改批量的场景,多个项目间使用的是rocketmq),虽然用着方便但是是纯内存的,...

一              、说明

  之前项目中一直使用ConcurrentLinkedQueue做为缓冲队列(主要是单个项目内              ,单条改批量的场景                     ,多个项目间使用的是rocketmq)       ,虽然用着方便但是是纯内存的              ,

如果项目发生异常崩溃内存队列中的数据就会全部丢失(只能从日志中恢复)              。所以一直想找一个简单高效支持持久化的嵌入式消息队列                     。中间用过activemq的嵌入模式                     ,

虽然是支持持久化了       ,但是配置起来很繁琐       ,用起来也不简单                     ,性能相比来说也不太行       。

  后来偶然发现了FQueue              ,项目地址:https://github.com/tietang/fqueue

看了看项目源码       ,纯java编写                     ,总共没几个类       。完全可以改造成我想要的 简单高效支持持久化的嵌入式消息队列                     。

二                     、改造

  1       、因为是要做成嵌入式的              ,所以memcached协议相关的代码都删除了              。

  2       、预创建文件删除了,还有一些零零碎碎的改动       。(好几年了                     ,记不清了)                     。

  3                     、相较于原代码                     ,改动最大的就是锁的部分,FQueue 读和写使用的是同一把锁              ,

我改成了读和写使用不同的锁                     ,只在文件切换的时候使用同一把锁              。性能大概提示了百分之20左右(本来就很快       ,锦上添花)。

  4              、添加了内存队列              ,这个主要解决同一个机器创建了大量队列(上千)时                     ,队列消息消费较快       ,因为使用了内存映射磁盘(每隔10ms就会调用force()同步磁盘)       ,

频繁操作磁盘导致磁盘io过高的问题                     。默认情况下队列大小超过50时才会写入持久化队列                     。可以在项目启动时调用SMQ.setting(String dbPath, int logSize, int memoryQueueSize)

进行设置。

三       、使用

1                     、说明

  目前是集成在我个人的工具类项目中的                     ,已发布到中央仓库              。项目地址:https://github.com/shenbururen/sun-utils

该项目强依赖hutool              ,算是个人对hutool的个性化的扩展                     。如果不想依赖该项目       ,只想单纯的使用SMQ                     ,可以将源码中 cn.sanenen.queue包复制出来              ,单独使用       。

2              、maven引用

3、调用

  SMQ使用时只有三个方法,向队列放入数据                     、从队列取出数据                     、获取队列大小(一般只在监控队列是否积压时使用                     ,判断队列是否有数据                     ,使用获取队列数据是否为null进行判断)              。

  我一般是写一个单独的类,通过静态方法调用                     。

/** * 本地持久化内存队列 */ public class MsgQueue { private static final String testDataTopic = "testData"; /** * 向队列放入数据              ,支持多线程       。 */ public static void putTestData(TestData msg) { SMQ.push(testDataTopic, JSON.toJSONString(msg)); } /** * 从队列取出数据                     ,支持多线程       。 */ public static TestData getTestData() { String poll = SMQ.pop(testDataTopic); if (StrUtil.isNotBlank(poll)) { return JSON.parseObject(poll, TestData.class); } return null; } /** * 获取队列大小 */ public static long getTestDataSize(){ return SMQ.size(testDataTopic); } }

四、注意事项

  1              、默认会在项目目录下生成一个smq的文件夹用来存放队列数据                     。同一个smq的文件夹同时只可被一个项目使用              。

  2                     、SMQ.setting(String dbPath, int logSize, int memoryQueueSize)

    dbPath文件存储目录       ,默认是smq              ,会在项目目录下创建一个smq的目录       。(还没测试过绝对路径)                     。

    logSize属性只可以在项目最开始时设置                     ,之后不可以再设置不同的值              。(也可以将生成的smq文件夹删除后重新启动进行设置)。

    memoryQueueSize是内存队列大小       ,默认是50       ,队列数据积压超过memoryQueueSize后才会写入持久化队列                     。(目前memoryQueueSize为0时还是会创建内存队列                     ,这里之后会优化              ,不影响使用                     。)

项目使用了addShutdownHook       ,会在项目关闭时将内存队列消息写入持久化队列。结束项目时使用kill -15 不要用 -9              。否则可能造成消息丢失                     。

    建议都使用默认的                     ,也就是不要调用这个方法       。避免调用出现问题              。

  3       、最好使用在不是要求百分百消息不丢失的场景                     。(在项目异常停止              、服务器停电关机时              ,有概率丢失消息       。)

  4                     、目前已经使用两年多       。

声明:本站所有文章,如无特殊说明或标注                     ,均为本站原创发布                     。任何个人或组织                     ,在未征得本站同意时,禁止复制       、盗用       、采集                     、发布本站内容到任何网站              、书籍等各类媒体平台              。如若本站内容侵犯了原著者的合法权益              ,可联系我们进行处理       。

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
怎样做网站的优化、排名(怎么做网站优化排名) ai-write智能写作工具(拥抱未来,AI智能写作平台助您轻松撰写软文)