Скопируйте данные из хранилища BLOB-объектов в sqlDatabase (в несколько таблиц)

Я относительно новичок в лазурном, и я застрял! Я пытаюсь прочитать данные из своего хранилища BLOB-объектов в базу данных SQL с помощью Azure DataFactory. Я получил этот процесс, чтобы он работал нормально, используя активность копирования, теперь я пытаюсь вставить данные в несколько таблиц, которые каким-то образом связаны друг с другом (privateKey, foreignKey). Например, для обновления таблицы CAR мне нужно знать, существует ли владелец в таблице Owner. И я не могу найти подробного объяснения, как это сделать! Кто-нибудь, у кого есть опыт, может дать мне совет? Благодарность


person Elviro Pereira junior    schedule 22.10.2019    source источник
comment
Вы рассматривали возможность использования хранимой процедуры?   -  person Leon Yue    schedule 23.10.2019


Ответы (4)


Я бы подошел к решению этой проблемы иначе. Используя приведенный ниже код, мы можем объединить данные из нескольких файлов с одинаковыми именами во фрейм данных и отправить все это в SQL Server. Это Scala, поэтому его нужно запускать в среде Azure Databricks.

# merge files with similar names into a single dataframe
val DF = spark.read.format("csv")
   .option("sep","|")
   .option("inferSchema","true")
   .option("header","false")
   .load("mnt/rawdata/corp/ABC*.gz")


DF.count()


# rename headers in dataframe
val newNames = Seq("ID", "FName", "LName", "Address", "ZipCode", "file_name")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema


# push the dataframe to sql server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"            -> "my_sql_server.database.windows.net",
  "databaseName"   -> "my_db_name",
  "dbTable"        -> "dbo.my_table",
  "user"           -> "xxxxx",
  "password"       -> "xxxxx",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

import org.apache.spark.sql.SaveMode
DF.write.mode(SaveMode.Append).sqlDB(config)

Приведенный выше код будет читать каждую строку каждого файла. Если заголовки находятся в первой строке, это отлично работает. Если заголовки, а НЕ в первой строке, используйте приведенный ниже код для создания конкретной схемы и снова прочитайте каждую строку каждого файла.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.functions.input_file_name

val customSchema = StructType(Array(
    StructField("field1", StringType, true),
    StructField("field2", StringType, true),
    StructField("field3", StringType, true),
    StructField("field4", StringType, true),
    StructField("field5", StringType, true),
    StructField("field6", StringType, true),
    StructField("field7", StringType, true)))

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("sep", "|")
    .schema(customSchema)
    .load("mnt/rawdata/corp/ABC*.gz")
    .withColumn("file_name", input_file_name())


import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._



val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "databaseName"      -> "MyDatabase",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.write.mode(SaveMode.Append).
//df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
person ASH    schedule 30.10.2019

Вам нужно будет выполнить соединение или поиск с помощью сопоставления потоков данных, а затем перенаправить строки в соответствующие таблицы базы данных с помощью преобразования «Условное разбиение».

person Mark Kromer MSFT    schedule 23.10.2019

Насколько я понимаю, вам нужно использовать действие поиска для получения имен таблиц, а затем использовать действие forEach для перемещения по таблицам, а затем использовать поток данных Mapping или Databricks для применения фильтров и объединений.

person shivar    schedule 23.10.2019

Я мог бы добавить к тому, что крикнул Шивар. Нам нужно будет использовать действие поиска, но вы должны убедиться, что таблица «Владелец» скопирована, прежде чем вы начнете работать с таблицей CAR. Для этого вы должны включить опцию последовательного поиска в действии foreach, чтобы она следовала порядку, в противном случае вы получите много ошибок нарушения FK.

person HimanshuSinha-msft    schedule 24.10.2019