aliyun / aliyun-emapreduce-sdk

Hadoop/Spark on Aliyun, supporting interactions with Aliyun's base services.

Website GitHub

version build

E-MapReduce SDK

Requirements

  • Spark 1.3+

Introduction

  • This SDK supports interaction with Aliyun's base service, e.g. OSS, ODPS, LogService and ONS, in Spark runtime environment.

Build and Install


	    git clone https://github.com/aliyun/aliyun-spark-sdk.git
	    cd  aliyun-spark-sdk
	    mvn clean package -DskipTests

Use SDK in Eclipse project directly

  • copy sdk jar to your project
  • right click Eclipse project -> Properties -> Java Build Path -> Add JARs
  • choose and import the sdk
  • you can use the sdk in your Eclipse project

Maven

        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-maxcompute_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-logservice_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-tablestore</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-ons_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-mns_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-core</artifactId>
            <version>1.6.0</version>
        </dependency>

OSS Support

In this section, we will demonstrate how to manipulate the Aliyun OSS data in Spark.

OSS Extension - Native OSS FileSystem

A native way to read and write regular files on Aliyun OSS. The advantage of this way is you can access files on OSS that came from other Aliyun base service or other tools. But file in Aliyun OSS has 48.8TB limit.

OSS URI

  • oss://bucket/object/path

We can set OSS "AccessKeyId/AccessKeySecret/Endpoint" in Hadoop configuration:

    <property>
      <name>fs.oss.accessKeyId</name>
      <description>Aliyun access key ID</description>
    </property>
    
    <property>
      <name>fs.oss.accessKeySecret</name>
      <description>Aliyun access key secret</description>
    </property>
    
    <property>
      <name>fs.oss.endpoint</name>
      <description>Aliyun OSS endpoint to connect to. An up-to-date list is
        provided in the Aliyun OSS Documentation.
       </description>
    </property>

Specifically, we should add a prefix "spark.hadoop" in Spark, like "spark.hadoop.fs.oss.accessKeyId".

OSS usage

Now, we provide a transparent way to support Aliyun OSS, with no code changes and just few configurations. All you need to do is just to provide two configuations in your project:


	conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")


Then, you can load OSS data through SparkContext.textFile(...), like:


	val conf = new SparkConf()
	conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
	val sc = new SparkContext(conf)
	val path = "oss://bucket/input"
	val rdd = sc.textFile(path)

Similarly, you can upload data through RDD.saveAsTextFile(...), like:


	val data = sc.parallelize(1 to 10)
	data.saveAsTextFile("oss://bucket/output")

ODPS Support

In this section, we will demonstrate how to manipulate the Aliyun ODPS data in Spark.

Step-1. Initialize and OdpsOps

Before read/write ODPS data, we need to initialize an OdpsOps, like:


	import com.aliyun.odps.TableSchema
	import com.aliyun.odps.data.Record
	import org.apache.spark.aliyun.odps.OdpsOps
	import org.apache.spark.{SparkContext, SparkConf}
	
	object Sample {
	  def main(args: Array[String]): Unit = {	
	    // == Step-1 ==
	    val accessKeyId = "<accessKeyId>"
	    val accessKeySecret = "<accessKeySecret>"
		// intranet endpoints for example
	    val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
	
	    val conf = new SparkConf().setAppName("Spark Odps Sample")
	    val sc = new SparkContext(conf)
	    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))

        // == Step-2 ==
		...
		// == Step-3 ==
		...
	  }

	  // == Step-2 ==
	  // function definition
	  // == Step-3 ==
	  // function definition
	}

In above codes, the variables accessKeyId and accessKeySecret are assigned to users by system; they are named as ID pair, and used for user identification and signature authentication for OSS access. See Aliyun AccessKeys for more information.

Step-2. Load ODPS Data into Spark


		// == Step-2 ==
		val project = <odps-project>
		val table = <odps-table>
		val numPartitions = 2
		val inputData = odpsOps.readTable(project, table, read, numPartitions)
		inputData.top(10).foreach(println)

		// == Step-3 ==
		...

In above codes, we need to define a read function to pre-process ODPS data:


		def read(record: Record, schema: TableSchema): String = {
		  record.getString(0)
		}

It means to load ODPS table's first column into Spark.

Step-3. Save results into Aliyun ODPS.


		val resultData = inputData.map(e => s"$e has been processed.")
		odpsOps.saveToTable(project, table, resultData, write)

In above codes, we need to define a write function to preprocess reslult data before write odps table:


		def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
		  val r = emptyReord
		  r.set(0, s)
		}

It means to write each line of result RDD into the first column of ODPS table.

ONS Support

In this section, we will demonstrate how to consume ONS message in Spark.

    // cId: Aliyun ONS ConsumerID
    // topic: Message Topic
    // subExpression: Message Tag
    val Array(cId, topic, subExpression, parallelism, interval) = args

    val accessKeyId = "accessKeyId"
    val accessKeySecret = "accessKeySecret"

    val numStreams = parallelism.toInt
    val batchInterval = Milliseconds(interval.toInt)

    val conf = new SparkConf().setAppName("Spark ONS Sample")
    val ssc = new StreamingContext(conf, batchInterval)

    // define `func` to preprocess each message 
    def func: Message => Array[Byte] = msg => msg.getBody
    val onsStreams = (0 until numStreams).map { i =>
      println(s"starting stream $i")
      OnsUtils.createStream(
        ssc,
        cId,
        topic,
        subExpression,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK_2,
        func)
    }

    val unionStreams = ssc.union(onsStreams)
    unionStreams.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })

    ssc.start()
    ssc.awaitTermination()

LogService Support

In this section, we will demonstrate how to consume Loghub data in Spark Streaming.

    if (args.length < 8) {
      System.err.println(
        """Usage: TestLoghub <sls project> <sls logstore> <loghub group name> <sls endpoint> <access key id>
          |         <access key secret> <receiver number> <batch interval seconds>
        """.stripMargin)
      System.exit(1)
    }

    val logserviceProject = args(0)    // The project name in your LogService.
    val logStoreName = args(1)         // The name of of logstream.
    val loghubGroupName = args(2)      // Processes with the same loghubGroupName will consume data of logstream together.
    val loghubEndpoint = args(3)       // API endpoint of LogService 
    val accessKeyId = args(4)          // AccessKeyId
    val accessKeySecret = args(5)      // AccessKeySecret
    val numReceivers = args(6).toInt   
    val batchInterval = Milliseconds(args(7).toInt * 1000) 

    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      loghubProject,
      logStream,
      loghubGroupName,
      endpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)

    loghubStream.foreachRDD(rdd => println(rdd.count()))

    ssc.start()
    ssc.awaitTermination()

TableStore support

License

Licensed under the Apache License 2.0