- Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathbulk_aq_async.py
107 lines (94 loc) · 3.5 KB
/
bulk_aq_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -----------------------------------------------------------------------------
# Copyright (c) 2025, Oracle and/or its affiliates.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#
# This software is dual-licensed to you under the Universal Permissive License
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
# either license.
#
# If you elect to accept the software under the Apache License, Version 2.0,
# the following applies:
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# bulk_aq_async.py
#
# Demonstrates how to use bulk enqueuing and dequeuing of messages with
# advanced queuing using asyncio. It makes use of a RAW queue created in the
# sample setup.
# -----------------------------------------------------------------------------
importasyncio
importoracledb
importsample_env
QUEUE_NAME="DEMO_RAW_QUEUE"
PAYLOAD_DATA= [
"The first message",
"The second message",
"The third message",
"The fourth message",
"The fifth message",
"The sixth message",
"The seventh message",
"The eighth message",
"The ninth message",
"The tenth message",
"The eleventh message",
"The twelfth and final message",
]
asyncdefmain():
# connect to database
asyncwithoracledb.connect_async(
user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(),
) asconnection:
# create a queue
queue=connection.queue(QUEUE_NAME)
queue.deqoptions.wait=oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation=oracledb.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so
# that the results are consistent
whileawaitqueue.deqone():
pass
# enqueue a few messages
print("Enqueuing messages...")
batch_size=6
data_to_enqueue=PAYLOAD_DATA
whiledata_to_enqueue:
batch_data=data_to_enqueue[:batch_size]
data_to_enqueue=data_to_enqueue[batch_size:]
messages= [
connection.msgproperties(payload=d) fordinbatch_data
]
fordatainbatch_data:
print(data)
awaitqueue.enqmany(messages)
awaitconnection.commit()
# dequeue the messages
print("\nDequeuing messages...")
batch_size=8
whileTrue:
messages=awaitqueue.deqmany(batch_size)
ifnotmessages:
break
forpropsinmessages:
print(props.payload.decode())
awaitconnection.commit()
print("\nDone.")
asyncio.run(main())