博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊hystrix的BucketedCounterStream
阅读量:6162 次
发布时间:2019-06-21

本文共 7791 字,大约阅读时间需要 25 分钟。

  hot3.png

本文主要研究一下hystrix的BucketedCounterStream

BucketedCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCounterStream.java

/** * Abstract class that imposes a bucketing structure and provides streams of buckets * * @param 
type of raw data that needs to get summarized into a bucket * @param
type of data contained in each bucket * @param
type of data emitted to stream subscribers (often is the same as A but does not have to be) */public abstract class BucketedCounterStream
{ protected final int numBuckets; protected final Observable
bucketedStream; protected final AtomicReference
subscription = new AtomicReference
(null); private final Func1
, Observable
> reduceBucketToSummary; private final BehaviorSubject
counterSubject = BehaviorSubject.create(getEmptyOutputValue()); protected BucketedCounterStream(final HystrixEventStream
inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2
appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1
, Observable
>() { @Override public Observable
call(Observable
eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List
emptyEventCountsToStart = new ArrayList
(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0
>() { @Override public Observable
call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } }); } abstract Bucket getEmptyBucketSummary(); abstract Output getEmptyOutputValue(); /** * Return the stream of buckets * @return stream of buckets */ public abstract Observable
observe(); public void startCachingStreamValuesIfUnstarted() { if (subscription.get() == null) { //the stream is not yet started Subscription candidateSubscription = observe().subscribe(counterSubject); if (subscription.compareAndSet(null, candidateSubscription)) { //won the race to set the subscription } else { //lost the race to set the subscription, so we need to cancel this one candidateSubscription.unsubscribe(); } } } /** * Synchronous call to retrieve the last calculated bucket without waiting for any emissions * @return last calculated bucket */ public Output getLatest() { startCachingStreamValuesIfUnstarted(); if (counterSubject.hasValue()) { return counterSubject.getValue(); } else { return getEmptyOutputValue(); } } public void unsubscribe() { Subscription s = subscription.get(); if (s != null) { s.unsubscribe(); subscription.compareAndSet(s, null); } }}
  • 这里的构造器主要初始化bucketedStream,主要是对HystrixEventStream进行observe,然后进行window操作,在进行flatMap
  • window操作的timespan参数为bucketSizeInMs,其计算公式如下
final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();        final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();        final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
  • BucketedCounterStream有两个直接的子类,也是抽象类,分别是BucketedRollingCounterStream及BucketedCumulativeCounterStream

BucketedRollingCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedRollingCounterStream.java

/** * Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time. * * @param 
type of raw data that needs to get summarized into a bucket * @param
type of data contained in each bucket * @param
type of data emitted to stream subscribers (often is the same as A but does not have to be) */public abstract class BucketedRollingCounterStream
extends BucketedCounterStream
{ private Observable
sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedRollingCounterStream(HystrixEventStream
stream, final int numBuckets, int bucketSizeInMs, final Func2
appendRawEventToBucket, final Func2
reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1
, Observable
> reduceWindowToSummary = new Func1
, Observable
>() { @Override public Observable call(Observable
window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable
observe() { return sourceStream; } /* package-private */ boolean isSourceCurrentlySubscribed() { return isSourceCurrentlySubscribed.get(); }}
  • 基于父类的bucketedStream定义了用于observe的sourceStream,对bucketedStream进行了window及flatMap处理
  • window操作采用的是count及skip参数,count参数值为numBuckets,skip参数值为1

BucketedCumulativeCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCumulativeCounterStream.java

/** * Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step * * @param 
type of raw data that needs to get summarized into a bucket * @param
type of data contained in each bucket * @param
type of data emitted to stream subscribers (often is the same as A but does not have to be) */public abstract class BucketedCumulativeCounterStream
extends BucketedCounterStream
{ private Observable
sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedCumulativeCounterStream(HystrixEventStream
stream, int numBuckets, int bucketSizeInMs, Func2
reduceCommandCompletion, Func2
reduceBucket) { super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion); this.sourceStream = bucketedStream .scan(getEmptyOutputValue(), reduceBucket) .skip(numBuckets) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable
observe() { return sourceStream; }}
  • 基于父类的bucketedStream定义了用于observe的sourceStream,对bucketedStream进行了scan及skip操作
  • scan与reduce的区别在于scan每操作完一次就会通知消费者,reduce是一口气操作完再通知消费者
  • 这里scan参数为getEmptyOutputValue(),为空数组用于累加,skip值为numBuckets

小结

  • hystrix的BucketedCounterStream有两个直接的子类,BucketedRollingCounterStream及BucketedCumulativeCounterStream
  • BucketedRollingCounterStream,采取的是window及flatMap操作,这里通过window来达到rolling的效果,其skip参数表示对原生数列,其开始的元素间隔是多少,比如skip为3,window的count为5,那么第一批window就是[1,2,3,4,5],第二批window就是[4,5,6,7,8]
  • BucketedCumulativeCounterStream,采取的是scan及skip操作,其cumulative的效果是通过scan函数来实现的,然后通过skip操作丢弃掉最开始的numBuckets个数据。

rolling及cumulative使用的是rxjava的window及scan操作来实现,看起来比较简洁。

doc

转载于:https://my.oschina.net/go4it/blog/1841401

你可能感兴趣的文章
ajax查询数据库时数据无法更新的问题
查看>>
Kickstart 无人职守安装,终于搞定了。
查看>>
linux开源万岁
查看>>
linux/CentOS6忘记root密码解决办法
查看>>
25个常用的Linux iptables规则
查看>>
集中管理系统--puppet
查看>>
Exchange 2013 PowerShell配置文件
查看>>
JavaAPI详解系列(1):String类(1)
查看>>
HTML条件注释判断IE<!--[if IE]><!--[if lt IE 9]>
查看>>
发布和逸出-构造过程中使this引用逸出
查看>>
使用SanLock建立简单的HA服务
查看>>
Subversion使用Redmine帐户验证简单应用、高级应用以及优化
查看>>
Javascript Ajax 异步请求
查看>>
DBCP连接池
查看>>
cannot run programing "db2"
查看>>
mysql做主从relay-log问题
查看>>
Docker镜像与容器命令
查看>>
批量删除oracle中以相同类型字母开头的表
查看>>
Java基础学习总结(4)——对象转型
查看>>
BZOJ3239Discrete Logging——BSGS
查看>>