Flink SQL是否支持在SQL上下文内执行CREATE AS SELECT(CTAS)?
Great question! Let's break this down clearly based on Flink's version capabilities and workarounds:
Short Answer
- Flink 1.13+: Yes, native support for both
CREATE TABLE AS SELECT (CTAS)andCREATE VIEW AS SELECTexists, and you can run these directly viaexecuteSql(). - Flink 1.12 and earlier: No native support for these syntaxes, but there are workarounds to achieve similar results.
Details & Workarounds
1. For Flink 1.13+ (Recommended)
Flink added proper support for CTAS and CREATE VIEW AS SELECT in this version, which solves exactly the problem you're facing:
Create a view from a query:
CREATE VIEW rubber_orders AS SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';You can run this directly via
tableEnv.executeSql()and then query the view like any regular table.Create a table and populate it with query results (CTAS):
CREATE TABLE rubber_order_summary WITH ( 'connector' = 'filesystem', 'path' = '/path/to/output', 'format' = 'csv' ) AS SELECT product, SUM(amount) AS total_amount FROM Orders WHERE product LIKE '%Rubber%' GROUP BY product;The best part here is that you don't even need to define the schema explicitly—Flink will automatically infer it from the SELECT query's output.
2. For Older Flink Versions (Pre-1.13)
If you're stuck on an older version, you have two main options:
Option A: Use the Table API to simulate views/tables
As you mentioned, you can use tableEnv.sqlQuery() to get the result of a SELECT statement, then register it as a temporary view or table:
// Run the SELECT query Table result = tableEnv.sqlQuery("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); // Register as a temporary view (usable in subsequent SQL queries) tableEnv.createTemporaryView("rubber_orders_view", result); // Or register as a permanent table (requires defining schema manually) tableEnv.createTable("rubber_orders_table", Schema.newBuilder() .column("product", DataTypes.STRING()) .column("amount", DataTypes.INT()) .build(), TableDescriptor.forConnector("filesystem") .option("path", "/path/to/output") .format(FormatDescriptor.forFormat("csv").build()) .build()); // Then insert the result into the table tableEnv.executeSql("INSERT INTO rubber_orders_table SELECT * FROM rubber_orders_view");
Option B: Simulate CTAS with CREATE TABLE + INSERT INTO
You'll need to manually define the target table's schema first, then insert the query results into it:
-- Step 1: Manually create the target table with matching schema CREATE TABLE rubber_orders ( product STRING, amount INT ) WITH ( 'connector' = 'filesystem', 'path' = '/path/to/output', 'format' = 'csv' ); -- Step 2: Insert the SELECT result into the table INSERT INTO rubber_orders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
The downside here is that you have to manually mirror the schema from your SELECT query—no automatic inference.
Key Note
If you couldn't find these syntaxes in the Flink documentation, double-check that you're looking at the documentation for your Flink version. Older docs won't mention CTAS/CREATE VIEW AS SELECT, while newer versions will have full coverage.
内容的提问来源于stack exchange,提问作者Dennis Jaheruddin




