<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    簡歷上寫的電商,那請問Redis 如何實現庫存扣減操作和防止被超賣?

    VSole2022-07-19 17:25:20

    大家好,我是胖虎,電商當項目經驗已經非常普遍了,不管你是包裝的還是真實的,起碼要能講清楚電商中常見的問題,比如庫存的操作怎么防止商品被超賣


    在日常開發中有很多地方都有類似扣減庫存的操作,比如電商系統中的商品庫存,抽獎系統中的獎品庫存等。

    解決方案

    1. 使用mysql數據庫,使用一個字段來存儲庫存,每次扣減庫存去更新這個字段。
    2. 還是使用數據庫,但是將庫存分層多份存到多條記錄里面,扣減庫存的時候路由一下,這樣子增大了并發量,但是還是避免不了大量的去訪問數據庫來更新庫存。
    3. 將庫存放到redis使用redis的incrby特性來扣減庫存。

    項目實戰(點擊下載):SpringBoot+SpringCloud+Mybatis+Vue電商項目實戰

    分析

    在上面的第一種和第二種方式都是基于數據來扣減庫存。

    基于數據庫單庫存

    第一種方式在所有請求都會在這里等待鎖,獲取鎖有去扣減庫存。在并發量不高的情況下可以使用,但是一旦并發量大了就會有大量請求阻塞在這里,導致請求超時,進而整個系統雪崩;而且會頻繁的去訪問數據庫,大量占用數據庫資源,所以在并發高的情況下這種方式不適用。

    基于數據庫多庫存

    第二種方式其實是第一種方式的優化版本,在一定程度上提高了并發量,但是在還是會大量的對數據庫做更新操作大量占用數據庫資源。

    基于數據庫來實現扣減庫存還存在的一些問題:

    • 用數據庫扣減庫存的方式,扣減庫存的操作必須在一條語句中執行,不能先selec在update,這樣在并發下會出現超扣的情況。如:
    update number set x=x-1 where x > 0
    
    • MySQL自身對于高并發的處理性能就會出現問題,一般來說,MySQL的處理性能會隨著并發thread上升而上升,但是到了一定的并發度之后會出現明顯的拐點,之后一路下降,最終甚至會比單thread的性能還要差。
    • 當減庫存和高并發碰到一起的時候,由于操作的庫存數目在同一行,就會出現爭搶InnoDB行鎖的問題,導致出現互相等待甚至死鎖,從而大大降低MySQL的處理性能,最終導致前端頁面出現超時異常。

    基于redis

    針對上述問題的問題我們就有了第三種方案,將庫存放到緩存,利用redis的incrby特性來扣減庫存,解決了超扣和性能問題。但是一旦緩存丟失需要考慮恢復方案。比如抽獎系統扣獎品庫存的時候,初始庫存=總的庫存數-已經發放的獎勵數,但是如果是異步發獎,需要等到MQ消息消費完了才能重啟redis初始化庫存,否則也存在庫存不一致的問題。項目實戰(點擊下載):SpringBoot+SpringCloud+Mybatis+Vue電商項目實戰

    基于redis實現扣減庫存的具體實現

    • 我們使用redis的lua腳本來實現扣減庫存
    • 由于是分布式環境下所以還需要一個分布式鎖來控制只能有一個服務去初始化庫存
    • 需要提供一個回調函數,在初始化庫存的時候去調用這個函數獲取初始化庫存

    初始化庫存回調函數(IStockCallback )

    /**
     * 獲取庫存回調
     * @author yuhao.wang
     */
    public interface IStockCallback {
     /**
      * 獲取庫存
      * @return
      */
     int getStock();
    }
    

    扣減庫存服務(StockService)

    /**
     * 扣庫存
     *
     * @author yuhao.wang
     */
    @Service
    public class StockService {
        Logger logger = LoggerFactory.getLogger(StockService.class);
        /**
         * 不限庫存
         */
        public static final long UNINITIALIZED_STOCK = -3L;
        /**
         * Redis 客戶端
         */
        @Autowired
        private RedisTemplate redisTemplate;
        /**
         * 執行扣庫存的腳本
         */
        public static final String STOCK_LUA;
        static {
            /**
             *
             * @desc 扣減庫存Lua腳本
             * 庫存(stock)-1:表示不限庫存
             * 庫存(stock)0:表示沒有庫存
             * 庫存(stock)大于0:表示剩余庫存
             *
             * @params 庫存key
             * @return
             *   -3:庫存未初始化
             *   -2:庫存不足
             *   -1:不限庫存
             *   大于等于0:剩余庫存(扣減之后剩余的庫存)
             *      redis緩存的庫存(value)是-1表示不限庫存,直接返回1
             */
            StringBuilder sb = new StringBuilder();
            sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
            sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
            sb.append("    local num = tonumber(ARGV[1]);");
            sb.append("    if (stock == -1) then");
            sb.append("        return -1;");
            sb.append("    end;");
            sb.append("    if (stock >= num) then");
            sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
            sb.append("    end;");
            sb.append("    return -2;");
            sb.append("end;");
            sb.append("return -3;");
            STOCK_LUA = sb.toString();
        }
        /**
         * @param key           庫存key
         * @param expire        庫存有效時間,單位秒
         * @param num           扣減數量
         * @param stockCallback 初始化庫存回調函數
         * @return -2:庫存不足; -1:不限庫存; 大于等于0:扣減庫存之后的剩余庫存
         */
        public long stock(String key, long expire, int num, IStockCallback stockCallback) {
            long stock = stock(key, num);
            // 初始化庫存
            if (stock == UNINITIALIZED_STOCK) {
                RedisLock redisLock = new RedisLock(redisTemplate, key);
                try {
                    // 獲取鎖
                    if (redisLock.tryLock()) {
                        // 雙重驗證,避免并發時重復回源到數據庫
                        stock = stock(key, num);
                        if (stock == UNINITIALIZED_STOCK) {
                            // 獲取初始化庫存
                            final int initStock = stockCallback.getStock();
                            // 將庫存設置到redis
                            redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                            // 調一次扣庫存的操作
                            stock = stock(key, num);
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    redisLock.unlock();
                }
            }
            return stock;
        }
        /**
         * 加庫存(還原庫存)
         *
         * @param key    庫存key
         * @param num    庫存數量
         * @return
         */
        public long addStock(String key, int num) {
            return addStock(key, null, num);
        }
        /**
         * 加庫存
         *
         * @param key    庫存key
         * @param expire 過期時間(秒)
         * @param num    庫存數量
         * @return
         */
        public long addStock(String key, Long expire, int num) {
            boolean hasKey = redisTemplate.hasKey(key);
            // 判斷key是否存在,存在就直接更新
            if (hasKey) {
                return redisTemplate.opsForValue().increment(key, num);
            }
            Assert.notNull(expire,"初始化庫存失敗,庫存過期時間不能為null");
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                if (redisLock.tryLock()) {
                    // 獲取到鎖后再次判斷一下是否有key
                    hasKey = redisTemplate.hasKey(key);
                    if (!hasKey) {
                        // 初始化庫存
                        redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
            return num;
        }
        /**
         * 獲取庫存
         *
         * @param key 庫存key
         * @return -1:不限庫存; 大于等于0:剩余庫存
         */
        public int getStock(String key) {
            Integer stock = (Integer) redisTemplate.opsForValue().get(key);
            return stock == null ? -1 : stock;
        }
        /**
         * 扣庫存
         *
         * @param key 庫存key
         * @param num 扣減庫存數量
         * @return 扣減之后剩余的庫存【-3:庫存未初始化; -2:庫存不足; -1:不限庫存; 大于等于0:扣減庫存之后的剩余庫存】
         */
        private Long stock(String key, int num) {
            // 腳本里的KEYS參數
            List keys = new ArrayList<>();
            keys.add(key);
            // 腳本里的ARGV參數
            List args = new ArrayList<>();
            args.add(Integer.toString(num));
            long result = redisTemplate.execute(new RedisCallback() {
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    // 集群模式和單機模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                    }
                    // 單機模式
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                    }
                    return UNINITIALIZED_STOCK;
                }
            });
            return result;
        }
    }
    

    調用

    /**
     * @author yuhao.wang
     */
    @RestController
    public class StockController {
        @Autowired
        private StockService stockService;
        @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public Object stock() {
            // 商品ID
            long commodityId = 1;
            // 庫存ID
            String redisKey = "redis_key:stock:" + commodityId;
            long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
            return stock >= 0;
        }
        /**
         * 獲取初始的庫存
         *
         * @return
         */
        private int initStock(long commodityId) {
            // TODO 這里做一些初始化庫存的操作
            return 1000;
        }
        @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public Object getStock() {
            // 商品ID
            long commodityId = 1;
            // 庫存ID
            String redisKey = "redis_key:stock:" + commodityId;
            return stockService.getStock(redisKey);
        }
        @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public Object addStock() {
            // 商品ID
            long commodityId = 2;
            // 庫存ID
            String redisKey = "redis_key:stock:" + commodityId;
            return stockService.addStock(redisKey, 2);
        }
    }
    
    redisredis分布式鎖
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    寫在前面需求是做一個秒殺系統,比如大家來搶100臺手機,先到先得。查閱了網上很多用redis實現秒殺的demo,竟然沒一個能用的!!!有些是php的,沒閑心研究了,現在說說為什么不能用:絕大多數的DEMO都是基于redis的watch特性的事務實現①,個別是基于redis分布式實現②。當然還有些用了腳本的,我也沒仔細看是lua還是調用redis指令,哪有那個閑心去研究哇。并且使用這種方式實現呢,在并發量較大的時候,過多的重試線程應該會嚴重影響服務器性能。
    Set第三個參數過期時間單位是秒。然后這個Set中存在的就是網點還沒有攬收的件,這時候通過Count就會知道這個網點今天還有多少件沒有攬收。如果get,set兩次以上,建議用getall,setall。
    代表的a的二進制位的修改。對應的ASCII碼是97,轉換為二進制數據是01100001. 因為bit非常節省空間,可以用來做大數據量的統計。BITOPNOTdestkeykey ,對給定 key 求邏輯非,并將結果保存到 destkey 。獲取今天點擊最多的15條:zrevrange hotNews:20190926 0 15 withscores
    面對越來越多的高并發場景,限流顯示的尤為重要。 當然,限流有許多種實現的方式,Redis具有很強大的功能,我用Redis實踐了三種的實現方式,可以較為簡單的實現其方式。Redis不僅僅是可以做限流,還可以做數據統計,附近的人等功能,這些可能會后續寫到。
    解決方案使用mysql數據庫,使用一個字段來存儲庫存,每次扣減庫存去更新這個字段。將庫存放到redis使用redis的incrby特性來扣減庫存。基于數據庫單庫存第一種方式在有請求都會在這里等待,獲取有去扣減庫存。基于數據庫多庫存第二種方式其實是第一種方式的優化版本,在一定程度上提高了并發量,但是在還是會大量的對數據庫做更新操作大量占用數據庫資源。但是一旦緩存丟失需要考慮恢復方案。
    目前業界常見的延時消息方案
    Jumpserver 概述Jumpserver 是一款使用 Python, Django 開發的開源跳板機系統, 為互聯網企業提供了認證,授權,審計,自動化運維等功能。(LDAP 是輕量目錄訪問協議,英文全稱是 Lightweight Directory Access Protocol,一般都簡稱為 LDAP。它是基于 X.500 標準的,但是簡單多了并且可以根據需要定制。與 X.500 不同,LDAP 支持 TCP/IP,這對訪問 Internet 是必須的。LDAP 的核心規范在 RFC 中都有定義,有與 LDAP 相關的 RFC 都可以在 LDAPman RFC 網頁中找到。
    這篇文章主要收集一些常見的未授權訪問漏洞。未授權訪問漏洞可以理解為需要安全配置或權限認證的地址、授權頁面存在缺陷導致其他用戶可以直接訪問從而引發重要權限可被操作、數據庫或網站目錄等敏感信息泄露。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类