Throttling and Buffering

Hafiz Ahmad Shahzad

Throttling

Throttling emits a value from the source Observable, and then ignores subsequent source values for a duration determined by another Observable. This process is then repeated.

throttle<X>(durationSelector: (value: X) => ObservableInput<any>, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<X>

Throttle emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled.

As soon as the first source value arrives, it is forwarded to the output Observable. The timer is then enabled by calling the durationSelector function with the source value, which returns the “duration” Observable.

When the duration Observable emits a value, the timer is disabled. This process then repeats for the next source value.

Throttling Example
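The timer behaviour described above can be sketched in plain TypeScript without RxJS. This is a simplified model, not the library's implementation: the Timed type, the throttleModel function, and the sample data are hypothetical names introduced for illustration, and durationFor returns a number of milliseconds where the real durationSelector returns an Observable. The source array is assumed to be sorted by arrival time.

```typescript
// A source value tagged with its arrival time in milliseconds (hypothetical type).
interface Timed<T> {
  at: number;
  value: T;
}

// Model of throttling: forward a value only while the timer is disabled.
// `durationFor` stands in for durationSelector and returns how long (ms)
// the timer stays enabled after a value is forwarded.
function throttleModel<T>(
  source: Timed<T>[],
  durationFor: (value: T) => number
): Timed<T>[] {
  const output: Timed<T>[] = [];
  let timerEndsAt = Number.NEGATIVE_INFINITY; // initially, the timer is disabled

  for (const item of source) {
    if (item.at >= timerEndsAt) {
      // Timer disabled: forward the value, then enable the timer.
      output.push(item);
      timerEndsAt = item.at + durationFor(item.value);
    }
    // Timer enabled: the value is ignored.
  }
  return output;
}

const clicks: Timed<string>[] = [
  { at: 0, value: "a" },
  { at: 100, value: "b" },
  { at: 250, value: "c" },
  { at: 1100, value: "d" },
  { at: 1200, value: "e" },
  { at: 2500, value: "f" },
];

// Throttle with a fixed 1000 ms duration for every value.
const throttled = throttleModel(clicks, () => 1000).map((c) => c.value);
console.log(throttled); // → ["a", "d", "f"]
```

Here "b" and "c" arrive while the timer started by "a" is still enabled, and "e" arrives while the timer started by "d" is enabled, so all three are ignored.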

Buffering

The Buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items.

There are a number of variants in the various language-specific implementations of Buffer that differ in how they choose which items go in which buffers.

RxJS 4 has four Buffer operators (in RxJS 5 and later, the counted and timed variants are named bufferCount and bufferTime):

  • buffer
  • bufferWithCount
  • bufferWithTime
  • bufferWithTimeOrCount

Each of these variants has a different way of governing which source Observable items are emitted as part of which buffers.

Buffer

Buffer monitors the bufferOpenings Observable, which emits BufferOpening objects. Each time it observes such an item, it creates a new collection to begin collecting items emitted by the source Observable, and it passes the bufferOpenings Observable into the bufferClosingSelector function. That function returns an Observable. Buffer monitors that Observable too, and when it emits an item, Buffer closes the collection and emits it.

buffer(bufferOpenings, bufferClosingSelector)
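The opening and closing behaviour can be modelled with timestamps instead of Observables. The sketch below is only an approximation under stated assumptions: Timed, bufferModel, and closeAfter are hypothetical names, each opening is represented by the time at which it fires, and closeAfter returns how many milliseconds later the closing Observable for that opening would emit. The source array is assumed to be sorted by arrival time.

```typescript
// A source value tagged with its arrival time in milliseconds (hypothetical type).
interface Timed<T> {
  at: number;
  value: T;
}

// Model of buffer(bufferOpenings, bufferClosingSelector).
// `openings` lists the times at which a new collection is created.
// `closeAfter` stands in for bufferClosingSelector: it returns how long (ms)
// the collection opened at `openedAt` stays open.
function bufferModel<T>(
  source: Timed<T>[],
  openings: number[],
  closeAfter: (openedAt: number) => number
): T[][] {
  return openings
    .map((openedAt) => ({ openedAt, closedAt: openedAt + closeAfter(openedAt) }))
    // Collections are emitted when they close, so order them by closing time.
    .sort((a, b) => a.closedAt - b.closedAt)
    .map(({ openedAt, closedAt }) =>
      source
        .filter((item) => item.at >= openedAt && item.at < closedAt)
        .map((item) => item.value)
    );
}

// Values 1..6 arrive every 100 ms, starting at 100 ms.
const ticks: Timed<number>[] = [1, 2, 3, 4, 5, 6].map((n) => ({
  at: n * 100,
  value: n,
}));

// Collections open at 150 ms and 450 ms; each stays open for 200 ms.
const buffered = bufferModel(ticks, [150, 450], () => 200);
console.log(buffered); // → [[2, 3], [5, 6]]
```

Values 1 and 4 arrive while no collection is open, so they do not appear in any buffer.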

Buffer with Count

bufferWithCount starts a new buffer with the first item emitted by the source Observable, and another one every skip items thereafter. It fills each buffer with count items (the initial item and count-1 subsequent ones) and emits each buffer when it is complete. If skip is omitted, it defaults to count, so the buffers do not overlap.

bufferWithCount(count, skip)
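The effect of count and skip is easiest to see on a finite list. The following is a minimal sketch over an array rather than an Observable; bufferWithCountModel is a hypothetical helper, and emitting the partial buffers that remain when the source ends is an assumption of this model.

```typescript
// Model of bufferWithCount(count, skip) over a finite list of items.
// A new buffer starts every `skip` items and holds up to `count` items.
function bufferWithCountModel<T>(
  items: T[],
  count: number,
  skip: number = count
): T[][] {
  if (count <= 0 || skip <= 0) {
    throw new RangeError("count and skip must be positive");
  }
  const buffers: T[][] = [];
  for (let start = 0; start < items.length; start += skip) {
    // slice() stops at the end of the list, so trailing buffers may be partial.
    buffers.push(items.slice(start, start + count));
  }
  return buffers;
}

const numbers = [1, 2, 3, 4, 5, 6, 7];

// skip defaults to count: consecutive, non-overlapping buffers.
const plain = bufferWithCountModel(numbers, 3);
console.log(plain); // → [[1, 2, 3], [4, 5, 6], [7]]

// skip < count: buffers overlap.
const overlapped = bufferWithCountModel(numbers, 3, 2);
console.log(overlapped); // → [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]

// skip > count: some items fall between buffers.
const gapped = bufferWithCountModel(numbers, 2, 3);
console.log(gapped); // → [[1, 2], [4, 5], [7]]
```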

Buffer with Time

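bufferWithTime emits a new collection of items periodically (every timeSpan milliseconds), containing all items emitted by the source Observable since the previous collection was emitted. When timeShift is also given, a new collection is started every timeShift milliseconds and collects items for timeSpan milliseconds, so collections can overlap or leave gaps. There is also a version of this operator that takes a Scheduler as a parameter and uses it to govern the timespan. By default, it uses the timeout scheduler.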

bufferWithTime(timeSpan, timeShift)
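The time windows can be modelled on timestamped values. This is a sketch under stated assumptions rather than the operator itself: Timed, bufferWithTimeModel, and the completesAt parameter (the time at which the source is taken to complete) are hypothetical, and windows are assumed to open at 0 ms and then every timeShift milliseconds.

```typescript
// A source value tagged with its arrival time in milliseconds (hypothetical type).
interface Timed<T> {
  at: number;
  value: T;
}

// Model of bufferWithTime(timeSpan, timeShift).
// A collection opens every `timeShift` ms and collects for `timeSpan` ms.
// `completesAt` is an assumed completion time: no collection opens after it.
function bufferWithTimeModel<T>(
  source: Timed<T>[],
  completesAt: number,
  timeSpan: number,
  timeShift: number = timeSpan
): T[][] {
  if (timeSpan <= 0 || timeShift <= 0) {
    throw new RangeError("timeSpan and timeShift must be positive");
  }
  const buffers: T[][] = [];
  for (let openedAt = 0; openedAt < completesAt; openedAt += timeShift) {
    const closesAt = openedAt + timeSpan;
    // A window with no items still produces an (empty) collection.
    buffers.push(
      source
        .filter((item) => item.at >= openedAt && item.at < closesAt)
        .map((item) => item.value)
    );
  }
  return buffers;
}

const keys: Timed<string>[] = [
  { at: 100, value: "a" },
  { at: 400, value: "b" },
  { at: 1200, value: "c" },
  { at: 2600, value: "d" },
];

// One collection per second, back to back.
const everySecond = bufferWithTimeModel(keys, 3000, 1000);
console.log(everySecond); // → [["a", "b"], ["c"], ["d"]]

// timeShift > timeSpan: "c" arrives in the gap between two windows.
const withGaps = bufferWithTimeModel(keys, 3000, 1000, 2000);
console.log(withGaps); // → [["a", "b"], ["d"]]
```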

Buffer with Time or Count

bufferWithTimeOrCount emits a new collection each time the source Observable has emitted count items. If timeSpan milliseconds elapse since its last collection emission before that happens, it instead emits a collection of however many items the source Observable has emitted in that span, even if this is fewer than count.

bufferWithTimeOrCount(timeSpan, count)
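The interplay of the two limits can be sketched on timestamped values. This is a simplified model with hypothetical names (Timed, bufferWithTimeOrCountModel), not the operator's implementation. It assumes the source is sorted by arrival time, that the timer restarts after every emission, and that a non-empty buffer left over when the source ends is emitted as well.

```typescript
// A source value tagged with its arrival time in milliseconds (hypothetical type).
interface Timed<T> {
  at: number;
  value: T;
}

// Model of bufferWithTimeOrCount(timeSpan, count): emit the current buffer
// when it holds `count` items or when `timeSpan` ms have passed since the
// last emission, whichever happens first.
function bufferWithTimeOrCountModel<T>(
  source: Timed<T>[],
  timeSpan: number,
  count: number
): T[][] {
  if (timeSpan <= 0 || count <= 0) {
    throw new RangeError("timeSpan and count must be positive");
  }
  const buffers: T[][] = [];
  let current: T[] = [];
  let deadline = timeSpan; // time of the next time-based emission

  for (const item of source) {
    // Emit for every deadline that passed before this item arrived.
    while (item.at >= deadline) {
      buffers.push(current);
      current = [];
      deadline += timeSpan;
    }
    current.push(item.value);
    if (current.length === count) {
      // Count reached first: emit now and restart the timer.
      buffers.push(current);
      current = [];
      deadline = item.at + timeSpan;
    }
  }
  // Assumption: emit whatever is left when the source ends.
  if (current.length > 0) {
    buffers.push(current);
  }
  return buffers;
}

const events: Timed<string>[] = [
  { at: 100, value: "a" },
  { at: 200, value: "b" },
  { at: 300, value: "c" },
  { at: 400, value: "d" },
  { at: 1500, value: "e" },
  { at: 1600, value: "f" },
];

const batches = bufferWithTimeOrCountModel(events, 1000, 3);
console.log(batches); // → [["a", "b", "c"], ["d"], ["e", "f"]]
```

The first collection is emitted because it reaches three items. The second is emitted with only "d" because 1000 ms pass without two more items arriving.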

Copyright ©2022 Educative, Inc. All rights reserved