电竞比分网-中国电竞赛事及体育赛事平台

分享

你必須要知道的kafka

 xujin3 2018-08-21

1. 概述


Apache Kafka最早是由LinkedIn開源出來的分布式消息系統(tǒng),現(xiàn)在是Apache旗下的一個子項目,并且已經(jīng)成為開源領(lǐng)域應(yīng)用最廣泛的消息系統(tǒng)之一。Kafka社區(qū)非?;钴S,從0.9版本開始,Kafka的標(biāo)語已經(jīng)從“一個高吞吐量,分布式的消息系統(tǒng)”改為'一個分布式流平臺'。


Kafka和傳統(tǒng)的消息系統(tǒng)不同在于:


  • kafka是一個分布式系統(tǒng),易于向外擴(kuò)展。

  • 它同時為發(fā)布和訂閱提供高吞吐量

  • 它支持多訂閱者,當(dāng)失敗時能自動平衡消費者

  • 消息的持久化


kafka和其他消息隊列的對比:



kafkaactivemq



背景Kafka 是LinkedIn 開發(fā)的一個高性能、分布式的消息系統(tǒng),廣泛用于日志收集、流式數(shù)據(jù)處理、在線和離線消息分發(fā)等場景ActiveMQActiveMQ是一種開源的,實現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用程序提供高效的、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級消息通信。
開發(fā)語言java,scalaJava
協(xié)議支持自己制定的一套協(xié)議  JMS協(xié)議
持久化支持支持
支持
事務(wù)支持0.11.0之后支持支持
producer容錯在kafka中提供了ack配置選項,
request.required.acks=-1,級別最低,生產(chǎn)者不需要關(guān)心是否發(fā)送成功
request.required.acks=0,只需要leader分區(qū)有了即可
request.required.acks=1,isr集合中的所有同步了才返回
可能會有重復(fù)數(shù)據(jù)
發(fā)送失敗后即可重試 有ack模型ack模型可能重復(fù)消息事務(wù)模型保證完全一致
吞吐量kafka具有高的吞吐量,內(nèi)部采用消息的批量處理,zero-copy機(jī)制,數(shù)據(jù)的存儲和獲取是本地磁盤順序批量操作,具有O(1)的復(fù)雜度,消息處理的效率很高
負(fù)載均衡kafka采用zookeeper對集群中的broker、consumer進(jìn)行管理,可以注冊topic到zookeeper上;通過zookeeper的協(xié)調(diào)機(jī)制,producer保存對應(yīng)topic的broker信息,可以隨機(jī)或者輪詢發(fā)送到broker上;并且producer可以基于語義指定分片,消息發(fā)送到broker的某分片上


2. 入門實例


2.1 生產(chǎn)者


producer


  1. import java.util.Properties;


  2. import org.apache.kafka.clients.producer.KafkaProducer;

  3. import org.apache.kafka.clients.producer.ProducerRecord;


  4. public class UserKafkaProducer extends Thread

  5. {

  6.    private final KafkaProducerInteger, String> producer;

  7.    private final String topic;

  8.    private final Properties props = new Properties();

  9.    public UserKafkaProducer(String topic)

  10.    {

  11.        props.put('metadata.broker.list', 'localhost:9092');

  12.        props.put('bootstrap.servers', 'master2:6667');

  13.        props.put('retries', 0);

  14.        props.put('batch.size', 16384);

  15.        props.put('linger.ms', 1);

  16.        props.put('buffer.memory', 33554432);

  17.        props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');

  18.        props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');

  19.        producer = new KafkaProducerInteger, String>(props);

  20.        this.topic = topic;

  21.    }

  22.    @Override

  23.    public void run() {

  24.        int messageNo = 1;

  25.        while (true)

  26.        {

  27.            String messageStr = new String('Message_' + messageNo);

  28.            System.out.println('Send:' + messageStr);

  29.            //返回的是Future,異步發(fā)送

  30.            producer.send(new ProducerRecordInteger, String>(topic, messageStr));

  31.            messageNo++;

  32.            try {

  33.                sleep(3000);

  34.            } catch (InterruptedException e) {

  35.                e.printStackTrace();

  36.            }

  37.        }

  38.    }

  39. }


2.2 消費者


  1. Properties props = new Properties();

  2. /* 定義kakfa 服務(wù)的地址,不需要將所有broker指定上 */

  3. props.put('bootstrap.servers', 'localhost:9092');

  4. /* 制定consumer group */

  5. props.put('group.id', 'test');

  6. /* 是否自動確認(rèn)offset */

  7. props.put('enable.auto.commit', 'true');

  8. /* 自動確認(rèn)offset的時間間隔 */

  9. props.put('auto.commit.interval.ms', '1000');

  10. props.put('session.timeout.ms', '30000');

  11. /* key的序列化類 */

  12. props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');

  13. /* value的序列化類 */

  14. props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');

  15. /* 定義consumer */

  16. KafkaConsumerString, String> consumer = new KafkaConsumer<>(props);

  17. /* 消費者訂閱的topic, 可同時訂閱多個 */

  18. consumer.subscribe(Arrays.asList('foo', 'bar'));


  19. /* 讀取數(shù)據(jù),讀取超時時間為100ms */

  20. while (true) {

  21.    ConsumerRecordsString, String> records = consumer.poll(100);

  22.    for (ConsumerRecordString, String> record : records)

  23.        System.out.printf('offset = %d, key = %s, value = %s', record.offset(), record.key(), record.value());

  24. }


3. Kafka架構(gòu)原理


對于kafka的架構(gòu)原理我們先提出幾個問題?


1) Kafka的topic和分區(qū)內(nèi)部是如何存儲的,有什么特點?


2) 與傳統(tǒng)的消息系統(tǒng)相比,Kafka的消費模型有什么優(yōu)點?


3) Kafka如何實現(xiàn)分布式的數(shù)據(jù)存儲與數(shù)據(jù)讀取?


3.1 Kafka架構(gòu)圖



3.2 kafka名詞解釋


在一套kafka架構(gòu)中有多個Producer,多個Broker,多個Consumer,每個Producer可以對應(yīng)多個Topic,每個Consumer只能對應(yīng)一個ConsumerGroup。


整個Kafka架構(gòu)對應(yīng)一個ZK集群,通過ZK管理集群配置,選舉Leader,以及在consumer group發(fā)生變化時進(jìn)行rebalance。


名稱
解釋
Broker消息中間件處理節(jié)點,一個Kafka節(jié)點就是一個broker,一個或者多個Broker可以組成一個Kafka集群消息中間件處理節(jié)點,一個Kafka節(jié)點就是一個broker,一個或者多個Broker可以組成一個Kafka集群
Topic主題,Kafka根據(jù)topic對消息進(jìn)行歸類,發(fā)布到Kafka集群的每條消息都需要指定一個topic
Producer消息生產(chǎn)者,向Broker發(fā)送消息的客戶端
Consumer消息消費者,從Broker讀取消息的客戶端
ConsumerGroup
每個Consumer屬于一個特定的Consumer Group,一條消息可以發(fā)送到多個不同的Consumer Group,但是一個Consumer Group中只能有一個Consumer能夠消費該消息
Partition物理上的概念,一個topic可以分為多個partition,每個partition內(nèi)部是有序的


3.3 Topic 和 Partition


在Kafka中的每一條消息都有一個topic。一般來說在我們應(yīng)用中產(chǎn)生不同類型的數(shù)據(jù),都可以設(shè)置不同的主題。一個主題一般會有多個消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產(chǎn)者寫入的新消息。


kafka為每個主題維護(hù)了分布式的分區(qū)(partition)日志文件,每個partition在kafka存儲層面是append log。任何發(fā)布到此partition的消息都會被追加到log文件的尾部,在分區(qū)中的每條消息都會按照時間順序分配到一個單調(diào)遞增的順序編號,也就是我們的offset,offset是一個long型的數(shù)字,我們通過這個offset可以確定一條在該partition下的唯一消息。在partition下面是保證了有序性,但是在topic下面沒有保證有序性。



在上圖中在我們的生產(chǎn)者會決定發(fā)送到哪個Partition。


1) 如果沒有Key值則進(jìn)行輪詢發(fā)送。


2) 如果有Key值,對Key值進(jìn)行Hash,然后對分區(qū)數(shù)量取余,保證了同一個Key值的會被路由到同一個分區(qū),如果想隊列的強(qiáng)順序一致性,可以讓所有的消息都設(shè)置為同一個Key。


3.4 消費模型


消息由生產(chǎn)者發(fā)送到kafka集群后,會被消費者消費。一般來說我們的消費模型有兩種:推送模型(psuh)和拉取模型(pull)


基于推送模型的消息系統(tǒng),由消息代理記錄消費狀態(tài)。消息代理將消息推送到消費者后,標(biāo)記這條消息為已經(jīng)被消費,但是這種方式無法很好地保證消費的處理語義。比如當(dāng)我們把已經(jīng)把消息發(fā)送給消費者之后,由于消費進(jìn)程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息,如果我們在消費代理將其標(biāo)記為已消費,這個消息就永久丟失了。如果我們利用生產(chǎn)者收到消息后回復(fù)這種方法,消息代理需要記錄消費狀態(tài),這種不可取。如果采用push,消息消費的速率就完全由消費代理控制,一旦消費者發(fā)生阻塞,就會出現(xiàn)問題。


Kafka采取拉取模型(poll),由自己控制消費速度,以及消費的進(jìn)度,消費者可以按照任意的偏移量進(jìn)行消費。比如消費者可以消費已經(jīng)消費過的消息進(jìn)行重新處理,或者消費最近的消息等等。


3.5 網(wǎng)絡(luò)模型


3.5.1 KafkaClient --單線程Selector



單線程模式適用于并發(fā)鏈接數(shù)小,邏輯簡單,數(shù)據(jù)量小。


在kafka中,consumer和producer都是使用的上面的單線程模式。這種模式不適合kafka的服務(wù)端,在服務(wù)端中請求處理過程比較復(fù)雜,會造成線程阻塞,一旦出現(xiàn)后續(xù)請求就會無法處理,會造成大量請求超時,引起雪崩。而在服務(wù)器中應(yīng)該充分利用多線程來處理執(zhí)行邏輯。


3.5.2 Kafka--server -- 多線程Selector



在kafka服務(wù)端采用的是多線程的Selector模型,Acceptor運(yùn)行在一個單獨的線程中,對于讀取操作的線程池中的線程都會在selector注冊read事件,負(fù)責(zé)服務(wù)端讀取請求的邏輯。成功讀取后,將請求放入message queue共享隊列中。然后在寫線程池中,取出這個請求,對其進(jìn)行邏輯處理,即使某個請求線程阻塞了,還有后續(xù)的縣城從消息隊列中獲取請求并進(jìn)行處理,在寫線程中處理完邏輯處理,由于注冊了OP_WIRTE事件,所以還需要對其發(fā)送響應(yīng)。


3.6 高可靠分布式存儲模型


在Kafka中保證高可靠模型的依靠的是副本機(jī)制,有了副本機(jī)制之后,就算機(jī)器宕機(jī)也不會發(fā)生數(shù)據(jù)丟失。


3.6.1 高性能的日志存儲


kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節(jié)點上。同時在kafka的機(jī)器上,每個Partition其實都會對應(yīng)一個日志目錄,在目錄下面會對應(yīng)多個日志分段(LogSegment)。LogSegment文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為segment索引文件和數(shù)據(jù)文件。這兩個文件的命令規(guī)則為:partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值,數(shù)值大小為64位,20位數(shù)字字符長度,沒有數(shù)字用0填充,如下,假設(shè)有1000條消息,每個LogSegment大小為100,下面展現(xiàn)了900-1000的索引和Log:



由于kafka消息數(shù)據(jù)太大,如果全部建立索引,即占了空間又增加了耗時,所以kafka選擇了稀疏索引的方式,這樣的話索引可以直接進(jìn)入內(nèi)存,加快偏查詢速度。


簡單介紹一下如何讀取數(shù)據(jù),如果我們要讀取第911條數(shù)據(jù)首先第一步,找到他是屬于哪一段的,根據(jù)二分法查找到他屬于的文件,找到0000900.index和00000900.log之后,然后去index中去查找 (911-900) =11這個索引或者小于11最近的索引,在這里通過二分法我們找到了索引是[10,1367]然后我們通過這條索引的物理位置1367,開始往后找,直到找到911條數(shù)據(jù)。


上面講的是如果要找某個offset的流程,但是我們大多數(shù)時候并不需要查找某個offset,只需要按照順序讀即可,而在順序讀中,操作系統(tǒng)會對內(nèi)存和磁盤之間添加page cahe,也就是我們平常見到的預(yù)讀操作,所以我們的順序讀操作時速度很快。但是kafka有個問題,如果分區(qū)過多,那么日志分段也會很多,寫的時候由于是批量寫,其實就會變成隨機(jī)寫了,隨機(jī)I/O這個時候?qū)π阅苡绊懞艽?。所以一般來說Kafka不能有太多的partition。針對這一點,RocketMQ把所有的日志都寫在一個文件里面,就能變成順序?qū)?,通過一定優(yōu)化,讀也能接近于順序讀。


可以思考一下:1.為什么需要分區(qū),也就是說主題只有一個分區(qū),難道不行嗎?2.日志為什么需要分段


3.6.2 副本機(jī)制


Kafka的副本機(jī)制是多個服務(wù)端節(jié)點對其他節(jié)點的主題分區(qū)的日志進(jìn)行復(fù)制。當(dāng)集群中的某個節(jié)點出現(xiàn)故障,訪問故障節(jié)點的請求會被轉(zhuǎn)移到其他正常節(jié)點(這一過程通常叫Reblance),kafka每個主題的每個分區(qū)都有一個主副本以及0個或者多個副本,副本保持和主副本的數(shù)據(jù)同步,當(dāng)主副本出故障時就會被替代。



在Kafka中并不是所有的副本都能被拿來替代主副本,所以在kafka的leader節(jié)點中維護(hù)著一個ISR(In sync Replicas)集合,翻譯過來也叫正在同步中集合,在這個集合中的需要滿足兩個條件:


  • 節(jié)點必須和ZK保持連接

  • 在同步的過程中這個副本不能落后主副本太多


另外還有個AR(Assigned Replicas)用來標(biāo)識副本的全集,OSR用來表示由于落后被剔除的副本集合,所以公式如下:ISR = leader + 沒有落后太多的副本; AR = OSR+ ISR;


這里先要說下兩個名詞:HW(高水位)是consumer能夠看到的此partition的位置,LEO是每個partition的log最后一條Message的位置。HW能保證leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取,不會造成消息丟失。


當(dāng)producer向leader發(fā)送數(shù)據(jù)時,可以通過request.required.acks參數(shù)來設(shè)置數(shù)據(jù)可靠性的級別:


  • 1(默認(rèn)):這意味著producer在ISR中的leader已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條message。如果leader宕機(jī)了,則會丟失數(shù)據(jù)。


  • 0:這意味著producer無需等待來自broker的確認(rèn)而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的。


  • -1:producer需要等待ISR中的所有follower都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性最高。但是這樣也不能保證數(shù)據(jù)不丟失,比如當(dāng)ISR中只有l(wèi)eader時(其他節(jié)點都和zk斷開連接,或者都沒追上),這樣就變成了acks=1的情況。


4. 高可用模型及冪等


 在分布式系統(tǒng)中一般有三種處理語義:


  • at-least-once:


至少一次,有可能會有多次。如果producer收到來自ack的確認(rèn),則表示該消息已經(jīng)寫入到Kafka了,此時剛好是一次,也就是我們后面的exactly-once。但是如果producer超時或收到錯誤,并且request.required.acks配置的不是-1,則會重試發(fā)送消息,客戶端會認(rèn)為該消息未寫入Kafka。如果broker在發(fā)送Ack之前失敗,但在消息成功寫入Kafka之后,這一次重試將會導(dǎo)致我們的消息會被寫入兩次,所以消息就不止一次地傳遞給最終consumer,如果consumer處理邏輯沒有保證冪等的話就會得到不正確的結(jié)果。


在這種語義中會出現(xiàn)亂序,也就是當(dāng)?shù)谝淮蝍ck失敗準(zhǔn)備重試的時候,但是第二消息已經(jīng)發(fā)送過去了,這個時候會出現(xiàn)單分區(qū)中亂序的現(xiàn)象,我們需要設(shè)置Prouducer的參數(shù)max.in.flight.requests.per.connection,flight.requests是Producer端用來保存發(fā)送請求且沒有響應(yīng)的隊列,保證Producer端未響應(yīng)的請求個數(shù)為1。


  • at-most-once:


如果在ack超時或返回錯誤時producer不重試,也就是我們講request.required.acks=-1,則該消息可能最終沒有寫入kafka,所以consumer不會接收消息。


  • exactly-once:


剛好一次,即使producer重試發(fā)送消息,消息也會保證最多一次地傳遞給consumer。該語義是最理想的,也是最難實現(xiàn)的。在0.10之前并不能保證exactly-once,需要使用consumer自帶的冪等性保證。0.11.0使用事務(wù)保證了


4.1 如何實現(xiàn)exactly-once


要實現(xiàn)exactly-once在Kafka 0.11.0中有兩個官方策略:


4.1.1 單Producer單Topic


每個producer在初始化的時候都會被分配一個唯一的PID,對于每個唯一的PID,Producer向指定的Topic中某個特定的Partition發(fā)送的消息都會攜帶一個從0單調(diào)遞增的sequence number。


在我們的Broker端也會維護(hù)一個維度為,每次提交一次消息的時候都會對齊進(jìn)行校驗:


  • 如果消息序號比Broker維護(hù)的序號大一以上,說明中間有數(shù)據(jù)尚未寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber


  • 如果消息序號小于等于Broker維護(hù)的序號,說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber


  • 如果消息序號剛好大一,就證明是合法的


上面所說的解決了兩個問題:


1) 當(dāng)Prouducer發(fā)送了一條消息之后失敗,broker并沒有保存,但是第二條消息卻發(fā)送成功,造成了數(shù)據(jù)的亂序。


2) 當(dāng)Producer發(fā)送了一條消息之后,broker保存成功,ack回傳失敗,producer再次投遞重復(fù)的消息。


上面所說的都是在同一個PID下面,意味著必須保證在單個Producer中的同一個seesion內(nèi),如果Producer掛了,被分配了新的PID,這樣就無法保證了,所以Kafka中又有事務(wù)機(jī)制去保證。


4.1.2 事務(wù)


在kafka中事務(wù)的作用是:


  • 實現(xiàn)exactly-once語義

  • 保證操作的原子性,要么全部成功,要么全部失敗。

  • 有狀態(tài)的操作的恢復(fù)


事務(wù)可以保證就算跨多個,在本次事務(wù)中的對消費隊列的操作都當(dāng)成原子性,要么全部成功,要么全部失敗。并且,有狀態(tài)的應(yīng)用也可以保證重啟后從斷點處繼續(xù)處理,也即事務(wù)恢復(fù)。在kafka的事務(wù)中,應(yīng)用程序必須提供一個唯一的事務(wù)ID,即Transaction ID,并且宕機(jī)重啟之后,也不會發(fā)生改變,Transactin ID與PID可能一一對應(yīng)。區(qū)別在于Transaction ID由用戶提供,而PID是內(nèi)部的實現(xiàn)對用戶透明。為了Producer重啟之后,舊的Producer具有相同的Transaction ID失效,每次Producer通過Transaction ID拿到PID的同時,還會獲取一個單調(diào)遞增的epoch。由于舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識別出該P(yáng)roducer是老的Producer并拒絕其請求。為了實現(xiàn)這一點,Kafka 0.11.0.0引入了一個服務(wù)器端的模塊,名為Transaction Coordinator,用于管理Producer發(fā)送的消息的事務(wù)性。該Transaction Coordinator維護(hù)Transaction Log,該log存于一個內(nèi)部的Topic內(nèi)。由于Topic數(shù)據(jù)具有持久性,因此事務(wù)的狀態(tài)也具有持久性。Producer并不直接讀寫Transaction Log,它與Transaction Coordinator通信,然后由Transaction Coordinator將該事務(wù)的狀態(tài)插入相應(yīng)的Transaction Log。Transaction Log的設(shè)計與Offset Log用于保存Consumer的Offset類似。


最后


關(guān)于消息隊列或者Kafka的一些常見的面試題,通過上面的文章可以提煉出以下幾個比較經(jīng)典的問題,大部分問題都可以從上面總結(jié)后找到答案:


1. 為什么使用消息隊列?消息隊列的作用是什么?

2. Kafka的topic和分區(qū)內(nèi)部是如何存儲的,有什么特點?

3. 與傳統(tǒng)的消息系統(tǒng)相比,Kafka的消費模型有什么優(yōu)點?

4. Kafka如何實現(xiàn)分布式的數(shù)據(jù)存儲與數(shù)據(jù)讀取?

5. kafka為什么比rocketmq支持的單機(jī)partion要少?

6. 為什么需要分區(qū),也就是說主題只有一個分區(qū),難道不行嗎?

7. 日志為什么需要分段?

8. kafka是依靠什么機(jī)制保持高可靠,高可用?

9. 消息隊列如何保證消息冪等?

10. 讓你自己設(shè)計個消息隊列,你會怎么設(shè)計,會考慮哪些方面?


出處:https://mp.weixin.qq.com/s/vhwUCdimvpBt5Z38pRX5xw



架構(gòu)文摘

ID:ArchDigest

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多