milvus-demo2-bulkinsert.scala
import io.milvus.client.{MilvusClient, MilvusServiceClient}
import io.milvus.grpc.{DataType, ImportResponse}
import io.milvus.param.bulkinsert.{BulkInsertParam, GetBulkInsertStateParam}
import io.milvus.param.collection.{CreateCollectionParam, FieldType}
import io.milvus.param.{ConnectParam, R, RpcStatus}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import zilliztech.spark.milvus.MilvusOptions.{MILVUS_COLLECTION_NAME, MILVUS_HOST, MILVUS_PORT, MILVUS_TOKEN, MILVUS_URI}
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import org.apache.log4j.Logger

import scala.collection.JavaConverters._

import java.util

val logger = Logger.getLogger(this.getClass)

val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// Fill in the user's Milvus instance credentials.
val host = "127.0.0.1"
val port = 19530
val username = "root"
val password = "Milvus"
// Specify the target Milvus collection name.
val collectionName = "spark_milvus_test"
// This file simulates a dataframe from the user's vector generation job or a Delta table that contains vectors.
val filePath = "/Volumes/zilliz_test/default/sample_vectors/dim32_1k.json"
// The S3 bucket is an internal bucket of the Milvus instance, which the user has full control of.
// The user needs to set up this bucket as a "storage credential" by following the instructions at
// https://docs.databricks.com/en/connect/unity-catalog/storage-credentials.html#step-2-give-databricks-the-iam-role-details
// Here the user can specify the directory in the bucket to store vector data.
// The vectors will be written to the S3 bucket in a format that can be loaded into Zilliz Cloud efficiently.
val outputPath = "s3://your-s3-bucket-name/filesaa/spark_output"

// 1. Create the Milvus collection through the Milvus SDK.
val connectParam: ConnectParam = ConnectParam.newBuilder
  .withHost(host)
  .withPort(port)
  .withAuthorization(username, password)
  .build

val client: MilvusClient = new MilvusServiceClient(connectParam)

val field1Name: String = "id_field"
val field2Name: String = "str_field"
val field3Name: String = "float_vector_field"
val fieldsSchema: util.List[FieldType] = new util.ArrayList[FieldType]

// Primary key field (Int64, IDs supplied by the data rather than auto-generated).
fieldsSchema.add(FieldType.newBuilder
  .withPrimaryKey(true)
  .withAutoID(false)
  .withDataType(DataType.Int64)
  .withName(field1Name)
  .build
)
// Scalar string field.
fieldsSchema.add(FieldType.newBuilder
  .withDataType(DataType.VarChar)
  .withName(field2Name)
  .withMaxLength(65535)
  .build
)
// 32-dimensional float vector field.
fieldsSchema.add(FieldType.newBuilder
  .withDataType(DataType.FloatVector)
  .withName(field3Name)
  .withDimension(32)
  .build
)

// Create the collection.
val createParam: CreateCollectionParam = CreateCollectionParam.newBuilder
  .withCollectionName(collectionName)
  .withFieldTypes(fieldsSchema)
  .build

val createR: R[RpcStatus] = client.createCollection(createParam)

logger.info(s"create collection ${collectionName} resp: ${createR.toString}")

// 2. Read data from the file to build the vector dataframe. The dataframe schema must logically match the schema of the vector DB.
val df = spark.read
  .schema(new StructType()
    .add(field1Name, IntegerType)
    .add(field2Name, StringType)
    .add(field3Name, ArrayType(FloatType), false))
  .json(filePath)
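
// Optional sanity check (not in the original demo): print the schema and a few rows to
// confirm the dataframe matches the collection schema before writing it out.
df.printSchema()
df.show(5, truncate = false)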

// 3. Store all vector data in the S3 bucket to prepare for loading.
// repartition(1) produces a single output file, so only one file path needs to be passed to bulkinsert below.
// "mjson" is the Milvus-compatible JSON output format provided by the spark-milvus connector.
df.repartition(1)
  .write
  .format("mjson")
  .mode("overwrite")
  .save(outputPath)

// 4. The vector data is now stored in the S3 bucket as files. List the directory and collect the file paths
// to prepare the input for the Zilliz Cloud Import Data API call.
val hadoopConfig = spark.sparkContext.hadoopConfiguration
val directory = new Path(outputPath)
val fs = FileSystem.get(directory.toUri, hadoopConfig)
val files = fs.listStatus(directory)
// Pick the single .json part file produced by the write above.
val outputFile = files.filter(file => {
    file.getPath.getName.endsWith(".json")
})(0)
// Strip the bucket from an s3://bucket/key path, leaving only the object key.
def extractPathWithoutBucket(s3Path: String): String = {
  val uri = new URI(s3Path)
  val pathWithoutBucket = uri.getPath.drop(1)  // Drop the leading '/'
  pathWithoutBucket
}
val outputFilePathWithoutBucket = extractPathWithoutBucket(outputFile.getPath.toString)
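// For illustration, with a hypothetical part file name the transformation looks like this:
// "s3://your-s3-bucket-name/filesaa/spark_output/part-00000.json" becomes
// "filesaa/spark_output/part-00000.json", i.e. the object key without the bucket,
// which is the form this demo passes to the bulkinsert call below.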

// 5. Make a call to the Milvus bulkinsert API.
val bulkInsertFiles: List[String] = List(outputFilePathWithoutBucket)
val bulkInsertParam: BulkInsertParam = BulkInsertParam.newBuilder
    .withCollectionName(collectionName)
    .withFiles(bulkInsertFiles.asJava)
    .build

val bulkInsertR: R[ImportResponse] = client.bulkInsert(bulkInsertParam)
logger.info(s"bulkinsert ${collectionName} resp: ${bulkInsertR.toString}")
val taskId: Long = bulkInsertR.getData.getTasksList.get(0)

// Poll the import task every 3 seconds until it reaches a terminal state.
// State numbers (per the Milvus ImportState enum): 1 = ImportFailed, 6 = ImportCompleted, 7 = ImportFailedAndCleaned.
var bulkloadState = client.getBulkInsertState(GetBulkInsertStateParam.newBuilder.withTask(taskId).build)
while (bulkloadState.getData.getState.getNumber != 1 &&
    bulkloadState.getData.getState.getNumber != 6 &&
    bulkloadState.getData.getState.getNumber != 7 ) {
    bulkloadState = client.getBulkInsertState(GetBulkInsertStateParam.newBuilder.withTask(taskId).build)
    logger.info(s"bulkinsert ${collectionName} resp: ${bulkInsertR.toString} state: ${bulkloadState}")
    Thread.sleep(3000)
}
if (bulkloadState.getData.getState.getNumber != 6) {
    logger.error(s"bulkinsert failed ${collectionName} state: ${bulkloadState}")
}
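
// Optional follow-up (not part of the original demo): once the import completes, the row count
// can be checked through the SDK. This is a sketch that assumes the milvus-sdk-java
// GetCollectionStatisticsParam API and the GetCollStatResponseWrapper helper; the reported
// count may lag until the imported segments are flushed and indexed.
if (bulkloadState.getData.getState.getNumber == 6) {
  val statsR = client.getCollectionStatistics(
    io.milvus.param.collection.GetCollectionStatisticsParam.newBuilder
      .withCollectionName(collectionName)
      .build)
  val rowCount = new io.milvus.response.GetCollStatResponseWrapper(statsR.getData).getRowCount
  logger.info(s"bulkinsert ${collectionName} completed, row count: ${rowCount}")
}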
