<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    .Net之延遲隊列

    一顆小胡椒2022-07-27 17:54:49

    介紹

    具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。

    使用場景

    延遲隊列在項目中的應用還是比較多的,尤其像電商類平臺:

    1. 訂單成功后,在30分鐘內沒有支付,自動取消訂單
    2. 外賣平臺發送訂餐通知,下單成功后60s給用戶推送短信。
    3. 如果訂單一直處于某一個未完結狀態時,及時處理關單,并退還庫存
    4. 淘寶新建商戶一個月內還沒上傳商品信息,將凍結商鋪等
    該介紹來自其他文章

    方案

    下面的例子沒有進行封裝,所以代碼僅供參考

    Redis過期事件

    注意:
    不保證在設定的過期時間立即刪除并發送通知,數據量大的時候會延遲比較大
    不保證一定送達
    發送即忘策略,不包含持久化
    但是比如有些場景,對這個時間不是那么看重,并且有其他措施雙層保障,該實現方案是比較簡單。

    redis自2.8.0之后版本提供Keyspace Notifications功能,允許客戶訂閱Pub / Sub頻道,以便以某種方式接收影響Redis數據集事件。

    配置

    需要修改配置啟用過期事件,比如在windows客戶端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改內容是:

    -- 取消注釋
    notify-keyspace-events Ex
    -- 注釋
    #notify-keyspace-events ""
    

    然后重新啟動服務器,比如windows

     .\redis-server.exe  .\redis.windows.conf
    

    或者linux中使用docker-compose重新部署redis

      redis:
        container_name: redis
        image: redis
        hostname: redis
        restart: always
        ports: 
          - "6379:6379"
        volumes: 
          - $PWD/redis/redis.conf:/etc/redis.conf
          - /root/common-docker-compose/redis/data:/data
        command: 
          /bin/bash -c "redis-server /etc/redis.conf" #啟動執行指定的redis.conf文件
    

    然后使用客戶端訂閱事件

    -- windows
    .\redis-cli
     
    -- linux
    docker exec -it 容器標識 redis-cli
     
    psubscribe __keyevent@0__:expired
    

    控制臺訂閱

    使用StackExchange.Redis組件訂閱過期事件

    var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
    var db = connectionMultiplexer.GetDatabase(0);
    db.StringSet("orderno:123456", "訂單創建", TimeSpan.FromSeconds(10));
    Console.WriteLine("開始訂閱");
    var subscriber = connectionMultiplexer.GetSubscriber();
    //訂閱庫0的過期通知事件
    subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
    {
        Console.WriteLine($"key過期 channel:{channel} key:{key}");
    });
    Console.ReadLine();
    

    輸出結果:

    key過期 channel:keyevent@0:expired key:orderno:123456

    如果啟動多個客戶端監聽,那么多個客戶端都可以收到過期事件。

    WebApi中訂閱

    創建RedisListenService繼承自:BackgroundService

    public class RedisListenService : BackgroundService
    {
        private readonly ISubscriber _subscriber;
        public RedisListenService(IServiceScopeFactory serviceScopeFactory)
        {
            using var scope = serviceScopeFactory.CreateScope();
            var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
            var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
            var db = connectionMultiplexer.GetDatabase(0);
            _subscriber = connectionMultiplexer.GetSubscriber();
        }
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //訂閱庫0的過期通知事件
            _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
            {
                Console.WriteLine($"key過期 channel:{channel} key:{key}");
            });
            return Task.CompletedTask;
        }
    }
    

    注冊該后臺服務

    services.AddHostedService<RedisListenService>();
    

    啟用項目,給redis指定庫設置值,等過期后會接收到過期通知事件。

    RabbitMq延遲隊列

    版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

    要使用rabbitmq做延遲是需要安裝插件(rabbitmq_delayed_message_exchange)的

    插件介紹:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

    下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    將下載好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目錄下:

    docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine
    

    進入容器

    docker exec -it 容器名稱/標識 bash
    

    啟用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    查看是否啟用

    rabbitmq-plugins list
    

    [E*]和[e*]表示啟用,然后重啟服務

    rabbitmq-server restart
    

    然后在管理界面添加交換機都可以看到

    生產消息

    發送的消息類型是:x-delayed-message

    [HttpGet("send/delay")]
    public string SendDelayedMessage()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",//IP地址
            Port = 5672,//端口號
            UserName = "admin",//用戶賬號
            Password = "123456",//用戶密碼
            VirtualHost = "customer"
        };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        var exchangeName = "delay-exchange";
        var routingkey = "delay.delay";
        var queueName = "delay_queueName";
        //設置Exchange隊列類型
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        //設置當前消息為延時隊列
        channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        channel.QueueDeclare(queueName, true, false, false, argMaps);
        channel.QueueBind(queueName, exchangeName, routingkey);
        var time = 1000 * 5;
        var message = $"發送時間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延時時間為:{time}";
        var body = Encoding.UTF8.GetBytes(message);
        var props = channel.CreateBasicProperties();
        //設置消息的過期時間
        props.Headers = new Dictionary<string, object>()
                {
                    {  "x-delay", time }
                };
        channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
        Console.WriteLine("成功發送消息:" + message);
        return "success";
    }
    

    消費消息

    消費消息我是弄了一個后臺任務(RabbitmqDelayedHostService)在處理

    public class RabbitmqDelayedHostService : BackgroundService
    {
        private readonly IModel _channel;
        private readonly IConnection _connection;
        public RabbitmqDelayedHostService()
        {
            var connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "localhost",//IP地址
                Port = 5672,//端口號
                UserName = "admin",//用戶賬號
                Password = "123456",//用戶密碼
                VirtualHost = "customer"
            };
            _connection = connFactory.CreateConnection();
            _channel = _connection.CreateModel();
            //交換機名稱
            var exchangeName = "exchangeDelayed";
            var queueName = "delay_queueName";
            var routingkey = "delay.delay";
            var argMaps = new Dictionary<string, object>()
            {
                {"x-delayed-type", "topic"}
            };
            _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
            _channel.QueueDeclare(queueName, true, false, false, argMaps);
            _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
            //聲明為手動確認
            _channel.BasicQos(0, 1, false);
        }
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var queueName = "delay_queueName";
            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine($"接受到消息的時間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
                //手動確認
                _channel.BasicAck(ea.DeliveryTag, true);
            };
            _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            return Task.CompletedTask;
        }
        public override void Dispose()
        {
            _connection.Dispose();
            _channel.Dispose();
            base.Dispose();
        }
    }
    

    注冊該后臺任務

    services.AddHostedService<RabbitmqDelayedHostService>();
    

    輸出結果

    成功發送消息:發送時間為 2022-07-02 18:54:22 延時時間為:5000

    成功發送消息:發送時間為 2022-07-02 18:54:22 延時時間為:5000

    成功發送消息:發送時間為 2022-07-02 18:54:22 延時時間為:5000

    成功發送消息:發送時間為 2022-07-02 18:54:23 延時時間為:5000

    成功發送消息:發送時間為 2022-07-02 18:54:23 延時時間為:5000

    成功發送消息:發送時間為 2022-07-02 18:54:23 延時時間為:5000

    接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:22 延時時間為:5000

    接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:22 延時時間為:5000

    接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:22 延時時間為:5000

    接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:23 延時時間為:5000

    接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:23 延時時間為:5000

    接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:23 延時時間為:5000

    其他方案

    • Hangfire延遲隊列
    BackgroundJob.Schedule(
      () => Console.WriteLine("Delayed!"),
       TimeSpan.FromDays(7));
    
    • 時間輪
    • Redisson DelayQueue
    • 計時管理器


    redis消息隊列
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    添加消息的任務我們稱為producer,而取出并使用消息的任務,我們稱之為consumer。kafka應運而生,它是專門設計用來做消息中間件的系統。這兩點也是kafka要解決的核心問題。為此,kafka提出了partition的概念。由于消息不會被刪除,因此可以等消費者明確告知kafka這條消息消費成功以后,再去更新游標。對于同一個topic,不同的消費組有各自的游標。
    之前,針對以下我們調研目前的開源隊列方案:beanstalkdbeanstalkd?消費者,通過 reserve/release/bury/delete 來獲取 job 或改變 job 的狀態;很幸運的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。但是這對不熟悉 beanstalkd 操作的 go 開發者而言,需要學習成本
    代表的a的二進制位的修改。對應的ASCII碼是97,轉換為二進制數據是01100001. 因為bit非常節省空間,可以用來做大數據量的統計。BITOPNOTdestkeykey ,對給定 key 求邏輯非,并將結果保存到 destkey 。獲取今天點擊最多的15條:zrevrange hotNews:20190926 0 15 withscores
    前言日前拜讀阿牛老師的大作《領導:誰再用定時任務實現關閉訂單,立馬滾蛋!》發現其方案有若干瑕疵,特此拋磚引玉討論一二。在使用 Redisson DelayQueue 等定時任務中間件時可以同時使用掃描數據庫的方法作為補償機制,避免中間件故障造成任務丟失。
    時間在2020年,當時使用xray,發現它的的反射型xss掃描很好用,于是想知道原理,好奇探索了下大概的xss掃描規則。當時自己的機器都是2H1G的小機器,想提高效率,于是學習用分布式,但是又由此帶來了很多第三方的數據庫,隊列什么的,更加壓迫了我機器的性能..做了這么多,成果也很喜人,各大src,微軟都有,運氣好也獲得了微軟1000多刀的賞金。
    Spring框架是一個開放源代碼的J2EE應用程序框架,是針對bean的生命周期進行管理的輕量級容器。Spring可以單獨應用于構筑應用程序,也可以和Struts、Webwork、Tapestry等眾多Web框架組合使用,并且可以與 Swing等桌面應用程序AP組合。 Spring框架主要由七部分組成,分別是 Spring Core、 Spring AOP、 Spring ORM、 Spring
    痛苦的純文本日志管理日子一去不復返了。雖然純文本數據在某些情況下仍然很有用,但是在進行擴展分析以收集有洞察力的基礎設施數據并改進代碼質量時,尋找一個可靠的日志管理解決方案是值得的,該解決方案可以增強業務工作流的能力。 日志不是一件容易處理的事情,但無論如何都是任何生產系統的一個重要方面。當您面臨一個困難的問題時,使用日志管理解決方案要比在遍布系統環境的無休止的文本文件循環中穿梭容易得多。
    一文讀懂HW護網行動
    2022-07-26 12:00:00
    隨著《網絡安全法》和《等級保護制度條例2.0》的頒布,國內企業的網絡安全建設需與時俱進,要更加注重業務場景的安全性并合理部署網絡安全硬件產品,嚴防死守“網絡安全”底線。“HW行動”大幕開啟,國聯易安誓為政府、企事業單位網絡安全護航!
    Filebeat監視您指定的日志文件或位置,收集日志事件,并將它們轉發到Elasticsearch或 Logstash進行索引。使用Kibana,可以通過各種圖表進行高級數據分析及展示。
    釣魚常用手法總結
    2022-03-24 13:48:29
    雷神眾測擁有對此文章的修改和解釋權。如欲轉載或傳播此文章,必須保證此文章的完整性,包括版權聲明等全部內容。未經雷神眾測允許,不得任意修改或者增減此文章內容,不得以任何方式將其用于商業目的。
    一顆小胡椒
    暫無描述
      亚洲 欧美 自拍 唯美 另类