RxSwiftExt
A collection of Rx operators & tools not found in the core RxSwift distribution
Install / Use
/learn @RxSwiftCommunity/RxSwiftExtREADME
RxSwiftExt
If you're using RxSwift, you may have encountered situations where the built-in operators do not bring the exact functionality you want. The RxSwift core is being intentionally kept as compact as possible to avoid bloat. This repository's purpose is to provide additional convenience operators and Reactive Extensions.
Installation
This branch of RxSwiftExt targets Swift 5.x and RxSwift 5.0.0 or later.
- If you're looking for the Swift 4 version of RxSwiftExt, please use version
3.4.0of the framework.
CocoaPods
Add to your Podfile:
pod 'RxSwiftExt', '~> 5'
This will install both the RxSwift and RxCocoa extensions.
If you're interested in only installing the RxSwift extensions, without the RxCocoa extensions, simply use:
pod 'RxSwiftExt/Core'
Using Swift 4:
pod 'RxSwiftExt', '~> 3'
Carthage
Add this to your Cartfile
github "RxSwiftCommunity/RxSwiftExt"
Operators
RxSwiftExt is all about adding operators and Reactive Extensions to RxSwift!
Operators
These operators are much like the RxSwift & RxCocoa core operators, but provide additional useful abilities to your Rx arsenal.
- unwrap
- ignore
- ignoreWhen
- Observable.once
- distinct
- map
- not
- and
- Observable.cascade
- pairwise
- nwise
- retry
- repeatWithBehavior
- catchErrorJustComplete
- pausable
- pausableBuffered
- apply
- filterMap
- Observable.fromAsync
- Observable.zip(with:)
- Observable.merge(with:)
- count
- partition
- bufferWithTrigger
There are two more available operators for materialize()'d sequences:
Read below for details about each operator.
Reactive Extensions
RxSwift/RxCocoa Reactive Extensions are provided to enhance existing objects and classes from the Apple-ecosystem with Reactive abilities.
Operator details
unwrap
Unwrap optionals and filter out nil values.
Observable.of(1,2,nil,Int?(4))
.unwrap()
.subscribe { print($0) }
next(1)
next(2)
next(4)
ignore
Ignore specific elements.
Observable.from(["One","Two","Three"])
.ignore("Two")
.subscribe { print($0) }
next(One)
next(Three)
completed
ignoreWhen
Ignore elements according to closure.
Observable<Int>
.of(1,2,3,4,5,6)
.ignoreWhen { $0 > 2 && $0 < 6 }
.subscribe { print($0) }
next(1)
next(2)
next(6)
completed
once
Send a next element exactly once to the first subscriber that takes it. Further subscribers get an empty sequence.
let obs = Observable.once("Hello world")
print("First")
obs.subscribe { print($0) }
print("Second")
obs.subscribe { print($0) }
First
next(Hello world)
completed
Second
completed
distinct
Pass elements through only if they were never seen before in the sequence.
Observable.of("a","b","a","c","b","a","d")
.distinct()
.subscribe { print($0) }
next(a)
next(b)
next(c)
next(d)
completed
mapTo
Replace every element with the provided value.
Observable.of(1,2,3)
.mapTo("Nope.")
.subscribe { print($0) }
next(Nope.)
next(Nope.)
next(Nope.)
completed
mapAt
Transform every element to the value at the provided key path.
struct Person {
let name: String
}
Observable
.of(
Person(name: "Bart"),
Person(name: "Lisa"),
Person(name: "Maggie")
)
.mapAt(\.name)
.subscribe { print($0) }
next(Bart)
next(Lisa)
next(Maggie)
completed
not
Negate booleans.
Observable.just(false)
.not()
.subscribe { print($0) }
next(true)
completed
and
Verifies that every value emitted is true
Observable.of(true, true)
.and()
.subscribe { print($0) }
Observable.of(true, false)
.and()
.subscribe { print($0) }
Observable<Bool>.empty()
.and()
.subscribe { print($0) }
Returns a Maybe<Bool>:
success(true)
success(false)
completed
cascade
Sequentially cascade through a list of observables, dropping previous subscriptions as soon as an observable further down the list starts emitting elements.
let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()
Observable.cascade([a,b,c])
.subscribe { print($0) }
a.onNext("a:1")
a.onNext("a:2")
b.onNext("b:1")
a.onNext("a:3")
c.onNext("c:1")
a.onNext("a:4")
b.onNext("b:4")
c.onNext("c:2")
next(a:1)
next(a:2)
next(b:1)
next(c:1)
next(c:2)
pairwise
Groups elements emitted by an Observable into arrays, where each array consists of the last 2 consecutive items; similar to a sliding window.
Observable.from([1, 2, 3, 4, 5, 6])
.pairwise()
.subscribe { print($0) }
next((1, 2))
next((2, 3))
next((3, 4))
next((4, 5))
next((5, 6))
completed
nwise
Groups elements emitted by an Observable into arrays, where each array consists of the last N consecutive items; similar to a sliding window.
Observable.from([1, 2, 3, 4, 5, 6])
.nwise(3)
.subscribe { print($0) }
next([1, 2, 3])
next([2, 3, 4])
next([3, 4, 5])
next([4, 5, 6])
completed
retry
Repeats the source observable sequence using given behavior in case of an error or until it successfully terminated.
There are four behaviors with various predicate and delay options: immediate, delayed, exponentialDelayed and
customTimerDelayed.
// in case of an error initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = sampleObservable.retry(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.0), scheduler: delayScheduler)
.subscribe(onNext: { event in
print("Receive event: \(event)")
}, onError: { error in
print("Receive error: \(error)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive error: fatalError
repeatWithBehavior
Repeats the source observable sequence using given behavior when it completes. This operator takes the same parameters as the retry operator.
There are four behaviors with various predicate and delay options: immediate, delayed, exponentialDelayed and customTimerDelayed.
// when the sequence completes initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = completingObservable.repeatWithBehavior(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.2), scheduler: delayScheduler)
.subscribe(onNext: { event in
print("Receive event: \(event)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
catchErrorJustComplete
Completes a sequence when an error occurs, dismissing the error condition
let _ = sampleObservable
.do(onError: { print("Source observable emitted error \($0), ignoring it") })
.catchErrorJustComplete()
.subscribe {
print ("\($0)")
}
next(First)
next(Second)
Source observable emitted error fatalError, ignoring it
completed
pausable
Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true.
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let trueAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in true }
let falseAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in false }
let pauser = Observable.of(trueAtThreeSeconds, falseAtFiveSeconds).merge()
let pausedObservable = observable.pausable(pauser)
let _ = pausedObservable
.subscribe { print($0) }
next(2)
next(3)
More examples are available in the project's Playground.
pausableBuffered
Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true. Elements emitted by the source observable are buffered (with a configurable limit) and "flushed" (re-emitted) when the observable resumes.
Examples are available in the project's Playground.
apply
Apply provides a unified mechanism for applying transformations on Observable sequences, without having to extend ObservableType or repeating your transformations. For additional rationale for this see discussion on github
// An ordinary function that applies some operators to its argument, and returns the resulting Observable
func requestPolicy(_ request: Observable<Void>) -> Observable<Response> {
return request.retry(maxAttempt
