消息隊列連環(huán)炮
消息隊列技術選型解決的問題:
不用 MQ 系統(tǒng)耦合場景A 系統(tǒng)產(chǎn)生了一個比較關鍵的數(shù)據(jù),很多系統(tǒng)需要 A 系統(tǒng)將數(shù)據(jù)發(fā)過來,強耦合(B,C,D,E 系統(tǒng)可能參數(shù)不一樣、一會需要一會不需要數(shù)據(jù),A 系統(tǒng)要不斷修改代碼維護) A 系統(tǒng)還要考慮 B、C、D、E 系統(tǒng)是否掛了,是否訪問超時?是否重試? 使用 MQ 系統(tǒng)解耦場景
總結:通過一個 MQ 的發(fā)布訂閱消息模型(Pub/Sub), 系統(tǒng) A 跟其他系統(tǒng)就徹底解耦了。 不用 MQ 同步高延遲請求場景一般互聯(lián)網(wǎng)類的企業(yè),對用戶的直接操作,一般要求每個請求都必須在 200ms以內(nèi),對用戶幾乎是無感知的。 使用 MQ 進行異步化之后的接口性能優(yōu)化提高高延時接口 沒有用 MQ 時高峰期系統(tǒng)被打死的場景高峰期每秒 5000 個請求,每秒對 MySQL 執(zhí)行 5000 條 SQL(一般MySQL每秒 2000 個請求差不多了),如果MySQL被打死,然后整個系統(tǒng)就崩潰,用戶就沒辦法使用系統(tǒng)了。但是高峰期過了之后,每秒鐘可能就 50 個請求,對整個系統(tǒng)沒有任何壓力。 使用 MQ 進行削峰的場景5000 個請求寫入到 MQ 里面,系統(tǒng) A 每秒鐘最多只能處理 2000 個請求(MySQL 每秒鐘最多處理 2000 個請求),系統(tǒng) A 從 MQ 里慢慢拉取請求,每秒鐘拉取 2000 個請求。MQ,每秒鐘 5000 個請求進來,結果只有 2000 個請求出去,結果導致在高峰期(21小時),可能有幾十萬甚至幾百萬的請求積壓在 MQ 中,這個是正常的,因為過了高峰期之后,每秒鐘就 50 個請求,但是系統(tǒng) A 還是會按照每秒 2000 個該請求的速度去處理。只要高峰期一過,系統(tǒng) A 就會快速的將積壓的消息給解決掉。
架構中引入 MQ 后存在的問題
MQ 可能掛掉,導致整個系統(tǒng)崩潰
可能發(fā)重復消息,導致插入重復數(shù)據(jù);消息丟了;消息順序亂了;系統(tǒng) B,C,D 掛了,導致 MQ 消息積累,磁盤滿了;
本來應該A,B,C,D 都執(zhí)行成功了再返回,結果A,B,C 執(zhí)行成功 D 失敗 Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么優(yōu)缺點建議:中小型公司 RabbitMQ 大公司:RocketMQ 大數(shù)據(jù)實時計算:Kafka 消息隊列高可用RabbtitMQ 高可用RabbitMQ有三種模式:單機模式 、普通集群模式、鏡像集群模式
demo級
隊列的元數(shù)據(jù)存在于多個實例中,但是消息不存在多個實例中,每次多臺機器上啟動多個 rabbitmq 實例,每個機器啟動一個。
沒有高可用性可言
隊列的元數(shù)據(jù)和消息都會存在于多個實例中,每次寫消息到 queue的時候,都會自動把消息到多個實例的 queue 里進行消息同步。也就 是每個節(jié)點上都有這個 queue 的一個完整鏡像(這個 queue的全部數(shù)據(jù))。任何一個節(jié)點宕機了,其他節(jié)點還包含這個 queue的完整數(shù)據(jù),其他 consumer 都可以到其他活著的節(jié)點上去消費數(shù)據(jù)都是 OK 的。 缺點:不是分布式的,如果這個 queue的數(shù)據(jù)量很大,大到這個機器上的容量無法容納 。 開啟鏡像集群模式方法:管理控制臺,Admin頁面下,新增一個鏡像集群模式的策略,指定的時候可以要求數(shù)據(jù)同步到所有節(jié)點,也可以要求同步到指定數(shù)量的節(jié)點,然后你再次創(chuàng)建 queue 的時候 ,應用這個策略,就 會自動將數(shù)據(jù)同步到其他的節(jié)點上去。
broker進程就是kafka在每臺機器上啟動的自己的一個進程。每臺機器+機器上的broker進程,就可以認為是 kafka集群中的一個節(jié)點。 你創(chuàng)建一個 topic,這個topic可以劃分為多個 partition,每個 partition 可以存在于不同的 broker 上,每個 partition就存放一部分數(shù)據(jù)。 這就是天然的分布式消息隊列,也就是說一個 topic的數(shù)據(jù),是分散放在 多個機器上的,每個機器就放一部分數(shù)據(jù)。 分布式的真正含義是每個節(jié)點只放一部分數(shù)據(jù),而不是完整數(shù)據(jù)(完整數(shù)據(jù)就是HA、集群機制)
每個 partition的數(shù)據(jù)都會同步到其他機器上,形成自己的多個 replica 副本。然后所有 replica 會選舉一個 leader。那么生產(chǎn)者、消費者都會和這個 leader 打交道,然后其他 replica 就是 follow。寫的時候,leader 負責把數(shù)據(jù)同步到所有 follower上去,讀的時候就直接讀 leader 上的數(shù)據(jù)即可。 如果某個 broker宕機了,剛好也是 partition的leader,那么此時會選舉一個新的 leader出來,大家繼續(xù)讀寫那個新的 leader即可,這個就 是所謂的高可用性。更多面試題:面試題內(nèi)容聚合 leader和follower的同步機制:寫數(shù)據(jù)的時候,生產(chǎn)者就寫 leader,然后 leader將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動從 leader來pull數(shù)據(jù)。一旦所有 follower同步好數(shù)據(jù)了,就會發(fā)送 ack給 leader,leader收到所有 follower的 ack之后,就會返回寫成功的消息給生產(chǎn)者。 消費的時候,只會從 leader去讀,但是只有一個消息已經(jīng)被所有 follower都同步成功返回 ack的時候,這個消息才會被消費者讀到。 消息隊列重復數(shù)據(jù)MQ 只能保證消息不丟,不能保證重復發(fā)送 Kafka 消費端可能出現(xiàn)的重復消費問題
每條消息都有一個 offset 代表 了這個消息的順序的序號,按照數(shù)據(jù)進入 kafka的順序,kafka會給每條數(shù)據(jù)分配一個 offset,代表了這個是數(shù)據(jù)的序號,消費者從 kafka去消費的時候,按照這個順序去消費,消費者會去提交 offset,就是告訴 kafka已經(jīng)消費到 offset=153這條數(shù)據(jù)了 ;zk里面就記錄了消費者當前消費到了 offset =幾的那條消息;假如此時消費者系統(tǒng)被重啟,重啟之后,消費者會找kafka,讓kafka把上次我消費到的那個地方后面的數(shù)據(jù)繼續(xù)給我傳遞過來。更多面試題:面試題內(nèi)容聚合
消費者不是說消費完一條數(shù)據(jù)就立馬提交 offset的,而是定時定期提交一次 offset。消費者如果再準備提交 offset,但是還沒提交 offset的時候,消費者進程重啟了,那么此時已經(jīng)消費過的消息的 offset并沒有提交,kafka也就不知道你已經(jīng)消費了 offset= 153那條數(shù)據(jù),這個時候kafka會給你發(fā)offset=152,153,154的數(shù)據(jù),此時 offset = 152,153的消息重復消費了 保證 MQ 重復消費冪等性
冪等:一個數(shù)據(jù)或者一個請求,給你重復來多次,你得確保對應的數(shù)據(jù)是不會改變的,不能出錯。
保證 MQ 消息不丟MQ 傳遞非常核心的消息,比如:廣告計費系統(tǒng),用戶點擊一次廣告,扣費一塊錢,如果扣費的時候消息丟了,則會不斷少錢,積少成多,對公司是一個很大的損失。 RabbitMQ可能存在的數(shù)據(jù)丟失問題
問題 1解決方案: 事務機制:(一般不采用,同步的,生產(chǎn)者發(fā)送消息會同步阻塞卡住等待你是成功還是失敗。會導致生產(chǎn)者發(fā)送消息的吞吐量降下來) channel.txSelectconfirm機制:(一般采用這種機制,異步的模式,不會阻塞,吞吐量會比較高)
問題 2 解決方案: 持久化到磁盤
缺點:可能會有一點點丟失數(shù)據(jù)的可能,消息剛好寫到了 rabbitmq中,但是還沒來得及持久化到磁盤上,結果不巧, rabbitmq掛了,會導致內(nèi)存里的一點點數(shù)據(jù)會丟失。更多面試題:面試題內(nèi)容聚合 問題 3 解決方案: 原因:消費者打開了 autoAck機制(消費到一條消息,還在處理中,還沒處理完,此時消費者自動 autoAck了,通知 rabbitmq說這條消息已經(jīng)消費了,此時不巧,消費者系統(tǒng)宕機了,那條消息丟失了,還沒處理完,而且 rabbitmq還以為這個消息已經(jīng)處理掉了) 解決方案:關閉 autoAck,自己處理完了一條消息后,再發(fā)送 ack給 rabbitmq,如果此時還沒處理完就宕機了,此時rabbitmq沒收到你發(fā)的ack消息,然后 rabbitmq 就會將這條消息重新分配給其他的消費者去處理。 Kafka 可能存在的數(shù)據(jù)丟失問題
消費端弄丟數(shù)據(jù)原因:消費者消費到那條消息后,自動提交了 offset,kafka以為你已經(jīng)消費好了這條消息,結果消費者掛了,這條消息就丟了。 例子:消費者消費到數(shù)據(jù)后寫到一個內(nèi)存 queue里緩存下,消息自動提交 offset,重啟了系統(tǒng),結果會導致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)丟失。 解決方法:kafka會自動提交 offset,那么只要關閉自動提交 offset,在處理完之后自己手動提交,可以保證數(shù)據(jù)不會丟。但是此時確實還是會重復消費,比如剛好處理完,還沒提交 offset,結果自己掛了,此時肯定會重復消費一次 ,做好冪等即可。 Kafka 丟掉消息原因:kafka 某個 broker 宕機,然后重新選舉 partition 的 leader時,此時其他的 follower 剛好還有一些數(shù)據(jù)沒有同步,結果此時 leader掛了,然后選舉某個 follower成 leader之后,就丟掉了之前l(fā)eader里未同步的數(shù)據(jù)。更多面試題:面試題內(nèi)容聚合 例子:kafka的leader機器宕機,將 follower 切換為 leader之后,發(fā)現(xiàn)數(shù)據(jù)丟了
按 2 的方案設置了 ack =all,一定不會丟。它會要求 leader 接收到消息,所有的 follower 都同步 到了消息之后,才認為本次寫成功。如果沒滿足這個條件,生產(chǎn)者會無限次重試 。 消息隊列順序性背景:mysql binlog 同步的系統(tǒng),在mysql里增刪改一條數(shù)據(jù),對應出來了增刪改 3 條binlog,接著這 3 條binlog發(fā)送到 MQ 里面,到消費出來依次執(zhí)行,起碼是要保證順序的吧,不然順序變成了 刪除、修改、增加。日同步數(shù)據(jù)達到上億,mysql->mysql,比如大數(shù)據(jù) team,需要同步一個mysql庫,來對公司的業(yè)務系統(tǒng)的數(shù)據(jù)做各種復雜的操作。 場景:
RabbitMQ 消息順序錯亂
RabbitMQ 如何保證消息順序性需要保證順序的數(shù)據(jù)放到同一個queue里
Kafka 消息順序錯亂
寫入一個 partition中的數(shù)據(jù)一定是有順序的。 生產(chǎn)者在寫的時候,可以指定一個 key,比如訂單id作為key,那么訂單相關的數(shù)據(jù),一定會被分發(fā)到一個 partition中區(qū),此時這個 partition中的數(shù)據(jù)一定是有順序的。Kafka 中一個 partition 只能被一個消費者消費。消費者從partition中取出數(shù)據(jù)的時候 ,一定是有順序的。 Kafka 保證消息順序性
如果消費者單線程消費+處理,如果處理比較耗時,處理一條消息是幾十ms,一秒鐘只能處理幾十條數(shù)據(jù),這個吞吐量太低了??隙ㄒ枚嗑€程去并發(fā)處理,壓測消費者4 核 8G 單機,32 條線程,最高每秒可以處理上千條消息 消息隊列延遲以及過期失效消費端出了問題,不消費了或者消費極其慢。接著坑爹了,你的消息隊列集群的磁盤都快寫滿了 ,都沒人消費,怎么辦?積壓了幾個小時,rabbitmq設置了消息過期時間后就沒了,怎么辦? 例如:
場景:幾千萬條數(shù)據(jù)再 MQ 里積壓了七八個小時 快速處理積壓的消息一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘是 18W 條,1000 多 W 條需要一個小時恢復。 步驟:
原來 3 個消費者需要 1 個小時可以搞定,現(xiàn)在 30 個臨時消費者需要 10 分鐘就可以搞定。 如果用的 rabbitmq,并且設置了過期時間,如果此消費在 queue里積壓超過一定的時間會被 rabbitmq清理掉,數(shù)據(jù)直接搞丟。 如果消息積壓mq,長時間沒被處理掉,導致mq快寫完滿了,你臨時寫一個程序,接入數(shù)據(jù)來消費,寫到一個臨時的mq里,再讓其他消費者慢慢消費 或者消費一個丟棄一個,都不要了,快速消費掉所有的消息,然后晚上補數(shù)據(jù)。 如何設計消息隊列中間件架構
|
|
|