<tt id="a3jom"></tt>
    1. <tt id="a3jom"><noscript id="a3jom"></noscript></tt>

        <tt id="a3jom"></tt>

        延遲任務的實現總結

        上傳人:lis****210 文檔編號:188237643 上傳時間:2023-02-18 格式:DOCX 頁數:23 大?。?7.90KB
        收藏 版權申訴 舉報 下載
        延遲任務的實現總結_第1頁
        第1頁 / 共23頁
        延遲任務的實現總結_第2頁
        第2頁 / 共23頁
        延遲任務的實現總結_第3頁
        第3頁 / 共23頁
        資源描述:

        《延遲任務的實現總結》由會員分享,可在線閱讀,更多相關《延遲任務的實現總結(23頁珍藏版)》請在裝配圖網上搜索。

        1、延遲任務的實現總結上一篇寫了使用RabbitMQ來實現延遲任務的實現,其 實實現延遲任務的方式有很多,各有利弊,有單機和分布式 的。在這里做一個總結,在遇到這類問題的時候希望給大家 一個參考和思路。延遲任務有別于定式任務,定式任務往 往是固定周期的,有明確的觸發時間。而延遲任務一般沒有 固定的開始時間,它常常是由一個事件觸發的,而在這個事 件觸發之后的一段時間內觸發另一個事件。延遲任務相關的 業務場景如下:場景一:物聯網系統經常會遇到向終端下 發命令,如果命令一段時間沒有應答,就需要設置成超時。 場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則 系統自動取消訂單。下面我們來探討一些方案,其

        2、實這些 方案沒有好壞之分,和系統架構一樣,只有最適合。對于數 據量較小的情況下,任意一種方案都可行,考慮的是簡單明 了和開發速度,盡量避免把系統搞復雜了。而對于數據量較 大的情況下,就需要有一些選擇,并不是所有的方案都適合 了。 1.數據庫輪詢這是比較常見的一種方式,所有的訂 單或者所有的命令一般都會存儲在數據庫中。我們會起一個 線程去掃數據庫或者一個數據庫定時Job,找到那些超時的 數據,直接更新狀態,或者拿出來執行一些操作。這種方式 很簡單,不會引入其他的技術,開發周期短。如果數據量比較大,千萬級甚至更多,插入頻率很高的話,上面的方式在性能上會出現一些問題,查找和更新對會占用很多時間, 輪

        3、詢頻率高的話甚至會影響數據入庫。一種可以嘗試的方式 就是使用類似TBSchedule或Elastic-Job這樣的分布式的任 務調度加上數據分片功能,把需要判斷的數據分到不同的機 器上執行。如果數據量進一步增大,那掃數據庫肯定就不 行了。另一方面,對于訂單這類數據,我們也許會遇到分庫 分表,那上述方案就會變得過于復雜,得不償失。2. JDK延遲隊列 Java 中的 DelayQueue 位于 java.util.concurrent 包下,作為單機實現,它很好的實現了延遲一段時間后觸發 事件的需求。由于是線程安全的它可以有多個消費者和多個 生產者,從而在某些情況下可以提升性能。DelayQue

        4、ue本 質是封裝了 一個PriorityQueue,使之線程安全,加上Delay 功能,也就是說,消費者線程只能在隊列中的消息“過期”之 后才能返回數據獲取到消息,不然只能獲取到null。之所以 要用到PriorityQueue,主要是需要排序。也許后插入的消息 需要比隊列中的其他消息提前觸發,那么這個后插入的消息 就需要最先被消費者獲取,這就需要排序功能。PriorityQueue內部使用最小堆來實現排序隊列。隊首的,最 先被消費者拿到的就是最小的那個。使用最小堆讓隊列在數 據量較大的時候比較有優勢。使用最小堆來實現優先級隊列 主要是因為最小堆在插入和獲取時,時間復雜度相對都比較 好,都是O

        5、(logN)。下面例子實現了未來某個時間要觸發的 消息。我把這些消息放在DelayQueue中,當消息的觸發時 間到,消費者就能拿到消息,并且消費,實現處理方法。示 例代碼: /*定義放在延遲隊列中的對象,需要實現Delayed接口*/public class DelayedTask implements Delayed (private int _expireInSecond = 0;public DelayedTask(int delaySecond) (Calendar cal = Calendar.getInstance();cal.add(Calendar.SECOND, delay

        6、Second);_expireInSecond = (int) (cal.getTimeInMillis() /1000);public int compareTo(Delayed o) (long d = (getDelay(TimeUnit.NANOSECONDS)- o.getDelay(TimeUnit.NANOSECONDS);return (d = 0) ? 0 : (d < 0) ? -1 : 1);public long getDelay(TimeUnit unit) (/ TODO Auto-generated method stubCalendar cal = Cal

        7、endar.getInstance();return _expireInSecond - (cal.getTimeInMillis() / 1000);下面定義了三個延遲任務,分別是10秒,5秒和15秒。依 次入隊列,期望5秒鐘后,5秒的消息先被獲取到,然后每 個5秒鐘,依次獲取到10秒數據和15秒的那個數據。public static void main(String args) throws InterruptedException (/ TODO Auto-generated method stubSimpleDateFormat sdf = newSimpleDateFormat(yy

        8、yy-MM-dd HH:mm:ss);/定義延遲隊列DelayQueue<DelayedTask> delayQueue = new DelayQueue<DelayedTask>();定義三個延遲任務DelayedTask task1 = new DelayedTask(10);DelayedTask task2 = new DelayedTask(5);DelayedTask task3 = new DelayedTask(15);delayQueue.add(task1);delayQueue.add(task2);delayQueue.add(task3);Sy

        9、stem.out.println(sdf.format(new Date() + start);while (delayQueue.size() != 0) (/如果沒到時間,該方法會返回DelayedTask task = delayQueue.poll(); if (task != null) (Date now = new Date();System.out.println(sdf.format(now);Thread.sleep(1000);輸出結果如下圖:DelayQueue是一種很好的實現方式,雖然是單機,但是可 以多線程生產和消費,提高效率。拿到消息后也可以使用異 步線程去執行下

        10、一步的任務。如果有分布式的需求可以使用 Redis來實現消息的分發,如果對消息的可靠性有非常高的 要求可以使用消息中間件:使用DelayQueue需要考慮程序掛掉之后,內存里面未處理 消息的丟失帶來的影響。3. JDK ScheduledExecutorServiceJDK自帶的一種線程池,它能調度一些命令在一段時間之后 執行,或者周期性的執行。文章開頭的一些業務場景主要使 用第一種方式,即,在一段時間之后執行某個操作。代碼例 子如下:public static void main(String args) (/ TODO Auto-generated method stub Schedule

        11、dExecutorService executor =Executors.newScheduledThreadPool(100);for (int i = 10; i > 0; i-) ( executor.schedule(new Runnable() (public void run() (/ TODO Auto-generated method stubSystem.out.println(Work start, thread id: +Thread.currentThread().getId() + + sdf.format(new Date();, i, TimeUnit.SE

        12、CONDS);執行結果:ScheduledExecutorService 的實現類ScheduledThreadPoolExecutor 提供了一種并行處理的模 型,簡化了線程的調度。DelayedWorkQueue是類似 DelayQueue的實現,也是基于最小堆的、線程安全的數據 結構,所以會有上例排序后輸出的結果。ScheduledExecutorService 比上面一種 DelayQueue 更加實 用。因為,一般來說,使用DelayQueue獲取消息后觸發事 件都會實用多線程的方式執行,以保證其他事件能準時進行。 而ScheduledThreadPoolExecutor就是對這個過

        13、程進行了 封裝,讓大家更加方便的使用。同時在加強了部分功能,比 如定時觸發命令。4. 時間輪時間輪是一種非常驚艷的數據結構。其在Linux內核中使用 廣泛,是Linux內核定時器的實現方法和基礎之一。按使用 場景,大致可以分為兩種時間輪:原始時間輪和分層時間輪。 分層時間輪是原始時間輪的升級版本,來應對時間“槽”數量 比較大的情況,對內存和精度都有很高要求的情況。我們延 遲任務的場景一般只需要用到原始時間輪就可以了。原始時間輪:如下圖一個輪子,有8個“槽”,可以代表未來 的一個時間。如果以秒為單位,中間的指針每隔一秒鐘轉動 到新的“槽”上面,就好像手表一樣。如果當前指針指在1上 面,我有一個任

        14、務需要4秒以后執行,那么這個執行的線程 回調或者消息將會被放在5上。那如果需要在20秒之后執 行怎么辦,由于這個環形結構槽數只到8,如果要20秒,指 針需要多轉2圈。位置是在2圈之后的5上面(20 % 8 + 1 * 這個圈數需要記錄在槽中的數據結構里面。這個數據結構最 重要的是兩個指針,一個是觸發任務的函數指針,另外一個 是觸發的總第幾圈數。時間輪可以用簡單的數組或者是環形 鏈表來實現。相比DelayQueue的數據結構,時間輪在算法復雜度上有一 定優勢。DelayQueue由于涉及到排序,需要調堆,插入和 移除的復雜度是O(lgn),而時間輪在插入和移除的復雜度都是 O(1)o時間輪比較好

        15、的開源實現是Netty的/創建Timer,精度為100毫秒,HashedWheelTimer timer = new HashedWheelTimer();System.out.println(sdf.format(new Date();MyTask task1 = new MyTask();MyTask task2 = new MyTask();MyTask task3 = new MyTask();timer.newTimeout(task1, 5,TimeUnit.SECONDS);timer.newTimeout(task2, 10,TimeUnit.SECONDS);timer.ne

        16、wTimeout(task3, 15, TimeUnit.SECONDS);/阻塞main線程try (System.in.read(); catch (IOException e) (/ TODO Auto-generated catch block e.printStackTrace();其中HashedWheelTimer有多個構造函數。其中:ThreadFactory :創建線程的類,默認 Executors.defaultThreadFactory()。TickDuration :多少時間指針順時針轉一格,單位由下面一 個參數提供。TimeUnit :上一個參數的時間單位。Ticks

        17、PerWheel :時間輪上的格子數。如果一個任務要在120s后執行,時間輪是默認參數的話, 那么這個任務在時間輪上需要經過120000ms / (512 * 100ms) = 2 輪120000ms % (512 * 100ms) = 176 格。在使用HashedWheelTimer的過程中,延遲任務的實現最好 使用異步的,HashedWheelTimer的任務管理和執行都在一 個線程里面。如果任務比較耗時,那么指針就會延遲,導致 整個任務就會延遲。4. Quartzquartz是一個企業級的開源的任務調度框架,quartz內部使 用TreeSet來保存Trigger ,如下圖。Java中

        18、的TreeSet是 使用TreeMap實現,TreeMap是一個紅黑樹實現。紅黑樹 的插入和刪除復雜度都是logN。和最小堆相比各有千秋。最 小堆插入比紅黑樹快,刪除頂層節點比紅黑樹慢。相比上述的三種輕量級的實現功能豐富很多。有專門的任務 調度線程,和任務執行線程池。quartz功能強大,主要是用 來執行周期性的任務,當然也可以用來實現延遲任務。但是 如果只是實現一個簡單的基于內存的延時任務的話,quartz 就稍顯龐大。5. Redis ZSetRedis中的ZSet是一個有序的Set,內部使用HashMap和 跳表(SkipList)來保證數據的存儲和有序,HashMap里放的 是成員到s

        19、core的映射,而跳躍表里存放的是所有的成員, 排序依據是HashMap里存的score,使用跳躍表的結構可以 獲得比較高的查找效率,并且在實現上比較簡單。public class ZSetTest (private JedisPool jedisPool = null;/ Redis服務器IPprivate String ADDR = 10.23.22.42;/ Redis的端口號private int PORT = 6379;private SimpleDateFormat sdf = newSimpleDateFormat(yyyy-MM-dd HH:mm:ss);public void

        20、 intJedis() (jedisPool = new JedisPool(ADDR, PORT);public static void main(String args) (/ TODO Auto-generated method stubZSetTest zsetTest = new ZSetTest();zsetTest.intJedis();zsetTest.addItem();zsetTest.getItem();zsetTest.deleteZSet();public void deleteZSet() (Jedis jedis = jedisPool.getResource()

        21、;jedis.del(zset_test);public void addItem() (Jedis jedis = jedisPool.getResource();Calendar cal1 = Calendar.getInstance();cal1.add(Calendar.SECOND, 10);int second10later = (int) (cal1.getTimeInMillis() / 1000);Calendar cal2 = Calendar.getInstance();cal2.add(Calendar.SECOND, 20);int second20later = (

        22、int) (cal2.getTimeInMillis() / 1000);Calendar cal3 = Calendar.getInstance(); cal3.add(Calendar.SECOND, 30);int second30later = (int) (cal3.getTimeInMillis() / 1000);Calendar cal4 = Calendar.getInstance();cal4.add(Calendar.SECOND, 40);int second40later = (int) (cal4.getTimeInMillis() / 1000);Calendar

        23、 cal5 = Calendar.getInstance();cal5.add(Calendar.SECOND, 50);int second50later = (int) (cal5.getTimeInMillis() / 1000);jedis.zadd(zset_test, second50later, e);jedis.zadd(zset_test, second10later, a);jedis.zadd(zset_test, second30later, c);jedis.zadd(zset_test, second20later, b);jedis.zadd(zset_test,

        24、 second40later, d);System.out.println(sdf.format(new Date() + addfinished.);public void getItem() (Jedis jedis = jedisPool.getResource();while (true) (try (Set<Tuple> set = jedis.zrangeWithScores(zset_test, 0, 0);String value = (Tuple) set.toArray()0).getElement();int score = (int) (Tuple) set

        25、.toArray()0).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) (jedis.zrem(zset_test, value);System.out.println(sdf.format(newDate() + removed value: + value);if (jedis.zcard(zset_test) <= 0)(System.out.println(sdf.for

        26、mat(newDate() + zset empty );return;Thread.sleep(1000); catch (InterruptedException e) (/ TODO Auto-generated catch block e.printStackTrace();在用作延遲任務的時候,可以在添加數據的時候,使用zadd 把score寫成未來某個時刻的unix時間戳。消費者使用 zrangeWithScores獲取優先級最高的(最早開始的的任務。 注意,zrangeWithScores并不是取出來,只是看一下并不刪 除,類似于Queue的peek方法。程序對最早的這個消息進

        27、行驗證,是否到達要運行的時間,如果是則執行,然后刪除 zset中的數據。如果不是,則繼續等待。由于zrangeWithScores和zrem是先后使用,所以有可能 有并發問題,即兩個線程或者兩個進程都會拿到一樣的一樣 的數據,然后重復執行,最后又都會刪除。如果是單機多線 程執行,或者分布式環境下,可以使用Redis事務,也可以 使用由Redis實現的分布式鎖或者使用下例中Redis Script。 你可以在Redis官方的Transaction章節找到事務的相關內 容。使用Redis的好處主要是:1. 解耦:把任務、任務發起者、任務執行者的三者分開,邏 輯更加清晰,程序強壯性提升,有利于任務發

        28、起者和執行者 各自迭代,適合多人協作。2. 異?;謴停河捎谑褂肦edis作為消息通道,消息都存儲在 Redis中。如果發送程序或者任務處理程序掛了,重啟之后, 還有重新處理數據的可能性。3. 分布式:如果數據量較大,程序執行時間比較長,我們可以針對任務發起者和任務執行者進行分布式部署。特別注意 任務的執行者,也就是Redis的接收方需要考慮分布式鎖的 問題。6. JesqueJesque 是 Resque 的 java 實現,Resque 是一個基于 Redis 的Ruby項目,用于后臺的定時任務。Jesque實現延遲任務 的方法也是在Redis里面建立一個ZSet和上例一樣的處理 方式。上例

        29、提到在使用ZSet作為優先級隊列的時候,由于 zrangeWithScores和zrem沒法保證原子性,所有在分布 式環境下會有問題。在Jesque中,它使用的Redis Script 來解決這個問題。Redis Script可以保證操作的原子性,相 比事務也減少了一些網絡開銷,性能更加出色。7. RabbitMQ TTL 和 DXL使用RabbitMQ的TTL和DXL實現延遲隊列在這里不做詳 細的介紹,這篇文章描述的比較詳細。綜上所述,解決延遲隊列有很多種方法。選擇哪個解決方案 也需要根據不同的數據量、實時性要求、已有架構和組件等 因素進行判斷和取舍。對于比較簡單的系統,可以使用數據 庫輪訓的方式。數據量稍大,實時性稍高一點的系統可以使 用JDK延遲隊列(也許需要解決程序掛了,內存中未處理任 務丟失的情況)。如果需要分布式橫向擴展的話推薦使用 Redis的方案。但是對于系統中已有RabbitMQ,那RabbitMQ 會是一個更好的方案。

        展開閱讀全文
        溫馨提示:
        1: 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
        2: 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
        3.本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
        4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
        5. 裝配圖網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
        6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
        7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
        關于我們 - 網站聲明 - 網站地圖 - 資源地圖 - 友情鏈接 - 網站客服 - 聯系我們

        網站客服QQ:2846424093或766697812

        copyright@ 2020-2023  zhuangpeitu.com 裝配圖網版權所有   聯系電話:0512-65154990  

        備案號:蘇ICP備12009002號-6   經營許可證:蘇B2-20200052  蘇公網安備:32050602011098


        本站為文檔C2C交易模式,即用戶上傳的文檔直接被用戶下載,本站只是中間服務平臺,本站所有文檔下載所得的收益歸上傳人(含作者)所有。裝配圖網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對上載內容本身不做任何修改或編輯。若文檔所含內容侵犯了您的版權或隱私,請立即通知裝配圖網,我們立即給予刪除!

        特级毛片a片全部免费播,特级毛片a片全部免费观看,特级毛片免费无码不卡观看,特级全黄a片高清视频

        <tt id="a3jom"></tt>
        1. <tt id="a3jom"><noscript id="a3jom"></noscript></tt>

            <tt id="a3jom"></tt>