2
\$\begingroup\$

This code queries the GitHub followers API and stores the resulting follower graph in a MongoDB collection:

import java.util.Date

import akka.actor._
import akka.routing.RoundRobinPool
import com.mongodb.casbah.Imports._
import org.json4s._
import org.json4s.native.JsonMethods._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{blocking, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

import scalaj.http._

/**
 * Actors are organized roughly like this: https://www.draw.io/#G0B6v1Y7CECqz2eDJGb2hKbEhiLU0
 *
 * Message flow, as wired in [[GraphDemo]]:
 * FetcherManager -> Fetcher -> ResponseInterpreter -> FollowerExtractor -> (FetcherManager, DBWriter)
 */
object Fetcher {
  // message definitions
  case class Fetch(login: String)
  case object WorkAvailable
  case class SetDBWriter(ref: ActorRef)
  case class SetInterpreter(ref: ActorRef)

  /**
   * Internal message: the HTTP call for `login` has finished. It is sent to `self` so the
   * result is handled on the actor's own thread rather than inside the Future callback.
   *
   * @param manager the actor that sent the original [[Fetch]]; asked for more work afterwards
   */
  private case class FetchCompleted(login: String,
                                    result: Try[HttpResponse[String]],
                                    manager: ActorRef)
}

/**
 * Pulls logins from a manager and downloads their follower list from the GitHub API.
 *
 * @param token               optional OAuth token; when present it is sent as an `Authorization` header
 * @param responseInterpreter receives an `InterpretResponse` for every completed HTTP call
 * @param dBWriter            receives a `TrackQuota` for every completed HTTP call
 */
class Fetcher(token: Option[String], responseInterpreter: ActorRef, dBWriter: ActorRef)
  extends Actor with ActorLogging {

  import Fetcher._

  // Number of HTTP calls started by this actor that have not completed yet.
  private var inFlight = 0

  def receive: Receive = {
    case Fetch(login) =>
      inFlight += 1
      // sender() must be read here, on the actor thread, not inside the Future callback.
      fetchFollowers(login, sender())

    case WorkAvailable =>
      // Only pull when idle; a busy fetcher asks again by itself once its call completes.
      if (inFlight == 0) sender() ! FetcherManager.WorkRequest

    case FetchCompleted(login, result, manager) =>
      // max(0, ...) guards against a completion that arrives after a restart reset the counter.
      inFlight = math.max(0, inFlight - 1)
      result match {
        case Success(r) =>
          // Forward the raw response for interpretation and record the request time
          // so the number of requests made can be tracked.
          responseInterpreter ! ResponseInterpreter.InterpretResponse(login, r)
          dBWriter ! DBWriter.TrackQuota(new Date())
        case Failure(e) =>
          log.warning(s"Failed to fetch $login: $e")
      }
      // Keep the crawl going: without this the fetcher would stay idle after its first call.
      if (inFlight == 0) manager ! FetcherManager.WorkRequest
  }

  /**
   * Starts the HTTP call for `login` and reports its outcome back to `self`.
   * No paging parameters are added to the URL, so only the response to this single
   * request is processed.
   */
  private def fetchFollowers(login: String, manager: ActorRef): Unit = {
    val unauthorizedRequest = Http(s"https://api.github.com/users/$login/followers")

    // send authorized requests when token is available
    val request = token.fold(unauthorizedRequest) { t =>
      unauthorizedRequest.header("Authorization", s"token $t")
    }

    // `asString` is a synchronous call; `blocking` lets the global pool compensate for it.
    Future(blocking(request.asString)).onComplete { result =>
      self ! FetchCompleted(login, result, manager)
    }
  }
}

/**
 * Turns a raw HTTP response into a JSON array and hands it to the configured extractor.
 * The extractor is injected with a `SetExtractor` message.
 */
class ResponseInterpreter extends Actor with ActorLogging {

  import ResponseInterpreter._

  private var extractor: Option[ActorRef] = None

  def receive: Receive = {
    case SetExtractor(ref) => extractor = Some(ref)

    case InterpretResponse(login, httpResponse) =>
      val status = httpResponse.code
      if (status < 200 || status >= 300) {
        // Error payloads are not follower arrays; report the status instead of a parse complaint.
        log.warning(s"HTTP $status while fetching followers of $login; response ignored")
      } else {
        extractor match {
          case Some(ref) =>
            getFollowers(httpResponse.body).foreach { fs =>
              ref ! FollowerExtractor.Extract(login, fs)
            }
          case None =>
            log.warning(s"No extractor configured; dropping response for $login")
        }
      }
  }

  /**
   * Parses `body` and returns it when it is a JSON array.
   *
   * @return `Some(array)` for a JSON array, `None` (after logging a warning) for any other
   *         JSON value or for text that is not valid JSON
   */
  def getFollowers(body: String): Option[JArray] =
    // parseOpt returns None instead of throwing, so malformed input cannot crash the actor.
    parseOpt(body) match {
      case Some(a: JArray) => Some(a)
      case _ =>
        log.warning("Not valid JArray")
        None
    }
}

object ResponseInterpreter {
  case class SetExtractor(ref: ActorRef)
  case class InterpretResponse(login: String, followersResponse: HttpResponse[String])
}

object DBWriter {
  case class Write(login: String, followers: Seq[String])
  case class TrackQuota(date: Date)

  /** Builds a client for the MongoDB server on localhost:27017 using SCRAM-SHA-1 credentials. */
  def getClient: MongoClient = {
    val serverAddress = new ServerAddress("localhost", 27017)
    // NOTE(review): user name and password look like placeholders - confirm before running.
    val credential =
      MongoCredential.createScramSha1Credential("username", "github", "yourpass".toArray)
    MongoClient(serverAddress, List(credential))
  }
}

/**
 * Persists follower edges into the `graph` collection and request timestamps into the
 * `gitquota` collection of the `github` database.
 */
class DBWriter extends Actor with ActorLogging {
  val mongoClient = DBWriter.getClient
  val db = mongoClient("github")
  val coll = db("graph")
  val quota = db("gitquota")

  // The graph collection is dropped every time an instance of this actor is constructed.
  coll.drop()

  def receive: Receive = {
    case DBWriter.Write(login, followers) =>
      followers.foreach { f =>
        coll += DBObject("followed" -> login, "follower" -> f)
      }
    case DBWriter.TrackQuota(date) =>
      quota += DBObject("date" -> date)
  }

  /** Releases the MongoDB connection when the actor stops. */
  override def postStop(): Unit = mongoClient.close()
}

object FollowerExtractor {
  case class SetDBWriter(ref: ActorRef)
  case class SetManager(ref: ActorRef)
  case class Extract(login: String, jsonResponse: JArray)
}

/**
 * Extracts follower logins from a JSON array, queues them at the manager for crawling and
 * sends the edges to the DB writer. Both collaborators are injected by message.
 */
class FollowerExtractor extends Actor with ActorLogging {

  import FollowerExtractor._

  private var dbWriter: Option[ActorRef] = None
  private var manager: Option[ActorRef] = None

  def receive: Receive = {
    case SetDBWriter(ref) => dbWriter = Some(ref)
    case SetManager(ref) => manager = Some(ref)

    case Extract(login, followerArray) =>
      val followers = extractFollowers(followerArray)
      if (manager.isEmpty || dbWriter.isEmpty) {
        log.warning(s"Extractor is not fully wired; part of the result for $login is dropped")
      }
      manager.foreach(_ ! FetcherManager.AddAllToQueue(followers))
      dbWriter.foreach(_ ! DBWriter.Write(login, followers))
      followers.foreach { f =>
        log.info(s"$f \t -> \t $login")
      }
  }

  /** Collects the string value of every `login` field found in the objects of `followerArray`. */
  def extractFollowers(followerArray: JArray): Seq[String] = for {
    JObject(follower) <- followerArray
    JField("login", JString(login)) <- follower
  } yield login
}

/**
 * Holds the queue of logins still to be fetched and hands them out on `WorkRequest`.
 *
 * A plain `ActorRef` message registers the fetcher (pool) that is told when work is available.
 * The actor switches between [[receiveEmpty]] and [[receiveNonEmpty]] depending on the queue.
 */
class FetcherManager extends Actor {
  private var fetchers: Option[ActorRef] = None

  /** Logins waiting to be handed to a fetcher. */
  val queue = collection.mutable.Queue.empty[String]

  /** Every login that has ever been queued; used to make sure a login is queued only once. */
  val added = collection.mutable.Set.empty[String]

  /** Behaviour while at least one login is queued. */
  def receiveNonEmpty: Actor.Receive = {
    case ref: ActorRef => registerFetchers(ref)
    case FetcherManager.AddToQueue(user) => enqueueAll(Seq(user))
    case FetcherManager.AddAllToQueue(users) => enqueueAll(users)
    case FetcherManager.WorkRequest =>
      // Defensive check: never dequeue from an empty queue.
      if (queue.nonEmpty) {
        sender() ! Fetcher.Fetch(queue.dequeue())
      }
      if (queue.isEmpty) {
        context.become(receiveEmpty)
      }
  }

  /** Behaviour while nothing is queued: work requests are ignored. */
  def receiveEmpty: Actor.Receive = {
    case ref: ActorRef => registerFetchers(ref)
    case FetcherManager.AddToQueue(user) => enqueueAll(Seq(user))
    case FetcherManager.AddAllToQueue(users) => enqueueAll(users)
    case FetcherManager.WorkRequest => // nothing to hand out
  }

  /** Queues `user` unless it has been queued before. */
  def queueOne(user: String): Unit = {
    // `add` returns true only for a login not seen before, so marking and testing are one step.
    if (added.add(user)) {
      queue += user
    }
  }

  /** Tells the registered fetcher (pool), if any, that work is available. */
  def notifyFetchers: Unit = {
    fetchers.foreach(_ ! Fetcher.WorkAvailable)
  }

  def receive = receiveEmpty

  /** Stores the fetcher reference and wakes it up if logins were queued before it was known. */
  private def registerFetchers(ref: ActorRef): Unit = {
    fetchers = Some(ref)
    if (queue.nonEmpty) notifyFetchers
  }

  /**
   * Queues all new logins; switches to the non-empty behaviour and notifies the fetchers
   * only when something was actually queued.
   */
  private def enqueueAll(users: Seq[String]): Unit = {
    val sizeBefore = queue.size
    users.foreach(queueOne)
    if (queue.size > sizeBefore) {
      context.become(receiveNonEmpty)
      notifyFetchers
    }
  }
}

object FetcherManager {
  case class AddToQueue(user: String)
  case class AddAllToQueue(users: Seq[String])
  case object WorkRequest
}

/** Wires the actors together, seeds the crawl with one login and stops after five seconds. */
object GraphDemo extends App {
  val system = ActorSystem("fetch_graph")

  val manager: ActorRef = system.actorOf(Props(classOf[FetcherManager]))
  val extractor: ActorRef = system.actorOf(Props(classOf[FollowerExtractor]))
  val interpreter: ActorRef = system.actorOf(Props(classOf[ResponseInterpreter]))
  val dbWriter: ActorRef = system.actorOf(Props(classOf[DBWriter]))

  // NOTE(review): "yourtoken" looks like a placeholder - confirm before running.
  val router: ActorRef = system.actorOf(RoundRobinPool(4).props(
    Props(classOf[Fetcher], Some("yourtoken"), interpreter, dbWriter)
  ))

  // Register collaborators before any work is queued.
  manager ! router
  extractor ! FollowerExtractor.SetDBWriter(dbWriter)
  extractor ! FollowerExtractor.SetManager(manager)
  interpreter ! ResponseInterpreter.SetExtractor(extractor)

  manager ! FetcherManager.AddToQueue("odersky")

  system.scheduler.scheduleOnce(5.seconds) {
    system.terminate()
  }
}

Any suggestions for improvement are welcome.

\$\endgroup\$

    0

    Start asking to get answers

    Find the answer to your question by asking.

    Ask question

    Explore related questions

    See similar questions with these tags.