
みなさまこんにちは〜!
メモリアインクのすだです。
本日は、非同期でデータの流れ(ストリーム)を扱うために便利な「Flow」という機能について、実際のコードを用いてわかりやすくお伝えします。
この記事を読んでわかること…
・非同期データストリームとは
・Flowとは
・Flowの使い方
非同期処理(Coroutines)に関しては以下の記事で詳しく解説しておりますので、
ぜひ合わせてご覧ください。


環境
- Kotlin (ver 1.9.0)
- Android Studio (Giraffe | 2022.3.1 Patch 3)
Flowとは?
そもそも、データストリームとは、
「時間とともにバラバラに届くデータを順番に受け取れる仕組み」のことです。
そしてFlow(フロー) は、Kotlin の Coroutines ライブラリが提供する、
非同期でデータストリームを扱える仕組みです。
つまり、Flow を使うことで、
「時間をかけて、順番に届くデータ(ストリーム)」を
非同期的に、UIや処理の中で安全に受け取っていくことができます。
たとえば、こんな場面で使われます:
- ネットワークから定期的にデータを受け取りたいとき
- テキストの入力状態をリアルタイムで監視したいとき
- Room(データベース)からの変更通知を受けたいとき
Flowの基本構文と使い方
以下のコードは、Flow の基本的な使い方を示したシンプルな例です。
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(500)
emit(i)
}
}
fun main() = runBlocking {
println("Flow 開始")
simpleFlow().collect { value ->
println("受け取った値: $value")
}
println("Flow 完了")
}
// 出力結果
Flow 開始
受け取った値: 1
受け取った値: 2
受け取った値: 3
受け取った値: 4
受け取った値: 5
Flow 完了
この関数 simpleFlow()
は、Flow<Int>
(=整数型の値が順番に流れる Flow)を返します。
1から5までの数値を、0.5秒おきに1つずつ送信(emit)する流れを定義しています。
つまり、この Flow処理が動くと、0.5秒ごとに 1 → 2 → 3 → 4 → 5 と受け取れるのです。
flow { … }
flow { … }
は、Kotlin の Coroutines ライブラリで提供されている関数で、
Flow を作るための専用のブロックです。
このブロックの中に書いた処理は、collect
が呼ばれるまで実行されません。
つまり「必要になったときに、初めて動く」という特徴があります(=遅延実行・Lazy)。
emit()
emit()
は、「この値を流してください」という意味の命令です。
例では、for文を使って 1〜5 までループしています。
その中で emit(i)
を呼ぶことで、値が1つずつ「Flowの外側」へ送信されます。
collect { value -> … }
collect
は、Flow から流れてくるデータを1つずつ取り出して処理するための関数です。
Flow は「データの流れ」ですが、実際にそのデータを使うには collect
を呼ぶ必要があります。collect
を呼んだ瞬間に、Flow の中の処理がスタートします。
Flowの演算子
Flow は「RxJava」などと同様、流れてくるデータを変換・操作するための関数(演算子)が複数用意されています。
simpleFlow()
.map { it * 2 } // 値を2倍に変換
.filter { it % 4 == 0 } // 4の倍数だけ通す
.collect { println(it) } // 最終的に受け取る
.map {…}
値を変換(加工)する処理
.filter {…}
条件に合う値だけを通す処理
実践的な例
Flowのおおよそを理解したところで、
「定期的にサーバーからデータをポーリングして、UIに表示する」という処理を書いてみます。
たとえばチャットアプリで「新着メッセージを5秒おきにチェック」するような処理。▼
1. Repository:サーバーからメッセージを取得(ポーリング)
class MessageRepository(private val api: MessageApi) {
fun pollMessages(): Flow<List<Message>> = flow {
while (true) {
try {
val messages = api.getLatestMessages() // suspend関数
emit(messages)
} catch (e: Exception) {
emit(emptyList()) // 通信失敗時は空リスト
}
delay(5000) // 5秒ごとに実行
}
}
}
Flow ブロック内でループ処理を書くことで、collect
が呼ばれたタイミングから 5 秒おきに getLatestMessages()
が実行され、emit()
を通じて UI 側へ最新のデータが順番に流れていきます。
2. ViewModel:Flow を受け取り UI に渡す
class MessageViewModel(
private val repository: MessageRepository
) : ViewModel() {
private val _messages = MutableLiveData<List<Message>>()
val messages: LiveData<List<Message>> = _messages
init {
startPollingMessages()
}
private fun startPollingMessages() {
viewModelScope.launch {
repository.pollMessages()
.collect { result ->
_messages.postValue(result)
}
}
}
}
launch
ブロック内(=非同期処理)で pollMessages().collect
を実行することで、
データが流れてきたタイミング(5 秒おき)に、その結果がリアルタイムで UI に渡されます。
3. UI(Fragment):メッセージを受け取って RecyclerView に表示
@AndroidEntryPoint
class MessageFragment : Fragment() {
// ここで初期化 → startPollingMessages()の処理が走り出す
private val viewModel: MessageViewModel by viewModels()
private lateinit var messageAdapter: MessageAdapter
override fun onCreateView(
inflater: LayoutInflater, container: ViewGroup?,
savedInstanceState: Bundle?
): View {
return inflater.inflate(R.layout.fragment_message, container, false)
}
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
messageAdapter = MessageAdapter()
val recyclerView = view.findViewById<RecyclerView>(R.id.recyclerView)
recyclerView.adapter = messageAdapter
// UIに反映
viewModel.messages.observe(viewLifecycleOwner) { messages ->
messageAdapter.submitList(messages)
}
}
}
viewModel.messages.observe(...)
の記述により、
LiveData に新しいメッセージ一覧が届いたタイミングで、{}
の中の処理が実行されます。
Flowには、さらに便利な拡張機能(StateFlow、SharedFlow)があります。
以下の記事で詳しくご紹介しておりますので、ぜひご覧ください。


まとめ
おつかれさまでした。いかがでしたでしょうか!
入力監視、定期処理、DB変更通知などの使い所で、ぜひFlowを採用してみてください!



技術者としてのキャリアパスを次のレベルへと進めたい皆様、
<未経験からIT・Webエンジニアを目指すなら【ユニゾンキャリア】>
自分の市場価値をさらに向上させてみませんか?
それではまた次回の記事でお会いしましょう!
コメント