Study Diary

Scratch notes

Programming TypeScript ch8 Asynchronous Programming, Concurrency, and Parallelism (2/2)

www.oreilly.com


Typesafe Multithreading

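  • Asynchronous programming itself can be done with a single thread and an event loop
  • For CPU-intensive tasks, however, parallel computing with multiple threads is sometimes the better choice
    • The load is moved to other threads to keep the main thread responsive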
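
The responsiveness point can be seen in a few lines. This is only a minimal sketch, assuming a runtime that provides setTimeout (Node.js or a browser); busyWork is a hypothetical stand-in for a CPU-intensive task, not something from the book. A zero-delay timer cannot fire while synchronous code holds the only thread.

```typescript
// Hypothetical stand-in for CPU-intensive work: spin for roughly `ms` milliseconds.
function busyWork(ms: number): number {
  const start = Date.now()
  let iterations = 0
  while (Date.now() - start < ms) iterations++
  return iterations
}

let timerFired: boolean = false
setTimeout(() => {
  timerFired = true
}, 0)

busyWork(50)

// The timer was due long ago, but its callback can only run once the
// current synchronous code has finished, so the flag is still false here.
const firedDuringBusyWork: boolean = timerFired
console.log(firedDuringBusyWork) // false
```

Moving busyWork to another thread is what lets the callback run on time.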

In the Browser: With Web Workers

- A blocked main thread hurts UI responsiveness, so the work is moved to a Web Worker

{
  "compilerOptions": {
    "lib": ["es2015", "webworker"],
...
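  • As a way of implementing parallel programs, reading and writing shared memory tends to run into concurrency problems
  • Web Workers run in the browser, so they are designed around message passing with memory safety in mind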

MainThread.ts

const worker = new Worker('WorkerScript.js')

worker.postMessage('some data')

WorkerScript.ts

onmessage = e => {
  console.log(e.data)
  postMessage(`Ack: ${e.data}`)
}
  • Feels much the same as an event emitter
<!doctype html>
<html>
  <head>
    <title>title</title>
    <script src="MainThread.js"></script>
  </head>
  <body>
  </body>
</html>


Adding types

MainThread.ts

type Message = string
type ThreadID = number
type UserID = number
type Participants = UserID[]


type Commands = {
  sendMessageToThread: [ThreadID, Message]
  createThread: [Participants]
  addUserToThread: [ThreadID, UserID]
  removeUserFromThread: [ThreadID, UserID]
}

type Events = {
  receivedMessage: [ThreadID, UserID, Message]
  createdThread: [ThreadID, Participants]
  addedUserToThread: [ThreadID, UserID]
  removedUserFromThread: [ThreadID, UserID]
}

WorkerScript.ts

type Command =
  | { type: 'sendMessageToThread', data: [ThreadID, Message] }
  | { type: 'createThread', data: [Participants] }
  | { type: 'addUserToThread', data: [ThreadID, UserID] }
  | { type: 'removeUserFromThread', data: [ThreadID, UserID] }


onmessage = e => processCommandFromMainThread(e.data)

function processCommandFromMainThread(
  command: Command
) {
  switch (command.type) {
    case 'sendMessageToThread':
      let [threadID, message] = command.data
      console.log(message)
  }
}
  • Problems
    • The type definitions are redundant
    • The onmessage listener has to be touched every time a new kind of Command is added
  • Clean this up with EventEmitter
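
As an aside to the two problems above, and separate from the EventEmitter approach in the code that follows, both can also be eased with plain types. The sketch below derives the Command union from the Commands map with a mapped type and replaces the switch with a handler table. The names Handlers, handlers, and dispatch are hypothetical, and the single cast is an assumption made because TypeScript cannot correlate command.type with command.data across the union.

```typescript
type Message = string
type ThreadID = number
type UserID = number
type Participants = UserID[]

type Commands = {
  sendMessageToThread: [ThreadID, Message]
  createThread: [Participants]
  addUserToThread: [ThreadID, UserID]
  removeUserFromThread: [ThreadID, UserID]
}

// The discriminated union is generated from the map, so it is written only once.
type Command = {
  [K in keyof Commands]: { type: K; data: Commands[K] }
}[keyof Commands]

// One handler per command; forgetting one is a compile error.
type Handlers = {
  [K in keyof Commands]: (...args: Commands[K]) => string
}

const handlers: Handlers = {
  sendMessageToThread: (threadID, message) => `send "${message}" to thread ${threadID}`,
  createThread: participants => `create thread for ${participants.length} users`,
  addUserToThread: (threadID, userID) => `add user ${userID} to thread ${threadID}`,
  removeUserFromThread: (threadID, userID) => `remove user ${userID} from thread ${threadID}`,
}

function dispatch(command: Command): string {
  // Cast needed: the compiler sees a union of handlers and a union of tuples.
  const handler = handlers[command.type] as (...args: any[]) => string
  return handler(...command.data)
}

console.log(dispatch({ type: 'createThread', data: [[456, 789]] }))
// create thread for 2 users
```

In a worker, the listener would stay as a single line such as `onmessage = e => dispatch(e.data)` no matter how many commands are added.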
{
  "compilerOptions": {
    "lib": ["es2015", "webworker"],
    "esModuleInterop": true,
    "keyofStringsOnly": false,
...

SafeEmitter.ts

import {EventEmitter} from 'events'

type StringAndSymbolKeys<T> = Exclude<keyof T, number>

export default class SafeEmitter<
  Events extends Record<PropertyKey, unknown[]>
  > {
    private emitter = new EventEmitter

    emit<K extends StringAndSymbolKeys<Events>> (
      channel: K,
      ...data: Events[K]
    ) {
      return this.emitter.emit(channel, ...data)
    }

    on<K extends StringAndSymbolKeys<Events>> (
      channel: K,
      listener: (...data: Events[K]) => void
    ){
      return this.emitter.on(channel, listener as ((...data: any[]) => void))
    }
  }
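  • [Note] The code exactly as printed in the book failed to compile in several places, so I modified it
    • EventEmitter.emit and on only accept string | symbol, so number is removed from the keys with Exclude
    • A listener: (...data: Events[K]) => void cannot be passed where listener: (...data: any[]) => void is expected, so it is cast with as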
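
To show what the wrapper buys at the call site, here is a self-contained usage sketch. It assumes a small Map-based stand-in (TinyEmitter, hypothetical) in place of Node's EventEmitter so that the block runs without the events module; only the parameter types of on and emit are meant to match. ChatEvents is also a made-up event map.

```typescript
// Hypothetical stand-in for the parts of Node's EventEmitter used by SafeEmitter.
class TinyEmitter {
  private listeners = new Map<string | symbol, Array<(...args: any[]) => void>>()

  on(channel: string | symbol, listener: (...args: any[]) => void): this {
    const list = this.listeners.get(channel) ?? []
    list.push(listener)
    this.listeners.set(channel, list)
    return this
  }

  emit(channel: string | symbol, ...args: any[]): boolean {
    const list = this.listeners.get(channel) ?? []
    list.forEach(listener => listener(...args))
    return list.length > 0
  }
}

// keyof T may include number, which the emitter does not accept as a channel.
type StringAndSymbolKeys<T> = Exclude<keyof T, number>

class SafeEmitter<Events extends Record<PropertyKey, unknown[]>> {
  private emitter = new TinyEmitter()

  emit<K extends StringAndSymbolKeys<Events>>(channel: K, ...data: Events[K]) {
    return this.emitter.emit(channel, ...data)
  }

  on<K extends StringAndSymbolKeys<Events>>(
    channel: K,
    listener: (...data: Events[K]) => void
  ) {
    return this.emitter.on(channel, listener as (...data: any[]) => void)
  }
}

type ChatEvents = { createdThread: [number, number[]] }

const emitter = new SafeEmitter<ChatEvents>()
const received: Array<[number, number[]]> = []

// threadID and participants are inferred as number and number[].
emitter.on('createdThread', (threadID, participants) =>
  received.push([threadID, participants])
)
emitter.emit('createdThread', 123, [456, 789])

// emitter.emit('createdThread', 'oops') // would be rejected by the compiler
console.log(received)
```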

WorkerScript.ts

import SafeEmitter from './SafeEmitter'

const commandEmitter = new SafeEmitter<Commands>()
const eventEmitter = new SafeEmitter<Events>()

onmessage = command =>
  commandEmitter.emit(   // the type argument is inferred as any
    command.data.type,   // any
    ...command.data.data // any
  )

eventEmitter.on(
  'receivedMessage',
  data =>
    postMessage({type: 'receivedMessage', data}) // no completion here
)
eventEmitter.on(
  'createdThread',
  data =>
    postMessage({type: 'createdThread', data}) // no completion here
)

commandEmitter.on(
  'sendMessageToThread',
  (threadID, message) =>
    console.log(`OK, I will send a message to threadID ${threadID}`)
)

eventEmitter.emit('createdThread', 123, [456, 789])
  • [Impression] The type safety does not feel complete
    • e.g. around the postMessage calls
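
One possible way to narrow the postMessage gap is a thin typed wrapper that reuses the Events map. This is a sketch, not the book's code: createEventPoster, WireMessage, and the sent array are hypothetical, and the untyped transport is passed in as a function so the block also runs outside a worker.

```typescript
type Message = string
type ThreadID = number
type UserID = number
type Participants = UserID[]

type Events = {
  receivedMessage: [ThreadID, UserID, Message]
  createdThread: [ThreadID, Participants]
  addedUserToThread: [ThreadID, UserID]
  removedUserFromThread: [ThreadID, UserID]
}

// What actually goes over the wire.
type WireMessage = { type: keyof Events; data: unknown[] }

// Wraps an untyped "post" function so that the event name and its payload
// are checked (and completed by the editor) against the Events map.
function createEventPoster(post: (message: WireMessage) => void) {
  return <K extends keyof Events>(type: K, ...data: Events[K]): void =>
    post({ type, data })
}

// Outside a worker, collect the messages in an array instead of posting them.
const sent: WireMessage[] = []
const postEvent = createEventPoster(message => sent.push(message))

postEvent('createdThread', 123, [456, 789])
// postEvent('createdThread', 'oops') // would be rejected by the compiler

console.log(sent)
```

Inside WorkerScript.ts the wrapper would be created as `createEventPoster(message => postMessage(message))`. The receiving side in the main thread still sees untyped event data, so this only covers the sending half.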

Typesafe protocols

type Protocol = {
  [command: string]: {
    in: unknown[]
    out: unknown
  }
}

function createProtocol<P extends Protocol>(script: string) {
  return <K extends keyof P>(command: K) =>
    (...args: P[K]['in']) =>
      new Promise<P[K]['out']>((resolve, reject) => {
        const worker = new Worker(script)
        worker.onerror = reject
        worker.onmessage = event => resolve(event.data.data)
        worker.postMessage({ command, args })
      })
}


type Matrix = number[][]

type MatrixProtocol = {
  determinant: {
    in: [Matrix]
    out: number
  }
  'dot-product': {
    in: [Matrix, Matrix]
    out: Matrix
  }
  invert: {
    in: [Matrix]
    out: Matrix
  }
}


const runWithMatrixProtocol = createProtocol<MatrixProtocol>(
  'MatrixWorkerScript.js'
)
const parallelDeterminant = runWithMatrixProtocol('determinant');

(async () => {
  const determinant = await parallelDeterminant([[1, 2], [3, 4]])
  console.log(determinant)
})()
  • MatrixWorkerScript.js is left to the exercises

In NodeJS: With Child Processes

  • fork
  • Messages are exchanged via process.on / send

Exercises

General-Purpose promisify

import { readFile } from 'fs'

function promisify<First, E, R>(
  fn: (first: First, callback: (err: E, data: R) => void) => any
): (first: First) => Promise<R> {
  return first => new Promise<R>(
    (resolve, reject) =>
      fn(first, (err: E | null, data: R) =>
         err ? reject(err) : resolve(data)))
}

const readFilePromise = promisify(readFile);
(async () => {
  try {
    const data = await readFilePromise('./myFile.ts')
    console.log('success reading file', data.toString())
  } catch (error) {
    console.error('error reading file', error)
  }
})()
node dist/index.js
error reading file [Error: ENOENT: no such file or directory, open './myFile.ts'] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: './myFile.ts'
}
touch dist/myFile.ts
node dist/index.js
success reading file 

The other half of MatrixProtocol

MatrixWorkerScript.js

  • Does not work unless it is bundled
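
For reference, the worker side of the protocol could look roughly like the following. This is a guess at one workable shape, not the script these notes were written against: MatrixRequest, MatrixHandlers, handleRequest, and the three matrix functions are all hypothetical. The message shapes follow createProtocol above: the worker receives { command, args } and must reply with an object whose data property holds the result. The logic is kept in a pure function so it runs outside a worker too.

```typescript
type Matrix = number[][]

type MatrixProtocol = {
  determinant: { in: [Matrix]; out: number }
  'dot-product': { in: [Matrix, Matrix]; out: Matrix }
  invert: { in: [Matrix]; out: Matrix }
}

// Shape of the message posted by createProtocol: { command, args }.
type MatrixRequest = {
  [K in keyof MatrixProtocol]: { command: K; args: MatrixProtocol[K]['in'] }
}[keyof MatrixProtocol]

// One handler per command, typed from the protocol.
type MatrixHandlers = {
  [K in keyof MatrixProtocol]: (...args: MatrixProtocol[K]['in']) => MatrixProtocol[K]['out']
}

// Cofactor expansion along the first row; exponential, so small matrices only.
function determinant(m: Matrix): number {
  if (m.length === 0) return 1
  if (m.length === 1) return m[0][0]
  return m[0].reduce((sum, value, col) => {
    const minor = m.slice(1).map(row => row.filter((_, c) => c !== col))
    return sum + (col % 2 === 0 ? 1 : -1) * value * determinant(minor)
  }, 0)
}

// Matrix product; assumes a has as many columns as b has rows.
function dotProduct(a: Matrix, b: Matrix): Matrix {
  return a.map(row =>
    b[0].map((_, j) => row.reduce((sum, v, k) => sum + v * b[k][j], 0))
  )
}

// Gauss-Jordan elimination with partial pivoting on [m | I]; assumes m is square.
function invert(m: Matrix): Matrix {
  const n = m.length
  const a = m.map((row, i) => [...row, ...row.map((_, j) => (i === j ? 1 : 0))])
  for (let col = 0; col < n; col++) {
    let pivot = col
    for (let r = col + 1; r < n; r++) {
      if (Math.abs(a[r][col]) > Math.abs(a[pivot][col])) pivot = r
    }
    if (a[pivot][col] === 0) throw new Error('matrix is singular')
    const swapped = a[col]
    a[col] = a[pivot]
    a[pivot] = swapped
    const p = a[col][col]
    a[col] = a[col].map(v => v / p)
    for (let r = 0; r < n; r++) {
      if (r === col) continue
      const factor = a[r][col]
      a[r] = a[r].map((v, c) => v - factor * a[col][c])
    }
  }
  return a.map(row => row.slice(n))
}

const handlers: MatrixHandlers = {
  determinant,
  'dot-product': dotProduct,
  invert,
}

function handleRequest(request: MatrixRequest): { data: unknown } {
  // Cast needed: the compiler cannot pair request.command with request.args.
  const handler = handlers[request.command] as (...args: any[]) => unknown
  return { data: handler(...request.args) }
}

// Inside the actual worker script the wiring would be roughly:
//   onmessage = e => postMessage(handleRequest(e.data))

console.log(handleRequest({ command: 'determinant', args: [[[1, 2], [3, 4]]] }))
// { data: -2 }
```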