Xây Dựng Chương Trình Gợi Ý Phim Dựa Vào Tập Dữ Liệu Movie Len

Lời mở đầu

MovieLens là một tập dữ liệu được sử dụng rộng rãi cách đây nhiều năm. Hôm nay, mình sẽ sử dụng tập dữ liệu này và mô hình ALS của spark để xây dựng chương trình dự đoán phim cho người dùng.

Chuẩn bị dữ liệu

Các bạn có thể download tập dữ liệu MovieLens ở link https://grouplens.org/datasets/movielens/. Các bạn có thể download trực tiếp 2 file nén ở link http://files.grouplens.org/datasets/movielens/ml-latest-small.zip và link http://files.grouplens.org/datasets/movielens/ml-latest.zip.

Ở trên bao gồm 2 tập dữ liệu. chúng ta tạo thư mục datasets và download rồi bỏ chúng vào trong thư mục đấy.

 1complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
 2small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
 3
 4import os
 5
 6datasets_path = 'datasets'
 7if not os.path.exists(datasets_path):
 8    os.makedirs(datasets_path))
 9
10complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
11small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')
12
13import urllib
14import zipfile
15
16if not os.path.exists(small_dataset_url):
17	small_f = urllib.urlretrieve (small_dataset_url, small_dataset_path)#Download
18	with zipfile.ZipFile(small_dataset_path, "r") as z:#Giải nén
19		z.extractall(datasets_path)
20if not os.path.exists(small_dataset_url):
21	complete_f = urllib.urlretrieve (complete_dataset_url, complete_dataset_path)#Download
22	with zipfile.ZipFile(complete_dataset_path, "r") as z:#Giải nén
23		z.extractall(datasets_path)

Trong thư mục giải nén, chúng ta sẽ có các file ratings.csv, movies.csv, tags.csv, links.csv, README.txt.

Loading và parsing dữ liệu.

Mỗi dòng trong tập ratings.csv có định dạng "userId,movieId,rating,timestamp".

Mỗi dòng trong tập movies.csv có định dạng "movieId,title,genres".

Mỗi dòng trong tập tags.csv có định dạng "userId,movieId,tag,timestamp".

Mỗi dòng trong tập links.csv có định dạng "movieId,imdbId,tmdbId".

Tóm lại, các trường dữ liệu trong các file csv đều ngăn cách nhau bởi dấu phẩy (,). Trong python, ta có thể dùng hàm split để cắt chúng ra. Sau đó sẽ load toàn bộ dữ liệu lên RDDs.

Lưu ý nhỏ:

  • Ở tập dữ liệu ratings, chúng ta chỉ giữ lại các trường (UserID, MovieID, Rating) bỏ đi trường timestamp vì không cần thiết.
  • Ở tập dữ liệu movies chúng ta giữ lại trường (MovieID, Title) và bỏ đi trường genres vì lý do tương tự.
 1small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
 2small_ratings_raw_data = sc.textFile(small_ratings_file)
 3small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
 4small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()
 5print(small_ratings_data.take(3)) #Hiện thị top 3 ratting đầu tiên
 6
 7small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
 8
 9small_movies_raw_data = sc.textFile(small_movies_file)
10small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
11
12small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
13    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
14
15small_movies_data.take(3) #Hiện thị top 3 movie đầu tiên

Phần tiếp theo, chúng ta sẽ tìm hiểu lọc cộng tác (Collaborative Filtering) và cách sử dụng Spark MLlib để xây dựng mô hình dự báo.

Collaborative Filtering

Ở đây, tôi sẽ không đề cập đến lọc cộng tác là gì, các bạn có nhu cầu tìm hiểu có thể xem ở bài post khác hoặc tham khảo trên wiki. Chúng ta sẽ tập trung vào tìm hiểu cách sử dụng ALS trong thư viện MLlib của Spark. Các tham số của thuật toán này bao gồm:

  • numBlocks: số lượng block được sử dụng trong tính toán song song (-1 với ý nghĩa là auto configure).

  • rank: số lượng nhân tố ẩn (latent factor) trong mô hình.

  • iterations: số lần lặp.

  • lambda: tham số của chuẩn hoá(regularization ) trong ALS.

  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.

  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

Chọn các tham số cho ALS

Để chọn được các tham số tốt nhất cho mô hình ALS, chúng ta sẽ sử dụng tập small để grid search. Đầu tiên, chúng ta chia tập dữ liệu thành 3 phần là tập train, tập vali và tập test. Sau đó tiến hành huấn luyện trên tập train và predict trên tập valid để tìm được tham số tốt nhất. Cuối cùng đánh giá kết quả đạt được trên tập test.

 1training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
 2validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
 3test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
 4
 5from pyspark.mllib.recommendation import ALS
 6import math
 7
 8seed = 5L
 9iterations = 10
10regularization_parameter = 0.1
11ranks = [4, 8, 12]
12errors = [0, 0, 0]
13err = 0
14tolerance = 0.02
15
16min_error = float('inf')
17best_rank = -1
18best_iteration = -1
19for rank in ranks:
20    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
21                      lambda_=regularization_parameter)
22    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
23    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
24    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
25    errors[err] = error
26    err += 1
27    print('For rank %s the RMSE is %s' % (rank, error))
28    if error < min_error:
29        min_error = error
30        best_rank = rank
31
32print('The best model was trained with rank %s' % best_rank)

Kết quả sau khi thực hiện đoạn code trên là:

1For rank 4 the RMSE is 0.963681878574
2For rank 8 the RMSE is 0.96250475933
3For rank 12 the RMSE is 0.971647563632
4The best model was trained with rank 8

Tiến hành thực hiện test.

1model_test = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
2                      lambda_=regularization_parameter)
3predictions = model_test.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
4rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
5error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
6
7print('For testing data the RMSE is %s' % (error))
1For testing data the RMSE is 0.972342381898

Xem kỹ hơn một chút về dữ liệu mà spark trả về cho chúng ta. Với predictions và rates_and_preds, ta có:

1print(predictions.take(3))
1[((32, 4018), 3.280114696166238),
2 ((375, 4018), 2.7365714977314086),
3 ((674, 4018), 2.510684514310653)]

Tập dữ liệu trả về bao gồm cặp (UserID, MovieID)Rating (tương ứng với colum 0, column 1 và column 2 ở trên),được hiểu ở đây là với người dùng UserID và phim MovieID thì mô hình sẽ dự đoán người dùng sẽ rating kết quả Rating.

Sau đó chúng ta sẽ nối(join) chúng với tập valid tương ứng theo cặp (UserID, MovieID), kết quả đạt được là:

1rates_and_preds.take(3)
1[((558, 788), (3.0, 3.0419325487471403)),
2 ((176, 3550), (4.5, 3.3214065001580986)),
3 ((302, 3908), (1.0, 2.4728711204440765))]

Việc còn lại là chúng ta sẽ tính trung bình độ lỗi bằng hàm mean()sqlt().

Xây dựng mô hình với tập dữ liệu large

Tiếp theo, chúng ta sẽ sử dụng tập dự liệu bự hơn để xây dựng mô hình. Cách thực hiện y chang như tập dữ liệu nhỏ đã được trình bày ở trên, nên tôi sẽ bỏ qua một số giải thích không cần thiết để tránh lặp lại.

 1# Load the complete dataset file
 2complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
 3complete_ratings_raw_data = sc.textFile(complete_ratings_file)
 4complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
 5
 6# Parse
 7complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
 8    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
 9
10print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))
1There are 21063128 recommendations in the complete dataset

Tiến hành train và test.

 1training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)
 2
 3complete_model = ALS.train(training_RDD, best_rank, seed=seed,iterations=iterations, lambda_=regularization_parameter)
 4
 5test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
 6
 7predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
 8rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
 9error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
10
11print('For testing data the RMSE is %s' % (error))
1For testing data the RMSE is 0.82183583368

Xây dựng mô hình dự đoán phim

 1complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
 2complete_movies_raw_data = sc.textFile(complete_movies_file)
 3complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]
 4
 5# Parse
 6complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
 7    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()
 8
 9complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
10
11print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))
1There are 27303 movies in the complete dataset
1def get_counts_and_averages(ID_and_ratings_tuple):
2    nratings = len(ID_and_ratings_tuple[1])
3    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)
4
5movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
6movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
7movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

Giả sử chúng ta có 1 người dùng mới, với các ratting như sau:

 1new_user_ID = 0
 2
 3# The format of each line is (userID, movieID, rating)
 4new_user_ratings = [
 5     (0,260,4), # Star Wars (1977)
 6     (0,1,3), # Toy Story (1995)
 7     (0,16,3), # Casino (1995)
 8     (0,25,4), # Leaving Las Vegas (1995)
 9     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
10     (0,335,1), # Flintstones, The (1994)
11     (0,379,1), # Timecop (1994)
12     (0,296,3), # Pulp Fiction (1994)
13     (0,858,5) , # Godfather, The (1972)
14     (0,50,4) # Usual Suspects, The (1995)
15    ]
16new_user_ratings_RDD = sc.parallelize(new_user_ratings)
17print('New user ratings: %s' % new_user_ratings_RDD.take(10))
1New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]

Chúng ta tiến hành huấn luyện lại mô hình khi có thêm người mới:

 1complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)
 2
 3from time import time
 4
 5t0 = time()
 6new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
 7                              iterations=iterations, lambda_=regularization_parameter)
 8tt = time() - t0
 9
10print("New model trained in %s seconds" % round(tt,3))
1New model trained in 56.61 seconds

Tiến hành dự đoán ratting của người dùng mới cho toàn bộ các phim người dùng đó chưa xem.

1new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
2# keep just those not on the ID list (thanks Lei Li for spotting the error!)
3new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
4
5# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
6new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

Và show ra top 3 kết quả :

1# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
2new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
3new_user_recommendations_rating_title_and_count_RDD = \
4    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
5new_user_recommendations_rating_title_and_count_RDD.take(3)

Hiển thị top recommend (Ở đây sẽ flat dữ liệu hiển thị thành dàng ((Title, Rating, Ratings Count)) ra cho dễ nhìn).

1new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
2
3top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])
4
5print ('TOP recommended movies (with more than 25 reviews):\n%s' %
6        '\n'.join(map(str, top_movies)))
 1TOP recommended movies (with more than 25 reviews):
 2    (u'"Godfather: Part II', 8.503749129186701, 29198)
 3    (u'"Civil War', 8.386497469089297, 257)
 4    (u'Frozen Planet (2011)', 8.372705479107108, 31)
 5    (u'"Shawshank Redemption', 8.258510064442426, 67741)
 6    (u'Cosmos (1980)', 8.252254825768972, 948)
 7    (u'Band of Brothers (2001)', 8.225114960311624, 4450)
 8    (u'Generation Kill (2008)', 8.206487040524653, 52)
 9    (u"Schindler's List (1993)", 8.172761674773625, 53609)
10    (u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.166229786764168, 23915)
11    (u"One Flew Over the Cuckoo's Nest (1975)", 8.15617022970577, 32948)
12    (u'Casablanca (1942)', 8.141303207981174, 26114)
13    (u'Seven Samurai (Shichinin no samurai) (1954)', 8.139633165142612, 11796)
14    (u'Goodfellas (1990)', 8.12931139039048, 27123)
15    (u'Star Wars: Episode V - The Empire Strikes Back (1980)', 8.124225700242096, 47710)
16    (u'Jazz (2001)', 8.078538221315313, 25)
17    (u"Long Night's Journey Into Day (2000)", 8.050176820606127, 34)
18    (u'Lawrence of Arabia (1962)', 8.041331489948814, 13452)
19    (u'Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)', 8.0399424815528, 45908)
20    (u'12 Angry Men (1957)', 8.011389274280754, 13235)
21    (u"It's Such a Beautiful Day (2012)", 8.007734839026181, 35)
22    (u'Apocalypse Now (1979)', 8.005094327199552, 23905)
23    (u'Paths of Glory (1957)', 7.999379786394267, 3598)
24    (u'Rear Window (1954)', 7.9860865203540214, 17996)
25    (u'State of Play (2003)', 7.981582126801772, 27)
26    (u'Chinatown (1974)', 7.978673289692703, 16195)

Dự đoán rating của 1 cá nhân

Một trường hợp khác là chúng ta cần dự đoán giá trị ratting của 1 người dùng với 1 bộ phim cụ thể nào đó.

1my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
2individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
3individual_movie_rating_RDD.take(1)
1[Rating(user=0, product=122880, rating=4.955831875971526)]

Lưu trữ mô hình

Sau khi có được mô hình. Chúng ta cần phải lưu trữ chúng lại để sau này dùng.

1from pyspark.mllib.recommendation import MatrixFactorizationModel
2
3model_path = os.path.join('models', 'movie_lens_als')
4
5# Save and load model
6model.save(sc, model_path)
7same_model = MatrixFactorizationModel.load(sc, model_path)

Comments