/** * Abstract class that imposes a bucketing structure and provides streams of buckets * * @paramtype of raw data that needs to get summarized into a bucket * @param type of data contained in each bucket * @param
- 这里的构造器主要初始化bucketedStream,主要是对HystrixEventStream进行observe,然后进行window操作,在进行flatMap
- window操作的timespan参数为bucketSizeInMs,其计算公式如下
final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get(); final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get(); final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
- BucketedCounterStream有两个直接的子类,也是抽象类,分别是BucketedRollingCounterStream及BucketedCumulativeCounterStream
/** * Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time. * * @paramtype of raw data that needs to get summarized into a bucket * @param type of data contained in each bucket * @param type of data emitted to stream subscribers (often is the same as A but does not have to be) */public abstract class BucketedRollingCounterStream extends BucketedCounterStream { private Observable sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedRollingCounterStream(HystrixEventStream stream, final int numBuckets, int bucketSizeInMs, final Func2 appendRawEventToBucket, final Func2 reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1 , Observable > reduceWindowToSummary = new Func1 , Observable >() { @Override public Observable call(Observable window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable observe() { return sourceStream; } /* package-private */ boolean isSourceCurrentlySubscribed() { return isSourceCurrentlySubscribed.get(); }}
- 基于父类的bucketedStream定义了用于observe的sourceStream,对bucketedStream进行了window及flatMap处理
- window操作采用的是count及skip参数,count参数值为numBuckets,skip参数值为1
/** * Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step * * @paramtype of raw data that needs to get summarized into a bucket * @param type of data contained in each bucket * @param type of data emitted to stream subscribers (often is the same as A but does not have to be) */public abstract class BucketedCumulativeCounterStream extends BucketedCounterStream { private Observable sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedCumulativeCounterStream(HystrixEventStream stream, int numBuckets, int bucketSizeInMs, Func2 reduceCommandCompletion, Func2 reduceBucket) { super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion); this.sourceStream = bucketedStream .scan(getEmptyOutputValue(), reduceBucket) .skip(numBuckets) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable observe() { return sourceStream; }}
- 基于父类的bucketedStream定义了用于observe的sourceStream,对bucketedStream进行了scan及skip操作
- scan与reduce的区别在于scan每操作完一次就会通知消费者,reduce是一口气操作完再通知消费者
- 这里scan参数为getEmptyOutputValue(),为空数组用于累加,skip值为numBuckets
- hystrix的BucketedCounterStream有两个直接的子类,BucketedRollingCounterStream及BucketedCumulativeCounterStream
- BucketedRollingCounterStream,采取的是window及flatMap操作,这里通过window来达到rolling的效果,其skip参数表示对原生数列,其开始的元素间隔是多少,比如skip为3,window的count为5,那么第一批window就是[1,2,3,4,5],第二批window就是[4,5,6,7,8]
- BucketedCumulativeCounterStream,采取的是scan及skip操作,其cumulative的效果是通过scan函数来实现的,然后通过skip操作丢弃掉最开始的numBuckets个数据。