In the next tutorial you will learn how to migrate data from MySQL to MongoDB. We will show you how to do it using Spark step by step. From creating a configuration for the player RDD to the installation guide for prerequisites components.
Easy and intuitive!

PREREQUISITES*:

  • MongoDB (versión 2.6 recommended)
  • MySQL 5
  • Java 7+
  • Maven 3+
  • Spark 1.2
  • Deep-Spark


TUTORIAL TO LOAD A DATASET TO MySQL:

Create schema:

create schema football

Create tables:

create table if not exists football.team(
    id bigint not null auto_increment,
    name varchar(255) not null,
    short_name varchar(255) not null,
    arena_name varchar(255) not null,
    coach_name varchar(255) not null,
    city_name varchar(255) not null,
    league_name varchar(255) not null,
    primary key (id)
);
create table if not exists football.player(
    id bigint not null auto_increment,
    firstname varchar(255) not null,
    lastname varchar(255) not null,
    date_of_birth date,
    place_of_birth_name varchar(255) not null,
    position_name varchar(255) not null,
    team_id bigint not null,
    primary key (id),
    foreign key (team_id) references football.team(id)
);

Populate tables:

insert into football.team (name, short_name, arena_name, coach_name, city_name, league_name) 
values
('FC Bayern München', 'FCB', 'Allianz Arena', 'Josep Guardiola', 'München', 'Bundesliga'),
('Hamburger SV', 'HSV', 'Imtech Arena', 'Josef Zinnbauer', 'Hamburg', 'Bundesliga'),
('Herta BSC Berlin', 'Herta', 'Olympiastaion Berlin', 'Jos Luhukay', 'Berlin', 'Bundesliga'),
('FC Basel 1893', 'FCB', 'St. Jakob-Park', 'Paulo Sousa', 'Basel', 'Raiffeisen Super League'),
('FC Paris Saint-Germain', 'PSG', 'Parc des Princes', 'Laurent Blanc', 'Paris', 'Ligue 1'),
('HJK Helsinki', 'HJK', 'Sonera Stadium', 'Mika Lehkosuo', 'Helsinki', 'Veikkausliiga');
insert into football.player(firstname, lastname, date_of_birth, place_of_birth_name, position_name, team_id) 
values
('Manuel', 'Neuer', '1986-3-27', 'Gelsenkirchen', 'Goalkeeper', 1),
('Julian', 'Schieber', '1989-2-13', 'Backnang', 'Centre Forward', 3),
('Dennis ', 'Diekmeier', '1989-10-20', 'Thedinghausen', 'Right Wing', 2),
('Zlatan', 'Ibrahimovic', '1981-10-03', 'Malmö', 'Centre Forward', 5),
('Xabier', 'Alonso', '1981-11-25', 'Tolosa', 'Midfielder', 1);

Running Spark-shel:

 ./spark-shell --jars YOUR_PATH/deep-core-0.7.0.jar, 
                      YOUR_PATH/deep-commons-0.7.0.jar,YOUR_PATH/deep-jdbc-0.7.0.jar, 
                      YOUR_PATH/deep-mongodb-0.7.0.jar,YOUR_PATH/mongo-java-driver-2.12.4.jar,YOUR_PATH/mysql-connector-java-5.1.34.jar

USING SPARK STEP BY STEP:

Necessary imports:

import com.stratio.deep.commons.entity.{Cell, Cells}
import com.stratio.deep.core.context.DeepSparkContext
import com.stratio.deep.jdbc.config.{JdbcConfigFactory, JdbcDeepJobConfig}
import com.stratio.deep.mongodb.config.{MongoConfigFactory, MongoDeepJobConfig}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

private val CLUSTER: String = "local"
private val APP_NAME: String = "deepMySQLToMongodbMigration"
private val MYSQL_DRIVER: String = "com.mysql.jdbc.Driver"
private val MYSQL_HOST: String = "localhost"
private val MYSQL_PORT: Integer = 3306
private val MYSQL_USER: String = "root"
private val MYSQL_PASS: String = "root"
private val MYSQL_DBNAME: String = "football"
private val MONGODB_HOST: String = "localhost:27017"
private val MONGODB_DBNAME: String = "football"
private val MONGODB_COLLECTION: String = "teams"eating a configuration for the team RDD and initialize it
val mySQLTeamConfig: JdbcDeepJobConfig[Cells] = JdbcConfigFactory        
    .createJdbc
    .host(MYSQL_HOST).port(MYSQL_PORT)
    .username(MYSQL_USER).password(MYSQL_PASS)
    .database(MYSQL_DBNAME).table("team")
    .driverClass(MYSQL_DRIVER)
    .initialize

Creating a configuration for the player RDD and initialize it:

val mySQLPlayerConfig: JdbcDeepJobConfig[Cells] = JdbcConfigFactory      
    .createJdbc
    .host(MYSQL_HOST).port(MYSQL_PORT)
    .username(MYSQL_USER).password(MYSQL_PASS)
    .database(MYSQL_DBNAME).table("player")
    .driverClass(MYSQL_DRIVER)
    .initialize

Creating the RDDs that represent the data set in MySQL:

val teamRDD: RDD[Cells] = deepContext.createRDD(mySQLTeamConfig)
val playerRDD: RDD[Cells] = deepContext.createRDD(mySQLPlayerConfig)

Map teams to pair with (team id, team):

val teamPairRDD: RDD[(Long, Cells)] = teamRDD.map(team => (team.getLong("id").longValue(), team))

Map players to pair with (team id, player) and group by team_id

val playerPairRDD: RDD[(Long, Iterable[Cells])] = playerRDD.map(cells =>(cells.getLong("team_id").longValue(), cells)).groupByKey
val joinedPairRDD: RDD[(Long, (Cells, Option[Iterable[Cells]]))] = teamPairRDD.leftOuterJoin(playerPairRDD)

Creating a configuration for the mongodb result RDD and initialize it:

val outputMongodbConfig: MongoDeepJobConfig[Cells] = MongoConfigFactory.createMongoDB
.host(MONGODB_HOST)
.database(MONGODB_DBNAME).collection(MONGODB_COLLECTION)
.initialize

Transforming the joined result to the desirable structure in mongodb:

// Ej: {_id: <team_id>, name: <team_name>, players: [<player_name_1>, <player_name_2>]}
 val outputRDD: RDD[Cells] = joinedPairRDD.map(joined => {
    val cells: Cells = new Cells
    cells.add(Cell.create("_id", joined._1))
    cells.add(Cell.create("name", joined._2._1.getString("name")))
    cells.add(Cell.create("players", joined._2._2 match {
        case Some(players) => players.map(player => player.getString("lastname") + ", " + player.getString("firstname")).asJava
        case _ => null
    }))
    cells
})

Save the RDD in MongoDB

DeepSparkContext.saveRDD(outputRDD, outputMongodbConfig)


USING SPARK WITH OUR EXAMPLE PROJECT:

https://github.com/robertomorandeira/deep-example 
Just make a git clone a run our java or scala example

git clone https://github.com/robertomorandeira/deep-example.git

Java example:
https://github.com/robertomorandeira/deep-example/blob/master/src/main/java/FootballMigrationApp.java 
Scala example:
https://github.com/robertomorandeira/deep-example/blob/master/src/main/scala/FootballMigrationAppScala.scala 


CHECKING DATA.

Connect to mongodb, normally

$ mongo --host 127.0.0.1 --port 27017 football
db.output.find(“teams”).pretty()

You can see the data loaded in mongoDB


INSTALLATION GUIDE FOR PREREQUISITES COMPONENTS:

MongoDB

sudo apt-get update
sudo apt-get install mongodb-org

MySQL Server

sudo apt-get install mysql-server

Java 7

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer

Maven

sudo apt-get update
sudo apt-get install maven

Author

admin
Author