Я относительно новичок в лазурном, и я застрял! Я пытаюсь прочитать данные из своего хранилища BLOB-объектов в базу данных SQL с помощью Azure DataFactory. Я получил этот процесс, чтобы он работал нормально, используя активность копирования, теперь я пытаюсь вставить данные в несколько таблиц, которые каким-то образом связаны друг с другом (privateKey, foreignKey). Например, для обновления таблицы CAR мне нужно знать, существует ли владелец в таблице Owner. И я не могу найти подробного объяснения, как это сделать! Кто-нибудь, у кого есть опыт, может дать мне совет? Благодарность
Скопируйте данные из хранилища BLOB-объектов в sqlDatabase (в несколько таблиц)
Ответы (4)
Я бы подошел к решению этой проблемы иначе. Используя приведенный ниже код, мы можем объединить данные из нескольких файлов с одинаковыми именами во фрейм данных и отправить все это в SQL Server. Это Scala, поэтому его нужно запускать в среде Azure Databricks.
# merge files with similar names into a single dataframe
val DF = spark.read.format("csv")
.option("sep","|")
.option("inferSchema","true")
.option("header","false")
.load("mnt/rawdata/corp/ABC*.gz")
DF.count()
# rename headers in dataframe
val newNames = Seq("ID", "FName", "LName", "Address", "ZipCode", "file_name")
val dfRenamed = df.toDF(newNames: _*)
dfRenamed.printSchema
# push the dataframe to sql server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
// Aquire a DataFrame collection (val collection)
val config = Config(Map(
"url" -> "my_sql_server.database.windows.net",
"databaseName" -> "my_db_name",
"dbTable" -> "dbo.my_table",
"user" -> "xxxxx",
"password" -> "xxxxx",
"connectTimeout" -> "5", //seconds
"queryTimeout" -> "5" //seconds
))
import org.apache.spark.sql.SaveMode
DF.write.mode(SaveMode.Append).sqlDB(config)
Приведенный выше код будет читать каждую строку каждого файла. Если заголовки находятся в первой строке, это отлично работает. Если заголовки, а НЕ в первой строке, используйте приведенный ниже код для создания конкретной схемы и снова прочитайте каждую строку каждого файла.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.functions.input_file_name
val customSchema = StructType(Array(
StructField("field1", StringType, true),
StructField("field2", StringType, true),
StructField("field3", StringType, true),
StructField("field4", StringType, true),
StructField("field5", StringType, true),
StructField("field6", StringType, true),
StructField("field7", StringType, true)))
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("sep", "|")
.schema(customSchema)
.load("mnt/rawdata/corp/ABC*.gz")
.withColumn("file_name", input_file_name())
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.write.mode(SaveMode.Append).
//df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
Вам нужно будет выполнить соединение или поиск с помощью сопоставления потоков данных, а затем перенаправить строки в соответствующие таблицы базы данных с помощью преобразования «Условное разбиение».
Насколько я понимаю, вам нужно использовать действие поиска для получения имен таблиц, а затем использовать действие forEach для перемещения по таблицам, а затем использовать поток данных Mapping или Databricks для применения фильтров и объединений.
Я мог бы добавить к тому, что крикнул Шивар. Нам нужно будет использовать действие поиска, но вы должны убедиться, что таблица «Владелец» скопирована, прежде чем вы начнете работать с таблицей CAR. Для этого вы должны включить опцию последовательного поиска в действии foreach, чтобы она следовала порядку, в противном случае вы получите много ошибок нарушения FK.