- Notifications
You must be signed in to change notification settings - Fork 162
/
Copy pathAsyncThrottleSequence.swift
140 lines (127 loc) · 4.86 KB
/
AsyncThrottleSequence.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2021 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
@available(AsyncAlgorithms 1.0,*)
extensionAsyncSequence{
/// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
publicfunc _throttle<C:Clock, Reduced>(
for interval:C.Instant.Duration,
clock:C,
reducing:@Sendable@escaping(Reduced?, Element)async->Reduced
)->_AsyncThrottleSequence<Self,C,Reduced>{
_AsyncThrottleSequence(self, interval: interval, clock: clock, reducing: reducing)
}
/// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
publicfunc _throttle<Reduced>(
for interval:Duration,
reducing:@Sendable@escaping(Reduced?, Element)async->Reduced
)->_AsyncThrottleSequence<Self,ContinuousClock,Reduced>{
_throttle(for: interval, clock:.continuous, reducing: reducing)
}
/// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
publicfunc _throttle<C:Clock>(
for interval:C.Instant.Duration,
clock:C,
latest:Bool=true
)->_AsyncThrottleSequence<Self,C,Element>{
_throttle(for: interval, clock: clock){ previous, element in
guard latest else{
return previous ?? element
}
return element
}
}
/// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
publicfunc _throttle(
for interval:Duration,
latest:Bool=true
)->_AsyncThrottleSequence<Self,ContinuousClock,Element>{
_throttle(for: interval, clock:.continuous, latest: latest)
}
}
/// A rate-limited `AsyncSequence` by emitting values at most every specified interval.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
publicstruct_AsyncThrottleSequence<Base:AsyncSequence, C:Clock, Reduced>{
letbase:Base
letinterval:C.Instant.Duration
letclock:C
letreducing:@Sendable(Reduced?,Base.Element)async->Reduced
init(
_ base:Base,
interval:C.Instant.Duration,
clock:C,
reducing:@Sendable@escaping(Reduced?,Base.Element)async->Reduced
){
self.base = base
self.interval = interval
self.clock = clock
self.reducing = reducing
}
}
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
extension_AsyncThrottleSequence:AsyncSequence{
publictypealiasElement=Reduced
/// The iterator for an `AsyncThrottleSequence` instance.
publicstructIterator:AsyncIteratorProtocol{
varbase:Base.AsyncIterator
varlast:C.Instant?
letinterval:C.Instant.Duration
letclock:C
letreducing:@Sendable(Reduced?,Base.Element)async->Reduced
init(
_ base:Base.AsyncIterator,
interval:C.Instant.Duration,
clock:C,
reducing:@Sendable@escaping(Reduced?,Base.Element)async->Reduced
){
self.base = base
self.interval = interval
self.clock = clock
self.reducing = reducing
}
publicmutatingfunc next()asyncrethrows->Reduced?{
varreduced:Reduced?
letstart= last ?? clock.now
repeat{
guardlet element =tryawait base.next()else{
if reduced !=nil,let last {
// ensure the rate of elements never exceeds the given interval
letamount= interval - last.duration(to: clock.now)
if amount >.zero {
try?await clock.sleep(until: clock.now.advanced(by: amount), tolerance:nil)
}
}
// the last value is unable to have any subsequent
// values so always return the last reduction
return reduced
}
letreduction=awaitreducing(reduced, element)
letnow= clock.now
if start.duration(to: now)>= interval || last ==nil{
last = now
return reduction
}else{
reduced = reduction
}
}whiletrue
}
}
publicfunc makeAsyncIterator()->Iterator{
Iterator(base.makeAsyncIterator(), interval: interval, clock: clock, reducing: reducing)
}
}
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0,*)
extension_AsyncThrottleSequence:Sendablewhere Base:Sendable, Element:Sendable{}
@available(*, unavailable)
extension_AsyncThrottleSequence.Iterator:Sendable{}