从Code Review 谈如何做技术

这两天,在微博上表达了一下Code Review的重要性。因为翻看了阿里内部的Review Board上的记录,从上面发现Code Review做得好的是一些比较偏技术的团队,而偏业务的技术团队基本上没有看到Code Review的记录。当然,这并不能说没有记录他们就没有做Code Review,于是,我就问了一下以前在业务团队做过的同事有没有Code

阅读全文

小猪学arduino—蜂鸣器的使用

上一篇,我们学习了如何使用开关控制led灯,这一次,我们学习一下如何使用蜂鸣器。

蜂鸣器的使用方式和原理与led完全相同,高电平就发出声音,底电平静音,迅速的给高低电平就能发出滴滴类的声音。我们直接使用上一次的连线方法,把端口2位置的led换成蜂鸣器,把端口2的单次高电平改为20ms间隔的滴滴声。

由于连线图除2端口的led变为蜂鸣器外其它无变化就不上图了,代码调整如下:

int onoffPin = 9;//开关输入端口 int flickerPin = 8; //闪烁端口 int buzzerPin = 2; //蜂鸣器端口 int orderPin = 3; //顺序亮起起始端口 int orderLen = 5; //顺序亮起端口个数 int i=0; void setup() { //开关端口初始化 pinMode(onoffPin, INPUT); //闪烁端口初始化 pinMode(flickerPin, OUTPUT); //蜂鸣器端口初始化 pinMode(buzzerPin

阅读全文

小猪学arduino—使用开关控制led灯

IMG_3651

在上一篇arduino学习之—led灯控制中,我们做了用输出口高低电瓶控制led闪烁的尝试,并使用pc发送指令的方法控制led的熄灭。

本篇学习如何利用输入口接收开关装置的信号,替换pc发指令来控制led。具体思路:如果按下开关,顺序亮起所有led灯;否则,闪烁灯每2秒闪烁一次,其它灯熄灭。

具体连线和代码如下:

led.ino

int flickerPin = 8; //闪烁端口 int orderPin = 2; //顺序亮起起始端口 int orderLen = 6; //顺序亮起端口个数 int onoffPin = 9;//开关输入端口 int i=0; void setup() { //开关端口初始化 pinMode(onoffPin, INPUT); //闪烁端初始化 pinMode(flickerPin, OUTPUT); //闪烁端初始化 pinMode(flickerPin, OUTPUT); //顺序亮起端口初始化 for(i

阅读全文

正在考虑微服务架构的松耦合?小心这些陷阱!

微服务是一种新的架构,它使用简单、轻量、松耦合的服务来构建系统,这些服务彼此可以独立开发和发布。

如果你还不了解这些基础概念,请阅读Martin Fowler的文章(http://martinfowler.com/articles/microservices.html)。如果你想拿它和SOA进行比较,请看Don Ferguson的演讲(https://www.youtube.com/watch?v=W7tGlxJtofI)。Martin

阅读全文

小猪学arduino—led灯控制

IMG_3359

这里使用arduino UNO r3板子+7个电阻+7个led来学习如何实现定时闪烁和顺序亮起。

通过led控制可以了解arduino板子的基本控制和执行原理:GND作为负极来使用,2-13做为可控制的正极来使用。给对应端口高电平即会使通路通电,低电平可以理解为断电。增加电阻是为了降低电流避免烧坏led。

儿子的需求:

默认10端口灯一直保持闪烁,2-7端口灯顺序亮起后保持常亮;发送指令后,所有灯全灭;2秒后,10端口闪烁一次后,2-7端口顺序恢复常亮。

具体连线和代码如下:

led.ino

int flickerPin = 10; //闪烁端口 int orderPin = 2; //顺序亮起起始端口 int orderLen = 6; //顺序亮起端口个数 int i=0; void setup() { Serial.begin(9600); //设置波特率9600,用于接收来自pc的指令 //闪烁端初始化 pinMode(flickerPin, OUTPUT); //顺序亮起端口初始化 for(i= orderPin;i<orderPin+orderLen;i++){ pinMode(i

阅读全文

分布式队列编程:模型、实战

介绍

作为一种基础的抽象数据结构,队列被广泛应用在各类编程中。大数据时代对跨进程、跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在。但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点:

  • 使用分布式队列的时候,没有意识到它是队列。
  • 有具体需求的时候,忘记了分布式队列的存在。

阅读全文

10条命令分析Linux性能问题

当你登录到一台存在性能问题的Linux服务器上时,在头一分钟,你会检查什么?

我们看看Netflix的性能工程师是怎么做的。

Netflix大量使用EC2 Linux服务器,很多时候是用一些较为高层的工具做云或实例层次的分析。不过有时仍然需要登录到某个实例上,运行一些标准的Linux性能工具。

在最开始的一分钟内,可以先利用手头的标准Linux工具大致了解性能状况。借助如下10条命令(有些命令需要安装sysstat包),了解系统资源使用状况和正在运行的进程。先检查错误(errors)和饱和度(saturation),再检查资源利用率(resource

阅读全文

基于词槽的简单query匹配方法

我们在做类似搜索相关的特定服务时,通常都会遇到分词解析query,取出其中特定关键字进行检索的问题,这里提供一个简单的基于词槽的query匹配方法。

首先给出一个query示例:北京飞三亚机票多少钱?

我们需要达到的目标:

1.判定这个query的分类

2.解析出机票分类query中的起点和终点

阅读全文

不懂点CAP理论,你好意思说你是做分布式的吗?

CAP是什么?

CAP理论,被戏称为[帽子理论]。CAP理论由Eric Brewer在ACM研讨会上提出,而后CAP被奉为分布式领域的重要理论【link1】 。

分布式系统的CAP理论

首先把分布式系统中的三个特性进行了如下归纳:

  • 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
  • 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
  • 分区容忍性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

高可用、数据一致性是很多系统设计的目标,但是分区又是不可避免的事情。我们来看一看分别拥有CA、CP和AP的情况。

阅读全文

技术团队如何发现和培养Tech lead?

在影响团队长远战斗力的诸多因素中,比较有意思同时也非常关键的一个因素是对tech lead 的选择和培养,这也是我们今天的话题。

千军易得,一将难求

对技术团队来说,怎么强调tech lead的重要性都不算过分。好的tech lead能极大程度提升团队的凝聚力和执行力;严重不合格的tech lead会导致团队迅速崩溃,但好在暴露快、时间短、危害小。接近合格水平的tech

阅读全文

消息队列设计精要

mq-design

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。
本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。
本文首先会阐述什么时候你需要一个消息队列,然后以Push模型为主,从零开始分析设计一个消息队列时需要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。
也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题,如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。

何时需要消息队列

当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。
比如在美团旅游,我们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候,如果不使用消息系统,势必要调用我们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,作为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。
而我们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来说,这也不是我们的职责所在。说白了,如果他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来说太过于“重量级”了,只需要发布一个产品ID变更的通知,由下游系统来处理,可能更为合理。
再举一个例子,对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。
然而,这个过程中存在很多可能的意外:

1.A扣钱成功,调用B加钱接口失败。

2.A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。

3.A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

1.强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。

2.最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。
像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。
支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

如何设计一个消息队列

综述

我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。

基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。
我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

1.消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。

2.规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。

实现队列基本功能

RPC通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定义的框架也好。因为消息队列的RPC,和普通的RPC没有本质区别。当然了,自主利用Memchached或者Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了类似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,都可以使用现成的RPC框架。
简单来讲,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你可能会问,如果producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到所有consumer的存在。其次,producer尽量选择就近的机房就好了。

高可用

其实所有的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心,放心大胆的交给DBA们吧!
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步,关于这块HA的细节,可以参考下篇pull模型消息系统设计。

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。
只是这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。

存储子系统的选择

我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,可以参考下篇的存储子系统设计。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

消费关系解析

现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。
市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。
消息需要通知到多个业务集群,而一个业务集群内有很多台机器,只要一台机器消费这个消息就可以了。
当然这不是绝对的,很多时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种情况太过于复杂,一般不列入考虑范围。所以,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。
至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:

1.发送关系的维护。

2.发送关系变更时的通知。

队列高级特性设计

上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。

可靠投递(最终一致性)

这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。
方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。
具体来说:

1.producer往broker发送消息之前,需要做一次落地。

2.请求到server后,server确保数据落地后再告诉客户端发送成功。

3.支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。
重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。
Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。
最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。

消费确认

当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不一定。或许因为消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。
对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制,
消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。
但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。

重复消息和顺序消息

上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:

1.允许消息丢失。

2.从发送方到服务方到接受者都是单点单线程。

所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。
一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。
谈到重复消息,主要是两个话题:

1.如何鉴别消息重复,并幂等的处理重复消息。

2.一个消息队列如何尽量减少重复消息的投递。

先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就
能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

1.版本号。

2.状态机。

版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。
但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。
如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。
如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

1.对发送方必须要求消息带业务版本号。

2.下游必须存储消息的版本号,对于要严格保证顺序的。

还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储
就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。
就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。
举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。
此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

中间件对于重复消息的处理

回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。
我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。
减少重复消息的关键步骤:

1.broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。

2.对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。
解决方案从大方向上有两种:

1.两阶段提交,分布式事务。

2.本地事务,本地落地,补偿发送。

分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。
并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。
那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。
然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。
这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。
而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。
本地事务存在两个最大的使用障碍:

1.配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。

2.对于消息延迟高敏感的业务不适用。

话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;同步是需要当时关心
的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。
回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。
服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。
整个过程可以参考下面的代码:

客户端同步服务端异步。

Future<Result> future = request(server);//server立刻返回future
synchronized(future){
while(!future.isDone()){
   future.wait();//server处理结束后会notify这个future,并修改isdone标志
}
}
return future.get();

客户端同步服务端同步。

Result result = request(server);

客户端异步服务端同步(这里用线程池的方式)。

Future<Result> future = executor.submit(new Callable(){public void call<Result>(){ result = request(server); }}) return

阅读全文

linux根据字符串长度排序

sort 命令可以按照字母或者数字顺序排列字符串,不过如果我们想根据字符串的长度来排序呢?

配合一点点 awk 魔法,就可以达成愿望。建立一个 lsort 文件,内容如下:

#! /bin/sh awk ‘BEGIN { FS=RS } { print length, $0}’ $* | sort +0n -1 | sed ‘s/^[0-9][0-9]* //’

首先,awk 把标准输入中每行的前面,都加上这行的长度,然后传给 sort 对长度数字进行排序,最后用 sed 把长度数字删掉。

保存之后,加上可执行标记,就可以拷贝到

阅读全文

Linux比较文本文件的交集、差集与求差

介绍两个常用命令:

1. comm命令

comm命令可以用于两个文件之间的比较,它有一些选项可以用来调整输出,以便执行交集、求差、以及差集操作。有三列内容:分别表示A-B,B-A 和 A交B。

相关集合论解释: 交集:打印出两个文件所共有的行。 求差:打印出指定文件所包含的且不相同的行。<br

阅读全文

Linux内存中的Cache真的能被回收么?

在Linux系统中,我们经常用free命令来查看系统内存的使用状态。在一个RHEL6的系统上,free命令的显示内容大概是这样一个状态:

[root@tencent64 ~]# free
             total       used       free     shared    buffers     cached
Mem:     132256952   72571772   59685180          0    1762632   53034704
-/+ buffers/cache:   17774436  114482516
Swap:      2101192        508    2100684

这里的默认显示单位是kb,我的服务器是128G内存,所以数字显得比较大。这个命令几乎是每一个使用过Linux的人必会的命令,但越是这样的命令,似乎真正明白的人越少(我是说比例越少)。一般情况下,对此命令输出的理解可以分这几个层次:

  1. 不了解。这样的人的第一反应是:天啊,内存用了好多,70个多G,可是我几乎没有运行什么大程序啊?为什么会这样?Linux好占内存!
  2. 自以为很了解。这样的人一般评估过会说:嗯,根据我专业的眼光看的出来,内存才用了17G左右,还有很多剩余内存可用。buffers/cache占用的较多,说明系统中有进程曾经读写过文件,但是不要紧,这部分内存是当空闲来用的。
  3. 真的很了解。这种人的反应反而让人感觉最不懂Linux,他们的反应是:free显示的是这样,好吧我知道了。神马?你问我这些内存够不够,我当然不知道啦!我特么怎么知道你程序怎么写的?

根据目前网络上技术文档的内容,我相信绝大多数了解一点Linux的人应该处在第二种层次。大家普遍认为,buffers和cached所占用的内存空间是可以在内存压力较大的时候被释放当做空闲空间用的。但真的是这样么?在论证这个题目之前,我们先简要介绍一下buffers和cached是什么意思:

什么是buffer/cache?

buffer和cache是两个在计算机技术中被用滥的名词,放在不通语境下会有不同的意义。在Linux的内存管理中,这里的buffer指Linux内存的:Buffer cache。这里的cache指Linux内存中的:Page cache。翻译成中文可以叫做缓冲区缓存和页面缓存。在历史上,它们一个(buffer)被用来当成对io设备写的缓存,而另一个(cache)被用来当作对io设备的读缓存,这里的io设备,主要指的是块设备文件和文件系统上的普通文件。但是现在,它们的意义已经不一样了。在当前的内核中,page cache顾名思义就是针对内存页的缓存,说白了就是,如果有内存是以page进行分配管理的,都可以使用page cache作为其缓存来管理使用。当然,不是所有的内存都是以页(page)进行管理的,也有很多是针对块(block)进行管理的,这部分内存使用如果要用到cache功能,则都集中到buffer cache中来使用。(从这个角度出发,是不是buffer cache改名叫做block cache更好?)然而,也不是所有块(block)都有固定长度,系统上块的长度主要是根据所使用的块设备决定的,而页长度在X86上无论是32位还是64位都是4k。

明白了这两套缓存系统的区别,就可以理解它们究竟都可以用来做什么了。

什么是page cache

Page cache主要用来作为文件系统上的文件数据的缓存来用,尤其是针对当进程对文件有read/write操作的时候。如果你仔细想想的话,作为可以映射文件到内存的系统调用:mmap是不是很自然的也应该用到page cache?在当前的系统实现里,page cache也被作为其它文件类型的缓存设备来用,所以事实上page cache也负责了大部分的块设备文件的缓存工作。

什么是buffer cache

Buffer cache则主要是设计用来在系统对块设备进行读写的时候,对块进行数据缓存的系统来使用。这意味着某些对块的操作会使用buffer cache进行缓存,比如我们在格式化文件系统的时候。一般情况下两个缓存系统是一起配合使用的,比如当我们对一个文件进行写操作的时候,page cache的内容会被改变,而buffer cache则可以用来将page标记为不同的缓冲区,并记录是哪一个缓冲区被修改了。这样,内核在后续执行脏数据的回写(writeback)时,就不用将整个page写回,而只需要写回修改的部分即可。

如何回收cache?

Linux内核会在内存将要耗尽的时候,触发内存回收的工作,以便释放出内存给急需内存的进程使用。一般情况下,这个操作中主要的内存释放都来自于对buffer/cache的释放。尤其是被使用更多的cache空间。既然它主要用来做缓存,只是在内存够用的时候加快进程对文件的读写速度,那么在内存压力较大的情况下,当然有必要清空释放cache,作为free空间分给相关进程使用。所以一般情况下,我们认为buffer/cache空间可以被释放,这个理解是正确的。

但是这种清缓存的工作也并不是没有成本。理解cache是干什么的就可以明白清缓存必须保证cache中的数据跟对应文件中的数据一致,才能对cache进行释放。所以伴随着cache清除的行为的,一般都是系统IO飙高。因为内核要对比cache中的数据和对应硬盘文件上的数据是否一致,如果不一致需要写回,之后才能回收。

在系统中除了内存将被耗尽的时候可以清缓存以外,我们还可以使用下面这个文件来人工触发缓存清除的操作:

[root@tencent64 ~]# cat /proc/sys/vm/drop_caches 
1

方法是:

echo 1 > /proc/sys/vm/drop_caches

当然,这个文件可以设置的值分别为1、2、3。它们所表示的含义为:
echo 1 > /proc/sys/vm/drop_caches:表示清除pagecache。

echo 2 > /proc/sys/vm/drop_caches:表示清除回收slab分配器中的对象(包括目录项缓存和inode缓存)。slab分配器是内核中管理内存的一种机制,其中很多缓存数据实现都是用的pagecache。

echo 3 > /proc/sys/vm/drop_caches:表示清除pagecache和slab分配器中的缓存对象。

cache都能被回收么?

我们分析了cache能被回收的情况,那么有没有不能被回收的cache呢?当然有。我们先来看第一种情况:

tmpfs

大家知道Linux提供一种“临时”文件系统叫做tmpfs,它可以将内存的一部分空间拿来当做文件系统使用,使内存空间可以当做目录文件来用。现在绝大多数Linux系统都有一个叫做/dev/shm的tmpfs目录,就是这样一种存在。当然,我们也可以手工创建一个自己的tmpfs,方法如下:

[root@tencent64 ~]# mkdir /tmp/tmpfs [root@tencent64

阅读全文