#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Package for SqlTransform and related classes."""
# pytype: skip-file

import typing

from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

__all__ = ['SqlTransform']
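
# Payload schema handed to the Java expansion service (via
# NamedTupleBasedPayloadBuilder) when this transform is expanded.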
SqlTransformSchema = typing.NamedTuple(
    'SqlTransformSchema',
    [('query', str), ('dialect', typing.Optional[str]),
     ('ddl', typing.Optional[str])])


class SqlTransform(ExternalTransform):
  """A transform that can translate a SQL query into PTransforms.

  Input PCollections must have a schema. Currently, there are two ways to
  define a schema for a PCollection:

  1) Register a `typing.NamedTuple` type to use RowCoder, and specify it as
     the output type. For example::

      Purchase = typing.NamedTuple('Purchase',
                                   [('item_name', str), ('price', float)])
      coders.registry.register_coder(Purchase, coders.RowCoder)
      with Pipeline() as p:
        purchases = (p | beam.io...
                       | beam.Map(..).with_output_types(Purchase))

  2) Produce `beam.Row` instances. Note this option will fail if Beam is
     unable to infer data types for any of the fields. For example::

      with Pipeline() as p:
        purchases = (p | beam.io...
                       | beam.Map(lambda x: beam.Row(item_name=str(..),
                                                     price=float(..))))

  Similarly, the output of SqlTransform is a PCollection with a schema.
  The columns produced by the query can be accessed as attributes. For
  example::

    purchases | SqlTransform(\"\"\"
                  SELECT item_name, COUNT(*) AS `count`
                  FROM PCOLLECTION GROUP BY item_name\"\"\")
              | beam.Map(lambda row: "We've sold %d %ss!" % (row.count,
                                                             row.item_name))

  Additional examples can be found in
  `apache_beam.examples.wordcount_xlang_sql`, `apache_beam.examples.sql_taxi`,
  and `apache_beam.transforms.sql_test`.

  For more details about Beam SQL in general, see the `Java transform
  <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/sql/SqlTransform.html>`_,
  and the `documentation
  <https://beam.apache.org/documentation/dsls/sql/overview/>`_.
  """
  URN = 'beam:external:java:sql:v1'

  def __init__(self, query, dialect=None, ddl=None, expansion_service=None):
    """
    Creates a SqlTransform which will be expanded to Java's SqlTransform.
    (See class docs).
    :param query: The SQL query.
    :param dialect: (optional) The dialect, e.g. use 'zetasql' for ZetaSQL.
    :param ddl: (optional) The DDL statement.
    :param expansion_service: (optional) The URL of the expansion service to use
    """
    expansion_service = expansion_service or BeamJarExpansionService(
        ':sdks:java:extensions:sql:expansion-service:shadowJar')
    super().__init__(
        self.URN,
        NamedTupleBasedPayloadBuilder(
            SqlTransformSchema(query=query, dialect=dialect, ddl=ddl)),
        expansion_service=expansion_service)
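

# A minimal, hedged usage sketch (illustrative addition, not part of the
# original module): it builds schema'd rows with `beam.Row`, as in the class
# docstring above, and runs a GROUP BY query over them. Running it for real
# requires a Java runtime so the default BeamJarExpansionService can start,
# plus a runner that supports cross-language transforms; the sample data is
# made up for illustration.
if __name__ == '__main__':
  import apache_beam as beam

  with beam.Pipeline() as p:
    # Attach a schema by producing beam.Row instances (option 2 in the
    # class docstring).
    purchases = (
        p
        | beam.Create([('jacket', 5.0), ('scarf', 2.0), ('jacket', 7.5)])
        | beam.Map(lambda kv: beam.Row(item_name=kv[0], price=float(kv[1]))))
    # The query reads the input under the implicit PCOLLECTION table name;
    # output columns are accessible as row attributes.
    _ = (
        purchases
        | SqlTransform(
            """
            SELECT item_name, COUNT(*) AS `count`
            FROM PCOLLECTION GROUP BY item_name""")
        | beam.Map(lambda row: print(row.item_name, row.count)))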