Real-Time Twitch Chat Sentiment Analysis with Apache Flink | by Volker Janz | Mar, 2024



There’s still one thing before we turn our attention to the fun part. The Flink Web UI is a user-friendly interface that allows developers and administrators to monitor and manage their Apache Flink applications. It provides a real-time overview of running or completed jobs, displays metrics such as throughput and latency, and offers detailed insights into the job’s execution plan. Essentially, it’s a convenient dashboard where you can visualize the performance and status of your Flink applications, making the process of debugging, optimizing, and managing your streaming or batch processing jobs much easier and more intuitive.

When you run a Flink application locally like in this example, you usually do not have the Flink Web UI enabled. However, there is a way to also get the Flink Web UI in a local execution environment. I find this useful, especially to get an idea of the execution plan before running streaming applications in production.

Let’s start by adding a dependency to the pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

And slightly change the code in our main class App.java:

package de.vojay.flitch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class App {

    public static void main(String[] args) throws Exception {
        // Local environment that also starts the Flink Web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(new Configuration());

        env.fromSequence(1, Long.MAX_VALUE).print();
        env.execute("Flitch");
        env.close();
    }
}

The streaming application now processes a practically endless sequence of numbers, so it will not finish immediately. In addition, createLocalEnvironmentWithWebUI makes the Flink Web UI available locally on port 8081 while the application is running.

Start again and open http://localhost:8081/ in your browser. Apart from various metrics, you can also see the execution plan of your Flink application.

Flink Web UI (by author)

Now we have a proper local setup and can start connecting our application to Twitch and running sentiment analysis on chat messages.

Twitch, the leading live streaming platform for gamers, offers a comprehensive API and a chat feature that’s deeply integrated with the Internet Relay Chat (IRC) protocol.

Photo by Caspar Camille Rubin on Unsplash

At its core, the Twitch API allows applications to interact with Twitch’s data. This includes retrieving information about live streams, VODs (Video on Demand), users, and game details. The API is RESTful, meaning it follows the architectural style of the web, making it straightforward to use with common HTTP requests. Developers can use this API to create custom experiences, such as displaying live stream stats, searching for channels, or even automating stream setups.
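To give an idea of what such a "common HTTP request" could look like, here is a rough sketch that only builds (and does not send) a GET request for the streams endpoint of the Twitch API using the JDK's HTTP client. The class name TwitchApiRequestSketch and the placeholder credentials are made up for illustration; a real call needs a client ID and an access token from a registered Twitch application, and this request is not used anywhere else in the project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration, not part of the project. */
class TwitchApiRequestSketch {

    /** Builds a GET request asking whether the given user is currently live. */
    static HttpRequest streamsRequest(String userLogin, String clientId, String accessToken) {
        String query = "user_login=" + URLEncoder.encode(userLogin, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
            .uri(URI.create("https://api.twitch.tv/helix/streams?" + query))
            .header("Client-Id", clientId)                      // placeholder value, see main
            .header("Authorization", "Bearer " + accessToken)  // placeholder value, see main
            .GET()
            .build();
    }

    public static void main(String[] args) {
        // Placeholders only: replace with credentials of your own Twitch application.
        HttpRequest request = streamsRequest("vojay", "<your-client-id>", "<your-access-token>");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would be a matter of passing it to an HttpClient; for this project we do not need that, since reading the chat works without the REST API.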

The Twitch chat is a vital aspect of the Twitch experience, allowing viewers to interact with streamers and other viewers in real-time. Underneath the modern interface of Twitch Chat lies the Internet Relay Chat (IRC) protocol, a staple of online communication since the late 80s. This reliance on IRC allows for a wide range of possibilities when it comes to reading and interacting with chat through custom applications.

For our purpose, we simply want to read the chat, without writing messages ourselves. Fortunately, Twitch allows anonymous connections to the chat for read-only application use-cases.
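To make the IRC layer a bit more concrete: on the wire, a chat message is a raw IRC PRIVMSG line that carries the sender, the channel and the text. The following plain Java sketch parses such a line. It is only meant as an illustration, since a library will handle this for us; the class name IrcLineParser and the sample line are assumptions, and the optional tag section at the start of a line is simply skipped.

```java
import java.util.Optional;

/** Hypothetical helper for illustration, not part of the project. */
class IrcLineParser {

    /** Minimal holder for the three parts we care about. */
    static final class ChatLine {
        final String channel;
        final String user;
        final String message;

        ChatLine(String channel, String user, String message) {
            this.channel = channel;
            this.user = user;
            this.message = message;
        }
    }

    /**
     * Parses a raw line such as
     * ":alice!alice@alice.tmi.twitch.tv PRIVMSG #vojay :hello world".
     * Returns an empty Optional for anything that is not a PRIVMSG.
     */
    static Optional<ChatLine> parse(String raw) {
        String line = raw.trim();

        // Skip the optional tag section ("@key=value;... ") if present.
        if (line.startsWith("@")) {
            int tagsEnd = line.indexOf(' ');
            if (tagsEnd < 0) {
                return Optional.empty();
            }
            line = line.substring(tagsEnd + 1);
        }

        // Expected shape: ":<prefix> PRIVMSG #<channel> :<message>"
        if (!line.startsWith(":")) {
            return Optional.empty();
        }
        String[] parts = line.split(" ", 4);
        if (parts.length < 4 || !parts[1].equals("PRIVMSG")) {
            return Optional.empty();
        }
        if (!parts[2].startsWith("#") || !parts[3].startsWith(":")) {
            return Optional.empty();
        }

        // The nickname is the prefix up to the first '!'.
        String prefix = parts[0].substring(1);
        int bang = prefix.indexOf('!');
        String user = bang < 0 ? prefix : prefix.substring(0, bang);

        return Optional.of(new ChatLine(parts[2].substring(1), user, parts[3].substring(1)));
    }

    public static void main(String[] args) {
        parse(":alice!alice@alice.tmi.twitch.tv PRIVMSG #vojay :hello world")
            .ifPresent(c -> System.out.println(c.channel + " / " + c.user + " / " + c.message));
        // prints "vojay / alice / hello world"
    }
}
```

Channel, user and message are exactly the three fields we are interested in for the rest of this article.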

To reduce the implementation effort, we will use an existing library to interact with Twitch: Twitch4J. Twitch4J is a modern Java library designed to simplify the integration with Twitch’s features, including its API, Chat (via IRC), PubSub (for real-time notifications), and Webhooks. Essentially, it’s a powerful toolkit for Java developers looking to interact with Twitch services without having to directly manage low-level details like HTTP requests or IRC protocol handling.

The first step is to add Twitch4J as a dependency to the pom.xml:

<dependency>
    <groupId>com.github.twitch4j</groupId>
    <artifactId>twitch4j</artifactId>
    <version>1.19.0</version>
</dependency>

We would like to have a lightweight, serializable Plain Old Java Object (POJO) in order to represent Twitch chat messages within our application. We are interested in the channel where the message was written, the user and the content itself.

Create a new class TwitchMessage with the following implementation:

package de.vojay.flitch;

public class TwitchMessage {

    private final String channel;
    private final String user;
    private final String message;

    public TwitchMessage(String channel, String user, String message) {
        this.channel = channel;
        this.user = user;
        this.message = message;
    }

    public String getChannel() {
        return channel;
    }

    public String getUser() {
        return user;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer("TwitchMessage{");
        sb.append("channel='").append(channel).append('\'');
        sb.append(", user='").append(user).append('\'');
        sb.append(", message='").append(message).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

As a side note: you do not have to write basic functions like toString() yourself; IntelliJ can generate them for you. Simply click on Code → Generate… → toString() to get the result above.

Generate toString (by author)

We will now use Twitch4J to implement a custom Twitch source function for Flink. The source function will generate an unbounded stream of data, in this case Twitch chat messages. That also means the application will not terminate until we explicitly stop it.

The Twitch client can be built like this:

TwitchClientBuilder clientBuilder = TwitchClientBuilder.builder();
TwitchClient client = clientBuilder
    .withEnableChat(true)
    .build();

client.getChat().joinChannel("vojay");

With this example we get a client that joins the Twitch channel called vojay. Yes, I once was an active streamer myself. Fun fact: I taught people game development and general software development in my streams. I also enjoyed playing retro games live on stream 🎮. But that is a different topic, let’s focus on the project 😉.

You should also notice that there is no authentication in the example above. As said before, since we only want to read the chat, no authentication is needed. In fact, we simply join an IRC chat anonymously and read the messages.

Since we want to establish the connection to the Twitch chat only once per source instance, we extend the abstract RichSourceFunction class. This lets us override the open function, which is the place for initialization code.

public class TwitchSource extends RichSourceFunction<TwitchMessage> {

    @Override
    public void open(Configuration configuration) {
        // ...
    }

    // ...
}

We also use our TwitchMessage POJO for the generic parameter to tell Flink that this source generates elements of type TwitchMessage.

Furthermore, we want to be able to pass an array of Twitch channels to listen on via the constructor of the source function.

To control the state of our source function, we use a boolean variable called running, which we set to true in the open function.

Based on this, the constructor and open function look like the following:

public class TwitchSource extends RichSourceFunction<TwitchMessage> {

    private final String[] twitchChannels;

    private TwitchClient client;
    private SimpleEventHandler eventHandler;
    // volatile, because the flag is changed from a different thread on cancel
    private volatile boolean running = true;

    public TwitchSource(String[] twitchChannels) {
        this.twitchChannels = twitchChannels;
    }

    @Override
    public void open(Configuration configuration) {
        // Anonymous, read-only chat client
        client = TwitchClientBuilder
            .builder()
            .withEnableChat(true)
            .build();

        for (String channel : twitchChannels) {
            client.getChat().joinChannel(channel);
        }

        eventHandler = client
            .getEventManager()
            .getEventHandler(SimpleEventHandler.class);

        running = true;
    }

    // ...
}

With that, we have all we need to consume messages and emit them for further processing as a stream of data.

The run function of a source function is where the magic happens. Here we generate the data and with a given SourceContext, we can emit data.

The SimpleEventHandler provided by Twitch4J can be used to react to specific events.

Whenever we get an event of type IRCMessageEvent, which is a message in the Twitch chat, we generate an instance of our POJO and emit it to the stream via the context.

To ensure our source function does not terminate, we will add a loop with an artificial delay, which will run until our boolean variable running is set to false. This will be done in the cancel function, which is called by the Flink environment on shutdown.

@Override
public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
    // Emit one TwitchMessage per incoming chat message
    eventHandler.onEvent(IRCMessageEvent.class, event -> {
        String channel = event.getChannel().getName();
        EventUser eventUser = event.getUser();
        String user = eventUser == null ? "" : eventUser.getName();
        String message = event.getMessage().orElseGet(String::new);

        ctx.collect(new TwitchMessage(channel, user, message));
    });

    // Keep the source alive until cancel() is called
    while (running) {
        Thread.sleep(100);
    }
}

@Override
public void cancel() {
    client.close();
    running = false;
}
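If you want to try the interplay of run and cancel without Flink or Twitch, the pattern can be reduced to a few lines of plain Java. The following is only a sketch: the class CancellableLoop and the helper runAndCancel are hypothetical names, and the worker thread stands in for whatever thread executes the source. The flag is declared volatile here, because cancel is typically called from a different thread than the one running the loop and the loop has to see the change.

```java
/** Hypothetical stand-in for a source function, plain Java only. */
class CancellableLoop {

    // volatile: written by the cancelling thread, read by the looping thread
    private volatile boolean running = true;

    /** Blocks until cancel() is called, like run() in a source function. */
    void run() throws InterruptedException {
        while (running) {
            Thread.sleep(10); // artificial delay, avoids busy waiting
        }
    }

    /** Signals the loop to stop and returns immediately. */
    void cancel() {
        running = false;
    }

    /** Runs the loop on a worker thread, cancels it after the given time, reports whether it stopped. */
    static boolean runAndCancel(long millis) {
        CancellableLoop loop = new CancellableLoop();
        Thread worker = new Thread(() -> {
            try {
                loop.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(millis); // let the loop spin for a while
            loop.cancel();        // what the environment does on shutdown
            worker.join(2000);    // the loop should end shortly after
        } catch (InterruptedException e) {
            loop.cancel();
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + runAndCancel(100));
    }
}
```

The short sleep inside the loop is a trade-off: it keeps CPU usage low while the source idles, at the cost of a small delay before the loop notices the cancellation.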

Putting it all together, this is the full implementation of our custom Twitch source function for Flink, TwitchSource.java:

package de.vojay.flitch;

import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.TwitchClient;
import com.github.twitch4j.TwitchClientBuilder;
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.common.events.domain.EventUser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class TwitchSource extends RichSourceFunction<TwitchMessage> {

    private final String[] twitchChannels;

    private TwitchClient client;
    private SimpleEventHandler eventHandler;
    // volatile, because the flag is changed from a different thread on cancel
    private volatile boolean running = true;

    public TwitchSource(String[] twitchChannels) {
        this.twitchChannels = twitchChannels;
    }

    @Override
    public void open(Configuration configuration) {
        // Anonymous, read-only chat client
        client = TwitchClientBuilder
            .builder()
            .withEnableChat(true)
            .build();

        for (String channel : twitchChannels) {
            client.getChat().joinChannel(channel);
        }

        eventHandler = client
            .getEventManager()
            .getEventHandler(SimpleEventHandler.class);

        running = true;
    }

    @Override
    public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
        // Emit one TwitchMessage per incoming chat message
        eventHandler.onEvent(IRCMessageEvent.class, event -> {
            String channel = event.getChannel().getName();
            EventUser eventUser = event.getUser();
            String user = eventUser == null ? "" : eventUser.getName();
            String message = event.getMessage().orElseGet(String::new);

            ctx.collect(new TwitchMessage(channel, user, message));
        });

        // Keep the source alive until cancel() is called
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        client.close();
        running = false;
    }
}

With this custom source function, we can already extend our streaming pipeline in App.java to simply print each chat message written to the chat:

package de.vojay.flitch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(new Configuration());

        // Read the chat of the given channels and print each message
        TwitchSource twitchSource = new TwitchSource(new String[]{"vojay"});
        env.addSource(twitchSource)
            .print();

        env.execute("Flitch");
        env.close();
    }
}

With addSource we can add our source function. The elements are then processed by the next step in the stream, which is print(). With this sink, we will again output each element to STDOUT.

When running the application now and writing to the chat at https://twitch.tv/vojay, the messages will be processed and printed by our streaming application 🎉.

Twitch source for Flink (by author)

Now that we can read the Twitch chat as a stream of data, it is time to process each message. The basic idea is: for each Twitch message, we detect the individual sentences of the message and calculate the sentiment for each of the sentences. The output will be a structure like this:

Tuple2<TwitchMessage, Tuple2<List<Integer>, List<String>>>

Let’s break it down: the result contains the original POJO of the Twitch chat message together with another tuple with 2 elements:

  • A list of sentiment scores (List<Integer>) containing the score for each sentence in the message, from 0 (very negative) to 4 (very positive) and
  • a list of sentiment classes (List<String>) containing the readable class for each sentence in the message, for example: Neutral or Negative.
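To make this structure more tangible, here is a small plain Java sketch that builds the two lists for one message from already known scores. It is not the actual analysis: the scores are hard-coded, Map.Entry stands in for Flink's Tuple2 so the example runs without dependencies, and the class name SentimentResultSketch as well as the labels for the scores 0, 3 and 4 are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the result shape, plain Java only. */
class SentimentResultSketch {

    // Assumed labels for the scores 0..4; the article text only names
    // "Neutral" and "Negative" explicitly.
    private static final List<String> CLASSES = List.of(
        "Very negative", "Negative", "Neutral", "Positive", "Very positive");

    /** Maps per-sentence scores (0..4) to the pair (scores, classes). */
    static Map.Entry<List<Integer>, List<String>> toResult(List<Integer> scores) {
        List<String> classes = new ArrayList<>();
        for (int score : scores) {
            if (score < 0 || score > 4) {
                throw new IllegalArgumentException("score out of range: " + score);
            }
            classes.add(CLASSES.get(score));
        }
        return Map.entry(List.copyOf(scores), List.copyOf(classes));
    }

    public static void main(String[] args) {
        // Pretend a message had two sentences, scored 2 and 1.
        Map.Entry<List<Integer>, List<String>> result = toResult(List.of(2, 1));
        System.out.println(result.getKey() + " " + result.getValue());
        // prints "[2, 1] [Neutral, Negative]"
    }
}
```

Both lists have the same length and order, so the score and the class at the same index always belong to the same sentence.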