扩展 writer 的地址: https://github.com/gota33/aliyun-log-writer
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
用的时候把参数中的 os.Stdout 换成这个 writer 就行了.
一个是 slog 想 Hook 的话就两个接口可以入手, slog.Handler
和 io.Writer
.
我这里选择了 io.Writer
, 主要是因为自己写 slog.Handler
的话, 得把 slog.commonHandler
里的逻辑再实现一遍, 实在有点麻烦, 有没有像 logrus.Hook
那样比较简单的实现方式?
还有一个问题跟 channel 有关.
有这样的工作队列, submit()
提交任务, stop()
阻止新任务提交, 并处理完存量任务后返回.
var (
ErrClosed = errors.New("closed")
chQuit = make(chan struct{})
chData = make(chan int, 10)
)
// func start() { ... }
func submit(n int) error {
select {
case <-chQuit:
return ErrClosed
case chData <- n:
return nil
}
}
func stop() {
close(chQuit)
close(chData)
for n := range chData {
// process data
_ = n
}
}
但是 select
跟 switch
不一样, case
的选择不是有序的, 导致有时候会选到第二个 case
然后 panic
.
所以后来把 stop()
改成这样了
func stop() {
close(chQuit)
ch := chData
chData = make(chan int)
for len(ch) > 0 {
// process data
select {
case n := <-ch:
_ = n
default:
}
}
}
这种情况大家一般是怎么处理的?
1
hallDrawnel 2023-09-28 15:43:25 +08:00
看你的逻辑是想阻止新任务提交,但是已经提交的任务要继续处理完是吧?
如果可以可以控制上游,那应该把 close chan 返回回去,让上游自己停止提交。 |
2
Gota OP @hallDrawnel 上游是 io.Writer 的 Write(), 用户用 slog 写日志的时候触发, 所以控制不了.
|
3
wentx 2023-09-28 15:53:09 +08:00
close(chQuit) 改成 chQuit <- struct{}{}
试试 |
4
hsfzxjy 2023-09-28 15:55:33 +08:00 via Android
chData 不要 close 呗
|
5
Gota OP #3 @wentx 只要 select 是无序的, 都有可能选到第二个. https://stackoverflow.com/questions/68650423/do-select-statements-guarantee-order-of-channel-selection
#4 @hsfzxjy 结尾那套写法就没 close, 想看看有没有其他的处理方式. |
6
wentx 2023-09-28 16:23:21 +08:00
@Gota 不不不,你 chQuit 是无缓冲的 channel, 你在往里写的时候,会阻塞,等到 submit 函数里面
case <-chQuit: return ErrClosed 这段执行到,才会走到下一步 close(chData),这个时候 submit 函数已经退出,所以 submit 不会 panic |
8
Gota OP @wentx 但 submit() 不一定只有一个线程在调用. 而且如果在 stop 清空存量任务的过程中, 有另一个 submit() 调用, 还是会走到第二个 case 的吧? 可能还有一个问题, 如果没有 submit() 直接调用 stop() 程序就卡住了.
|
9
hsfzxjy 2023-09-28 16:30:51 +08:00 via Android
|
10
wentx 2023-09-28 16:42:29 +08:00
@Gota 这个是队列设计的问题了,如何保证 producer 关闭后,其他 goroutine 调用 submit 没有副作用。
|
11
wentx 2023-09-28 16:45:51 +08:00
在这个场景下面,甚至都不需要使用 channel 来通知,你直接一个全局变量,在 submit 的时候,判断一下是否是 close 就可以了。
|
13
wentx 2023-09-28 17:05:02 +08:00
@Gota
``` var ( ErrClosed = errors.New("closed") chQuit bool chData = make(chan int, 10) ) // func start() { ... } func submit(n int) error { if chQuit { return ErrClosed } chData <- n: return nil } func stop() { chQuit = true close(chData) for n := range chData { // process data _ = n } } ``` |
14
pkoukk 2023-09-28 17:10:02 +08:00
stop 里不要 close(chData)啊 , defer close(chData) 不就可以了?
|
15
Gota OP |
17
Gota OP @wentx
因为并发环境下函数执行随时会被挂起. 如果 submit 执行完 if 判断被挂起, 去执行 stop, 等恢复执行 submit 的时候就会 panic 即使正常执行, 如果 submit 执行到 chData <- n 如果因为 buffer 满了开始等待, 此时执行 stop, 会 100% panic. |
18
pkoukk 2023-09-28 17:44:51 +08:00
@pkoukk #14 stop 都执行完了,为什么还有线程可以调用 submit 啊?
通道关闭了,还尝试往通道写入数据也是一种 panic 行为啊 我们一般的做法是用 context.Done 作为标志,因为 contenxt 是可以逐层传递的 当 sumbit 这边收到 Done 的时候,生产者同时也应该停止了 |
19
Gota OP @pkoukk 因为这是个 Logger, 调用者从各个线程触发写日志的操作. 在主线程调用 stop() 的时候没法确保其他线程都提前停下来不写日志. 如果 writer 返回 ErrClose 的话 slog 是能处理的, 但直接 panic 掉就不行了.
|
20
qing18 2023-09-28 18:37:11 +08:00
为啥要 close chData 呢?
``` func stop() { select { case <-chQuit: return default: close(chQuit) } } ``` |
21
Gota OP @qing18 不 close(chData) 就是帖子末尾处的写法.
这里的 submit 接口需要确保: 如果不返回错误的话, 写入的 data 是一定要被处理的. 所以如果不 close 也不换成一个无缓冲 channel 的话, 会出现调用者认为数据成功提交了, 但实际上却没处理的情况. |
22
soap520 2023-09-28 19:57:25 +08:00
```
func submit(n int) error { select { case <-chQuit: return ErrClosed default: } chData <- n return nil } func stop() { close(chQuit) for n := range chData { // process data _ = n } close(chData) } ``` 看看这样行不行, submit 里面先判断一下 chQuit 是不是已经 close 了。 stop 处理完再 close chData 。 一种可能让人看起来有点担心的执行顺序是,1. submit 里, 判断 chQuit 还没关闭。2. stop 里,执行 close(chQuit)。3. submit 里,接着 chData <- n 。不过应该在你的用例中年问题不大。 |
24
realpg 2023-09-28 20:30:57 +08:00
没有
1.21 自己瞎鸡儿改 xml excel 功能都用不了,被迫退回 1.20 |
25
Gota OP @realpg 升到 1.21.1 试试呢? 一般我都等大版本之后的一个小版本才开始正式用, 最开始那个版本确实容易出一些小问题.
|
26
soap520 2023-09-28 20:43:37 +08:00
@Gota 确实,我把 stop 改成这样是不是就可以了。
``` func stop() { close(chQuit) for { select { case n := <-chData: _ = n default: close(chData) return } } } ``` |
27
realpg 2023-09-28 20:48:47 +08:00
|
28
Gota OP @soap520 那就剩下 #22 里你自己提到的那个 panic 问题了. 这里的用例是 slog 的 hook, 所以 submit 可能会在任意线程中被调用, 数量和时机都是没办法控制的, 也就是说 submit 里那个过了 if 之后的挂起其实很容易触发.
|
29
soap520 2023-09-28 21:14:31 +08:00
@Gota 明白了,那我把 stop 最后 close(chData)去掉是不是就行了。去掉之后看起来和你 1L 的方法就差不多了,只是没有重新给 chData 赋值(我也不清楚 slog hook 的用例里需不需要再给 chData 一个 channel )。
如果要很“完美”的话,我除了弄一个锁把 submit 里的 read chDone, enqueue data 保护起来之外想不到更好的办法了。 |
30
Gota OP @soap520 哈哈, 异步相关的东西确实比较烧脑. 1L 重新赋值一个无缓冲 channel 是为了防止 stop 之后有数据进入 chData 却没人来处理, 随着主线程退出这份数据就丢掉了. 至于加锁, 不到万不得已最好别加, 否则每调一次 Log 整个应用都被锁一下, 就有点夸张了.
|
31
nuk 2023-09-29 03:58:37 +08:00
显然需要一个临界区,close channel 的时候要等待其他所有线程离开临界区,不管用 channel 还是 lock 来实现代价都蛮大,单个 bool 或者单个 channel close 都不可能实现,感觉 stop 就直接 drop 掉 log 会比较容易简单。
|
34
Gota OP @nuk 没有 close ,是换成了一个无缓冲的 channel ,从而确保能 select 到第一个 case 。
|
35
mapleray 2023-10-01 11:24:40 +08:00
@Gota #30 如果不想加锁,又不想丢日志,只能想办法把关闭操作抛给使用者了。
目前使用方式 uber-go/atomic.Bool + 缓冲 channel ,退出时等待 channel 清理完才退出。 |
36
paceewang1 2023-10-07 14:42:42 +08:00
这个场景,可以用乐观锁吧,atomic ,CAS
|
37
Jrue0011 2023-10-12 17:26:15 +08:00
读写锁的场景?
stop 写锁更新状态 submit 读锁判断状态 |
38
Gota OP |
39
paceewang1 2023-10-16 11:19:01 +08:00
@Gota CAS 也不是在 submit 里面加锁,我是指在 stop 里面加锁然后转换状态;相当于引入一个状态机而已,submit 只需要加一个状态判断就可以了,看了一下就和#13 的代码大致一样吧,但是 stop 方法先处理 chData 再关闭:
``` func stop() { if ok := CAS(chQuit); !ok { // return error } for n := range chData { // process data _ = n } close(chData) } ``` |
40
Gota OP @paceewang1 见#23 楼和后续的回复,还是有点问题
|