- Notifications
You must be signed in to change notification settings - Fork 162
/
Copy pathAsyncBufferedByteIterator.swift
143 lines (131 loc) · 4.84 KB
/
AsyncBufferedByteIterator.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
/// An `AsyncIterator` that provides a convenient and high-performance implementation
/// of a common architecture for `AsyncSequence` of `UInt8`, otherwise known as byte streams.
///
/// Bytes are read into an internal buffer of `capacity` bytes via the
/// `readFunction`. Invoking `next()` returns bytes from the internal buffer until it's
/// empty, and then suspends and awaits another invocation of `readFunction` to
/// refill. If `readFunction` returns 0 (indicating nothing was read), `next()` will
/// return `nil` from then on. Cancellation is checked before each invocation of
/// `readFunction`, which means that many calls to `next()` will not check for
/// cancellation.
///
/// A typical use of `AsyncBufferedByteIterator` looks something like this:
///
/// struct AsyncBytes: AsyncSequence {
/// public typealias Element = UInt8
/// var handle: ReadableThing
///
/// internal init(_ readable: ReadableThing) {
/// handle = readable
/// }
///
/// public func makeAsyncIterator() -> AsyncBufferedByteIterator {
/// return AsyncBufferedByteIterator(capacity: 16384) { buffer in
/// // This runs once every 16384 invocations of next()
/// return try await handle.read(into: buffer)
/// }
/// }
/// }
///
///
@available(AsyncAlgorithms 1.0,*)
publicstructAsyncBufferedByteIterator:AsyncIteratorProtocol{
publictypealiasElement=UInt8
@usableFromInlinevarbuffer:_AsyncBytesBuffer
/// Creates an asynchronous buffered byte iterator with a specified capacity and read function.
///
/// - Parameters:
/// - capacity: The maximum number of bytes that a single invocation of `readFunction` may produce.
/// This is the allocated capacity of the backing buffer for iteration; the value must be greater than 0.
/// - readFunction: The function for refilling the buffer.
publicinit(
capacity:Int,
readFunction:@Sendable@escaping(UnsafeMutableRawBufferPointer)asyncthrows->Int
){
buffer =_AsyncBytesBuffer(capacity: capacity, readFunction: readFunction)
}
/// Reads a byte out of the buffer if available. When no bytes are available, this will trigger
/// the read function to reload the buffer and then return the next byte from that buffer.
@inlinable@inline(__always)
publicmutatingfunc next()asyncthrows->UInt8?{
returntryawait buffer.next()
}
}
@available(*, unavailable)
extensionAsyncBufferedByteIterator:Sendable{}
@available(AsyncAlgorithms 1.0,*)
@frozen@usableFromInline
internalstruct_AsyncBytesBuffer{
@usableFromInline
finalclassStorage{
fileprivateletbuffer:UnsafeMutableRawBufferPointer
init(
capacity:Int
){
precondition(capacity >0)
buffer =UnsafeMutableRawBufferPointer.allocate(
byteCount: capacity,
alignment: MemoryLayout<AnyObject>.alignment
)
}
deinit{
buffer.deallocate()
}
}
@usableFromInlineinternalletstorage:Storage
@usableFromInlineinternalvarnextPointer:UnsafeRawPointer
@usableFromInlineinternalvarendPointer:UnsafeRawPointer
internalletreadFunction:@Sendable(UnsafeMutableRawBufferPointer)asyncthrows->Int
internalvarfinished=false
@usableFromInlineinit(
capacity:Int,
readFunction:@Sendable@escaping(UnsafeMutableRawBufferPointer)asyncthrows->Int
){
lets=Storage(capacity: capacity)
self.readFunction = readFunction
storage = s
nextPointer =UnsafeRawPointer(s.buffer.baseAddress!)
endPointer = nextPointer
}
@inline(never)@usableFromInline
internalmutatingfunc reloadBufferAndNext()asyncthrows->UInt8?{
if finished {
returnnil
}
tryTask.checkCancellation()
do{
letreadSize:Int=tryawaitreadFunction(storage.buffer)
if readSize ==0{
finished =true
nextPointer = endPointer
returnnil
}
nextPointer =UnsafeRawPointer(storage.buffer.baseAddress!)
endPointer = nextPointer + readSize
}catch{
finished =true
nextPointer = endPointer
throw error
}
returntryawaitnext()
}
@inlinable@inline(__always)
internalmutatingfunc next()asyncthrows->UInt8?{
if_fastPath(nextPointer != endPointer){
letbyte= nextPointer.load(fromByteOffset:0, as:UInt8.self)
nextPointer = nextPointer +1
return byte
}
returntryawaitreloadBufferAndNext()
}
}